summaryrefslogtreecommitdiff
path: root/lib/realtime
diff options
context:
space:
mode:
authordujinkim <dujin.kim@dtsolution.co.kr>2025-07-18 07:52:02 +0000
committerdujinkim <dujin.kim@dtsolution.co.kr>2025-07-18 07:52:02 +0000
commit48a2255bfc45ffcfb0b39ffefdd57cbacf8b36df (patch)
tree0c88b7c126138233875e8d372a4e999e49c38a62 /lib/realtime
parent2ef02e27dbe639876fa3b90c30307dda183545ec (diff)
(대표님) 파일관리변경, 클라IP추적, 실시간알림, 미들웨어변경, 알림API
Diffstat (limited to 'lib/realtime')
-rw-r--r--lib/realtime/NotificationManager.ts223
-rw-r--r--lib/realtime/RealtimeNotificationService.ts362
2 files changed, 585 insertions, 0 deletions
diff --git a/lib/realtime/NotificationManager.ts b/lib/realtime/NotificationManager.ts
new file mode 100644
index 00000000..88eed387
--- /dev/null
+++ b/lib/realtime/NotificationManager.ts
@@ -0,0 +1,223 @@
+// lib/realtime/NotificationManager.ts
+import { Client } from 'pg';
+import { EventEmitter } from 'events';
+
+interface NotificationPayload {
+ id: string;
+ user_id: string;
+ title: string;
+ message: string;
+ type: string;
+ related_record_id?: string;
+ related_record_type?: string;
+ is_read: boolean;
+ created_at: string;
+}
+
+interface NotificationReadPayload {
+ id: string;
+ user_id: string;
+ is_read: boolean;
+ read_at?: string;
+}
+
+class NotificationManager extends EventEmitter {
+ private client: Client | null = null;
+ private isConnected = false;
+ private reconnectAttempts = 0;
+ private maxReconnectAttempts = 5;
+ private reconnectDelay = 1000;
+ private reconnectTimeout: NodeJS.Timeout | null = null;
+
+ constructor() {
+ super();
+ this.setMaxListeners(100); // SSE 연결이 많을 수 있으므로 제한 증가
+ this.connect();
+ }
+
+ private async connect() {
+ try {
+ // 기존 연결이 있으면 정리
+ if (this.client) {
+ try {
+ await this.client.end();
+ } catch (error) {
+ console.warn('Error closing existing connection:', error);
+ }
+ }
+
+ this.client = new Client({
+ connectionString: process.env.DATABASE_URL,
+ application_name: 'notification_listener',
+ // 연결 유지 설정
+ keepAlive: true,
+ keepAliveInitialDelayMillis: 10000,
+ });
+
+ await this.client.connect();
+
+ // LISTEN 채널 구독
+ await this.client.query('LISTEN new_notification');
+ await this.client.query('LISTEN notification_read');
+
+ console.log('NotificationManager: PostgreSQL LISTEN connected');
+
+ // 알림 수신 처리
+ this.client.on('notification', (msg) => {
+ try {
+ if (msg.channel === 'new_notification') {
+ const payload: NotificationPayload = JSON.parse(msg.payload || '{}');
+ console.log('New notification received:', payload.id, 'for user:', payload.user_id);
+ this.emit('newNotification', payload);
+ } else if (msg.channel === 'notification_read') {
+ const payload: NotificationReadPayload = JSON.parse(msg.payload || '{}');
+ console.log('Notification read:', payload.id, 'by user:', payload.user_id);
+ this.emit('notificationRead', payload);
+ }
+ } catch (error) {
+ console.error('Error parsing notification payload:', error);
+ }
+ });
+
+ // 연결 오류 처리
+ this.client.on('error', (error) => {
+ console.error('PostgreSQL LISTEN error:', error);
+ this.isConnected = false;
+ this.scheduleReconnect();
+ });
+
+ // 연결 종료 처리
+ this.client.on('end', () => {
+ console.log('PostgreSQL LISTEN connection ended');
+ this.isConnected = false;
+ this.scheduleReconnect();
+ });
+
+ this.isConnected = true;
+ this.reconnectAttempts = 0;
+
+ // 연결 성공 이벤트 발송
+ this.emit('connected');
+
+ } catch (error) {
+ console.error('Failed to connect NotificationManager:', error);
+ this.isConnected = false;
+ this.scheduleReconnect();
+ }
+ }
+
+ private scheduleReconnect() {
+ // 이미 재연결이 예약되어 있으면 무시
+ if (this.reconnectTimeout) {
+ return;
+ }
+
+ if (this.reconnectAttempts >= this.maxReconnectAttempts) {
+ console.error('Max reconnection attempts reached for NotificationManager');
+ this.emit('maxReconnectAttemptsReached');
+ return;
+ }
+
+ this.reconnectAttempts++;
+ const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
+
+ console.log(`Scheduling NotificationManager reconnection in ${delay}ms (attempt ${this.reconnectAttempts})`);
+
+ this.reconnectTimeout = setTimeout(() => {
+ this.reconnectTimeout = null;
+ this.connect();
+ }, delay);
+ }
+
+ public async disconnect() {
+ if (this.reconnectTimeout) {
+ clearTimeout(this.reconnectTimeout);
+ this.reconnectTimeout = null;
+ }
+
+ if (this.client) {
+ try {
+ await this.client.end();
+ } catch (error) {
+ console.warn('Error disconnecting NotificationManager:', error);
+ }
+ this.client = null;
+ }
+
+ this.isConnected = false;
+ this.emit('disconnected');
+ }
+
+ public getConnectionStatus(): boolean {
+ return this.isConnected && this.client !== null;
+ }
+
+ public getReconnectAttempts(): number {
+ return this.reconnectAttempts;
+ }
+
+ // 강제 재연결 (관리자 API용)
+ public async forceReconnect() {
+ console.log('Forcing NotificationManager reconnection...');
+ this.reconnectAttempts = 0;
+ if (this.reconnectTimeout) {
+ clearTimeout(this.reconnectTimeout);
+ this.reconnectTimeout = null;
+ }
+ await this.disconnect();
+ await this.connect();
+ }
+
+ // 연결 상태 체크 (헬스체크용)
+ public async healthCheck(): Promise<{ status: string; details: any }> {
+ try {
+ if (!this.client || !this.isConnected) {
+ return {
+ status: 'unhealthy',
+ details: {
+ connected: false,
+ reconnectAttempts: this.reconnectAttempts,
+ maxReconnectAttempts: this.maxReconnectAttempts
+ }
+ };
+ }
+
+ // 간단한 쿼리로 연결 상태 확인
+ await this.client.query('SELECT 1');
+
+ return {
+ status: 'healthy',
+ details: {
+ connected: true,
+ reconnectAttempts: this.reconnectAttempts,
+ uptime: process.uptime()
+ }
+ };
+ } catch (error) {
+ return {
+ status: 'unhealthy',
+ details: {
+ connected: false,
+ error: error instanceof Error ? error.message : 'Unknown error',
+ reconnectAttempts: this.reconnectAttempts
+ }
+ };
+ }
+ }
+}
+
+// 싱글톤 인스턴스
+const notificationManager = new NotificationManager();
+
+// 프로세스 종료 시 정리
+process.on('SIGINT', async () => {
+ console.log('Shutting down NotificationManager...');
+ await notificationManager.disconnect();
+});
+
+process.on('SIGTERM', async () => {
+ console.log('Shutting down NotificationManager...');
+ await notificationManager.disconnect();
+});
+
+export default notificationManager; \ No newline at end of file
diff --git a/lib/realtime/RealtimeNotificationService.ts b/lib/realtime/RealtimeNotificationService.ts
new file mode 100644
index 00000000..4bc728ee
--- /dev/null
+++ b/lib/realtime/RealtimeNotificationService.ts
@@ -0,0 +1,362 @@
+// lib/realtime/RealtimeNotificationService.ts
+import notificationManager from './NotificationManager';
+
+interface ConnectedClient {
+ userId: string;
+ controller: ReadableStreamDefaultController;
+ lastHeartbeat: number;
+ connectionId: string;
+ userAgent?: string;
+ ipAddress?: string;
+}
+
+class RealtimeNotificationService {
+ private connectedClients = new Map<string, ConnectedClient[]>();
+ private heartbeatInterval: NodeJS.Timeout | null = null;
+ private clientTimeout = 5 * 60 * 1000; // 5분
+ private heartbeatFrequency = 30 * 1000; // 30초
+
+ constructor() {
+ this.setupEventListeners();
+ this.startHeartbeat();
+ this.startConnectionCleanup();
+ }
+
+ private setupEventListeners() {
+ // PostgreSQL NOTIFY 이벤트 수신 시 클라이언트들에게 브로드캐스트
+ notificationManager.on('newNotification', (payload: any) => {
+ console.log('Broadcasting new notification to user:', payload.user_id);
+ this.broadcastToUser(payload.user_id, {
+ type: 'new_notification',
+ data: payload
+ });
+ });
+
+ notificationManager.on('notificationRead', (payload: any) => {
+ console.log('Broadcasting notification read to user:', payload.user_id);
+ this.broadcastToUser(payload.user_id, {
+ type: 'notification_read',
+ data: payload
+ });
+ });
+
+ // NotificationManager 연결 상태 변화 시 클라이언트들에게 알림
+ notificationManager.on('connected', () => {
+ this.broadcastToAllUsers({
+ type: 'system_status',
+ data: { status: 'connected', message: 'Real-time notifications are now active' }
+ });
+ });
+
+ notificationManager.on('disconnected', () => {
+ this.broadcastToAllUsers({
+ type: 'system_status',
+ data: { status: 'disconnected', message: 'Real-time notifications temporarily unavailable' }
+ });
+ });
+ }
+
+ // 클라이언트 연결 등록
+ public addClient(
+ userId: string,
+ controller: ReadableStreamDefaultController,
+ metadata?: { userAgent?: string; ipAddress?: string }
+ ): string {
+ const connectionId = `${userId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
+
+ if (!this.connectedClients.has(userId)) {
+ this.connectedClients.set(userId, []);
+ }
+
+ const clients = this.connectedClients.get(userId)!;
+
+ // 동일 사용자의 최대 연결 수 제한
+ const maxConnectionsPerUser = parseInt(process.env.MAX_CONNECTIONS_PER_USER || '5');
+ if (clients.length >= maxConnectionsPerUser) {
+ console.warn(`Max connections (${maxConnectionsPerUser}) reached for user ${userId}`);
+ // 가장 오래된 연결 제거
+ const oldestClient = clients.shift();
+ if (oldestClient) {
+ try {
+ oldestClient.controller.close();
+ } catch (error) {
+ console.warn('Error closing oldest connection:', error);
+ }
+ }
+ }
+
+ clients.push({
+ userId,
+ controller,
+ lastHeartbeat: Date.now(),
+ connectionId,
+ userAgent: metadata?.userAgent,
+ ipAddress: metadata?.ipAddress
+ });
+
+ console.log(`Client ${connectionId} connected for user ${userId} (total: ${clients.length})`);
+
+ // 연결 확인 메시지 전송
+ this.sendToClient(controller, {
+ type: 'connected',
+ data: {
+ connectionId,
+ serverTime: new Date().toISOString(),
+ dbStatus: notificationManager.getConnectionStatus()
+ }
+ });
+
+ return connectionId;
+ }
+
+ // 클라이언트 연결 해제
+ public removeClient(userId: string, controller: ReadableStreamDefaultController): void {
+ const clients = this.connectedClients.get(userId);
+ if (!clients) return;
+
+ const filteredClients = clients.filter(client => {
+ if (client.controller === controller) {
+ console.log(`Client ${client.connectionId} disconnected for user ${userId}`);
+ return false;
+ }
+ return true;
+ });
+
+ if (filteredClients.length === 0) {
+ this.connectedClients.delete(userId);
+ console.log(`All clients disconnected for user ${userId}`);
+ } else {
+ this.connectedClients.set(userId, filteredClients);
+ }
+ }
+
+ // 특정 사용자에게 메시지 브로드캐스트
+ private broadcastToUser(userId: string, message: any): void {
+ const clients = this.connectedClients.get(userId);
+ if (!clients || clients.length === 0) {
+ console.log(`No connected clients for user ${userId}`);
+ return;
+ }
+
+ const activeClients: ConnectedClient[] = [];
+ let sentCount = 0;
+
+ for (const client of clients) {
+ if (this.sendToClient(client.controller, message)) {
+ client.lastHeartbeat = Date.now();
+ activeClients.push(client);
+ sentCount++;
+ }
+ }
+
+ // 활성 클라이언트만 유지
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+
+ console.log(`Message sent to ${sentCount}/${clients.length} clients for user ${userId}`);
+ }
+
+ // 모든 사용자에게 브로드캐스트 (시스템 메시지용)
+ private broadcastToAllUsers(message: any): void {
+ let totalSent = 0;
+ let totalClients = 0;
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ totalClients += clients.length;
+ const activeClients: ConnectedClient[] = [];
+
+ for (const client of clients) {
+ if (this.sendToClient(client.controller, message)) {
+ client.lastHeartbeat = Date.now();
+ activeClients.push(client);
+ totalSent++;
+ }
+ }
+
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+ }
+
+ console.log(`System message sent to ${totalSent}/${totalClients} clients`);
+ }
+
+ // 개별 클라이언트에게 메시지 전송
+ private sendToClient(controller: ReadableStreamDefaultController, message: any): boolean {
+ try {
+ const encoder = new TextEncoder();
+ const data = `data: ${JSON.stringify(message)}\n\n`;
+ controller.enqueue(encoder.encode(data));
+ return true;
+ } catch (error) {
+ console.warn('Failed to send message to client:', error);
+ return false;
+ }
+ }
+
+ // 하트비트 전송 (연결 유지)
+ private startHeartbeat(): void {
+ this.heartbeatInterval = setInterval(() => {
+ const heartbeatMessage = {
+ type: 'heartbeat',
+ data: {
+ timestamp: new Date().toISOString(),
+ serverStatus: 'healthy',
+ dbStatus: notificationManager.getConnectionStatus()
+ }
+ };
+
+ this.broadcastToAllUsers(heartbeatMessage);
+ }, this.heartbeatFrequency);
+
+ console.log(`Heartbeat started with ${this.heartbeatFrequency}ms interval`);
+ }
+
+ // 비활성 연결 정리
+ private startConnectionCleanup(): void {
+ setInterval(() => {
+ const now = Date.now();
+ let cleanedConnections = 0;
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ const activeClients = clients.filter(client => {
+ const isActive = (now - client.lastHeartbeat) < this.clientTimeout;
+ if (!isActive) {
+ try {
+ client.controller.close();
+ } catch (error) {
+ // 이미 닫힌 연결일 수 있음
+ }
+ cleanedConnections++;
+ }
+ return isActive;
+ });
+
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+ }
+
+ if (cleanedConnections > 0) {
+ console.log(`Cleaned up ${cleanedConnections} inactive connections`);
+ }
+ }, 60000); // 1분마다 정리
+ }
+
+ // 연결된 클라이언트 수 조회
+ public getConnectedClientCount(): number {
+ let count = 0;
+ for (const clients of this.connectedClients.values()) {
+ count += clients.length;
+ }
+ return count;
+ }
+
+ // 특정 사용자의 연결된 클라이언트 수 조회
+ public getUserClientCount(userId: string): number {
+ return this.connectedClients.get(userId)?.length || 0;
+ }
+
+ // 연결된 사용자 수 조회
+ public getConnectedUserCount(): number {
+ return this.connectedClients.size;
+ }
+
+ // 상세 연결 정보 조회 (관리자용)
+ public getDetailedConnectionInfo() {
+ const info = {
+ totalClients: this.getConnectedClientCount(),
+ totalUsers: this.getConnectedUserCount(),
+ dbConnectionStatus: notificationManager.getConnectionStatus(),
+ connections: {} as any
+ };
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ info.connections[userId] = clients.map(client => ({
+ connectionId: client.connectionId,
+ lastHeartbeat: new Date(client.lastHeartbeat).toISOString(),
+ userAgent: client.userAgent,
+ ipAddress: client.ipAddress,
+ connectedFor: Date.now() - client.lastHeartbeat
+ }));
+ }
+
+ return info;
+ }
+
+ // 특정 사용자의 모든 연결 해제 (관리자용)
+ public disconnectUser(userId: string): number {
+ const clients = this.connectedClients.get(userId);
+ if (!clients) return 0;
+
+ const count = clients.length;
+
+ for (const client of clients) {
+ try {
+ this.sendToClient(client.controller, {
+ type: 'force_disconnect',
+ data: { reason: 'Administrative action' }
+ });
+ client.controller.close();
+ } catch (error) {
+ console.warn(`Error disconnecting client ${client.connectionId}:`, error);
+ }
+ }
+
+ this.connectedClients.delete(userId);
+ console.log(`Forcibly disconnected ${count} clients for user ${userId}`);
+
+ return count;
+ }
+
+ // 서비스 종료
+ public shutdown(): void {
+ console.log('Shutting down RealtimeNotificationService...');
+
+ if (this.heartbeatInterval) {
+ clearInterval(this.heartbeatInterval);
+ this.heartbeatInterval = null;
+ }
+
+ // 모든 연결에게 종료 메시지 전송 후 연결 해제
+ const shutdownMessage = {
+ type: 'server_shutdown',
+ data: { message: 'Server is shutting down. Please reconnect in a moment.' }
+ };
+
+ for (const clients of this.connectedClients.values()) {
+ for (const client of clients) {
+ try {
+ this.sendToClient(client.controller, shutdownMessage);
+ client.controller.close();
+ } catch (error) {
+ // 이미 종료된 연결 무시
+ }
+ }
+ }
+
+ this.connectedClients.clear();
+ console.log('RealtimeNotificationService shutdown complete');
+ }
+}
+
+// 싱글톤 인스턴스
+const realtimeNotificationService = new RealtimeNotificationService();
+
+// 프로세스 종료 시 정리
+process.on('SIGINT', () => {
+ realtimeNotificationService.shutdown();
+});
+
+process.on('SIGTERM', () => {
+ realtimeNotificationService.shutdown();
+});
+
+export default realtimeNotificationService; \ No newline at end of file