diff options
| author | joonhoekim <26rote@gmail.com> | 2025-06-27 01:25:48 +0000 |
|---|---|---|
| committer | joonhoekim <26rote@gmail.com> | 2025-06-27 01:25:48 +0000 |
| commit | 15b2d4ff61d0339385edd8cc67bf7579fcc2af08 (patch) | |
| tree | f0c36724855abccf705a9cdcae6fa3efd54d996d /public/wsdl/_util | |
| parent | e9897d416b3e7327bbd4d4aef887eee37751ae82 (diff) | |
(김준회) MDG SOAP 수신 유틸리티 및 API 엔드포인트, 스키마
Diffstat (limited to 'public/wsdl/_util')
| -rwxr-xr-x | public/wsdl/_util/analyze_mdz_wsdl.py | 847 | ||||
| -rw-r--r-- | public/wsdl/_util/update_wsdl_with_csv.py | 674 |
2 files changed, 1521 insertions, 0 deletions
diff --git a/public/wsdl/_util/analyze_mdz_wsdl.py b/public/wsdl/_util/analyze_mdz_wsdl.py new file mode 100755 index 00000000..216d867b --- /dev/null +++ b/public/wsdl/_util/analyze_mdz_wsdl.py @@ -0,0 +1,847 @@ +#!/usr/bin/env python3 +""" +MDZ WSDL 파일 분석 스크립트 +- WSDL 파일에서 테이블 구조 추출 +- 현재 Drizzle 스키마와 비교 +- 누락된 테이블/필드 확인 +""" + +import os +import re +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Dict, List, Set, Tuple +from collections import defaultdict +import sys +from datetime import datetime + +class ColorLogger: + """컬러 로깅을 위한 클래스""" + + # ANSI 컬러 코드 + COLORS = { + 'RESET': '\033[0m', + 'BOLD': '\033[1m', + 'DIM': '\033[2m', + + # 기본 컬러 + 'BLACK': '\033[30m', + 'RED': '\033[31m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'BLUE': '\033[34m', + 'MAGENTA': '\033[35m', + 'CYAN': '\033[36m', + 'WHITE': '\033[37m', + + # 밝은 컬러 + 'BRIGHT_BLACK': '\033[90m', + 'BRIGHT_RED': '\033[91m', + 'BRIGHT_GREEN': '\033[92m', + 'BRIGHT_YELLOW': '\033[93m', + 'BRIGHT_BLUE': '\033[94m', + 'BRIGHT_MAGENTA': '\033[95m', + 'BRIGHT_CYAN': '\033[96m', + 'BRIGHT_WHITE': '\033[97m', + + # 배경 컬러 + 'BG_RED': '\033[41m', + 'BG_GREEN': '\033[42m', + 'BG_YELLOW': '\033[43m', + 'BG_BLUE': '\033[44m', + } + + def __init__(self, enable_colors: bool = True): + """ + 컬러 로거 초기화 + Args: + enable_colors: Windows CMD에서는 False로 설정 가능 + """ + self.enable_colors = enable_colors and self._supports_color() + + def _supports_color(self) -> bool: + """컬러 지원 여부 확인""" + # Windows에서 colorama가 없으면 컬러 비활성화 + if os.name == 'nt': + try: + import colorama + colorama.init() + return True + except ImportError: + return False + return True + + def _colorize(self, text: str, color: str) -> str: + """텍스트에 컬러 적용""" + if not self.enable_colors: + return text + return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}" + + def header(self, text: str): + """헤더 로그 (굵은 파란색)""" + colored_text = self._colorize(text, 'BOLD') + colored_text = self._colorize(colored_text, 'BRIGHT_BLUE') + print(colored_text) + + def info(self, text: str): + """정보 로그 (파란색)""" + colored_text = self._colorize(text, 'BLUE') + print(colored_text) + + def success(self, text: str): + """성공 로그 (초록색)""" + colored_text = self._colorize(text, 'BRIGHT_GREEN') + print(colored_text) + + def warning(self, text: str): + """경고 로그 (노란색)""" + colored_text = self._colorize(text, 'BRIGHT_YELLOW') + print(colored_text) + + def error(self, text: str): + """에러 로그 (빨간색)""" + colored_text = self._colorize(text, 'BRIGHT_RED') + print(colored_text) + + def debug(self, text: str): + """디버그 로그 (회색)""" + colored_text = self._colorize(text, 'BRIGHT_BLACK') + print(colored_text) + + def table_info(self, text: str): + """테이블 정보 로그 (시안색)""" + colored_text = self._colorize(text, 'CYAN') + print(colored_text) + + def field_info(self, text: str): + """필드 정보 로그 (마젠타)""" + colored_text = self._colorize(text, 'MAGENTA') + print(colored_text) + + def separator(self, char: str = "=", length: int = 80): + """구분선 출력 (굵은 흰색)""" + line = char * length + colored_line = self._colorize(line, 'BOLD') + print(colored_line) + +# 전역 로거 인스턴스 +logger = ColorLogger() + +class WSDLAnalyzer: + def __init__(self, wsdl_directory: str): + self.wsdl_directory = Path(wsdl_directory) + self.tables = defaultdict(dict) # table_name -> {field_name: field_info} + self.table_hierarchy = defaultdict(list) # parent -> [children] + self.table_sources = defaultdict(set) # table_name -> {wsdl_file_names} + + # 필드명 매핑 규칙 정의 (개별 WSDL을 존중해 테이블 분리하기로 했으므로 사용하지 않음.) + self.field_name_mappings = {} + + # 사용법 + # self.field_name_mappings = { + # 'CUSTOMER_MASTER': { # WSDL 파일명에 이 문자열이 포함되면 + # 'ADDRNO': 'ADR_NO' # ADDRNO를 ADR_NO로 변경 + # } + # } + + def analyze_all_mdz_wsdls(self): + """MDZ가 포함된 모든 WSDL 파일 분석""" + wsdl_files = list(self.wsdl_directory.glob("*MDZ*.wsdl")) + + logger.info(f"Found {len(wsdl_files)} MDZ WSDL files:") + for wsdl_file in wsdl_files: + logger.table_info(f" - {wsdl_file.name}") + logger.info("") + + for wsdl_file in wsdl_files: + self._analyze_wsdl_file(wsdl_file) + + # 테이블별 필드 합집합 처리 + self._merge_table_fields() + + return self.tables, self.table_hierarchy + + def _merge_table_fields(self): + """테이블별 필드 합집합 처리 - 개선된 버전""" + merged_tables = defaultdict(dict) + + for table_name, fields in self.tables.items(): + # MATL_PLNT 테이블의 경우 디버깅 정보 출력 + if table_name == 'MATL_PLNT': + logger.debug(f"\n=== MATL_PLNT 테이블 디버깅 ===") + logger.debug(f" 병합 전 필드 수: {len(fields)}") + logger.debug(f" 필드 목록:") + for field_key, field_info in fields.items(): + logger.debug(f" {field_key} -> {field_info['field_name']} (from {field_info['wsdl_source']})") + + # 테이블별 필드를 실제 필드명 기준으로 그룹화 + field_groups = defaultdict(list) # actual_field_name -> [field_infos] + + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + field_groups[actual_field_name].append(field_info) + + # MATL_PLNT 테이블의 경우 그룹화 결과 출력 + if table_name == 'MATL_PLNT': + logger.debug(f" 그룹화 후 필드 수: {len(field_groups)}") + logger.debug(f" 그룹별 필드:") + for actual_field_name, field_infos in field_groups.items(): + sources = [info['wsdl_source'] for info in field_infos] + logger.debug(f" {actual_field_name}: {len(field_infos)}개 소스 - {sources}") + + # 각 필드 그룹을 병합 + for actual_field_name, field_infos in field_groups.items(): + # 첫 번째 필드 정보를 기준으로 시작 + merged_field = field_infos[0].copy() + + # 모든 WSDL 소스 수집 + all_sources = set() + all_descriptions = set() + + for field_info in field_infos: + all_sources.add(field_info['wsdl_source']) + if field_info['description'].strip(): + all_descriptions.add(field_info['description'].strip()) + + # 필수 필드인 경우 유지 + if field_info['mandatory'] == 'M': + merged_field['mandatory'] = 'M' + + # 병합된 정보 설정 + merged_field['wsdl_sources'] = all_sources + + # 설명 병합 (첫 번째 설명 사용, WSDL 소스 정보는 주석에 추가) + if all_descriptions: + merged_field['description'] = list(all_descriptions)[0] + else: + merged_field['description'] = f'From multiple sources' + + # 테이블에 추가 (실제 필드명 사용) + merged_tables[table_name][actual_field_name] = merged_field + + # 병합된 테이블 정보로 업데이트 + self.tables = merged_tables + + # 테이블별 WSDL 소스 정보 출력 + logger.info("\n테이블별 WSDL 소스 정보 (필드 중복 제거 후):") + for table_name, fields in self.tables.items(): + sources = set() + for field_info in fields.values(): + sources.update(field_info['wsdl_sources']) + logger.table_info(f"\n{table_name}:") + for source in sorted(sources): + logger.table_info(f" - {source}") + logger.table_info(f" 총 필드 수: {len(fields)}") + + # MATL_PLNT 테이블의 경우 최종 필드 목록 출력 + if table_name == 'MATL_PLNT': + logger.debug(f" 최종 필드 목록:") + for field_name in sorted(fields.keys()): + logger.debug(f" - {field_name}") + + def _analyze_wsdl_file(self, wsdl_file: Path): + """단일 WSDL 파일 분석""" + logger.info(f"Analyzing {wsdl_file.name}...") + + try: + with open(wsdl_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출) + regex_count = self._extract_tables_from_regex(content, wsdl_file.name) + + # 정규식으로 찾지 못했을 때만 XML 파싱 시도 + if regex_count == 0: + try: + # XML 네임스페이스 등록 + namespaces = { + 'xsd': 'http://www.w3.org/2001/XMLSchema', + 'wsdl': 'http://schemas.xmlsoap.org/wsdl/' + } + + root = ET.fromstring(content) + self._extract_tables_from_xml(root, wsdl_file.name, namespaces) + except ET.ParseError as e: + logger.error(f" XML parsing failed: {e}") + except Exception as e: + logger.error(f" XML analysis error: {e}") + + except Exception as e: + logger.error(f" Error analyzing {wsdl_file.name}: {e}") + + def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict): + """XML에서 테이블 정보 추출""" + # complexType 요소들에서 테이블 구조 추출 + for complex_type in root.findall(".//xsd:complexType", namespaces): + table_name = complex_type.get('name') + if table_name: + self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces) + + def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int: + """정규식으로 테이블 정보 추출""" + + # Table 정보가 포함된 주석 패턴 (Description에서 --> 전까지 모든 문자 매칭) + table_pattern = r'<!-- SEQ:\d+, Table:([^,]+), Field:([^,]+), M/O:([^,]*), Type:([^,]+), Size:([^,]+), Description:(.*?) -->' + + matches = re.findall(table_pattern, content) + + # # MATL/PLNT 관련 필드 디버깅 + # matl_plnt_matches = [match for match in matches if 'MATL/PLNT' in match[0]] + # if matl_plnt_matches: + # print(f" {wsdl_name}에서 MATL/PLNT 필드 발견: {len(matl_plnt_matches)}개") + # for match in matl_plnt_matches: + # table_path, field_name = match[0], match[1] + # print(f" {field_name} (Table: {table_path})") + + for match in matches: + table_path, field_name, mandatory, field_type, size, description = match + + # 필드명 매핑 적용 + original_field_name = field_name.strip() + mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name) + + # 테이블 경로에서 실제 테이블명 추출 + # 예: "BP_HEADER/ADDRESS/AD_POSTAL" -> ["BP_HEADER", "ADDRESS", "AD_POSTAL"] + table_parts = table_path.split('/') + main_table = table_parts[0] + + # 계층 구조 기록 + if len(table_parts) > 1: + for i in range(len(table_parts) - 1): + parent = '/'.join(table_parts[:i+1]) + child = '/'.join(table_parts[:i+2]) + if child not in self.table_hierarchy[parent]: + self.table_hierarchy[parent].append(child) + + # 필드 정보 저장 (매핑된 필드명 사용) + field_info = { + 'field_name': mapped_field_name, # 매핑된 필드명 사용 + 'original_field_name': original_field_name, # 원본 필드명도 보존 + 'mandatory': mandatory.strip(), + 'type': field_type.strip(), + 'size': size.strip(), + 'description': description.strip(), + 'table_path': table_path, + 'wsdl_source': wsdl_name + } + + # 테이블별로 필드 저장 (|| 구분자 사용으로 충돌 방지, 매핑된 필드명 사용) + # CSV 파일명 기반 테이블 prefix 추가 + table_prefix = self._get_table_prefix_from_wsdl_name(wsdl_name) + full_table_name = f"{table_prefix}_{table_path.replace('/', '_').upper()}" + field_key = f"{mapped_field_name}||{table_path}" + self.tables[full_table_name][field_key] = field_info + + # # MATL_PLNT 테이블에 필드 추가 시 디버깅 + # if 'MATL_PLNT' in full_table_name: + # print(f" {full_table_name}에 필드 추가: {mapped_field_name} (from {wsdl_name})") + + logger.success(f" Found {len(matches)} field definitions") + return len(matches) + + def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict): + """complexType에서 필드 정보 추출""" + for element in complex_type.findall(".//xsd:element", namespaces): + field_name = element.get('name') + field_type = element.get('type', 'unknown') + min_occurs = element.get('minOccurs', '1') + max_occurs = element.get('maxOccurs', '1') + + if field_name: + field_info = { + 'field_name': field_name, + 'mandatory': 'M' if min_occurs != '0' else 'O', + 'type': field_type, + 'size': 'unknown', + 'description': f'From {table_name}', + 'table_path': table_name, + 'wsdl_source': wsdl_name + } + + field_key = f"{field_name}||{table_name}" + self.tables[table_name.upper()][field_key] = field_info + + def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str: + """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경""" + for wsdl_pattern, mappings in self.field_name_mappings.items(): + if wsdl_pattern in wsdl_name.upper(): + if field_name in mappings: + original_name = field_name + mapped_name = mappings[field_name] + logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})") + return mapped_name + return field_name + + def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str: + """WSDL 파일명에서 테이블 prefix 추출""" + # 단순히 IF_MDZ_EVCP_ 접두사만 제거하고 나머지 그대로 사용 + # 예: IF_MDZ_EVCP_MATERIAL_PART_RETURN.wsdl -> MATERIAL_PART_RETURN + prefix = wsdl_name.replace('IF_MDZ_EVCP_', '').replace('.wsdl', '') + return prefix if prefix else 'COMMON' + +def analyze_current_drizzle_schema(schema_file: str) -> Set[str]: + """현재 Drizzle 스키마에서 테이블 목록 추출""" + try: + with open(schema_file, 'r', encoding='utf-8') as f: + content = f.read() + + # export const 테이블명 패턴 찾기 + table_pattern = r'export const (\w+) = mdgSchema\.table\(' + matches = re.findall(table_pattern, content) + + return set(matches) + + except FileNotFoundError: + logger.error(f"Schema file not found: {schema_file}") + return set() + +def compare_wsdl_vs_schema(wsdl_tables: Dict, schema_tables: Set[str]): + """WSDL 테이블과 스키마 테이블 비교""" + logger.separator() + logger.header("WSDL vs Drizzle Schema 비교 결과") + logger.separator() + + # WSDL에서 추출한 테이블명 (이미 대문자로 변환됨) + wsdl_table_names = set(wsdl_tables.keys()) + + logger.info(f"\nWSDL에서 발견된 테이블: {len(wsdl_tables)}개") + for table in sorted(wsdl_tables.keys()): + field_count = len(wsdl_tables[table]) + logger.table_info(f" - {table} ({field_count} fields)") + + logger.info(f"\nDrizzle 스키마의 테이블: {len(schema_tables)}개") + for table in sorted(schema_tables): + logger.table_info(f" - {table}") + + # 테이블명 직접 비교 (대문자로 통일) + schema_tables_upper = {table.upper() for table in schema_tables} + wsdl_tables_upper = {table.upper() for table in wsdl_table_names} + + # 누락된 테이블 찾기 + missing_in_schema = wsdl_tables_upper - schema_tables_upper + extra_in_schema = schema_tables_upper - wsdl_tables_upper + + if missing_in_schema: + logger.warning(f"\n⚠️ 스키마에 누락된 테이블 ({len(missing_in_schema)}개):") + for table in sorted(missing_in_schema): + logger.warning(f" - {table}") + + if extra_in_schema: + logger.success(f"\n✅ 스키마에 추가로 정의된 테이블 ({len(extra_in_schema)}개):") + for table in sorted(extra_in_schema): + logger.success(f" - {table}") + + return missing_in_schema, extra_in_schema + +def generate_missing_tables_schema(wsdl_tables: Dict, missing_tables: Set[str]): + """누락된 테이블들의 Drizzle 스키마 코드 생성""" + if not missing_tables: + return + + logger.separator() + logger.header("누락된 테이블들의 Drizzle 스키마 코드") + logger.separator() + + for missing_table in sorted(missing_tables): + # WSDL 테이블명에서 해당하는 테이블 찾기 (대문자로 직접 매칭) + wsdl_table_key = missing_table.upper() + + if wsdl_table_key in wsdl_tables and wsdl_tables[wsdl_table_key]: + logger.field_info(f"\n// {wsdl_table_key}") + logger.field_info(f"export const {wsdl_table_key} = mdgSchema.table('{wsdl_table_key}', {{") + logger.field_info(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),") + + for field_key, field_info in wsdl_tables[wsdl_table_key].items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + if '||' in field_key: + actual_field_name = field_key.split('||')[0] + else: + actual_field_name = field_key + + # 필드 타입 매핑 + drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) + mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" + # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 + + # 주석으로 설명 추가 + comment = f" // {field_info['description']}" if field_info['description'] else "" + wsdl_source = f" // From: {field_info['wsdl_source']}" + mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" + + logger.field_info(f" {actual_field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}") + + logger.field_info(" ") + logger.field_info(" createdAt: timestamp('created_at').defaultNow().notNull(),") + logger.field_info(" updatedAt: timestamp('updated_at').defaultNow().notNull(),") + logger.field_info("});") + +def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str: + """WSDL 타입을 Drizzle 타입으로 매핑 (모든 필드를 VARCHAR로 통일, 방어적 사이즈 계산)""" + # 기본 길이 설정 + default_length = 100 + min_length = 10 # 최소 길이 + max_length = 2000 # 최대 길이 (PostgreSQL VARCHAR 권장 최대) + + # LCHR 타입은 text()로 처리 (큰 텍스트) + if 'LCHR' in wsdl_type.upper(): + return "text()" + + # 사이즈 처리 + if size and size.strip(): + try: + size_clean = size.strip() + + # "n,m" 형태 처리 (소수점 있는 숫자 타입) + if ',' in size_clean: + parts = size_clean.split(',') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) # 전체 자릿수 + decimal_places = int(parts[1]) # 소수점 이하 자릿수 + + # 방어적 계산: 전체 자릿수 + 부호(1) + 소수점(1) + 여유분(3) = +5 + safe_length = total_digits + 5 + logger.debug(f" 📏 소수점 타입 사이즈 계산: {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") + + # 최소/최대 길이 제한 + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 단순 숫자 처리 + elif size_clean.isdigit(): + original_length = int(size_clean) + # 단순 숫자는 그대로 사용 (여유분 없음) + safe_length = max(min_length, min(original_length, max_length)) + + if safe_length != original_length: + logger.debug(f" 📏 단순 사이즈 조정: {original_length} -> {safe_length} (min/max 제한)") + else: + logger.debug(f" 📏 단순 사이즈 사용: {safe_length}") + + return f"varchar({{ length: {safe_length} }})" + + # "n.m" 형태 처리 (점으로 구분된 경우도 있을 수 있음) + elif '.' in size_clean: + parts = size_clean.split('.') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) + decimal_places = int(parts[1]) + + # 방어적 계산 + safe_length = total_digits + 5 + logger.debug(f" 📏 소수점 타입 사이즈 계산 (점 구분): {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") + + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 기타 형태는 기본값 사용 + else: + logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용") + return f"varchar({{ length: {default_length} }})" + + except Exception as e: + logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}") + return f"varchar({{ length: {default_length} }})" + + # 사이즈가 없거나 비어있는 경우 기본값 + return f"varchar({{ length: {default_length} }})" + +def validate_schema(wsdl_tables: Dict, schema_tables: Set[str]) -> Dict[str, List[str]]: + """스키마 검증""" + validation_results = { + 'missing_tables': [], + 'missing_fields': [], + 'type_mismatches': [], + 'duplicate_fields': [] + } + + for table_name, fields in wsdl_tables.items(): + # 테이블 존재 여부 검증 + if table_name not in schema_tables: + validation_results['missing_tables'].append(table_name) + continue + + # 필드 검증 + field_names = set() + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + + # 중복 필드 검사 + if actual_field_name in field_names: + validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") + field_names.add(actual_field_name) + + # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) + if actual_field_name not in existing_fields: + validation_results['missing_fields'].append(f"{table_name}.{actual_field_name}") + + # 타입 호환성 검증 + # ? 기존 스키마의 필드 타입과 비교 + # ! VARCHAR로 처리하기로 했으니 타입 호환성 검사는 필요 없음 + + return validation_results + +def analyze_existing_schema(schema_file: str) -> Dict[str, Dict[str, str]]: + """기존 스키마 파일 분석""" + existing_schema = {} + try: + with open(schema_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 테이블 정의 찾기 (변경된 패턴) + table_pattern = r'export const (\w+) = mdgSchema\.table\([\'"](\w+)[\'"]' + tables = re.findall(table_pattern, content) + + for table_const, table_name in tables: + # 테이블의 필드 정의 찾기 + field_pattern = rf'{table_const} = mdgSchema\.table\([\'"]{table_name}[\'"].*?{{(.*?)}}' + table_match = re.search(field_pattern, content, re.DOTALL) + + if table_match: + fields = {} + field_defs = table_match.group(1) + + # 각 필드 정의 파싱 (변경된 패턴) + field_pattern = r'(\w+):\s*(\w+)\([\'"](\w+)[\'"]' + field_matches = re.findall(field_pattern, field_defs) + + for field_name, field_type, field_db_name in field_matches: + fields[field_name] = { + 'type': field_type, + 'db_name': field_db_name + } + + existing_schema[table_name] = fields + + except Exception as e: + logger.error(f"스키마 파일 분석 중 오류 발생: {e}") + + return existing_schema + +def compare_field_types(wsdl_type: str, existing_type: str) -> bool: + """필드 타입 호환성 검사""" + type_mapping = { + 'varchar': ['CHAR', 'VARC', 'LCHR'], + 'integer': ['NUMB', 'NUMC'], + 'decimal': ['CURR'], + 'date': ['DATS'], + 'time': ['TIMS'], + 'text': ['LCHR'] + } + + wsdl_type = wsdl_type.upper() + existing_type = existing_type.lower() + + # 타입 매핑 확인 + for drizzle_type, wsdl_types in type_mapping.items(): + if existing_type == drizzle_type: + return any(t in wsdl_type for t in wsdl_types) + + return False + +def validate_schema(wsdl_tables: Dict, schema_tables: Set[str], existing_schema: Dict[str, Dict[str, str]]) -> Dict[str, List[str]]: + """스키마 검증 (개선된 버전)""" + validation_results = { + 'missing_tables': [], + 'missing_fields': [], + 'type_mismatches': [], + 'duplicate_fields': [] + } + + for table_name, fields in wsdl_tables.items(): + # 테이블 존재 여부 검증 + if table_name not in schema_tables: + validation_results['missing_tables'].append(table_name) + continue + + # 기존 테이블의 필드 정보 가져오기 + existing_fields = existing_schema.get(table_name, {}) + + # 필드 검증 + field_names = set() + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + + # 중복 필드 검사 + if actual_field_name in field_names: + validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") + field_names.add(actual_field_name) + + # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) + # Note: existing_fields는 기존 validate_schema에서는 정의되지 않았으므로 스킵 + + # 타입 호환성 검증 + if actual_field_name in existing_fields: + existing_type = existing_fields[actual_field_name]['type'] + if not compare_field_types(field_info['type'], existing_type): + validation_results['type_mismatches'].append( + f"{table_name}.{actual_field_name}: WSDL={field_info['type']}, Existing={existing_type}" + ) + + return validation_results + +def generate_schema_code(wsdl_tables: Dict, validation_results: Dict[str, List[str]], existing_schema: Dict[str, Dict[str, str]]) -> str: + """스키마 코드 생성 (개선된 버전)""" + schema_code = [] + + # 누락된 테이블 생성 + for table_name in validation_results['missing_tables']: + table_code = generate_table_code(wsdl_tables[table_name], table_name) + schema_code.append(table_code) + + # 누락된 필드 추가 + for field_info in validation_results['missing_fields']: + table_name, field_name = field_info.split('.') + if table_name in existing_schema: + field_code = generate_field_code(wsdl_tables[table_name][field_name]) + # 기존 테이블에 필드 추가하는 코드 생성 + table_code = f"// {table_name}에 추가할 필드:\n{field_code}" + schema_code.append(table_code) + + return '\n\n'.join(schema_code) + +def generate_table_code(fields: Dict, table_name: str) -> str: + """테이블 코드 생성""" + code = [ + f"export const {table_name} = mdgSchema.table('{table_name}', {{", + " id: integer('id').primaryKey().generatedByDefaultAsIdentity()," + ] + + # fields에서 실제 필드명과 필드 정보 가져오기 + # _merge_table_fields에서 이미 actual_field_name을 키로 사용하므로 그대로 사용 + for actual_field_name, field_info in sorted(fields.items()): + # 필드 코드 생성 (actual_field_name 사용) + field_code = generate_field_code(field_info) + code.append(f" {field_code}") + + code.extend([ + " ", + " createdAt: timestamp('created_at').defaultNow().notNull(),", + " updatedAt: timestamp('updated_at').defaultNow().notNull(),", + "});" + ]) + + return '\n'.join(code) + +def generate_field_code(field_info: Dict) -> str: + """필드 코드 생성""" + drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) + mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" + # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 + + comment = f" // {field_info['description']}" if field_info['description'] else "" + + # 여러 WSDL 소스 정보 추가 + if 'wsdl_sources' in field_info and len(field_info['wsdl_sources']) > 1: + sources_comment = f" // From: {', '.join(sorted(field_info['wsdl_sources']))}" + else: + wsdl_source = field_info.get('wsdl_source', list(field_info.get('wsdl_sources', ['Unknown']))[0]) + sources_comment = f" // From: {wsdl_source}" + + # 필수 필드 정보는 주석으로만 표시 + mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" + + # 필드명 매핑이 적용된 경우 원본 필드명 표시 + mapping_comment = "" + if 'original_field_name' in field_info and field_info['original_field_name'] != field_info['field_name']: + mapping_comment = f" // Original: {field_info['original_field_name']}" + + return f"{field_info['field_name']}: {drizzle_type}{mandatory},{comment}{sources_comment}{mandatory_comment}{mapping_comment}" + +def generate_complete_schema(wsdl_tables: Dict) -> str: + """완전한 스키마 코드 생성""" + schema_code = [ + "import { integer, varchar, text, timestamp } from 'drizzle-orm/pg-core';", + "import { mdgSchema } from '../../../db/schema/MDG/mdg';", + "", + "// WSDL 기반 자동 생성된 스키마", + "// 생성일시: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " (UTC로 9시간 빼야 한국 시간)", + "// 개선사항:", + "// 1. WSDL별로 테이블 만들었음. 인터페이스 정의서에 문제가 많아서 어쩔 수 없었음.", + "// 2. 타입은 varchar를 사용하도록 했음. 숫자관련된 건 부호, 소수점 대비 방어적으로 처리함 (사이즈)", + "// 3. 테이블명에서 '/' 문자를 '_'로 변경하여 PostgreSQL/TypeScript 호환성 확보함", + "", + ] + + # 테이블 코드 생성 + for table_name, fields in sorted(wsdl_tables.items()): + table_code = generate_table_code(fields, table_name) + schema_code.append(table_code) + schema_code.append("") # 빈 줄 추가 + + return '\n'.join(schema_code) + +def main(): + # 현재 스크립트 위치에서 프로젝트 루트 찾기 + script_dir = Path(__file__).parent + project_root = script_dir.parent.parent.parent # public/wsdl/_util -> project_root + + wsdl_dir = script_dir.parent # public/wsdl + schema_file = project_root / "db" / "schema" / "MDG" / "mdg.ts" + + logger.header("MDZ WSDL 분석 시작...") + logger.info(f"WSDL 디렉토리: {wsdl_dir}") + logger.info(f"스키마 파일: {schema_file}") + logger.info("") + + # WSDL 분석 + analyzer = WSDLAnalyzer(wsdl_dir) + wsdl_tables, table_hierarchy = analyzer.analyze_all_mdz_wsdls() + + # 현재 스키마 분석 + schema_tables = analyze_current_drizzle_schema(schema_file) + + # 기존 스키마 분석 + existing_schema = analyze_existing_schema(schema_file) + + # 비교 결과 출력 + missing_tables, extra_tables = compare_wsdl_vs_schema(wsdl_tables, schema_tables) + + # 누락된 테이블 스키마 생성 + generate_missing_tables_schema(wsdl_tables, missing_tables) + + # 스키마 검증 + validation_results = validate_schema(wsdl_tables, schema_tables, existing_schema) + + # 검증 결과 출력 + logger.separator() + logger.header("스키마 검증 결과") + logger.separator() + + for category, items in validation_results.items(): + if items: + logger.warning(f"\n{category}:") + for item in items: + logger.warning(f" - {item}") + + # 완전한 스키마 코드 생성 + complete_schema = generate_complete_schema(wsdl_tables) + + # 스키마 파일 저장 + output_file = script_dir / "generated_schema.ts" + with open(output_file, 'w', encoding='utf-8') as f: + f.write(complete_schema) + + logger.success(f"\n생성된 스키마가 {output_file}에 저장되었습니다.") + + # 상세 필드 정보 출력 (옵션) + if len(sys.argv) > 1 and sys.argv[1] == "--detailed": + logger.separator() + logger.header("상세 필드 정보") + logger.separator() + + for table_name, fields in wsdl_tables.items(): + logger.table_info(f"\n### {table_name}") + for field_name, field_info in fields.items(): + logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}") + + logger.success("\n분석 완료!") + logger.info(f"- 총 WSDL 테이블: {len(wsdl_tables)}개") + logger.info(f"- 현재 스키마 테이블: {len(schema_tables)}개") + logger.warning(f"- 누락 테이블: {len(missing_tables)}개") if missing_tables else logger.success(f"- 누락 테이블: {len(missing_tables)}개") + logger.info(f"- 추가 테이블: {len(extra_tables)}개") + +if __name__ == "__main__": + main()
\ No newline at end of file diff --git a/public/wsdl/_util/update_wsdl_with_csv.py b/public/wsdl/_util/update_wsdl_with_csv.py new file mode 100644 index 00000000..91a9d4dc --- /dev/null +++ b/public/wsdl/_util/update_wsdl_with_csv.py @@ -0,0 +1,674 @@ +#!/usr/bin/env python3 +import csv +import re +import shutil +import os +from datetime import datetime + +# 컬러 로그를 위한 색상 코드 추가 +class Colors: + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + ENDC = '\033[0m' # End color + BOLD = '\033[1m' + +def print_color(message, color=Colors.WHITE): + """컬러 출력 함수""" + print(f"{color}{message}{Colors.ENDC}") + +def print_error(message): + """에러 메시지 출력""" + print_color(f"❌ ERROR: {message}", Colors.RED) + +def print_warning(message): + """경고 메시지 출력""" + print_color(f"⚠️ WARNING: {message}", Colors.YELLOW) + +def print_success(message): + """성공 메시지 출력""" + print_color(f"✅ SUCCESS: {message}", Colors.GREEN) + +def print_info(message): + """정보 메시지 출력""" + print_color(f"ℹ️ INFO: {message}", Colors.CYAN) + +""" +실제 CSV 파일들 +IF_MDZ_EVCP_CUSTOMER_MASTER.csv IF_MDZ_EVCP_EMPLOYEE_REFERENCE.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART_RETURN.csv IF_MDZ_EVCP_PROJECT_MASTER.csv +IF_MDZ_EVCP_DEPARTMENT_CODE.csv IF_MDZ_EVCP_EQUP_MASTER.csv IF_MDZ_EVCP_MODEL_MASTER.csv IF_MDZ_EVCP_VENDOR_MASTER.csv +IF_MDZ_EVCP_EMPLOYEE_MASTER.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART.csv IF_MDZ_EVCP_ORGANIZATION_MASTER.csv +""" + +# ===== 설정 ===== +CSV_DIR = './public/wsdl/_csv' +WSDL_DIR = './public/wsdl' + +# 발견된 SAP 타입들을 수집하기 위한 전역 SET +discovered_sap_types = set() +type_size_combinations = set() # 타입-사이즈 조합도 수집 + +# 필드명 매핑 테이블 (CSV -> WSDL) +FIELD_MAPPING = { + # 개별 WSDL 별 테이블 만들기로 했으므로 사용하지 않고 WSDL 그대로 사용 + # 'ADR_NO': 'ADDRNO', + # 필요한 경우 더 추가 +} + +# 테이블 매핑 테이블 (complexType -> CSV Table) +TABLE_MAPPING = { + # 'MATL': 'MATL', + # 'UNIT': 'MATL/UNIT', + # 필요한 경우 더 추가 +} + +def normalize_sap_type_and_size(sap_type, size_str): + """SAP 타입과 사이즈를 정규화""" + global discovered_sap_types, type_size_combinations + + try: + # 타입을 대문자로 변환 + normalized_type = sap_type.upper().strip() if sap_type else 'CHAR' + + # 사이즈 처리 + normalized_size = size_str.strip() if size_str else '' + original_size = normalized_size # 원본 사이즈 보존 (로깅용) + + # 빈 사이즈인 경우 기본값 설정 + if not normalized_size: + normalized_size = '255' + else: + # 따옴표로 감싸진 경우 제거 (예: "1,0") + quote_removed = False + if normalized_size.startswith('"') and normalized_size.endswith('"'): + before_quote_removal = normalized_size + normalized_size = normalized_size[1:-1] + quote_removed = True + print_color(f"🔍 SIZE 파싱: 따옴표 제거 - '{before_quote_removal}' -> '{normalized_size}' (Type: {normalized_type})", Colors.YELLOW) + + # 로깅: 최종 결과 (따옴표가 없는 경우만) + if not quote_removed and original_size: + print_color(f"🔍 SIZE 파싱: 따옴표 없음 - '{original_size}' 그대로 사용 (Type: {normalized_type})", Colors.BLUE) + + # 발견된 타입들을 SET에 추가 + discovered_sap_types.add(normalized_type) + type_size_combinations.add(f"{normalized_type}({normalized_size})") + + # 컬럼 구분자나 특수문자가 있는 경우 그대로 유지 + # DEC, QUAN, NUMB 등에서 "1,0" 형태의 사이즈는 정상 + + return normalized_type, normalized_size + + except Exception as e: + print_error(f"타입/사이즈 정규화 실패 - Type: {sap_type}, Size: {size_str}, Error: {str(e)}") + return 'CHAR', '255' # 기본값 반환 + +def safe_description_escape(description): + """Description 필드의 특수문자를 안전하게 처리""" + try: + if not description: + return '' + + # HTML/XML 특수문자 이스케이프 + description = description.replace('&', '&') + description = description.replace('<', '<') + description = description.replace('>', '>') + description = description.replace('"', '"') + description = description.replace("'", ''') + + return description + + except Exception as e: + print_error(f"Description 이스케이프 실패: {description}, Error: {str(e)}") + return str(description) if description else '' + +def get_csv_files(): + """CSV 디렉토리에서 모든 CSV 파일 목록을 가져옴""" + csv_files = [] + for file in os.listdir(CSV_DIR): + if file.endswith('.csv'): + csv_files.append(file.replace('.csv', '')) + return csv_files + +def get_complex_type_info(wsdl_content): + """WSDL 파일에서 complexType 정보를 추출""" + complex_types = {} + current_type = None + current_fields = [] + type_stack = [] # 중첩된 complexType을 추적하기 위한 스택 + + for line in wsdl_content: + # complexType 시작 태그 찾기 + type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line) + if type_match: + if current_type: + type_stack.append(current_type) + current_type = type_match.group(1) + current_fields = [] + continue + + # complexType 종료 태그 찾기 + if '</xsd:complexType>' in line: + if current_type: + complex_types[current_type] = current_fields + if type_stack: + current_type = type_stack.pop() + else: + current_type = None + continue + + # element 태그 찾기 + element_match = re.search(r'<xsd:element\s+name="([^"]+)"', line) + if element_match and current_type: + field_info = { + 'name': element_match.group(1), + 'type': re.search(r'type="([^"]+)"', line).group(1) if 'type="' in line else None, + 'is_array': 'maxOccurs="unbounded"' in line + } + current_fields.append(field_info) + + return complex_types + +def get_table_for_complex_type(table_name, complex_type): + """테이블 이름에서 complexType에 해당하는 부분 추출""" + # 테이블 이름이 '/'로 구분되어 있다면 마지막 부분을 반환 + if '/' in table_name: + return table_name.split('/')[-1].upper() + return table_name.upper() + +def load_csv_data(csv_file): + """CSV 파일에서 필드 정보를 딕셔너리로 로드""" + csv_data = {} + csv_path = os.path.join(CSV_DIR, f'{csv_file}.csv') + + try: + with open(csv_path, 'r', encoding='utf-8-sig') as f: # BOM 처리 + reader = csv.DictReader(f) + for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행 + try: + field_name = row['Field'] + table_name = row['Table'] + + # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용 + wsdl_field_name = FIELD_MAPPING.get(field_name, field_name) + + # 테이블 정보를 키에 포함 (구분자를 || 로 변경) + key = f"{wsdl_field_name}||{table_name}" + + # 타입과 사이즈 정규화 + normalized_type, normalized_size = normalize_sap_type_and_size( + row.get('Type', ''), row.get('Size', '') + ) + + # Description 안전 처리 + safe_desc = safe_description_escape(row.get('Description', '')) + + csv_data[key] = { + 'seq': row.get('SEQ', ''), + 'table': table_name, + 'field': row.get('Field', ''), # 원래 CSV 필드명 저장 + 'mo': row.get('M/O', ''), + 'type': normalized_type, # 정규화된 타입 + 'size': normalized_size, # 정규화된 사이즈 + 'description': safe_desc, # 안전 처리된 Description + 'original_type': row.get('Type', ''), # 원본 타입 보존 + 'original_size': row.get('Size', '') # 원본 사이즈 보존 + } + + except Exception as e: + print_error(f"CSV 행 {row_num} 처리 실패 - {csv_file}: {str(e)}") + print_error(f"문제 행 데이터: {row}") + continue + + except Exception as e: + print_error(f"CSV 파일 로딩 실패 - {csv_path}: {str(e)}") + return {} + + return csv_data + +def extract_field_name_from_line(line): + """라인에서 name="필드명" 추출""" + match = re.search(r'name="([^"]+)"', line) + return match.group(1) if match else None + +def extract_field_from_comment(comment_line): + """주석에서 Field: 부분의 필드명 추출""" + match = re.search(r'Field:([^,]+)', comment_line) + return match.group(1).strip() if match else None + +def has_seq_in_comment(comment_line): + """주석에 SEQ가 있는지 확인""" + return 'SEQ:' in comment_line + +def get_indentation(line): + """라인의 들여쓰기 반환""" + return len(line) - len(line.lstrip()) + +def create_comment(field_name, csv_data, indentation, complex_type): + """CSV 데이터를 기반으로 주석 생성""" + try: + # 필드명으로 시작하는 키들을 찾음 (대소문자 구분 없이) + matching_keys = [key for key in csv_data.keys() if key.split('||')[0].upper() == field_name.upper()] + if not matching_keys: + indent = ' ' * indentation + print_warning(f"매칭되지 않은 필드: {field_name}") + return f"{indent}<!-- TODO: UNMATCHED FIELD OCCURS - {field_name} -->" + + # complexType과 일치하는 테이블 정보 찾기 + matching_data = None + + # 1. complexType 이름과 완전히 일치하는 테이블 찾기 + for key in matching_keys: + table_name = key.split('||', 1)[1] + if complex_type.upper() == table_name.upper(): + matching_data = csv_data[key] + break + + # 2. CSV 테이블명을 '/'로 스플릿한 마지막 부분이 complexType과 일치하는 경우 + if not matching_data: + for key in matching_keys: + table_name = key.split('||', 1)[1] + if '/' in table_name: + last_part = table_name.split('/')[-1] + if complex_type.upper() == last_part.upper(): + matching_data = csv_data[key] + break + + # 3. 필드명만 일치하는 경우 (첫 번째 매칭 데이터 사용) + if not matching_data: + matching_data = csv_data[matching_keys[0]] + + # 4. 매칭된 데이터가 있으면 주석 생성, 없으면 매칭 실패 주석 + if matching_data: + indent = ' ' * indentation + + # CSV의 실제 타입과 사이즈 사용 + comment = f"{indent}<!-- SEQ:{matching_data['seq']}, Table:{matching_data['table']}, Field:{matching_data['field']}, M/O:{matching_data['mo']}, Type:{matching_data['type']}, Size:{matching_data['size']}, Description:{matching_data['description']} -->" + + print_info(f"주석 생성 완료: {field_name} -> Type:{matching_data['type']}, Size:{matching_data['size']}") + return comment + else: + indent = ' ' * indentation + print_warning(f"매칭 데이터를 찾을 수 없음: {field_name}") + return f"{indent}<!-- TODO: NO MATCHING DATA FOUND - {field_name} -->" + + except Exception as e: + indent = ' ' * indentation + print_error(f"주석 생성 실패 - 필드: {field_name}, 에러: {str(e)}") + return f"{indent}<!-- ERROR: COMMENT GENERATION FAILED - {field_name} -->" + +def normalize_comment(comment_line): + """주석을 정규화 (공백 제거, 소문자 변환 등)""" + # <!-- 와 --> 제거하고 내용만 추출 + content = re.sub(r'^\s*<!--\s*|\s*-->\s*$', '', comment_line.strip()) + # 여러 공백을 하나로 통합 + content = re.sub(r'\s+', ' ', content) + return content.strip() + +def comments_are_equal(existing_comment, expected_comment): + """두 주석이 같은 내용인지 비교""" + existing_normalized = normalize_comment(existing_comment) + expected_normalized = normalize_comment(expected_comment) + return existing_normalized == expected_normalized + +def should_process_line(line, csv_data): + """라인이 처리 대상인지 확인""" + # 네 조건을 모두 만족해야 함: + # 1. <xsd:element 태그 + # 2. name=" 속성이 있는 태그 + # 3. maxOccurs=" 속성이 없는 태그 (배열 데이터 제외) + # 4. CSV에 해당 필드가 있는 경우 + + if not ('<xsd:element' in line and 'name="' in line): + return False + + # maxOccurs=" 가 있으면 배열 데이터이므로 제외 (모든 maxOccurs 속성) + if 'maxOccurs="' in line: + return False + + field_name = extract_field_name_from_line(line) + if not field_name: + return False + + # 필드명이 CSV 데이터의 키에 정확히 일치하는지 확인 (대소문자 구분 없이) + return any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys()) + +def get_skip_reason(line, csv_data): + """필드를 건너뛰는 이유를 반환""" + if not ('<xsd:element' in line and 'name="' in line): + return None + + field_name = extract_field_name_from_line(line) + if not field_name: + return None + + # maxOccurs 체크 (배열 타입) + if 'maxOccurs="' in line: + return "ARRAY_TYPE" + + # 복합객체인 경우 + if 'MASTER' in field_name: + return "COMPLEX_TYPE" + + # CSV에 있는지 체크 + has_csv_data = any(field_name.upper() == key.split('||')[0].upper() for key in csv_data.keys()) + if not has_csv_data: + # Req로 끝나는 경우는 래퍼 타입이므로 정상 + if field_name.endswith('Req'): + return "REQ_WRAPPER_TYPE" + else: + return "NO_CSV_DATA" + + return None + +def get_table_prefix_from_csv_name(csv_name: str) -> str: + """CSV 파일명에서 테이블 prefix 추출""" + csv_upper = csv_name.upper() + + # CSV 파일명 패턴에서 마스터 타입 추출 + if 'CUSTOMER_MASTER' in csv_upper: + return 'CUSTOMER' + elif 'VENDOR_MASTER' in csv_upper: + return 'VENDOR' + elif 'EMPLOYEE_MASTER' in csv_upper: + return 'EMPLOYEE' + elif 'PROJECT_MASTER' in csv_upper: + return 'PROJECT' + elif 'DEPARTMENT_CODE' in csv_upper: + return 'DEPARTMENT' + elif 'ORGANIZATION_MASTER' in csv_upper: + return 'ORGANIZATION' + elif 'EQUP_MASTER' in csv_upper: + return 'EQUP' + elif 'MODEL_MASTER' in csv_upper: + return 'MODEL' + elif 'MATERIAL_MASTER' in csv_upper: + return 'MATERIAL' + elif 'EMPLOYEE_REFERENCE' in csv_upper: + return 'EMPLOYEE_REF' + else: + # 기본적으로 MDZ 부분 제거 후 첫 번째 단어 사용 + parts = csv_name.replace('IF_MDZ_EVCP_', '').split('_') + return parts[0] if parts else 'COMMON' + +def backup_file(filepath): + """파일을 백업""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = f"{filepath}.backup_{timestamp}" + shutil.copy2(filepath, backup_path) + print(f"백업 파일 생성: {backup_path}") + return backup_path + +def process_wsdl_file(target): + """WSDL 파일 처리""" + csv_file_path = os.path.join(CSV_DIR, f'{target}.csv') + wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl') + + try: + # 백업 생성 + backup_path = backup_file(wsdl_file_path) + + print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD) + print_info("CSV 데이터 로딩 중...") + csv_data = load_csv_data(target) + print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨") + + # WSDL 파일 읽기 + with open(wsdl_file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + + # complexType 정보 추출 + complex_types = get_complex_type_info(lines) + print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨") + except Exception as e: + print_error(f"파일 초기화 실패 - {target}: {str(e)}") + return + + # complexType 구조 출력 (디버깅용) + for type_name, fields in complex_types.items(): + print_color(f"\nComplexType: {type_name}", Colors.MAGENTA) + for field in fields: + print(f" - {field['name']} ({field['type']}) {'[Array]' if field['is_array'] else ''}") + + new_lines = [] + i = 0 + changes_made = 0 + processed_fields = [] + skipped_fields = [] + skipped_array_fields = [] + skipped_no_csv_fields = [] + skipped_req_wrapper_fields = [] + verified_correct = 0 + corrected_seq = 0 + error_count = 0 + + current_complex_type = None + type_stack = [] # 중첩된 complexType을 추적하기 위한 스택 + + while i < len(lines): + line = lines[i] + line_processed = False + + try: + # complexType 시작 태그 확인 + type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line) + if type_match: + if current_complex_type: + type_stack.append(current_complex_type) + current_complex_type = type_match.group(1) + print_color(f"현재 complexType: {current_complex_type}", Colors.BLUE) + + # complexType 종료 태그 확인 + if '</xsd:complexType>' in line: + if type_stack: + current_complex_type = type_stack.pop() + print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE) + else: + current_complex_type = None + + # CSV에 있는 xsd:element 필드인지 확인 + if should_process_line(line, csv_data): + field_name = extract_field_name_from_line(line) + + if field_name and current_complex_type: + processed_fields.append(field_name) + print_color(f"처리 중인 필드: {field_name} (complexType: {current_complex_type})", Colors.CYAN) + + # 바로 위 라인이 주석인지 확인 (공백 라인 건너뛰면서) + comment_line_index = -1 + j = len(new_lines) - 1 + + while j >= 0: + prev_line = new_lines[j].strip() + if prev_line == '': + j -= 1 + continue + elif prev_line.startswith('<!--') and prev_line.endswith('-->'): + comment_line_index = j + break + else: + break + + if comment_line_index >= 0: + existing_comment = new_lines[comment_line_index] + + if has_seq_in_comment(existing_comment): + indentation = get_indentation(line) + expected_comment = create_comment(field_name, csv_data, indentation, current_complex_type) + + if expected_comment: + if comments_are_equal(existing_comment, expected_comment): + verified_correct += 1 + print_success(f" 주석 검증 통과") + else: + new_lines[comment_line_index] = expected_comment + '\n' + changes_made += 1 + corrected_seq += 1 + print_warning(f" SEQ 주석 수정: {field_name}") + print(f" 기존: {existing_comment.strip()}") + print(f" 수정: {expected_comment}") + else: + indentation = get_indentation(line) + new_comment = create_comment(field_name, csv_data, indentation, current_complex_type) + if new_comment: + new_lines[comment_line_index] = new_comment + '\n' + changes_made += 1 + print_warning(f" 주석 교체: {field_name}") + else: + indentation = get_indentation(line) + new_comment = create_comment(field_name, csv_data, indentation, current_complex_type) + if new_comment: + new_lines.append(new_comment + '\n') + changes_made += 1 + print_info(f" 주석 추가: {field_name}") + + line_processed = True + elif '<xsd:element' in line and 'name="' in line: + field_name = extract_field_name_from_line(line) + if field_name: + skip_reason = get_skip_reason(line, csv_data) + if skip_reason == "ARRAY_TYPE": + skipped_array_fields.append(field_name) + skipped_fields.append(field_name) + print_color(f"건너뛴 필드: {field_name} (배열 타입 - maxOccurs 속성)", Colors.YELLOW) + elif skip_reason == "REQ_WRAPPER_TYPE": + skipped_req_wrapper_fields.append(field_name) + skipped_fields.append(field_name) + print_color(f"건너뛴 필드: {field_name} (요청 래퍼 타입 - 정상)", Colors.BLUE) + elif skip_reason == "NO_CSV_DATA": + skipped_no_csv_fields.append(field_name) + skipped_fields.append(field_name) + print_error(f"건너뛴 필드: {field_name} (CSV에 데이터 없음 - 확인 필요!)") + else: + # 기타 이유로 건너뛴 경우 + skipped_fields.append(field_name) + print_warning(f"건너뛴 필드: {field_name} (기타 이유)") + + except Exception as e: + print_error(f"라인 처리 중 오류 발생 (라인 {i+1}): {str(e)}") + print_error(f"문제 라인: {line.strip()}") + error_count += 1 + + new_lines.append(line) + i += 1 + + # 결과 저장 + try: + with open(wsdl_file_path, 'w', encoding='utf-8') as f: + f.writelines(new_lines) + print_success("WSDL 파일 저장 완료") + except Exception as e: + print_error(f"WSDL 파일 저장 실패: {str(e)}") + return + + # 결과 출력 + print_color(f"\n{'='*50}", Colors.BOLD) + print_color(f"처리 완료: {target}", Colors.BOLD) + print_color(f"{'='*50}", Colors.BOLD) + + print_info(f"CSV 파일: {csv_file_path}") + print_info(f"WSDL 파일: {wsdl_file_path}") + print_info(f"백업 파일: {backup_path}") + + print_color(f"\n📊 처리 통계:", Colors.MAGENTA) + print(f" 총 변경사항: {changes_made}개") + print(f" 처리된 CSV 필드 수: {len(processed_fields)}") + print(f" 건너뛴 필드 총계: {len(skipped_fields)}") + print(f" ├─ 배열 타입 (정상): {len(skipped_array_fields)}개") + print(f" ├─ 요청 래퍼 타입 (정상): {len(skipped_req_wrapper_fields)}개") + print_color(f" └─ CSV 누락 (문제): {len(skipped_no_csv_fields)}개", Colors.RED if len(skipped_no_csv_fields) > 0 else Colors.WHITE) + print(f" 검증 통과한 SEQ 주석: {verified_correct}개") + print(f" 수정된 SEQ 주석: {corrected_seq}개") + print(f" 오류 발생 횟수: {error_count}개") + + # CSV 누락 필드 상세 표시 + if len(skipped_no_csv_fields) > 0: + print_error(f"\n⚠️ CSV에 누락된 필드 목록 (확인 필요):") + for field in skipped_no_csv_fields: + print_error(f" - {field}") + + # 최종 결과 + if error_count > 0: + print_error(f"\n⚠️ {error_count}개의 오류가 발생했습니다. 로그를 확인해주세요.") + + if len(skipped_no_csv_fields) > 0: + print_error(f"\n🚨 주의: {len(skipped_no_csv_fields)}개의 필드가 CSV에 누락되어 있습니다!") + print_error("이 필드들은 WSDL에 정의되어 있지만 CSV 스펙에 없어 주석이 생성되지 않았습니다.") + + if changes_made == 0: + print_success(f"\n모든 주석이 정확합니다! (검증된 SEQ 주석: {verified_correct}개)") + else: + print_success(f"\n{changes_made}개의 주석이 수정되었습니다.") + if corrected_seq > 0: + print(f" - 기존 SEQ 주석 수정: {corrected_seq}개") + if changes_made - corrected_seq > 0: + print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개") + +if __name__ == "__main__": + try: + csv_files = get_csv_files() + print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD) + print_info(f"처리할 파일 목록: {csv_files}") + + total_files = len(csv_files) + success_count = 0 + error_count = 0 + + for i, target in enumerate(csv_files, 1): + print_color(f"\n{'='*60}", Colors.BOLD) + print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD) + print_color(f"{'='*60}", Colors.BOLD) + + try: + process_wsdl_file(target) + success_count += 1 + except Exception as e: + print_error(f"파일 처리 실패 - {target}: {str(e)}") + error_count += 1 + + # 최종 통계 + print_color(f"\n{'='*60}", Colors.BOLD) + print_color("🏁 전체 처리 완료", Colors.BOLD) + print_color(f"{'='*60}", Colors.BOLD) + + print_success(f"성공: {success_count}개 파일") + if error_count > 0: + print_error(f"실패: {error_count}개 파일") + else: + print_success("모든 파일이 성공적으로 처리되었습니다!") + + # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용) + print_color(f"\n{'='*60}", Colors.BOLD) + print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA) + print_color(f"{'='*60}", Colors.BOLD) + + print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN) + for sap_type in sorted(discovered_sap_types): + print(f" - {sap_type}") + + print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW) + for combination in sorted(type_size_combinations): + print(f" - {combination}") + + print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN) + print(" 🎯 실용적 접근법:") + print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)") + print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC") + print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP") + print("") + print(" 📋 SAP 타입별 상세:") + print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT") + print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)") + print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)") + print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC") + print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)") + print("") + print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환") + + except Exception as e: + print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}") + exit(1)
\ No newline at end of file |
