"use server"; import * as cron from 'node-cron'; import { oracleKnex } from '@/lib/oracle-db/db'; import db from '@/db/db'; import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema'; import * as nonsapSchema from '@/db/schema/NONSAP/nonsap'; // oracle-schema.ts에서 테이블 목록 자동 추출 const SYNC_TABLES: TableName[] = ALL_TABLE_NAMES; // 페이지 단위 const PAGE_SIZE = 2000; const BATCH_SIZE = 100; interface SyncProgress { tableName: string; lastSyncDate: string; currentPage: number; totalProcessed: number; status: 'running' | 'completed' | 'error'; lastError?: string; } // 동기화 진행 상태 저장 const syncProgress = new Map(); // 진행률 바 유틸리티 const createProgressBar = (current: number, total: number, width: number = 20): string => { const filled = Math.floor((current / total) * width); const empty = width - filled; const percentage = Math.round((current / total) * 100); const bar = '█'.repeat(filled) + '░'.repeat(empty); return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`; }; // 같은 줄에서 업데이트되는 진행률 바 출력 const updateProgressBar = (message: string, current: number, total: number): void => { const progressBar = createProgressBar(current, total); const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`; // 이전 줄을 지우고 새로운 진행률 출력 process.stdout.write('\r' + ' '.repeat(100) + '\r'); // 줄 지우기 process.stdout.write(fullMessage); // 완료되면 줄바꿈 if (current >= total) { process.stdout.write('\n'); } }; // 간단한 로거 const logger = { info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args), error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args), warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args), success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args), header: (message: string) => { console.log('\n' + '='.repeat(80)); console.log(`[NONSAP-SYNC] ${message}`); console.log('='.repeat(80) + '\n'); }, progress: updateProgressBar }; /** * Oracle DB에서 특정 테이블의 데이터를 페이지 단위로 조회 */ async function fetchOracleData( tableName: T, page: number, pageSize: number = PAGE_SIZE ): Promise { const offset = (page - 1) * pageSize; try { // Oracle에서는 ROWID나 다른 고유 컬럼으로 정렬해야 함 const query = oracleKnex(tableName) .select('*') .orderBy('ROWID') // Oracle ROWID로 정렬 (또는 적절한 기본키) .offset(offset) .limit(pageSize); const results = await query; logger.info(`Fetched ${results.length} records from ${tableName} (page ${page})`); return results as DatabaseSchema[T][]; } catch (error) { logger.error(`Error fetching data from ${tableName}:`, error); throw error; } } /** * PostgreSQL에 데이터 upsert */ async function upsertToPostgres( tableName: T, data: DatabaseSchema[T][], pageInfo?: { current: number; total: number } ): Promise { if (data.length === 0) return; try { // 테이블명을 camelCase로 변환하여 스키마에서 찾기 const tableCamelCase = tableName.toLowerCase() .split('_') .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1)) .join(''); const tableSchema = (nonsapSchema as any)[tableCamelCase]; if (!tableSchema) { throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`); } const totalBatches = Math.ceil(data.length / BATCH_SIZE); // 배치 단위로 upsert 처리 for (let i = 0; i < data.length; i += BATCH_SIZE) { const batch = data.slice(i, i + BATCH_SIZE); const currentBatch = Math.floor(i / BATCH_SIZE) + 1; // 배치 진행률 표시 const batchMessage = `${tableName} - Batch Processing`; logger.progress(batchMessage, currentBatch, totalBatches); try { // PostgreSQL에 삽입 시도 await db.insert(tableSchema as any).values(batch); } catch (insertError: unknown) { // 중복키 에러인지 확인 const isDuplicateKeyError = insertError && typeof insertError === 'object' && 'code' in insertError && insertError.code === '23505'; // Production 환경에서는 중복키 에러 로그 생략 if (!isDuplicateKeyError) { logger.warn(`Batch insert failed for ${tableName}, trying individual upserts`, insertError); } // 삽입 실패 시 개별 레코드 upsert 시도 for (const record of batch) { try { await db.insert(tableSchema as any) .values(record) .onConflictDoNothing(); // 중복 시 무시하거나 업데이트 } catch (upsertError) { // 중복키 에러인지 확인 const isDuplicateKeyError = upsertError && typeof upsertError === 'object' && 'code' in upsertError && upsertError.code === '23505'; // Production 환경에서는 중복키 에러 로그 생략 if (!isDuplicateKeyError) { logger.error(`Failed to upsert record in ${tableName}:`, upsertError); } // 개별 레코드 실패는 로그만 남기고 계속 진행 } } } // 잠시 대기 (시스템 부하 방지) if (currentBatch % 10 === 0) { await new Promise(resolve => setTimeout(resolve, 100)); } } logger.success(`Successfully processed ${data.length} records for ${tableName}`); } catch (error) { logger.error(`Error upserting data to ${tableName}:`, error); throw error; } } /** * 특정 테이블 동기화 */ async function syncTable( tableName: T, tableIndex: number, totalTables: number ): Promise { logger.info(`Starting sync for table: ${tableName} (${tableIndex}/${totalTables})`); // 동기화 진행 상태 초기화 syncProgress.set(tableName, { tableName, lastSyncDate: new Date().toISOString(), currentPage: 1, totalProcessed: 0, status: 'running' }); try { let page = 1; let totalProcessed = 0; let hasMore = true; let estimatedTotalPages = 1; // 추정 총 페이지 수 while (hasMore) { // Oracle에서 데이터 조회 const oracleData = await fetchOracleData(tableName, page); if (oracleData.length === 0) { hasMore = false; break; } // 첫 페이지에서 대략적인 총 페이지 수 추정 if (page === 1 && oracleData.length === PAGE_SIZE) { // 첫 페이지가 가득 찼다면 더 많은 페이지가 있을 것으로 추정 estimatedTotalPages = Math.max(10, page * 2); // 최소 10페이지로 추정 } // 페이지 진행률 표시 const pageMessage = `${tableName} - Page Processing`; const displayTotalPages = hasMore ? Math.max(estimatedTotalPages, page + 1) : page; logger.progress(pageMessage, page, displayTotalPages); // PostgreSQL에 upsert (페이지 정보 전달) await upsertToPostgres(tableName, oracleData, { current: page, total: displayTotalPages }); totalProcessed += oracleData.length; // 진행 상태 업데이트 const progress = syncProgress.get(tableName)!; progress.currentPage = page; progress.totalProcessed = totalProcessed; // 다음 페이지로 page++; // 페이지 크기보다 적으면 마지막 페이지 if (oracleData.length < PAGE_SIZE) { hasMore = false; // 마지막 페이지 진행률 업데이트 logger.progress(pageMessage, page - 1, page - 1); } else { // 추정 페이지 수 업데이트 estimatedTotalPages = Math.max(estimatedTotalPages, page + 5); } } // 동기화 완료 const progress = syncProgress.get(tableName)!; progress.status = 'completed'; logger.success(`Table ${tableName} sync completed. Total processed: ${totalProcessed}`); } catch (error) { const progress = syncProgress.get(tableName)!; progress.status = 'error'; progress.lastError = error instanceof Error ? error.message : String(error); logger.error(`Table ${tableName} sync failed:`, error); throw error; } } /** * 모든 테이블 동기화 */ async function syncAllTables(): Promise { logger.header('Starting NONSAP Data Synchronization'); const startTime = Date.now(); let successCount = 0; let errorCount = 0; const totalTables = SYNC_TABLES.length; // 전체 테이블 진행률 표시 logger.info(`Total tables to sync: ${totalTables}`); // oracle-schema.ts에서 테이블 목록 가져오기 for (let i = 0; i < SYNC_TABLES.length; i++) { const tableName = SYNC_TABLES[i]; const tableIndex = i + 1; // 전체 테이블 진행률 표시 const overallMessage = `Overall Progress`; logger.progress(overallMessage, tableIndex - 1, totalTables); try { await syncTable(tableName, tableIndex, totalTables); successCount++; } catch (error) { errorCount++; logger.error(`Failed to sync table ${tableName}:`, error); // 에러가 발생해도 다른 테이블은 계속 동기화 continue; } } // 최종 진행률 표시 const overallMessage = `Overall Progress`; logger.progress(overallMessage, totalTables, totalTables); const duration = Date.now() - startTime; logger.info(`Sync completed in ${(duration / 1000).toFixed(2)}s`); logger.info(`Success: ${successCount}, Errors: ${errorCount}`); logger.info(`Total tables processed: ${totalTables}`); if (errorCount > 0) { logger.warn(`${errorCount} tables had errors during sync`); } if (successCount === totalTables) { logger.success('🎉 All tables synchronized successfully!'); } } /** * 동기화 상태 조회 API */ export async function getSyncProgress(): Promise { return Array.from(syncProgress.values()); } /** * 수동 동기화 트리거 */ export async function triggerManualSync(): Promise { logger.info('Manual sync triggered'); await syncAllTables(); } /** * Oracle DB 연결 테스트 */ async function testOracleConnection(): Promise { try { const result = await oracleKnex.raw('SELECT 1 FROM DUAL'); return !!result; } catch (error) { logger.error('Oracle DB connection test failed:', error); return false; } } /** * 동기화 스케줄러 시작 */ export async function startNonsapSyncScheduler(): Promise { logger.info('Initializing NONSAP data synchronization scheduler...'); // Oracle DB 연결 테스트 (비동기) testOracleConnection().then(isConnected => { if (!isConnected) { logger.warn('Oracle DB connection failed - sync scheduler will be disabled'); logger.warn('Application will continue to run normally'); return; } logger.success('Oracle DB connection test passed'); }).catch(error => { logger.error('Oracle DB connection test error:', error); logger.warn('Sync scheduler will be disabled, application continues'); }); try { // 매 시간마다 실행 (0분에) cron.schedule('0 * * * *', async () => { try { logger.info('Cron job triggered: Starting scheduled sync'); // 동기화 전 Oracle 연결 확인 const isConnected = await testOracleConnection(); if (!isConnected) { logger.warn('Oracle DB not available, skipping sync'); return; } await syncAllTables(); } catch (error) { logger.error('Scheduled sync failed:', error); // 동기화 실패해도 다음 스케줄은 계속 실행 } }, { timezone: 'Asia/Seoul' }); logger.success(`NONSAP data synchronization cron job registered (every hour) for ${SYNC_TABLES.length} tables`); // 개발 환경에서는 스케줄러만 등록하고 실제 실행은 안 함 if (process.env.NODE_ENV === 'development') { logger.info('Development mode: Cron registered but initial sync skipped'); return; } // 애플리케이션 시작 시 한 번 실행 (선택사항) if (process.env.SYNC_ON_START === 'true') { logger.info('Initial sync on startup enabled'); setTimeout(async () => { try { const isConnected = await testOracleConnection(); if (isConnected) { await syncAllTables(); } else { logger.warn('Initial sync skipped - Oracle DB not available'); } } catch (error) { logger.error('Initial sync failed:', error); } }, 10000); // 10초 후 실행 (DB 연결 안정화 대기) } } catch (error) { logger.error('Failed to set up cron scheduler:', error); logger.warn('Application will continue without sync scheduler'); } }