summaryrefslogtreecommitdiff
path: root/lib/realtime/NotificationManager.ts
diff options
context:
space:
mode:
Diffstat (limited to 'lib/realtime/NotificationManager.ts')
-rw-r--r--lib/realtime/NotificationManager.ts223
1 files changed, 223 insertions, 0 deletions
diff --git a/lib/realtime/NotificationManager.ts b/lib/realtime/NotificationManager.ts
new file mode 100644
index 00000000..88eed387
--- /dev/null
+++ b/lib/realtime/NotificationManager.ts
@@ -0,0 +1,223 @@
+// lib/realtime/NotificationManager.ts
+import { Client } from 'pg';
+import { EventEmitter } from 'events';
+
+interface NotificationPayload {
+ id: string;
+ user_id: string;
+ title: string;
+ message: string;
+ type: string;
+ related_record_id?: string;
+ related_record_type?: string;
+ is_read: boolean;
+ created_at: string;
+}
+
+interface NotificationReadPayload {
+ id: string;
+ user_id: string;
+ is_read: boolean;
+ read_at?: string;
+}
+
+class NotificationManager extends EventEmitter {
+ private client: Client | null = null;
+ private isConnected = false;
+ private reconnectAttempts = 0;
+ private maxReconnectAttempts = 5;
+ private reconnectDelay = 1000;
+ private reconnectTimeout: NodeJS.Timeout | null = null;
+
+ constructor() {
+ super();
+ this.setMaxListeners(100); // SSE 연결이 많을 수 있으므로 제한 증가
+ this.connect();
+ }
+
+ private async connect() {
+ try {
+ // 기존 연결이 있으면 정리
+ if (this.client) {
+ try {
+ await this.client.end();
+ } catch (error) {
+ console.warn('Error closing existing connection:', error);
+ }
+ }
+
+ this.client = new Client({
+ connectionString: process.env.DATABASE_URL,
+ application_name: 'notification_listener',
+ // 연결 유지 설정
+ keepAlive: true,
+ keepAliveInitialDelayMillis: 10000,
+ });
+
+ await this.client.connect();
+
+ // LISTEN 채널 구독
+ await this.client.query('LISTEN new_notification');
+ await this.client.query('LISTEN notification_read');
+
+ console.log('NotificationManager: PostgreSQL LISTEN connected');
+
+ // 알림 수신 처리
+ this.client.on('notification', (msg) => {
+ try {
+ if (msg.channel === 'new_notification') {
+ const payload: NotificationPayload = JSON.parse(msg.payload || '{}');
+ console.log('New notification received:', payload.id, 'for user:', payload.user_id);
+ this.emit('newNotification', payload);
+ } else if (msg.channel === 'notification_read') {
+ const payload: NotificationReadPayload = JSON.parse(msg.payload || '{}');
+ console.log('Notification read:', payload.id, 'by user:', payload.user_id);
+ this.emit('notificationRead', payload);
+ }
+ } catch (error) {
+ console.error('Error parsing notification payload:', error);
+ }
+ });
+
+ // 연결 오류 처리
+ this.client.on('error', (error) => {
+ console.error('PostgreSQL LISTEN error:', error);
+ this.isConnected = false;
+ this.scheduleReconnect();
+ });
+
+ // 연결 종료 처리
+ this.client.on('end', () => {
+ console.log('PostgreSQL LISTEN connection ended');
+ this.isConnected = false;
+ this.scheduleReconnect();
+ });
+
+ this.isConnected = true;
+ this.reconnectAttempts = 0;
+
+ // 연결 성공 이벤트 발송
+ this.emit('connected');
+
+ } catch (error) {
+ console.error('Failed to connect NotificationManager:', error);
+ this.isConnected = false;
+ this.scheduleReconnect();
+ }
+ }
+
+ private scheduleReconnect() {
+ // 이미 재연결이 예약되어 있으면 무시
+ if (this.reconnectTimeout) {
+ return;
+ }
+
+ if (this.reconnectAttempts >= this.maxReconnectAttempts) {
+ console.error('Max reconnection attempts reached for NotificationManager');
+ this.emit('maxReconnectAttemptsReached');
+ return;
+ }
+
+ this.reconnectAttempts++;
+ const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
+
+ console.log(`Scheduling NotificationManager reconnection in ${delay}ms (attempt ${this.reconnectAttempts})`);
+
+ this.reconnectTimeout = setTimeout(() => {
+ this.reconnectTimeout = null;
+ this.connect();
+ }, delay);
+ }
+
+ public async disconnect() {
+ if (this.reconnectTimeout) {
+ clearTimeout(this.reconnectTimeout);
+ this.reconnectTimeout = null;
+ }
+
+ if (this.client) {
+ try {
+ await this.client.end();
+ } catch (error) {
+ console.warn('Error disconnecting NotificationManager:', error);
+ }
+ this.client = null;
+ }
+
+ this.isConnected = false;
+ this.emit('disconnected');
+ }
+
+ public getConnectionStatus(): boolean {
+ return this.isConnected && this.client !== null;
+ }
+
+ public getReconnectAttempts(): number {
+ return this.reconnectAttempts;
+ }
+
+ // 강제 재연결 (관리자 API용)
+ public async forceReconnect() {
+ console.log('Forcing NotificationManager reconnection...');
+ this.reconnectAttempts = 0;
+ if (this.reconnectTimeout) {
+ clearTimeout(this.reconnectTimeout);
+ this.reconnectTimeout = null;
+ }
+ await this.disconnect();
+ await this.connect();
+ }
+
+ // 연결 상태 체크 (헬스체크용)
+ public async healthCheck(): Promise<{ status: string; details: any }> {
+ try {
+ if (!this.client || !this.isConnected) {
+ return {
+ status: 'unhealthy',
+ details: {
+ connected: false,
+ reconnectAttempts: this.reconnectAttempts,
+ maxReconnectAttempts: this.maxReconnectAttempts
+ }
+ };
+ }
+
+ // 간단한 쿼리로 연결 상태 확인
+ await this.client.query('SELECT 1');
+
+ return {
+ status: 'healthy',
+ details: {
+ connected: true,
+ reconnectAttempts: this.reconnectAttempts,
+ uptime: process.uptime()
+ }
+ };
+ } catch (error) {
+ return {
+ status: 'unhealthy',
+ details: {
+ connected: false,
+ error: error instanceof Error ? error.message : 'Unknown error',
+ reconnectAttempts: this.reconnectAttempts
+ }
+ };
+ }
+ }
+}
+
+// 싱글톤 인스턴스
+const notificationManager = new NotificationManager();
+
+// 프로세스 종료 시 정리
+process.on('SIGINT', async () => {
+ console.log('Shutting down NotificationManager...');
+ await notificationManager.disconnect();
+});
+
+process.on('SIGTERM', async () => {
+ console.log('Shutting down NotificationManager...');
+ await notificationManager.disconnect();
+});
+
+export default notificationManager; \ No newline at end of file