summaryrefslogtreecommitdiff
path: root/public/wsdl/_util
diff options
context:
space:
mode:
Diffstat (limited to 'public/wsdl/_util')
-rw-r--r--public/wsdl/_util/update_wsdl_with_csv.py342
-rw-r--r--public/wsdl/_util/update_wsdl_with_csv_for_mdg.py632
-rwxr-xr-xpublic/wsdl/_util/wsdl_comment_to_drizzle_schema.py584
3 files changed, 1439 insertions, 119 deletions
diff --git a/public/wsdl/_util/update_wsdl_with_csv.py b/public/wsdl/_util/update_wsdl_with_csv.py
index 91a9d4dc..b914d411 100644
--- a/public/wsdl/_util/update_wsdl_with_csv.py
+++ b/public/wsdl/_util/update_wsdl_with_csv.py
@@ -3,6 +3,7 @@ import csv
import re
import shutil
import os
+import argparse
from datetime import datetime
# 컬러 로그를 위한 색상 코드 추가
@@ -142,8 +143,8 @@ def get_complex_type_info(wsdl_content):
type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
for line in wsdl_content:
- # complexType 시작 태그 찾기
- type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ # complexType 시작 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ type_match = re.search(r'<(?:xsd:|xs:)complexType\s+name="([^"]+)"', line)
if type_match:
if current_type:
type_stack.append(current_type)
@@ -151,8 +152,8 @@ def get_complex_type_info(wsdl_content):
current_fields = []
continue
- # complexType 종료 태그 찾기
- if '</xsd:complexType>' in line:
+ # complexType 종료 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ if re.search(r'</(?:xsd:|xs:)complexType>', line):
if current_type:
complex_types[current_type] = current_fields
if type_stack:
@@ -161,25 +162,19 @@ def get_complex_type_info(wsdl_content):
current_type = None
continue
- # element 태그 찾기
- element_match = re.search(r'<xsd:element\s+name="([^"]+)"', line)
+ # element 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ element_match = re.search(r'<(?:xsd:|xs:)element\s+name="([^"]+)"', line)
if element_match and current_type:
+ type_match = re.search(r'type="([^"]+)"', line) if 'type="' in line else None
field_info = {
'name': element_match.group(1),
- 'type': re.search(r'type="([^"]+)"', line).group(1) if 'type="' in line else None,
+ 'type': type_match.group(1) if type_match else None,
'is_array': 'maxOccurs="unbounded"' in line
}
current_fields.append(field_info)
return complex_types
-def get_table_for_complex_type(table_name, complex_type):
- """테이블 이름에서 complexType에 해당하는 부분 추출"""
- # 테이블 이름이 '/'로 구분되어 있다면 마지막 부분을 반환
- if '/' in table_name:
- return table_name.split('/')[-1].upper()
- return table_name.upper()
-
def load_csv_data(csv_file):
"""CSV 파일에서 필드 정보를 딕셔너리로 로드"""
csv_data = {}
@@ -230,16 +225,61 @@ def load_csv_data(csv_file):
return csv_data
+def load_csv_data_from_path(csv_file_path):
+ """CSV 파일 경로에서 필드 정보를 딕셔너리로 로드"""
+ csv_data = {}
+ csv_filename = os.path.basename(csv_file_path)
+
+ try:
+ with open(csv_file_path, 'r', encoding='utf-8-sig') as f: # BOM 처리
+ reader = csv.DictReader(f)
+ for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행
+ try:
+ field_name = row['Field']
+ table_name = row['Table']
+
+ # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용
+ wsdl_field_name = FIELD_MAPPING.get(field_name, field_name)
+
+ # 테이블 정보를 키에 포함 (구분자를 || 로 변경)
+ key = f"{wsdl_field_name}||{table_name}"
+
+ # 타입과 사이즈 정규화
+ normalized_type, normalized_size = normalize_sap_type_and_size(
+ row.get('Type', ''), row.get('Size', '')
+ )
+
+ # Description 안전 처리
+ safe_desc = safe_description_escape(row.get('Description', ''))
+
+ csv_data[key] = {
+ 'seq': row.get('SEQ', ''),
+ 'table': table_name,
+ 'field': row.get('Field', ''), # 원래 CSV 필드명 저장
+ 'mo': row.get('M/O', ''),
+ 'type': normalized_type, # 정규화된 타입
+ 'size': normalized_size, # 정규화된 사이즈
+ 'description': safe_desc, # 안전 처리된 Description
+ 'original_type': row.get('Type', ''), # 원본 타입 보존
+ 'original_size': row.get('Size', '') # 원본 사이즈 보존
+ }
+
+ except Exception as e:
+ print_error(f"CSV 행 {row_num} 처리 실패 - {csv_filename}: {str(e)}")
+ print_error(f"문제 행 데이터: {row}")
+ continue
+
+ except Exception as e:
+ print_error(f"CSV 파일 로딩 실패 - {csv_file_path}: {str(e)}")
+ return {}
+
+ return csv_data
+
def extract_field_name_from_line(line):
"""라인에서 name="필드명" 추출"""
match = re.search(r'name="([^"]+)"', line)
return match.group(1) if match else None
-def extract_field_from_comment(comment_line):
- """주석에서 Field: 부분의 필드명 추출"""
- match = re.search(r'Field:([^,]+)', comment_line)
- return match.group(1).strip() if match else None
-
def has_seq_in_comment(comment_line):
"""주석에 SEQ가 있는지 확인"""
return 'SEQ:' in comment_line
@@ -318,12 +358,13 @@ def comments_are_equal(existing_comment, expected_comment):
def should_process_line(line, csv_data):
"""라인이 처리 대상인지 확인"""
# 네 조건을 모두 만족해야 함:
- # 1. <xsd:element 태그
+ # 1. <xsd:element 또는 <xs:element 태그
# 2. name=" 속성이 있는 태그
# 3. maxOccurs=" 속성이 없는 태그 (배열 데이터 제외)
# 4. CSV에 해당 필드가 있는 경우
- if not ('<xsd:element' in line and 'name="' in line):
+ # xsd:element 또는 xs:element 태그 검사
+ if not (re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line):
return False
# maxOccurs=" 가 있으면 배열 데이터이므로 제외 (모든 maxOccurs 속성)
@@ -339,7 +380,8 @@ def should_process_line(line, csv_data):
def get_skip_reason(line, csv_data):
"""필드를 건너뛰는 이유를 반환"""
- if not ('<xsd:element' in line and 'name="' in line):
+ # xsd:element 또는 xs:element 태그 검사
+ if not (re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line):
return None
field_name = extract_field_name_from_line(line)
@@ -365,36 +407,6 @@ def get_skip_reason(line, csv_data):
return None
-def get_table_prefix_from_csv_name(csv_name: str) -> str:
- """CSV 파일명에서 테이블 prefix 추출"""
- csv_upper = csv_name.upper()
-
- # CSV 파일명 패턴에서 마스터 타입 추출
- if 'CUSTOMER_MASTER' in csv_upper:
- return 'CUSTOMER'
- elif 'VENDOR_MASTER' in csv_upper:
- return 'VENDOR'
- elif 'EMPLOYEE_MASTER' in csv_upper:
- return 'EMPLOYEE'
- elif 'PROJECT_MASTER' in csv_upper:
- return 'PROJECT'
- elif 'DEPARTMENT_CODE' in csv_upper:
- return 'DEPARTMENT'
- elif 'ORGANIZATION_MASTER' in csv_upper:
- return 'ORGANIZATION'
- elif 'EQUP_MASTER' in csv_upper:
- return 'EQUP'
- elif 'MODEL_MASTER' in csv_upper:
- return 'MODEL'
- elif 'MATERIAL_MASTER' in csv_upper:
- return 'MATERIAL'
- elif 'EMPLOYEE_REFERENCE' in csv_upper:
- return 'EMPLOYEE_REF'
- else:
- # 기본적으로 MDZ 부분 제거 후 첫 번째 단어 사용
- parts = csv_name.replace('IF_MDZ_EVCP_', '').split('_')
- return parts[0] if parts else 'COMMON'
-
def backup_file(filepath):
"""파일을 백업"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -403,18 +415,19 @@ def backup_file(filepath):
print(f"백업 파일 생성: {backup_path}")
return backup_path
-def process_wsdl_file(target):
- """WSDL 파일 처리"""
- csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
- wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
-
+def process_files(csv_file_path, wsdl_file_path):
+ """개별 CSV와 WSDL 파일 처리"""
try:
# 백업 생성
backup_path = backup_file(wsdl_file_path)
- print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ # 파일명 추출 (로깅용)
+ csv_filename = os.path.basename(csv_file_path)
+ wsdl_filename = os.path.basename(wsdl_file_path)
+
+ print_color(f"\n🚀 처리 시작: {csv_filename} → {wsdl_filename}", Colors.BOLD)
print_info("CSV 데이터 로딩 중...")
- csv_data = load_csv_data(target)
+ csv_data = load_csv_data_from_path(csv_file_path)
print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
# WSDL 파일 읽기
@@ -425,7 +438,7 @@ def process_wsdl_file(target):
complex_types = get_complex_type_info(lines)
print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
except Exception as e:
- print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ print_error(f"파일 초기화 실패 - {csv_filename} → {wsdl_filename}: {str(e)}")
return
# complexType 구조 출력 (디버깅용)
@@ -454,23 +467,23 @@ def process_wsdl_file(target):
line_processed = False
try:
- # complexType 시작 태그 확인
- type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ # complexType 시작 태그 확인 (xsd: 또는 xs: 네임스페이스 지원)
+ type_match = re.search(r'<(?:xsd:|xs:)complexType\s+name="([^"]+)"', line)
if type_match:
if current_complex_type:
type_stack.append(current_complex_type)
current_complex_type = type_match.group(1)
print_color(f"현재 complexType: {current_complex_type}", Colors.BLUE)
- # complexType 종료 태그 확인
- if '</xsd:complexType>' in line:
+ # complexType 종료 태그 확인 (xsd: 또는 xs: 네임스페이스 지원)
+ if re.search(r'</(?:xsd:|xs:)complexType>', line):
if type_stack:
current_complex_type = type_stack.pop()
print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE)
else:
current_complex_type = None
- # CSV에 있는 xsd:element 필드인지 확인
+ # CSV에 있는 xsd:element 또는 xs:element 필드인지 확인
if should_process_line(line, csv_data):
field_name = extract_field_name_from_line(line)
@@ -527,7 +540,7 @@ def process_wsdl_file(target):
print_info(f" 주석 추가: {field_name}")
line_processed = True
- elif '<xsd:element' in line and 'name="' in line:
+ elif re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line:
field_name = extract_field_name_from_line(line)
if field_name:
skip_reason = get_skip_reason(line, csv_data)
@@ -567,7 +580,7 @@ def process_wsdl_file(target):
# 결과 출력
print_color(f"\n{'='*50}", Colors.BOLD)
- print_color(f"처리 완료: {target}", Colors.BOLD)
+ print_color(f"처리 완료: {csv_filename} → {wsdl_filename}", Colors.BOLD)
print_color(f"{'='*50}", Colors.BOLD)
print_info(f"CSV 파일: {csv_file_path}")
@@ -608,66 +621,157 @@ def process_wsdl_file(target):
if changes_made - corrected_seq > 0:
print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개")
-if __name__ == "__main__":
+def process_wsdl_file(target):
+ """WSDL 파일 처리 (기존 방식, 일괄 처리용)"""
+ csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
+ wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
+
try:
- csv_files = get_csv_files()
- print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
- print_info(f"처리할 파일 목록: {csv_files}")
+ # 백업 생성
+ backup_path = backup_file(wsdl_file_path)
- total_files = len(csv_files)
- success_count = 0
- error_count = 0
+ print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ print_info("CSV 데이터 로딩 중...")
+ csv_data = load_csv_data(target)
+ print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
+
+ # WSDL 파일 읽기
+ with open(wsdl_file_path, 'r', encoding='utf-8') as f:
+ lines = f.readlines()
- for i, target in enumerate(csv_files, 1):
+ # complexType 정보 추출
+ complex_types = get_complex_type_info(lines)
+ print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
+ except Exception as e:
+ print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ return
+
+def parse_arguments():
+ """커맨드라인 아규먼트 파싱"""
+ parser = argparse.ArgumentParser(
+ description="WSDL 파일에 CSV 정보를 기반으로 주석을 추가하는 도구",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+사용 예시:
+ python3 update_wsdl_with_csv.py --csv ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.csv --wsdl ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.wsdl
+ python3 update_wsdl_with_csv.py --batch # 기존 일괄 처리 모드
+ """
+ )
+
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument(
+ '--csv',
+ type=str,
+ help='처리할 CSV 파일 경로'
+ )
+ group.add_argument(
+ '--batch',
+ action='store_true',
+ help='일괄 처리 모드 (기존 방식)'
+ )
+
+ parser.add_argument(
+ '--wsdl',
+ type=str,
+ help='처리할 WSDL 파일 경로 (--csv와 함께 사용)'
+ )
+
+ args = parser.parse_args()
+
+ # --csv 옵션 사용 시 --wsdl도 필수
+ if args.csv and not args.wsdl:
+ parser.error("--csv 옵션을 사용할 때는 --wsdl 옵션도 필요합니다.")
+
+ # 파일 존재 여부 확인
+ if args.csv:
+ if not os.path.exists(args.csv):
+ parser.error(f"CSV 파일을 찾을 수 없습니다: {args.csv}")
+ if not args.csv.endswith('.csv'):
+ parser.error(f"CSV 파일이 아닙니다: {args.csv}")
+
+ if args.wsdl:
+ if not os.path.exists(args.wsdl):
+ parser.error(f"WSDL 파일을 찾을 수 없습니다: {args.wsdl}")
+ if not args.wsdl.endswith('.wsdl'):
+ parser.error(f"WSDL 파일이 아닙니다: {args.wsdl}")
+
+ return args
+
+if __name__ == "__main__":
+ try:
+ args = parse_arguments()
+
+ if args.batch:
+ csv_files = get_csv_files()
+ print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
+ print_info(f"처리할 파일 목록: {csv_files}")
+
+ total_files = len(csv_files)
+ success_count = 0
+ error_count = 0
+
+ for i, target in enumerate(csv_files, 1):
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ try:
+ process_wsdl_file(target)
+ success_count += 1
+ except Exception as e:
+ print_error(f"파일 처리 실패 - {target}: {str(e)}")
+ error_count += 1
+
+ # 최종 통계
print_color(f"\n{'='*60}", Colors.BOLD)
- print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color("🏁 전체 처리 완료", Colors.BOLD)
print_color(f"{'='*60}", Colors.BOLD)
+ print_success(f"성공: {success_count}개 파일")
+ if error_count > 0:
+ print_error(f"실패: {error_count}개 파일")
+ else:
+ print_success("모든 파일이 성공적으로 처리되었습니다!")
+
+ # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
+ for sap_type in sorted(discovered_sap_types):
+ print(f" - {sap_type}")
+
+ print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
+ for combination in sorted(type_size_combinations):
+ print(f" - {combination}")
+
+ print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
+ print(" 🎯 실용적 접근법:")
+ print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
+ print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
+ print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
+ print("")
+ print(" 📋 SAP 타입별 상세:")
+ print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
+ print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
+ print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
+ print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
+ print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
+ print("")
+ print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+
+ else: # --csv 옵션 사용 시
+ print_color(f"\n🎯 개별 파일 처리 모드", Colors.BOLD)
+ print_info(f"CSV 파일: {args.csv}")
+ print_info(f"WSDL 파일: {args.wsdl}")
+
try:
- process_wsdl_file(target)
- success_count += 1
+ process_files(args.csv, args.wsdl)
+ print_success("개별 파일 처리 완료!")
except Exception as e:
- print_error(f"파일 처리 실패 - {target}: {str(e)}")
- error_count += 1
-
- # 최종 통계
- print_color(f"\n{'='*60}", Colors.BOLD)
- print_color("🏁 전체 처리 완료", Colors.BOLD)
- print_color(f"{'='*60}", Colors.BOLD)
-
- print_success(f"성공: {success_count}개 파일")
- if error_count > 0:
- print_error(f"실패: {error_count}개 파일")
- else:
- print_success("모든 파일이 성공적으로 처리되었습니다!")
-
- # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
- print_color(f"\n{'='*60}", Colors.BOLD)
- print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
- print_color(f"{'='*60}", Colors.BOLD)
-
- print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
- for sap_type in sorted(discovered_sap_types):
- print(f" - {sap_type}")
-
- print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
- for combination in sorted(type_size_combinations):
- print(f" - {combination}")
-
- print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
- print(" 🎯 실용적 접근법:")
- print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
- print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
- print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
- print("")
- print(" 📋 SAP 타입별 상세:")
- print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
- print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
- print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
- print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
- print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
- print("")
- print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+ print_error(f"개별 파일 처리 실패: {str(e)}")
+ exit(1)
except Exception as e:
print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}")
diff --git a/public/wsdl/_util/update_wsdl_with_csv_for_mdg.py b/public/wsdl/_util/update_wsdl_with_csv_for_mdg.py
new file mode 100644
index 00000000..8fb42141
--- /dev/null
+++ b/public/wsdl/_util/update_wsdl_with_csv_for_mdg.py
@@ -0,0 +1,632 @@
+#!/usr/bin/env python3
+import csv
+import re
+import shutil
+import os
+from datetime import datetime
+
+# 컬러 로그를 위한 색상 코드 추가
+class Colors:
+ RED = '\033[91m'
+ GREEN = '\033[92m'
+ YELLOW = '\033[93m'
+ BLUE = '\033[94m'
+ MAGENTA = '\033[95m'
+ CYAN = '\033[96m'
+ WHITE = '\033[97m'
+ ENDC = '\033[0m' # End color
+ BOLD = '\033[1m'
+
+def print_color(message, color=Colors.WHITE):
+ """컬러 출력 함수"""
+ print(f"{color}{message}{Colors.ENDC}")
+
+def print_error(message):
+ """에러 메시지 출력"""
+ print_color(f"❌ ERROR: {message}", Colors.RED)
+
+def print_warning(message):
+ """경고 메시지 출력"""
+ print_color(f"⚠️ WARNING: {message}", Colors.YELLOW)
+
+def print_success(message):
+ """성공 메시지 출력"""
+ print_color(f"✅ SUCCESS: {message}", Colors.GREEN)
+
+def print_info(message):
+ """정보 메시지 출력"""
+ print_color(f"ℹ️ INFO: {message}", Colors.CYAN)
+
+"""
+실제 CSV 파일들
+IF_MDZ_EVCP_CUSTOMER_MASTER.csv IF_MDZ_EVCP_EMPLOYEE_REFERENCE.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART_RETURN.csv IF_MDZ_EVCP_PROJECT_MASTER.csv
+IF_MDZ_EVCP_DEPARTMENT_CODE.csv IF_MDZ_EVCP_EQUP_MASTER.csv IF_MDZ_EVCP_MODEL_MASTER.csv IF_MDZ_EVCP_VENDOR_MASTER.csv
+IF_MDZ_EVCP_EMPLOYEE_MASTER.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART.csv IF_MDZ_EVCP_ORGANIZATION_MASTER.csv
+"""
+
+# ===== 설정 =====
+CSV_DIR = './public/wsdl/_csv'
+WSDL_DIR = './public/wsdl'
+
+# 발견된 SAP 타입들을 수집하기 위한 전역 SET
+discovered_sap_types = set()
+type_size_combinations = set() # 타입-사이즈 조합도 수집
+
+# 필드명 매핑 테이블 (CSV -> WSDL)
+FIELD_MAPPING = {
+ # 개별 WSDL 별 테이블 만들기로 했으므로 사용하지 않고 WSDL 그대로 사용
+ # 'ADR_NO': 'ADDRNO',
+ # 필요한 경우 더 추가
+}
+
+# 테이블 매핑 테이블 (complexType -> CSV Table)
+TABLE_MAPPING = {
+ # 'MATL': 'MATL',
+ # 'UNIT': 'MATL/UNIT',
+ # 필요한 경우 더 추가
+}
+
+def normalize_sap_type_and_size(sap_type, size_str):
+ """SAP 타입과 사이즈를 정규화"""
+ global discovered_sap_types, type_size_combinations
+
+ try:
+ # 타입을 대문자로 변환
+ normalized_type = sap_type.upper().strip() if sap_type else 'CHAR'
+
+ # 사이즈 처리
+ normalized_size = size_str.strip() if size_str else ''
+ original_size = normalized_size # 원본 사이즈 보존 (로깅용)
+
+ # 빈 사이즈인 경우 기본값 설정
+ if not normalized_size:
+ normalized_size = '255'
+ else:
+ # 따옴표로 감싸진 경우 제거 (예: "1,0")
+ quote_removed = False
+ if normalized_size.startswith('"') and normalized_size.endswith('"'):
+ before_quote_removal = normalized_size
+ normalized_size = normalized_size[1:-1]
+ quote_removed = True
+ print_color(f"🔍 SIZE 파싱: 따옴표 제거 - '{before_quote_removal}' -> '{normalized_size}' (Type: {normalized_type})", Colors.YELLOW)
+
+ # 로깅: 최종 결과 (따옴표가 없는 경우만)
+ if not quote_removed and original_size:
+ print_color(f"🔍 SIZE 파싱: 따옴표 없음 - '{original_size}' 그대로 사용 (Type: {normalized_type})", Colors.BLUE)
+
+ # 발견된 타입들을 SET에 추가
+ discovered_sap_types.add(normalized_type)
+ type_size_combinations.add(f"{normalized_type}({normalized_size})")
+
+ # 컬럼 구분자나 특수문자가 있는 경우 그대로 유지
+ # DEC, QUAN, NUMB 등에서 "1,0" 형태의 사이즈는 정상
+
+ return normalized_type, normalized_size
+
+ except Exception as e:
+ print_error(f"타입/사이즈 정규화 실패 - Type: {sap_type}, Size: {size_str}, Error: {str(e)}")
+ return 'CHAR', '255' # 기본값 반환
+
+def safe_description_escape(description):
+ """Description 필드의 특수문자를 안전하게 처리"""
+ try:
+ if not description:
+ return ''
+
+ # HTML/XML 특수문자 이스케이프
+ description = description.replace('&', '&amp;')
+ description = description.replace('<', '&lt;')
+ description = description.replace('>', '&gt;')
+ description = description.replace('"', '&quot;')
+ description = description.replace("'", '&apos;')
+
+ return description
+
+ except Exception as e:
+ print_error(f"Description 이스케이프 실패: {description}, Error: {str(e)}")
+ return str(description) if description else ''
+
+def get_csv_files():
+ """CSV 디렉토리에서 모든 CSV 파일 목록을 가져옴"""
+ csv_files = []
+ for file in os.listdir(CSV_DIR):
+ if file.endswith('.csv'):
+ csv_files.append(file.replace('.csv', ''))
+ return csv_files
+
+def get_complex_type_info(wsdl_content):
+ """WSDL 파일에서 complexType 정보를 추출"""
+ complex_types = {}
+ current_type = None
+ current_fields = []
+ type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
+
+ for line in wsdl_content:
+ # complexType 시작 태그 찾기
+ type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ if type_match:
+ if current_type:
+ type_stack.append(current_type)
+ current_type = type_match.group(1)
+ current_fields = []
+ continue
+
+ # complexType 종료 태그 찾기
+ if '</xsd:complexType>' in line:
+ if current_type:
+ complex_types[current_type] = current_fields
+ if type_stack:
+ current_type = type_stack.pop()
+ else:
+ current_type = None
+ continue
+
+ # element 태그 찾기
+ element_match = re.search(r'<xsd:element\s+name="([^"]+)"', line)
+ if element_match and current_type:
+ field_info = {
+ 'name': element_match.group(1),
+ 'type': re.search(r'type="([^"]+)"', line).group(1) if 'type="' in line else None,
+ 'is_array': 'maxOccurs="unbounded"' in line
+ }
+ current_fields.append(field_info)
+
+ return complex_types
+
+def load_csv_data(csv_file):
+ """CSV 파일에서 필드 정보를 딕셔너리로 로드"""
+ csv_data = {}
+ csv_path = os.path.join(CSV_DIR, f'{csv_file}.csv')
+
+ try:
+ with open(csv_path, 'r', encoding='utf-8-sig') as f: # BOM 처리
+ reader = csv.DictReader(f)
+ for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행
+ try:
+ field_name = row['Field']
+ table_name = row['Table']
+
+ # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용
+ wsdl_field_name = FIELD_MAPPING.get(field_name, field_name)
+
+ # 테이블 정보를 키에 포함 (구분자를 || 로 변경)
+ key = f"{wsdl_field_name}||{table_name}"
+
+ # 타입과 사이즈 정규화
+ normalized_type, normalized_size = normalize_sap_type_and_size(
+ row.get('Type', ''), row.get('Size', '')
+ )
+
+ # Description 안전 처리
+ safe_desc = safe_description_escape(row.get('Description', ''))
+
+ csv_data[key] = {
+ 'seq': row.get('SEQ', ''),
+ 'table': table_name,
+ 'field': row.get('Field', ''), # 원래 CSV 필드명 저장
+ 'mo': row.get('M/O', ''),
+ 'type': normalized_type, # 정규화된 타입
+ 'size': normalized_size, # 정규화된 사이즈
+ 'description': safe_desc, # 안전 처리된 Description
+ 'original_type': row.get('Type', ''), # 원본 타입 보존
+ 'original_size': row.get('Size', '') # 원본 사이즈 보존
+ }
+
+ except Exception as e:
+ print_error(f"CSV 행 {row_num} 처리 실패 - {csv_file}: {str(e)}")
+ print_error(f"문제 행 데이터: {row}")
+ continue
+
+ except Exception as e:
+ print_error(f"CSV 파일 로딩 실패 - {csv_path}: {str(e)}")
+ return {}
+
+ return csv_data
+
+def extract_field_name_from_line(line):
+ """라인에서 name="필드명" 추출"""
+ match = re.search(r'name="([^"]+)"', line)
+ return match.group(1) if match else None
+
+def has_seq_in_comment(comment_line):
+ """주석에 SEQ가 있는지 확인"""
+ return 'SEQ:' in comment_line
+
+def get_indentation(line):
+ """라인의 들여쓰기 반환"""
+ return len(line) - len(line.lstrip())
+
+def create_comment(field_name, csv_data, indentation, complex_type):
+ """CSV 데이터를 기반으로 주석 생성"""
+ try:
+ # 필드명으로 시작하는 키들을 찾음 (대소문자 구분 없이)
+ matching_keys = [key for key in csv_data.keys() if key.split('||')[0].upper() == field_name.upper()]
+ if not matching_keys:
+ indent = ' ' * indentation
+ print_warning(f"매칭되지 않은 필드: {field_name}")
+ return f"{indent}<!-- TODO: UNMATCHED FIELD OCCURS - {field_name} -->"
+
+ # complexType과 일치하는 테이블 정보 찾기
+ matching_data = None
+
+ # 1. complexType 이름과 완전히 일치하는 테이블 찾기
+ for key in matching_keys:
+ table_name = key.split('||', 1)[1]
+ if complex_type.upper() == table_name.upper():
+ matching_data = csv_data[key]
+ break
+
+ # 2. CSV 테이블명을 '/'로 스플릿한 마지막 부분이 complexType과 일치하는 경우
+ if not matching_data:
+ for key in matching_keys:
+ table_name = key.split('||', 1)[1]
+ if '/' in table_name:
+ last_part = table_name.split('/')[-1]
+ if complex_type.upper() == last_part.upper():
+ matching_data = csv_data[key]
+ break
+
+ # 3. 필드명만 일치하는 경우 (첫 번째 매칭 데이터 사용)
+ if not matching_data:
+ matching_data = csv_data[matching_keys[0]]
+
+ # 4. 매칭된 데이터가 있으면 주석 생성, 없으면 매칭 실패 주석
+ if matching_data:
+ indent = ' ' * indentation
+
+ # CSV의 실제 타입과 사이즈 사용
+ comment = f"{indent}<!-- SEQ:{matching_data['seq']}, Table:{matching_data['table']}, Field:{matching_data['field']}, M/O:{matching_data['mo']}, Type:{matching_data['type']}, Size:{matching_data['size']}, Description:{matching_data['description']} -->"
+
+ print_info(f"주석 생성 완료: {field_name} -> Type:{matching_data['type']}, Size:{matching_data['size']}")
+ return comment
+ else:
+ indent = ' ' * indentation
+ print_warning(f"매칭 데이터를 찾을 수 없음: {field_name}")
+ return f"{indent}<!-- TODO: NO MATCHING DATA FOUND - {field_name} -->"
+
+ except Exception as e:
+ indent = ' ' * indentation
+ print_error(f"주석 생성 실패 - 필드: {field_name}, 에러: {str(e)}")
+ return f"{indent}<!-- ERROR: COMMENT GENERATION FAILED - {field_name} -->"
+
+def normalize_comment(comment_line):
+ """주석을 정규화 (공백 제거, 소문자 변환 등)"""
+ # <!-- 와 --> 제거하고 내용만 추출
+ content = re.sub(r'^\s*<!--\s*|\s*-->\s*$', '', comment_line.strip())
+ # 여러 공백을 하나로 통합
+ content = re.sub(r'\s+', ' ', content)
+ return content.strip()
+
+def comments_are_equal(existing_comment, expected_comment):
+ """두 주석이 같은 내용인지 비교"""
+ existing_normalized = normalize_comment(existing_comment)
+ expected_normalized = normalize_comment(expected_comment)
+ return existing_normalized == expected_normalized
+
+def should_process_line(line, csv_data):
+ """라인이 처리 대상인지 확인"""
+ # 네 조건을 모두 만족해야 함:
+ # 1. <xsd:element 태그
+ # 2. name=" 속성이 있는 태그
+ # 3. maxOccurs=" 속성이 없는 태그 (배열 데이터 제외)
+ # 4. CSV에 해당 필드가 있는 경우
+
+ if not ('<xsd:element' in line and 'name="' in line):
+ return False
+
+ # maxOccurs=" 가 있으면 배열 데이터이므로 제외 (모든 maxOccurs 속성)
+ if 'maxOccurs="' in line:
+ return False
+
+ field_name = extract_field_name_from_line(line)
+ if not field_name:
+ return False
+
+ # 필드명이 CSV 데이터의 키에 정확히 일치하는지 확인 (대소문자 구분 없이)
+ return any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys())
+
+def get_skip_reason(line, csv_data):
+ """필드를 건너뛰는 이유를 반환"""
+ if not ('<xsd:element' in line and 'name="' in line):
+ return None
+
+ field_name = extract_field_name_from_line(line)
+ if not field_name:
+ return None
+
+ # maxOccurs 체크 (배열 타입)
+ if 'maxOccurs="' in line:
+ return "ARRAY_TYPE"
+
+ # 복합객체인 경우
+ if 'MASTER' in field_name:
+ return "COMPLEX_TYPE"
+
+ # CSV에 있는지 체크
+ has_csv_data = any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys())
+ if not has_csv_data:
+ # Req로 끝나는 경우는 래퍼 타입이므로 정상
+ if field_name.endswith('Req'):
+ return "REQ_WRAPPER_TYPE"
+ else:
+ return "NO_CSV_DATA"
+
+ return None
+
+def backup_file(filepath):
+ """파일을 백업"""
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ backup_path = f"{filepath}.backup_{timestamp}"
+ shutil.copy2(filepath, backup_path)
+ print(f"백업 파일 생성: {backup_path}")
+ return backup_path
+
+def process_wsdl_file(target):
+ """WSDL 파일 처리"""
+ csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
+ wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
+
+ try:
+ # 백업 생성
+ backup_path = backup_file(wsdl_file_path)
+
+ print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ print_info("CSV 데이터 로딩 중...")
+ csv_data = load_csv_data(target)
+ print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
+
+ # WSDL 파일 읽기
+ with open(wsdl_file_path, 'r', encoding='utf-8') as f:
+ lines = f.readlines()
+
+ # complexType 정보 추출
+ complex_types = get_complex_type_info(lines)
+ print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
+ except Exception as e:
+ print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ return
+
+ # complexType 구조 출력 (디버깅용)
+ for type_name, fields in complex_types.items():
+ print_color(f"\nComplexType: {type_name}", Colors.MAGENTA)
+ for field in fields:
+ print(f" - {field['name']} ({field['type']}) {'[Array]' if field['is_array'] else ''}")
+
+ new_lines = []
+ i = 0
+ changes_made = 0
+ processed_fields = []
+ skipped_fields = []
+ skipped_array_fields = []
+ skipped_no_csv_fields = []
+ skipped_req_wrapper_fields = []
+ verified_correct = 0
+ corrected_seq = 0
+ error_count = 0
+
+ current_complex_type = None
+ type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
+
+ while i < len(lines):
+ line = lines[i]
+ line_processed = False
+
+ try:
+ # complexType 시작 태그 확인
+ type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ if type_match:
+ if current_complex_type:
+ type_stack.append(current_complex_type)
+ current_complex_type = type_match.group(1)
+ print_color(f"현재 complexType: {current_complex_type}", Colors.BLUE)
+
+ # complexType 종료 태그 확인
+ if '</xsd:complexType>' in line:
+ if type_stack:
+ current_complex_type = type_stack.pop()
+ print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE)
+ else:
+ current_complex_type = None
+
+ # CSV에 있는 xsd:element 필드인지 확인
+ if should_process_line(line, csv_data):
+ field_name = extract_field_name_from_line(line)
+
+ if field_name and current_complex_type:
+ processed_fields.append(field_name)
+ print_color(f"처리 중인 필드: {field_name} (complexType: {current_complex_type})", Colors.CYAN)
+
+ # 바로 위 라인이 주석인지 확인 (공백 라인 건너뛰면서)
+ comment_line_index = -1
+ j = len(new_lines) - 1
+
+ while j >= 0:
+ prev_line = new_lines[j].strip()
+ if prev_line == '':
+ j -= 1
+ continue
+ elif prev_line.startswith('<!--') and prev_line.endswith('-->'):
+ comment_line_index = j
+ break
+ else:
+ break
+
+ if comment_line_index >= 0:
+ existing_comment = new_lines[comment_line_index]
+
+ if has_seq_in_comment(existing_comment):
+ indentation = get_indentation(line)
+ expected_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+
+ if expected_comment:
+ if comments_are_equal(existing_comment, expected_comment):
+ verified_correct += 1
+ print_success(f" 주석 검증 통과")
+ else:
+ new_lines[comment_line_index] = expected_comment + '\n'
+ changes_made += 1
+ corrected_seq += 1
+ print_warning(f" SEQ 주석 수정: {field_name}")
+ print(f" 기존: {existing_comment.strip()}")
+ print(f" 수정: {expected_comment}")
+ else:
+ indentation = get_indentation(line)
+ new_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+ if new_comment:
+ new_lines[comment_line_index] = new_comment + '\n'
+ changes_made += 1
+ print_warning(f" 주석 교체: {field_name}")
+ else:
+ indentation = get_indentation(line)
+ new_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+ if new_comment:
+ new_lines.append(new_comment + '\n')
+ changes_made += 1
+ print_info(f" 주석 추가: {field_name}")
+
+ line_processed = True
+ elif '<xsd:element' in line and 'name="' in line:
+ field_name = extract_field_name_from_line(line)
+ if field_name:
+ skip_reason = get_skip_reason(line, csv_data)
+ if skip_reason == "ARRAY_TYPE":
+ skipped_array_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_color(f"건너뛴 필드: {field_name} (배열 타입 - maxOccurs 속성)", Colors.YELLOW)
+ elif skip_reason == "REQ_WRAPPER_TYPE":
+ skipped_req_wrapper_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_color(f"건너뛴 필드: {field_name} (요청 래퍼 타입 - 정상)", Colors.BLUE)
+ elif skip_reason == "NO_CSV_DATA":
+ skipped_no_csv_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_error(f"건너뛴 필드: {field_name} (CSV에 데이터 없음 - 확인 필요!)")
+ else:
+ # 기타 이유로 건너뛴 경우
+ skipped_fields.append(field_name)
+ print_warning(f"건너뛴 필드: {field_name} (기타 이유)")
+
+ except Exception as e:
+ print_error(f"라인 처리 중 오류 발생 (라인 {i+1}): {str(e)}")
+ print_error(f"문제 라인: {line.strip()}")
+ error_count += 1
+
+ new_lines.append(line)
+ i += 1
+
+ # 결과 저장
+ try:
+ with open(wsdl_file_path, 'w', encoding='utf-8') as f:
+ f.writelines(new_lines)
+ print_success("WSDL 파일 저장 완료")
+ except Exception as e:
+ print_error(f"WSDL 파일 저장 실패: {str(e)}")
+ return
+
+ # 결과 출력
+ print_color(f"\n{'='*50}", Colors.BOLD)
+ print_color(f"처리 완료: {target}", Colors.BOLD)
+ print_color(f"{'='*50}", Colors.BOLD)
+
+ print_info(f"CSV 파일: {csv_file_path}")
+ print_info(f"WSDL 파일: {wsdl_file_path}")
+ print_info(f"백업 파일: {backup_path}")
+
+ print_color(f"\n📊 처리 통계:", Colors.MAGENTA)
+ print(f" 총 변경사항: {changes_made}개")
+ print(f" 처리된 CSV 필드 수: {len(processed_fields)}")
+ print(f" 건너뛴 필드 총계: {len(skipped_fields)}")
+ print(f" ├─ 배열 타입 (정상): {len(skipped_array_fields)}개")
+ print(f" ├─ 요청 래퍼 타입 (정상): {len(skipped_req_wrapper_fields)}개")
+ print_color(f" └─ CSV 누락 (문제): {len(skipped_no_csv_fields)}개", Colors.RED if len(skipped_no_csv_fields) > 0 else Colors.WHITE)
+ print(f" 검증 통과한 SEQ 주석: {verified_correct}개")
+ print(f" 수정된 SEQ 주석: {corrected_seq}개")
+ print(f" 오류 발생 횟수: {error_count}개")
+
+ # CSV 누락 필드 상세 표시
+ if len(skipped_no_csv_fields) > 0:
+ print_error(f"\n⚠️ CSV에 누락된 필드 목록 (확인 필요):")
+ for field in skipped_no_csv_fields:
+ print_error(f" - {field}")
+
+ # 최종 결과
+ if error_count > 0:
+ print_error(f"\n⚠️ {error_count}개의 오류가 발생했습니다. 로그를 확인해주세요.")
+
+ if len(skipped_no_csv_fields) > 0:
+ print_error(f"\n🚨 주의: {len(skipped_no_csv_fields)}개의 필드가 CSV에 누락되어 있습니다!")
+ print_error("이 필드들은 WSDL에 정의되어 있지만 CSV 스펙에 없어 주석이 생성되지 않았습니다.")
+
+ if changes_made == 0:
+ print_success(f"\n모든 주석이 정확합니다! (검증된 SEQ 주석: {verified_correct}개)")
+ else:
+ print_success(f"\n{changes_made}개의 주석이 수정되었습니다.")
+ if corrected_seq > 0:
+ print(f" - 기존 SEQ 주석 수정: {corrected_seq}개")
+ if changes_made - corrected_seq > 0:
+ print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개")
+
+if __name__ == "__main__":
+ try:
+ csv_files = get_csv_files()
+ print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
+ print_info(f"처리할 파일 목록: {csv_files}")
+
+ total_files = len(csv_files)
+ success_count = 0
+ error_count = 0
+
+ for i, target in enumerate(csv_files, 1):
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ try:
+ process_wsdl_file(target)
+ success_count += 1
+ except Exception as e:
+ print_error(f"파일 처리 실패 - {target}: {str(e)}")
+ error_count += 1
+
+ # 최종 통계
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("🏁 전체 처리 완료", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_success(f"성공: {success_count}개 파일")
+ if error_count > 0:
+ print_error(f"실패: {error_count}개 파일")
+ else:
+ print_success("모든 파일이 성공적으로 처리되었습니다!")
+
+ # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
+ for sap_type in sorted(discovered_sap_types):
+ print(f" - {sap_type}")
+
+ print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
+ for combination in sorted(type_size_combinations):
+ print(f" - {combination}")
+
+ print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
+ print(" 🎯 실용적 접근법:")
+ print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
+ print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
+ print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
+ print("")
+ print(" 📋 SAP 타입별 상세:")
+ print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
+ print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
+ print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
+ print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
+ print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
+ print("")
+ print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+
+ except Exception as e:
+ print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}")
+ exit(1) \ No newline at end of file
diff --git a/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py b/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py
new file mode 100755
index 00000000..73e71374
--- /dev/null
+++ b/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py
@@ -0,0 +1,584 @@
+#!/usr/bin/env python3
+"""
+개별 WSDL 파일을 Drizzle 스키마로 변환하는 스크립트
+Usage: python3 wsdl_comment_to_drizzle_schema.py --wsdl IF_ECC_EVCP_PR_INFORMATION.wsdl
+"""
+
+import argparse
+import os
+import re
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Dict, List, Set, Tuple, Optional
+from collections import defaultdict
+import sys
+from datetime import datetime
+
+class ColorLogger:
+ """컬러 로깅을 위한 클래스"""
+
+ # ANSI 컬러 코드
+ COLORS = {
+ 'RESET': '\033[0m',
+ 'BOLD': '\033[1m',
+ 'DIM': '\033[2m',
+
+ # 기본 컬러
+ 'BLACK': '\033[30m',
+ 'RED': '\033[31m',
+ 'GREEN': '\033[32m',
+ 'YELLOW': '\033[33m',
+ 'BLUE': '\033[34m',
+ 'MAGENTA': '\033[35m',
+ 'CYAN': '\033[36m',
+ 'WHITE': '\033[37m',
+
+ # 밝은 컬러
+ 'BRIGHT_BLACK': '\033[90m',
+ 'BRIGHT_RED': '\033[91m',
+ 'BRIGHT_GREEN': '\033[92m',
+ 'BRIGHT_YELLOW': '\033[93m',
+ 'BRIGHT_BLUE': '\033[94m',
+ 'BRIGHT_MAGENTA': '\033[95m',
+ 'BRIGHT_CYAN': '\033[96m',
+ 'BRIGHT_WHITE': '\033[97m',
+
+ # 배경 컬러
+ 'BG_RED': '\033[41m',
+ 'BG_GREEN': '\033[42m',
+ 'BG_YELLOW': '\033[43m',
+ 'BG_BLUE': '\033[44m',
+ }
+
+ def __init__(self, enable_colors: bool = True):
+ """
+ 컬러 로거 초기화
+ Args:
+ enable_colors: Windows CMD에서는 False로 설정 가능
+ """
+ self.enable_colors = enable_colors and self._supports_color()
+
+ def _supports_color(self) -> bool:
+ """컬러 지원 여부 확인"""
+ # Windows에서 colorama가 없으면 컬러 비활성화
+ if os.name == 'nt':
+ try:
+ import colorama
+ colorama.init()
+ return True
+ except ImportError:
+ return False
+ return True
+
+ def _colorize(self, text: str, color: str) -> str:
+ """텍스트에 컬러 적용"""
+ if not self.enable_colors:
+ return text
+ return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}"
+
+ def header(self, text: str):
+ """헤더 로그 (굵은 파란색)"""
+ colored_text = self._colorize(text, 'BOLD')
+ colored_text = self._colorize(colored_text, 'BRIGHT_BLUE')
+ print(colored_text)
+
+ def info(self, text: str):
+ """정보 로그 (파란색)"""
+ colored_text = self._colorize(text, 'BLUE')
+ print(colored_text)
+
+ def success(self, text: str):
+ """성공 로그 (초록색)"""
+ colored_text = self._colorize(text, 'BRIGHT_GREEN')
+ print(colored_text)
+
+ def warning(self, text: str):
+ """경고 로그 (노란색)"""
+ colored_text = self._colorize(text, 'BRIGHT_YELLOW')
+ print(colored_text)
+
+ def error(self, text: str):
+ """에러 로그 (빨간색)"""
+ colored_text = self._colorize(text, 'BRIGHT_RED')
+ print(colored_text)
+
+ def debug(self, text: str):
+ """디버그 로그 (회색)"""
+ colored_text = self._colorize(text, 'BRIGHT_BLACK')
+ print(colored_text)
+
+ def table_info(self, text: str):
+ """테이블 정보 로그 (시안색)"""
+ colored_text = self._colorize(text, 'CYAN')
+ print(colored_text)
+
+ def field_info(self, text: str):
+ """필드 정보 로그 (마젠타)"""
+ colored_text = self._colorize(text, 'MAGENTA')
+ print(colored_text)
+
+ def separator(self, char: str = "=", length: int = 80):
+ """구분선 출력 (굵은 흰색)"""
+ line = char * length
+ colored_line = self._colorize(line, 'BOLD')
+ print(colored_line)
+
+# 전역 로거 인스턴스
+logger = ColorLogger()
+
+class WSDLAnalyzer:
+ def __init__(self, wsdl_file: str, table_prefix: Optional[str] = None):
+ """
+ WSDL 파일 분석기 초기화
+ Args:
+ wsdl_file: 분석할 WSDL 파일 경로
+ table_prefix: 테이블 접두사 (옵션)
+ """
+ self.wsdl_file = Path(wsdl_file)
+ self.table_prefix = table_prefix
+ self.tables = defaultdict(dict) # table_name -> {field_name: field_info}
+ self.table_hierarchy = defaultdict(list) # parent -> [children]
+
+ # 필드명 매핑 규칙 정의 (필요시 확장 가능)
+ self.field_name_mappings = {}
+
+ def analyze_wsdl(self) -> Tuple[Dict, Dict]:
+ """WSDL 파일을 분석하고 테이블 정보 반환"""
+ if not self.wsdl_file.exists():
+ raise FileNotFoundError(f"WSDL file not found: {self.wsdl_file}")
+
+ logger.info(f"Analyzing {self.wsdl_file.name}...")
+
+ try:
+ with open(self.wsdl_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출)
+ regex_count = self._extract_tables_from_regex(content, self.wsdl_file.name)
+
+ # 정규식으로 찾지 못했을 때만 XML 파싱 시도
+ if regex_count == 0:
+ try:
+ # XML 네임스페이스 등록
+ namespaces = {
+ 'xsd': 'http://www.w3.org/2001/XMLSchema',
+ 'wsdl': 'http://schemas.xmlsoap.org/wsdl/'
+ }
+
+ root = ET.fromstring(content)
+ self._extract_tables_from_xml(root, self.wsdl_file.name, namespaces)
+ except ET.ParseError as e:
+ logger.error(f" XML parsing failed: {e}")
+ except Exception as e:
+ logger.error(f" XML analysis error: {e}")
+
+ # 테이블별 필드 합집합 처리
+ self._merge_table_fields()
+
+ return self.tables, self.table_hierarchy
+
+ except Exception as e:
+ logger.error(f" Error analyzing {self.wsdl_file.name}: {e}")
+ raise
+
+ def _merge_table_fields(self):
+ """테이블별 필드 합집합 처리"""
+ merged_tables = defaultdict(dict)
+
+ for table_name, fields in self.tables.items():
+ # 테이블별 필드를 실제 필드명 기준으로 그룹화
+ field_groups = defaultdict(list) # actual_field_name -> [field_infos]
+
+ for field_key, field_info in fields.items():
+ # field_key에서 실제 필드명 추출 (|| 구분자 사용)
+ actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key
+ field_groups[actual_field_name].append(field_info)
+
+ # 각 필드 그룹을 병합
+ for actual_field_name, field_infos in field_groups.items():
+ # 첫 번째 필드 정보를 기준으로 시작
+ merged_field = field_infos[0].copy()
+
+ # 모든 WSDL 소스 수집
+ all_sources = set()
+ all_descriptions = set()
+
+ for field_info in field_infos:
+ all_sources.add(field_info['wsdl_source'])
+ if field_info['description'].strip():
+ all_descriptions.add(field_info['description'].strip())
+
+ # 필수 필드인 경우 유지
+ if field_info['mandatory'] == 'M':
+ merged_field['mandatory'] = 'M'
+
+ # 병합된 정보 설정
+ merged_field['wsdl_sources'] = all_sources
+
+ # 설명 병합 (첫 번째 설명 사용)
+ if all_descriptions:
+ merged_field['description'] = list(all_descriptions)[0]
+ else:
+ merged_field['description'] = f'From {self.wsdl_file.name}'
+
+ # 테이블에 추가 (실제 필드명 사용)
+ merged_tables[table_name][actual_field_name] = merged_field
+
+ # 병합된 테이블 정보로 업데이트
+ self.tables = merged_tables
+
+ def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict):
+ """XML에서 테이블 정보 추출"""
+ # complexType 요소들에서 테이블 구조 추출
+ for complex_type in root.findall(".//xsd:complexType", namespaces):
+ table_name = complex_type.get('name')
+ if table_name:
+ self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces)
+
+ def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int:
+ """정규식으로 테이블 정보 추출"""
+
+ # 1단계: 모든 SEQ 주석 찾기
+ all_comments = re.findall(r'<!-- SEQ:\d+.*?-->', content, re.DOTALL)
+
+ matches = []
+ for comment in all_comments:
+ # 2단계: 단순한 파싱 방법 (콤마로 분할)
+ comment = comment.strip()
+
+ # 콤마로 분할해서 각 부분을 분석
+ parts = comment.split(', ')
+
+ if len(parts) >= 7:
+ try:
+ table = parts[1].split(':')[1] if ':' in parts[1] else ''
+ field = parts[2].split(':')[1] if ':' in parts[2] else ''
+ mo = parts[3].split(':')[1] if ':' in parts[3] else ''
+ type_val = parts[4].split(':')[1] if ':' in parts[4] else ''
+ size = parts[5].split(':')[1] if ':' in parts[5] else ''
+ desc = parts[6].split(':')[1].replace(' -->', '') if ':' in parts[6] else ''
+
+ matches.append((table, field, mo, type_val, size, desc))
+ except (IndexError, ValueError):
+ # 파싱 실패 시 무시
+ continue
+
+ for match in matches:
+ table_path, field_name, mandatory, field_type, size, description = match
+
+ # 필드명 매핑 적용
+ original_field_name = field_name.strip()
+ mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name)
+
+ # 테이블 경로에서 실제 테이블명 추출
+ table_parts = table_path.split('/')
+
+ # 계층 구조 기록
+ if len(table_parts) > 1:
+ for i in range(len(table_parts) - 1):
+ parent = '/'.join(table_parts[:i+1])
+ child = '/'.join(table_parts[:i+2])
+ if child not in self.table_hierarchy[parent]:
+ self.table_hierarchy[parent].append(child)
+
+ # 필드 정보 저장
+ field_info = {
+ 'field_name': mapped_field_name,
+ 'original_field_name': original_field_name,
+ 'mandatory': mandatory.strip(),
+ 'type': field_type.strip(),
+ 'size': size.strip(),
+ 'description': description.strip(),
+ 'table_path': table_path,
+ 'wsdl_source': wsdl_name
+ }
+
+ # 테이블명 생성
+ table_name = self._generate_table_name(table_path, wsdl_name)
+ field_key = f"{mapped_field_name}||{table_path}"
+ self.tables[table_name][field_key] = field_info
+
+ logger.success(f" Found {len(matches)} field definitions")
+ return len(matches)
+
+ def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict):
+ """complexType에서 필드 정보 추출"""
+ for element in complex_type.findall(".//xsd:element", namespaces):
+ field_name = element.get('name')
+ field_type = element.get('type', 'unknown')
+ min_occurs = element.get('minOccurs', '1')
+
+ if field_name:
+ field_info = {
+ 'field_name': field_name,
+ 'original_field_name': field_name,
+ 'mandatory': 'M' if min_occurs != '0' else 'O',
+ 'type': field_type,
+ 'size': 'unknown',
+ 'description': f'From {table_name}',
+ 'table_path': table_name,
+ 'wsdl_source': wsdl_name
+ }
+
+ # 테이블명 생성
+ generated_table_name = self._generate_table_name(table_name, wsdl_name)
+ field_key = f"{field_name}||{table_name}"
+ self.tables[generated_table_name][field_key] = field_info
+
+ def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str:
+ """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경"""
+ for wsdl_pattern, mappings in self.field_name_mappings.items():
+ if wsdl_pattern in wsdl_name.upper():
+ if field_name in mappings:
+ original_name = field_name
+ mapped_name = mappings[field_name]
+ logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})")
+ return mapped_name
+ return field_name
+
+ def _generate_table_name(self, table_path: str, wsdl_name: str) -> str:
+ """테이블명 생성"""
+ # 테이블 접두사 생성
+ if self.table_prefix:
+ prefix = self.table_prefix
+ else:
+ # WSDL 파일명에서 접두사 추출
+ prefix = self._get_table_prefix_from_wsdl_name(wsdl_name)
+
+ # 테이블 경로를 테이블명으로 변환
+ table_suffix = table_path.replace('/', '_').upper()
+
+ return f"{prefix}_{table_suffix}"
+
+ def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str:
+ """WSDL 파일명에서 테이블 prefix 추출"""
+ # IF_XXX_EVCP_ 접두사 제거
+ prefix = wsdl_name.replace('.wsdl', '')
+ # 일반적인 접두사 패턴 제거
+ for pattern in ['IF_MDZ_EVCP_', 'IF_ECC_EVCP_', 'IF_']:
+ if prefix.startswith(pattern):
+ prefix = prefix[len(pattern):]
+ break
+ return prefix if prefix else 'COMMON'
+
+def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str:
+ """WSDL 타입을 Drizzle 타입으로 매핑"""
+ # 기본 길이 설정
+ default_length = 100
+ min_length = 10
+ max_length = 2000
+
+ # LCHR 타입은 text()로 처리
+ if 'LCHR' in wsdl_type.upper():
+ return "text()"
+
+ # 사이즈 처리
+ if size and size.strip():
+ try:
+ size_clean = size.strip()
+
+ # "n,m" 형태 처리 (소수점 있는 숫자 타입 또는 numeric 타입)
+ if ',' in size_clean:
+ parts = size_clean.split(',')
+ if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
+ total_digits = int(parts[0])
+ decimal_places = int(parts[1])
+
+ # numeric 타입 처리
+ if 'NUMERIC' in wsdl_type.upper() or 'CURR' in wsdl_type.upper() or 'NUMC' in wsdl_type.upper() or 'NUMB' in wsdl_type.upper() or 'DEC' in wsdl_type.upper():
+ # numeric 타입은 decimal 또는 varchar로 처리
+ if decimal_places > 0:
+ # 소수점이 있는 경우 decimal 타입 사용
+ return f"decimal({{ precision: {total_digits}, scale: {decimal_places} }})"
+ else:
+ # 소수점이 없는 경우 integer 또는 varchar 사용
+ if total_digits <= 10:
+ return "integer()"
+ else:
+ return f"varchar({{ length: {total_digits + 2} }})"
+ else:
+ # 기타 타입은 방어적 계산
+ safe_length = total_digits + 5
+ safe_length = max(min_length, min(safe_length, max_length))
+ return f"varchar({{ length: {safe_length} }})"
+
+ # 단순 숫자 처리
+ elif size_clean.isdigit():
+ original_length = int(size_clean)
+ safe_length = max(min_length, min(original_length, max_length))
+ return f"varchar({{ length: {safe_length} }})"
+
+ # "n.m" 형태 처리
+ elif '.' in size_clean:
+ parts = size_clean.split('.')
+ if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
+ total_digits = int(parts[0])
+ decimal_places = int(parts[1])
+
+ # numeric 타입 처리
+ if 'NUMERIC' in wsdl_type.upper() or 'CURR' in wsdl_type.upper() or 'NUMC' in wsdl_type.upper() or 'NUMB' in wsdl_type.upper() or 'DEC' in wsdl_type.upper():
+ if decimal_places > 0:
+ return f"decimal({{ precision: {total_digits}, scale: {decimal_places} }})"
+ else:
+ if total_digits <= 10:
+ return "integer()"
+ else:
+ return f"varchar({{ length: {total_digits + 2} }})"
+ else:
+ safe_length = total_digits + 5
+ safe_length = max(min_length, min(safe_length, max_length))
+ return f"varchar({{ length: {safe_length} }})"
+
+ # 기타 형태는 기본값 사용
+ else:
+ logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용")
+ return f"varchar({{ length: {default_length} }})"
+
+ except Exception as e:
+ logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}")
+ return f"varchar({{ length: {default_length} }})"
+
+ # 사이즈가 없거나 비어있는 경우 기본값
+ return f"varchar({{ length: {default_length} }})"
+
+def generate_drizzle_schema(wsdl_tables: Dict, wsdl_file: str) -> str:
+ """Drizzle 스키마 코드 생성"""
+ wsdl_name = Path(wsdl_file).stem
+
+ schema_code = [
+ "import { integer, varchar, text, timestamp, decimal } from 'drizzle-orm/pg-core';",
+ "import { mdgSchema } from '../../../db/schema/MDG/mdg';",
+ "",
+ f"// WSDL 파일: {wsdl_name}.wsdl",
+ f"// 생성일시: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
+ "// 자동 생성된 스키마 파일 - 인터페이스 정의서가 비정형인 만큼, 스케치 용도로 사용하고, 실제 구현을 위해선 점검이 필수입니다.",
+ "",
+ ]
+
+ # 테이블 코드 생성
+ for table_name, fields in sorted(wsdl_tables.items()):
+ schema_code.append(f"// Table: {table_name}")
+ schema_code.append(f"export const {table_name} = mdgSchema.table('{table_name}', {{")
+ schema_code.append(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),")
+
+ for field_name, field_info in sorted(fields.items()):
+ drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size'])
+ mandatory = ".notNull()" if field_info['mandatory'] == 'M' else ""
+
+ comment = f" // {field_info['description']}" if field_info['description'] else ""
+ wsdl_source = f" // From: {field_info['wsdl_source']}"
+ mandatory_comment = f" // Required" if field_info['mandatory'] == 'M' else ""
+
+ schema_code.append(f" {field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}")
+
+ schema_code.append(" ")
+ schema_code.append(" createdAt: timestamp('created_at').defaultNow().notNull(),")
+ schema_code.append(" updatedAt: timestamp('updated_at').defaultNow().notNull(),")
+ schema_code.append("});")
+ schema_code.append("")
+
+ return '\n'.join(schema_code)
+
+def print_analysis_summary(wsdl_tables: Dict, detailed: bool = False):
+ """분석 결과 요약 출력"""
+ logger.separator()
+ logger.header("분석 결과 요약")
+ logger.separator()
+
+ logger.info(f"총 테이블 수: {len(wsdl_tables)}")
+
+ total_fields = 0
+ for table_name, fields in wsdl_tables.items():
+ field_count = len(fields)
+ total_fields += field_count
+ logger.table_info(f" - {table_name}: {field_count} fields")
+
+ logger.info(f"총 필드 수: {total_fields}")
+
+ if detailed:
+ logger.separator()
+ logger.header("상세 필드 정보")
+ logger.separator()
+
+ for table_name, fields in wsdl_tables.items():
+ logger.table_info(f"\n### {table_name}")
+ for field_name, field_info in fields.items():
+ logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}")
+
+def main():
+ """메인 함수"""
+ parser = argparse.ArgumentParser(
+ description="WSDL 파일을 Drizzle 스키마로 변환",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+사용 예시:
+ %(prog)s --wsdl IF_ECC_EVCP_PR_INFORMATION.wsdl
+ %(prog)s --wsdl path/to/file.wsdl --output-dir ./schemas
+ %(prog)s --wsdl file.wsdl --table-prefix CUSTOM --detailed
+ """
+ )
+
+ parser.add_argument('--wsdl', required=True, help='분석할 WSDL 파일 경로')
+ parser.add_argument('--output-dir', help='출력 디렉토리 (기본값: WSDL 파일과 같은 디렉토리)')
+ parser.add_argument('--table-prefix', help='테이블 접두사 (기본값: WSDL 파일명에서 추출)')
+ parser.add_argument('--detailed', action='store_true', help='상세 분석 결과 출력')
+ parser.add_argument('--no-colors', action='store_true', help='컬러 출력 비활성화')
+
+ args = parser.parse_args()
+
+ # 컬러 설정
+ global logger
+ logger = ColorLogger(enable_colors=not args.no_colors)
+
+ try:
+ # WSDL 파일 경로 처리
+ wsdl_file = Path(args.wsdl)
+ if not wsdl_file.is_absolute():
+ wsdl_file = Path.cwd() / wsdl_file
+
+ if not wsdl_file.exists():
+ logger.error(f"WSDL 파일을 찾을 수 없습니다: {wsdl_file}")
+ return 1
+
+ # 출력 디렉토리 설정
+ if args.output_dir:
+ output_dir = Path(args.output_dir)
+ else:
+ output_dir = wsdl_file.parent
+
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ # 분석 시작
+ logger.header(f"WSDL 분석 시작: {wsdl_file.name}")
+ logger.info(f"입력 파일: {wsdl_file}")
+ logger.info(f"출력 디렉토리: {output_dir}")
+
+ # WSDL 분석
+ analyzer = WSDLAnalyzer(str(wsdl_file), args.table_prefix)
+ wsdl_tables, table_hierarchy = analyzer.analyze_wsdl()
+
+ if not wsdl_tables:
+ logger.warning("테이블이 발견되지 않았습니다.")
+ return 1
+
+ # 스키마 코드 생성
+ schema_code = generate_drizzle_schema(wsdl_tables, str(wsdl_file))
+
+ # 출력 파일 생성
+ output_file = output_dir / f"{wsdl_file.stem}.ts"
+ with open(output_file, 'w', encoding='utf-8') as f:
+ f.write(schema_code)
+
+ logger.success(f"스키마 파일이 생성되었습니다: {output_file}")
+
+ # 분석 결과 요약
+ print_analysis_summary(wsdl_tables, args.detailed)
+
+ return 0
+
+ except Exception as e:
+ logger.error(f"오류 발생: {e}")
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main()) \ No newline at end of file