// lib/sync-service.ts
import db from "@/db/db"
import {
  syncConfigs,
  changeLogs,
  syncBatches,
  syncStatusView,
  type SyncConfig,
  type ChangeLog,
  type SyncBatch
} from "@/db/schema/vendorDocu"
import { documents, revisions, documentAttachments } from "@/db/schema/vendorDocu"
import { eq, and, lt, desc, sql, inArray } from "drizzle-orm"
// NOTE(review): `toast` is not referenced anywhere in this file — confirm whether the import is still needed.
import { toast } from "sonner"

/** One change in the shape that is posted to the external system. */
export interface SyncableEntity {
  entityType: 'document' | 'revision' | 'attachment'
  entityId: number
  action: 'CREATE' | 'UPDATE' | 'DELETE'
  data: any
  metadata?: Record<string, any>
}

/** Summary of one sync run as returned by `syncToExternalSystem`. */
export interface SyncResult {
  batchId: number
  success: boolean
  successCount: number
  failureCount: number
  errors?: string[]
}

type EntityType = SyncableEntity['entityType']
type ChangeAction = SyncableEntity['action']

/** Internal result of `performSync`: counters plus the exact change ids behind them. */
interface SyncOutcome {
  success: boolean
  successCount: number
  failureCount: number
  errors?: string[]
  successfulChangeIds: number[]
  failedChangeIds: number[]
}

class SyncService {
  /**
   * Records a change in `change_logs`.
   *
   * The row is written with `targetSystems: ['SHI']`. Insert failures are
   * logged and rethrown.
   */
  async logChange(
    contractId: number,
    entityType: EntityType,
    entityId: number,
    action: ChangeAction,
    newValues?: any,
    oldValues?: any,
    userId?: number,
    userName?: string
  ) {
    try {
      const changedFields = this.detectChangedFields(oldValues, newValues)

      await db.insert(changeLogs).values({
        contractId,
        entityType,
        entityId,
        action,
        changedFields,
        oldValues,
        newValues,
        userId,
        userName,
        targetSystems: ['SHI'], // sync to SHI by default
      })

      console.log(`Change logged: ${entityType}/${entityId} - ${action}`)
    } catch (error) {
      console.error('Failed to log change:', error)
      throw error
    }
  }

  /**
   * Detects which fields differ between two snapshots.
   *
   * Only keys present in `newValues` are inspected; values are compared by
   * their JSON serialisation.
   *
   * @returns A `{ field: { from, to } }` map, or `null` when either snapshot is
   *   missing or nothing differs.
   */
  private detectChangedFields(oldValues: any, newValues: any): Record<string, any> | null {
    if (!oldValues || !newValues) return null

    const changes: Record<string, any> = {}

    for (const [key, newValue] of Object.entries(newValues)) {
      if (JSON.stringify(oldValues[key]) !== JSON.stringify(newValue)) {
        changes[key] = { from: oldValues[key], to: newValue }
      }
    }

    return Object.keys(changes).length > 0 ? changes : null
  }

  /**
   * Looks up the sync configuration of a contract for one target system.
   *
   * @returns The matching row, or `null` when none exists.
   */
  async getSyncConfig(contractId: number, targetSystem: string = 'SHI'): Promise<SyncConfig | null> {
    const [config] = await db
      .select()
      .from(syncConfigs)
      .where(and(
        eq(syncConfigs.contractId, contractId),
        eq(syncConfigs.targetSystem, targetSystem)
      ))
      .limit(1)

    return config ?? null
  }

  /**
   * Creates the sync configuration for (contractId, targetSystem), or updates
   * the existing row and refreshes its `updatedAt`.
   */
  async upsertSyncConfig(config: Partial<SyncConfig> & {
    contractId: number
    targetSystem: string
    endpointUrl: string
  }) {
    const existing = await this.getSyncConfig(config.contractId, config.targetSystem)

    if (existing) {
      await db
        .update(syncConfigs)
        .set({ ...config, updatedAt: new Date() })
        .where(eq(syncConfigs.id, existing.id))
    } else {
      await db.insert(syncConfigs).values(config)
    }
  }

  /**
   * Fetches the changes that still have to be synced (incremental).
   *
   * Selects unsynced rows of the contract with fewer than 3 sync attempts
   * whose `targetSystems` is NULL or contains `targetSystem`, ordered by
   * `createdAt`.
   */
  async getPendingChanges(
    contractId: number,
    targetSystem: string = 'SHI',
    limit: number = 100
  ): Promise<ChangeLog[]> {
    return await db
      .select()
      .from(changeLogs)
      .where(and(
        eq(changeLogs.contractId, contractId),
        eq(changeLogs.isSynced, false),
        lt(changeLogs.syncAttempts, 3), // at most 3 attempts
        sql`(${changeLogs.targetSystems} IS NULL OR ${changeLogs.targetSystems} @> ${JSON.stringify([targetSystem])})`
      ))
      .orderBy(changeLogs.createdAt)
      .limit(limit)
  }

  /**
   * Inserts a `PENDING` sync batch for the given change log ids.
   *
   * @returns The id of the new batch.
   */
  async createSyncBatch(
    contractId: number,
    targetSystem: string,
    changeLogIds: number[]
  ): Promise<number> {
    const [batch] = await db
      .insert(syncBatches)
      .values({
        contractId,
        targetSystem,
        batchSize: changeLogIds.length,
        changeLogIds,
        status: 'PENDING'
      })
      .returning({ id: syncBatches.id })

    return batch.id
  }

  /**
   * Main entry point: pushes the pending changes of a contract to the
   * external system and records the outcome.
   *
   * @returns The batch summary; `batchId` is 0 when there was nothing to sync.
   * @throws Error when sync is not configured or not enabled for the contract,
   *   or when one of the bookkeeping queries fails.
   */
  async syncToExternalSystem(
    contractId: number,
    targetSystem: string = 'SHI',
    manualTrigger: boolean = false
  ): Promise<SyncResult> {
    // Id of a batch that has been created but has no final status yet.
    let openBatchId: number | undefined

    try {
      // 1. Check the sync configuration
      const config = await this.getSyncConfig(contractId, targetSystem)
      if (!config || !config.syncEnabled) {
        throw new Error(`Sync not enabled for contract ${contractId} to ${targetSystem}`)
      }

      // 2. Load the pending changes
      const pendingChanges = await this.getPendingChanges(
        contractId,
        targetSystem,
        config.maxBatchSize || 100
      )

      if (pendingChanges.length === 0) {
        return {
          batchId: 0,
          success: true,
          successCount: 0,
          failureCount: 0
        }
      }

      // 3. Create the batch
      const batchId = await this.createSyncBatch(
        contractId,
        targetSystem,
        pendingChanges.map(c => c.id)
      )
      openBatchId = batchId

      // 4. Move the batch to PROCESSING
      await db
        .update(syncBatches)
        .set({
          status: 'PROCESSING',
          startedAt: new Date(),
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))

      // 5. Perform the actual sync
      const syncResult = await this.performSync(config, pendingChanges)

      // 6. Write the final batch status
      const finalStatus = syncResult.success
        ? 'SUCCESS'
        : (syncResult.successCount > 0 ? 'PARTIAL' : 'FAILED')

      await db
        .update(syncBatches)
        .set({
          status: finalStatus,
          completedAt: new Date(),
          successCount: syncResult.successCount,
          failureCount: syncResult.failureCount,
          errorMessage: syncResult.errors?.join('; '),
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))
      openBatchId = undefined

      // 7. Flag exactly the accepted changes as synced
      if (syncResult.successfulChangeIds.length > 0) {
        await db
          .update(changeLogs)
          .set({
            isSynced: true,
            syncedAt: new Date()
          })
          .where(inArray(changeLogs.id, syncResult.successfulChangeIds))
      }

      // 8. Bump the retry counter of exactly the failed changes
      if (syncResult.failedChangeIds.length > 0) {
        await db
          .update(changeLogs)
          .set({
            syncAttempts: sql`${changeLogs.syncAttempts} + 1`,
            lastSyncError: syncResult.errors?.[0] || 'Unknown error'
          })
          .where(inArray(changeLogs.id, syncResult.failedChangeIds))
      }

      // 9. Record the attempt (and the success, if any) on the configuration
      const attemptedAt = new Date()
      await db
        .update(syncConfigs)
        .set({
          lastSyncAttempt: attemptedAt,
          ...(syncResult.success ? { lastSuccessfulSync: new Date() } : {}),
          updatedAt: new Date()
        })
        .where(eq(syncConfigs.id, config.id))

      return {
        batchId,
        success: syncResult.success,
        successCount: syncResult.successCount,
        failureCount: syncResult.failureCount,
        errors: syncResult.errors
      }
    } catch (error) {
      console.error('Sync failed:', error)
      if (openBatchId !== undefined) {
        // Do not leave the batch sitting in PENDING/PROCESSING.
        await this.markBatchFailed(openBatchId, error)
      }
      throw error
    }
  }

  /** Best-effort: marks a batch as FAILED; never throws so the original error wins. */
  private async markBatchFailed(batchId: number, cause: unknown): Promise<void> {
    try {
      await db
        .update(syncBatches)
        .set({
          status: 'FAILED',
          completedAt: new Date(),
          errorMessage: cause instanceof Error ? cause.message : 'Unknown error',
          updatedAt: new Date()
        })
        .where(eq(syncBatches.id, batchId))
    } catch (updateError) {
      console.error('Failed to mark sync batch as failed:', updateError)
    }
  }

  /** Turns an error payload of unknown shape into a list of messages. */
  private normalizeErrors(value: unknown): string[] {
    if (!value) return []
    const items: unknown[] = Array.isArray(value) ? value : [value]
    return items.map(item => (typeof item === 'string' ? item : JSON.stringify(item)))
  }

  /**
   * Sends the changes to the external system.
   *
   * Never throws: transport errors, non-2xx responses and malformed replies
   * are reported as a fully failed outcome. Changes that could not be
   * transformed are not sent and are always reported as failed.
   */
  private async performSync(
    config: SyncConfig,
    changes: ChangeLog[]
  ): Promise<SyncOutcome> {
    const errors: string[] = []
    let sent: ChangeLog[] = []
    let acceptedCount = 0

    try {
      // Convert the changes into the external payload shape
      const syncData = await this.transformChangesForExternalSystem(changes)

      // The transformer skips changes it cannot convert; work out which ones
      // are really in the payload (payload order == order of `sent`).
      const payloadIds = new Set<number>(syncData.map(entity => entity.metadata?.changeId))
      sent = changes.filter(change => payloadIds.has(change.id))

      const skipped = changes.length - sent.length
      if (skipped > 0) {
        errors.push(`${skipped} change(s) could not be prepared for sync`)
      }

      if (sent.length > 0) {
        // Call the external API
        const response = await fetch(config.endpointUrl, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${config.authToken}`,
            'X-API-Version': config.apiVersion || 'v1'
          },
          body: JSON.stringify({
            contractId: changes[0]?.contractId,
            changes: syncData,
            batchSize: syncData.length,
            timestamp: new Date().toISOString()
          })
        })

        if (!response.ok) {
          const errorText = await response.text()
          throw new Error(`HTTP ${response.status}: ${errorText}`)
        }

        const result = await response.json()

        // Derive the counters from the response
        if (result.success) {
          acceptedCount = sent.length
        } else if (result.partialSuccess) {
          // Clamp the reported count so a bad reply cannot corrupt the counters.
          const reported = Math.trunc(Number(result.successCount) || 0)
          acceptedCount = Math.min(Math.max(reported, 0), sent.length)
          errors.push(...this.normalizeErrors(result.errors))
        } else {
          errors.push(...this.normalizeErrors(result.error))
        }
      }
    } catch (error) {
      console.error('External sync failed:', error)
      acceptedCount = 0
      errors.push(error instanceof Error ? error.message : 'Unknown error')
    }

    // NOTE(review): the response only carries a count, so a partial success is
    // attributed to the first `acceptedCount` entries of the payload — confirm
    // this matches the external API contract.
    const successfulChangeIds = sent.slice(0, acceptedCount).map(change => change.id)
    const accepted = new Set(successfulChangeIds)
    const failedChangeIds = changes
      .filter(change => !accepted.has(change.id))
      .map(change => change.id)

    return {
      success: failedChangeIds.length === 0,
      successCount: successfulChangeIds.length,
      failureCount: failedChangeIds.length,
      errors: errors.length > 0 ? errors : undefined,
      successfulChangeIds,
      failedChangeIds
    }
  }

  /**
   * Loads the current row behind a change. Returns `null` for DELETE actions
   * and for entity types without a known table; `undefined` when no row is found.
   */
  private async loadCurrentEntity(change: ChangeLog) {
    if (change.action === 'DELETE') return null

    switch (change.entityType) {
      case 'document': {
        const [document] = await db
          .select()
          .from(documents)
          .where(eq(documents.id, change.entityId))
          .limit(1)
        return document
      }
      case 'revision': {
        const [revision] = await db
          .select()
          .from(revisions)
          .where(eq(revisions.id, change.entityId))
          .limit(1)
        return revision
      }
      case 'attachment': {
        const [attachment] = await db
          .select()
          .from(documentAttachments)
          .where(eq(documentAttachments.id, change.entityId))
          .limit(1)
        return attachment
      }
      default:
        return null
    }
  }

  /**
   * Converts change log rows into the external payload shape.
   *
   * A change whose conversion throws is logged and left out of the result;
   * the order of the remaining entries follows `changes`. Every entry carries
   * its change log id in `metadata.changeId`.
   */
  private async transformChangesForExternalSystem(changes: ChangeLog[]): Promise<SyncableEntity[]> {
    const syncData: SyncableEntity[] = []

    for (const change of changes) {
      try {
        // Look up the current data for the entity type
        const entityData = await this.loadCurrentEntity(change)

        syncData.push({
          entityType: change.entityType as SyncableEntity['entityType'],
          entityId: change.entityId,
          action: change.action as SyncableEntity['action'],
          data: entityData ?? change.oldValues, // fall back to oldValues (e.g. DELETE)
          metadata: {
            changeId: change.id,
            changedAt: change.createdAt,
            changedBy: change.userName,
            changedFields: change.changedFields
          }
        })
      } catch (error) {
        console.error(`Failed to transform change ${change.id}:`, error)
      }
    }

    return syncData
  }

  /**
   * Reads the sync status row of a contract for one target system from
   * `syncStatusView`; `undefined` when there is none.
   */
  async getSyncStatus(contractId: number, targetSystem: string = 'SHI') {
    const [status] = await db
      .select()
      .from(syncStatusView)
      .where(and(
        eq(syncStatusView.contractId, contractId),
        eq(syncStatusView.targetSystem, targetSystem)
      ))
      .limit(1)

    return status
  }

  /** Lists the most recent sync batches of a contract, newest first. */
  async getRecentSyncBatches(contractId: number, targetSystem: string = 'SHI', limit: number = 10) {
    return await db
      .select()
      .from(syncBatches)
      .where(and(
        eq(syncBatches.contractId, contractId),
        eq(syncBatches.targetSystem, targetSystem)
      ))
      .orderBy(desc(syncBatches.createdAt))
      .limit(limit)
  }
}

export const syncService = new SyncService()

// Convenience wrappers

/** Logs a change to a document; see `SyncService.logChange`. */
export async function logDocumentChange(
  contractId: number,
  documentId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string
) {
  return syncService.logChange(contractId, 'document', documentId, action, newValues, oldValues, userId, userName)
}

/** Logs a change to a revision; see `SyncService.logChange`. */
export async function logRevisionChange(
  contractId: number,
  revisionId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string
) {
  return syncService.logChange(contractId, 'revision', revisionId, action, newValues, oldValues, userId, userName)
}

/** Logs a change to an attachment; see `SyncService.logChange`. */
export async function logAttachmentChange(
  contractId: number,
  attachmentId: number,
  action: 'CREATE' | 'UPDATE' | 'DELETE',
  newValues?: any,
  oldValues?: any,
  userId?: number,
  userName?: string
) {
  return syncService.logChange(contractId, 'attachment', attachmentId, action, newValues, oldValues, userId, userName)
}