diff options
| author | joonhoekim <26rote@gmail.com> | 2025-07-01 10:44:02 +0000 |
|---|---|---|
| committer | joonhoekim <26rote@gmail.com> | 2025-07-01 10:44:02 +0000 |
| commit | 6e25ab8da8a90a6d9bf40ccc83e36f119fb27568 (patch) | |
| tree | f608ec6315b845b5770c2a357c6540116145cb41 /lib/nonsap-sync | |
| parent | af52dbc2b96e619be18dea857ea67d99622092a7 (diff) | |
(김준회) 비활성화한 node-cron 진입점 (instrumentation.ts) 추가 및 NONSAP 동기화 개발건
Diffstat (limited to 'lib/nonsap-sync')
| -rw-r--r-- | lib/nonsap-sync/enhanced-sync-service.ts | 873 | ||||
| -rw-r--r-- | lib/nonsap-sync/sync-config.ts | 46 | ||||
| -rw-r--r-- | lib/nonsap-sync/sync-service.ts | 414 | ||||
| -rw-r--r-- | lib/nonsap-sync/table-config.ts | 352 |
4 files changed, 1685 insertions, 0 deletions
diff --git a/lib/nonsap-sync/enhanced-sync-service.ts b/lib/nonsap-sync/enhanced-sync-service.ts new file mode 100644 index 00000000..d5bd216e --- /dev/null +++ b/lib/nonsap-sync/enhanced-sync-service.ts @@ -0,0 +1,873 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +"use server"; + +import * as cron from 'node-cron'; +import { sql } from 'drizzle-orm'; +import { oracleKnex } from '@/lib/oracle-db/db'; +import db from '@/db/db'; +import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema'; +import * as nonsapSchema from '@/db/schema/NONSAP/nonsap'; +import { getTableSyncConfig, canUseDeltaSync, getTimestampColumn } from './table-config'; +import { PAGE_SIZE, BATCH_SIZE, MAX_WORKERS, DELTA_SYNC_ENABLED } from './sync-config'; + +interface SyncProgress { + tableName: string; + lastSyncDate: string; + currentPage: number; + totalProcessed: number; + status: 'running' | 'completed' | 'error' | 'skipped'; + lastError?: string; + syncType: 'full' | 'delta' | 'rebuild'; + startTime: number; + endTime?: number; + recordsSkipped?: number; +} + +interface DeltaSyncInfo { + tableName: string; + lastSyncTimestamp: string | null; + timestampColumn: string; +} + +// 동기화 진행 상태 저장 +const syncProgress = new Map<string, SyncProgress>(); + +// PostgreSQL에 마지막 동기화 정보 저장 테이블 +const SYNC_STATUS_TABLE = 'nonsap_sync_status'; + +// 진행률 바 유틸리티 (기존과 동일) +const createProgressBar = (current: number, total: number, width: number = 20): string => { + const filled = Math.floor((current / total) * width); + const empty = width - filled; + const percentage = Math.round((current / total) * 100); + + const bar = '█'.repeat(filled) + '░'.repeat(empty); + return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`; +}; + +const updateProgressBar = (message: string, current: number, total: number): void => { + const progressBar = createProgressBar(current, total); + const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`; + + process.stdout.write('\r' + ' '.repeat(100) + '\r'); + process.stdout.write(fullMessage); + + if (current >= total) { + process.stdout.write('\n'); + } +}; + +// 로거 (기존과 동일) +const logger = { + info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args), + error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args), + warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args), + success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args), + header: (message: string) => { + console.log('\n' + '='.repeat(80)); + console.log(`[NONSAP-SYNC] ${message}`); + console.log('='.repeat(80) + '\n'); + }, + progress: updateProgressBar +}; + +/** + * 동기화 상태 테이블 초기화 + */ +async function initializeSyncStatusTable(): Promise<void> { + try { + const tableExists = await db.execute(sql` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'nonsap' + AND table_name = ${SYNC_STATUS_TABLE} + ); + `); + + if (!tableExists.rows[0].exists) { + await db.execute(sql` + CREATE TABLE nonsap.${sql.identifier(SYNC_STATUS_TABLE)} ( + table_name VARCHAR(50) PRIMARY KEY, + last_sync_timestamp VARCHAR(14), + last_sync_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + sync_type VARCHAR(10) DEFAULT 'full', + total_records_processed INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + `); + logger.info('Sync status table created'); + } + } catch (error) { + logger.warn('Failed to initialize sync status table:', error); + } +} + +/** + * 마지막 동기화 정보 조회 + */ +async function getLastSyncInfo(tableName: string): Promise<DeltaSyncInfo> { + try { + const result = await db.execute(sql` + SELECT last_sync_timestamp + FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)} + WHERE table_name = ${tableName} + `); + + const timestampColumn = getTimestampColumn(tableName as TableName); + + return { + tableName, + lastSyncTimestamp: (result.rows[0] as any)?.last_sync_timestamp || null, + timestampColumn: timestampColumn || 'CHG_DT' + }; + } catch (error) { + logger.warn(`Failed to get last sync info for ${tableName}:`, error); + return { + tableName, + lastSyncTimestamp: null, + timestampColumn: getTimestampColumn(tableName as TableName) || 'CHG_DT' + }; + } +} + +/** + * 마지막 동기화 정보 업데이트 + */ +async function updateLastSyncInfo(tableName: string, timestamp: string, recordsProcessed: number): Promise<void> { + try { + await db.execute(sql` + INSERT INTO nonsap.${sql.identifier(SYNC_STATUS_TABLE)} + (table_name, last_sync_timestamp, total_records_processed, updated_at) + VALUES (${tableName}, ${timestamp}, ${recordsProcessed}, CURRENT_TIMESTAMP) + ON CONFLICT (table_name) + DO UPDATE SET + last_sync_timestamp = ${timestamp}, + total_records_processed = ${recordsProcessed}, + updated_at = CURRENT_TIMESTAMP + `); + } catch (error) { + logger.error(`Failed to update sync info for ${tableName}:`, error); + } +} + +/** + * 전체 동기화를 위한 Oracle 데이터 조회 (ROWNUM 대신 ROW_NUMBER 사용) + */ +async function fetchOracleDataFull<T extends TableName>( + tableName: T, + page: number, + pageSize: number = PAGE_SIZE +): Promise<DatabaseSchema[T][]> { + const offset = (page - 1) * pageSize; + + try { + // ROW_NUMBER() OVER() 구문을 사용한 안전한 페이징 + const query = ` + SELECT * FROM ( + SELECT t.*, ROW_NUMBER() OVER (ORDER BY ROWID) as rn + FROM ${tableName} t + ) + WHERE rn > ${offset} AND rn <= ${offset + pageSize} + `; + + const result = await oracleKnex.raw(query); + + // Oracle knex raw 결과는 result[0]에 실제 rows가 있음 + const rows = Array.isArray(result) ? result : result.rows || []; + + // Oracle raw 쿼리 결과에서 실제 데이터 추출 (rn 컬럼 제거) + const cleanResults = rows.map((row: any) => { + if (row && typeof row === 'object') { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { rn: _rn, RN: _RN, ...cleanRow } = row; // 대소문자 모두 제거 + return cleanRow; + } + return row; + }); + + logger.info(`Fetched ${cleanResults.length} records from ${tableName} (page ${page}, full sync)`); + + return cleanResults as DatabaseSchema[T][]; + } catch (error) { + logger.error(`Error fetching full data from ${tableName}:`, error); + throw error; + } +} + +/** + * 차분 동기화를 위한 Oracle 데이터 조회 (ROW_NUMBER 사용) + */ +async function fetchOracleDataDelta<T extends TableName>( + tableName: T, + lastSyncTimestamp: string | null, + timestampColumn: string, + page: number, + pageSize: number = PAGE_SIZE +): Promise<{ data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null }> { + const offset = (page - 1) * pageSize; + + try { + let whereClause = ''; + const params: any[] = []; + + // 차분 동기화 조건 추가 + if (lastSyncTimestamp && timestampColumn) { + if (timestampColumn.includes('DTM')) { + // 14자리 YYYYMMDDHHMISS 형태 + whereClause = `WHERE ${timestampColumn} > ?`; + params.push(lastSyncTimestamp); + } else { + // 8자리 YYYYMMDD 형태만 사용 (CHG_TM 컬럼이 없는 테이블들) + const lastDate = lastSyncTimestamp.substring(0, 8); + + // 테이블에 따라 조건 분기 + const timeColumn = timestampColumn.replace('_DT', '_TM'); + + // 실제로 CHG_TM 컬럼이 있는지 확인 (간접적으로 테이블명으로 판단) + const hasTimeColumn = [ + 'CMCTB_VENDOR_GENERAL', 'CMCTB_VENDOR_GRP', 'CMCTB_VENDOR_INCO', + 'CMCTB_CDNM', 'CMCTB_CD_CLF_NM' + ].includes(tableName); + + if (hasTimeColumn) { + const lastTime = lastSyncTimestamp.substring(8, 14); + whereClause = `WHERE (${timestampColumn} > ? OR (${timestampColumn} = ? AND ${timeColumn} > ?))`; + params.push(lastDate, lastDate, lastTime); + } else { + // CHG_TM 컬럼이 없는 테이블은 날짜만으로 비교 + whereClause = `WHERE ${timestampColumn} > ?`; + params.push(lastDate); + } + } + } + + // ROW_NUMBER() OVER() 구문을 사용한 안전한 페이징 + const orderColumn = timestampColumn || 'ROWID'; + const query = ` + SELECT * FROM ( + SELECT t.*, ROW_NUMBER() OVER (ORDER BY ${orderColumn}) as rn + FROM ${tableName} t + ${whereClause} + ) + WHERE rn > ${offset} AND rn <= ${offset + pageSize} + `; + + const result = await oracleKnex.raw(query, params); + + // Oracle knex raw 결과는 result[0]에 실제 rows가 있음 + const rows = Array.isArray(result) ? result : result.rows || []; + + // Oracle raw 쿼리 결과에서 실제 데이터 추출 (rn 컬럼 제거) + const cleanResults = rows.map((row: any) => { + if (row && typeof row === 'object') { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { rn: _rn, RN: _RN, ...cleanRow } = row; // 대소문자 모두 제거 + return cleanRow; + } + return row; + }); + + // 최대 타임스탬프 계산 + let maxTimestamp: string | null = null; + if (cleanResults.length > 0 && timestampColumn) { + const lastRecord = cleanResults[cleanResults.length - 1] as any; + if (timestampColumn.includes('DTM')) { + maxTimestamp = lastRecord[timestampColumn]; + } else { + const timeColumn = timestampColumn.replace('_DT', '_TM'); + const date = lastRecord[timestampColumn] || ''; + + // CHG_TM 컬럼이 있는 테이블만 시간 조합 + const hasTimeColumn = [ + 'CMCTB_VENDOR_GENERAL', 'CMCTB_VENDOR_GRP', 'CMCTB_VENDOR_INCO', + 'CMCTB_CDNM', 'CMCTB_CD_CLF_NM' + ].includes(tableName); + + if (hasTimeColumn && lastRecord[timeColumn]) { + const time = lastRecord[timeColumn] || '000000'; + maxTimestamp = date + time.padEnd(6, '0'); + } else { + // 시간 정보가 없는 경우 날짜만 + 시간은 끝시간으로 + maxTimestamp = date + '235959'; + } + } + } + + const hasMore = cleanResults.length === pageSize; + + logger.info(`Fetched ${cleanResults.length} records from ${tableName} (page ${page}, delta: ${!!lastSyncTimestamp})`); + + return { + data: cleanResults as DatabaseSchema[T][], + hasMore, + maxTimestamp + }; + } catch (error) { + logger.error(`Error fetching delta data from ${tableName}:`, error); + throw error; + } +} + +/** + * 컬럼명 정규화 (Oracle -> PostgreSQL) + */ +function normalizeColumnNames(record: any, tableSchema: any): any { + const normalizedRecord: any = {}; + const schemaColumns = Object.keys(tableSchema); + + for (const [key, value] of Object.entries(record)) { + if (!key || typeof key !== 'string' || key === 'undefined' || key.trim() === '') { + continue; + } + + // RN 컬럼 제거 (ROW_NUMBER 결과) + if (key.toUpperCase() === 'RN') { + continue; + } + + // 대문자로 변환하여 스키마와 매칭 + const upperKey = key.toUpperCase(); + + // 스키마에 해당 컬럼이 있는지 확인 + if (schemaColumns.includes(upperKey)) { + normalizedRecord[upperKey] = value; + } else { + // 스키마에 없는 컬럼은 경고 출력하지 않고 조용히 스킵 (너무 많은 로그 방지) + // logger.warn(`Column ${key} (${upperKey}) not found in schema, skipping`); + } + } + + return normalizedRecord; +} + +/** + * PostgreSQL에서 실제 constraint 존재 여부 확인 + */ +async function hasValidConstraint(tableName: string, columnNames: string[]): Promise<boolean> { + try { + if (columnNames.length === 0) return false; + + const tableNameLower = tableName.toLowerCase(); + + // Primary key나 unique constraint 확인 + const constraintQuery = sql` + SELECT tc.constraint_name, tc.constraint_type, kcu.column_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name + WHERE tc.table_schema = 'nonsap' + AND tc.table_name = ${tableNameLower} + AND tc.constraint_type IN ('PRIMARY KEY', 'UNIQUE') + `; + + const result = await db.execute(constraintQuery); + const constraints = result.rows; + + // 해당 컬럼들이 실제 constraint에 포함되는지 확인 + for (const constraint of constraints) { + const constraintColumns = constraints + .filter((c: any) => c.constraint_name === constraint.constraint_name) + .map((c: any) => c.column_name.toUpperCase()); + + // 모든 컬럼이 constraint에 포함되는지 확인 + const allColumnsInConstraint = columnNames.every(col => + constraintColumns.includes(col.toUpperCase()) + ); + + if (allColumnsInConstraint) { + return true; + } + } + + return false; + } catch (error) { + logger.warn(`Failed to check constraints for ${tableName}:`, error); + return false; + } +} + +/** + * PostgreSQL에 데이터 upsert (최적화된 버전) + */ +async function upsertToPostgresOptimized<T extends TableName>( + tableName: T, + data: DatabaseSchema[T][], + primaryKeys: string[] +): Promise<void> { + if (data.length === 0) return; + + try { + const tableCamelCase = tableName.toLowerCase() + .split('_') + .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1)) + .join(''); + + const tableSchema = (nonsapSchema as any)[tableCamelCase]; + + if (!tableSchema) { + throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`); + } + + for (let i = 0; i < data.length; i += BATCH_SIZE) { + const batch = data.slice(i, i + BATCH_SIZE); + const currentBatch = Math.floor(i / BATCH_SIZE) + 1; + + try { + // primaryKeys에서 undefined 값 필터링 및 검증 + const validPrimaryKeys = primaryKeys.filter(key => + key && + typeof key === 'string' && + key !== 'ROWID' && + key !== 'undefined' && + key.trim() !== '' + ); + + if (currentBatch === 1) { + logger.info(`${tableName} - Valid primaryKeys: [${validPrimaryKeys.join(', ')}]`); + } + + // 데이터 정규화 (컬럼명 매핑) + const cleanBatch = batch + .map(record => normalizeColumnNames(record, tableSchema)) + .filter(record => Object.keys(record).length > 0); + + if (cleanBatch.length === 0) { + logger.warn(`${tableName} - No valid records after cleaning batch ${currentBatch}`); + continue; + } + + if (validPrimaryKeys.length > 0) { + // 실제 constraint 존재 여부 확인 + const hasConstraint = await hasValidConstraint(tableName, validPrimaryKeys); + + if (hasConstraint) { + // 스키마에서 실제 컬럼 객체들 가져오기 + const conflictColumns = validPrimaryKeys + .filter(key => key in tableSchema) + .map(key => tableSchema[key]); + + if (conflictColumns.length > 0) { + // 업데이트할 필드들만 선별 (기본키 제외) + const updateFields = Object.keys(cleanBatch[0]) + .filter(key => !validPrimaryKeys.includes(key) && key in tableSchema) + .reduce((acc, key) => { + acc[key] = sql.raw(`excluded."${key}"`); + return acc; + }, {} as Record<string, any>); + + await db.insert(tableSchema as any) + .values(cleanBatch) + .onConflictDoUpdate({ + target: conflictColumns, + set: updateFields + }); + } else { + // 스키마에서 컬럼을 찾을 수 없는 경우 + logger.warn(`${tableName} - Primary key columns not found in schema, using simple insert`); + await db.insert(tableSchema as any).values(cleanBatch); + } + } else { + // constraint가 없는 경우: 개별 처리로 중복 방지 + logger.info(`${tableName} - No valid constraints found, using individual upsert`); + + for (const record of cleanBatch) { + try { + await db.insert(tableSchema as any) + .values(record) + .onConflictDoNothing(); // 에러 무시하고 계속 + } catch { + // 개별 레코드 실패는 조용히 무시 + } + } + } + } else { + // 기본키가 없는 테이블: 삭제 후 재삽입 방식 + if (currentBatch === 1) { + // 첫 번째 배치에서만 기존 데이터 확인 및 삭제 + const existingDataCheck = await db.execute(sql` + SELECT COUNT(*) as count FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())} + `); + + const existingCount = Number(existingDataCheck.rows[0].count); + if (existingCount > 0) { + logger.info(`${tableName} - Existing data found (${existingCount} records), deleting before insert`); + await db.execute(sql` + DELETE FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())} + `); + } else { + logger.info(`${tableName} - No existing data, proceeding with fresh insert`); + } + } + + // 데이터 정규화 후 삽입 + if (cleanBatch.length > 0) { + await db.insert(tableSchema as any).values(cleanBatch); + } + } + + } catch (insertError: unknown) { + const isDuplicateKeyError = insertError && + typeof insertError === 'object' && + 'code' in insertError && + insertError.code === '23505'; + + if (!isDuplicateKeyError) { + logger.warn(`Batch upsert failed for ${tableName}, trying individual operations`, insertError); + + // 개별 처리 + for (const record of batch) { + try { + // 레코드 정규화 + const cleanRecord = normalizeColumnNames(record, tableSchema); + + if (Object.keys(cleanRecord).length > 0) { + await db.insert(tableSchema as any) + .values(cleanRecord) + .onConflictDoNothing(); + } + } catch { + // 개별 레코드 실패는 무시 + } + } + } + } + + if (currentBatch % 10 === 0) { + await new Promise(resolve => setTimeout(resolve, 50)); + } + } + + logger.success(`Successfully processed ${data.length} records for ${tableName}`); + } catch (error) { + logger.error(`Error upserting data to ${tableName}:`, error); + throw error; + } +} + +/** + * 워커를 사용한 테이블 동기화 + */ +async function syncTableWithWorker<T extends TableName>( + tableName: T, + tableIndex: number, + totalTables: number +): Promise<void> { + logger.info(`Starting enhanced sync for table: ${tableName} (${tableIndex}/${totalTables})`); + + const config = getTableSyncConfig(tableName); + const useDeltaSync = DELTA_SYNC_ENABLED && canUseDeltaSync(tableName); + const timestampColumn = getTimestampColumn(tableName); + + // 동기화 타입 결정 + let syncType: 'full' | 'delta' | 'rebuild'; + if (useDeltaSync) { + syncType = 'delta'; + } else if (config.primaryKeys.length === 0) { + // 기본키가 없는 테이블은 삭제 후 재구성 + syncType = 'rebuild'; + } else { + // 기본키가 있지만 차분 동기화가 불가능한 경우 전체 동기화 + syncType = 'full'; + } + + const syncTypeDescription = { + 'delta': '차분 동기화', + 'full': '전체 동기화', + 'rebuild': '삭제 후 재구성' + }[syncType]; + + // 동기화 진행 상태 초기화 + const progress: SyncProgress = { + tableName, + lastSyncDate: new Date().toISOString(), + currentPage: 1, + totalProcessed: 0, + status: 'running', + syncType, + startTime: Date.now() + }; + syncProgress.set(tableName, progress); + + try { + let page = 1; + let totalProcessed = 0; + let hasMore = true; + let maxTimestamp: string | null = null; + let lastSyncTimestamp: string | null = null; + + // 차분 동기화 정보 조회 + if (useDeltaSync) { + const deltaInfo = await getLastSyncInfo(tableName); + lastSyncTimestamp = deltaInfo.lastSyncTimestamp; + logger.info(`${syncTypeDescription} for ${tableName}, last timestamp: ${lastSyncTimestamp || 'none'}`); + } else { + logger.info(`${syncTypeDescription} for ${tableName}`); + } + + while (hasMore) { + let result: { data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null }; + + if (useDeltaSync && timestampColumn) { + // 차분 동기화 사용 + result = await fetchOracleDataDelta( + tableName, + lastSyncTimestamp, + timestampColumn, + page + ); + } else { + // 전체 동기화 사용 (기존 방식) + const oracleData = await fetchOracleDataFull(tableName, page); + result = { + data: oracleData, + hasMore: oracleData.length === PAGE_SIZE, + maxTimestamp: null + }; + } + + if (result.data.length === 0) { + logger.info(`No new data found for ${tableName}, skipping...`); + progress.status = 'skipped'; + break; + } + + // PostgreSQL에 upsert + await upsertToPostgresOptimized(tableName, result.data, config.primaryKeys); + + totalProcessed += result.data.length; + hasMore = result.hasMore; + + if (result.maxTimestamp) { + maxTimestamp = result.maxTimestamp; + } + + // 진행 상태 업데이트 + progress.currentPage = page; + progress.totalProcessed = totalProcessed; + + page++; + + // 차분 동기화가 아닌 경우 페이지 크기보다 적으면 중단 + if (!useDeltaSync && result.data.length < PAGE_SIZE) { + hasMore = false; + } + // 차분 동기화에서는 연속된 빈 페이지가 나오면 중단 + else if (useDeltaSync && result.data.length < PAGE_SIZE) { + hasMore = false; + } + } + + // 동기화 완료 처리 + progress.status = progress.status === 'skipped' ? 'skipped' : 'completed'; + progress.endTime = Date.now(); + + // 마지막 동기화 정보 업데이트 + if (maxTimestamp && useDeltaSync) { + await updateLastSyncInfo(tableName, maxTimestamp, totalProcessed); + } + + const duration = (progress.endTime - progress.startTime) / 1000; + logger.success(`Table ${tableName} sync completed. Processed: ${totalProcessed}, Duration: ${duration.toFixed(2)}s, Type: ${syncTypeDescription}`); + + } catch (error) { + progress.status = 'error'; + progress.lastError = error instanceof Error ? error.message : String(error); + progress.endTime = Date.now(); + + logger.error(`Table ${tableName} sync failed:`, error); + throw error; + } +} + +/** + * 멀티스레드 동기화 (테이블별 병렬 처리) + */ +async function syncTablesInParallel(tables: TableName[]): Promise<void> { + const chunks: TableName[][] = []; + const chunkSize = Math.ceil(tables.length / MAX_WORKERS); + + for (let i = 0; i < tables.length; i += chunkSize) { + chunks.push(tables.slice(i, i + chunkSize)); + } + + logger.info(`Processing ${tables.length} tables in ${chunks.length} parallel groups`); + + const promises = chunks.map(async (tableChunk, chunkIndex) => { + logger.info(`Starting worker ${chunkIndex + 1} with ${tableChunk.length} tables`); + + for (let i = 0; i < tableChunk.length; i++) { + const tableName = tableChunk[i]; + const globalIndex = chunks.slice(0, chunkIndex).reduce((sum, chunk) => sum + chunk.length, 0) + i + 1; + + try { + await syncTableWithWorker(tableName, globalIndex, tables.length); + } catch (error) { + logger.error(`Worker ${chunkIndex + 1} failed on table ${tableName}:`, error); + // 개별 테이블 실패는 무시하고 계속 진행 + } + } + + logger.info(`Worker ${chunkIndex + 1} completed`); + }); + + await Promise.all(promises); +} + +/** + * 모든 테이블 동기화 (개선된 버전) + */ +async function syncAllTablesEnhanced(): Promise<void> { + logger.header('Starting Enhanced NONSAP Data Synchronization'); + + await initializeSyncStatusTable(); + + const startTime = Date.now(); + let successCount = 0; + let errorCount = 0; + let skippedCount = 0; + const totalTables = ALL_TABLE_NAMES.length; + + logger.info(`Total tables to sync: ${totalTables}`); + logger.info(`Delta sync enabled: ${DELTA_SYNC_ENABLED}`); + logger.info(`Max workers: ${MAX_WORKERS}`); + + try { + await syncTablesInParallel(ALL_TABLE_NAMES); + + // 결과 집계 + for (const progress of syncProgress.values()) { + switch (progress.status) { + case 'completed': + successCount++; + break; + case 'error': + errorCount++; + break; + case 'skipped': + skippedCount++; + break; + } + } + + } catch (error) { + logger.error('Sync process failed:', error); + } + + const duration = Date.now() - startTime; + logger.info(`Enhanced sync completed in ${(duration / 1000).toFixed(2)}s`); + logger.info(`Success: ${successCount}, Errors: ${errorCount}, Skipped: ${skippedCount}`); + logger.info(`Total tables processed: ${totalTables}`); + + if (errorCount > 0) { + logger.warn(`${errorCount} tables had errors during sync`); + } + + if (successCount + skippedCount === totalTables) { + logger.success('🚀 All tables synchronized successfully with enhanced performance!'); + } +} + +/** + * 동기화 상태 조회 API (개선된 버전) + */ +export async function getSyncProgressEnhanced(): Promise<SyncProgress[]> { + return Array.from(syncProgress.values()); +} + +/** + * 수동 동기화 트리거 (개선된 버전) + */ +export async function triggerEnhancedSync(): Promise<void> { + logger.info('Enhanced manual sync triggered'); + await syncAllTablesEnhanced(); +} + +/** + * 특정 테이블 전체 재동기화 + */ +export async function triggerFullResync(tableName: TableName): Promise<void> { + logger.info(`Full resync triggered for table: ${tableName}`); + + try { + // 마지막 동기화 정보 삭제 (전체 동기화 강제) + await db.execute(sql` + DELETE FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)} + WHERE table_name = ${tableName} + `); + + await syncTableWithWorker(tableName, 1, 1); + logger.success(`Full resync completed for ${tableName}`); + } catch (error) { + logger.error(`Full resync failed for ${tableName}:`, error); + throw error; + } +} + +/** + * 개선된 동기화 스케줄러 시작 + */ +export async function startEnhancedSyncScheduler(): Promise<void> { + logger.info('Initializing Enhanced NONSAP data synchronization scheduler...'); + + try { + // 매일 새벽 1시에 실행 (아래 타임존 설정이 있으므로 KST로 기입 가능) + cron.schedule('0 1 * * *', async () => { + try { + logger.info('Cron job triggered: Starting enhanced scheduled sync'); + + const isConnected = await testOracleConnection(); + if (!isConnected) { + logger.warn('Oracle DB not available, skipping sync'); + return; + } + + await syncAllTablesEnhanced(); + } catch (error) { + logger.error('Enhanced scheduled sync failed:', error); + } + }, { + timezone: 'Asia/Seoul' + }); + + logger.success(`Enhanced NONSAP sync scheduler registered (every 30 minutes) for ${ALL_TABLE_NAMES.length} tables`); + + if (process.env.NODE_ENV === 'development') { + logger.info('Development mode: Enhanced cron registered but initial sync skipped'); + return; + } + + if (process.env.SYNC_ON_START === 'true') { + logger.info('Initial enhanced sync on startup enabled'); + setTimeout(async () => { + try { + const isConnected = await testOracleConnection(); + if (isConnected) { + await syncAllTablesEnhanced(); + } else { + logger.warn('Initial enhanced sync skipped - Oracle DB not available'); + } + } catch (error) { + logger.error('Initial enhanced sync failed:', error); + } + }, 10000); + } + + } catch (error) { + logger.error('Failed to set up enhanced cron scheduler:', error); + logger.warn('Application will continue without enhanced sync scheduler'); + } +} + +/** + * Oracle DB 연결 테스트 + */ +async function testOracleConnection(): Promise<boolean> { + try { + const result = await oracleKnex.raw('SELECT 1 FROM DUAL'); + return !!result; + } catch (error) { + logger.error('Oracle DB connection test failed:', error); + return false; + } +}
\ No newline at end of file diff --git a/lib/nonsap-sync/sync-config.ts b/lib/nonsap-sync/sync-config.ts new file mode 100644 index 00000000..a18dd208 --- /dev/null +++ b/lib/nonsap-sync/sync-config.ts @@ -0,0 +1,46 @@ +// NONSAP 동기화 설정 +export interface SyncConfig { + pageSize: number; + batchSize: number; + maxWorkers: number; + deltaSyncEnabled: boolean; + cronSchedule: string; + autoRefreshInterval: number; // UI 자동 새로고침 간격 (ms) +} + +// 환경별 설정 +const configs: Record<string, SyncConfig> = { + development: { + pageSize: 1000, + batchSize: 50, + maxWorkers: 4, + deltaSyncEnabled: true, + cronSchedule: '0 0 1 * * *', // 매일 새벽 1시 + autoRefreshInterval: 1000, // 1초 + }, + production: { + pageSize: 5000, + batchSize: 200, + maxWorkers: 4, + deltaSyncEnabled: true, + cronSchedule: '0 0 1 * * *', // 매일 새벽 1시 + autoRefreshInterval: 30000, // 30초 + } +}; + +// 현재 환경의 설정 가져오기 +export const SYNC_CONFIG: SyncConfig = configs[process.env.NODE_ENV || 'development']; + +// 개별 설정값들 (기존 코드와의 호환성을 위해) +export const PAGE_SIZE = SYNC_CONFIG.pageSize; +export const BATCH_SIZE = SYNC_CONFIG.batchSize; +export const MAX_WORKERS = SYNC_CONFIG.maxWorkers; +export const DELTA_SYNC_ENABLED = SYNC_CONFIG.deltaSyncEnabled; + +// 설정 정보를 반환하는 함수 (API에서 사용) +export function getSyncConfigInfo() { + return { + ...SYNC_CONFIG, + environment: process.env.NODE_ENV || 'development' + }; +}
\ No newline at end of file diff --git a/lib/nonsap-sync/sync-service.ts b/lib/nonsap-sync/sync-service.ts new file mode 100644 index 00000000..286952eb --- /dev/null +++ b/lib/nonsap-sync/sync-service.ts @@ -0,0 +1,414 @@ +"use server"; + +import * as cron from 'node-cron'; +import { oracleKnex } from '@/lib/oracle-db/db'; +import db from '@/db/db'; +import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema'; +import * as nonsapSchema from '@/db/schema/NONSAP/nonsap'; + +// oracle-schema.ts에서 테이블 목록 자동 추출 +const SYNC_TABLES: TableName[] = ALL_TABLE_NAMES; + +// 페이지 단위 +const PAGE_SIZE = 2000; +const BATCH_SIZE = 100; + +interface SyncProgress { + tableName: string; + lastSyncDate: string; + currentPage: number; + totalProcessed: number; + status: 'running' | 'completed' | 'error'; + lastError?: string; +} + +// 동기화 진행 상태 저장 +const syncProgress = new Map<string, SyncProgress>(); + +// 진행률 바 유틸리티 +const createProgressBar = (current: number, total: number, width: number = 20): string => { + const filled = Math.floor((current / total) * width); + const empty = width - filled; + const percentage = Math.round((current / total) * 100); + + const bar = '█'.repeat(filled) + '░'.repeat(empty); + return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`; +}; + +// 같은 줄에서 업데이트되는 진행률 바 출력 +const updateProgressBar = (message: string, current: number, total: number): void => { + const progressBar = createProgressBar(current, total); + const fullMessage = `[NONSAP-SYNC] ${message} ${progressBar}`; + + // 이전 줄을 지우고 새로운 진행률 출력 + process.stdout.write('\r' + ' '.repeat(100) + '\r'); // 줄 지우기 + process.stdout.write(fullMessage); + + // 완료되면 줄바꿈 + if (current >= total) { + process.stdout.write('\n'); + } +}; + +// 간단한 로거 +const logger = { + info: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC INFO] ${message}`, ...args), + error: (message: string, ...args: unknown[]) => console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args), + warn: (message: string, ...args: unknown[]) => console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args), + success: (message: string, ...args: unknown[]) => console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args), + header: (message: string) => { + console.log('\n' + '='.repeat(80)); + console.log(`[NONSAP-SYNC] ${message}`); + console.log('='.repeat(80) + '\n'); + }, + progress: updateProgressBar +}; + +/** + * Oracle DB에서 특정 테이블의 데이터를 페이지 단위로 조회 + */ +async function fetchOracleData<T extends TableName>( + tableName: T, + page: number, + pageSize: number = PAGE_SIZE +): Promise<DatabaseSchema[T][]> { + const offset = (page - 1) * pageSize; + + try { + // Oracle에서는 ROWID나 다른 고유 컬럼으로 정렬해야 함 + const query = oracleKnex(tableName) + .select('*') + .orderBy('ROWID') // Oracle ROWID로 정렬 (또는 적절한 기본키) + .offset(offset) + .limit(pageSize); + + const results = await query; + logger.info(`Fetched ${results.length} records from ${tableName} (page ${page})`); + + return results as DatabaseSchema[T][]; + } catch (error) { + logger.error(`Error fetching data from ${tableName}:`, error); + throw error; + } +} + +/** + * PostgreSQL에 데이터 upsert + */ +async function upsertToPostgres<T extends TableName>( + tableName: T, + data: DatabaseSchema[T][], + pageInfo?: { current: number; total: number } +): Promise<void> { + if (data.length === 0) return; + + try { + // 테이블명을 camelCase로 변환하여 스키마에서 찾기 + const tableCamelCase = tableName.toLowerCase() + .split('_') + .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1)) + .join(''); + + const tableSchema = (nonsapSchema as any)[tableCamelCase]; + + if (!tableSchema) { + throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`); + } + + const totalBatches = Math.ceil(data.length / BATCH_SIZE); + + // 배치 단위로 upsert 처리 + for (let i = 0; i < data.length; i += BATCH_SIZE) { + const batch = data.slice(i, i + BATCH_SIZE); + const currentBatch = Math.floor(i / BATCH_SIZE) + 1; + + // 배치 진행률 표시 + const batchMessage = `${tableName} - Batch Processing`; + logger.progress(batchMessage, currentBatch, totalBatches); + + try { + // PostgreSQL에 삽입 시도 + await db.insert(tableSchema as any).values(batch); + } catch (insertError: unknown) { + // 중복키 에러인지 확인 + const isDuplicateKeyError = insertError && + typeof insertError === 'object' && + 'code' in insertError && + insertError.code === '23505'; + + // Production 환경에서는 중복키 에러 로그 생략 + if (!isDuplicateKeyError) { + logger.warn(`Batch insert failed for ${tableName}, trying individual upserts`, insertError); + } + + // 삽입 실패 시 개별 레코드 upsert 시도 + for (const record of batch) { + try { + await db.insert(tableSchema as any) + .values(record) + .onConflictDoNothing(); // 중복 시 무시하거나 업데이트 + } catch (upsertError) { + // 중복키 에러인지 확인 + const isDuplicateKeyError = upsertError && + typeof upsertError === 'object' && + 'code' in upsertError && + upsertError.code === '23505'; + + // Production 환경에서는 중복키 에러 로그 생략 + if (!isDuplicateKeyError) { + logger.error(`Failed to upsert record in ${tableName}:`, upsertError); + } + // 개별 레코드 실패는 로그만 남기고 계속 진행 + } + } + } + + // 잠시 대기 (시스템 부하 방지) + if (currentBatch % 10 === 0) { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + + logger.success(`Successfully processed ${data.length} records for ${tableName}`); + } catch (error) { + logger.error(`Error upserting data to ${tableName}:`, error); + throw error; + } +} + +/** + * 특정 테이블 동기화 + */ +async function syncTable<T extends TableName>( + tableName: T, + tableIndex: number, + totalTables: number +): Promise<void> { + logger.info(`Starting sync for table: ${tableName} (${tableIndex}/${totalTables})`); + + // 동기화 진행 상태 초기화 + syncProgress.set(tableName, { + tableName, + lastSyncDate: new Date().toISOString(), + currentPage: 1, + totalProcessed: 0, + status: 'running' + }); + + try { + let page = 1; + let totalProcessed = 0; + let hasMore = true; + let estimatedTotalPages = 1; // 추정 총 페이지 수 + + while (hasMore) { + // Oracle에서 데이터 조회 + const oracleData = await fetchOracleData(tableName, page); + + if (oracleData.length === 0) { + hasMore = false; + break; + } + + // 첫 페이지에서 대략적인 총 페이지 수 추정 + if (page === 1 && oracleData.length === PAGE_SIZE) { + // 첫 페이지가 가득 찼다면 더 많은 페이지가 있을 것으로 추정 + estimatedTotalPages = Math.max(10, page * 2); // 최소 10페이지로 추정 + } + + // 페이지 진행률 표시 + const pageMessage = `${tableName} - Page Processing`; + const displayTotalPages = hasMore ? Math.max(estimatedTotalPages, page + 1) : page; + logger.progress(pageMessage, page, displayTotalPages); + + // PostgreSQL에 upsert (페이지 정보 전달) + await upsertToPostgres(tableName, oracleData, { current: page, total: displayTotalPages }); + + totalProcessed += oracleData.length; + + // 진행 상태 업데이트 + const progress = syncProgress.get(tableName)!; + progress.currentPage = page; + progress.totalProcessed = totalProcessed; + + // 다음 페이지로 + page++; + + // 페이지 크기보다 적으면 마지막 페이지 + if (oracleData.length < PAGE_SIZE) { + hasMore = false; + // 마지막 페이지 진행률 업데이트 + logger.progress(pageMessage, page - 1, page - 1); + } else { + // 추정 페이지 수 업데이트 + estimatedTotalPages = Math.max(estimatedTotalPages, page + 5); + } + } + + // 동기화 완료 + const progress = syncProgress.get(tableName)!; + progress.status = 'completed'; + + logger.success(`Table ${tableName} sync completed. Total processed: ${totalProcessed}`); + + } catch (error) { + const progress = syncProgress.get(tableName)!; + progress.status = 'error'; + progress.lastError = error instanceof Error ? error.message : String(error); + + logger.error(`Table ${tableName} sync failed:`, error); + throw error; + } +} + +/** + * 모든 테이블 동기화 + */ +async function syncAllTables(): Promise<void> { + logger.header('Starting NONSAP Data Synchronization'); + + const startTime = Date.now(); + let successCount = 0; + let errorCount = 0; + const totalTables = SYNC_TABLES.length; + + // 전체 테이블 진행률 표시 + logger.info(`Total tables to sync: ${totalTables}`); + + // oracle-schema.ts에서 테이블 목록 가져오기 + for (let i = 0; i < SYNC_TABLES.length; i++) { + const tableName = SYNC_TABLES[i]; + const tableIndex = i + 1; + + // 전체 테이블 진행률 표시 + const overallMessage = `Overall Progress`; + logger.progress(overallMessage, tableIndex - 1, totalTables); + + try { + await syncTable(tableName, tableIndex, totalTables); + successCount++; + } catch (error) { + errorCount++; + logger.error(`Failed to sync table ${tableName}:`, error); + // 에러가 발생해도 다른 테이블은 계속 동기화 + continue; + } + } + + // 최종 진행률 표시 + const overallMessage = `Overall Progress`; + logger.progress(overallMessage, totalTables, totalTables); + + const duration = Date.now() - startTime; + logger.info(`Sync completed in ${(duration / 1000).toFixed(2)}s`); + logger.info(`Success: ${successCount}, Errors: ${errorCount}`); + logger.info(`Total tables processed: ${totalTables}`); + + if (errorCount > 0) { + logger.warn(`${errorCount} tables had errors during sync`); + } + + if (successCount === totalTables) { + logger.success('🎉 All tables synchronized successfully!'); + } +} + +/** + * 동기화 상태 조회 API + */ +export async function getSyncProgress(): Promise<SyncProgress[]> { + return Array.from(syncProgress.values()); +} + +/** + * 수동 동기화 트리거 + */ +export async function triggerManualSync(): Promise<void> { + logger.info('Manual sync triggered'); + await syncAllTables(); +} + +/** + * Oracle DB 연결 테스트 + */ +async function testOracleConnection(): Promise<boolean> { + try { + const result = await oracleKnex.raw('SELECT 1 FROM DUAL'); + return !!result; + } catch (error) { + logger.error('Oracle DB connection test failed:', error); + return false; + } +} + +/** + * 동기화 스케줄러 시작 + */ +export async function startNonsapSyncScheduler(): Promise<void> { + logger.info('Initializing NONSAP data synchronization scheduler...'); + + // Oracle DB 연결 테스트 (비동기) + testOracleConnection().then(isConnected => { + if (!isConnected) { + logger.warn('Oracle DB connection failed - sync scheduler will be disabled'); + logger.warn('Application will continue to run normally'); + return; + } + logger.success('Oracle DB connection test passed'); + }).catch(error => { + logger.error('Oracle DB connection test error:', error); + logger.warn('Sync scheduler will be disabled, application continues'); + }); + + try { + // 매 시간마다 실행 (0분에) + cron.schedule('0 * * * *', async () => { + try { + logger.info('Cron job triggered: Starting scheduled sync'); + + // 동기화 전 Oracle 연결 확인 + const isConnected = await testOracleConnection(); + if (!isConnected) { + logger.warn('Oracle DB not available, skipping sync'); + return; + } + + await syncAllTables(); + } catch (error) { + logger.error('Scheduled sync failed:', error); + // 동기화 실패해도 다음 스케줄은 계속 실행 + } + }, { + timezone: 'Asia/Seoul' + }); + + logger.success(`NONSAP data synchronization cron job registered (every hour) for ${SYNC_TABLES.length} tables`); + + // 개발 환경에서는 스케줄러만 등록하고 실제 실행은 안 함 + if (process.env.NODE_ENV === 'development') { + logger.info('Development mode: Cron registered but initial sync skipped'); + return; + } + + // 애플리케이션 시작 시 한 번 실행 (선택사항) + if (process.env.SYNC_ON_START === 'true') { + logger.info('Initial sync on startup enabled'); + setTimeout(async () => { + try { + const isConnected = await testOracleConnection(); + if (isConnected) { + await syncAllTables(); + } else { + logger.warn('Initial sync skipped - Oracle DB not available'); + } + } catch (error) { + logger.error('Initial sync failed:', error); + } + }, 10000); // 10초 후 실행 (DB 연결 안정화 대기) + } + + } catch (error) { + logger.error('Failed to set up cron scheduler:', error); + logger.warn('Application will continue without sync scheduler'); + } +}
\ No newline at end of file diff --git a/lib/nonsap-sync/table-config.ts b/lib/nonsap-sync/table-config.ts new file mode 100644 index 00000000..85744b44 --- /dev/null +++ b/lib/nonsap-sync/table-config.ts @@ -0,0 +1,352 @@ +import { TableName } from '@/lib/oracle-db/nonsap/oracle-schema'; + +export interface TableSyncConfig { + /** 차분 동기화에 사용할 타임스탬프 컬럼 (우선순위 순) */ + timestampColumns: string[]; + /** 기본키 컬럼들 */ + primaryKeys: string[]; + /** 정렬에 사용할 컬럼 (차분 동기화용) */ + orderByColumns: string[]; + /** 인덱스가 있는 컬럼인지 여부 */ + hasTimestampIndex: boolean; + /** 예상 변경 빈도 (high/medium/low) */ + changeFrequency: 'high' | 'medium' | 'low'; +} + +// 테이블별 동기화 설정 +export const TABLE_SYNC_CONFIG: Record<TableName, TableSyncConfig> = { + // 자재 관련 테이블 (변경 빈도 높음) + 'CMCTB_MAT_BSE': { + timestampColumns: ['FIN_CHG_DTM', 'FS_INP_DTM', 'IF_DT'], + primaryKeys: ['MAT_NO'], + orderByColumns: ['FIN_CHG_DTM', 'MAT_NO'], + hasTimestampIndex: true, + changeFrequency: 'high' + }, + 'CMCTB_MAT_PLNT': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['MAT_NO', 'PLNT'], + orderByColumns: ['IF_DT', 'IF_TM', 'MAT_NO'], + hasTimestampIndex: true, + changeFrequency: 'high' + }, + + // Customer 관련 테이블 (변경 빈도 중간) + 'CMCTB_CUSTOMER_GENERAL': { + timestampColumns: ['IF_DT', 'CHG_DT'], + primaryKeys: ['CSTM_CD'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_ADDR': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'ADR_NO', 'INTL_ADR_VER_ID'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + + // 코드 관련 테이블 (변경 빈도 낮음) - 기본값으로 설정 + 'CMCTB_CD': { + timestampColumns: ['CHG_DT', 'CRTE_DT'], + primaryKeys: [], // PostgreSQL 스키마에 기본키 미정의 + orderByColumns: ['CHG_DT', 'CHG_TM', 'CD_CLF'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_CDNM': { + timestampColumns: ['CHG_DT', 'CHG_TM', 'CRTE_DT', 'CRTE_TM'], + primaryKeys: [], // PostgreSQL 스키마에 기본키 미정의 + orderByColumns: ['CHG_DT', 'CHG_TM', 'LANG_KEY', 'CD_CLF'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_CD_CLF': { + timestampColumns: ['XDATS', 'XTIMS', 'CHG_DT', 'CHG_TM'], + primaryKeys: [], // PostgreSQL 스키마에 기본키 미정의 + orderByColumns: ['XDATS', 'XTIMS', 'CD_CLF'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_CD_CLF_NM': { + timestampColumns: ['CHG_DT', 'CHG_TM', 'CRTE_DT', 'CRTE_TM'], + primaryKeys: [], // PostgreSQL 스키마에 기본키 미정의 + orderByColumns: ['CHG_DT', 'CHG_TM', 'LANG_KEY', 'CD_CLF'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + + // Customer 관련 테이블 (변경 빈도 중간) + 'CMCTB_CUSTOMER_CFPN': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'SALE_ORG_CD', 'DIST_PATH', 'PDT_GRP', 'PTNR_SKL', 'PTNR_CNT'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_COMPNY': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'CO_ID'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_REPREMAIL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'ADR_NO', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_REPRFAX': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'ADR_NO', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_REPRTEL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'ADR_NO', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_REPRURL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'ADR_NO', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_SORG': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'SALE_ORG_CD', 'DIST_PATH', 'PDT_GRP'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_TAXCD': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'DPRT_NTN', 'TX_CTG'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_CUSTOMER_TAXNUM': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['CSTM_CD', 'TX_NO_CTG'], + orderByColumns: ['IF_DT', 'IF_TM', 'CSTM_CD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + + // 자재 관련 테이블 (변경 빈도 높음) + 'CMCTB_MAT_CLAS': { + timestampColumns: ['CHG_DT'], + primaryKeys: ['CLAS_CD'], + orderByColumns: ['CHG_DT', 'CLAS_CD'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_MAT_CLAS_SPCHAR': { + timestampColumns: ['CHG_DT'], + primaryKeys: ['CLAS_CD', 'SPCHAR_CD'], + orderByColumns: ['CHG_DT', 'CLAS_CD'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_MAT_DSC': { + timestampColumns: [], // 시간 컬럼 없음 + primaryKeys: ['MAT_NO', 'LANG_KEY'], + orderByColumns: ['MAT_NO', 'LANG_KEY'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_MAT_SPCHAR': { + timestampColumns: [], // 시간 컬럼 없음 + primaryKeys: ['MAT_NO', 'SPCHAR_CD'], + orderByColumns: ['MAT_NO', 'SPCHAR_CD'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_MAT_SPCHAR_MAST': { + timestampColumns: ['CHG_DT'], + primaryKeys: ['SPCHAR_CD'], + orderByColumns: ['CHG_DT', 'SPCHAR_CD'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_MAT_SPCHAR_VAL': { + timestampColumns: ['CHG_DT'], + primaryKeys: ['SPCHAR_CD', 'SPCHAR_VAL_CD'], + orderByColumns: ['CHG_DT', 'SPCHAR_CD'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + 'CMCTB_MAT_UOM': { + timestampColumns: [], // 시간 컬럼 없음 + primaryKeys: ['MAT_NO', 'SBST_UOM'], + orderByColumns: ['MAT_NO', 'SBST_UOM'], + hasTimestampIndex: false, + changeFrequency: 'low' + }, + + // 프로젝트 관련 테이블 + 'CMCTB_PROJ_BIZCLS': { + timestampColumns: [], // 시간 컬럼 없음 + primaryKeys: ['PROJ_NO', 'TYPE'], + orderByColumns: ['PROJ_NO', 'TYPE'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_PROJ_MAST': { + timestampColumns: ['XDATS', 'XTIMS'], + primaryKeys: ['PROJ_NO'], + orderByColumns: ['XDATS', 'XTIMS', 'PROJ_NO'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_PROJ_WBS': { + timestampColumns: ['XDATS', 'XTIMS'], + primaryKeys: ['PROJ_NO', 'WBS_ELMT'], + orderByColumns: ['XDATS', 'XTIMS', 'PROJ_NO'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + + // Vendor 관련 테이블 + 'CMCTB_VENDOR_ADDR': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'INTL_ADR_VER_ID'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_COMPNY': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'CO_CD'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_GENERAL': { + timestampColumns: ['IF_DT', 'IF_TM', 'CHG_DT', 'CHG_TM'], + primaryKeys: ['VNDRCD'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_GRP': { + timestampColumns: ['CHG_DT', 'CHG_TM', 'CRTE_DT', 'CRTE_TM'], + primaryKeys: ['VNDRCD', 'BIZ_GRP_CD'], + orderByColumns: ['CHG_DT', 'CHG_TM', 'VNDRCD'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_INCO': { + timestampColumns: ['CHG_DT', 'CHG_TM', 'CRTE_DT', 'CRTE_TM'], + primaryKeys: ['VNDRCD'], + orderByColumns: ['CHG_DT', 'CHG_TM', 'VNDRCD'], + hasTimestampIndex: false, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_PORG': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'PUR_ORG_CD'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_REPREMAIL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_REPRFAX': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_REPRTEL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_REPRURL': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'REPR_SER', 'VLD_ST_DT'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_TAXNUM': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'TX_NO_CTG'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_VFPN': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'PUR_ORG_CD', 'VNDR_SUB_NO', 'PLNT_CD', 'PTNR_SKL', 'PTNR_CNT'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + 'CMCTB_VENDOR_WHTHX': { + timestampColumns: ['IF_DT', 'IF_TM'], + primaryKeys: ['VNDRCD', 'CO_CD', 'SRCE_TX_TP'], + orderByColumns: ['IF_DT', 'IF_TM', 'VNDRCD'], + hasTimestampIndex: true, + changeFrequency: 'medium' + }, + + // 견적 관련 테이블 + 'PLFTB_ESTM_PROJ_MAST': { + timestampColumns: ['FIN_CHG_DTM', 'FS_INP_DTM', 'ESTM_AOM_STAT_CHG_DTM'], + primaryKeys: ['ESTM_PROJ_NO'], + orderByColumns: ['FIN_CHG_DTM', 'ESTM_PROJ_NO'], + hasTimestampIndex: true, + changeFrequency: 'high' + } +}; + +// 기본 설정 (설정되지 않은 테이블용) +const DEFAULT_CONFIG: TableSyncConfig = { + timestampColumns: ['CHG_DT', 'IF_DT', 'CRTE_DT'], + primaryKeys: [], // 기본키 없음 - onConflictDoNothing 사용 + orderByColumns: ['CHG_DT'], + hasTimestampIndex: false, + changeFrequency: 'medium' +}; + +/** + * 테이블의 동기화 설정 가져오기 + */ +export function getTableSyncConfig(tableName: TableName): TableSyncConfig { + return TABLE_SYNC_CONFIG[tableName] || DEFAULT_CONFIG; +} + +/** + * 차분 동기화가 가능한 테이블인지 확인 + */ +export function canUseDeltaSync(tableName: TableName): boolean { + const config = getTableSyncConfig(tableName); + return config.timestampColumns.length > 0 && config.timestampColumns[0] !== 'ROWID'; +} + +/** + * 테이블의 최적 타임스탬프 컬럼 가져오기 + */ +export function getTimestampColumn(tableName: TableName): string | null { + const config = getTableSyncConfig(tableName); + return config.timestampColumns.length > 0 ? config.timestampColumns[0] : null; +}
\ No newline at end of file |
