diff options
Diffstat (limited to 'lib/realtime/RealtimeNotificationService.ts')
| -rw-r--r-- | lib/realtime/RealtimeNotificationService.ts | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/lib/realtime/RealtimeNotificationService.ts b/lib/realtime/RealtimeNotificationService.ts new file mode 100644 index 00000000..4bc728ee --- /dev/null +++ b/lib/realtime/RealtimeNotificationService.ts @@ -0,0 +1,362 @@ +// lib/realtime/RealtimeNotificationService.ts +import notificationManager from './NotificationManager'; + +interface ConnectedClient { + userId: string; + controller: ReadableStreamDefaultController; + lastHeartbeat: number; + connectionId: string; + userAgent?: string; + ipAddress?: string; +} + +class RealtimeNotificationService { + private connectedClients = new Map<string, ConnectedClient[]>(); + private heartbeatInterval: NodeJS.Timeout | null = null; + private clientTimeout = 5 * 60 * 1000; // 5분 + private heartbeatFrequency = 30 * 1000; // 30초 + + constructor() { + this.setupEventListeners(); + this.startHeartbeat(); + this.startConnectionCleanup(); + } + + private setupEventListeners() { + // PostgreSQL NOTIFY 이벤트 수신 시 클라이언트들에게 브로드캐스트 + notificationManager.on('newNotification', (payload: any) => { + console.log('Broadcasting new notification to user:', payload.user_id); + this.broadcastToUser(payload.user_id, { + type: 'new_notification', + data: payload + }); + }); + + notificationManager.on('notificationRead', (payload: any) => { + console.log('Broadcasting notification read to user:', payload.user_id); + this.broadcastToUser(payload.user_id, { + type: 'notification_read', + data: payload + }); + }); + + // NotificationManager 연결 상태 변화 시 클라이언트들에게 알림 + notificationManager.on('connected', () => { + this.broadcastToAllUsers({ + type: 'system_status', + data: { status: 'connected', message: 'Real-time notifications are now active' } + }); + }); + + notificationManager.on('disconnected', () => { + this.broadcastToAllUsers({ + type: 'system_status', + data: { status: 'disconnected', message: 'Real-time notifications temporarily unavailable' } + }); + }); + } + + // 클라이언트 연결 등록 + public addClient( + userId: string, + controller: ReadableStreamDefaultController, + metadata?: { userAgent?: string; ipAddress?: string } + ): string { + const connectionId = `${userId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + + if (!this.connectedClients.has(userId)) { + this.connectedClients.set(userId, []); + } + + const clients = this.connectedClients.get(userId)!; + + // 동일 사용자의 최대 연결 수 제한 + const maxConnectionsPerUser = parseInt(process.env.MAX_CONNECTIONS_PER_USER || '5'); + if (clients.length >= maxConnectionsPerUser) { + console.warn(`Max connections (${maxConnectionsPerUser}) reached for user ${userId}`); + // 가장 오래된 연결 제거 + const oldestClient = clients.shift(); + if (oldestClient) { + try { + oldestClient.controller.close(); + } catch (error) { + console.warn('Error closing oldest connection:', error); + } + } + } + + clients.push({ + userId, + controller, + lastHeartbeat: Date.now(), + connectionId, + userAgent: metadata?.userAgent, + ipAddress: metadata?.ipAddress + }); + + console.log(`Client ${connectionId} connected for user ${userId} (total: ${clients.length})`); + + // 연결 확인 메시지 전송 + this.sendToClient(controller, { + type: 'connected', + data: { + connectionId, + serverTime: new Date().toISOString(), + dbStatus: notificationManager.getConnectionStatus() + } + }); + + return connectionId; + } + + // 클라이언트 연결 해제 + public removeClient(userId: string, controller: ReadableStreamDefaultController): void { + const clients = this.connectedClients.get(userId); + if (!clients) return; + + const filteredClients = clients.filter(client => { + if (client.controller === controller) { + console.log(`Client ${client.connectionId} disconnected for user ${userId}`); + return false; + } + return true; + }); + + if (filteredClients.length === 0) { + this.connectedClients.delete(userId); + console.log(`All clients disconnected for user ${userId}`); + } else { + this.connectedClients.set(userId, filteredClients); + } + } + + // 특정 사용자에게 메시지 브로드캐스트 + private broadcastToUser(userId: string, message: any): void { + const clients = this.connectedClients.get(userId); + if (!clients || clients.length === 0) { + console.log(`No connected clients for user ${userId}`); + return; + } + + const activeClients: ConnectedClient[] = []; + let sentCount = 0; + + for (const client of clients) { + if (this.sendToClient(client.controller, message)) { + client.lastHeartbeat = Date.now(); + activeClients.push(client); + sentCount++; + } + } + + // 활성 클라이언트만 유지 + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + + console.log(`Message sent to ${sentCount}/${clients.length} clients for user ${userId}`); + } + + // 모든 사용자에게 브로드캐스트 (시스템 메시지용) + private broadcastToAllUsers(message: any): void { + let totalSent = 0; + let totalClients = 0; + + for (const [userId, clients] of this.connectedClients.entries()) { + totalClients += clients.length; + const activeClients: ConnectedClient[] = []; + + for (const client of clients) { + if (this.sendToClient(client.controller, message)) { + client.lastHeartbeat = Date.now(); + activeClients.push(client); + totalSent++; + } + } + + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + } + + console.log(`System message sent to ${totalSent}/${totalClients} clients`); + } + + // 개별 클라이언트에게 메시지 전송 + private sendToClient(controller: ReadableStreamDefaultController, message: any): boolean { + try { + const encoder = new TextEncoder(); + const data = `data: ${JSON.stringify(message)}\n\n`; + controller.enqueue(encoder.encode(data)); + return true; + } catch (error) { + console.warn('Failed to send message to client:', error); + return false; + } + } + + // 하트비트 전송 (연결 유지) + private startHeartbeat(): void { + this.heartbeatInterval = setInterval(() => { + const heartbeatMessage = { + type: 'heartbeat', + data: { + timestamp: new Date().toISOString(), + serverStatus: 'healthy', + dbStatus: notificationManager.getConnectionStatus() + } + }; + + this.broadcastToAllUsers(heartbeatMessage); + }, this.heartbeatFrequency); + + console.log(`Heartbeat started with ${this.heartbeatFrequency}ms interval`); + } + + // 비활성 연결 정리 + private startConnectionCleanup(): void { + setInterval(() => { + const now = Date.now(); + let cleanedConnections = 0; + + for (const [userId, clients] of this.connectedClients.entries()) { + const activeClients = clients.filter(client => { + const isActive = (now - client.lastHeartbeat) < this.clientTimeout; + if (!isActive) { + try { + client.controller.close(); + } catch (error) { + // 이미 닫힌 연결일 수 있음 + } + cleanedConnections++; + } + return isActive; + }); + + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + } + + if (cleanedConnections > 0) { + console.log(`Cleaned up ${cleanedConnections} inactive connections`); + } + }, 60000); // 1분마다 정리 + } + + // 연결된 클라이언트 수 조회 + public getConnectedClientCount(): number { + let count = 0; + for (const clients of this.connectedClients.values()) { + count += clients.length; + } + return count; + } + + // 특정 사용자의 연결된 클라이언트 수 조회 + public getUserClientCount(userId: string): number { + return this.connectedClients.get(userId)?.length || 0; + } + + // 연결된 사용자 수 조회 + public getConnectedUserCount(): number { + return this.connectedClients.size; + } + + // 상세 연결 정보 조회 (관리자용) + public getDetailedConnectionInfo() { + const info = { + totalClients: this.getConnectedClientCount(), + totalUsers: this.getConnectedUserCount(), + dbConnectionStatus: notificationManager.getConnectionStatus(), + connections: {} as any + }; + + for (const [userId, clients] of this.connectedClients.entries()) { + info.connections[userId] = clients.map(client => ({ + connectionId: client.connectionId, + lastHeartbeat: new Date(client.lastHeartbeat).toISOString(), + userAgent: client.userAgent, + ipAddress: client.ipAddress, + connectedFor: Date.now() - client.lastHeartbeat + })); + } + + return info; + } + + // 특정 사용자의 모든 연결 해제 (관리자용) + public disconnectUser(userId: string): number { + const clients = this.connectedClients.get(userId); + if (!clients) return 0; + + const count = clients.length; + + for (const client of clients) { + try { + this.sendToClient(client.controller, { + type: 'force_disconnect', + data: { reason: 'Administrative action' } + }); + client.controller.close(); + } catch (error) { + console.warn(`Error disconnecting client ${client.connectionId}:`, error); + } + } + + this.connectedClients.delete(userId); + console.log(`Forcibly disconnected ${count} clients for user ${userId}`); + + return count; + } + + // 서비스 종료 + public shutdown(): void { + console.log('Shutting down RealtimeNotificationService...'); + + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + + // 모든 연결에게 종료 메시지 전송 후 연결 해제 + const shutdownMessage = { + type: 'server_shutdown', + data: { message: 'Server is shutting down. Please reconnect in a moment.' } + }; + + for (const clients of this.connectedClients.values()) { + for (const client of clients) { + try { + this.sendToClient(client.controller, shutdownMessage); + client.controller.close(); + } catch (error) { + // 이미 종료된 연결 무시 + } + } + } + + this.connectedClients.clear(); + console.log('RealtimeNotificationService shutdown complete'); + } +} + +// 싱글톤 인스턴스 +const realtimeNotificationService = new RealtimeNotificationService(); + +// 프로세스 종료 시 정리 +process.on('SIGINT', () => { + realtimeNotificationService.shutdown(); +}); + +process.on('SIGTERM', () => { + realtimeNotificationService.shutdown(); +}); + +export default realtimeNotificationService;
\ No newline at end of file |
