diff options
Diffstat (limited to 'lib/nonsap-sync/sync-service.ts')
| -rw-r--r-- | lib/nonsap-sync/sync-service.ts | 414 |
1 files changed, 0 insertions, 414 deletions
diff --git a/lib/nonsap-sync/sync-service.ts b/lib/nonsap-sync/sync-service.ts deleted file mode 100644 index 286952eb..00000000 --- a/lib/nonsap-sync/sync-service.ts +++ /dev/null @@ -1,414 +0,0 @@ -"use server"; - -import * as cron from 'node-cron'; -import { oracleKnex } from '@/lib/oracle-db/db'; -import db from '@/db/db'; -import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema'; -import * as nonsapSchema from '@/db/schema/NONSAP/nonsap'; - -// oracle-schema.ts에서 테이블 목록 자동 추출 -const SYNC_TABLES: TableName[] = ALL_TABLE_NAMES; - -// 페이지 단위 -const PAGE_SIZE = 2000; -const BATCH_SIZE = 100; - -interface SyncProgress { - tableName: string; - lastSyncDate: string; - currentPage: number; - totalProcessed: number; - status: 'running' | 'completed' | 'error'; - lastError?: string; -} - -// 동기화 진행 상태 저장 -const syncProgress = new Map<string, SyncProgress>(); - -// 진행률 바 유틸리티 -const createProgressBar = (current: number, total: number, width: number = 20): string => { - const filled = Math.floor((current / total) * width); - const empty = width - filled; - const percentage = Math.round((current / total) * 100); - - const bar = '█'.repeat(filled) + '░'.repeat(empty); - return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`; -}; - -// 같은 줄에서 업데이트되는 진행률 바 출력 -const updateProgressBar = (message: string, current: number, total: number): void => { - const progressBar = createProgressBar(current, total); - const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`; - - // 이전 줄을 지우고 새로운 진행률 출력 - process.stdout.write('\r' + ' '.repeat(100) + '\r'); // 줄 지우기 - process.stdout.write(fullMessage); - - // 완료되면 줄바꿈 - if (current >= total) { - process.stdout.write('\n'); - } -}; - -// 간단한 로거 -const logger = { - info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args), - error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args), - warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args), - success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args), - header: (message: string) => { - console.log('\n' + '='.repeat(80)); - console.log(`[NONSAP-SYNC] ${message}`); - console.log('='.repeat(80) + '\n'); - }, - progress: updateProgressBar -}; - -/** - * Oracle DB에서 특정 테이블의 데이터를 페이지 단위로 조회 - */ -async function fetchOracleData<T extends TableName>( - tableName: T, - page: number, - pageSize: number = PAGE_SIZE -): Promise<DatabaseSchema[T][]> { - const offset = (page - 1) * pageSize; - - try { - // Oracle에서는 ROWID나 다른 고유 컬럼으로 정렬해야 함 - const query = oracleKnex(tableName) - .select('*') - .orderBy('ROWID') // Oracle ROWID로 정렬 (또는 적절한 기본키) - .offset(offset) - .limit(pageSize); - - const results = await query; - logger.info(`Fetched ${results.length} records from ${tableName} (page ${page})`); - - return results as DatabaseSchema[T][]; - } catch (error) { - logger.error(`Error fetching data from ${tableName}:`, error); - throw error; - } -} - -/** - * PostgreSQL에 데이터 upsert - */ -async function upsertToPostgres<T extends TableName>( - tableName: T, - data: DatabaseSchema[T][], - pageInfo?: { current: number; total: number } -): Promise<void> { - if (data.length === 0) return; - - try { - // 테이블명을 camelCase로 변환하여 스키마에서 찾기 - const tableCamelCase = tableName.toLowerCase() - .split('_') - .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1)) - .join(''); - - const tableSchema = (nonsapSchema as any)[tableCamelCase]; - - if (!tableSchema) { - throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`); - } - - const totalBatches = Math.ceil(data.length / BATCH_SIZE); - - // 배치 단위로 upsert 처리 - for (let i = 0; i < data.length; i += BATCH_SIZE) { - const batch = data.slice(i, i + BATCH_SIZE); - const currentBatch = Math.floor(i / BATCH_SIZE) + 1; - - // 배치 진행률 표시 - const batchMessage = `${tableName} - Batch Processing`; - logger.progress(batchMessage, currentBatch, totalBatches); - - try { - // PostgreSQL에 삽입 시도 - await db.insert(tableSchema as any).values(batch); - } catch (insertError: unknown) { - // 중복키 에러인지 확인 - const isDuplicateKeyError = insertError && - typeof insertError === 'object' && - 'code' in insertError && - insertError.code === '23505'; - - // Production 환경에서는 중복키 에러 로그 생략 - if (!isDuplicateKeyError) { - logger.warn(`Batch insert failed for ${tableName}, trying individual upserts`, insertError); - } - - // 삽입 실패 시 개별 레코드 upsert 시도 - for (const record of batch) { - try { - await db.insert(tableSchema as any) - .values(record) - .onConflictDoNothing(); // 중복 시 무시하거나 업데이트 - } catch (upsertError) { - // 중복키 에러인지 확인 - const isDuplicateKeyError = upsertError && - typeof upsertError === 'object' && - 'code' in upsertError && - upsertError.code === '23505'; - - // Production 환경에서는 중복키 에러 로그 생략 - if (!isDuplicateKeyError) { - logger.error(`Failed to upsert record in ${tableName}:`, upsertError); - } - // 개별 레코드 실패는 로그만 남기고 계속 진행 - } - } - } - - // 잠시 대기 (시스템 부하 방지) - if (currentBatch % 10 === 0) { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } - - logger.success(`Successfully processed ${data.length} records for ${tableName}`); - } catch (error) { - logger.error(`Error upserting data to ${tableName}:`, error); - throw error; - } -} - -/** - * 특정 테이블 동기화 - */ -async function syncTable<T extends TableName>( - tableName: T, - tableIndex: number, - totalTables: number -): Promise<void> { - logger.info(`Starting sync for table: ${tableName} (${tableIndex}/${totalTables})`); - - // 동기화 진행 상태 초기화 - syncProgress.set(tableName, { - tableName, - lastSyncDate: new Date().toISOString(), - currentPage: 1, - totalProcessed: 0, - status: 'running' - }); - - try { - let page = 1; - let totalProcessed = 0; - let hasMore = true; - let estimatedTotalPages = 1; // 추정 총 페이지 수 - - while (hasMore) { - // Oracle에서 데이터 조회 - const oracleData = await fetchOracleData(tableName, page); - - if (oracleData.length === 0) { - hasMore = false; - break; - } - - // 첫 페이지에서 대략적인 총 페이지 수 추정 - if (page === 1 && oracleData.length === PAGE_SIZE) { - // 첫 페이지가 가득 찼다면 더 많은 페이지가 있을 것으로 추정 - estimatedTotalPages = Math.max(10, page * 2); // 최소 10페이지로 추정 - } - - // 페이지 진행률 표시 - const pageMessage = `${tableName} - Page Processing`; - const displayTotalPages = hasMore ? Math.max(estimatedTotalPages, page + 1) : page; - logger.progress(pageMessage, page, displayTotalPages); - - // PostgreSQL에 upsert (페이지 정보 전달) - await upsertToPostgres(tableName, oracleData, { current: page, total: displayTotalPages }); - - totalProcessed += oracleData.length; - - // 진행 상태 업데이트 - const progress = syncProgress.get(tableName)!; - progress.currentPage = page; - progress.totalProcessed = totalProcessed; - - // 다음 페이지로 - page++; - - // 페이지 크기보다 적으면 마지막 페이지 - if (oracleData.length < PAGE_SIZE) { - hasMore = false; - // 마지막 페이지 진행률 업데이트 - logger.progress(pageMessage, page - 1, page - 1); - } else { - // 추정 페이지 수 업데이트 - estimatedTotalPages = Math.max(estimatedTotalPages, page + 5); - } - } - - // 동기화 완료 - const progress = syncProgress.get(tableName)!; - progress.status = 'completed'; - - logger.success(`Table ${tableName} sync completed. Total processed: ${totalProcessed}`); - - } catch (error) { - const progress = syncProgress.get(tableName)!; - progress.status = 'error'; - progress.lastError = error instanceof Error ? error.message : String(error); - - logger.error(`Table ${tableName} sync failed:`, error); - throw error; - } -} - -/** - * 모든 테이블 동기화 - */ -async function syncAllTables(): Promise<void> { - logger.header('Starting NONSAP Data Synchronization'); - - const startTime = Date.now(); - let successCount = 0; - let errorCount = 0; - const totalTables = SYNC_TABLES.length; - - // 전체 테이블 진행률 표시 - logger.info(`Total tables to sync: ${totalTables}`); - - // oracle-schema.ts에서 테이블 목록 가져오기 - for (let i = 0; i < SYNC_TABLES.length; i++) { - const tableName = SYNC_TABLES[i]; - const tableIndex = i + 1; - - // 전체 테이블 진행률 표시 - const overallMessage = `Overall Progress`; - logger.progress(overallMessage, tableIndex - 1, totalTables); - - try { - await syncTable(tableName, tableIndex, totalTables); - successCount++; - } catch (error) { - errorCount++; - logger.error(`Failed to sync table ${tableName}:`, error); - // 에러가 발생해도 다른 테이블은 계속 동기화 - continue; - } - } - - // 최종 진행률 표시 - const overallMessage = `Overall Progress`; - logger.progress(overallMessage, totalTables, totalTables); - - const duration = Date.now() - startTime; - logger.info(`Sync completed in ${(duration / 1000).toFixed(2)}s`); - logger.info(`Success: ${successCount}, Errors: ${errorCount}`); - logger.info(`Total tables processed: ${totalTables}`); - - if (errorCount > 0) { - logger.warn(`${errorCount} tables had errors during sync`); - } - - if (successCount === totalTables) { - logger.success('🎉 All tables synchronized successfully!'); - } -} - -/** - * 동기화 상태 조회 API - */ -export async function getSyncProgress(): Promise<SyncProgress[]> { - return Array.from(syncProgress.values()); -} - -/** - * 수동 동기화 트리거 - */ -export async function triggerManualSync(): Promise<void> { - logger.info('Manual sync triggered'); - await syncAllTables(); -} - -/** - * Oracle DB 연결 테스트 - */ -async function testOracleConnection(): Promise<boolean> { - try { - const result = await oracleKnex.raw('SELECT 1 FROM DUAL'); - return !!result; - } catch (error) { - logger.error('Oracle DB connection test failed:', error); - return false; - } -} - -/** - * 동기화 스케줄러 시작 - */ -export async function startNonsapSyncScheduler(): Promise<void> { - logger.info('Initializing NONSAP data synchronization scheduler...'); - - // Oracle DB 연결 테스트 (비동기) - testOracleConnection().then(isConnected => { - if (!isConnected) { - logger.warn('Oracle DB connection failed - sync scheduler will be disabled'); - logger.warn('Application will continue to run normally'); - return; - } - logger.success('Oracle DB connection test passed'); - }).catch(error => { - logger.error('Oracle DB connection test error:', error); - logger.warn('Sync scheduler will be disabled, application continues'); - }); - - try { - // 매 시간마다 실행 (0분에) - cron.schedule('0 * * * *', async () => { - try { - logger.info('Cron job triggered: Starting scheduled sync'); - - // 동기화 전 Oracle 연결 확인 - const isConnected = await testOracleConnection(); - if (!isConnected) { - logger.warn('Oracle DB not available, skipping sync'); - return; - } - - await syncAllTables(); - } catch (error) { - logger.error('Scheduled sync failed:', error); - // 동기화 실패해도 다음 스케줄은 계속 실행 - } - }, { - timezone: 'Asia/Seoul' - }); - - logger.success(`NONSAP data synchronization cron job registered (every hour) for ${SYNC_TABLES.length} tables`); - - // 개발 환경에서는 스케줄러만 등록하고 실제 실행은 안 함 - if (process.env.NODE_ENV === 'development') { - logger.info('Development mode: Cron registered but initial sync skipped'); - return; - } - - // 애플리케이션 시작 시 한 번 실행 (선택사항) - if (process.env.SYNC_ON_START === 'true') { - logger.info('Initial sync on startup enabled'); - setTimeout(async () => { - try { - const isConnected = await testOracleConnection(); - if (isConnected) { - await syncAllTables(); - } else { - logger.warn('Initial sync skipped - Oracle DB not available'); - } - } catch (error) { - logger.error('Initial sync failed:', error); - } - }, 10000); // 10초 후 실행 (DB 연결 안정화 대기) - } - - } catch (error) { - logger.error('Failed to set up cron scheduler:', error); - logger.warn('Application will continue without sync scheduler'); - } -}
\ No newline at end of file |
