summaryrefslogtreecommitdiff
path: root/lib/soap/batch-utils.ts
blob: ff2c3520523e4e6aca459d6752695d8557c70d66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import { inArray, sql, getTableColumns } from "drizzle-orm";
import db from "@/db/db";

/**
 * 대용량 INSERT/UPSERT 및 하위 테이블 교체 처리를 위한 공통 유틸 함수 모음.
 * 
 * - `bulkUpsert`  : 단일 PK(혹은 UNIQUE) 컬럼을 기준으로 다건 UPSERT 수행
 * - `bulkReplaceSubTableData` : FK 컬럼(IN ... ) 조건으로 기존 데이터 삭제 후 신규 데이터를 배치 삽입
 * 
 * NOTE: Drizzle ORM 의 onConflictDoUpdate 구문은 한 번의 INSERT 문으로도 여러 레코드의
 *       UPSERT 를 처리할 수 있으므로, 네트워크 왕복 횟수를 크게 줄일 수 있다.
 */

/**
 * Primary/Unique Key 하나를 기준으로 여러 레코드를 UPSERT 한다.
 * @param tx          Active transaction object from `db.transaction`
 * @param table       Drizzle table schema object
 * @param data        Rows to upsert
 * @param uniqueCol   Column name that has UNIQUE constraint (e.g. 'MATNR')
 * @param chunkSize   Split size to avoid exceeding Postgres parameter limit (default 1000)
 */
export async function bulkUpsert<T extends Record<string, unknown>>(
  tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
  table: any, // Generic Drizzle table type – use `any` to stay flexible
  data: T[],
  uniqueCol: string,
  chunkSize: number = 500,
) {
  if (!data.length) return;

  // Build SET clause once, using excluded.* reference for every column except PK / createdAt / id
  const buildSetClause = (sample: T) => {
    const setObj: Record<string, unknown> = { updatedAt: new Date() };
    const tableColumns = getTableColumns(table);
    
    for (const col of Object.keys(sample)) {
      if (col === uniqueCol || col === "id" || col === "createdAt" || col === "updatedAt") continue;
      
      // Drizzle 테이블 스키마에서 실제 데이터베이스 컬럼명 가져오기
      const dbColumnName = tableColumns[col]?.name || col;
      setObj[col] = sql.raw(`excluded."${dbColumnName}"`);
    }
    return setObj;
  };

  const setClause = buildSetClause(data[0]);

  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);
    await tx.insert(table)
      .values(chunk as any)
      .onConflictDoUpdate({
        target: table[uniqueCol],
        set: setClause,
      });
  }
}

/**
 * 여러 부모 레코드에 매핑된 하위 테이블(자식 테이블) 데이터를 한 번에 교체한다.
 * 1) parentIds 에 해당하는 기존 데이터를 삭제한 뒤
 * 2) 새 데이터 배열을 배치 삽입한다.
 * 
 * @param tx          Active transaction object
 * @param table       Drizzle table schema object (child table)
 * @param data        Rows to insert after deletion
 * @param parentField Column object that references the FK to parent (e.g. MATERIAL_MASTER_PART_MATL_DESC.MATNR)
 * @param parentIds   Parent id list to match IN (...)
 * @param chunkSize   Batch insert split size (default 1000)
 */
export async function bulkReplaceSubTableData<T extends Record<string, unknown>>(
  tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
  table: any,
  data: T[],
  parentField: any,
  parentIds: string[],
  chunkSize: number = 1000,
) {
  // FK 값이 없는 서브테이블 건은 생략
  if (!parentIds.length) return;
  
  // 마이그레이션시는 삭제 생략
  const skipDelete = process.env.SOAP_IGNORE_DELETE_FOR_MIGRATION === "true";
  
  if (!skipDelete) {
    await tx.delete(table).where(inArray(parentField, parentIds));
  }

  // 2. 새 데이터 일괄 삽입 (chunking)
  if (!data.length) return;

  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);
    await tx.insert(table).values(chunk as any);
  }
}