1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
|
/* eslint-disable @typescript-eslint/no-explicit-any */
"use server";
import * as cron from 'node-cron';
import { sql } from 'drizzle-orm';
import { oracleKnex } from '@/lib/oracle-db/db';
import db from '@/db/db';
import { DatabaseSchema, TableName, ALL_TABLE_NAMES } from '@/lib/oracle-db/nonsap/oracle-schema';
import * as nonsapSchema from '@/db/schema/NONSAP/nonsap';
import { getTableSyncConfig, canUseDeltaSync, getTimestampColumn } from './table-config';
import { PAGE_SIZE, BATCH_SIZE, MAX_WORKERS, DELTA_SYNC_ENABLED } from './sync-config';
/**
 * Progress record for one table's synchronization run. Instances are stored in
 * the module-level `syncProgress` map and returned by `getSyncProgressEnhanced`.
 */
interface SyncProgress {
// Table this record describes.
tableName: string;
// ISO-8601 string captured when the run was started.
lastSyncDate: string;
// Page number most recently written to PostgreSQL (initialised to 1).
currentPage: number;
// Running count of rows handed to the upsert step during this run.
totalProcessed: number;
// Lifecycle state of the run.
status: 'running' | 'completed' | 'error' | 'skipped';
// Message of the error that failed the run (set together with status 'error').
lastError?: string;
// Strategy chosen for the run: incremental, full copy, or delete-and-reload.
syncType: 'full' | 'delta' | 'rebuild';
// Start time in epoch milliseconds (Date.now()).
startTime: number;
// End time in epoch milliseconds; set when the run finishes or fails.
endTime?: number;
// NOTE(review): not assigned anywhere in the visible code — confirm whether it is still needed.
recordsSkipped?: number;
}
/**
 * Information needed to run a delta (incremental) sync for one table,
 * as produced by `getLastSyncInfo`.
 */
interface DeltaSyncInfo {
// Table the information belongs to.
tableName: string;
// Value of `last_sync_timestamp` stored for the table, or null when none is stored.
lastSyncTimestamp: string | null;
// Oracle column used to detect changed rows ('CHG_DT' when none is configured).
timestampColumn: string;
}
// In-memory store of sync progress, keyed by table name
const syncProgress = new Map<string, SyncProgress>();
// Name of the PostgreSQL table (in the `nonsap` schema) that stores the last-sync information
const SYNC_STATUS_TABLE = 'nonsap_sync_status';
/**
 * Renders a fixed-width textual progress bar, for example
 * `[   5 / 10  | ██████████░░░░░░░░░░ |  50% ]`.
 *
 * The completion ratio is clamped to the range 0..1 so the function never
 * throws and never prints `NaN`:
 * - `current > total` renders a full bar at 100% (previously a negative
 *   repeat count raised a RangeError);
 * - `total <= 0` or non-finite inputs render an empty bar at 0%.
 *
 * @param current Number of items finished so far (printed as given).
 * @param total   Total number of items (printed as given).
 * @param width   Number of bar cells; defaults to 20. Non-positive or NaN widths render no cells.
 * @returns The formatted progress string.
 */
const createProgressBar = (current: number, total: number, width: number = 20): string => {
  // `width > 0` is false for NaN, so NaN and negative widths collapse to an empty bar.
  const cells = width > 0 ? width : 0;
  // `total > 0` is false for 0, negatives and NaN, which avoids the division entirely.
  const rawRatio = total > 0 ? current / total : 0;
  const ratio = Number.isFinite(rawRatio) ? Math.min(1, Math.max(0, rawRatio)) : 0;

  const filled = Math.floor(ratio * cells);
  const empty = cells - filled;
  const percentage = Math.round(ratio * 100);
  const bar = '█'.repeat(filled) + '░'.repeat(empty);

  return `[ ${current.toString().padStart(3)} / ${total.toString().padEnd(3)} | ${bar} | ${percentage.toString().padStart(3)}% ]`;
};
/**
 * Redraws a single progress line on stdout.
 *
 * The current line is blanked (carriage return, 100 spaces, carriage return)
 * and the new `[NONSAP-SYNC] <message> <bar>` text is written in its place.
 * Once `current` reaches `total`, a newline is emitted so later output starts
 * on a fresh line.
 *
 * @param message Text shown in front of the bar.
 * @param current Number of items finished so far.
 * @param total   Total number of items.
 */
const updateProgressBar = (message: string, current: number, total: number): void => {
  const line = `[NONSAP-SYNC] ${message} ${createProgressBar(current, total)}`;

  // Blank out whatever was printed on this line before redrawing it.
  process.stdout.write('\r' + ' '.repeat(100) + '\r');
  process.stdout.write(line);

  const finished = current >= total;
  if (finished) {
    process.stdout.write('\n');
  }
};
/**
 * Console logger that prefixes every message with a `[NONSAP-SYNC …]` tag.
 *
 * `info` and `success` write through `console.log`, `warn` through
 * `console.warn`, and `error` through `console.error`. `header` prints a banner
 * framed by two 80-character rules, and `progress` redraws the progress bar.
 */
const logger = {
  info(message: string, ...args: unknown[]): void {
    console.log(`[NONSAP-SYNC INFO] ${message}`, ...args);
  },
  error(message: string, ...args: unknown[]): void {
    console.error(`[NONSAP-SYNC ERROR] ${message}`, ...args);
  },
  warn(message: string, ...args: unknown[]): void {
    console.warn(`[NONSAP-SYNC WARN] ${message}`, ...args);
  },
  success(message: string, ...args: unknown[]): void {
    console.log(`[NONSAP-SYNC SUCCESS] ${message}`, ...args);
  },
  header(message: string): void {
    const rule = '='.repeat(80);
    console.log('\n' + rule);
    console.log(`[NONSAP-SYNC] ${message}`);
    console.log(rule + '\n');
  },
  progress: updateProgressBar
};
/**
 * Ensures the sync status table (`nonsap.<SYNC_STATUS_TABLE>`) exists.
 *
 * The table is looked up in `information_schema.tables` first so that the
 * "created" message is only logged when the table was actually missing. The
 * creation itself uses `CREATE TABLE IF NOT EXISTS`, so two initialisations
 * racing each other (for example a manual trigger overlapping the cron job)
 * no longer make the second one fail with "relation already exists".
 *
 * Failures are logged as warnings and never thrown: callers continue even if
 * the status table could not be prepared.
 */
async function initializeSyncStatusTable(): Promise<void> {
  try {
    const lookup = await db.execute(sql`
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'nonsap'
AND table_name = ${SYNC_STATUS_TABLE}
);
`);

    // `rows[0]` is guarded so an unexpectedly empty result falls through to the idempotent CREATE.
    if (lookup.rows[0]?.exists) {
      return;
    }

    await db.execute(sql`
CREATE TABLE IF NOT EXISTS nonsap.${sql.identifier(SYNC_STATUS_TABLE)} (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp VARCHAR(14),
last_sync_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sync_type VARCHAR(10) DEFAULT 'full',
total_records_processed INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`);
    logger.info('Sync status table created');
  } catch (error) {
    logger.warn('Failed to initialize sync status table:', error);
  }
}
/**
 * Looks up the last stored sync timestamp for a table.
 *
 * Reads `last_sync_timestamp` from the sync status table and pairs it with the
 * table's configured timestamp column (falling back to `'CHG_DT'` when none is
 * configured). If the query fails, a warning is logged and the result carries
 * a `null` timestamp.
 *
 * @param tableName Name of the table to look up.
 * @returns The delta-sync information for the table.
 */
async function getLastSyncInfo(tableName: string): Promise<DeltaSyncInfo> {
  const fallbackColumn = 'CHG_DT';

  try {
    const statusRows = await db.execute(sql`
SELECT last_sync_timestamp
FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
WHERE table_name = ${tableName}
`);
    const configuredColumn = getTimestampColumn(tableName as TableName);
    const firstRow = statusRows.rows[0] as any;

    const info: DeltaSyncInfo = {
      tableName,
      lastSyncTimestamp: firstRow?.last_sync_timestamp || null,
      timestampColumn: configuredColumn || fallbackColumn
    };
    return info;
  } catch (error) {
    logger.warn(`Failed to get last sync info for ${tableName}:`, error);

    const info: DeltaSyncInfo = {
      tableName,
      lastSyncTimestamp: null,
      timestampColumn: getTimestampColumn(tableName as TableName) || fallbackColumn
    };
    return info;
  }
}
/**
 * Stores the latest sync timestamp and processed-row count for a table.
 *
 * Inserts a row into the sync status table, or updates the existing row when
 * one with the same `table_name` is already present. Errors are logged and not
 * rethrown.
 *
 * @param tableName        Table whose status row is written.
 * @param timestamp        Timestamp to store as `last_sync_timestamp`.
 * @param recordsProcessed Value to store as `total_records_processed`.
 */
async function updateLastSyncInfo(tableName: string, timestamp: string, recordsProcessed: number): Promise<void> {
  try {
    const upsertStatus = sql`
INSERT INTO nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
(table_name, last_sync_timestamp, total_records_processed, updated_at)
VALUES (${tableName}, ${timestamp}, ${recordsProcessed}, CURRENT_TIMESTAMP)
ON CONFLICT (table_name)
DO UPDATE SET
last_sync_timestamp = ${timestamp},
total_records_processed = ${recordsProcessed},
updated_at = CURRENT_TIMESTAMP
`;
    await db.execute(upsertStatus);
  } catch (error) {
    logger.error(`Failed to update sync info for ${tableName}:`, error);
  }
}
/**
 * Fetches one page of a table from Oracle for a full sync.
 *
 * Rows are numbered with `ROW_NUMBER() OVER (ORDER BY ROWID)` and the page is
 * cut out of that numbering. The helper `rn` column is stripped from every row
 * before the page is returned.
 *
 * @param tableName Oracle table to read.
 * @param page      1-based page number.
 * @param pageSize  Rows per page; defaults to `PAGE_SIZE`.
 * @returns The rows of the requested page (fewer than `pageSize` on the last page).
 * @throws Rethrows any error raised by the Oracle query after logging it.
 */
async function fetchOracleDataFull<T extends TableName>(
  tableName: T,
  page: number,
  pageSize: number = PAGE_SIZE
): Promise<DatabaseSchema[T][]> {
  const firstRowExclusive = (page - 1) * pageSize;
  const lastRowInclusive = firstRowExclusive + pageSize;

  try {
    // Page through the table using the ROW_NUMBER() window function.
    const pagedQuery = `
SELECT * FROM (
SELECT t.*, ROW_NUMBER() OVER (ORDER BY ROWID) as rn
FROM ${tableName} t
)
WHERE rn > ${firstRowExclusive} AND rn <= ${lastRowInclusive}
`;
    const rawResult = await oracleKnex.raw(pagedQuery);

    // The raw result is accepted either as a bare array of rows or as an object exposing `rows`.
    const fetchedRows = Array.isArray(rawResult) ? rawResult : rawResult.rows || [];

    // Remove the paging helper column (either casing) from each row object.
    const pageRows: any[] = [];
    for (const fetched of fetchedRows) {
      if (fetched && typeof fetched === 'object') {
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        const { rn: _lower, RN: _upper, ...withoutRowNumber } = fetched;
        pageRows.push(withoutRowNumber);
      } else {
        pageRows.push(fetched);
      }
    }

    logger.info(`Fetched ${pageRows.length} records from ${tableName} (page ${page}, full sync)`);
    return pageRows as DatabaseSchema[T][];
  } catch (error) {
    logger.error(`Error fetching full data from ${tableName}:`, error);
    throw error;
  }
}
/**
 * Tables whose `*_DT` change-date column is paired with a `*_TM` change-time
 * column. Membership is decided by table name, not by inspecting the table.
 */
const DELTA_TABLES_WITH_TIME_COLUMN: readonly string[] = [
  'CMCTB_VENDOR_GENERAL', 'CMCTB_VENDOR_GRP', 'CMCTB_VENDOR_INCO',
  'CMCTB_CDNM', 'CMCTB_CD_CLF_NM'
];

/**
 * Builds the change stamp of a single row, or returns null when the row has no
 * value in its timestamp column.
 *
 * - `*DTM*` columns: the column value itself.
 * - date column with a time column: date + time (time right-padded with '0' to 6 characters).
 * - date column without a usable time value: date + '235959'.
 */
function buildRowChangeStamp(
  row: any,
  timestampColumn: string,
  timeColumn: string | null,
  isFullTimestamp: boolean
): string | null {
  const dateValue = row?.[timestampColumn];
  if (dateValue === null || dateValue === undefined || dateValue === '') {
    return null;
  }
  const dateText = String(dateValue);
  if (isFullTimestamp) {
    return dateText;
  }
  const timeValue = timeColumn ? row[timeColumn] : null;
  return timeValue ? dateText + String(timeValue).padEnd(6, '0') : dateText + '235959';
}

/**
 * Fetches one page of changed rows from Oracle for a delta sync.
 *
 * When `lastSyncTimestamp` is given, only rows whose change stamp is greater
 * than it are selected; otherwise every row is selected. Compared with the
 * previous implementation:
 *
 * - Rows are numbered by timestamp column, then time column (if any), then
 *   ROWID. Ordering by the timestamp column alone is not unique, so ties could
 *   be numbered differently on each page query and rows could be skipped or
 *   repeated between pages.
 *   NOTE(review): this assumes the delta tables expose ROWID, as
 *   `fetchOracleDataFull` already assumes for its tables — confirm.
 * - `maxTimestamp` is the largest change stamp found in the page, ignoring
 *   rows without a change date. Taking it from the last row only produced the
 *   bogus value `'235959'` when that row's date was NULL (Oracle sorts NULLs
 *   last in ascending order), which would then filter out all later changes.
 *
 * NOTE(review): for date-only columns the stamp is `<date>235959` and the next
 * run filters with `>` on the date, so rows changed later on the same day are
 * not picked up by a subsequent delta run — confirm this is acceptable.
 *
 * @param tableName         Oracle table to read.
 * @param lastSyncTimestamp Stamp of the previous sync, or null to select all rows.
 * @param timestampColumn   Column holding the change date or date-time.
 * @param page              1-based page number.
 * @param pageSize          Rows per page; defaults to `PAGE_SIZE`.
 * @returns The page rows (without the `rn` helper column), whether a further
 *          page may exist, and the largest change stamp in the page (or null).
 * @throws Rethrows any error raised by the Oracle query after logging it.
 */
async function fetchOracleDataDelta<T extends TableName>(
  tableName: T,
  lastSyncTimestamp: string | null,
  timestampColumn: string,
  page: number,
  pageSize: number = PAGE_SIZE
): Promise<{ data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null }> {
  const offset = (page - 1) * pageSize;

  try {
    const hasTimestampColumn = Boolean(timestampColumn);
    // Columns containing 'DTM' hold the full date-time stamp in one value.
    const isFullTimestamp = hasTimestampColumn && timestampColumn.includes('DTM');
    const timeColumn =
      hasTimestampColumn && !isFullTimestamp && DELTA_TABLES_WITH_TIME_COLUMN.includes(tableName)
        ? timestampColumn.replace('_DT', '_TM')
        : null;

    let whereClause = '';
    const params: string[] = [];

    if (lastSyncTimestamp && hasTimestampColumn) {
      if (isFullTimestamp) {
        whereClause = `WHERE ${timestampColumn} > ?`;
        params.push(lastSyncTimestamp);
      } else {
        // Date-only column: compare on the first 8 characters of the stored stamp.
        const lastDate = lastSyncTimestamp.substring(0, 8);
        if (timeColumn) {
          const lastTime = lastSyncTimestamp.substring(8, 14);
          whereClause = `WHERE (${timestampColumn} > ? OR (${timestampColumn} = ? AND ${timeColumn} > ?))`;
          params.push(lastDate, lastDate, lastTime);
        } else {
          whereClause = `WHERE ${timestampColumn} > ?`;
          params.push(lastDate);
        }
      }
    }

    // ROWID is the final tie-breaker so the row numbering is stable across page queries.
    const orderBy = [timestampColumn, timeColumn, 'ROWID'].filter(Boolean).join(', ');
    const query = `
SELECT * FROM (
SELECT t.*, ROW_NUMBER() OVER (ORDER BY ${orderBy}) as rn
FROM ${tableName} t
${whereClause}
)
WHERE rn > ${offset} AND rn <= ${offset + pageSize}
`;
    const result = await oracleKnex.raw(query, params);

    // The raw result is accepted either as a bare array of rows or as an object exposing `rows`.
    const rows = Array.isArray(result) ? result : result.rows || [];

    // Remove the paging helper column (either casing) from each row object.
    const cleanResults = rows.map((row: any) => {
      if (row && typeof row === 'object') {
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        const { rn: _rn, RN: _RN, ...cleanRow } = row;
        return cleanRow;
      }
      return row;
    });

    // Largest change stamp in this page; rows without a change date are ignored.
    let maxTimestamp: string | null = null;
    if (hasTimestampColumn) {
      for (const row of cleanResults) {
        const stamp = buildRowChangeStamp(row, timestampColumn, timeColumn, isFullTimestamp);
        if (stamp !== null && (maxTimestamp === null || stamp > maxTimestamp)) {
          maxTimestamp = stamp;
        }
      }
    }

    const hasMore = cleanResults.length === pageSize;
    logger.info(`Fetched ${cleanResults.length} records from ${tableName} (page ${page}, delta: ${!!lastSyncTimestamp})`);

    return {
      data: cleanResults as DatabaseSchema[T][],
      hasMore,
      maxTimestamp
    };
  } catch (error) {
    logger.error(`Error fetching delta data from ${tableName}:`, error);
    throw error;
  }
}
/**
 * Maps an Oracle row onto the column names of a PostgreSQL table schema.
 *
 * Each key of `record` is upper-cased and kept only when the schema object has
 * a property with that upper-cased name. Empty or blank keys, the literal key
 * `'undefined'`, and the `RN` paging helper column are dropped. Columns that
 * are not part of the schema are skipped silently to keep the log quiet.
 *
 * @param record      Row as returned from Oracle.
 * @param tableSchema Schema object whose own property names are the accepted columns.
 * @returns A new object containing only the accepted columns, keyed in upper case.
 */
function normalizeColumnNames(record: any, tableSchema: any): any {
  const acceptedColumns = new Set(Object.keys(tableSchema));
  const normalized: any = {};

  Object.entries(record).forEach(([rawKey, value]) => {
    const unusableKey =
      !rawKey || typeof rawKey !== 'string' || rawKey === 'undefined' || rawKey.trim() === '';
    if (unusableKey) {
      return;
    }

    const columnName = rawKey.toUpperCase();
    // RN is the ROW_NUMBER() helper column added for paging.
    if (columnName === 'RN') {
      return;
    }

    if (acceptedColumns.has(columnName)) {
      normalized[columnName] = value;
    }
  });

  return normalized;
}
/**
 * Checks whether the PostgreSQL table `nonsap.<tableName>` has a PRIMARY KEY or
 * UNIQUE constraint made up of exactly the given columns.
 *
 * The match is exact (same set of columns, compared case-insensitively, order
 * ignored) because the result is used to decide whether `ON CONFLICT (cols)`
 * can be used, and that clause requires the column list to match a unique
 * index exactly. A constraint that merely contains the given columns, such as
 * (A, B, C) for the input (A, B), therefore no longer counts.
 *
 * The catalog join also matches on schema and table name, so constraints with
 * the same name on other tables do not contribute columns.
 *
 * @param tableName   Table name; lower-cased for the catalog lookup.
 * @param columnNames Candidate conflict-target columns.
 * @returns `true` when a matching constraint exists; `false` otherwise,
 *          including when `columnNames` is empty or the lookup fails (the
 *          failure is logged as a warning).
 */
async function hasValidConstraint(tableName: string, columnNames: string[]): Promise<boolean> {
  try {
    if (columnNames.length === 0) return false;

    const tableNameLower = tableName.toLowerCase();
    const result = await db.execute(sql`
SELECT tc.constraint_name, tc.constraint_type, kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
AND tc.table_name = kcu.table_name
WHERE tc.table_schema = 'nonsap'
AND tc.table_name = ${tableNameLower}
AND tc.constraint_type IN ('PRIMARY KEY', 'UNIQUE')
`);

    // Group the returned (constraint, column) pairs by constraint in a single pass.
    const columnsByConstraint = new Map<string, Set<string>>();
    for (const row of result.rows as any[]) {
      const constraintName = String(row.constraint_name);
      const columns = columnsByConstraint.get(constraintName) ?? new Set<string>();
      columns.add(String(row.column_name).toUpperCase());
      columnsByConstraint.set(constraintName, columns);
    }

    const wanted = new Set(columnNames.map(col => col.toUpperCase()));
    for (const columns of columnsByConstraint.values()) {
      const sameColumns = columns.size === wanted.size && [...wanted].every(col => columns.has(col));
      if (sameColumns) {
        return true;
      }
    }
    return false;
  } catch (error) {
    logger.warn(`Failed to check constraints for ${tableName}:`, error);
    return false;
  }
}
/**
 * Options for `upsertToPostgresOptimized`.
 */
interface UpsertToPostgresOptions {
  /**
   * Only relevant for tables without usable primary keys (delete-and-reload).
   * When `true` (the default) the target table is emptied before the first
   * insert of the call. Callers that load a table page by page must pass
   * `false` for every page after the first, otherwise each page would wipe
   * the rows written by the previous pages.
   */
  clearExisting?: boolean;
}

/** Drops primary-key entries that cannot serve as an ON CONFLICT target (empty, 'undefined', ROWID). */
function filterUsablePrimaryKeys(primaryKeys: string[]): string[] {
  return primaryKeys.filter(key =>
    key &&
    typeof key === 'string' &&
    key !== 'ROWID' &&
    key !== 'undefined' &&
    key.trim() !== ''
  );
}

/** Empties `nonsap.<tableName>` when it contains rows, logging what was found. */
async function clearTableForRebuild(tableName: string): Promise<void> {
  const existingDataCheck = await db.execute(sql`
SELECT COUNT(*) as count FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())}
`);
  const existingCount = Number(existingDataCheck.rows[0].count);
  if (existingCount > 0) {
    logger.info(`${tableName} - Existing data found (${existingCount} records), deleting before insert`);
    await db.execute(sql`
DELETE FROM ${sql.identifier('nonsap')}.${sql.identifier(tableName.toLowerCase())}
`);
  } else {
    logger.info(`${tableName} - No existing data, proceeding with fresh insert`);
  }
}

/** Inserts records one by one with ON CONFLICT DO NOTHING; returns how many inserts threw. */
async function insertRecordsIndividually(tableSchema: any, records: any[]): Promise<number> {
  let failed = 0;
  for (const record of records) {
    try {
      await db.insert(tableSchema as any)
        .values(record)
        .onConflictDoNothing();
    } catch {
      // Best effort: one bad record must not stop the rest; the caller reports the total.
      failed++;
    }
  }
  return failed;
}

/**
 * Writes a set of Oracle rows into the matching PostgreSQL table, in batches
 * of `BATCH_SIZE`.
 *
 * Strategy per table:
 * - usable primary keys that exactly match a PRIMARY KEY/UNIQUE constraint:
 *   batched `INSERT … ON CONFLICT (keys) DO UPDATE` (or `DO NOTHING` when the
 *   table has no non-key columns to update);
 * - usable primary keys without a matching constraint: row-by-row insert with
 *   `ON CONFLICT DO NOTHING`;
 * - usable primary keys that are not properties of the schema object: plain
 *   batched insert;
 * - no usable primary keys: optional clearing of the table (see
 *   `options.clearExisting`) followed by plain batched inserts.
 *
 * Changes compared with the previous implementation:
 * - The key filtering and the constraint lookup are done once per call instead
 *   of once per batch (the lookup is a catalog query).
 * - Clearing the table happens at most once per call, right before the first
 *   non-empty batch, and can be disabled by the caller. A failure while
 *   clearing is now thrown instead of being followed by inserts on top of the
 *   old data.
 * - A batch that fails is always retried row by row. Previously a batch that
 *   failed with a unique violation (SQLSTATE 23505) was dropped entirely and
 *   silently.
 * - Row-level failures are counted and reported in one warning.
 *
 * NOTE(review): the update set refers to `excluded."<KEY>"` using the schema
 * property name — this presumes property names equal the PostgreSQL column
 * names; confirm against the schema definition.
 *
 * @param tableName   Oracle table name; converted to camelCase to find the schema export.
 * @param data        Rows to write. An empty array is a no-op.
 * @param primaryKeys Configured key columns for the table.
 * @param options     See `UpsertToPostgresOptions`.
 * @throws When the schema export is missing or clearing the table fails.
 */
async function upsertToPostgresOptimized<T extends TableName>(
  tableName: T,
  data: DatabaseSchema[T][],
  primaryKeys: string[],
  options: UpsertToPostgresOptions = {}
): Promise<void> {
  if (data.length === 0) return;
  const { clearExisting = true } = options;

  try {
    const tableCamelCase = tableName.toLowerCase()
      .split('_')
      .map((word, index) => index === 0 ? word : word.charAt(0).toUpperCase() + word.slice(1))
      .join('');
    const tableSchema = (nonsapSchema as any)[tableCamelCase];
    if (!tableSchema) {
      throw new Error(`Table schema not found for ${tableName} (${tableCamelCase})`);
    }

    // Everything below up to the loop is identical for every batch, so it is computed once.
    const validPrimaryKeys = filterUsablePrimaryKeys(primaryKeys);
    logger.info(`${tableName} - Valid primaryKeys: [${validPrimaryKeys.join(', ')}]`);

    const hasKeys = validPrimaryKeys.length > 0;
    const hasConstraint = hasKeys ? await hasValidConstraint(tableName, validPrimaryKeys) : false;
    const conflictColumns = hasConstraint
      ? validPrimaryKeys.filter(key => key in tableSchema).map(key => tableSchema[key])
      : [];

    if (hasKeys && !hasConstraint) {
      logger.info(`${tableName} - No valid constraints found, using individual upsert`);
    } else if (hasKeys && conflictColumns.length === 0) {
      logger.warn(`${tableName} - Primary key columns not found in schema, using simple insert`);
    }

    let pendingClear = !hasKeys && clearExisting;
    let failedRecords = 0;

    for (let i = 0; i < data.length; i += BATCH_SIZE) {
      const batch = data.slice(i, i + BATCH_SIZE);
      const currentBatch = Math.floor(i / BATCH_SIZE) + 1;

      // Map Oracle column names onto the schema; null rows and rows without any known column are dropped.
      const cleanBatch = batch
        .map(record => (record == null ? {} : normalizeColumnNames(record, tableSchema)))
        .filter(record => Object.keys(record).length > 0);
      if (cleanBatch.length === 0) {
        logger.warn(`${tableName} - No valid records after cleaning batch ${currentBatch}`);
        continue;
      }

      // Delete-and-reload: clear lazily so a call that yields no usable rows leaves the table untouched.
      if (pendingClear) {
        await clearTableForRebuild(tableName);
        pendingClear = false;
      }

      try {
        if (!hasKeys) {
          await db.insert(tableSchema as any).values(cleanBatch);
        } else if (!hasConstraint) {
          failedRecords += await insertRecordsIndividually(tableSchema, cleanBatch);
        } else if (conflictColumns.length === 0) {
          await db.insert(tableSchema as any).values(cleanBatch);
        } else {
          // Update every non-key column from the incoming row.
          const updateFields = Object.keys(cleanBatch[0])
            .filter(key => !validPrimaryKeys.includes(key) && key in tableSchema)
            .reduce((acc, key) => {
              acc[key] = sql.raw(`excluded."${key}"`);
              return acc;
            }, {} as Record<string, any>);

          const insertQuery = db.insert(tableSchema as any).values(cleanBatch);
          if (Object.keys(updateFields).length > 0) {
            await insertQuery.onConflictDoUpdate({
              target: conflictColumns,
              set: updateFields
            });
          } else {
            // Key-only table: there is nothing to update, so existing rows are simply kept.
            await insertQuery.onConflictDoNothing({ target: conflictColumns });
          }
        }
      } catch (insertError: unknown) {
        const isDuplicateKeyError = insertError &&
          typeof insertError === 'object' &&
          'code' in insertError &&
          insertError.code === '23505';

        // A failed statement inserts none of its rows, so the batch is retried row by row in every case.
        if (isDuplicateKeyError) {
          logger.warn(`Batch ${currentBatch} for ${tableName} hit a duplicate key, retrying records individually`);
        } else {
          logger.warn(`Batch upsert failed for ${tableName}, trying individual operations`, insertError);
        }
        failedRecords += await insertRecordsIndividually(tableSchema, cleanBatch);
      }

      // Brief pause after every tenth batch.
      if (currentBatch % 10 === 0) {
        await new Promise(resolve => setTimeout(resolve, 50));
      }
    }

    if (failedRecords > 0) {
      logger.warn(`${tableName} - ${failedRecords} of ${data.length} records failed during individual insert`);
    }
    logger.success(`Successfully processed ${data.length} records for ${tableName}`);
  } catch (error) {
    logger.error(`Error upserting data to ${tableName}:`, error);
    throw error;
  }
}

/**
 * Synchronizes one table from Oracle to PostgreSQL, page by page, and records
 * the run in `syncProgress`.
 *
 * Sync type:
 * - `delta`: delta sync is enabled and allowed for the table, a timestamp
 *   column is configured, and the table has usable primary keys. Without
 *   usable keys, changed rows cannot be merged into existing data, so such
 *   tables are reloaded instead (previously a delta run on such a table
 *   deleted everything and inserted only the changed rows).
 * - `rebuild`: the table has no usable primary keys; the target table is
 *   cleared before the first page is written and later pages are appended.
 *   Previously every page cleared the table again, so only the last page
 *   survived.
 * - `full`: everything else; all rows are upserted.
 *
 * The run is marked `skipped` only when no rows were processed at all. An
 * empty page after earlier pages simply ends the run as `completed`. For delta
 * runs the largest change stamp seen is stored via `updateLastSyncInfo`.
 *
 * @param tableName   Table to synchronize.
 * @param tableIndex  1-based position of the table in the overall run (logging only).
 * @param totalTables Number of tables in the overall run (logging only).
 * @throws Rethrows any fetch or upsert error after marking the run as `error`.
 */
async function syncTableWithWorker<T extends TableName>(
  tableName: T,
  tableIndex: number,
  totalTables: number
): Promise<void> {
  logger.info(`Starting enhanced sync for table: ${tableName} (${tableIndex}/${totalTables})`);

  const config = getTableSyncConfig(tableName);
  const timestampColumn = getTimestampColumn(tableName);
  const hasUsableKeys = filterUsablePrimaryKeys(config.primaryKeys).length > 0;
  const useDeltaSync =
    DELTA_SYNC_ENABLED && canUseDeltaSync(tableName) && Boolean(timestampColumn) && hasUsableKeys;

  // Decide the sync type.
  let syncType: 'full' | 'delta' | 'rebuild';
  if (useDeltaSync) {
    syncType = 'delta';
  } else if (!hasUsableKeys) {
    syncType = 'rebuild';
  } else {
    syncType = 'full';
  }
  const syncTypeDescription = {
    'delta': '차분 동기화',
    'full': '전체 동기화',
    'rebuild': '삭제 후 재구성'
  }[syncType];

  // Register the run so it is visible through getSyncProgressEnhanced().
  const progress: SyncProgress = {
    tableName,
    lastSyncDate: new Date().toISOString(),
    currentPage: 1,
    totalProcessed: 0,
    status: 'running',
    syncType,
    startTime: Date.now()
  };
  syncProgress.set(tableName, progress);

  try {
    let page = 1;
    let totalProcessed = 0;
    let hasMore = true;
    let maxTimestamp: string | null = null;
    let lastSyncTimestamp: string | null = null;

    if (useDeltaSync) {
      const deltaInfo = await getLastSyncInfo(tableName);
      lastSyncTimestamp = deltaInfo.lastSyncTimestamp;
      logger.info(`${syncTypeDescription} for ${tableName}, last timestamp: ${lastSyncTimestamp || 'none'}`);
    } else {
      logger.info(`${syncTypeDescription} for ${tableName}`);
    }

    while (hasMore) {
      let result: { data: DatabaseSchema[T][]; hasMore: boolean; maxTimestamp: string | null };
      if (useDeltaSync && timestampColumn) {
        result = await fetchOracleDataDelta(
          tableName,
          lastSyncTimestamp,
          timestampColumn,
          page
        );
      } else {
        const oracleData = await fetchOracleDataFull(tableName, page);
        result = {
          data: oracleData,
          hasMore: oracleData.length === PAGE_SIZE,
          maxTimestamp: null
        };
      }

      if (result.data.length === 0) {
        // Only a run that found nothing at all counts as skipped.
        if (totalProcessed === 0) {
          logger.info(`No new data found for ${tableName}, skipping...`);
          progress.status = 'skipped';
        }
        break;
      }

      // Only the first page may clear the target table of a key-less table.
      await upsertToPostgresOptimized(tableName, result.data, config.primaryKeys, {
        clearExisting: page === 1
      });
      totalProcessed += result.data.length;

      if (result.maxTimestamp && (maxTimestamp === null || result.maxTimestamp > maxTimestamp)) {
        maxTimestamp = result.maxTimestamp;
      }

      progress.currentPage = page;
      progress.totalProcessed = totalProcessed;
      page++;

      // A short page is the last page, whichever fetch mode produced it.
      hasMore = result.hasMore && result.data.length >= PAGE_SIZE;
    }

    progress.status = progress.status === 'skipped' ? 'skipped' : 'completed';
    progress.endTime = Date.now();

    // Remember how far the delta sync got.
    if (maxTimestamp && useDeltaSync) {
      await updateLastSyncInfo(tableName, maxTimestamp, totalProcessed);
    }

    const duration = (progress.endTime - progress.startTime) / 1000;
    logger.success(`Table ${tableName} sync completed. Processed: ${totalProcessed}, Duration: ${duration.toFixed(2)}s, Type: ${syncTypeDescription}`);
  } catch (error) {
    progress.status = 'error';
    progress.lastError = error instanceof Error ? error.message : String(error);
    progress.endTime = Date.now();
    logger.error(`Table ${tableName} sync failed:`, error);
    throw error;
  }
}
/**
 * Synchronizes the given tables concurrently.
 *
 * The table list is cut into consecutive groups of
 * `ceil(tables.length / MAX_WORKERS)` tables. Each group is handled by one
 * async "worker" that syncs its tables one after another, and all workers run
 * concurrently via `Promise.all`. A table that fails is logged and skipped so
 * the worker continues with its next table.
 *
 * @param tables Tables to synchronize.
 */
async function syncTablesInParallel(tables: TableName[]): Promise<void> {
  const groupSize = Math.ceil(tables.length / MAX_WORKERS);
  const groups: TableName[][] = [];
  for (let start = 0; start < tables.length; start += groupSize) {
    groups.push(tables.slice(start, start + groupSize));
  }
  logger.info(`Processing ${tables.length} tables in ${groups.length} parallel groups`);

  const runWorker = async (group: TableName[], groupIndex: number): Promise<void> => {
    const workerNumber = groupIndex + 1;
    logger.info(`Starting worker ${workerNumber} with ${group.length} tables`);

    // Number of tables that belong to the groups before this one.
    let tablesBefore = 0;
    for (const earlierGroup of groups.slice(0, groupIndex)) {
      tablesBefore += earlierGroup.length;
    }

    let positionInGroup = 0;
    for (const tableName of group) {
      positionInGroup += 1;
      try {
        await syncTableWithWorker(tableName, tablesBefore + positionInGroup, tables.length);
      } catch (error) {
        // A failing table is reported; the worker moves on to the next one.
        logger.error(`Worker ${workerNumber} failed on table ${tableName}:`, error);
      }
    }

    logger.info(`Worker ${workerNumber} completed`);
  };

  await Promise.all(groups.map((group, groupIndex) => runWorker(group, groupIndex)));
}
/**
 * Runs a synchronization of every table in `ALL_TABLE_NAMES` and logs a summary.
 *
 * Makes sure the sync status table exists, syncs all tables concurrently, then
 * counts the entries of `syncProgress` by status (completed / error / skipped)
 * and logs the duration and the counts. Errors from the parallel run are
 * logged and not rethrown.
 */
async function syncAllTablesEnhanced(): Promise<void> {
  logger.header('Starting Enhanced NONSAP Data Synchronization');
  await initializeSyncStatusTable();

  const startedAt = Date.now();
  const tally = { completed: 0, error: 0, skipped: 0 };
  const totalTables = ALL_TABLE_NAMES.length;

  logger.info(`Total tables to sync: ${totalTables}`);
  logger.info(`Delta sync enabled: ${DELTA_SYNC_ENABLED}`);
  logger.info(`Max workers: ${MAX_WORKERS}`);

  try {
    await syncTablesInParallel(ALL_TABLE_NAMES);

    // Tally the outcome of every recorded run; entries still 'running' are not counted.
    for (const { status } of syncProgress.values()) {
      if (status === 'completed' || status === 'error' || status === 'skipped') {
        tally[status] += 1;
      }
    }
  } catch (error) {
    logger.error('Sync process failed:', error);
  }

  const elapsedMs = Date.now() - startedAt;
  logger.info(`Enhanced sync completed in ${(elapsedMs / 1000).toFixed(2)}s`);
  logger.info(`Success: ${tally.completed}, Errors: ${tally.error}, Skipped: ${tally.skipped}`);
  logger.info(`Total tables processed: ${totalTables}`);

  if (tally.error > 0) {
    logger.warn(`${tally.error} tables had errors during sync`);
  }

  const everythingAccountedFor = tally.completed + tally.skipped === totalTables;
  if (everythingAccountedFor) {
    logger.success('🚀 All tables synchronized successfully with enhanced performance!');
  }
}
/**
 * Returns the progress records of all tables that have been synced by this
 * process, as a new array (the records themselves are the live objects held in
 * `syncProgress`).
 */
export async function getSyncProgressEnhanced(): Promise<SyncProgress[]> {
  return [...syncProgress.values()];
}
/**
 * Manually starts a synchronization of all tables.
 *
 * Logs the trigger and then awaits `syncAllTablesEnhanced()`. Unlike the
 * scheduled job registered in `startEnhancedSyncScheduler`, this entry point
 * does not call `testOracleConnection()` first.
 */
export async function triggerEnhancedSync(): Promise<void> {
logger.info('Enhanced manual sync triggered');
await syncAllTablesEnhanced();
}
/**
 * Forces a complete re-synchronization of a single table.
 *
 * The table's row in the sync status table is deleted first, so no previous
 * sync timestamp is available to the following run, and the table is then
 * synced on its own.
 *
 * @param tableName Table to re-synchronize.
 * @throws Rethrows any error from the status cleanup or the sync after logging it.
 */
export async function triggerFullResync(tableName: TableName): Promise<void> {
  logger.info(`Full resync triggered for table: ${tableName}`);

  try {
    // Forget the last sync position for this table.
    const forgetLastSync = sql`
DELETE FROM nonsap.${sql.identifier(SYNC_STATUS_TABLE)}
WHERE table_name = ${tableName}
`;
    await db.execute(forgetLastSync);

    await syncTableWithWorker(tableName, 1, 1);
    logger.success(`Full resync completed for ${tableName}`);
  } catch (error) {
    logger.error(`Full resync failed for ${tableName}:`, error);
    throw error;
  }
}
/**
 * Registers the scheduled synchronization job and optionally runs an initial sync.
 *
 * - A cron job is registered with the expression `0 1 * * *` in the
 *   `Asia/Seoul` time zone, i.e. once a day at 01:00 KST. Each run first checks
 *   the Oracle connection and skips the sync when it is unavailable.
 * - In development (`NODE_ENV === 'development'`) the job is registered but no
 *   initial sync is started.
 * - Otherwise, when `SYNC_ON_START === 'true'`, one sync is started 10 seconds
 *   after this function is called, again only if Oracle is reachable.
 *
 * The registration log message previously claimed "every 30 minutes", which
 * contradicted the cron expression; it now states the actual schedule.
 *
 * Setup errors are logged and not rethrown, so the application keeps running
 * without the scheduler.
 */
export async function startEnhancedSyncScheduler(): Promise<void> {
  logger.info('Initializing Enhanced NONSAP data synchronization scheduler...');

  try {
    // Daily at 01:00; the time is interpreted in the time zone passed below.
    cron.schedule('0 1 * * *', async () => {
      try {
        logger.info('Cron job triggered: Starting enhanced scheduled sync');
        const isConnected = await testOracleConnection();
        if (!isConnected) {
          logger.warn('Oracle DB not available, skipping sync');
          return;
        }
        await syncAllTablesEnhanced();
      } catch (error) {
        logger.error('Enhanced scheduled sync failed:', error);
      }
    }, {
      timezone: 'Asia/Seoul'
    });
    logger.success(`Enhanced NONSAP sync scheduler registered (daily at 01:00 Asia/Seoul) for ${ALL_TABLE_NAMES.length} tables`);

    if (process.env.NODE_ENV === 'development') {
      logger.info('Development mode: Enhanced cron registered but initial sync skipped');
      return;
    }

    if (process.env.SYNC_ON_START === 'true') {
      logger.info('Initial enhanced sync on startup enabled');
      // Delay the initial sync by 10 seconds after registration.
      setTimeout(async () => {
        try {
          const isConnected = await testOracleConnection();
          if (isConnected) {
            await syncAllTablesEnhanced();
          } else {
            logger.warn('Initial enhanced sync skipped - Oracle DB not available');
          }
        } catch (error) {
          logger.error('Initial enhanced sync failed:', error);
        }
      }, 10000);
    }
  } catch (error) {
    logger.error('Failed to set up enhanced cron scheduler:', error);
    logger.warn('Application will continue without enhanced sync scheduler');
  }
}
/**
 * Probes the Oracle connection by running `SELECT 1 FROM DUAL`.
 *
 * @returns `true` when the query yields a truthy result; `false` when it
 *          yields a falsy result or throws (the error is logged).
 */
async function testOracleConnection(): Promise<boolean> {
  try {
    const probeResult = await oracleKnex.raw('SELECT 1 FROM DUAL');
    return Boolean(probeResult);
  } catch (error) {
    logger.error('Oracle DB connection test failed:', error);
    return false;
  }
}
|