// lib/sync-service.ts (per-target-system version, with DOLCE upload integration)
import db from "@/db/db"
import { changeLogs, syncBatches, type ChangeLog, type SyncBatch } from "@/db/schema/vendorDocu"
import { documents, revisions, documentAttachments } from "@/db/schema/vendorDocu"
import { eq, and, lt, desc, sql, inArray } from "drizzle-orm"

/** A single entity change that can be pushed to an external system. */
export interface SyncableEntity {
  entityType: 'document' | 'revision' | 'attachment'
  entityId: number
  action: 'CREATE' | 'UPDATE' | 'DELETE'
  data: any
  metadata?: Record<string, any>
}

/** Aggregated outcome of one `syncToExternalSystem` run. */
export interface SyncResult {
  /** Id of the `sync_batches` row, or 0 when there was nothing to sync. */
  batchId: number
  success: boolean
  successCount: number
  failureCount: number
  errors?: string[]
  endpointResults?: Record<string, any>
}

/** Outcome of syncing one chunk of change logs to a single target system. */
interface ChunkSyncResult {
  success: boolean
  successCount: number
  failureCount: number
  errors?: string[]
  endpointResults?: Record<string, any>
}

class SyncService {
  /** Number of change logs handed to a target-system sync call at once. */
  private readonly CHUNK_SIZE = 50

  /** Change logs with this many (or more) attempts are treated as failed and no longer retried. */
  private readonly MAX_SYNC_ATTEMPTS = 3

  /**
   * Checks whether sync is enabled for a target system.
   *
   * Reads the environment variable `SYNC_<SYSTEM>_ENABLED` (system name upper-cased);
   * only the values `'true'` and `'1'` enable sync.
   */
  private isSyncEnabled(targetSystem: string): boolean {
    const upperSystem = targetSystem.toUpperCase()
    const enabled = process.env[`SYNC_${upperSystem}_ENABLED`]
    return enabled === 'true' || enabled === '1'
  }

  /**
   * SQL predicate matching change logs that apply to `targetSystem`:
   * either no target systems are recorded, or the recorded list contains it.
   */
  private targetSystemFilter(targetSystem: string) {
    return sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})`
  }

  /**
   * Records a change in `change_logs` so it can be synced later.
   *
   * @param newValues - Entity values after the change.
   * @param oldValues - Entity values before the change; when both old and new
   *   values are given, the differing fields are stored as `changedFields`.
   * @param targetSystems - Systems the change should be synced to.
   * @throws Re-throws any database error after logging it.
   */
  async logChange(
    projectId: number,
    entityType: 'document' | 'revision' | 'attachment',
    entityId: number,
    action: 'CREATE' | 'UPDATE' | 'DELETE',
    newValues?: any,
    oldValues?: any,
    userId?: number,
    userName?: string,
    targetSystems: string[] = ["DOLCE", "SWP"]
  ): Promise<void> {
    try {
      const changedFields = this.detectChangedFields(oldValues, newValues)

      await db.insert(changeLogs).values({
        projectId,
        entityType,
        entityId,
        action,
        changedFields,
        oldValues,
        newValues,
        userId,
        userName,
        targetSystems,
      })

      console.log(`Change logged: ${entityType}/${entityId} - ${action}`)
    } catch (error) {
      console.error('Failed to log change:', error)
      throw error
    }
  }

  /**
   * Computes the fields whose value differs between `oldValues` and `newValues`.
   *
   * Only keys present in `newValues` are inspected; values are compared by their
   * JSON serialization.
   *
   * @returns A map of `field -> { from, to }`, or `null` when either side is
   *   missing or nothing changed.
   */
  private detectChangedFields(oldValues: any, newValues: any): Record<string, any> | null {
    if (!oldValues || !newValues) return null

    const changes: Record<string, any> = {}
    for (const [key, newValue] of Object.entries(newValues)) {
      if (JSON.stringify(oldValues[key]) !== JSON.stringify(newValue)) {
        changes[key] = { from: oldValues[key], to: newValue }
      }
    }

    return Object.keys(changes).length > 0 ? changes : null
  }

  /**
   * Fetches the change logs still waiting to be synced (incremental sync).
   *
   * A change is pending when it belongs to the project, is not yet synced, has
   * fewer than `MAX_SYNC_ATTEMPTS` attempts, and applies to `targetSystem`.
   * Results are ordered by creation time.
   *
   * @param limit - Optional maximum number of rows; a falsy value means no limit.
   */
  async getPendingChanges(
    projectId: number,
    targetSystem: string = 'DOLCE',
    limit?: number
  ): Promise<ChangeLog[]> {
    // $dynamic() lets the limit be applied conditionally; the builder returned by
    // limit() is kept instead of relying on in-place mutation of the query.
    let query = db
      .select()
      .from(changeLogs)
      .where(and(
        eq(changeLogs.projectId, projectId),
        eq(changeLogs.isSynced, false),
        lt(changeLogs.syncAttempts, this.MAX_SYNC_ATTEMPTS),
        this.targetSystemFilter(targetSystem)
      ))
      .orderBy(changeLogs.createdAt)
      .$dynamic()

    if (limit) {
      query = query.limit(limit)
    }

    return await query
  }

  /**
   * Splits an array into consecutive chunks of at most `chunkSize` items.
   */
  private chunkArray<T>(array: T[], chunkSize: number): T[][] {
    const chunks: T[][] = []
    for (let i = 0; i < array.length; i += chunkSize) {
      chunks.push(array.slice(i, i + chunkSize))
    }
    return chunks
  }

  /**
   * Creates a `sync_batches` row in `PENDING` state for the given change logs.
   *
   * @returns The id of the new batch.
   * @throws If the insert does not return a row.
   */
  async createSyncBatch(
    projectId: number,
    targetSystem: string,
    changeLogIds: number[]
  ): Promise<number> {
    const [batch] = await db
      .insert(syncBatches)
      .values({
        projectId,
        targetSystem,
        batchSize: changeLogIds.length,
        changeLogIds,
        status: 'PENDING'
      })
      .returning({ id: syncBatches.id })

    if (!batch) {
      throw new Error('Failed to create sync batch')
    }

    return batch.id
  }

  /**
   * Best-effort: closes a batch as `FAILED` after an unexpected error so it does
   * not stay in `PROCESSING`. Errors from this update are logged, not thrown, so
   * the caller can re-throw the original error.
   */
  private async markBatchFailed(batchId: number, error: unknown): Promise<void> {
    try {
      await db
        .update(syncBatches)
        .set({
          status: 'FAILED',
          completedAt: new Date(),
          errorMessage: error instanceof Error ? error.message : 'Unknown error',
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))
    } catch (updateError) {
      console.error(`Failed to mark batch ${batchId} as FAILED:`, updateError)
    }
  }

  /**
   * Main sync entry point: pushes all pending changes of a project to one target
   * system, processing them in chunks of `CHUNK_SIZE`.
   *
   * @param manualTrigger - Accepted for interface compatibility; not used by this method.
   * @returns The aggregated result; `batchId` is 0 when nothing was pending.
   * @throws If sync is not enabled for `targetSystem`, or on an unexpected
   *   database error (the batch, if already created, is marked `FAILED` first).
   */
  async syncToExternalSystem(
    projectId: number,
    targetSystem: string = 'DOLCE',
    manualTrigger: boolean = false
  ): Promise<SyncResult> {
    // Tracked outside the try block so the catch handler can close the batch.
    let createdBatchId: number | undefined

    try {
      // 1. Make sure sync is enabled for this system.
      if (!this.isSyncEnabled(targetSystem)) {
        throw new Error(`Sync not enabled for ${targetSystem}`)
      }

      // 2. Load every pending change.
      const pendingChanges = await this.getPendingChanges(projectId, targetSystem)

      if (pendingChanges.length === 0) {
        return {
          batchId: 0,
          success: true,
          successCount: 0,
          failureCount: 0
        }
      }

      // 3. Create the batch.
      const batchId = await this.createSyncBatch(
        projectId,
        targetSystem,
        pendingChanges.map(c => c.id)
      )
      createdBatchId = batchId

      // 4. Move the batch to PROCESSING.
      await db
        .update(syncBatches)
        .set({
          status: 'PROCESSING',
          startedAt: new Date(),
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))

      // 5. Sync chunk by chunk, dispatching on the target system.
      const chunks = this.chunkArray(pendingChanges, this.CHUNK_SIZE)
      let totalSuccessCount = 0
      let totalFailureCount = 0
      const allErrors: string[] = []
      const endpointResults: Record<string, any> = {}

      for (let i = 0; i < chunks.length; i++) {
        const chunk = chunks[i]
        console.log(`Processing chunk ${i + 1}/${chunks.length} (${chunk.length} items) for ${targetSystem}`)

        try {
          let chunkResult: ChunkSyncResult

          // Each target system has its own sync implementation.
          switch (targetSystem.toUpperCase()) {
            case 'DOLCE':
              chunkResult = await this.performSyncDOLCE(chunk, projectId)
              break
            case 'SWP':
              chunkResult = await this.performSyncSWP(chunk, projectId)
              break
            default:
              throw new Error(`Unsupported target system: ${targetSystem}`)
          }

          totalSuccessCount += chunkResult.successCount
          totalFailureCount += chunkResult.failureCount

          if (chunkResult.errors) {
            allErrors.push(...chunkResult.errors)
          }

          // Merge per-endpoint results.
          // NOTE(review): keys repeated across chunks are overwritten by the later chunk.
          Object.assign(endpointResults, chunkResult.endpointResults || {})

          // Mark the successful changes as synced.
          // NOTE(review): successes are assumed to be the first `successCount`
          // items of the chunk; the current sync methods report all-or-nothing.
          if (chunkResult.successCount > 0) {
            const successfulChangeIds = chunk
              .slice(0, chunkResult.successCount)
              .map(c => c.id)
            await this.markChangesAsSynced(successfulChangeIds)
          }

          // Bump the retry counter of the failed changes.
          if (chunkResult.failureCount > 0) {
            const failedChangeIds = chunk
              .slice(chunkResult.successCount)
              .map(c => c.id)
            await this.incrementSyncAttempts(failedChangeIds, chunkResult.errors?.[0])
          }
        } catch (error) {
          const message = error instanceof Error ? error.message : 'Unknown error'

          console.error(`Chunk ${i + 1} failed for ${targetSystem}:`, error)
          totalFailureCount += chunk.length
          allErrors.push(`Chunk ${i + 1}: ${message}`)

          // The whole chunk failed: bump the retry counter of every change in it.
          await this.incrementSyncAttempts(chunk.map(c => c.id), message)
        }
      }

      const overallSuccess = totalFailureCount === 0

      // 6. Record the final batch status.
      await db
        .update(syncBatches)
        .set({
          status: overallSuccess ? 'SUCCESS' : (totalSuccessCount > 0 ? 'PARTIAL' : 'FAILED'),
          completedAt: new Date(),
          successCount: totalSuccessCount,
          failureCount: totalFailureCount,
          errorMessage: allErrors.length > 0 ? allErrors.join('; ') : null,
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))

      return {
        batchId,
        success: overallSuccess,
        successCount: totalSuccessCount,
        failureCount: totalFailureCount,
        errors: allErrors.length > 0 ? allErrors : undefined,
        endpointResults
      }
    } catch (error) {
      console.error('Sync failed:', error)

      // Do not leave an already-created batch stuck in PENDING/PROCESSING.
      if (createdBatchId !== undefined) {
        await this.markBatchFailed(createdBatchId, error)
      }

      throw error
    }
  }

  /**
   * DOLCE-specific sync: uploads the revisions referenced by `changes` through
   * the DOLCE upload service.
   *
   * The upload is all-or-nothing: on success every change in the chunk is
   * counted as succeeded, otherwise every change is counted as failed. Errors
   * are captured in the result rather than thrown.
   */
  private async performSyncDOLCE(
    changes: ChangeLog[],
    projectId: number
  ): Promise<ChunkSyncResult> {
    const errors: string[] = []
    const endpointResults: Record<string, any> = {}

    try {
      // Load the DOLCE upload service lazily.
      const { dolceUploadService } = await import('./dolce-upload-service')

      if (!dolceUploadService.isUploadEnabled()) {
        throw new Error('DOLCE upload is not enabled')
      }

      // Collect the revision ids referenced by the changes.
      const revisionIds = changes
        .filter(change => change.entityType === 'revision')
        .map(change => change.entityId)

      if (revisionIds.length === 0) {
        // NOTE(review): with zero successes and zero failures the caller neither
        // marks these changes as synced nor bumps their attempts, so a chunk
        // without revisions stays pending on every run — confirm this is intended.
        return {
          success: true,
          successCount: 0,
          failureCount: 0,
          endpointResults: { message: 'No revisions to upload' }
        }
      }

      // Run the DOLCE upload.
      const uploadResult = await dolceUploadService.uploadToDoLCE(
        projectId,
        revisionIds,
        'system_user', // system user id
        'System Upload'
      )

      endpointResults['dolce_upload'] = uploadResult

      if (uploadResult.success) {
        console.log(`✅ DOLCE upload successful: ${uploadResult.uploadedDocuments} documents, ${uploadResult.uploadedFiles} files`)

        return {
          success: true,
          successCount: changes.length,
          failureCount: 0,
          endpointResults
        }
      } else {
        console.error(`❌ DOLCE upload failed:`, uploadResult.errors)

        return {
          success: false,
          successCount: 0,
          failureCount: changes.length,
          errors: uploadResult.errors,
          endpointResults
        }
      }
    } catch (error) {
      const errorMessage = `DOLCE upload failed: ${error instanceof Error ? error.message : 'Unknown error'}`
      errors.push(errorMessage)
      console.error(`❌ DOLCE upload error:`, error)

      return {
        success: false,
        successCount: 0,
        failureCount: changes.length,
        errors,
        endpointResults
      }
    }
  }

  /**
   * SWP-specific sync.
   *
   * Currently a placeholder: performs no external call and reports every change
   * as succeeded.
   */
  private async performSyncSWP(
    changes: ChangeLog[],
    projectId: number
  ): Promise<ChunkSyncResult> {
    // SWP sync logic goes here; placeholder for now.
    return {
      success: true,
      successCount: changes.length,
      failureCount: 0,
      endpointResults: { message: 'SWP sync placeholder' }
    }
  }

  /**
   * Marks the given change logs as synced and moves every revision they
   * reference to `SUBMITTED` with today's date (UTC, `YYYY-MM-DD`).
   *
   * NOTE(review): `isSynced` is a single flag per change log, so a change that
   * targets several systems is marked synced after the first system succeeds.
   */
  private async markChangesAsSynced(changeIds: number[]): Promise<void> {
    if (changeIds.length === 0) return

    await db
      .update(changeLogs)
      .set({
        isSynced: true,
        syncedAt: new Date()
      })
      .where(inArray(changeLogs.id, changeIds))

    // Update the status of the affected revisions.
    const revisionChanges = await db
      .select({ entityId: changeLogs.entityId })
      .from(changeLogs)
      .where(and(
        inArray(changeLogs.id, changeIds),
        eq(changeLogs.entityType, 'revision')
      ))

    if (revisionChanges.length > 0) {
      const revisionIds = revisionChanges.map(c => c.entityId)
      await db.update(revisions)
        .set({
          revisionStatus: "SUBMITTED",
          submittedDate: new Date().toISOString().slice(0, 10)
        })
        .where(inArray(revisions.id, revisionIds))
    }
  }

  /**
   * Increments the retry counter of the given change logs and stores the last
   * error message (`'Unknown error'` when none is given).
   */
  private async incrementSyncAttempts(changeIds: number[], errorMessage?: string): Promise<void> {
    if (changeIds.length === 0) return

    await db
      .update(changeLogs)
      .set({
        syncAttempts: sql`${changeLogs.syncAttempts} + 1`,
        lastSyncError: errorMessage || 'Unknown error'
      })
      .where(inArray(changeLogs.id, changeIds))
  }

  /**
   * Returns sync counters for a project and target system: pending, synced and
   * failed change counts, the completion time of the last `SUCCESS` batch, and
   * whether sync is enabled.
   *
   * @throws Re-throws any database error after logging it.
   */
  async getSyncStatus(projectId: number, targetSystem: string = 'DOLCE') {
    try {
      const forTargetSystem = this.targetSystemFilter(targetSystem)

      // The four queries are independent, so they are issued together.
      const [pendingCount, syncedCount, failedCount, successfulBatches] = await Promise.all([
        // Changes still waiting to be synced.
        db.$count(
          changeLogs,
          and(
            eq(changeLogs.projectId, projectId),
            eq(changeLogs.isSynced, false),
            lt(changeLogs.syncAttempts, this.MAX_SYNC_ATTEMPTS),
            forTargetSystem
          )
        ),
        // Changes already synced.
        db.$count(
          changeLogs,
          and(
            eq(changeLogs.projectId, projectId),
            eq(changeLogs.isSynced, true),
            forTargetSystem
          )
        ),
        // Changes that exhausted their retries.
        db.$count(
          changeLogs,
          and(
            eq(changeLogs.projectId, projectId),
            eq(changeLogs.isSynced, false),
            sql`${changeLogs.syncAttempts} >= ${this.MAX_SYNC_ATTEMPTS}`,
            forTargetSystem
          )
        ),
        // Most recent successful batch.
        db
          .select()
          .from(syncBatches)
          .where(and(
            eq(syncBatches.projectId, projectId),
            eq(syncBatches.targetSystem, targetSystem),
            eq(syncBatches.status, 'SUCCESS')
          ))
          .orderBy(desc(syncBatches.completedAt))
          .limit(1)
      ])

      const lastSuccessfulBatch = successfulBatches[0]

      return {
        projectId,
        targetSystem,
        totalChanges: pendingCount + syncedCount + failedCount,
        pendingChanges: pendingCount,
        syncedChanges: syncedCount,
        failedChanges: failedCount,
        lastSyncAt: lastSuccessfulBatch?.completedAt?.toISOString() ?? null,
        syncEnabled: this.isSyncEnabled(targetSystem)
      }
    } catch (error) {
      console.error('Failed to get sync status:', error)
      throw error
    }
  }

  /**
   * Lists the most recently created sync batches of a project and target
   * system, with all dates converted to ISO strings.
   *
   * @param limit - Maximum number of batches to return.
   * @throws Re-throws any database error after logging it.
   */
  async getRecentSyncBatches(projectId: number, targetSystem: string = 'DOLCE', limit: number = 10) {
    try {
      const batches = await db
        .select()
        .from(syncBatches)
        .where(and(
          eq(syncBatches.projectId, projectId),
          eq(syncBatches.targetSystem, targetSystem)
        ))
        .orderBy(desc(syncBatches.createdAt))
        .limit(limit)

      // Convert Date objects to strings.
      return batches.map(batch => ({
        id: Number(batch.id),
        projectId: batch.projectId,
        targetSystem: batch.targetSystem,
        batchSize: batch.batchSize,
        status: batch.status,
        startedAt: batch.startedAt?.toISOString() ?? null,
        completedAt: batch.completedAt?.toISOString() ?? null,
        errorMessage: batch.errorMessage,
        retryCount: batch.retryCount,
        successCount: batch.successCount,
        failureCount: batch.failureCount,
        createdAt: batch.createdAt.toISOString(),
        updatedAt: batch.updatedAt.toISOString()
      }))
    } catch (error) {
      console.error('Failed to get sync batches:', error)
      throw error
    }
  }
}

export const syncService = new SyncService()

// Convenience wrappers; by default a change targets both DOLCE and SWP.

/** Logs a change to a document. See `SyncService.logChange`. */
export async function logDocumentChange(
  projectId: number,
  documentId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string,
  targetSystems: string[] = ["DOLCE", "SWP"]
): Promise<void> {
  return syncService.logChange(projectId, 'document', documentId, action, newValues, oldValues, userId, userName, targetSystems)
}

/** Logs a change to a revision. See `SyncService.logChange`. */
export async function logRevisionChange(
  projectId: number,
  revisionId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string,
  targetSystems: string[] = ["DOLCE", "SWP"]
): Promise<void> {
  return syncService.logChange(projectId, 'revision', revisionId, action, newValues, oldValues, userId, userName, targetSystems)
}

/** Logs a change to an attachment. See `SyncService.logChange`. */
export async function logAttachmentChange(
  projectId: number,
  attachmentId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string,
  targetSystems: string[] = ["DOLCE", "SWP"]
): Promise<void> {
  return syncService.logChange(projectId, 'attachment', attachmentId, action, newValues, oldValues, userId, userName, targetSystems)
}