// lib/realtime/NotificationManager.ts import { Client } from 'pg'; import { EventEmitter } from 'events'; interface NotificationPayload { id: string; user_id: string; title: string; message: string; type: string; related_record_id?: string; related_record_type?: string; is_read: boolean; created_at: string; } interface NotificationReadPayload { id: string; user_id: string; is_read: boolean; read_at?: string; } class NotificationManager extends EventEmitter { private client: Client | null = null; private isConnected = false; private reconnectAttempts = 0; private maxReconnectAttempts = 5; private reconnectDelay = 1000; private reconnectTimeout: NodeJS.Timeout | null = null; constructor() { super(); this.setMaxListeners(100); // SSE 연결이 많을 수 있으므로 제한 증가 this.connect(); } private async connect() { try { // 기존 연결이 있으면 정리 if (this.client) { try { await this.client.end(); } catch (error) { console.warn('Error closing existing connection:', error); } } this.client = new Client({ connectionString: process.env.DATABASE_URL, application_name: 'notification_listener', // 연결 유지 설정 keepAlive: true, keepAliveInitialDelayMillis: 10000, }); await this.client.connect(); // LISTEN 채널 구독 await this.client.query('LISTEN new_notification'); await this.client.query('LISTEN notification_read'); console.log('NotificationManager: PostgreSQL LISTEN connected'); // 알림 수신 처리 this.client.on('notification', (msg) => { try { if (msg.channel === 'new_notification') { const payload: NotificationPayload = JSON.parse(msg.payload || '{}'); console.log('New notification received:', payload.id, 'for user:', payload.user_id); this.emit('newNotification', payload); } else if (msg.channel === 'notification_read') { const payload: NotificationReadPayload = JSON.parse(msg.payload || '{}'); console.log('Notification read:', payload.id, 'by user:', payload.user_id); this.emit('notificationRead', payload); } } catch (error) { console.error('Error parsing notification payload:', error); } }); // 연결 오류 처리 this.client.on('error', (error) => { console.error('PostgreSQL LISTEN error:', error); this.isConnected = false; this.scheduleReconnect(); }); // 연결 종료 처리 this.client.on('end', () => { console.log('PostgreSQL LISTEN connection ended'); this.isConnected = false; this.scheduleReconnect(); }); this.isConnected = true; this.reconnectAttempts = 0; // 연결 성공 이벤트 발송 this.emit('connected'); } catch (error) { console.error('Failed to connect NotificationManager:', error); this.isConnected = false; this.scheduleReconnect(); } } private scheduleReconnect() { // 이미 재연결이 예약되어 있으면 무시 if (this.reconnectTimeout) { return; } if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('Max reconnection attempts reached for NotificationManager'); this.emit('maxReconnectAttemptsReached'); return; } this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); console.log(`Scheduling NotificationManager reconnection in ${delay}ms (attempt ${this.reconnectAttempts})`); this.reconnectTimeout = setTimeout(() => { this.reconnectTimeout = null; this.connect(); }, delay); } public async disconnect() { if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } if (this.client) { try { await this.client.end(); } catch (error) { console.warn('Error disconnecting NotificationManager:', error); } this.client = null; } this.isConnected = false; this.emit('disconnected'); } public getConnectionStatus(): boolean { return this.isConnected && this.client !== null; } public getReconnectAttempts(): number { return this.reconnectAttempts; } // 강제 재연결 (관리자 API용) public async forceReconnect() { console.log('Forcing NotificationManager reconnection...'); this.reconnectAttempts = 0; if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } await this.disconnect(); await this.connect(); } // 연결 상태 체크 (헬스체크용) public async healthCheck(): Promise<{ status: string; details: any }> { try { if (!this.client || !this.isConnected) { return { status: 'unhealthy', details: { connected: false, reconnectAttempts: this.reconnectAttempts, maxReconnectAttempts: this.maxReconnectAttempts } }; } // 간단한 쿼리로 연결 상태 확인 await this.client.query('SELECT 1'); return { status: 'healthy', details: { connected: true, reconnectAttempts: this.reconnectAttempts, uptime: process.uptime() } }; } catch (error) { return { status: 'unhealthy', details: { connected: false, error: error instanceof Error ? error.message : 'Unknown error', reconnectAttempts: this.reconnectAttempts } }; } } } // 싱글톤 인스턴스 const notificationManager = new NotificationManager(); // 프로세스 종료 시 정리 process.on('SIGINT', async () => { console.log('Shutting down NotificationManager...'); await notificationManager.disconnect(); }); process.on('SIGTERM', async () => { console.log('Shutting down NotificationManager...'); await notificationManager.disconnect(); }); export default notificationManager;