#!/usr/bin/env python3 """ MDZ WSDL 파일 분석 스크립트 - WSDL 파일에서 테이블 구조 추출 - 현재 Drizzle 스키마와 비교 - 누락된 테이블/필드 확인 """ import os import re import xml.etree.ElementTree as ET from pathlib import Path from typing import Dict, List, Set, Tuple from collections import defaultdict import sys from datetime import datetime class ColorLogger: """컬러 로깅을 위한 클래스""" # ANSI 컬러 코드 COLORS = { 'RESET': '\033[0m', 'BOLD': '\033[1m', 'DIM': '\033[2m', # 기본 컬러 'BLACK': '\033[30m', 'RED': '\033[31m', 'GREEN': '\033[32m', 'YELLOW': '\033[33m', 'BLUE': '\033[34m', 'MAGENTA': '\033[35m', 'CYAN': '\033[36m', 'WHITE': '\033[37m', # 밝은 컬러 'BRIGHT_BLACK': '\033[90m', 'BRIGHT_RED': '\033[91m', 'BRIGHT_GREEN': '\033[92m', 'BRIGHT_YELLOW': '\033[93m', 'BRIGHT_BLUE': '\033[94m', 'BRIGHT_MAGENTA': '\033[95m', 'BRIGHT_CYAN': '\033[96m', 'BRIGHT_WHITE': '\033[97m', # 배경 컬러 'BG_RED': '\033[41m', 'BG_GREEN': '\033[42m', 'BG_YELLOW': '\033[43m', 'BG_BLUE': '\033[44m', } def __init__(self, enable_colors: bool = True): """ 컬러 로거 초기화 Args: enable_colors: Windows CMD에서는 False로 설정 가능 """ self.enable_colors = enable_colors and self._supports_color() def _supports_color(self) -> bool: """컬러 지원 여부 확인""" # Windows에서 colorama가 없으면 컬러 비활성화 if os.name == 'nt': try: import colorama colorama.init() return True except ImportError: return False return True def _colorize(self, text: str, color: str) -> str: """텍스트에 컬러 적용""" if not self.enable_colors: return text return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}" def header(self, text: str): """헤더 로그 (굵은 파란색)""" colored_text = self._colorize(text, 'BOLD') colored_text = self._colorize(colored_text, 'BRIGHT_BLUE') print(colored_text) def info(self, text: str): """정보 로그 (파란색)""" colored_text = self._colorize(text, 'BLUE') print(colored_text) def success(self, text: str): """성공 로그 (초록색)""" colored_text = self._colorize(text, 'BRIGHT_GREEN') print(colored_text) def warning(self, text: str): """경고 로그 (노란색)""" colored_text = self._colorize(text, 'BRIGHT_YELLOW') print(colored_text) def error(self, text: str): """에러 로그 (빨간색)""" colored_text = self._colorize(text, 'BRIGHT_RED') print(colored_text) def debug(self, text: str): """디버그 로그 (회색)""" colored_text = self._colorize(text, 'BRIGHT_BLACK') print(colored_text) def table_info(self, text: str): """테이블 정보 로그 (시안색)""" colored_text = self._colorize(text, 'CYAN') print(colored_text) def field_info(self, text: str): """필드 정보 로그 (마젠타)""" colored_text = self._colorize(text, 'MAGENTA') print(colored_text) def separator(self, char: str = "=", length: int = 80): """구분선 출력 (굵은 흰색)""" line = char * length colored_line = self._colorize(line, 'BOLD') print(colored_line) # 전역 로거 인스턴스 logger = ColorLogger() class WSDLAnalyzer: def __init__(self, wsdl_directory: str): self.wsdl_directory = Path(wsdl_directory) self.tables = defaultdict(dict) # table_name -> {field_name: field_info} self.table_hierarchy = defaultdict(list) # parent -> [children] self.table_sources = defaultdict(set) # table_name -> {wsdl_file_names} # 필드명 매핑 규칙 정의 (개별 WSDL을 존중해 테이블 분리하기로 했으므로 사용하지 않음.) self.field_name_mappings = {} # 사용법 # self.field_name_mappings = { # 'CUSTOMER_MASTER': { # WSDL 파일명에 이 문자열이 포함되면 # 'ADDRNO': 'ADR_NO' # ADDRNO를 ADR_NO로 변경 # } # } def analyze_all_mdz_wsdls(self): """MDZ가 포함된 모든 WSDL 파일 분석""" wsdl_files = list(self.wsdl_directory.glob("*MDZ*.wsdl")) logger.info(f"Found {len(wsdl_files)} MDZ WSDL files:") for wsdl_file in wsdl_files: logger.table_info(f" - {wsdl_file.name}") logger.info("") for wsdl_file in wsdl_files: self._analyze_wsdl_file(wsdl_file) # 테이블별 필드 합집합 처리 self._merge_table_fields() return self.tables, self.table_hierarchy def _merge_table_fields(self): """테이블별 필드 합집합 처리 - 개선된 버전""" merged_tables = defaultdict(dict) for table_name, fields in self.tables.items(): # MATL_PLNT 테이블의 경우 디버깅 정보 출력 if table_name == 'MATL_PLNT': logger.debug(f"\n=== MATL_PLNT 테이블 디버깅 ===") logger.debug(f" 병합 전 필드 수: {len(fields)}") logger.debug(f" 필드 목록:") for field_key, field_info in fields.items(): logger.debug(f" {field_key} -> {field_info['field_name']} (from {field_info['wsdl_source']})") # 테이블별 필드를 실제 필드명 기준으로 그룹화 field_groups = defaultdict(list) # actual_field_name -> [field_infos] for field_key, field_info in fields.items(): # field_key에서 실제 필드명 추출 (|| 구분자 사용) actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key field_groups[actual_field_name].append(field_info) # MATL_PLNT 테이블의 경우 그룹화 결과 출력 if table_name == 'MATL_PLNT': logger.debug(f" 그룹화 후 필드 수: {len(field_groups)}") logger.debug(f" 그룹별 필드:") for actual_field_name, field_infos in field_groups.items(): sources = [info['wsdl_source'] for info in field_infos] logger.debug(f" {actual_field_name}: {len(field_infos)}개 소스 - {sources}") # 각 필드 그룹을 병합 for actual_field_name, field_infos in field_groups.items(): # 첫 번째 필드 정보를 기준으로 시작 merged_field = field_infos[0].copy() # 모든 WSDL 소스 수집 all_sources = set() all_descriptions = set() for field_info in field_infos: all_sources.add(field_info['wsdl_source']) if field_info['description'].strip(): all_descriptions.add(field_info['description'].strip()) # 필수 필드인 경우 유지 if field_info['mandatory'] == 'M': merged_field['mandatory'] = 'M' # 병합된 정보 설정 merged_field['wsdl_sources'] = all_sources # 설명 병합 (첫 번째 설명 사용, WSDL 소스 정보는 주석에 추가) if all_descriptions: merged_field['description'] = list(all_descriptions)[0] else: merged_field['description'] = f'From multiple sources' # 테이블에 추가 (실제 필드명 사용) merged_tables[table_name][actual_field_name] = merged_field # 병합된 테이블 정보로 업데이트 self.tables = merged_tables # 테이블별 WSDL 소스 정보 출력 logger.info("\n테이블별 WSDL 소스 정보 (필드 중복 제거 후):") for table_name, fields in self.tables.items(): sources = set() for field_info in fields.values(): sources.update(field_info['wsdl_sources']) logger.table_info(f"\n{table_name}:") for source in sorted(sources): logger.table_info(f" - {source}") logger.table_info(f" 총 필드 수: {len(fields)}") # MATL_PLNT 테이블의 경우 최종 필드 목록 출력 if table_name == 'MATL_PLNT': logger.debug(f" 최종 필드 목록:") for field_name in sorted(fields.keys()): logger.debug(f" - {field_name}") def _analyze_wsdl_file(self, wsdl_file: Path): """단일 WSDL 파일 분석""" logger.info(f"Analyzing {wsdl_file.name}...") try: with open(wsdl_file, 'r', encoding='utf-8') as f: content = f.read() # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출) regex_count = self._extract_tables_from_regex(content, wsdl_file.name) # 정규식으로 찾지 못했을 때만 XML 파싱 시도 if regex_count == 0: try: # XML 네임스페이스 등록 namespaces = { 'xsd': 'http://www.w3.org/2001/XMLSchema', 'wsdl': 'http://schemas.xmlsoap.org/wsdl/' } root = ET.fromstring(content) self._extract_tables_from_xml(root, wsdl_file.name, namespaces) except ET.ParseError as e: logger.error(f" XML parsing failed: {e}") except Exception as e: logger.error(f" XML analysis error: {e}") except Exception as e: logger.error(f" Error analyzing {wsdl_file.name}: {e}") def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict): """XML에서 테이블 정보 추출""" # complexType 요소들에서 테이블 구조 추출 for complex_type in root.findall(".//xsd:complexType", namespaces): table_name = complex_type.get('name') if table_name: self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces) def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int: """정규식으로 테이블 정보 추출""" # Table 정보가 포함된 주석 패턴 (Description에서 --> 전까지 모든 문자 매칭) table_pattern = r'' matches = re.findall(table_pattern, content) # # MATL/PLNT 관련 필드 디버깅 # matl_plnt_matches = [match for match in matches if 'MATL/PLNT' in match[0]] # if matl_plnt_matches: # print(f" {wsdl_name}에서 MATL/PLNT 필드 발견: {len(matl_plnt_matches)}개") # for match in matl_plnt_matches: # table_path, field_name = match[0], match[1] # print(f" {field_name} (Table: {table_path})") for match in matches: table_path, field_name, mandatory, field_type, size, description = match # 필드명 매핑 적용 original_field_name = field_name.strip() mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name) # 테이블 경로에서 실제 테이블명 추출 # 예: "BP_HEADER/ADDRESS/AD_POSTAL" -> ["BP_HEADER", "ADDRESS", "AD_POSTAL"] table_parts = table_path.split('/') main_table = table_parts[0] # 계층 구조 기록 if len(table_parts) > 1: for i in range(len(table_parts) - 1): parent = '/'.join(table_parts[:i+1]) child = '/'.join(table_parts[:i+2]) if child not in self.table_hierarchy[parent]: self.table_hierarchy[parent].append(child) # 필드 정보 저장 (매핑된 필드명 사용) field_info = { 'field_name': mapped_field_name, # 매핑된 필드명 사용 'original_field_name': original_field_name, # 원본 필드명도 보존 'mandatory': mandatory.strip(), 'type': field_type.strip(), 'size': size.strip(), 'description': description.strip(), 'table_path': table_path, 'wsdl_source': wsdl_name } # 테이블별로 필드 저장 (|| 구분자 사용으로 충돌 방지, 매핑된 필드명 사용) # CSV 파일명 기반 테이블 prefix 추가 table_prefix = self._get_table_prefix_from_wsdl_name(wsdl_name) full_table_name = f"{table_prefix}_{table_path.replace('/', '_').upper()}" field_key = f"{mapped_field_name}||{table_path}" self.tables[full_table_name][field_key] = field_info # # MATL_PLNT 테이블에 필드 추가 시 디버깅 # if 'MATL_PLNT' in full_table_name: # print(f" {full_table_name}에 필드 추가: {mapped_field_name} (from {wsdl_name})") logger.success(f" Found {len(matches)} field definitions") return len(matches) def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict): """complexType에서 필드 정보 추출""" for element in complex_type.findall(".//xsd:element", namespaces): field_name = element.get('name') field_type = element.get('type', 'unknown') min_occurs = element.get('minOccurs', '1') max_occurs = element.get('maxOccurs', '1') if field_name: field_info = { 'field_name': field_name, 'mandatory': 'M' if min_occurs != '0' else 'O', 'type': field_type, 'size': 'unknown', 'description': f'From {table_name}', 'table_path': table_name, 'wsdl_source': wsdl_name } field_key = f"{field_name}||{table_name}" self.tables[table_name.upper()][field_key] = field_info def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str: """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경""" for wsdl_pattern, mappings in self.field_name_mappings.items(): if wsdl_pattern in wsdl_name.upper(): if field_name in mappings: original_name = field_name mapped_name = mappings[field_name] logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})") return mapped_name return field_name def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str: """WSDL 파일명에서 테이블 prefix 추출""" # 단순히 IF_MDZ_EVCP_ 접두사만 제거하고 나머지 그대로 사용 # 예: IF_MDZ_EVCP_MATERIAL_PART_RETURN.wsdl -> MATERIAL_PART_RETURN prefix = wsdl_name.replace('IF_MDZ_EVCP_', '').replace('.wsdl', '') return prefix if prefix else 'COMMON' def analyze_current_drizzle_schema(schema_file: str) -> Set[str]: """현재 Drizzle 스키마에서 테이블 목록 추출""" try: with open(schema_file, 'r', encoding='utf-8') as f: content = f.read() # export const 테이블명 패턴 찾기 table_pattern = r'export const (\w+) = mdgSchema\.table\(' matches = re.findall(table_pattern, content) return set(matches) except FileNotFoundError: logger.error(f"Schema file not found: {schema_file}") return set() def compare_wsdl_vs_schema(wsdl_tables: Dict, schema_tables: Set[str]): """WSDL 테이블과 스키마 테이블 비교""" logger.separator() logger.header("WSDL vs Drizzle Schema 비교 결과") logger.separator() # WSDL에서 추출한 테이블명 (이미 대문자로 변환됨) wsdl_table_names = set(wsdl_tables.keys()) logger.info(f"\nWSDL에서 발견된 테이블: {len(wsdl_tables)}개") for table in sorted(wsdl_tables.keys()): field_count = len(wsdl_tables[table]) logger.table_info(f" - {table} ({field_count} fields)") logger.info(f"\nDrizzle 스키마의 테이블: {len(schema_tables)}개") for table in sorted(schema_tables): logger.table_info(f" - {table}") # 테이블명 직접 비교 (대문자로 통일) schema_tables_upper = {table.upper() for table in schema_tables} wsdl_tables_upper = {table.upper() for table in wsdl_table_names} # 누락된 테이블 찾기 missing_in_schema = wsdl_tables_upper - schema_tables_upper extra_in_schema = schema_tables_upper - wsdl_tables_upper if missing_in_schema: logger.warning(f"\n⚠️ 스키마에 누락된 테이블 ({len(missing_in_schema)}개):") for table in sorted(missing_in_schema): logger.warning(f" - {table}") if extra_in_schema: logger.success(f"\n✅ 스키마에 추가로 정의된 테이블 ({len(extra_in_schema)}개):") for table in sorted(extra_in_schema): logger.success(f" - {table}") return missing_in_schema, extra_in_schema def generate_missing_tables_schema(wsdl_tables: Dict, missing_tables: Set[str]): """누락된 테이블들의 Drizzle 스키마 코드 생성""" if not missing_tables: return logger.separator() logger.header("누락된 테이블들의 Drizzle 스키마 코드") logger.separator() for missing_table in sorted(missing_tables): # WSDL 테이블명에서 해당하는 테이블 찾기 (대문자로 직접 매칭) wsdl_table_key = missing_table.upper() if wsdl_table_key in wsdl_tables and wsdl_tables[wsdl_table_key]: logger.field_info(f"\n// {wsdl_table_key}") logger.field_info(f"export const {wsdl_table_key} = mdgSchema.table('{wsdl_table_key}', {{") logger.field_info(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),") for field_key, field_info in wsdl_tables[wsdl_table_key].items(): # field_key에서 실제 필드명 추출 (|| 구분자 사용) if '||' in field_key: actual_field_name = field_key.split('||')[0] else: actual_field_name = field_key # 필드 타입 매핑 drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 # 주석으로 설명 추가 comment = f" // {field_info['description']}" if field_info['description'] else "" wsdl_source = f" // From: {field_info['wsdl_source']}" mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" logger.field_info(f" {actual_field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}") logger.field_info(" ") logger.field_info(" createdAt: timestamp('created_at').defaultNow().notNull(),") logger.field_info(" updatedAt: timestamp('updated_at').defaultNow().notNull(),") logger.field_info("});") def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str: """WSDL 타입을 Drizzle 타입으로 매핑 (모든 필드를 VARCHAR로 통일, 방어적 사이즈 계산)""" # 기본 길이 설정 default_length = 100 min_length = 10 # 최소 길이 max_length = 2000 # 최대 길이 (PostgreSQL VARCHAR 권장 최대) # LCHR 타입은 text()로 처리 (큰 텍스트) if 'LCHR' in wsdl_type.upper(): return "text()" # 사이즈 처리 if size and size.strip(): try: size_clean = size.strip() # "n,m" 형태 처리 (소수점 있는 숫자 타입) if ',' in size_clean: parts = size_clean.split(',') if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): total_digits = int(parts[0]) # 전체 자릿수 decimal_places = int(parts[1]) # 소수점 이하 자릿수 # 방어적 계산: 전체 자릿수 + 부호(1) + 소수점(1) + 여유분(3) = +5 safe_length = total_digits + 5 logger.debug(f" 📏 소수점 타입 사이즈 계산: {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") # 최소/최대 길이 제한 safe_length = max(min_length, min(safe_length, max_length)) return f"varchar({{ length: {safe_length} }})" # 단순 숫자 처리 elif size_clean.isdigit(): original_length = int(size_clean) # 단순 숫자는 그대로 사용 (여유분 없음) safe_length = max(min_length, min(original_length, max_length)) if safe_length != original_length: logger.debug(f" 📏 단순 사이즈 조정: {original_length} -> {safe_length} (min/max 제한)") else: logger.debug(f" 📏 단순 사이즈 사용: {safe_length}") return f"varchar({{ length: {safe_length} }})" # "n.m" 형태 처리 (점으로 구분된 경우도 있을 수 있음) elif '.' in size_clean: parts = size_clean.split('.') if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): total_digits = int(parts[0]) decimal_places = int(parts[1]) # 방어적 계산 safe_length = total_digits + 5 logger.debug(f" 📏 소수점 타입 사이즈 계산 (점 구분): {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") safe_length = max(min_length, min(safe_length, max_length)) return f"varchar({{ length: {safe_length} }})" # 기타 형태는 기본값 사용 else: logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용") return f"varchar({{ length: {default_length} }})" except Exception as e: logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}") return f"varchar({{ length: {default_length} }})" # 사이즈가 없거나 비어있는 경우 기본값 return f"varchar({{ length: {default_length} }})" def validate_schema(wsdl_tables: Dict, schema_tables: Set[str]) -> Dict[str, List[str]]: """스키마 검증""" validation_results = { 'missing_tables': [], 'missing_fields': [], 'type_mismatches': [], 'duplicate_fields': [] } for table_name, fields in wsdl_tables.items(): # 테이블 존재 여부 검증 if table_name not in schema_tables: validation_results['missing_tables'].append(table_name) continue # 필드 검증 field_names = set() for field_key, field_info in fields.items(): # field_key에서 실제 필드명 추출 (|| 구분자 사용) actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key # 중복 필드 검사 if actual_field_name in field_names: validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") field_names.add(actual_field_name) # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) if actual_field_name not in existing_fields: validation_results['missing_fields'].append(f"{table_name}.{actual_field_name}") # 타입 호환성 검증 # ? 기존 스키마의 필드 타입과 비교 # ! VARCHAR로 처리하기로 했으니 타입 호환성 검사는 필요 없음 return validation_results def analyze_existing_schema(schema_file: str) -> Dict[str, Dict[str, str]]: """기존 스키마 파일 분석""" existing_schema = {} try: with open(schema_file, 'r', encoding='utf-8') as f: content = f.read() # 테이블 정의 찾기 (변경된 패턴) table_pattern = r'export const (\w+) = mdgSchema\.table\([\'"](\w+)[\'"]' tables = re.findall(table_pattern, content) for table_const, table_name in tables: # 테이블의 필드 정의 찾기 field_pattern = rf'{table_const} = mdgSchema\.table\([\'"]{table_name}[\'"].*?{{(.*?)}}' table_match = re.search(field_pattern, content, re.DOTALL) if table_match: fields = {} field_defs = table_match.group(1) # 각 필드 정의 파싱 (변경된 패턴) field_pattern = r'(\w+):\s*(\w+)\([\'"](\w+)[\'"]' field_matches = re.findall(field_pattern, field_defs) for field_name, field_type, field_db_name in field_matches: fields[field_name] = { 'type': field_type, 'db_name': field_db_name } existing_schema[table_name] = fields except Exception as e: logger.error(f"스키마 파일 분석 중 오류 발생: {e}") return existing_schema def compare_field_types(wsdl_type: str, existing_type: str) -> bool: """필드 타입 호환성 검사""" type_mapping = { 'varchar': ['CHAR', 'VARC', 'LCHR'], 'integer': ['NUMB', 'NUMC'], 'decimal': ['CURR'], 'date': ['DATS'], 'time': ['TIMS'], 'text': ['LCHR'] } wsdl_type = wsdl_type.upper() existing_type = existing_type.lower() # 타입 매핑 확인 for drizzle_type, wsdl_types in type_mapping.items(): if existing_type == drizzle_type: return any(t in wsdl_type for t in wsdl_types) return False def validate_schema(wsdl_tables: Dict, schema_tables: Set[str], existing_schema: Dict[str, Dict[str, str]]) -> Dict[str, List[str]]: """스키마 검증 (개선된 버전)""" validation_results = { 'missing_tables': [], 'missing_fields': [], 'type_mismatches': [], 'duplicate_fields': [] } for table_name, fields in wsdl_tables.items(): # 테이블 존재 여부 검증 if table_name not in schema_tables: validation_results['missing_tables'].append(table_name) continue # 기존 테이블의 필드 정보 가져오기 existing_fields = existing_schema.get(table_name, {}) # 필드 검증 field_names = set() for field_key, field_info in fields.items(): # field_key에서 실제 필드명 추출 (|| 구분자 사용) actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key # 중복 필드 검사 if actual_field_name in field_names: validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") field_names.add(actual_field_name) # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) # Note: existing_fields는 기존 validate_schema에서는 정의되지 않았으므로 스킵 # 타입 호환성 검증 if actual_field_name in existing_fields: existing_type = existing_fields[actual_field_name]['type'] if not compare_field_types(field_info['type'], existing_type): validation_results['type_mismatches'].append( f"{table_name}.{actual_field_name}: WSDL={field_info['type']}, Existing={existing_type}" ) return validation_results def generate_schema_code(wsdl_tables: Dict, validation_results: Dict[str, List[str]], existing_schema: Dict[str, Dict[str, str]]) -> str: """스키마 코드 생성 (개선된 버전)""" schema_code = [] # 누락된 테이블 생성 for table_name in validation_results['missing_tables']: table_code = generate_table_code(wsdl_tables[table_name], table_name) schema_code.append(table_code) # 누락된 필드 추가 for field_info in validation_results['missing_fields']: table_name, field_name = field_info.split('.') if table_name in existing_schema: field_code = generate_field_code(wsdl_tables[table_name][field_name]) # 기존 테이블에 필드 추가하는 코드 생성 table_code = f"// {table_name}에 추가할 필드:\n{field_code}" schema_code.append(table_code) return '\n\n'.join(schema_code) def generate_table_code(fields: Dict, table_name: str) -> str: """테이블 코드 생성""" code = [ f"export const {table_name} = mdgSchema.table('{table_name}', {{", " id: integer('id').primaryKey().generatedByDefaultAsIdentity()," ] # fields에서 실제 필드명과 필드 정보 가져오기 # _merge_table_fields에서 이미 actual_field_name을 키로 사용하므로 그대로 사용 for actual_field_name, field_info in sorted(fields.items()): # 필드 코드 생성 (actual_field_name 사용) field_code = generate_field_code(field_info) code.append(f" {field_code}") code.extend([ " ", " createdAt: timestamp('created_at').defaultNow().notNull(),", " updatedAt: timestamp('updated_at').defaultNow().notNull(),", "});" ]) return '\n'.join(code) def generate_field_code(field_info: Dict) -> str: """필드 코드 생성""" drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 comment = f" // {field_info['description']}" if field_info['description'] else "" # 여러 WSDL 소스 정보 추가 if 'wsdl_sources' in field_info and len(field_info['wsdl_sources']) > 1: sources_comment = f" // From: {', '.join(sorted(field_info['wsdl_sources']))}" else: wsdl_source = field_info.get('wsdl_source', list(field_info.get('wsdl_sources', ['Unknown']))[0]) sources_comment = f" // From: {wsdl_source}" # 필수 필드 정보는 주석으로만 표시 mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" # 필드명 매핑이 적용된 경우 원본 필드명 표시 mapping_comment = "" if 'original_field_name' in field_info and field_info['original_field_name'] != field_info['field_name']: mapping_comment = f" // Original: {field_info['original_field_name']}" return f"{field_info['field_name']}: {drizzle_type}{mandatory},{comment}{sources_comment}{mandatory_comment}{mapping_comment}" def generate_complete_schema(wsdl_tables: Dict) -> str: """완전한 스키마 코드 생성""" schema_code = [ "import { integer, varchar, text, timestamp } from 'drizzle-orm/pg-core';", "import { mdgSchema } from '../../../db/schema/MDG/mdg';", "", "// WSDL 기반 자동 생성된 스키마", "// 생성일시: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " (UTC로 9시간 빼야 한국 시간)", "// 개선사항:", "// 1. WSDL별로 테이블 만들었음. 인터페이스 정의서에 문제가 많아서 어쩔 수 없었음.", "// 2. 타입은 varchar를 사용하도록 했음. 숫자관련된 건 부호, 소수점 대비 방어적으로 처리함 (사이즈)", "// 3. 테이블명에서 '/' 문자를 '_'로 변경하여 PostgreSQL/TypeScript 호환성 확보함", "", ] # 테이블 코드 생성 for table_name, fields in sorted(wsdl_tables.items()): table_code = generate_table_code(fields, table_name) schema_code.append(table_code) schema_code.append("") # 빈 줄 추가 return '\n'.join(schema_code) def main(): # 현재 스크립트 위치에서 프로젝트 루트 찾기 script_dir = Path(__file__).parent project_root = script_dir.parent.parent.parent # public/wsdl/_util -> project_root wsdl_dir = script_dir.parent # public/wsdl schema_file = project_root / "db" / "schema" / "MDG" / "mdg.ts" logger.header("MDZ WSDL 분석 시작...") logger.info(f"WSDL 디렉토리: {wsdl_dir}") logger.info(f"스키마 파일: {schema_file}") logger.info("") # WSDL 분석 analyzer = WSDLAnalyzer(wsdl_dir) wsdl_tables, table_hierarchy = analyzer.analyze_all_mdz_wsdls() # 현재 스키마 분석 schema_tables = analyze_current_drizzle_schema(schema_file) # 기존 스키마 분석 existing_schema = analyze_existing_schema(schema_file) # 비교 결과 출력 missing_tables, extra_tables = compare_wsdl_vs_schema(wsdl_tables, schema_tables) # 누락된 테이블 스키마 생성 generate_missing_tables_schema(wsdl_tables, missing_tables) # 스키마 검증 validation_results = validate_schema(wsdl_tables, schema_tables, existing_schema) # 검증 결과 출력 logger.separator() logger.header("스키마 검증 결과") logger.separator() for category, items in validation_results.items(): if items: logger.warning(f"\n{category}:") for item in items: logger.warning(f" - {item}") # 완전한 스키마 코드 생성 complete_schema = generate_complete_schema(wsdl_tables) # 스키마 파일 저장 output_file = script_dir / "generated_schema.ts" with open(output_file, 'w', encoding='utf-8') as f: f.write(complete_schema) logger.success(f"\n생성된 스키마가 {output_file}에 저장되었습니다.") # 상세 필드 정보 출력 (옵션) if len(sys.argv) > 1 and sys.argv[1] == "--detailed": logger.separator() logger.header("상세 필드 정보") logger.separator() for table_name, fields in wsdl_tables.items(): logger.table_info(f"\n### {table_name}") for field_name, field_info in fields.items(): logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}") logger.success("\n분석 완료!") logger.info(f"- 총 WSDL 테이블: {len(wsdl_tables)}개") logger.info(f"- 현재 스키마 테이블: {len(schema_tables)}개") logger.warning(f"- 누락 테이블: {len(missing_tables)}개") if missing_tables else logger.success(f"- 누락 테이블: {len(missing_tables)}개") logger.info(f"- 추가 테이블: {len(extra_tables)}개") if __name__ == "__main__": main()