summaryrefslogtreecommitdiff
path: root/lib/nonsap-sync/enhanced-sync-service.ts
diff options
context:
space:
mode:
authorjoonhoekim <26rote@gmail.com>2025-07-01 10:44:02 +0000
committerjoonhoekim <26rote@gmail.com>2025-07-01 10:44:02 +0000
commit6e25ab8da8a90a6d9bf40ccc83e36f119fb27568 (patch)
treef608ec6315b845b5770c2a357c6540116145cb41 /lib/nonsap-sync/enhanced-sync-service.ts
parentaf52dbc2b96e619be18dea857ea67d99622092a7 (diff)
(김준회) 비활성화한 node-cron 진입점 (instrumentation.ts) 추가 및 NONSAP 동기화 개발건
Diffstat (limited to 'lib/nonsap-sync/enhanced-sync-service.ts')
-rw-r--r--lib/nonsap-sync/enhanced-sync-service.ts873
1 files changed, 873 insertions, 0 deletions
diff --git a/lib/nonsap-sync/enhanced-sync-service.ts b/lib/nonsap-sync/enhanced-sync-service.ts
new file mode 100644
index 00000000..d5bd216e
--- /dev/null
+++ b/lib/nonsap-sync/enhanced-sync-service.ts
@@ -0,0 +1,873 @@
+/* eslint-disable @typescript-eslint/no-explicit-any */
+"use server";
+
+import * as cron from 'node-cron';
+import { sql } from 'drizzle-orm';
+import { oracleKnex } from '@/lib/oracle-db/db';
+import db from '@/db/db';
+import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema';
+import * as nonsapSchema from '@/db/schema/NONSAP/nonsap';
+import { getTableSyncConfig, canUseDeltaSync, getTimestampColumn } from './table-config';
+import { PAGE_SIZE, BATCH_SIZE, MAX_WORKERS, DELTA_SYNC_ENABLED } from './sync-config';
+
+interface SyncProgress {
+ tableName: string;
+ lastSyncDate: string;
+ currentPage: number;
+ totalProcessed: number;
+ status: 'running' | 'completed' | 'error' | 'skipped';
+ lastError?: string;
+ syncType: 'full' | 'delta' | 'rebuild';
+ startTime: number;
+ endTime?: number;
+ recordsSkipped?: number;
+}
+
+interface DeltaSyncInfo {
+ tableName: string;
+ lastSyncTimestamp: string | null;
+ timestampColumn: string;
+}
+
+// 동기화 진행 상태 저장
+const syncProgress = new Map<string, SyncProgress>();
+
+// PostgreSQL에 마지막 동기화 정보 저장 테이블
+const SYNC_STATUS_TABLE = 'nonsap_sync_status';
+
+// 진행률 바 유틸리티 (기존과 동일)
+const createProgressBar = (current: number, total: number, width: number = 20): string => {
+ const filled = Math.floor((current / total) * width);
+ const empty = width - filled;
+ const percentage = Math.round((current / total) * 100);
+
+ const bar = '█'.repeat(filled) + '░'.repeat(empty);
+ return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`;
+};
+
+const updateProgressBar = (message: string, current: number, total: number): void => {
+ const progressBar = createProgressBar(current, total);
+ const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`;
+
+ process.stdout.write('\r' + ' '.repeat(100) + '\r');
+ process.stdout.write(fullMessage);
+
+ if (current >= total) {
+ process.stdout.write('\n');
+ }
+};
+
+// 로거 (기존과 동일)
+const logger = {
+ info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args),
+ error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args),
+ warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args),
+ success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args),
+ header: (message: string) => {
+ console.log('\n' + '='.repeat(80));
+ console.log(`[NONSAP-SYNC] ${message}`);
+ console.log('='.repeat(80) + '\n');
+ },
+ progress: updateProgressBar
+};
+
+/**
+ * 동기화 상태 테이블 초기화
+ */
+async function initializeSyncStatusTable(): Promise<void> {
+ try {
+ const tableExists = await db.execute(sql`
+ SELECT EXISTS (
+ SELECT FROM information_schema.tables
+ WHERE table_schema = 'nonsap'
+ AND table_name = ${SYNC_STATUS_TABLE}
+ );
+ `);
+
+ if (!tableExists.rows[0].exists) {
+ await db.execute(sql`
+ CREATE TABLE nonsap.${sql.identifier(SYNC_STATUS_TABLE)} (
+ table_name VARCHAR(50) PRIMARY KEY,
+ last_sync_timestamp VARCHAR(14),
+ last_sync_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ sync_type VARCHAR(10) DEFAULT 'full',
+ total_records_processed INTEGER DEFAULT 0,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ );
+ `);
+ logger.info('Sync status table created');
+ }
+ } catch (error) {
+ logger.warn('Failed to initialize sync status table:', error);
+ }
+}
+
+/**
+ * 마지막 동기화 정보 조회
+ */
+async function getLastSyncInfo(tableName: string): Promise<DeltaSyncInfo> {
+ try {
+ const result = await db.execute(sql`
+ SELECT last_sync_timestamp
+ FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
+ WHERE table_name = ${tableName}
+ `);
+
+ const timestampColumn = getTimestampColumn(tableName as TableName);
+
+ return {
+ tableName,
+ lastSyncTimestamp: (result.rows[0] as any)?.last_sync_timestamp || null,
+ timestampColumn: timestampColumn || 'CHG_DT'
+ };
+ } catch (error) {
+ logger.warn(`Failed to get last sync info for ${tableName}:`, error);
+ return {
+ tableName,
+ lastSyncTimestamp: null,
+ timestampColumn: getTimestampColumn(tableName as TableName) || 'CHG_DT'
+ };
+ }
+}
+
+/**
+ * 마지막 동기화 정보 업데이트
+ */
+async function updateLastSyncInfo(tableName: string, timestamp: string, recordsProcessed: number): Promise<void> {
+ try {
+ await db.execute(sql`
+ INSERT INTO nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
+ (table_name, last_sync_timestamp, total_records_processed, updated_at)
+ VALUES (${tableName}, ${timestamp}, ${recordsProcessed}, CURRENT_TIMESTAMP)
+ ON CONFLICT (table_name)
+ DO UPDATE SET
+ last_sync_timestamp = ${timestamp},
+ total_records_processed = ${recordsProcessed},
+ updated_at = CURRENT_TIMESTAMP
+ `);
+ } catch (error) {
+ logger.error(`Failed to update sync info for ${tableName}:`, error);
+ }
+}
+
+/**
+ * 전체 동기화를 위한 Oracle 데이터 조회 (ROWNUM 대신 ROW_NUMBER 사용)
+ */
+async function fetchOracleDataFull<T extends TableName>(
+ tableName: T,
+ page: number,
+ pageSize: number = PAGE_SIZE
+): Promise<DatabaseSchema[T][]> {
+ const offset = (page - 1) * pageSize;
+
+ try {
+ // ROW_NUMBER() OVER() 구문을 사용한 안전한 페이징
+ const query = `
+ SELECT * FROM (
+ SELECT t.*, ROW_NUMBER() OVER (ORDER BY ROWID) as rn
+ FROM ${tableName} t
+ )
+ WHERE rn > ${offset} AND rn <= ${offset + pageSize}
+ `;
+
+ const result = await oracleKnex.raw(query);
+
+ // Oracle knex raw 결과는 result[0]에 실제 rows가 있음
+ const rows = Array.isArray(result) ? result : result.rows || [];
+
+ // Oracle raw 쿼리 결과에서 실제 데이터 추출 (rn 컬럼 제거)
+ const cleanResults = rows.map((row: any) => {
+ if (row && typeof row === 'object') {
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ const { rn: _rn, RN: _RN, ...cleanRow } = row; // 대소문자 모두 제거
+ return cleanRow;
+ }
+ return row;
+ });
+
+ logger.info(`Fetched ${cleanResults.length} records from ${tableName} (page ${page}, full sync)`);
+
+ return cleanResults as DatabaseSchema[T][];
+ } catch (error) {
+ logger.error(`Error fetching full data from ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 차분 동기화를 위한 Oracle 데이터 조회 (ROW_NUMBER 사용)
+ */
+async function fetchOracleDataDelta<T extends TableName>(
+ tableName: T,
+ lastSyncTimestamp: string | null,
+ timestampColumn: string,
+ page: number,
+ pageSize: number = PAGE_SIZE
+): Promise<{ data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null }> {
+ const offset = (page - 1) * pageSize;
+
+ try {
+ let whereClause = '';
+ const params: any[] = [];
+
+ // 차분 동기화 조건 추가
+ if (lastSyncTimestamp && timestampColumn) {
+ if (timestampColumn.includes('DTM')) {
+ // 14자리 YYYYMMDDHHMISS 형태
+ whereClause = `WHERE ${timestampColumn} > ?`;
+ params.push(lastSyncTimestamp);
+ } else {
+ // 8자리 YYYYMMDD 형태만 사용 (CHG_TM 컬럼이 없는 테이블들)
+ const lastDate = lastSyncTimestamp.substring(0, 8);
+
+ // 테이블에 따라 조건 분기
+ const timeColumn = timestampColumn.replace('_DT', '_TM');
+
+ // 실제로 CHG_TM 컬럼이 있는지 확인 (간접적으로 테이블명으로 판단)
+ const hasTimeColumn = [
+ 'CMCTB_VENDOR_GENERAL', 'CMCTB_VENDOR_GRP', 'CMCTB_VENDOR_INCO',
+ 'CMCTB_CDNM', 'CMCTB_CD_CLF_NM'
+ ].includes(tableName);
+
+ if (hasTimeColumn) {
+ const lastTime = lastSyncTimestamp.substring(8, 14);
+ whereClause = `WHERE (${timestampColumn} > ? OR (${timestampColumn} = ? AND ${timeColumn} > ?))`;
+ params.push(lastDate, lastDate, lastTime);
+ } else {
+ // CHG_TM 컬럼이 없는 테이블은 날짜만으로 비교
+ whereClause = `WHERE ${timestampColumn} > ?`;
+ params.push(lastDate);
+ }
+ }
+ }
+
+ // ROW_NUMBER() OVER() 구문을 사용한 안전한 페이징
+ const orderColumn = timestampColumn || 'ROWID';
+ const query = `
+ SELECT * FROM (
+ SELECT t.*, ROW_NUMBER() OVER (ORDER BY ${orderColumn}) as rn
+ FROM ${tableName} t
+ ${whereClause}
+ )
+ WHERE rn > ${offset} AND rn <= ${offset + pageSize}
+ `;
+
+ const result = await oracleKnex.raw(query, params);
+
+ // Oracle knex raw 결과는 result[0]에 실제 rows가 있음
+ const rows = Array.isArray(result) ? result : result.rows || [];
+
+ // Oracle raw 쿼리 결과에서 실제 데이터 추출 (rn 컬럼 제거)
+ const cleanResults = rows.map((row: any) => {
+ if (row && typeof row === 'object') {
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ const { rn: _rn, RN: _RN, ...cleanRow } = row; // 대소문자 모두 제거
+ return cleanRow;
+ }
+ return row;
+ });
+
+ // 최대 타임스탬프 계산
+ let maxTimestamp: string | null = null;
+ if (cleanResults.length > 0 && timestampColumn) {
+ const lastRecord = cleanResults[cleanResults.length - 1] as any;
+ if (timestampColumn.includes('DTM')) {
+ maxTimestamp = lastRecord[timestampColumn];
+ } else {
+ const timeColumn = timestampColumn.replace('_DT', '_TM');
+ const date = lastRecord[timestampColumn] || '';
+
+ // CHG_TM 컬럼이 있는 테이블만 시간 조합
+ const hasTimeColumn = [
+ 'CMCTB_VENDOR_GENERAL', 'CMCTB_VENDOR_GRP', 'CMCTB_VENDOR_INCO',
+ 'CMCTB_CDNM', 'CMCTB_CD_CLF_NM'
+ ].includes(tableName);
+
+ if (hasTimeColumn && lastRecord[timeColumn]) {
+ const time = lastRecord[timeColumn] || '000000';
+ maxTimestamp = date + time.padEnd(6, '0');
+ } else {
+ // 시간 정보가 없는 경우 날짜만 + 시간은 끝시간으로
+ maxTimestamp = date + '235959';
+ }
+ }
+ }
+
+ const hasMore = cleanResults.length === pageSize;
+
+ logger.info(`Fetched ${cleanResults.length} records from ${tableName} (page ${page}, delta: ${!!lastSyncTimestamp})`);
+
+ return {
+ data: cleanResults as DatabaseSchema[T][],
+ hasMore,
+ maxTimestamp
+ };
+ } catch (error) {
+ logger.error(`Error fetching delta data from ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 컬럼명 정규화 (Oracle -> PostgreSQL)
+ */
+function normalizeColumnNames(record: any, tableSchema: any): any {
+ const normalizedRecord: any = {};
+ const schemaColumns = Object.keys(tableSchema);
+
+ for (const [key, value] of Object.entries(record)) {
+ if (!key || typeof key !== 'string' || key === 'undefined' || key.trim() === '') {
+ continue;
+ }
+
+ // RN 컬럼 제거 (ROW_NUMBER 결과)
+ if (key.toUpperCase() === 'RN') {
+ continue;
+ }
+
+ // 대문자로 변환하여 스키마와 매칭
+ const upperKey = key.toUpperCase();
+
+ // 스키마에 해당 컬럼이 있는지 확인
+ if (schemaColumns.includes(upperKey)) {
+ normalizedRecord[upperKey] = value;
+ } else {
+ // 스키마에 없는 컬럼은 경고 출력하지 않고 조용히 스킵 (너무 많은 로그 방지)
+ // logger.warn(`Column ${key} (${upperKey}) not found in schema, skipping`);
+ }
+ }
+
+ return normalizedRecord;
+}
+
+/**
+ * PostgreSQL에서 실제 constraint 존재 여부 확인
+ */
+async function hasValidConstraint(tableName: string, columnNames: string[]): Promise<boolean> {
+ try {
+ if (columnNames.length === 0) return false;
+
+ const tableNameLower = tableName.toLowerCase();
+
+ // Primary key나 unique constraint 확인
+ const constraintQuery = sql`
+ SELECT tc.constraint_name, tc.constraint_type, kcu.column_name
+ FROM information_schema.table_constraints tc
+ JOIN information_schema.key_column_usage kcu
+ ON tc.constraint_name = kcu.constraint_name
+ WHERE tc.table_schema = 'nonsap'
+ AND tc.table_name = ${tableNameLower}
+ AND tc.constraint_type IN ('PRIMARY KEY', 'UNIQUE')
+ `;
+
+ const result = await db.execute(constraintQuery);
+ const constraints = result.rows;
+
+ // 해당 컬럼들이 실제 constraint에 포함되는지 확인
+ for (const constraint of constraints) {
+ const constraintColumns = constraints
+ .filter((c: any) => c.constraint_name === constraint.constraint_name)
+ .map((c: any) => c.column_name.toUpperCase());
+
+ // 모든 컬럼이 constraint에 포함되는지 확인
+ const allColumnsInConstraint = columnNames.every(col =>
+ constraintColumns.includes(col.toUpperCase())
+ );
+
+ if (allColumnsInConstraint) {
+ return true;
+ }
+ }
+
+ return false;
+ } catch (error) {
+ logger.warn(`Failed to check constraints for ${tableName}:`, error);
+ return false;
+ }
+}
+
+/**
+ * PostgreSQL에 데이터 upsert (최적화된 버전)
+ */
+async function upsertToPostgresOptimized<T extends TableName>(
+ tableName: T,
+ data: DatabaseSchema[T][],
+ primaryKeys: string[]
+): Promise<void> {
+ if (data.length === 0) return;
+
+ try {
+ const tableCamelCase = tableName.toLowerCase()
+ .split('_')
+ .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1))
+ .join('');
+
+ const tableSchema = (nonsapSchema as any)[tableCamelCase];
+
+ if (!tableSchema) {
+ throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`);
+ }
+
+ for (let i = 0; i < data.length; i += BATCH_SIZE) {
+ const batch = data.slice(i, i + BATCH_SIZE);
+ const currentBatch = Math.floor(i / BATCH_SIZE) + 1;
+
+ try {
+ // primaryKeys에서 undefined 값 필터링 및 검증
+ const validPrimaryKeys = primaryKeys.filter(key =>
+ key &&
+ typeof key === 'string' &&
+ key !== 'ROWID' &&
+ key !== 'undefined' &&
+ key.trim() !== ''
+ );
+
+ if (currentBatch === 1) {
+ logger.info(`${tableName} - Valid primaryKeys: [${validPrimaryKeys.join(', ')}]`);
+ }
+
+ // 데이터 정규화 (컬럼명 매핑)
+ const cleanBatch = batch
+ .map(record => normalizeColumnNames(record, tableSchema))
+ .filter(record => Object.keys(record).length > 0);
+
+ if (cleanBatch.length === 0) {
+ logger.warn(`${tableName} - No valid records after cleaning batch ${currentBatch}`);
+ continue;
+ }
+
+ if (validPrimaryKeys.length > 0) {
+ // 실제 constraint 존재 여부 확인
+ const hasConstraint = await hasValidConstraint(tableName, validPrimaryKeys);
+
+ if (hasConstraint) {
+ // 스키마에서 실제 컬럼 객체들 가져오기
+ const conflictColumns = validPrimaryKeys
+ .filter(key => key in tableSchema)
+ .map(key => tableSchema[key]);
+
+ if (conflictColumns.length > 0) {
+ // 업데이트할 필드들만 선별 (기본키 제외)
+ const updateFields = Object.keys(cleanBatch[0])
+ .filter(key => !validPrimaryKeys.includes(key) && key in tableSchema)
+ .reduce((acc, key) => {
+ acc[key] = sql.raw(`excluded."${key}"`);
+ return acc;
+ }, {} as Record<string, any>);
+
+ await db.insert(tableSchema as any)
+ .values(cleanBatch)
+ .onConflictDoUpdate({
+ target: conflictColumns,
+ set: updateFields
+ });
+ } else {
+ // 스키마에서 컬럼을 찾을 수 없는 경우
+ logger.warn(`${tableName} - Primary key columns not found in schema, using simple insert`);
+ await db.insert(tableSchema as any).values(cleanBatch);
+ }
+ } else {
+ // constraint가 없는 경우: 개별 처리로 중복 방지
+ logger.info(`${tableName} - No valid constraints found, using individual upsert`);
+
+ for (const record of cleanBatch) {
+ try {
+ await db.insert(tableSchema as any)
+ .values(record)
+ .onConflictDoNothing(); // 에러 무시하고 계속
+ } catch {
+ // 개별 레코드 실패는 조용히 무시
+ }
+ }
+ }
+ } else {
+ // 기본키가 없는 테이블: 삭제 후 재삽입 방식
+ if (currentBatch === 1) {
+ // 첫 번째 배치에서만 기존 데이터 확인 및 삭제
+ const existingDataCheck = await db.execute(sql`
+ SELECT COUNT(*) as count FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())}
+ `);
+
+ const existingCount = Number(existingDataCheck.rows[0].count);
+ if (existingCount > 0) {
+ logger.info(`${tableName} - Existing data found (${existingCount} records), deleting before insert`);
+ await db.execute(sql`
+ DELETE FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())}
+ `);
+ } else {
+ logger.info(`${tableName} - No existing data, proceeding with fresh insert`);
+ }
+ }
+
+ // 데이터 정규화 후 삽입
+ if (cleanBatch.length > 0) {
+ await db.insert(tableSchema as any).values(cleanBatch);
+ }
+ }
+
+ } catch (insertError: unknown) {
+ const isDuplicateKeyError = insertError &&
+ typeof insertError === 'object' &&
+ 'code' in insertError &&
+ insertError.code === '23505';
+
+ if (!isDuplicateKeyError) {
+ logger.warn(`Batch upsert failed for ${tableName}, trying individual operations`, insertError);
+
+ // 개별 처리
+ for (const record of batch) {
+ try {
+ // 레코드 정규화
+ const cleanRecord = normalizeColumnNames(record, tableSchema);
+
+ if (Object.keys(cleanRecord).length > 0) {
+ await db.insert(tableSchema as any)
+ .values(cleanRecord)
+ .onConflictDoNothing();
+ }
+ } catch {
+ // 개별 레코드 실패는 무시
+ }
+ }
+ }
+ }
+
+ if (currentBatch % 10 === 0) {
+ await new Promise(resolve => setTimeout(resolve, 50));
+ }
+ }
+
+ logger.success(`Successfully processed ${data.length} records for ${tableName}`);
+ } catch (error) {
+ logger.error(`Error upserting data to ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 워커를 사용한 테이블 동기화
+ */
+async function syncTableWithWorker<T extends TableName>(
+ tableName: T,
+ tableIndex: number,
+ totalTables: number
+): Promise<void> {
+ logger.info(`Starting enhanced sync for table: ${tableName} (${tableIndex}/${totalTables})`);
+
+ const config = getTableSyncConfig(tableName);
+ const useDeltaSync = DELTA_SYNC_ENABLED && canUseDeltaSync(tableName);
+ const timestampColumn = getTimestampColumn(tableName);
+
+ // 동기화 타입 결정
+ let syncType: 'full' | 'delta' | 'rebuild';
+ if (useDeltaSync) {
+ syncType = 'delta';
+ } else if (config.primaryKeys.length === 0) {
+ // 기본키가 없는 테이블은 삭제 후 재구성
+ syncType = 'rebuild';
+ } else {
+ // 기본키가 있지만 차분 동기화가 불가능한 경우 전체 동기화
+ syncType = 'full';
+ }
+
+ const syncTypeDescription = {
+ 'delta': '차분 동기화',
+ 'full': '전체 동기화',
+ 'rebuild': '삭제 후 재구성'
+ }[syncType];
+
+ // 동기화 진행 상태 초기화
+ const progress: SyncProgress = {
+ tableName,
+ lastSyncDate: new Date().toISOString(),
+ currentPage: 1,
+ totalProcessed: 0,
+ status: 'running',
+ syncType,
+ startTime: Date.now()
+ };
+ syncProgress.set(tableName, progress);
+
+ try {
+ let page = 1;
+ let totalProcessed = 0;
+ let hasMore = true;
+ let maxTimestamp: string | null = null;
+ let lastSyncTimestamp: string | null = null;
+
+ // 차분 동기화 정보 조회
+ if (useDeltaSync) {
+ const deltaInfo = await getLastSyncInfo(tableName);
+ lastSyncTimestamp = deltaInfo.lastSyncTimestamp;
+ logger.info(`${syncTypeDescription} for ${tableName}, last timestamp: ${lastSyncTimestamp || 'none'}`);
+ } else {
+ logger.info(`${syncTypeDescription} for ${tableName}`);
+ }
+
+ while (hasMore) {
+ let result: { data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null };
+
+ if (useDeltaSync && timestampColumn) {
+ // 차분 동기화 사용
+ result = await fetchOracleDataDelta(
+ tableName,
+ lastSyncTimestamp,
+ timestampColumn,
+ page
+ );
+ } else {
+ // 전체 동기화 사용 (기존 방식)
+ const oracleData = await fetchOracleDataFull(tableName, page);
+ result = {
+ data: oracleData,
+ hasMore: oracleData.length === PAGE_SIZE,
+ maxTimestamp: null
+ };
+ }
+
+ if (result.data.length === 0) {
+ logger.info(`No new data found for ${tableName}, skipping...`);
+ progress.status = 'skipped';
+ break;
+ }
+
+ // PostgreSQL에 upsert
+ await upsertToPostgresOptimized(tableName, result.data, config.primaryKeys);
+
+ totalProcessed += result.data.length;
+ hasMore = result.hasMore;
+
+ if (result.maxTimestamp) {
+ maxTimestamp = result.maxTimestamp;
+ }
+
+ // 진행 상태 업데이트
+ progress.currentPage = page;
+ progress.totalProcessed = totalProcessed;
+
+ page++;
+
+ // 차분 동기화가 아닌 경우 페이지 크기보다 적으면 중단
+ if (!useDeltaSync && result.data.length < PAGE_SIZE) {
+ hasMore = false;
+ }
+ // 차분 동기화에서는 연속된 빈 페이지가 나오면 중단
+ else if (useDeltaSync && result.data.length < PAGE_SIZE) {
+ hasMore = false;
+ }
+ }
+
+ // 동기화 완료 처리
+ progress.status = progress.status === 'skipped' ? 'skipped' : 'completed';
+ progress.endTime = Date.now();
+
+ // 마지막 동기화 정보 업데이트
+ if (maxTimestamp && useDeltaSync) {
+ await updateLastSyncInfo(tableName, maxTimestamp, totalProcessed);
+ }
+
+ const duration = (progress.endTime - progress.startTime) / 1000;
+ logger.success(`Table ${tableName} sync completed. Processed: ${totalProcessed}, Duration: ${duration.toFixed(2)}s, Type: ${syncTypeDescription}`);
+
+ } catch (error) {
+ progress.status = 'error';
+ progress.lastError = error instanceof Error ? error.message : String(error);
+ progress.endTime = Date.now();
+
+ logger.error(`Table ${tableName} sync failed:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 멀티스레드 동기화 (테이블별 병렬 처리)
+ */
+async function syncTablesInParallel(tables: TableName[]): Promise<void> {
+ const chunks: TableName[][] = [];
+ const chunkSize = Math.ceil(tables.length / MAX_WORKERS);
+
+ for (let i = 0; i < tables.length; i += chunkSize) {
+ chunks.push(tables.slice(i, i + chunkSize));
+ }
+
+ logger.info(`Processing ${tables.length} tables in ${chunks.length} parallel groups`);
+
+ const promises = chunks.map(async (tableChunk, chunkIndex) => {
+ logger.info(`Starting worker ${chunkIndex + 1} with ${tableChunk.length} tables`);
+
+ for (let i = 0; i < tableChunk.length; i++) {
+ const tableName = tableChunk[i];
+ const globalIndex = chunks.slice(0, chunkIndex).reduce((sum, chunk) => sum + chunk.length, 0) + i + 1;
+
+ try {
+ await syncTableWithWorker(tableName, globalIndex, tables.length);
+ } catch (error) {
+ logger.error(`Worker ${chunkIndex + 1} failed on table ${tableName}:`, error);
+ // 개별 테이블 실패는 무시하고 계속 진행
+ }
+ }
+
+ logger.info(`Worker ${chunkIndex + 1} completed`);
+ });
+
+ await Promise.all(promises);
+}
+
+/**
+ * 모든 테이블 동기화 (개선된 버전)
+ */
+async function syncAllTablesEnhanced(): Promise<void> {
+ logger.header('Starting Enhanced NONSAP Data Synchronization');
+
+ await initializeSyncStatusTable();
+
+ const startTime = Date.now();
+ let successCount = 0;
+ let errorCount = 0;
+ let skippedCount = 0;
+ const totalTables = ALL_TABLE_NAMES.length;
+
+ logger.info(`Total tables to sync: ${totalTables}`);
+ logger.info(`Delta sync enabled: ${DELTA_SYNC_ENABLED}`);
+ logger.info(`Max workers: ${MAX_WORKERS}`);
+
+ try {
+ await syncTablesInParallel(ALL_TABLE_NAMES);
+
+ // 결과 집계
+ for (const progress of syncProgress.values()) {
+ switch (progress.status) {
+ case 'completed':
+ successCount++;
+ break;
+ case 'error':
+ errorCount++;
+ break;
+ case 'skipped':
+ skippedCount++;
+ break;
+ }
+ }
+
+ } catch (error) {
+ logger.error('Sync process failed:', error);
+ }
+
+ const duration = Date.now() - startTime;
+ logger.info(`Enhanced sync completed in ${(duration / 1000).toFixed(2)}s`);
+ logger.info(`Success: ${successCount}, Errors: ${errorCount}, Skipped: ${skippedCount}`);
+ logger.info(`Total tables processed: ${totalTables}`);
+
+ if (errorCount > 0) {
+ logger.warn(`${errorCount} tables had errors during sync`);
+ }
+
+ if (successCount + skippedCount === totalTables) {
+ logger.success('🚀 All tables synchronized successfully with enhanced performance!');
+ }
+}
+
+/**
+ * 동기화 상태 조회 API (개선된 버전)
+ */
+export async function getSyncProgressEnhanced(): Promise<SyncProgress[]> {
+ return Array.from(syncProgress.values());
+}
+
+/**
+ * 수동 동기화 트리거 (개선된 버전)
+ */
+export async function triggerEnhancedSync(): Promise<void> {
+ logger.info('Enhanced manual sync triggered');
+ await syncAllTablesEnhanced();
+}
+
+/**
+ * 특정 테이블 전체 재동기화
+ */
+export async function triggerFullResync(tableName: TableName): Promise<void> {
+ logger.info(`Full resync triggered for table: ${tableName}`);
+
+ try {
+ // 마지막 동기화 정보 삭제 (전체 동기화 강제)
+ await db.execute(sql`
+ DELETE FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
+ WHERE table_name = ${tableName}
+ `);
+
+ await syncTableWithWorker(tableName, 1, 1);
+ logger.success(`Full resync completed for ${tableName}`);
+ } catch (error) {
+ logger.error(`Full resync failed for ${tableName}:`, error);
+ throw error;
+ }
+}
+
+/**
+ * 개선된 동기화 스케줄러 시작
+ */
+export async function startEnhancedSyncScheduler(): Promise<void> {
+ logger.info('Initializing Enhanced NONSAP data synchronization scheduler...');
+
+ try {
+ // 매일 새벽 1시에 실행 (아래 타임존 설정이 있으므로 KST로 기입 가능)
+ cron.schedule('0 1 * * *', async () => {
+ try {
+ logger.info('Cron job triggered: Starting enhanced scheduled sync');
+
+ const isConnected = await testOracleConnection();
+ if (!isConnected) {
+ logger.warn('Oracle DB not available, skipping sync');
+ return;
+ }
+
+ await syncAllTablesEnhanced();
+ } catch (error) {
+ logger.error('Enhanced scheduled sync failed:', error);
+ }
+ }, {
+ timezone: 'Asia/Seoul'
+ });
+
+ logger.success(`Enhanced NONSAP sync scheduler registered (every 30 minutes) for ${ALL_TABLE_NAMES.length} tables`);
+
+ if (process.env.NODE_ENV === 'development') {
+ logger.info('Development mode: Enhanced cron registered but initial sync skipped');
+ return;
+ }
+
+ if (process.env.SYNC_ON_START === 'true') {
+ logger.info('Initial enhanced sync on startup enabled');
+ setTimeout(async () => {
+ try {
+ const isConnected = await testOracleConnection();
+ if (isConnected) {
+ await syncAllTablesEnhanced();
+ } else {
+ logger.warn('Initial enhanced sync skipped - Oracle DB not available');
+ }
+ } catch (error) {
+ logger.error('Initial enhanced sync failed:', error);
+ }
+ }, 10000);
+ }
+
+ } catch (error) {
+ logger.error('Failed to set up enhanced cron scheduler:', error);
+ logger.warn('Application will continue without enhanced sync scheduler');
+ }
+}
+
+/**
+ * Oracle DB 연결 테스트
+ */
+async function testOracleConnection(): Promise<boolean> {
+ try {
+ const result = await oracleKnex.raw('SELECT 1 FROM DUAL');
+ return !!result;
+ } catch (error) {
+ logger.error('Oracle DB connection test failed:', error);
+ return false;
+ }
+} \ No newline at end of file