// lib/sync-service.ts (시스템별 분리 버전 - DOLCE 업로드 통합) import db from "@/db/db" import { changeLogs, syncBatches, type ChangeLog, type SyncBatch } from "@/db/schema/vendorDocu" import { documents, revisions, documentAttachments } from "@/db/schema/vendorDocu" import { eq, and, lt, desc, sql, inArray } from "drizzle-orm" export interface SyncableEntity { entityType: 'document' | 'revision' | 'attachment' entityId: number action: 'CREATE' | 'UPDATE' | 'DELETE' data: any metadata?: Record } export interface SyncResult { batchId: number success: boolean successCount: number failureCount: number errors?: string[] endpointResults?: Record } class SyncService { private readonly CHUNK_SIZE = 50 /** * 동기화 활성화 여부 확인 */ private isSyncEnabled(targetSystem: string): boolean { const upperSystem = targetSystem.toUpperCase() const enabled = process.env[`SYNC_${upperSystem}_ENABLED`] return enabled === 'true' || enabled === '1' } /** * 변경사항을 change_logs에 기록 */ async logChange( contractId: number, entityType: 'document' | 'revision' | 'attachment', entityId: number, action: 'CREATE' | 'UPDATE' | 'DELETE', newValues?: any, oldValues?: any, userId?: number, userName?: string, targetSystems: string[] = ["DOLCE", "SWP"] ) { try { const changedFields = this.detectChangedFields(oldValues, newValues) await db.insert(changeLogs).values({ contractId, entityType, entityId, action, changedFields, oldValues, newValues, userId, userName, targetSystems, }) console.log(`Change logged: ${entityType}/${entityId} - ${action}`) } catch (error) { console.error('Failed to log change:', error) throw error } } /** * 변경된 필드 감지 */ private detectChangedFields(oldValues: any, newValues: any): Record | null { if (!oldValues || !newValues) return null const changes: Record = {} for (const [key, newValue] of Object.entries(newValues)) { if (JSON.stringify(oldValues[key]) !== JSON.stringify(newValue)) { changes[key] = { from: oldValues[key], to: newValue } } } return Object.keys(changes).length > 0 ? changes : null } /** * 동기화할 변경사항 조회 (증분) */ async getPendingChanges( contractId: number, targetSystem: string = 'DOLCE', limit?: number ): Promise { const query = db .select() .from(changeLogs) .where(and( eq(changeLogs.contractId, contractId), eq(changeLogs.isSynced, false), lt(changeLogs.syncAttempts, 3), sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})` )) .orderBy(changeLogs.createdAt) if (limit) { query.limit(limit) } return await query } /** * 배열을 청크 단위로 분할 */ private chunkArray(array: T[], chunkSize: number): T[][] { const chunks: T[][] = [] for (let i = 0; i < array.length; i += chunkSize) { chunks.push(array.slice(i, i + chunkSize)) } return chunks } /** * 동기화 배치 생성 */ async createSyncBatch( contractId: number, targetSystem: string, changeLogIds: number[] ): Promise { const [batch] = await db .insert(syncBatches) .values({ contractId, targetSystem, batchSize: changeLogIds.length, changeLogIds, status: 'PENDING' }) .returning({ id: syncBatches.id }) return batch.id } /** * 메인 동기화 실행 함수 (청크 처리 포함) */ async syncToExternalSystem( contractId: number, targetSystem: string = 'DOLCE', manualTrigger: boolean = false ): Promise { try { // 1. 동기화 활성화 확인 if (!this.isSyncEnabled(targetSystem)) { throw new Error(`Sync not enabled for ${targetSystem}`) } // 2. 대기 중인 변경사항 조회 (전체) const pendingChanges = await this.getPendingChanges(contractId, targetSystem) if (pendingChanges.length === 0) { return { batchId: 0, success: true, successCount: 0, failureCount: 0 } } // 3. 배치 생성 const batchId = await this.createSyncBatch( contractId, targetSystem, pendingChanges.map(c => c.id) ) // 4. 배치 상태를 PROCESSING으로 업데이트 await db .update(syncBatches) .set({ status: 'PROCESSING', startedAt: new Date(), updatedAt: new Date() }) .where(eq(syncBatches.id, batchId)) // 5. 청크 단위로 동기화 수행 - 시스템별 분기 const chunks = this.chunkArray(pendingChanges, this.CHUNK_SIZE) let totalSuccessCount = 0 let totalFailureCount = 0 const allErrors: string[] = [] const endpointResults: Record = {} for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i] console.log(`Processing chunk ${i + 1}/${chunks.length} (${chunk.length} items) for ${targetSystem}`) try { let chunkResult; // 시스템별로 다른 동기화 메서드 호출 switch (targetSystem.toUpperCase()) { case 'DOLCE': chunkResult = await this.performSyncDOLCE(chunk, contractId) break case 'SWP': chunkResult = await this.performSyncSWP(chunk, contractId) break default: throw new Error(`Unsupported target system: ${targetSystem}`) } totalSuccessCount += chunkResult.successCount totalFailureCount += chunkResult.failureCount if (chunkResult.errors) { allErrors.push(...chunkResult.errors) } // 엔드포인트별 결과 병합 Object.assign(endpointResults, chunkResult.endpointResults || {}) // 성공한 변경사항들을 동기화 완료로 표시 if (chunkResult.successCount > 0) { const successfulChangeIds = chunk .slice(0, chunkResult.successCount) .map(c => c.id) await this.markChangesAsSynced(successfulChangeIds) } // 실패한 변경사항들의 재시도 횟수 증가 if (chunkResult.failureCount > 0) { const failedChangeIds = chunk .slice(chunkResult.successCount) .map(c => c.id) await this.incrementSyncAttempts(failedChangeIds, chunkResult.errors?.[0]) } } catch (error) { console.error(`Chunk ${i + 1} failed for ${targetSystem}:`, error) totalFailureCount += chunk.length allErrors.push(`Chunk ${i + 1}: ${error instanceof Error ? error.message : 'Unknown error'}`) // 전체 청크 실패 시 재시도 횟수 증가 await this.incrementSyncAttempts(chunk.map(c => c.id), error instanceof Error ? error.message : 'Unknown error') } } const overallSuccess = totalFailureCount === 0 // 6. 배치 상태 업데이트 await db .update(syncBatches) .set({ status: overallSuccess ? 'SUCCESS' : (totalSuccessCount > 0 ? 'PARTIAL' : 'FAILED'), completedAt: new Date(), successCount: totalSuccessCount, failureCount: totalFailureCount, errorMessage: allErrors.length > 0 ? allErrors.join('; ') : null, updatedAt: new Date() }) .where(eq(syncBatches.id, batchId)) return { batchId, success: overallSuccess, successCount: totalSuccessCount, failureCount: totalFailureCount, errors: allErrors.length > 0 ? allErrors : undefined, endpointResults } } catch (error) { console.error('Sync failed:', error) throw error } } /** * DOLCE 시스템 전용 동기화 수행 - 실제 업로드 서비스 사용 */ private async performSyncDOLCE( changes: ChangeLog[], contractId: number ): Promise<{ success: boolean; successCount: number; failureCount: number; errors?: string[]; endpointResults?: Record }> { const errors: string[] = [] const endpointResults: Record = {} try { // DOLCE 업로드 서비스 동적 임포트 const { dolceUploadService } = await import('./dolce-upload-service') if (!dolceUploadService.isUploadEnabled()) { throw new Error('DOLCE upload is not enabled') } // 변경사항에서 리비전 ID들 추출 const revisionIds = changes .filter(change => change.entityType === 'revision') .map(change => change.entityId) if (revisionIds.length === 0) { return { success: true, successCount: 0, failureCount: 0, endpointResults: { message: 'No revisions to upload' } } } // DOLCE 업로드 실행 const uploadResult = await dolceUploadService.uploadToDoLCE( contractId, revisionIds, 'system_user', // 시스템 사용자 ID 'System Upload' ) endpointResults['dolce_upload'] = uploadResult if (uploadResult.success) { console.log(`✅ DOLCE upload successful: ${uploadResult.uploadedDocuments} documents, ${uploadResult.uploadedFiles} files`) return { success: true, successCount: changes.length, failureCount: 0, endpointResults } } else { console.error(`❌ DOLCE upload failed:`, uploadResult.errors) return { success: false, successCount: 0, failureCount: changes.length, errors: uploadResult.errors, endpointResults } } } catch (error) { const errorMessage = `DOLCE upload failed: ${error instanceof Error ? error.message : 'Unknown error'}` errors.push(errorMessage) console.error(`❌ DOLCE upload error:`, error) return { success: false, successCount: 0, failureCount: changes.length, errors, endpointResults } } } /** * SWP 시스템 전용 동기화 수행 */ private async performSyncSWP( changes: ChangeLog[], contractId: number ): Promise<{ success: boolean; successCount: number; failureCount: number; errors?: string[]; endpointResults?: Record }> { const errors: string[] = [] const endpointResults: Record = {} let overallSuccess = true // 변경사항을 SWP 시스템 형태로 변환 const syncData = await this.transformChangesForSWP(changes) // 1. SWP 메인 엔드포인트 (XML 전송) const mainUrl = process.env.SYNC_SWP_URL if (mainUrl) { try { console.log(`Sending to SWP main: ${mainUrl}`) const transformedData = this.convertToXML({ contractId, systemType: 'SWP', changes: syncData, batchSize: changes.length, timestamp: new Date().toISOString(), source: 'EVCP' }) const response = await fetch(mainUrl, { method: 'POST', headers: { 'Content-Type': 'application/xml', 'Authorization': `Basic ${Buffer.from(`${process.env.SYNC_SWP_USER}:${process.env.SYNC_SWP_PASSWORD}`).toString('base64')}`, 'X-System': 'SWP' }, body: transformedData }) if (!response.ok) { const errorText = await response.text() throw new Error(`SWP main: HTTP ${response.status} - ${errorText}`) } let result const contentType = response.headers.get('content-type') if (contentType?.includes('application/json')) { result = await response.json() } else { result = await response.text() } endpointResults['swp_main'] = result console.log(`✅ SWP main sync successful`) } catch (error) { const errorMessage = `SWP main: ${error instanceof Error ? error.message : 'Unknown error'}` errors.push(errorMessage) overallSuccess = false console.error(`❌ SWP main sync failed:`, error) } } // 2. SWP 알림 엔드포인트 (선택사항) const notificationUrl = process.env.SYNC_SWP_NOTIFICATION_URL if (notificationUrl) { try { console.log(`Sending to SWP notification: ${notificationUrl}`) const notificationData = { event: 'swp_sync_notification', itemCount: syncData.length, syncTime: new Date().toISOString(), system: 'SWP' } const response = await fetch(notificationUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify(notificationData) }) if (!response.ok) { const errorText = await response.text() throw new Error(`SWP notification: HTTP ${response.status} - ${errorText}`) } const result = await response.json() endpointResults['swp_notification'] = result console.log(`✅ SWP notification sync successful`) } catch (error) { const errorMessage = `SWP notification: ${error instanceof Error ? error.message : 'Unknown error'}` errors.push(errorMessage) // 알림은 실패해도 전체 동기화는 성공으로 처리 console.error(`❌ SWP notification sync failed:`, error) } } if (!mainUrl) { throw new Error('No SWP main endpoint configured') } console.log(`SWP sync completed with ${errors.length} errors`) return { success: overallSuccess && errors.length === 0, successCount: overallSuccess ? changes.length : 0, failureCount: overallSuccess ? 0 : changes.length, errors: errors.length > 0 ? errors : undefined, endpointResults } } /** * SWP 시스템용 데이터 변환 */ private async transformChangesForSWP(changes: ChangeLog[]): Promise { const syncData: SyncableEntity[] = [] for (const change of changes) { try { let entityData = null // 엔티티 타입별로 현재 데이터 조회 switch (change.entityType) { case 'document': if (change.action !== 'DELETE') { const [document] = await db .select() .from(documents) .where(eq(documents.id, change.entityId)) .limit(1) entityData = document } break case 'revision': if (change.action !== 'DELETE') { const [revision] = await db .select() .from(revisions) .where(eq(revisions.id, change.entityId)) .limit(1) entityData = revision } break case 'attachment': if (change.action !== 'DELETE') { const [attachment] = await db .select() .from(documentAttachments) .where(eq(documentAttachments.id, change.entityId)) .limit(1) entityData = attachment } break } // SWP 특화 데이터 구조 syncData.push({ entityType: change.entityType as any, entityId: change.entityId, action: change.action as any, data: entityData || change.oldValues, metadata: { changeId: change.id, changedAt: change.createdAt, changedBy: change.userName, changedFields: change.changedFields, // SWP 전용 메타데이터 swpFormat: 'legacy', batchSequence: syncData.length + 1, needsValidation: change.entityType === 'document', legacyId: `SWP_${change.entityId}_${Date.now()}` } }) } catch (error) { console.error(`Failed to transform change ${change.id} for SWP:`, error) } } return syncData } /** * 간단한 XML 변환 헬퍼 (SWP용) */ private convertToXML(data: any): string { const xmlHeader = '' const xmlBody = ` ${data.contractId} ${data.systemType} ${data.batchSize} ${data.timestamp} ${data.source} ${data.changes.map((change: SyncableEntity) => ` ${change.entityType} ${change.entityId} ${change.action} ${JSON.stringify(change.data)} `).join('')} ` return xmlHeader + xmlBody } /** * 성공한 변경사항들을 동기화 완료로 표시 */ private async markChangesAsSynced(changeIds: number[]) { if (changeIds.length === 0) return await db .update(changeLogs) .set({ isSynced: true, syncedAt: new Date() }) .where(inArray(changeLogs.id, changeIds)) // 리비전 상태 업데이트 const revisionChanges = await db .select({ entityId: changeLogs.entityId }) .from(changeLogs) .where(and( inArray(changeLogs.id, changeIds), eq(changeLogs.entityType, 'revision') )) if (revisionChanges.length > 0) { const revisionIds = revisionChanges.map(c => c.entityId) await db.update(revisions) .set({ revisionStatus: "SUBMITTED", submittedDate: new Date().toISOString().slice(0, 10) }) .where(inArray(revisions.id, revisionIds)) } } /** * 실패한 변경사항들의 재시도 횟수 증가 */ private async incrementSyncAttempts(changeIds: number[], errorMessage?: string) { if (changeIds.length === 0) return await db .update(changeLogs) .set({ syncAttempts: sql`${changeLogs.syncAttempts} + 1`, lastSyncError: errorMessage || 'Unknown error' }) .where(inArray(changeLogs.id, changeIds)) } /** * 동기화 상태 조회 */ async getSyncStatus(contractId: number, targetSystem: string = 'DOLCE') { try { // 대기 중인 변경사항 수 조회 const pendingCount = await db.$count( changeLogs, and( eq(changeLogs.contractId, contractId), eq(changeLogs.isSynced, false), lt(changeLogs.syncAttempts, 3), sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})` ) ) // 동기화된 변경사항 수 조회 const syncedCount = await db.$count( changeLogs, and( eq(changeLogs.contractId, contractId), eq(changeLogs.isSynced, true), sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})` ) ) // 실패한 변경사항 수 조회 const failedCount = await db.$count( changeLogs, and( eq(changeLogs.contractId, contractId), eq(changeLogs.isSynced, false), sql`${changeLogs.syncAttempts} >= 3`, sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})` ) ) // 마지막 성공한 배치 조회 const [lastSuccessfulBatch] = await db .select() .from(syncBatches) .where(and( eq(syncBatches.contractId, contractId), eq(syncBatches.targetSystem, targetSystem), eq(syncBatches.status, 'SUCCESS') )) .orderBy(desc(syncBatches.completedAt)) .limit(1) return { contractId, targetSystem, totalChanges: pendingCount + syncedCount + failedCount, pendingChanges: pendingCount, syncedChanges: syncedCount, failedChanges: failedCount, lastSyncAt: lastSuccessfulBatch?.completedAt?.toISOString() || null, syncEnabled: this.isSyncEnabled(targetSystem) } } catch (error) { console.error('Failed to get sync status:', error) throw error } } /** * 최근 동기화 배치 목록 조회 */ async getRecentSyncBatches(contractId: number, targetSystem: string = 'DOLCE', limit: number = 10) { try { const batches = await db .select() .from(syncBatches) .where(and( eq(syncBatches.contractId, contractId), eq(syncBatches.targetSystem, targetSystem) )) .orderBy(desc(syncBatches.createdAt)) .limit(limit) // Date 객체를 문자열로 변환 return batches.map(batch => ({ id: Number(batch.id), contractId: batch.contractId, targetSystem: batch.targetSystem, batchSize: batch.batchSize, status: batch.status, startedAt: batch.startedAt?.toISOString() || null, completedAt: batch.completedAt?.toISOString() || null, errorMessage: batch.errorMessage, retryCount: batch.retryCount, successCount: batch.successCount, failureCount: batch.failureCount, createdAt: batch.createdAt.toISOString(), updatedAt: batch.updatedAt.toISOString() })) } catch (error) { console.error('Failed to get sync batches:', error) throw error } } } export const syncService = new SyncService() // 편의 함수들 (기본 타겟 시스템을 DOLCE로 변경) export async function logDocumentChange( contractId: number, documentId: number, action: 'CREATE' | 'UPDATE' | 'DELETE', newValues?: any, oldValues?: any, userId?: number, userName?: string, targetSystems: string[] = ["DOLCE", "SWP"] ) { return syncService.logChange(contractId, 'document', documentId, action, newValues, oldValues, userId, userName, targetSystems) } export async function logRevisionChange( contractId: number, revisionId: number, action: 'CREATE' | 'UPDATE' | 'DELETE', newValues?: any, oldValues?: any, userId?: number, userName?: string, targetSystems: string[] = ["DOLCE", "SWP"] ) { return syncService.logChange(contractId, 'revision', revisionId, action, newValues, oldValues, userId, userName, targetSystems) } export async function logAttachmentChange( contractId: number, attachmentId: number, action: 'CREATE' | 'UPDATE' | 'DELETE', newValues?: any, oldValues?: any, userId?: number, userName?: string, targetSystems: string[] = ["DOLCE", "SWP"] ) { return syncService.logChange(contractId, 'attachment', attachmentId, action, newValues, oldValues, userId, userName, targetSystems) }