summaryrefslogtreecommitdiff
path: root/public/wsdl/_util
diff options
context:
space:
mode:
authorjoonhoekim <26rote@gmail.com>2025-06-27 01:25:48 +0000
committerjoonhoekim <26rote@gmail.com>2025-06-27 01:25:48 +0000
commit15b2d4ff61d0339385edd8cc67bf7579fcc2af08 (patch)
treef0c36724855abccf705a9cdcae6fa3efd54d996d /public/wsdl/_util
parente9897d416b3e7327bbd4d4aef887eee37751ae82 (diff)
(김준회) MDG SOAP 수신 유틸리티 및 API 엔드포인트, 스키마
Diffstat (limited to 'public/wsdl/_util')
-rwxr-xr-xpublic/wsdl/_util/analyze_mdz_wsdl.py847
-rw-r--r--public/wsdl/_util/update_wsdl_with_csv.py674
2 files changed, 1521 insertions, 0 deletions
diff --git a/public/wsdl/_util/analyze_mdz_wsdl.py b/public/wsdl/_util/analyze_mdz_wsdl.py
new file mode 100755
index 00000000..216d867b
--- /dev/null
+++ b/public/wsdl/_util/analyze_mdz_wsdl.py
@@ -0,0 +1,847 @@
+#!/usr/bin/env python3
+"""
+MDZ WSDL 파일 분석 스크립트
+- WSDL 파일에서 테이블 구조 추출
+- 현재 Drizzle 스키마와 비교
+- 누락된 테이블/필드 확인
+"""
+
+import os
+import re
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Dict, List, Set, Tuple
+from collections import defaultdict
+import sys
+from datetime import datetime
+
+class ColorLogger:
+ """컬러 로깅을 위한 클래스"""
+
+ # ANSI 컬러 코드
+ COLORS = {
+ 'RESET': '\033[0m',
+ 'BOLD': '\033[1m',
+ 'DIM': '\033[2m',
+
+ # 기본 컬러
+ 'BLACK': '\033[30m',
+ 'RED': '\033[31m',
+ 'GREEN': '\033[32m',
+ 'YELLOW': '\033[33m',
+ 'BLUE': '\033[34m',
+ 'MAGENTA': '\033[35m',
+ 'CYAN': '\033[36m',
+ 'WHITE': '\033[37m',
+
+ # 밝은 컬러
+ 'BRIGHT_BLACK': '\033[90m',
+ 'BRIGHT_RED': '\033[91m',
+ 'BRIGHT_GREEN': '\033[92m',
+ 'BRIGHT_YELLOW': '\033[93m',
+ 'BRIGHT_BLUE': '\033[94m',
+ 'BRIGHT_MAGENTA': '\033[95m',
+ 'BRIGHT_CYAN': '\033[96m',
+ 'BRIGHT_WHITE': '\033[97m',
+
+ # 배경 컬러
+ 'BG_RED': '\033[41m',
+ 'BG_GREEN': '\033[42m',
+ 'BG_YELLOW': '\033[43m',
+ 'BG_BLUE': '\033[44m',
+ }
+
+ def __init__(self, enable_colors: bool = True):
+ """
+ 컬러 로거 초기화
+ Args:
+ enable_colors: Windows CMD에서는 False로 설정 가능
+ """
+ self.enable_colors = enable_colors and self._supports_color()
+
+ def _supports_color(self) -> bool:
+ """컬러 지원 여부 확인"""
+ # Windows에서 colorama가 없으면 컬러 비활성화
+ if os.name == 'nt':
+ try:
+ import colorama
+ colorama.init()
+ return True
+ except ImportError:
+ return False
+ return True
+
+ def _colorize(self, text: str, color: str) -> str:
+ """텍스트에 컬러 적용"""
+ if not self.enable_colors:
+ return text
+ return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}"
+
+ def header(self, text: str):
+ """헤더 로그 (굵은 파란색)"""
+ colored_text = self._colorize(text, 'BOLD')
+ colored_text = self._colorize(colored_text, 'BRIGHT_BLUE')
+ print(colored_text)
+
+ def info(self, text: str):
+ """정보 로그 (파란색)"""
+ colored_text = self._colorize(text, 'BLUE')
+ print(colored_text)
+
+ def success(self, text: str):
+ """성공 로그 (초록색)"""
+ colored_text = self._colorize(text, 'BRIGHT_GREEN')
+ print(colored_text)
+
+ def warning(self, text: str):
+ """경고 로그 (노란색)"""
+ colored_text = self._colorize(text, 'BRIGHT_YELLOW')
+ print(colored_text)
+
+ def error(self, text: str):
+ """에러 로그 (빨간색)"""
+ colored_text = self._colorize(text, 'BRIGHT_RED')
+ print(colored_text)
+
+ def debug(self, text: str):
+ """디버그 로그 (회색)"""
+ colored_text = self._colorize(text, 'BRIGHT_BLACK')
+ print(colored_text)
+
+ def table_info(self, text: str):
+ """테이블 정보 로그 (시안색)"""
+ colored_text = self._colorize(text, 'CYAN')
+ print(colored_text)
+
+ def field_info(self, text: str):
+ """필드 정보 로그 (마젠타)"""
+ colored_text = self._colorize(text, 'MAGENTA')
+ print(colored_text)
+
+ def separator(self, char: str = "=", length: int = 80):
+ """구분선 출력 (굵은 흰색)"""
+ line = char * length
+ colored_line = self._colorize(line, 'BOLD')
+ print(colored_line)
+
+# 전역 로거 인스턴스
+logger = ColorLogger()
+
+class WSDLAnalyzer:
+ def __init__(self, wsdl_directory: str):
+ self.wsdl_directory = Path(wsdl_directory)
+ self.tables = defaultdict(dict) # table_name -> {field_name: field_info}
+ self.table_hierarchy = defaultdict(list) # parent -> [children]
+ self.table_sources = defaultdict(set) # table_name -> {wsdl_file_names}
+
+ # 필드명 매핑 규칙 정의 (개별 WSDL을 존중해 테이블 분리하기로 했으므로 사용하지 않음.)
+ self.field_name_mappings = {}
+
+ # 사용법
+ # self.field_name_mappings = {
+ # 'CUSTOMER_MASTER': { # WSDL 파일명에 이 문자열이 포함되면
+ # 'ADDRNO': 'ADR_NO' # ADDRNO를 ADR_NO로 변경
+ # }
+ # }
+
+ def analyze_all_mdz_wsdls(self):
+ """MDZ가 포함된 모든 WSDL 파일 분석"""
+ wsdl_files = list(self.wsdl_directory.glob("*MDZ*.wsdl"))
+
+ logger.info(f"Found {len(wsdl_files)} MDZ WSDL files:")
+ for wsdl_file in wsdl_files:
+ logger.table_info(f" - {wsdl_file.name}")
+ logger.info("")
+
+ for wsdl_file in wsdl_files:
+ self._analyze_wsdl_file(wsdl_file)
+
+ # 테이블별 필드 합집합 처리
+ self._merge_table_fields()
+
+ return self.tables, self.table_hierarchy
+
+ def _merge_table_fields(self):
+ """테이블별 필드 합집합 처리 - 개선된 버전"""
+ merged_tables = defaultdict(dict)
+
+ for table_name, fields in self.tables.items():
+ # MATL_PLNT 테이블의 경우 디버깅 정보 출력
+ if table_name == 'MATL_PLNT':
+ logger.debug(f"\n=== MATL_PLNT 테이블 디버깅 ===")
+ logger.debug(f" 병합 전 필드 수: {len(fields)}")
+ logger.debug(f" 필드 목록:")
+ for field_key, field_info in fields.items():
+ logger.debug(f" {field_key} -> {field_info['field_name']} (from {field_info['wsdl_source']})")
+
+ # 테이블별 필드를 실제 필드명 기준으로 그룹화
+ field_groups = defaultdict(list) # actual_field_name -> [field_infos]
+
+ for field_key, field_info in fields.items():
+ # field_key에서 실제 필드명 추출 (|| 구분자 사용)
+ actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key
+ field_groups[actual_field_name].append(field_info)
+
+ # MATL_PLNT 테이블의 경우 그룹화 결과 출력
+ if table_name == 'MATL_PLNT':
+ logger.debug(f" 그룹화 후 필드 수: {len(field_groups)}")
+ logger.debug(f" 그룹별 필드:")
+ for actual_field_name, field_infos in field_groups.items():
+ sources = [info['wsdl_source'] for info in field_infos]
+ logger.debug(f" {actual_field_name}: {len(field_infos)}개 소스 - {sources}")
+
+ # 각 필드 그룹을 병합
+ for actual_field_name, field_infos in field_groups.items():
+ # 첫 번째 필드 정보를 기준으로 시작
+ merged_field = field_infos[0].copy()
+
+ # 모든 WSDL 소스 수집
+ all_sources = set()
+ all_descriptions = set()
+
+ for field_info in field_infos:
+ all_sources.add(field_info['wsdl_source'])
+ if field_info['description'].strip():
+ all_descriptions.add(field_info['description'].strip())
+
+ # 필수 필드인 경우 유지
+ if field_info['mandatory'] == 'M':
+ merged_field['mandatory'] = 'M'
+
+ # 병합된 정보 설정
+ merged_field['wsdl_sources'] = all_sources
+
+ # 설명 병합 (첫 번째 설명 사용, WSDL 소스 정보는 주석에 추가)
+ if all_descriptions:
+ merged_field['description'] = list(all_descriptions)[0]
+ else:
+ merged_field['description'] = f'From multiple sources'
+
+ # 테이블에 추가 (실제 필드명 사용)
+ merged_tables[table_name][actual_field_name] = merged_field
+
+ # 병합된 테이블 정보로 업데이트
+ self.tables = merged_tables
+
+ # 테이블별 WSDL 소스 정보 출력
+ logger.info("\n테이블별 WSDL 소스 정보 (필드 중복 제거 후):")
+ for table_name, fields in self.tables.items():
+ sources = set()
+ for field_info in fields.values():
+ sources.update(field_info['wsdl_sources'])
+ logger.table_info(f"\n{table_name}:")
+ for source in sorted(sources):
+ logger.table_info(f" - {source}")
+ logger.table_info(f" 총 필드 수: {len(fields)}")
+
+ # MATL_PLNT 테이블의 경우 최종 필드 목록 출력
+ if table_name == 'MATL_PLNT':
+ logger.debug(f" 최종 필드 목록:")
+ for field_name in sorted(fields.keys()):
+ logger.debug(f" - {field_name}")
+
+ def _analyze_wsdl_file(self, wsdl_file: Path):
+ """단일 WSDL 파일 분석"""
+ logger.info(f"Analyzing {wsdl_file.name}...")
+
+ try:
+ with open(wsdl_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출)
+ regex_count = self._extract_tables_from_regex(content, wsdl_file.name)
+
+ # 정규식으로 찾지 못했을 때만 XML 파싱 시도
+ if regex_count == 0:
+ try:
+ # XML 네임스페이스 등록
+ namespaces = {
+ 'xsd': 'http://www.w3.org/2001/XMLSchema',
+ 'wsdl': 'http://schemas.xmlsoap.org/wsdl/'
+ }
+
+ root = ET.fromstring(content)
+ self._extract_tables_from_xml(root, wsdl_file.name, namespaces)
+ except ET.ParseError as e:
+ logger.error(f" XML parsing failed: {e}")
+ except Exception as e:
+ logger.error(f" XML analysis error: {e}")
+
+ except Exception as e:
+ logger.error(f" Error analyzing {wsdl_file.name}: {e}")
+
+ def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict):
+ """XML에서 테이블 정보 추출"""
+ # complexType 요소들에서 테이블 구조 추출
+ for complex_type in root.findall(".//xsd:complexType", namespaces):
+ table_name = complex_type.get('name')
+ if table_name:
+ self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces)
+
+ def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int:
+ """정규식으로 테이블 정보 추출"""
+
+ # Table 정보가 포함된 주석 패턴 (Description에서 --> 전까지 모든 문자 매칭)
+ table_pattern = r'<!-- SEQ:\d+, Table:([^,]+), Field:([^,]+), M/O:([^,]*), Type:([^,]+), Size:([^,]+), Description:(.*?) -->'
+
+ matches = re.findall(table_pattern, content)
+
+ # # MATL/PLNT 관련 필드 디버깅
+ # matl_plnt_matches = [match for match in matches if 'MATL/PLNT' in match[0]]
+ # if matl_plnt_matches:
+ # print(f" {wsdl_name}에서 MATL/PLNT 필드 발견: {len(matl_plnt_matches)}개")
+ # for match in matl_plnt_matches:
+ # table_path, field_name = match[0], match[1]
+ # print(f" {field_name} (Table: {table_path})")
+
+ for match in matches:
+ table_path, field_name, mandatory, field_type, size, description = match
+
+ # 필드명 매핑 적용
+ original_field_name = field_name.strip()
+ mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name)
+
+ # 테이블 경로에서 실제 테이블명 추출
+ # 예: "BP_HEADER/ADDRESS/AD_POSTAL" -> ["BP_HEADER", "ADDRESS", "AD_POSTAL"]
+ table_parts = table_path.split('/')
+ main_table = table_parts[0]
+
+ # 계층 구조 기록
+ if len(table_parts) > 1:
+ for i in range(len(table_parts) - 1):
+ parent = '/'.join(table_parts[:i+1])
+ child = '/'.join(table_parts[:i+2])
+ if child not in self.table_hierarchy[parent]:
+ self.table_hierarchy[parent].append(child)
+
+ # 필드 정보 저장 (매핑된 필드명 사용)
+ field_info = {
+ 'field_name': mapped_field_name, # 매핑된 필드명 사용
+ 'original_field_name': original_field_name, # 원본 필드명도 보존
+ 'mandatory': mandatory.strip(),
+ 'type': field_type.strip(),
+ 'size': size.strip(),
+ 'description': description.strip(),
+ 'table_path': table_path,
+ 'wsdl_source': wsdl_name
+ }
+
+ # 테이블별로 필드 저장 (|| 구분자 사용으로 충돌 방지, 매핑된 필드명 사용)
+ # CSV 파일명 기반 테이블 prefix 추가
+ table_prefix = self._get_table_prefix_from_wsdl_name(wsdl_name)
+ full_table_name = f"{table_prefix}_{table_path.replace('/', '_').upper()}"
+ field_key = f"{mapped_field_name}||{table_path}"
+ self.tables[full_table_name][field_key] = field_info
+
+ # # MATL_PLNT 테이블에 필드 추가 시 디버깅
+ # if 'MATL_PLNT' in full_table_name:
+ # print(f" {full_table_name}에 필드 추가: {mapped_field_name} (from {wsdl_name})")
+
+ logger.success(f" Found {len(matches)} field definitions")
+ return len(matches)
+
+ def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict):
+ """complexType에서 필드 정보 추출"""
+ for element in complex_type.findall(".//xsd:element", namespaces):
+ field_name = element.get('name')
+ field_type = element.get('type', 'unknown')
+ min_occurs = element.get('minOccurs', '1')
+ max_occurs = element.get('maxOccurs', '1')
+
+ if field_name:
+ field_info = {
+ 'field_name': field_name,
+ 'mandatory': 'M' if min_occurs != '0' else 'O',
+ 'type': field_type,
+ 'size': 'unknown',
+ 'description': f'From {table_name}',
+ 'table_path': table_name,
+ 'wsdl_source': wsdl_name
+ }
+
+ field_key = f"{field_name}||{table_name}"
+ self.tables[table_name.upper()][field_key] = field_info
+
+ def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str:
+ """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경"""
+ for wsdl_pattern, mappings in self.field_name_mappings.items():
+ if wsdl_pattern in wsdl_name.upper():
+ if field_name in mappings:
+ original_name = field_name
+ mapped_name = mappings[field_name]
+ logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})")
+ return mapped_name
+ return field_name
+
+ def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str:
+ """WSDL 파일명에서 테이블 prefix 추출"""
+ # 단순히 IF_MDZ_EVCP_ 접두사만 제거하고 나머지 그대로 사용
+ # 예: IF_MDZ_EVCP_MATERIAL_PART_RETURN.wsdl -> MATERIAL_PART_RETURN
+ prefix = wsdl_name.replace('IF_MDZ_EVCP_', '').replace('.wsdl', '')
+ return prefix if prefix else 'COMMON'
+
+def analyze_current_drizzle_schema(schema_file: str) -> Set[str]:
+ """현재 Drizzle 스키마에서 테이블 목록 추출"""
+ try:
+ with open(schema_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ # export const 테이블명 패턴 찾기
+ table_pattern = r'export const (\w+) = mdgSchema\.table\('
+ matches = re.findall(table_pattern, content)
+
+ return set(matches)
+
+ except FileNotFoundError:
+ logger.error(f"Schema file not found: {schema_file}")
+ return set()
+
+def compare_wsdl_vs_schema(wsdl_tables: Dict, schema_tables: Set[str]):
+ """WSDL 테이블과 스키마 테이블 비교"""
+ logger.separator()
+ logger.header("WSDL vs Drizzle Schema 비교 결과")
+ logger.separator()
+
+ # WSDL에서 추출한 테이블명 (이미 대문자로 변환됨)
+ wsdl_table_names = set(wsdl_tables.keys())
+
+ logger.info(f"\nWSDL에서 발견된 테이블: {len(wsdl_tables)}개")
+ for table in sorted(wsdl_tables.keys()):
+ field_count = len(wsdl_tables[table])
+ logger.table_info(f" - {table} ({field_count} fields)")
+
+ logger.info(f"\nDrizzle 스키마의 테이블: {len(schema_tables)}개")
+ for table in sorted(schema_tables):
+ logger.table_info(f" - {table}")
+
+ # 테이블명 직접 비교 (대문자로 통일)
+ schema_tables_upper = {table.upper() for table in schema_tables}
+ wsdl_tables_upper = {table.upper() for table in wsdl_table_names}
+
+ # 누락된 테이블 찾기
+ missing_in_schema = wsdl_tables_upper - schema_tables_upper
+ extra_in_schema = schema_tables_upper - wsdl_tables_upper
+
+ if missing_in_schema:
+ logger.warning(f"\n⚠️ 스키마에 누락된 테이블 ({len(missing_in_schema)}개):")
+ for table in sorted(missing_in_schema):
+ logger.warning(f" - {table}")
+
+ if extra_in_schema:
+ logger.success(f"\n✅ 스키마에 추가로 정의된 테이블 ({len(extra_in_schema)}개):")
+ for table in sorted(extra_in_schema):
+ logger.success(f" - {table}")
+
+ return missing_in_schema, extra_in_schema
+
+def generate_missing_tables_schema(wsdl_tables: Dict, missing_tables: Set[str]):
+ """누락된 테이블들의 Drizzle 스키마 코드 생성"""
+ if not missing_tables:
+ return
+
+ logger.separator()
+ logger.header("누락된 테이블들의 Drizzle 스키마 코드")
+ logger.separator()
+
+ for missing_table in sorted(missing_tables):
+ # WSDL 테이블명에서 해당하는 테이블 찾기 (대문자로 직접 매칭)
+ wsdl_table_key = missing_table.upper()
+
+ if wsdl_table_key in wsdl_tables and wsdl_tables[wsdl_table_key]:
+ logger.field_info(f"\n// {wsdl_table_key}")
+ logger.field_info(f"export const {wsdl_table_key} = mdgSchema.table('{wsdl_table_key}', {{")
+ logger.field_info(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),")
+
+ for field_key, field_info in wsdl_tables[wsdl_table_key].items():
+ # field_key에서 실제 필드명 추출 (|| 구분자 사용)
+ if '||' in field_key:
+ actual_field_name = field_key.split('||')[0]
+ else:
+ actual_field_name = field_key
+
+ # 필드 타입 매핑
+ drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size'])
+ mandatory = ".notNull()" if field_info['mandatory'] == 'M' else ""
+ # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구
+
+ # 주석으로 설명 추가
+ comment = f" // {field_info['description']}" if field_info['description'] else ""
+ wsdl_source = f" // From: {field_info['wsdl_source']}"
+ mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else ""
+
+ logger.field_info(f" {actual_field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}")
+
+ logger.field_info(" ")
+ logger.field_info(" createdAt: timestamp('created_at').defaultNow().notNull(),")
+ logger.field_info(" updatedAt: timestamp('updated_at').defaultNow().notNull(),")
+ logger.field_info("});")
+
+def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str:
+ """WSDL 타입을 Drizzle 타입으로 매핑 (모든 필드를 VARCHAR로 통일, 방어적 사이즈 계산)"""
+ # 기본 길이 설정
+ default_length = 100
+ min_length = 10 # 최소 길이
+ max_length = 2000 # 최대 길이 (PostgreSQL VARCHAR 권장 최대)
+
+ # LCHR 타입은 text()로 처리 (큰 텍스트)
+ if 'LCHR' in wsdl_type.upper():
+ return "text()"
+
+ # 사이즈 처리
+ if size and size.strip():
+ try:
+ size_clean = size.strip()
+
+ # "n,m" 형태 처리 (소수점 있는 숫자 타입)
+ if ',' in size_clean:
+ parts = size_clean.split(',')
+ if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
+ total_digits = int(parts[0]) # 전체 자릿수
+ decimal_places = int(parts[1]) # 소수점 이하 자릿수
+
+ # 방어적 계산: 전체 자릿수 + 부호(1) + 소수점(1) + 여유분(3) = +5
+ safe_length = total_digits + 5
+ logger.debug(f" 📏 소수점 타입 사이즈 계산: {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)")
+
+ # 최소/최대 길이 제한
+ safe_length = max(min_length, min(safe_length, max_length))
+ return f"varchar({{ length: {safe_length} }})"
+
+ # 단순 숫자 처리
+ elif size_clean.isdigit():
+ original_length = int(size_clean)
+ # 단순 숫자는 그대로 사용 (여유분 없음)
+ safe_length = max(min_length, min(original_length, max_length))
+
+ if safe_length != original_length:
+ logger.debug(f" 📏 단순 사이즈 조정: {original_length} -> {safe_length} (min/max 제한)")
+ else:
+ logger.debug(f" 📏 단순 사이즈 사용: {safe_length}")
+
+ return f"varchar({{ length: {safe_length} }})"
+
+ # "n.m" 형태 처리 (점으로 구분된 경우도 있을 수 있음)
+ elif '.' in size_clean:
+ parts = size_clean.split('.')
+ if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
+ total_digits = int(parts[0])
+ decimal_places = int(parts[1])
+
+ # 방어적 계산
+ safe_length = total_digits + 5
+ logger.debug(f" 📏 소수점 타입 사이즈 계산 (점 구분): {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)")
+
+ safe_length = max(min_length, min(safe_length, max_length))
+ return f"varchar({{ length: {safe_length} }})"
+
+ # 기타 형태는 기본값 사용
+ else:
+ logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용")
+ return f"varchar({{ length: {default_length} }})"
+
+ except Exception as e:
+ logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}")
+ return f"varchar({{ length: {default_length} }})"
+
+ # 사이즈가 없거나 비어있는 경우 기본값
+ return f"varchar({{ length: {default_length} }})"
+
+def validate_schema(wsdl_tables: Dict, schema_tables: Set[str]) -> Dict[str, List[str]]:
+ """스키마 검증"""
+ validation_results = {
+ 'missing_tables': [],
+ 'missing_fields': [],
+ 'type_mismatches': [],
+ 'duplicate_fields': []
+ }
+
+ for table_name, fields in wsdl_tables.items():
+ # 테이블 존재 여부 검증
+ if table_name not in schema_tables:
+ validation_results['missing_tables'].append(table_name)
+ continue
+
+ # 필드 검증
+ field_names = set()
+ for field_key, field_info in fields.items():
+ # field_key에서 실제 필드명 추출 (|| 구분자 사용)
+ actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key
+
+ # 중복 필드 검사
+ if actual_field_name in field_names:
+ validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}")
+ field_names.add(actual_field_name)
+
+ # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인)
+ if actual_field_name not in existing_fields:
+ validation_results['missing_fields'].append(f"{table_name}.{actual_field_name}")
+
+ # 타입 호환성 검증
+ # ? 기존 스키마의 필드 타입과 비교
+ # ! VARCHAR로 처리하기로 했으니 타입 호환성 검사는 필요 없음
+
+ return validation_results
+
+def analyze_existing_schema(schema_file: str) -> Dict[str, Dict[str, str]]:
+ """기존 스키마 파일 분석"""
+ existing_schema = {}
+ try:
+ with open(schema_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ # 테이블 정의 찾기 (변경된 패턴)
+ table_pattern = r'export const (\w+) = mdgSchema\.table\([\'"](\w+)[\'"]'
+ tables = re.findall(table_pattern, content)
+
+ for table_const, table_name in tables:
+ # 테이블의 필드 정의 찾기
+ field_pattern = rf'{table_const} = mdgSchema\.table\([\'"]{table_name}[\'"].*?{{(.*?)}}'
+ table_match = re.search(field_pattern, content, re.DOTALL)
+
+ if table_match:
+ fields = {}
+ field_defs = table_match.group(1)
+
+ # 각 필드 정의 파싱 (변경된 패턴)
+ field_pattern = r'(\w+):\s*(\w+)\([\'"](\w+)[\'"]'
+ field_matches = re.findall(field_pattern, field_defs)
+
+ for field_name, field_type, field_db_name in field_matches:
+ fields[field_name] = {
+ 'type': field_type,
+ 'db_name': field_db_name
+ }
+
+ existing_schema[table_name] = fields
+
+ except Exception as e:
+ logger.error(f"스키마 파일 분석 중 오류 발생: {e}")
+
+ return existing_schema
+
+def compare_field_types(wsdl_type: str, existing_type: str) -> bool:
+ """필드 타입 호환성 검사"""
+ type_mapping = {
+ 'varchar': ['CHAR', 'VARC', 'LCHR'],
+ 'integer': ['NUMB', 'NUMC'],
+ 'decimal': ['CURR'],
+ 'date': ['DATS'],
+ 'time': ['TIMS'],
+ 'text': ['LCHR']
+ }
+
+ wsdl_type = wsdl_type.upper()
+ existing_type = existing_type.lower()
+
+ # 타입 매핑 확인
+ for drizzle_type, wsdl_types in type_mapping.items():
+ if existing_type == drizzle_type:
+ return any(t in wsdl_type for t in wsdl_types)
+
+ return False
+
+def validate_schema(wsdl_tables: Dict, schema_tables: Set[str], existing_schema: Dict[str, Dict[str, str]]) -> Dict[str, List[str]]:
+ """스키마 검증 (개선된 버전)"""
+ validation_results = {
+ 'missing_tables': [],
+ 'missing_fields': [],
+ 'type_mismatches': [],
+ 'duplicate_fields': []
+ }
+
+ for table_name, fields in wsdl_tables.items():
+ # 테이블 존재 여부 검증
+ if table_name not in schema_tables:
+ validation_results['missing_tables'].append(table_name)
+ continue
+
+ # 기존 테이블의 필드 정보 가져오기
+ existing_fields = existing_schema.get(table_name, {})
+
+ # 필드 검증
+ field_names = set()
+ for field_key, field_info in fields.items():
+ # field_key에서 실제 필드명 추출 (|| 구분자 사용)
+ actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key
+
+ # 중복 필드 검사
+ if actual_field_name in field_names:
+ validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}")
+ field_names.add(actual_field_name)
+
+ # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인)
+ # Note: existing_fields는 기존 validate_schema에서는 정의되지 않았으므로 스킵
+
+ # 타입 호환성 검증
+ if actual_field_name in existing_fields:
+ existing_type = existing_fields[actual_field_name]['type']
+ if not compare_field_types(field_info['type'], existing_type):
+ validation_results['type_mismatches'].append(
+ f"{table_name}.{actual_field_name}: WSDL={field_info['type']}, Existing={existing_type}"
+ )
+
+ return validation_results
+
+def generate_schema_code(wsdl_tables: Dict, validation_results: Dict[str, List[str]], existing_schema: Dict[str, Dict[str, str]]) -> str:
+ """스키마 코드 생성 (개선된 버전)"""
+ schema_code = []
+
+ # 누락된 테이블 생성
+ for table_name in validation_results['missing_tables']:
+ table_code = generate_table_code(wsdl_tables[table_name], table_name)
+ schema_code.append(table_code)
+
+ # 누락된 필드 추가
+ for field_info in validation_results['missing_fields']:
+ table_name, field_name = field_info.split('.')
+ if table_name in existing_schema:
+ field_code = generate_field_code(wsdl_tables[table_name][field_name])
+ # 기존 테이블에 필드 추가하는 코드 생성
+ table_code = f"// {table_name}에 추가할 필드:\n{field_code}"
+ schema_code.append(table_code)
+
+ return '\n\n'.join(schema_code)
+
+def generate_table_code(fields: Dict, table_name: str) -> str:
+ """테이블 코드 생성"""
+ code = [
+ f"export const {table_name} = mdgSchema.table('{table_name}', {{",
+ " id: integer('id').primaryKey().generatedByDefaultAsIdentity(),"
+ ]
+
+ # fields에서 실제 필드명과 필드 정보 가져오기
+ # _merge_table_fields에서 이미 actual_field_name을 키로 사용하므로 그대로 사용
+ for actual_field_name, field_info in sorted(fields.items()):
+ # 필드 코드 생성 (actual_field_name 사용)
+ field_code = generate_field_code(field_info)
+ code.append(f" {field_code}")
+
+ code.extend([
+ " ",
+ " createdAt: timestamp('created_at').defaultNow().notNull(),",
+ " updatedAt: timestamp('updated_at').defaultNow().notNull(),",
+ "});"
+ ])
+
+ return '\n'.join(code)
+
+def generate_field_code(field_info: Dict) -> str:
+ """필드 코드 생성"""
+ drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size'])
+ mandatory = ".notNull()" if field_info['mandatory'] == 'M' else ""
+ # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구
+
+ comment = f" // {field_info['description']}" if field_info['description'] else ""
+
+ # 여러 WSDL 소스 정보 추가
+ if 'wsdl_sources' in field_info and len(field_info['wsdl_sources']) > 1:
+ sources_comment = f" // From: {', '.join(sorted(field_info['wsdl_sources']))}"
+ else:
+ wsdl_source = field_info.get('wsdl_source', list(field_info.get('wsdl_sources', ['Unknown']))[0])
+ sources_comment = f" // From: {wsdl_source}"
+
+ # 필수 필드 정보는 주석으로만 표시
+ mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else ""
+
+ # 필드명 매핑이 적용된 경우 원본 필드명 표시
+ mapping_comment = ""
+ if 'original_field_name' in field_info and field_info['original_field_name'] != field_info['field_name']:
+ mapping_comment = f" // Original: {field_info['original_field_name']}"
+
+ return f"{field_info['field_name']}: {drizzle_type}{mandatory},{comment}{sources_comment}{mandatory_comment}{mapping_comment}"
+
+def generate_complete_schema(wsdl_tables: Dict) -> str:
+ """완전한 스키마 코드 생성"""
+ schema_code = [
+ "import { integer, varchar, text, timestamp } from 'drizzle-orm/pg-core';",
+ "import { mdgSchema } from '../../../db/schema/MDG/mdg';",
+ "",
+ "// WSDL 기반 자동 생성된 스키마",
+ "// 생성일시: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " (UTC로 9시간 빼야 한국 시간)",
+ "// 개선사항:",
+ "// 1. WSDL별로 테이블 만들었음. 인터페이스 정의서에 문제가 많아서 어쩔 수 없었음.",
+ "// 2. 타입은 varchar를 사용하도록 했음. 숫자관련된 건 부호, 소수점 대비 방어적으로 처리함 (사이즈)",
+ "// 3. 테이블명에서 '/' 문자를 '_'로 변경하여 PostgreSQL/TypeScript 호환성 확보함",
+ "",
+ ]
+
+ # 테이블 코드 생성
+ for table_name, fields in sorted(wsdl_tables.items()):
+ table_code = generate_table_code(fields, table_name)
+ schema_code.append(table_code)
+ schema_code.append("") # 빈 줄 추가
+
+ return '\n'.join(schema_code)
+
+def main():
+ # 현재 스크립트 위치에서 프로젝트 루트 찾기
+ script_dir = Path(__file__).parent
+ project_root = script_dir.parent.parent.parent # public/wsdl/_util -> project_root
+
+ wsdl_dir = script_dir.parent # public/wsdl
+ schema_file = project_root / "db" / "schema" / "MDG" / "mdg.ts"
+
+ logger.header("MDZ WSDL 분석 시작...")
+ logger.info(f"WSDL 디렉토리: {wsdl_dir}")
+ logger.info(f"스키마 파일: {schema_file}")
+ logger.info("")
+
+ # WSDL 분석
+ analyzer = WSDLAnalyzer(wsdl_dir)
+ wsdl_tables, table_hierarchy = analyzer.analyze_all_mdz_wsdls()
+
+ # 현재 스키마 분석
+ schema_tables = analyze_current_drizzle_schema(schema_file)
+
+ # 기존 스키마 분석
+ existing_schema = analyze_existing_schema(schema_file)
+
+ # 비교 결과 출력
+ missing_tables, extra_tables = compare_wsdl_vs_schema(wsdl_tables, schema_tables)
+
+ # 누락된 테이블 스키마 생성
+ generate_missing_tables_schema(wsdl_tables, missing_tables)
+
+ # 스키마 검증
+ validation_results = validate_schema(wsdl_tables, schema_tables, existing_schema)
+
+ # 검증 결과 출력
+ logger.separator()
+ logger.header("스키마 검증 결과")
+ logger.separator()
+
+ for category, items in validation_results.items():
+ if items:
+ logger.warning(f"\n{category}:")
+ for item in items:
+ logger.warning(f" - {item}")
+
+ # 완전한 스키마 코드 생성
+ complete_schema = generate_complete_schema(wsdl_tables)
+
+ # 스키마 파일 저장
+ output_file = script_dir / "generated_schema.ts"
+ with open(output_file, 'w', encoding='utf-8') as f:
+ f.write(complete_schema)
+
+ logger.success(f"\n생성된 스키마가 {output_file}에 저장되었습니다.")
+
+ # 상세 필드 정보 출력 (옵션)
+ if len(sys.argv) > 1 and sys.argv[1] == "--detailed":
+ logger.separator()
+ logger.header("상세 필드 정보")
+ logger.separator()
+
+ for table_name, fields in wsdl_tables.items():
+ logger.table_info(f"\n### {table_name}")
+ for field_name, field_info in fields.items():
+ logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}")
+
+ logger.success("\n분석 완료!")
+ logger.info(f"- 총 WSDL 테이블: {len(wsdl_tables)}개")
+ logger.info(f"- 현재 스키마 테이블: {len(schema_tables)}개")
+ logger.warning(f"- 누락 테이블: {len(missing_tables)}개") if missing_tables else logger.success(f"- 누락 테이블: {len(missing_tables)}개")
+ logger.info(f"- 추가 테이블: {len(extra_tables)}개")
+
+if __name__ == "__main__":
+ main() \ No newline at end of file
diff --git a/public/wsdl/_util/update_wsdl_with_csv.py b/public/wsdl/_util/update_wsdl_with_csv.py
new file mode 100644
index 00000000..91a9d4dc
--- /dev/null
+++ b/public/wsdl/_util/update_wsdl_with_csv.py
@@ -0,0 +1,674 @@
+#!/usr/bin/env python3
+import csv
+import re
+import shutil
+import os
+from datetime import datetime
+
+# 컬러 로그를 위한 색상 코드 추가
+class Colors:
+ RED = '\033[91m'
+ GREEN = '\033[92m'
+ YELLOW = '\033[93m'
+ BLUE = '\033[94m'
+ MAGENTA = '\033[95m'
+ CYAN = '\033[96m'
+ WHITE = '\033[97m'
+ ENDC = '\033[0m' # End color
+ BOLD = '\033[1m'
+
+def print_color(message, color=Colors.WHITE):
+ """컬러 출력 함수"""
+ print(f"{color}{message}{Colors.ENDC}")
+
+def print_error(message):
+ """에러 메시지 출력"""
+ print_color(f"❌ ERROR: {message}", Colors.RED)
+
+def print_warning(message):
+ """경고 메시지 출력"""
+ print_color(f"⚠️ WARNING: {message}", Colors.YELLOW)
+
+def print_success(message):
+ """성공 메시지 출력"""
+ print_color(f"✅ SUCCESS: {message}", Colors.GREEN)
+
+def print_info(message):
+ """정보 메시지 출력"""
+ print_color(f"ℹ️ INFO: {message}", Colors.CYAN)
+
+"""
+실제 CSV 파일들
+IF_MDZ_EVCP_CUSTOMER_MASTER.csv IF_MDZ_EVCP_EMPLOYEE_REFERENCE.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART_RETURN.csv IF_MDZ_EVCP_PROJECT_MASTER.csv
+IF_MDZ_EVCP_DEPARTMENT_CODE.csv IF_MDZ_EVCP_EQUP_MASTER.csv IF_MDZ_EVCP_MODEL_MASTER.csv IF_MDZ_EVCP_VENDOR_MASTER.csv
+IF_MDZ_EVCP_EMPLOYEE_MASTER.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART.csv IF_MDZ_EVCP_ORGANIZATION_MASTER.csv
+"""
+
+# ===== 설정 =====
+CSV_DIR = './public/wsdl/_csv'
+WSDL_DIR = './public/wsdl'
+
+# 발견된 SAP 타입들을 수집하기 위한 전역 SET
+discovered_sap_types = set()
+type_size_combinations = set() # 타입-사이즈 조합도 수집
+
+# 필드명 매핑 테이블 (CSV -> WSDL)
+FIELD_MAPPING = {
+ # 개별 WSDL 별 테이블 만들기로 했으므로 사용하지 않고 WSDL 그대로 사용
+ # 'ADR_NO': 'ADDRNO',
+ # 필요한 경우 더 추가
+}
+
+# 테이블 매핑 테이블 (complexType -> CSV Table)
+TABLE_MAPPING = {
+ # 'MATL': 'MATL',
+ # 'UNIT': 'MATL/UNIT',
+ # 필요한 경우 더 추가
+}
+
+def normalize_sap_type_and_size(sap_type, size_str):
+ """SAP 타입과 사이즈를 정규화"""
+ global discovered_sap_types, type_size_combinations
+
+ try:
+ # 타입을 대문자로 변환
+ normalized_type = sap_type.upper().strip() if sap_type else 'CHAR'
+
+ # 사이즈 처리
+ normalized_size = size_str.strip() if size_str else ''
+ original_size = normalized_size # 원본 사이즈 보존 (로깅용)
+
+ # 빈 사이즈인 경우 기본값 설정
+ if not normalized_size:
+ normalized_size = '255'
+ else:
+ # 따옴표로 감싸진 경우 제거 (예: "1,0")
+ quote_removed = False
+ if normalized_size.startswith('"') and normalized_size.endswith('"'):
+ before_quote_removal = normalized_size
+ normalized_size = normalized_size[1:-1]
+ quote_removed = True
+ print_color(f"🔍 SIZE 파싱: 따옴표 제거 - '{before_quote_removal}' -> '{normalized_size}' (Type: {normalized_type})", Colors.YELLOW)
+
+ # 로깅: 최종 결과 (따옴표가 없는 경우만)
+ if not quote_removed and original_size:
+ print_color(f"🔍 SIZE 파싱: 따옴표 없음 - '{original_size}' 그대로 사용 (Type: {normalized_type})", Colors.BLUE)
+
+ # 발견된 타입들을 SET에 추가
+ discovered_sap_types.add(normalized_type)
+ type_size_combinations.add(f"{normalized_type}({normalized_size})")
+
+ # 컬럼 구분자나 특수문자가 있는 경우 그대로 유지
+ # DEC, QUAN, NUMB 등에서 "1,0" 형태의 사이즈는 정상
+
+ return normalized_type, normalized_size
+
+ except Exception as e:
+ print_error(f"타입/사이즈 정규화 실패 - Type: {sap_type}, Size: {size_str}, Error: {str(e)}")
+ return 'CHAR', '255' # 기본값 반환
+
+def safe_description_escape(description):
+ """Description 필드의 특수문자를 안전하게 처리"""
+ try:
+ if not description:
+ return ''
+
+ # HTML/XML 특수문자 이스케이프
+ description = description.replace('&', '&amp;')
+ description = description.replace('<', '&lt;')
+ description = description.replace('>', '&gt;')
+ description = description.replace('"', '&quot;')
+ description = description.replace("'", '&apos;')
+
+ return description
+
+ except Exception as e:
+ print_error(f"Description 이스케이프 실패: {description}, Error: {str(e)}")
+ return str(description) if description else ''
+
+def get_csv_files():
+ """CSV 디렉토리에서 모든 CSV 파일 목록을 가져옴"""
+ csv_files = []
+ for file in os.listdir(CSV_DIR):
+ if file.endswith('.csv'):
+ csv_files.append(file.replace('.csv', ''))
+ return csv_files
+
+def get_complex_type_info(wsdl_content):
+ """WSDL 파일에서 complexType 정보를 추출"""
+ complex_types = {}
+ current_type = None
+ current_fields = []
+ type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
+
+ for line in wsdl_content:
+ # complexType 시작 태그 찾기
+ type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ if type_match:
+ if current_type:
+ type_stack.append(current_type)
+ current_type = type_match.group(1)
+ current_fields = []
+ continue
+
+ # complexType 종료 태그 찾기
+ if '</xsd:complexType>' in line:
+ if current_type:
+ complex_types[current_type] = current_fields
+ if type_stack:
+ current_type = type_stack.pop()
+ else:
+ current_type = None
+ continue
+
+ # element 태그 찾기
+ element_match = re.search(r'<xsd:element\s+name="([^"]+)"', line)
+ if element_match and current_type:
+ field_info = {
+ 'name': element_match.group(1),
+ 'type': re.search(r'type="([^"]+)"', line).group(1) if 'type="' in line else None,
+ 'is_array': 'maxOccurs="unbounded"' in line
+ }
+ current_fields.append(field_info)
+
+ return complex_types
+
+def get_table_for_complex_type(table_name, complex_type):
+ """테이블 이름에서 complexType에 해당하는 부분 추출"""
+ # 테이블 이름이 '/'로 구분되어 있다면 마지막 부분을 반환
+ if '/' in table_name:
+ return table_name.split('/')[-1].upper()
+ return table_name.upper()
+
+def load_csv_data(csv_file):
+ """CSV 파일에서 필드 정보를 딕셔너리로 로드"""
+ csv_data = {}
+ csv_path = os.path.join(CSV_DIR, f'{csv_file}.csv')
+
+ try:
+ with open(csv_path, 'r', encoding='utf-8-sig') as f: # BOM 처리
+ reader = csv.DictReader(f)
+ for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행
+ try:
+ field_name = row['Field']
+ table_name = row['Table']
+
+ # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용
+ wsdl_field_name = FIELD_MAPPING.get(field_name, field_name)
+
+ # 테이블 정보를 키에 포함 (구분자를 || 로 변경)
+ key = f"{wsdl_field_name}||{table_name}"
+
+ # 타입과 사이즈 정규화
+ normalized_type, normalized_size = normalize_sap_type_and_size(
+ row.get('Type', ''), row.get('Size', '')
+ )
+
+ # Description 안전 처리
+ safe_desc = safe_description_escape(row.get('Description', ''))
+
+ csv_data[key] = {
+ 'seq': row.get('SEQ', ''),
+ 'table': table_name,
+ 'field': row.get('Field', ''), # 원래 CSV 필드명 저장
+ 'mo': row.get('M/O', ''),
+ 'type': normalized_type, # 정규화된 타입
+ 'size': normalized_size, # 정규화된 사이즈
+ 'description': safe_desc, # 안전 처리된 Description
+ 'original_type': row.get('Type', ''), # 원본 타입 보존
+ 'original_size': row.get('Size', '') # 원본 사이즈 보존
+ }
+
+ except Exception as e:
+ print_error(f"CSV 행 {row_num} 처리 실패 - {csv_file}: {str(e)}")
+ print_error(f"문제 행 데이터: {row}")
+ continue
+
+ except Exception as e:
+ print_error(f"CSV 파일 로딩 실패 - {csv_path}: {str(e)}")
+ return {}
+
+ return csv_data
+
+def extract_field_name_from_line(line):
+ """라인에서 name="필드명" 추출"""
+ match = re.search(r'name="([^"]+)"', line)
+ return match.group(1) if match else None
+
+def extract_field_from_comment(comment_line):
+ """주석에서 Field: 부분의 필드명 추출"""
+ match = re.search(r'Field:([^,]+)', comment_line)
+ return match.group(1).strip() if match else None
+
+def has_seq_in_comment(comment_line):
+ """주석에 SEQ가 있는지 확인"""
+ return 'SEQ:' in comment_line
+
+def get_indentation(line):
+ """라인의 들여쓰기 반환"""
+ return len(line) - len(line.lstrip())
+
+def create_comment(field_name, csv_data, indentation, complex_type):
+ """CSV 데이터를 기반으로 주석 생성"""
+ try:
+ # 필드명으로 시작하는 키들을 찾음 (대소문자 구분 없이)
+ matching_keys = [key for key in csv_data.keys() if key.split('||')[0].upper() == field_name.upper()]
+ if not matching_keys:
+ indent = ' ' * indentation
+ print_warning(f"매칭되지 않은 필드: {field_name}")
+ return f"{indent}<!-- TODO: UNMATCHED FIELD OCCURS - {field_name} -->"
+
+ # complexType과 일치하는 테이블 정보 찾기
+ matching_data = None
+
+ # 1. complexType 이름과 완전히 일치하는 테이블 찾기
+ for key in matching_keys:
+ table_name = key.split('||', 1)[1]
+ if complex_type.upper() == table_name.upper():
+ matching_data = csv_data[key]
+ break
+
+ # 2. CSV 테이블명을 '/'로 스플릿한 마지막 부분이 complexType과 일치하는 경우
+ if not matching_data:
+ for key in matching_keys:
+ table_name = key.split('||', 1)[1]
+ if '/' in table_name:
+ last_part = table_name.split('/')[-1]
+ if complex_type.upper() == last_part.upper():
+ matching_data = csv_data[key]
+ break
+
+ # 3. 필드명만 일치하는 경우 (첫 번째 매칭 데이터 사용)
+ if not matching_data:
+ matching_data = csv_data[matching_keys[0]]
+
+ # 4. 매칭된 데이터가 있으면 주석 생성, 없으면 매칭 실패 주석
+ if matching_data:
+ indent = ' ' * indentation
+
+ # CSV의 실제 타입과 사이즈 사용
+ comment = f"{indent}<!-- SEQ:{matching_data['seq']}, Table:{matching_data['table']}, Field:{matching_data['field']}, M/O:{matching_data['mo']}, Type:{matching_data['type']}, Size:{matching_data['size']}, Description:{matching_data['description']} -->"
+
+ print_info(f"주석 생성 완료: {field_name} -> Type:{matching_data['type']}, Size:{matching_data['size']}")
+ return comment
+ else:
+ indent = ' ' * indentation
+ print_warning(f"매칭 데이터를 찾을 수 없음: {field_name}")
+ return f"{indent}<!-- TODO: NO MATCHING DATA FOUND - {field_name} -->"
+
+ except Exception as e:
+ indent = ' ' * indentation
+ print_error(f"주석 생성 실패 - 필드: {field_name}, 에러: {str(e)}")
+ return f"{indent}<!-- ERROR: COMMENT GENERATION FAILED - {field_name} -->"
+
+def normalize_comment(comment_line):
+ """주석을 정규화 (공백 제거, 소문자 변환 등)"""
+ # <!-- 와 --> 제거하고 내용만 추출
+ content = re.sub(r'^\s*<!--\s*|\s*-->\s*$', '', comment_line.strip())
+ # 여러 공백을 하나로 통합
+ content = re.sub(r'\s+', ' ', content)
+ return content.strip()
+
+def comments_are_equal(existing_comment, expected_comment):
+ """두 주석이 같은 내용인지 비교"""
+ existing_normalized = normalize_comment(existing_comment)
+ expected_normalized = normalize_comment(expected_comment)
+ return existing_normalized == expected_normalized
+
+def should_process_line(line, csv_data):
+ """라인이 처리 대상인지 확인"""
+ # 네 조건을 모두 만족해야 함:
+ # 1. <xsd:element 태그
+ # 2. name=" 속성이 있는 태그
+ # 3. maxOccurs=" 속성이 없는 태그 (배열 데이터 제외)
+ # 4. CSV에 해당 필드가 있는 경우
+
+ if not ('<xsd:element' in line and 'name="' in line):
+ return False
+
+ # maxOccurs=" 가 있으면 배열 데이터이므로 제외 (모든 maxOccurs 속성)
+ if 'maxOccurs="' in line:
+ return False
+
+ field_name = extract_field_name_from_line(line)
+ if not field_name:
+ return False
+
+ # 필드명이 CSV 데이터의 키에 정확히 일치하는지 확인 (대소문자 구분 없이)
+ return any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys())
+
+def get_skip_reason(line, csv_data):
+ """필드를 건너뛰는 이유를 반환"""
+ if not ('<xsd:element' in line and 'name="' in line):
+ return None
+
+ field_name = extract_field_name_from_line(line)
+ if not field_name:
+ return None
+
+ # maxOccurs 체크 (배열 타입)
+ if 'maxOccurs="' in line:
+ return "ARRAY_TYPE"
+
+ # 복합객체인 경우
+ if 'MASTER' in field_name:
+ return "COMPLEX_TYPE"
+
+ # CSV에 있는지 체크
+ has_csv_data = any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys())
+ if not has_csv_data:
+ # Req로 끝나는 경우는 래퍼 타입이므로 정상
+ if field_name.endswith('Req'):
+ return "REQ_WRAPPER_TYPE"
+ else:
+ return "NO_CSV_DATA"
+
+ return None
+
+def get_table_prefix_from_csv_name(csv_name: str) -> str:
+ """CSV 파일명에서 테이블 prefix 추출"""
+ csv_upper = csv_name.upper()
+
+ # CSV 파일명 패턴에서 마스터 타입 추출
+ if 'CUSTOMER_MASTER' in csv_upper:
+ return 'CUSTOMER'
+ elif 'VENDOR_MASTER' in csv_upper:
+ return 'VENDOR'
+ elif 'EMPLOYEE_MASTER' in csv_upper:
+ return 'EMPLOYEE'
+ elif 'PROJECT_MASTER' in csv_upper:
+ return 'PROJECT'
+ elif 'DEPARTMENT_CODE' in csv_upper:
+ return 'DEPARTMENT'
+ elif 'ORGANIZATION_MASTER' in csv_upper:
+ return 'ORGANIZATION'
+ elif 'EQUP_MASTER' in csv_upper:
+ return 'EQUP'
+ elif 'MODEL_MASTER' in csv_upper:
+ return 'MODEL'
+ elif 'MATERIAL_MASTER' in csv_upper:
+ return 'MATERIAL'
+ elif 'EMPLOYEE_REFERENCE' in csv_upper:
+ return 'EMPLOYEE_REF'
+ else:
+ # 기본적으로 MDZ 부분 제거 후 첫 번째 단어 사용
+ parts = csv_name.replace('IF_MDZ_EVCP_', '').split('_')
+ return parts[0] if parts else 'COMMON'
+
+def backup_file(filepath):
+ """파일을 백업"""
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ backup_path = f"{filepath}.backup_{timestamp}"
+ shutil.copy2(filepath, backup_path)
+ print(f"백업 파일 생성: {backup_path}")
+ return backup_path
+
+def process_wsdl_file(target):
+ """WSDL 파일 처리"""
+ csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
+ wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
+
+ try:
+ # 백업 생성
+ backup_path = backup_file(wsdl_file_path)
+
+ print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ print_info("CSV 데이터 로딩 중...")
+ csv_data = load_csv_data(target)
+ print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
+
+ # WSDL 파일 읽기
+ with open(wsdl_file_path, 'r', encoding='utf-8') as f:
+ lines = f.readlines()
+
+ # complexType 정보 추출
+ complex_types = get_complex_type_info(lines)
+ print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
+ except Exception as e:
+ print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ return
+
+ # complexType 구조 출력 (디버깅용)
+ for type_name, fields in complex_types.items():
+ print_color(f"\nComplexType: {type_name}", Colors.MAGENTA)
+ for field in fields:
+ print(f" - {field['name']} ({field['type']}) {'[Array]' if field['is_array'] else ''}")
+
+ new_lines = []
+ i = 0
+ changes_made = 0
+ processed_fields = []
+ skipped_fields = []
+ skipped_array_fields = []
+ skipped_no_csv_fields = []
+ skipped_req_wrapper_fields = []
+ verified_correct = 0
+ corrected_seq = 0
+ error_count = 0
+
+ current_complex_type = None
+ type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
+
+ while i < len(lines):
+ line = lines[i]
+ line_processed = False
+
+ try:
+ # complexType 시작 태그 확인
+ type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ if type_match:
+ if current_complex_type:
+ type_stack.append(current_complex_type)
+ current_complex_type = type_match.group(1)
+ print_color(f"현재 complexType: {current_complex_type}", Colors.BLUE)
+
+ # complexType 종료 태그 확인
+ if '</xsd:complexType>' in line:
+ if type_stack:
+ current_complex_type = type_stack.pop()
+ print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE)
+ else:
+ current_complex_type = None
+
+ # CSV에 있는 xsd:element 필드인지 확인
+ if should_process_line(line, csv_data):
+ field_name = extract_field_name_from_line(line)
+
+ if field_name and current_complex_type:
+ processed_fields.append(field_name)
+ print_color(f"처리 중인 필드: {field_name} (complexType: {current_complex_type})", Colors.CYAN)
+
+ # 바로 위 라인이 주석인지 확인 (공백 라인 건너뛰면서)
+ comment_line_index = -1
+ j = len(new_lines) - 1
+
+ while j >= 0:
+ prev_line = new_lines[j].strip()
+ if prev_line == '':
+ j -= 1
+ continue
+ elif prev_line.startswith('<!--') and prev_line.endswith('-->'):
+ comment_line_index = j
+ break
+ else:
+ break
+
+ if comment_line_index >= 0:
+ existing_comment = new_lines[comment_line_index]
+
+ if has_seq_in_comment(existing_comment):
+ indentation = get_indentation(line)
+ expected_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+
+ if expected_comment:
+ if comments_are_equal(existing_comment, expected_comment):
+ verified_correct += 1
+ print_success(f" 주석 검증 통과")
+ else:
+ new_lines[comment_line_index] = expected_comment + '\n'
+ changes_made += 1
+ corrected_seq += 1
+ print_warning(f" SEQ 주석 수정: {field_name}")
+ print(f" 기존: {existing_comment.strip()}")
+ print(f" 수정: {expected_comment}")
+ else:
+ indentation = get_indentation(line)
+ new_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+ if new_comment:
+ new_lines[comment_line_index] = new_comment + '\n'
+ changes_made += 1
+ print_warning(f" 주석 교체: {field_name}")
+ else:
+ indentation = get_indentation(line)
+ new_comment = create_comment(field_name, csv_data, indentation, current_complex_type)
+ if new_comment:
+ new_lines.append(new_comment + '\n')
+ changes_made += 1
+ print_info(f" 주석 추가: {field_name}")
+
+ line_processed = True
+ elif '<xsd:element' in line and 'name="' in line:
+ field_name = extract_field_name_from_line(line)
+ if field_name:
+ skip_reason = get_skip_reason(line, csv_data)
+ if skip_reason == "ARRAY_TYPE":
+ skipped_array_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_color(f"건너뛴 필드: {field_name} (배열 타입 - maxOccurs 속성)", Colors.YELLOW)
+ elif skip_reason == "REQ_WRAPPER_TYPE":
+ skipped_req_wrapper_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_color(f"건너뛴 필드: {field_name} (요청 래퍼 타입 - 정상)", Colors.BLUE)
+ elif skip_reason == "NO_CSV_DATA":
+ skipped_no_csv_fields.append(field_name)
+ skipped_fields.append(field_name)
+ print_error(f"건너뛴 필드: {field_name} (CSV에 데이터 없음 - 확인 필요!)")
+ else:
+ # 기타 이유로 건너뛴 경우
+ skipped_fields.append(field_name)
+ print_warning(f"건너뛴 필드: {field_name} (기타 이유)")
+
+ except Exception as e:
+ print_error(f"라인 처리 중 오류 발생 (라인 {i+1}): {str(e)}")
+ print_error(f"문제 라인: {line.strip()}")
+ error_count += 1
+
+ new_lines.append(line)
+ i += 1
+
+ # 결과 저장
+ try:
+ with open(wsdl_file_path, 'w', encoding='utf-8') as f:
+ f.writelines(new_lines)
+ print_success("WSDL 파일 저장 완료")
+ except Exception as e:
+ print_error(f"WSDL 파일 저장 실패: {str(e)}")
+ return
+
+ # 결과 출력
+ print_color(f"\n{'='*50}", Colors.BOLD)
+ print_color(f"처리 완료: {target}", Colors.BOLD)
+ print_color(f"{'='*50}", Colors.BOLD)
+
+ print_info(f"CSV 파일: {csv_file_path}")
+ print_info(f"WSDL 파일: {wsdl_file_path}")
+ print_info(f"백업 파일: {backup_path}")
+
+ print_color(f"\n📊 처리 통계:", Colors.MAGENTA)
+ print(f" 총 변경사항: {changes_made}개")
+ print(f" 처리된 CSV 필드 수: {len(processed_fields)}")
+ print(f" 건너뛴 필드 총계: {len(skipped_fields)}")
+ print(f" ├─ 배열 타입 (정상): {len(skipped_array_fields)}개")
+ print(f" ├─ 요청 래퍼 타입 (정상): {len(skipped_req_wrapper_fields)}개")
+ print_color(f" └─ CSV 누락 (문제): {len(skipped_no_csv_fields)}개", Colors.RED if len(skipped_no_csv_fields) > 0 else Colors.WHITE)
+ print(f" 검증 통과한 SEQ 주석: {verified_correct}개")
+ print(f" 수정된 SEQ 주석: {corrected_seq}개")
+ print(f" 오류 발생 횟수: {error_count}개")
+
+ # CSV 누락 필드 상세 표시
+ if len(skipped_no_csv_fields) > 0:
+ print_error(f"\n⚠️ CSV에 누락된 필드 목록 (확인 필요):")
+ for field in skipped_no_csv_fields:
+ print_error(f" - {field}")
+
+ # 최종 결과
+ if error_count > 0:
+ print_error(f"\n⚠️ {error_count}개의 오류가 발생했습니다. 로그를 확인해주세요.")
+
+ if len(skipped_no_csv_fields) > 0:
+ print_error(f"\n🚨 주의: {len(skipped_no_csv_fields)}개의 필드가 CSV에 누락되어 있습니다!")
+ print_error("이 필드들은 WSDL에 정의되어 있지만 CSV 스펙에 없어 주석이 생성되지 않았습니다.")
+
+ if changes_made == 0:
+ print_success(f"\n모든 주석이 정확합니다! (검증된 SEQ 주석: {verified_correct}개)")
+ else:
+ print_success(f"\n{changes_made}개의 주석이 수정되었습니다.")
+ if corrected_seq > 0:
+ print(f" - 기존 SEQ 주석 수정: {corrected_seq}개")
+ if changes_made - corrected_seq > 0:
+ print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개")
+
+if __name__ == "__main__":
+ try:
+ csv_files = get_csv_files()
+ print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
+ print_info(f"처리할 파일 목록: {csv_files}")
+
+ total_files = len(csv_files)
+ success_count = 0
+ error_count = 0
+
+ for i, target in enumerate(csv_files, 1):
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ try:
+ process_wsdl_file(target)
+ success_count += 1
+ except Exception as e:
+ print_error(f"파일 처리 실패 - {target}: {str(e)}")
+ error_count += 1
+
+ # 최종 통계
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("🏁 전체 처리 완료", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_success(f"성공: {success_count}개 파일")
+ if error_count > 0:
+ print_error(f"실패: {error_count}개 파일")
+ else:
+ print_success("모든 파일이 성공적으로 처리되었습니다!")
+
+ # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
+ for sap_type in sorted(discovered_sap_types):
+ print(f" - {sap_type}")
+
+ print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
+ for combination in sorted(type_size_combinations):
+ print(f" - {combination}")
+
+ print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
+ print(" 🎯 실용적 접근법:")
+ print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
+ print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
+ print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
+ print("")
+ print(" 📋 SAP 타입별 상세:")
+ print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
+ print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
+ print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
+ print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
+ print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
+ print("")
+ print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+
+ except Exception as e:
+ print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}")
+ exit(1) \ No newline at end of file