diff options
| author | dujinkim <dujin.kim@dtsolution.co.kr> | 2025-07-18 07:52:02 +0000 |
|---|---|---|
| committer | dujinkim <dujin.kim@dtsolution.co.kr> | 2025-07-18 07:52:02 +0000 |
| commit | 48a2255bfc45ffcfb0b39ffefdd57cbacf8b36df (patch) | |
| tree | 0c88b7c126138233875e8d372a4e999e49c38a62 /lib/realtime | |
| parent | 2ef02e27dbe639876fa3b90c30307dda183545ec (diff) | |
(대표님) 파일관리변경, 클라IP추적, 실시간알림, 미들웨어변경, 알림API
Diffstat (limited to 'lib/realtime')
| -rw-r--r-- | lib/realtime/NotificationManager.ts | 223 | ||||
| -rw-r--r-- | lib/realtime/RealtimeNotificationService.ts | 362 |
2 files changed, 585 insertions, 0 deletions
diff --git a/lib/realtime/NotificationManager.ts b/lib/realtime/NotificationManager.ts new file mode 100644 index 00000000..88eed387 --- /dev/null +++ b/lib/realtime/NotificationManager.ts @@ -0,0 +1,223 @@ +// lib/realtime/NotificationManager.ts +import { Client } from 'pg'; +import { EventEmitter } from 'events'; + +interface NotificationPayload { + id: string; + user_id: string; + title: string; + message: string; + type: string; + related_record_id?: string; + related_record_type?: string; + is_read: boolean; + created_at: string; +} + +interface NotificationReadPayload { + id: string; + user_id: string; + is_read: boolean; + read_at?: string; +} + +class NotificationManager extends EventEmitter { + private client: Client | null = null; + private isConnected = false; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 1000; + private reconnectTimeout: NodeJS.Timeout | null = null; + + constructor() { + super(); + this.setMaxListeners(100); // SSE 연결이 많을 수 있으므로 제한 증가 + this.connect(); + } + + private async connect() { + try { + // 기존 연결이 있으면 정리 + if (this.client) { + try { + await this.client.end(); + } catch (error) { + console.warn('Error closing existing connection:', error); + } + } + + this.client = new Client({ + connectionString: process.env.DATABASE_URL, + application_name: 'notification_listener', + // 연결 유지 설정 + keepAlive: true, + keepAliveInitialDelayMillis: 10000, + }); + + await this.client.connect(); + + // LISTEN 채널 구독 + await this.client.query('LISTEN new_notification'); + await this.client.query('LISTEN notification_read'); + + console.log('NotificationManager: PostgreSQL LISTEN connected'); + + // 알림 수신 처리 + this.client.on('notification', (msg) => { + try { + if (msg.channel === 'new_notification') { + const payload: NotificationPayload = JSON.parse(msg.payload || '{}'); + console.log('New notification received:', payload.id, 'for user:', payload.user_id); + this.emit('newNotification', payload); + } else if (msg.channel === 'notification_read') { + const payload: NotificationReadPayload = JSON.parse(msg.payload || '{}'); + console.log('Notification read:', payload.id, 'by user:', payload.user_id); + this.emit('notificationRead', payload); + } + } catch (error) { + console.error('Error parsing notification payload:', error); + } + }); + + // 연결 오류 처리 + this.client.on('error', (error) => { + console.error('PostgreSQL LISTEN error:', error); + this.isConnected = false; + this.scheduleReconnect(); + }); + + // 연결 종료 처리 + this.client.on('end', () => { + console.log('PostgreSQL LISTEN connection ended'); + this.isConnected = false; + this.scheduleReconnect(); + }); + + this.isConnected = true; + this.reconnectAttempts = 0; + + // 연결 성공 이벤트 발송 + this.emit('connected'); + + } catch (error) { + console.error('Failed to connect NotificationManager:', error); + this.isConnected = false; + this.scheduleReconnect(); + } + } + + private scheduleReconnect() { + // 이미 재연결이 예약되어 있으면 무시 + if (this.reconnectTimeout) { + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error('Max reconnection attempts reached for NotificationManager'); + this.emit('maxReconnectAttemptsReached'); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + console.log(`Scheduling NotificationManager reconnection in ${delay}ms (attempt ${this.reconnectAttempts})`); + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + this.connect(); + }, delay); + } + + public async disconnect() { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + + if (this.client) { + try { + await this.client.end(); + } catch (error) { + console.warn('Error disconnecting NotificationManager:', error); + } + this.client = null; + } + + this.isConnected = false; + this.emit('disconnected'); + } + + public getConnectionStatus(): boolean { + return this.isConnected && this.client !== null; + } + + public getReconnectAttempts(): number { + return this.reconnectAttempts; + } + + // 강제 재연결 (관리자 API용) + public async forceReconnect() { + console.log('Forcing NotificationManager reconnection...'); + this.reconnectAttempts = 0; + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + await this.disconnect(); + await this.connect(); + } + + // 연결 상태 체크 (헬스체크용) + public async healthCheck(): Promise<{ status: string; details: any }> { + try { + if (!this.client || !this.isConnected) { + return { + status: 'unhealthy', + details: { + connected: false, + reconnectAttempts: this.reconnectAttempts, + maxReconnectAttempts: this.maxReconnectAttempts + } + }; + } + + // 간단한 쿼리로 연결 상태 확인 + await this.client.query('SELECT 1'); + + return { + status: 'healthy', + details: { + connected: true, + reconnectAttempts: this.reconnectAttempts, + uptime: process.uptime() + } + }; + } catch (error) { + return { + status: 'unhealthy', + details: { + connected: false, + error: error instanceof Error ? error.message : 'Unknown error', + reconnectAttempts: this.reconnectAttempts + } + }; + } + } +} + +// 싱글톤 인스턴스 +const notificationManager = new NotificationManager(); + +// 프로세스 종료 시 정리 +process.on('SIGINT', async () => { + console.log('Shutting down NotificationManager...'); + await notificationManager.disconnect(); +}); + +process.on('SIGTERM', async () => { + console.log('Shutting down NotificationManager...'); + await notificationManager.disconnect(); +}); + +export default notificationManager;
\ No newline at end of file diff --git a/lib/realtime/RealtimeNotificationService.ts b/lib/realtime/RealtimeNotificationService.ts new file mode 100644 index 00000000..4bc728ee --- /dev/null +++ b/lib/realtime/RealtimeNotificationService.ts @@ -0,0 +1,362 @@ +// lib/realtime/RealtimeNotificationService.ts +import notificationManager from './NotificationManager'; + +interface ConnectedClient { + userId: string; + controller: ReadableStreamDefaultController; + lastHeartbeat: number; + connectionId: string; + userAgent?: string; + ipAddress?: string; +} + +class RealtimeNotificationService { + private connectedClients = new Map<string, ConnectedClient[]>(); + private heartbeatInterval: NodeJS.Timeout | null = null; + private clientTimeout = 5 * 60 * 1000; // 5분 + private heartbeatFrequency = 30 * 1000; // 30초 + + constructor() { + this.setupEventListeners(); + this.startHeartbeat(); + this.startConnectionCleanup(); + } + + private setupEventListeners() { + // PostgreSQL NOTIFY 이벤트 수신 시 클라이언트들에게 브로드캐스트 + notificationManager.on('newNotification', (payload: any) => { + console.log('Broadcasting new notification to user:', payload.user_id); + this.broadcastToUser(payload.user_id, { + type: 'new_notification', + data: payload + }); + }); + + notificationManager.on('notificationRead', (payload: any) => { + console.log('Broadcasting notification read to user:', payload.user_id); + this.broadcastToUser(payload.user_id, { + type: 'notification_read', + data: payload + }); + }); + + // NotificationManager 연결 상태 변화 시 클라이언트들에게 알림 + notificationManager.on('connected', () => { + this.broadcastToAllUsers({ + type: 'system_status', + data: { status: 'connected', message: 'Real-time notifications are now active' } + }); + }); + + notificationManager.on('disconnected', () => { + this.broadcastToAllUsers({ + type: 'system_status', + data: { status: 'disconnected', message: 'Real-time notifications temporarily unavailable' } + }); + }); + } + + // 클라이언트 연결 등록 + public addClient( + userId: string, + controller: ReadableStreamDefaultController, + metadata?: { userAgent?: string; ipAddress?: string } + ): string { + const connectionId = `${userId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + + if (!this.connectedClients.has(userId)) { + this.connectedClients.set(userId, []); + } + + const clients = this.connectedClients.get(userId)!; + + // 동일 사용자의 최대 연결 수 제한 + const maxConnectionsPerUser = parseInt(process.env.MAX_CONNECTIONS_PER_USER || '5'); + if (clients.length >= maxConnectionsPerUser) { + console.warn(`Max connections (${maxConnectionsPerUser}) reached for user ${userId}`); + // 가장 오래된 연결 제거 + const oldestClient = clients.shift(); + if (oldestClient) { + try { + oldestClient.controller.close(); + } catch (error) { + console.warn('Error closing oldest connection:', error); + } + } + } + + clients.push({ + userId, + controller, + lastHeartbeat: Date.now(), + connectionId, + userAgent: metadata?.userAgent, + ipAddress: metadata?.ipAddress + }); + + console.log(`Client ${connectionId} connected for user ${userId} (total: ${clients.length})`); + + // 연결 확인 메시지 전송 + this.sendToClient(controller, { + type: 'connected', + data: { + connectionId, + serverTime: new Date().toISOString(), + dbStatus: notificationManager.getConnectionStatus() + } + }); + + return connectionId; + } + + // 클라이언트 연결 해제 + public removeClient(userId: string, controller: ReadableStreamDefaultController): void { + const clients = this.connectedClients.get(userId); + if (!clients) return; + + const filteredClients = clients.filter(client => { + if (client.controller === controller) { + console.log(`Client ${client.connectionId} disconnected for user ${userId}`); + return false; + } + return true; + }); + + if (filteredClients.length === 0) { + this.connectedClients.delete(userId); + console.log(`All clients disconnected for user ${userId}`); + } else { + this.connectedClients.set(userId, filteredClients); + } + } + + // 특정 사용자에게 메시지 브로드캐스트 + private broadcastToUser(userId: string, message: any): void { + const clients = this.connectedClients.get(userId); + if (!clients || clients.length === 0) { + console.log(`No connected clients for user ${userId}`); + return; + } + + const activeClients: ConnectedClient[] = []; + let sentCount = 0; + + for (const client of clients) { + if (this.sendToClient(client.controller, message)) { + client.lastHeartbeat = Date.now(); + activeClients.push(client); + sentCount++; + } + } + + // 활성 클라이언트만 유지 + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + + console.log(`Message sent to ${sentCount}/${clients.length} clients for user ${userId}`); + } + + // 모든 사용자에게 브로드캐스트 (시스템 메시지용) + private broadcastToAllUsers(message: any): void { + let totalSent = 0; + let totalClients = 0; + + for (const [userId, clients] of this.connectedClients.entries()) { + totalClients += clients.length; + const activeClients: ConnectedClient[] = []; + + for (const client of clients) { + if (this.sendToClient(client.controller, message)) { + client.lastHeartbeat = Date.now(); + activeClients.push(client); + totalSent++; + } + } + + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + } + + console.log(`System message sent to ${totalSent}/${totalClients} clients`); + } + + // 개별 클라이언트에게 메시지 전송 + private sendToClient(controller: ReadableStreamDefaultController, message: any): boolean { + try { + const encoder = new TextEncoder(); + const data = `data: ${JSON.stringify(message)}\n\n`; + controller.enqueue(encoder.encode(data)); + return true; + } catch (error) { + console.warn('Failed to send message to client:', error); + return false; + } + } + + // 하트비트 전송 (연결 유지) + private startHeartbeat(): void { + this.heartbeatInterval = setInterval(() => { + const heartbeatMessage = { + type: 'heartbeat', + data: { + timestamp: new Date().toISOString(), + serverStatus: 'healthy', + dbStatus: notificationManager.getConnectionStatus() + } + }; + + this.broadcastToAllUsers(heartbeatMessage); + }, this.heartbeatFrequency); + + console.log(`Heartbeat started with ${this.heartbeatFrequency}ms interval`); + } + + // 비활성 연결 정리 + private startConnectionCleanup(): void { + setInterval(() => { + const now = Date.now(); + let cleanedConnections = 0; + + for (const [userId, clients] of this.connectedClients.entries()) { + const activeClients = clients.filter(client => { + const isActive = (now - client.lastHeartbeat) < this.clientTimeout; + if (!isActive) { + try { + client.controller.close(); + } catch (error) { + // 이미 닫힌 연결일 수 있음 + } + cleanedConnections++; + } + return isActive; + }); + + if (activeClients.length === 0) { + this.connectedClients.delete(userId); + } else { + this.connectedClients.set(userId, activeClients); + } + } + + if (cleanedConnections > 0) { + console.log(`Cleaned up ${cleanedConnections} inactive connections`); + } + }, 60000); // 1분마다 정리 + } + + // 연결된 클라이언트 수 조회 + public getConnectedClientCount(): number { + let count = 0; + for (const clients of this.connectedClients.values()) { + count += clients.length; + } + return count; + } + + // 특정 사용자의 연결된 클라이언트 수 조회 + public getUserClientCount(userId: string): number { + return this.connectedClients.get(userId)?.length || 0; + } + + // 연결된 사용자 수 조회 + public getConnectedUserCount(): number { + return this.connectedClients.size; + } + + // 상세 연결 정보 조회 (관리자용) + public getDetailedConnectionInfo() { + const info = { + totalClients: this.getConnectedClientCount(), + totalUsers: this.getConnectedUserCount(), + dbConnectionStatus: notificationManager.getConnectionStatus(), + connections: {} as any + }; + + for (const [userId, clients] of this.connectedClients.entries()) { + info.connections[userId] = clients.map(client => ({ + connectionId: client.connectionId, + lastHeartbeat: new Date(client.lastHeartbeat).toISOString(), + userAgent: client.userAgent, + ipAddress: client.ipAddress, + connectedFor: Date.now() - client.lastHeartbeat + })); + } + + return info; + } + + // 특정 사용자의 모든 연결 해제 (관리자용) + public disconnectUser(userId: string): number { + const clients = this.connectedClients.get(userId); + if (!clients) return 0; + + const count = clients.length; + + for (const client of clients) { + try { + this.sendToClient(client.controller, { + type: 'force_disconnect', + data: { reason: 'Administrative action' } + }); + client.controller.close(); + } catch (error) { + console.warn(`Error disconnecting client ${client.connectionId}:`, error); + } + } + + this.connectedClients.delete(userId); + console.log(`Forcibly disconnected ${count} clients for user ${userId}`); + + return count; + } + + // 서비스 종료 + public shutdown(): void { + console.log('Shutting down RealtimeNotificationService...'); + + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + + // 모든 연결에게 종료 메시지 전송 후 연결 해제 + const shutdownMessage = { + type: 'server_shutdown', + data: { message: 'Server is shutting down. Please reconnect in a moment.' } + }; + + for (const clients of this.connectedClients.values()) { + for (const client of clients) { + try { + this.sendToClient(client.controller, shutdownMessage); + client.controller.close(); + } catch (error) { + // 이미 종료된 연결 무시 + } + } + } + + this.connectedClients.clear(); + console.log('RealtimeNotificationService shutdown complete'); + } +} + +// 싱글톤 인스턴스 +const realtimeNotificationService = new RealtimeNotificationService(); + +// 프로세스 종료 시 정리 +process.on('SIGINT', () => { + realtimeNotificationService.shutdown(); +}); + +process.on('SIGTERM', () => { + realtimeNotificationService.shutdown(); +}); + +export default realtimeNotificationService;
\ No newline at end of file |
