summaryrefslogtreecommitdiff
path: root/lib/realtime/RealtimeNotificationService.ts
diff options
context:
space:
mode:
Diffstat (limited to 'lib/realtime/RealtimeNotificationService.ts')
-rw-r--r--lib/realtime/RealtimeNotificationService.ts362
1 files changed, 362 insertions, 0 deletions
diff --git a/lib/realtime/RealtimeNotificationService.ts b/lib/realtime/RealtimeNotificationService.ts
new file mode 100644
index 00000000..4bc728ee
--- /dev/null
+++ b/lib/realtime/RealtimeNotificationService.ts
@@ -0,0 +1,362 @@
+// lib/realtime/RealtimeNotificationService.ts
+import notificationManager from './NotificationManager';
+
+interface ConnectedClient {
+ userId: string;
+ controller: ReadableStreamDefaultController;
+ lastHeartbeat: number;
+ connectionId: string;
+ userAgent?: string;
+ ipAddress?: string;
+}
+
+class RealtimeNotificationService {
+ private connectedClients = new Map<string, ConnectedClient[]>();
+ private heartbeatInterval: NodeJS.Timeout | null = null;
+ private clientTimeout = 5 * 60 * 1000; // 5분
+ private heartbeatFrequency = 30 * 1000; // 30초
+
+ constructor() {
+ this.setupEventListeners();
+ this.startHeartbeat();
+ this.startConnectionCleanup();
+ }
+
+ private setupEventListeners() {
+ // PostgreSQL NOTIFY 이벤트 수신 시 클라이언트들에게 브로드캐스트
+ notificationManager.on('newNotification', (payload: any) => {
+ console.log('Broadcasting new notification to user:', payload.user_id);
+ this.broadcastToUser(payload.user_id, {
+ type: 'new_notification',
+ data: payload
+ });
+ });
+
+ notificationManager.on('notificationRead', (payload: any) => {
+ console.log('Broadcasting notification read to user:', payload.user_id);
+ this.broadcastToUser(payload.user_id, {
+ type: 'notification_read',
+ data: payload
+ });
+ });
+
+ // NotificationManager 연결 상태 변화 시 클라이언트들에게 알림
+ notificationManager.on('connected', () => {
+ this.broadcastToAllUsers({
+ type: 'system_status',
+ data: { status: 'connected', message: 'Real-time notifications are now active' }
+ });
+ });
+
+ notificationManager.on('disconnected', () => {
+ this.broadcastToAllUsers({
+ type: 'system_status',
+ data: { status: 'disconnected', message: 'Real-time notifications temporarily unavailable' }
+ });
+ });
+ }
+
+ // 클라이언트 연결 등록
+ public addClient(
+ userId: string,
+ controller: ReadableStreamDefaultController,
+ metadata?: { userAgent?: string; ipAddress?: string }
+ ): string {
+ const connectionId = `${userId}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
+
+ if (!this.connectedClients.has(userId)) {
+ this.connectedClients.set(userId, []);
+ }
+
+ const clients = this.connectedClients.get(userId)!;
+
+ // 동일 사용자의 최대 연결 수 제한
+ const maxConnectionsPerUser = parseInt(process.env.MAX_CONNECTIONS_PER_USER || '5');
+ if (clients.length >= maxConnectionsPerUser) {
+ console.warn(`Max connections (${maxConnectionsPerUser}) reached for user ${userId}`);
+ // 가장 오래된 연결 제거
+ const oldestClient = clients.shift();
+ if (oldestClient) {
+ try {
+ oldestClient.controller.close();
+ } catch (error) {
+ console.warn('Error closing oldest connection:', error);
+ }
+ }
+ }
+
+ clients.push({
+ userId,
+ controller,
+ lastHeartbeat: Date.now(),
+ connectionId,
+ userAgent: metadata?.userAgent,
+ ipAddress: metadata?.ipAddress
+ });
+
+ console.log(`Client ${connectionId} connected for user ${userId} (total: ${clients.length})`);
+
+ // 연결 확인 메시지 전송
+ this.sendToClient(controller, {
+ type: 'connected',
+ data: {
+ connectionId,
+ serverTime: new Date().toISOString(),
+ dbStatus: notificationManager.getConnectionStatus()
+ }
+ });
+
+ return connectionId;
+ }
+
+ // 클라이언트 연결 해제
+ public removeClient(userId: string, controller: ReadableStreamDefaultController): void {
+ const clients = this.connectedClients.get(userId);
+ if (!clients) return;
+
+ const filteredClients = clients.filter(client => {
+ if (client.controller === controller) {
+ console.log(`Client ${client.connectionId} disconnected for user ${userId}`);
+ return false;
+ }
+ return true;
+ });
+
+ if (filteredClients.length === 0) {
+ this.connectedClients.delete(userId);
+ console.log(`All clients disconnected for user ${userId}`);
+ } else {
+ this.connectedClients.set(userId, filteredClients);
+ }
+ }
+
+ // 특정 사용자에게 메시지 브로드캐스트
+ private broadcastToUser(userId: string, message: any): void {
+ const clients = this.connectedClients.get(userId);
+ if (!clients || clients.length === 0) {
+ console.log(`No connected clients for user ${userId}`);
+ return;
+ }
+
+ const activeClients: ConnectedClient[] = [];
+ let sentCount = 0;
+
+ for (const client of clients) {
+ if (this.sendToClient(client.controller, message)) {
+ client.lastHeartbeat = Date.now();
+ activeClients.push(client);
+ sentCount++;
+ }
+ }
+
+ // 활성 클라이언트만 유지
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+
+ console.log(`Message sent to ${sentCount}/${clients.length} clients for user ${userId}`);
+ }
+
+ // 모든 사용자에게 브로드캐스트 (시스템 메시지용)
+ private broadcastToAllUsers(message: any): void {
+ let totalSent = 0;
+ let totalClients = 0;
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ totalClients += clients.length;
+ const activeClients: ConnectedClient[] = [];
+
+ for (const client of clients) {
+ if (this.sendToClient(client.controller, message)) {
+ client.lastHeartbeat = Date.now();
+ activeClients.push(client);
+ totalSent++;
+ }
+ }
+
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+ }
+
+ console.log(`System message sent to ${totalSent}/${totalClients} clients`);
+ }
+
+ // 개별 클라이언트에게 메시지 전송
+ private sendToClient(controller: ReadableStreamDefaultController, message: any): boolean {
+ try {
+ const encoder = new TextEncoder();
+ const data = `data: ${JSON.stringify(message)}\n\n`;
+ controller.enqueue(encoder.encode(data));
+ return true;
+ } catch (error) {
+ console.warn('Failed to send message to client:', error);
+ return false;
+ }
+ }
+
+ // 하트비트 전송 (연결 유지)
+ private startHeartbeat(): void {
+ this.heartbeatInterval = setInterval(() => {
+ const heartbeatMessage = {
+ type: 'heartbeat',
+ data: {
+ timestamp: new Date().toISOString(),
+ serverStatus: 'healthy',
+ dbStatus: notificationManager.getConnectionStatus()
+ }
+ };
+
+ this.broadcastToAllUsers(heartbeatMessage);
+ }, this.heartbeatFrequency);
+
+ console.log(`Heartbeat started with ${this.heartbeatFrequency}ms interval`);
+ }
+
+ // 비활성 연결 정리
+ private startConnectionCleanup(): void {
+ setInterval(() => {
+ const now = Date.now();
+ let cleanedConnections = 0;
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ const activeClients = clients.filter(client => {
+ const isActive = (now - client.lastHeartbeat) < this.clientTimeout;
+ if (!isActive) {
+ try {
+ client.controller.close();
+ } catch (error) {
+ // 이미 닫힌 연결일 수 있음
+ }
+ cleanedConnections++;
+ }
+ return isActive;
+ });
+
+ if (activeClients.length === 0) {
+ this.connectedClients.delete(userId);
+ } else {
+ this.connectedClients.set(userId, activeClients);
+ }
+ }
+
+ if (cleanedConnections > 0) {
+ console.log(`Cleaned up ${cleanedConnections} inactive connections`);
+ }
+ }, 60000); // 1분마다 정리
+ }
+
+ // 연결된 클라이언트 수 조회
+ public getConnectedClientCount(): number {
+ let count = 0;
+ for (const clients of this.connectedClients.values()) {
+ count += clients.length;
+ }
+ return count;
+ }
+
+ // 특정 사용자의 연결된 클라이언트 수 조회
+ public getUserClientCount(userId: string): number {
+ return this.connectedClients.get(userId)?.length || 0;
+ }
+
+ // 연결된 사용자 수 조회
+ public getConnectedUserCount(): number {
+ return this.connectedClients.size;
+ }
+
+ // 상세 연결 정보 조회 (관리자용)
+ public getDetailedConnectionInfo() {
+ const info = {
+ totalClients: this.getConnectedClientCount(),
+ totalUsers: this.getConnectedUserCount(),
+ dbConnectionStatus: notificationManager.getConnectionStatus(),
+ connections: {} as any
+ };
+
+ for (const [userId, clients] of this.connectedClients.entries()) {
+ info.connections[userId] = clients.map(client => ({
+ connectionId: client.connectionId,
+ lastHeartbeat: new Date(client.lastHeartbeat).toISOString(),
+ userAgent: client.userAgent,
+ ipAddress: client.ipAddress,
+ connectedFor: Date.now() - client.lastHeartbeat
+ }));
+ }
+
+ return info;
+ }
+
+ // 특정 사용자의 모든 연결 해제 (관리자용)
+ public disconnectUser(userId: string): number {
+ const clients = this.connectedClients.get(userId);
+ if (!clients) return 0;
+
+ const count = clients.length;
+
+ for (const client of clients) {
+ try {
+ this.sendToClient(client.controller, {
+ type: 'force_disconnect',
+ data: { reason: 'Administrative action' }
+ });
+ client.controller.close();
+ } catch (error) {
+ console.warn(`Error disconnecting client ${client.connectionId}:`, error);
+ }
+ }
+
+ this.connectedClients.delete(userId);
+ console.log(`Forcibly disconnected ${count} clients for user ${userId}`);
+
+ return count;
+ }
+
+ // 서비스 종료
+ public shutdown(): void {
+ console.log('Shutting down RealtimeNotificationService...');
+
+ if (this.heartbeatInterval) {
+ clearInterval(this.heartbeatInterval);
+ this.heartbeatInterval = null;
+ }
+
+ // 모든 연결에게 종료 메시지 전송 후 연결 해제
+ const shutdownMessage = {
+ type: 'server_shutdown',
+ data: { message: 'Server is shutting down. Please reconnect in a moment.' }
+ };
+
+ for (const clients of this.connectedClients.values()) {
+ for (const client of clients) {
+ try {
+ this.sendToClient(client.controller, shutdownMessage);
+ client.controller.close();
+ } catch (error) {
+ // 이미 종료된 연결 무시
+ }
+ }
+ }
+
+ this.connectedClients.clear();
+ console.log('RealtimeNotificationService shutdown complete');
+ }
+}
+
+// 싱글톤 인스턴스
+const realtimeNotificationService = new RealtimeNotificationService();
+
+// 프로세스 종료 시 정리
+process.on('SIGINT', () => {
+ realtimeNotificationService.shutdown();
+});
+
+process.on('SIGTERM', () => {
+ realtimeNotificationService.shutdown();
+});
+
+export default realtimeNotificationService; \ No newline at end of file