// lib/realtime/RealtimeNotificationService.ts import notificationManager from './NotificationManager'; interface ConnectedClient { userId: string; controller: ReadableStreamDefaultController; lastHeartbeat: number; connectionId: string; userAgent?: string; ipAddress?: string; } class RealtimeNotificationService { private connectedClients = new Map(); private heartbeatInterval: NodeJS.Timeout | null = null; private clientTimeout = 5 * 60 * 1000; // 5분 private heartbeatFrequency = 30 * 1000; // 30초 constructor() { this.setupEventListeners(); this.startHeartbeat(); this.startConnectionCleanup(); } private setupEventListeners() { // PostgreSQL NOTIFY 이벤트 수신 시 클라이언트들에게 브로드캐스트 notificationManager.on('newNotification', (payload: any) => { console.log('Broadcasting new notification to user:', payload.user_id); this.broadcastToUser(payload.user_id, { type: 'new_notification', data: payload }); }); notificationManager.on('notificationRead', (payload: any) => { console.log('Broadcasting notification read to user:', payload.user_id); this.broadcastToUser(payload.user_id, { type: 'notification_read', data: payload }); }); // NotificationManager 연결 상태 변화 시 클라이언트들에게 알림 notificationManager.on('connected', () => { this.broadcastToAllUsers({ type: 'system_status', data: { status: 'connected', message: 'Real-time notifications are now active' } }); }); notificationManager.on('disconnected', () => { this.broadcastToAllUsers({ type: 'system_status', data: { status: 'disconnected', message: 'Real-time notifications temporarily unavailable' } }); }); } // 클라이언트 연결 등록 public addClient( userId: string, controller: ReadableStreamDefaultController, metadata?: { userAgent?: string; ipAddress?: string } ): string { const connectionId = `${userId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; if (!this.connectedClients.has(userId)) { this.connectedClients.set(userId, []); } const clients = this.connectedClients.get(userId)!; // 동일 사용자의 최대 연결 수 제한 const maxConnectionsPerUser = parseInt(process.env.MAX_CONNECTIONS_PER_USER || '5'); if (clients.length >= maxConnectionsPerUser) { console.warn(`Max connections (${maxConnectionsPerUser}) reached for user ${userId}`); // 가장 오래된 연결 제거 const oldestClient = clients.shift(); if (oldestClient) { try { oldestClient.controller.close(); } catch (error) { console.warn('Error closing oldest connection:', error); } } } clients.push({ userId, controller, lastHeartbeat: Date.now(), connectionId, userAgent: metadata?.userAgent, ipAddress: metadata?.ipAddress }); console.log(`Client ${connectionId} connected for user ${userId} (total: ${clients.length})`); // 연결 확인 메시지 전송 this.sendToClient(controller, { type: 'connected', data: { connectionId, serverTime: new Date().toISOString(), dbStatus: notificationManager.getConnectionStatus() } }); return connectionId; } // 클라이언트 연결 해제 public removeClient(userId: string, controller: ReadableStreamDefaultController): void { const clients = this.connectedClients.get(userId); if (!clients) return; const filteredClients = clients.filter(client => { if (client.controller === controller) { console.log(`Client ${client.connectionId} disconnected for user ${userId}`); return false; } return true; }); if (filteredClients.length === 0) { this.connectedClients.delete(userId); console.log(`All clients disconnected for user ${userId}`); } else { this.connectedClients.set(userId, filteredClients); } } // 특정 사용자에게 메시지 브로드캐스트 private broadcastToUser(userId: string, message: any): void { const clients = this.connectedClients.get(userId); if (!clients || clients.length === 0) { console.log(`No connected clients for user ${userId}`); return; } const activeClients: ConnectedClient[] = []; let sentCount = 0; for (const client of clients) { if (this.sendToClient(client.controller, message)) { client.lastHeartbeat = Date.now(); activeClients.push(client); sentCount++; } } // 활성 클라이언트만 유지 if (activeClients.length === 0) { this.connectedClients.delete(userId); } else { this.connectedClients.set(userId, activeClients); } console.log(`Message sent to ${sentCount}/${clients.length} clients for user ${userId}`); } // 모든 사용자에게 브로드캐스트 (시스템 메시지용) private broadcastToAllUsers(message: any): void { let totalSent = 0; let totalClients = 0; for (const [userId, clients] of this.connectedClients.entries()) { totalClients += clients.length; const activeClients: ConnectedClient[] = []; for (const client of clients) { if (this.sendToClient(client.controller, message)) { client.lastHeartbeat = Date.now(); activeClients.push(client); totalSent++; } } if (activeClients.length === 0) { this.connectedClients.delete(userId); } else { this.connectedClients.set(userId, activeClients); } } console.log(`System message sent to ${totalSent}/${totalClients} clients`); } // 개별 클라이언트에게 메시지 전송 private sendToClient(controller: ReadableStreamDefaultController, message: any): boolean { try { const encoder = new TextEncoder(); const data = `data: ${JSON.stringify(message)}\n\n`; controller.enqueue(encoder.encode(data)); return true; } catch (error) { console.warn('Failed to send message to client:', error); return false; } } // 하트비트 전송 (연결 유지) private startHeartbeat(): void { this.heartbeatInterval = setInterval(() => { const heartbeatMessage = { type: 'heartbeat', data: { timestamp: new Date().toISOString(), serverStatus: 'healthy', dbStatus: notificationManager.getConnectionStatus() } }; this.broadcastToAllUsers(heartbeatMessage); }, this.heartbeatFrequency); console.log(`Heartbeat started with ${this.heartbeatFrequency}ms interval`); } // 비활성 연결 정리 private startConnectionCleanup(): void { setInterval(() => { const now = Date.now(); let cleanedConnections = 0; for (const [userId, clients] of this.connectedClients.entries()) { const activeClients = clients.filter(client => { const isActive = (now - client.lastHeartbeat) < this.clientTimeout; if (!isActive) { try { client.controller.close(); } catch (error) { // 이미 닫힌 연결일 수 있음 } cleanedConnections++; } return isActive; }); if (activeClients.length === 0) { this.connectedClients.delete(userId); } else { this.connectedClients.set(userId, activeClients); } } if (cleanedConnections > 0) { console.log(`Cleaned up ${cleanedConnections} inactive connections`); } }, 60000); // 1분마다 정리 } // 연결된 클라이언트 수 조회 public getConnectedClientCount(): number { let count = 0; for (const clients of this.connectedClients.values()) { count += clients.length; } return count; } // 특정 사용자의 연결된 클라이언트 수 조회 public getUserClientCount(userId: string): number { return this.connectedClients.get(userId)?.length || 0; } // 연결된 사용자 수 조회 public getConnectedUserCount(): number { return this.connectedClients.size; } // 상세 연결 정보 조회 (관리자용) public getDetailedConnectionInfo() { const info = { totalClients: this.getConnectedClientCount(), totalUsers: this.getConnectedUserCount(), dbConnectionStatus: notificationManager.getConnectionStatus(), connections: {} as any }; for (const [userId, clients] of this.connectedClients.entries()) { info.connections[userId] = clients.map(client => ({ connectionId: client.connectionId, lastHeartbeat: new Date(client.lastHeartbeat).toISOString(), userAgent: client.userAgent, ipAddress: client.ipAddress, connectedFor: Date.now() - client.lastHeartbeat })); } return info; } // 특정 사용자의 모든 연결 해제 (관리자용) public disconnectUser(userId: string): number { const clients = this.connectedClients.get(userId); if (!clients) return 0; const count = clients.length; for (const client of clients) { try { this.sendToClient(client.controller, { type: 'force_disconnect', data: { reason: 'Administrative action' } }); client.controller.close(); } catch (error) { console.warn(`Error disconnecting client ${client.connectionId}:`, error); } } this.connectedClients.delete(userId); console.log(`Forcibly disconnected ${count} clients for user ${userId}`); return count; } // 서비스 종료 public shutdown(): void { console.log('Shutting down RealtimeNotificationService...'); if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } // 모든 연결에게 종료 메시지 전송 후 연결 해제 const shutdownMessage = { type: 'server_shutdown', data: { message: 'Server is shutting down. Please reconnect in a moment.' } }; for (const clients of this.connectedClients.values()) { for (const client of clients) { try { this.sendToClient(client.controller, shutdownMessage); client.controller.close(); } catch (error) { // 이미 종료된 연결 무시 } } } this.connectedClients.clear(); console.log('RealtimeNotificationService shutdown complete'); } } // 싱글톤 인스턴스 const realtimeNotificationService = new RealtimeNotificationService(); // 프로세스 종료 시 정리 process.on('SIGINT', () => { realtimeNotificationService.shutdown(); }); process.on('SIGTERM', () => { realtimeNotificationService.shutdown(); }); export default realtimeNotificationService;