summaryrefslogtreecommitdiff
path: root/lib/nonsap-sync/sync-service.ts
diff options
context:
space:
mode:
Diffstat (limited to 'lib/nonsap-sync/sync-service.ts')
-rw-r--r--lib/nonsap-sync/sync-service.ts414
1 files changed, 414 insertions, 0 deletions
diff --git a/lib/nonsap-sync/sync-service.ts b/lib/nonsap-sync/sync-service.ts
new file mode 100644
index 00000000..286952eb
--- /dev/null
+++ b/lib/nonsap-sync/sync-service.ts
@@ -0,0 +1,414 @@
+"use server";
+
+import * as cron from 'node-cron';
+import { oracleKnex } from '@/lib/oracle-db/db';
+import db from '@/db/db';
+import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema';
+import * as nonsapSchema from '@/db/schema/NONSAP/nonsap';
+
+// oracle-schema.ts에서 테이블 목록 자동 추출
+const SYNC_TABLES: TableName[] = ALL_TABLE_NAMES;
+
+// 페이지 단위
+const PAGE_SIZE = 2000;
+const BATCH_SIZE = 100;
+
+interface SyncProgress {
+ tableName: string;
+ lastSyncDate: string;
+ currentPage: number;
+ totalProcessed: number;
+ status: 'running' | 'completed' | 'error';
+ lastError?: string;
+}
+
+// 동기화 진행 상태 저장
+const syncProgress = new Map<string, SyncProgress>();
+
+// 진행률 바 유틸리티
+const createProgressBar = (current: number, total: number, width: number = 20): string => {
+ const filled = Math.floor((current / total) * width);
+ const empty = width - filled;
+ const percentage = Math.round((current / total) * 100);
+
+ const bar = '█'.repeat(filled) + '░'.repeat(empty);
+ return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`;
+};
+
+// 같은 줄에서 업데이트되는 진행률 바 출력
+const updateProgressBar = (message: string, current: number, total: number): void => {
+ const progressBar = createProgressBar(current, total);
+ const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`;
+
+ // 이전 줄을 지우고 새로운 진행률 출력
+ process.stdout.write('\r' + ' '.repeat(100) + '\r'); // 줄 지우기
+ process.stdout.write(fullMessage);
+
+ // 완료되면 줄바꿈
+ if (current >= total) {
+ process.stdout.write('\n');
+ }
+};
+
+// 간단한 로거
+const logger = {
+ info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args),
+ error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args),
+ warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args),
+ success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args),
+ header: (message: string) => {
+ console.log('\n' + '='.repeat(80));
+ console.log(`[NONSAP-SYNC] ${message}`);
+ console.log('='.repeat(80) + '\n');
+ },
+ progress: updateProgressBar
+};
+
+/**
+ * Oracle DB에서 특정 테이블의 데이터를 페이지 단위로 조회
+ */
+async function fetchOracleData<T extends TableName>(
+ tableName: T,
+ page: number,
+ pageSize: number = PAGE_SIZE
+): Promise<DatabaseSchema[T][]> {
+ const offset = (page - 1) * pageSize;
+
+ try {
+ // Oracle에서는 ROWID나 다른 고유 컬럼으로 정렬해야 함
+ const query = oracleKnex(tableName)
+ .select('*')
+ .orderBy('ROWID') // Oracle ROWID로 정렬 (또는 적절한 기본키)
+ .offset(offset)
+ .limit(pageSize);
+
+ const results = await query;
+ logger.info(`Fetched ${results.length} records from ${tableName} (page ${page})`);
+
+ return results as DatabaseSchema[T][];
+ } catch (error) {
+ logger.error(`Error fetching data from ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * PostgreSQL에 데이터 upsert
+ */
+async function upsertToPostgres<T extends TableName>(
+ tableName: T,
+ data: DatabaseSchema[T][],
+ pageInfo?: { current: number; total: number }
+): Promise<void> {
+ if (data.length === 0) return;
+
+ try {
+ // 테이블명을 camelCase로 변환하여 스키마에서 찾기
+ const tableCamelCase = tableName.toLowerCase()
+ .split('_')
+ .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1))
+ .join('');
+
+ const tableSchema = (nonsapSchema as any)[tableCamelCase];
+
+ if (!tableSchema) {
+ throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`);
+ }
+
+ const totalBatches = Math.ceil(data.length / BATCH_SIZE);
+
+ // 배치 단위로 upsert 처리
+ for (let i = 0; i < data.length; i += BATCH_SIZE) {
+ const batch = data.slice(i, i + BATCH_SIZE);
+ const currentBatch = Math.floor(i / BATCH_SIZE) + 1;
+
+ // 배치 진행률 표시
+ const batchMessage = `${tableName} - Batch Processing`;
+ logger.progress(batchMessage, currentBatch, totalBatches);
+
+ try {
+ // PostgreSQL에 삽입 시도
+ await db.insert(tableSchema as any).values(batch);
+ } catch (insertError: unknown) {
+ // 중복키 에러인지 확인
+ const isDuplicateKeyError = insertError &&
+ typeof insertError === 'object' &&
+ 'code' in insertError &&
+ insertError.code === '23505';
+
+ // Production 환경에서는 중복키 에러 로그 생략
+ if (!isDuplicateKeyError) {
+ logger.warn(`Batch insert failed for ${tableName}, trying individual upserts`, insertError);
+ }
+
+ // 삽입 실패 시 개별 레코드 upsert 시도
+ for (const record of batch) {
+ try {
+ await db.insert(tableSchema as any)
+ .values(record)
+ .onConflictDoNothing(); // 중복 시 무시하거나 업데이트
+ } catch (upsertError) {
+ // 중복키 에러인지 확인
+ const isDuplicateKeyError = upsertError &&
+ typeof upsertError === 'object' &&
+ 'code' in upsertError &&
+ upsertError.code === '23505';
+
+ // Production 환경에서는 중복키 에러 로그 생략
+ if (!isDuplicateKeyError) {
+ logger.error(`Failed to upsert record in ${tableName}:`, upsertError);
+ }
+ // 개별 레코드 실패는 로그만 남기고 계속 진행
+ }
+ }
+ }
+
+ // 잠시 대기 (시스템 부하 방지)
+ if (currentBatch % 10 === 0) {
+ await new Promise(resolve => setTimeout(resolve, 100));
+ }
+ }
+
+ logger.success(`Successfully processed ${data.length} records for ${tableName}`);
+ } catch (error) {
+ logger.error(`Error upserting data to ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 특정 테이블 동기화
+ */
+async function syncTable<T extends TableName>(
+ tableName: T,
+ tableIndex: number,
+ totalTables: number
+): Promise<void> {
+ logger.info(`Starting sync for table: ${tableName} (${tableIndex}/${totalTables})`);
+
+ // 동기화 진행 상태 초기화
+ syncProgress.set(tableName, {
+ tableName,
+ lastSyncDate: new Date().toISOString(),
+ currentPage: 1,
+ totalProcessed: 0,
+ status: 'running'
+ });
+
+ try {
+ let page = 1;
+ let totalProcessed = 0;
+ let hasMore = true;
+ let estimatedTotalPages = 1; // 추정 총 페이지 수
+
+ while (hasMore) {
+ // Oracle에서 데이터 조회
+ const oracleData = await fetchOracleData(tableName, page);
+
+ if (oracleData.length === 0) {
+ hasMore = false;
+ break;
+ }
+
+ // 첫 페이지에서 대략적인 총 페이지 수 추정
+ if (page === 1 && oracleData.length === PAGE_SIZE) {
+ // 첫 페이지가 가득 찼다면 더 많은 페이지가 있을 것으로 추정
+ estimatedTotalPages = Math.max(10, page * 2); // 최소 10페이지로 추정
+ }
+
+ // 페이지 진행률 표시
+ const pageMessage = `${tableName} - Page Processing`;
+ const displayTotalPages = hasMore ? Math.max(estimatedTotalPages, page + 1) : page;
+ logger.progress(pageMessage, page, displayTotalPages);
+
+ // PostgreSQL에 upsert (페이지 정보 전달)
+ await upsertToPostgres(tableName, oracleData, { current: page, total: displayTotalPages });
+
+ totalProcessed += oracleData.length;
+
+ // 진행 상태 업데이트
+ const progress = syncProgress.get(tableName)!;
+ progress.currentPage = page;
+ progress.totalProcessed = totalProcessed;
+
+ // 다음 페이지로
+ page++;
+
+ // 페이지 크기보다 적으면 마지막 페이지
+ if (oracleData.length < PAGE_SIZE) {
+ hasMore = false;
+ // 마지막 페이지 진행률 업데이트
+ logger.progress(pageMessage, page - 1, page - 1);
+ } else {
+ // 추정 페이지 수 업데이트
+ estimatedTotalPages = Math.max(estimatedTotalPages, page + 5);
+ }
+ }
+
+ // 동기화 완료
+ const progress = syncProgress.get(tableName)!;
+ progress.status = 'completed';
+
+ logger.success(`Table ${tableName} sync completed. Total processed: ${totalProcessed}`);
+
+ } catch (error) {
+ const progress = syncProgress.get(tableName)!;
+ progress.status = 'error';
+ progress.lastError = error instanceof Error ? error.message : String(error);
+
+ logger.error(`Table ${tableName} sync failed:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 모든 테이블 동기화
+ */
+async function syncAllTables(): Promise<void> {
+ logger.header('Starting NONSAP Data Synchronization');
+
+ const startTime = Date.now();
+ let successCount = 0;
+ let errorCount = 0;
+ const totalTables = SYNC_TABLES.length;
+
+ // 전체 테이블 진행률 표시
+ logger.info(`Total tables to sync: ${totalTables}`);
+
+ // oracle-schema.ts에서 테이블 목록 가져오기
+ for (let i = 0; i < SYNC_TABLES.length; i++) {
+ const tableName = SYNC_TABLES[i];
+ const tableIndex = i + 1;
+
+ // 전체 테이블 진행률 표시
+ const overallMessage = `Overall Progress`;
+ logger.progress(overallMessage, tableIndex - 1, totalTables);
+
+ try {
+ await syncTable(tableName, tableIndex, totalTables);
+ successCount++;
+ } catch (error) {
+ errorCount++;
+ logger.error(`Failed to sync table ${tableName}:`, error);
+ // 에러가 발생해도 다른 테이블은 계속 동기화
+ continue;
+ }
+ }
+
+ // 최종 진행률 표시
+ const overallMessage = `Overall Progress`;
+ logger.progress(overallMessage, totalTables, totalTables);
+
+ const duration = Date.now() - startTime;
+ logger.info(`Sync completed in ${(duration / 1000).toFixed(2)}s`);
+ logger.info(`Success: ${successCount}, Errors: ${errorCount}`);
+ logger.info(`Total tables processed: ${totalTables}`);
+
+ if (errorCount > 0) {
+ logger.warn(`${errorCount} tables had errors during sync`);
+ }
+
+ if (successCount === totalTables) {
+ logger.success('🎉 All tables synchronized successfully!');
+ }
+}
+
+/**
+ * 동기화 상태 조회 API
+ */
+export async function getSyncProgress(): Promise<SyncProgress[]> {
+ return Array.from(syncProgress.values());
+}
+
+/**
+ * 수동 동기화 트리거
+ */
+export async function triggerManualSync(): Promise<void> {
+ logger.info('Manual sync triggered');
+ await syncAllTables();
+}
+
+/**
+ * Oracle DB 연결 테스트
+ */
+async function testOracleConnection(): Promise<boolean> {
+ try {
+ const result = await oracleKnex.raw('SELECT 1 FROM DUAL');
+ return !!result;
+ } catch (error) {
+ logger.error('Oracle DB connection test failed:', error);
+ return false;
+ }
+}
+
+/**
+ * 동기화 스케줄러 시작
+ */
+export async function startNonsapSyncScheduler(): Promise<void> {
+ logger.info('Initializing NONSAP data synchronization scheduler...');
+
+ // Oracle DB 연결 테스트 (비동기)
+ testOracleConnection().then(isConnected => {
+ if (!isConnected) {
+ logger.warn('Oracle DB connection failed - sync scheduler will be disabled');
+ logger.warn('Application will continue to run normally');
+ return;
+ }
+ logger.success('Oracle DB connection test passed');
+ }).catch(error => {
+ logger.error('Oracle DB connection test error:', error);
+ logger.warn('Sync scheduler will be disabled, application continues');
+ });
+
+ try {
+ // 매 시간마다 실행 (0분에)
+ cron.schedule('0 * * * *', async () => {
+ try {
+ logger.info('Cron job triggered: Starting scheduled sync');
+
+ // 동기화 전 Oracle 연결 확인
+ const isConnected = await testOracleConnection();
+ if (!isConnected) {
+ logger.warn('Oracle DB not available, skipping sync');
+ return;
+ }
+
+ await syncAllTables();
+ } catch (error) {
+ logger.error('Scheduled sync failed:', error);
+ // 동기화 실패해도 다음 스케줄은 계속 실행
+ }
+ }, {
+ timezone: 'Asia/Seoul'
+ });
+
+ logger.success(`NONSAP data synchronization cron job registered (every hour) for ${SYNC_TABLES.length} tables`);
+
+ // 개발 환경에서는 스케줄러만 등록하고 실제 실행은 안 함
+ if (process.env.NODE_ENV === 'development') {
+ logger.info('Development mode: Cron registered but initial sync skipped');
+ return;
+ }
+
+ // 애플리케이션 시작 시 한 번 실행 (선택사항)
+ if (process.env.SYNC_ON_START === 'true') {
+ logger.info('Initial sync on startup enabled');
+ setTimeout(async () => {
+ try {
+ const isConnected = await testOracleConnection();
+ if (isConnected) {
+ await syncAllTables();
+ } else {
+ logger.warn('Initial sync skipped - Oracle DB not available');
+ }
+ } catch (error) {
+ logger.error('Initial sync failed:', error);
+ }
+ }, 10000); // 10초 후 실행 (DB 연결 안정화 대기)
+ }
+
+ } catch (error) {
+ logger.error('Failed to set up cron scheduler:', error);
+ logger.warn('Application will continue without sync scheduler');
+ }
+} \ No newline at end of file