diff options
Diffstat (limited to 'lib/realtime/NotificationManager.ts')
| -rw-r--r-- | lib/realtime/NotificationManager.ts | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/lib/realtime/NotificationManager.ts b/lib/realtime/NotificationManager.ts new file mode 100644 index 00000000..88eed387 --- /dev/null +++ b/lib/realtime/NotificationManager.ts @@ -0,0 +1,223 @@ +// lib/realtime/NotificationManager.ts +import { Client } from 'pg'; +import { EventEmitter } from 'events'; + +interface NotificationPayload { + id: string; + user_id: string; + title: string; + message: string; + type: string; + related_record_id?: string; + related_record_type?: string; + is_read: boolean; + created_at: string; +} + +interface NotificationReadPayload { + id: string; + user_id: string; + is_read: boolean; + read_at?: string; +} + +class NotificationManager extends EventEmitter { + private client: Client | null = null; + private isConnected = false; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 1000; + private reconnectTimeout: NodeJS.Timeout | null = null; + + constructor() { + super(); + this.setMaxListeners(100); // SSE 연결이 많을 수 있으므로 제한 증가 + this.connect(); + } + + private async connect() { + try { + // 기존 연결이 있으면 정리 + if (this.client) { + try { + await this.client.end(); + } catch (error) { + console.warn('Error closing existing connection:', error); + } + } + + this.client = new Client({ + connectionString: process.env.DATABASE_URL, + application_name: 'notification_listener', + // 연결 유지 설정 + keepAlive: true, + keepAliveInitialDelayMillis: 10000, + }); + + await this.client.connect(); + + // LISTEN 채널 구독 + await this.client.query('LISTEN new_notification'); + await this.client.query('LISTEN notification_read'); + + console.log('NotificationManager: PostgreSQL LISTEN connected'); + + // 알림 수신 처리 + this.client.on('notification', (msg) => { + try { + if (msg.channel === 'new_notification') { + const payload: NotificationPayload = JSON.parse(msg.payload || '{}'); + console.log('New notification received:', payload.id, 'for user:', payload.user_id); + this.emit('newNotification', payload); + } else if (msg.channel === 'notification_read') { + const payload: NotificationReadPayload = JSON.parse(msg.payload || '{}'); + console.log('Notification read:', payload.id, 'by user:', payload.user_id); + this.emit('notificationRead', payload); + } + } catch (error) { + console.error('Error parsing notification payload:', error); + } + }); + + // 연결 오류 처리 + this.client.on('error', (error) => { + console.error('PostgreSQL LISTEN error:', error); + this.isConnected = false; + this.scheduleReconnect(); + }); + + // 연결 종료 처리 + this.client.on('end', () => { + console.log('PostgreSQL LISTEN connection ended'); + this.isConnected = false; + this.scheduleReconnect(); + }); + + this.isConnected = true; + this.reconnectAttempts = 0; + + // 연결 성공 이벤트 발송 + this.emit('connected'); + + } catch (error) { + console.error('Failed to connect NotificationManager:', error); + this.isConnected = false; + this.scheduleReconnect(); + } + } + + private scheduleReconnect() { + // 이미 재연결이 예약되어 있으면 무시 + if (this.reconnectTimeout) { + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error('Max reconnection attempts reached for NotificationManager'); + this.emit('maxReconnectAttemptsReached'); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + console.log(`Scheduling NotificationManager reconnection in ${delay}ms (attempt ${this.reconnectAttempts})`); + + this.reconnectTimeout = setTimeout(() => { + this.reconnectTimeout = null; + this.connect(); + }, delay); + } + + public async disconnect() { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + + if (this.client) { + try { + await this.client.end(); + } catch (error) { + console.warn('Error disconnecting NotificationManager:', error); + } + this.client = null; + } + + this.isConnected = false; + this.emit('disconnected'); + } + + public getConnectionStatus(): boolean { + return this.isConnected && this.client !== null; + } + + public getReconnectAttempts(): number { + return this.reconnectAttempts; + } + + // 강제 재연결 (관리자 API용) + public async forceReconnect() { + console.log('Forcing NotificationManager reconnection...'); + this.reconnectAttempts = 0; + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + await this.disconnect(); + await this.connect(); + } + + // 연결 상태 체크 (헬스체크용) + public async healthCheck(): Promise<{ status: string; details: any }> { + try { + if (!this.client || !this.isConnected) { + return { + status: 'unhealthy', + details: { + connected: false, + reconnectAttempts: this.reconnectAttempts, + maxReconnectAttempts: this.maxReconnectAttempts + } + }; + } + + // 간단한 쿼리로 연결 상태 확인 + await this.client.query('SELECT 1'); + + return { + status: 'healthy', + details: { + connected: true, + reconnectAttempts: this.reconnectAttempts, + uptime: process.uptime() + } + }; + } catch (error) { + return { + status: 'unhealthy', + details: { + connected: false, + error: error instanceof Error ? error.message : 'Unknown error', + reconnectAttempts: this.reconnectAttempts + } + }; + } + } +} + +// 싱글톤 인스턴스 +const notificationManager = new NotificationManager(); + +// 프로세스 종료 시 정리 +process.on('SIGINT', async () => { + console.log('Shutting down NotificationManager...'); + await notificationManager.disconnect(); +}); + +process.on('SIGTERM', async () => { + console.log('Shutting down NotificationManager...'); + await notificationManager.disconnect(); +}); + +export default notificationManager;
\ No newline at end of file |
