diff options
| author | dujinkim <dujin.kim@dtsolution.co.kr> | 2025-07-21 07:20:21 +0000 |
|---|---|---|
| committer | dujinkim <dujin.kim@dtsolution.co.kr> | 2025-07-21 07:20:21 +0000 |
| commit | 194bd4bd7e6144d5c09c5e3f5476d254234dce72 (patch) | |
| tree | c97d0b9d53edceba89b2597f14cbffe5717deb96 /public/wsdl/_util/wsdl_comment_to_drizzle_schema.py | |
| parent | 9da494b0e3bbe7b513521d0915510fe9ee376b8b (diff) | |
| parent | 8165f003563e3d7f328747be3098542fe527b014 (diff) | |
Merge remote-tracking branch 'origin/ECC-SOAP-INTERFACE' into dujinkim
Diffstat (limited to 'public/wsdl/_util/wsdl_comment_to_drizzle_schema.py')
| -rwxr-xr-x | public/wsdl/_util/wsdl_comment_to_drizzle_schema.py | 584 |
1 files changed, 584 insertions, 0 deletions
diff --git a/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py b/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py new file mode 100755 index 00000000..73e71374 --- /dev/null +++ b/public/wsdl/_util/wsdl_comment_to_drizzle_schema.py @@ -0,0 +1,584 @@ +#!/usr/bin/env python3 +""" +개별 WSDL 파일을 Drizzle 스키마로 변환하는 스크립트 +Usage: python3 wsdl_comment_to_drizzle_schema.py --wsdl IF_ECC_EVCP_PR_INFORMATION.wsdl +""" + +import argparse +import os +import re +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Dict, List, Set, Tuple, Optional +from collections import defaultdict +import sys +from datetime import datetime + +class ColorLogger: + """컬러 로깅을 위한 클래스""" + + # ANSI 컬러 코드 + COLORS = { + 'RESET': '\033[0m', + 'BOLD': '\033[1m', + 'DIM': '\033[2m', + + # 기본 컬러 + 'BLACK': '\033[30m', + 'RED': '\033[31m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'BLUE': '\033[34m', + 'MAGENTA': '\033[35m', + 'CYAN': '\033[36m', + 'WHITE': '\033[37m', + + # 밝은 컬러 + 'BRIGHT_BLACK': '\033[90m', + 'BRIGHT_RED': '\033[91m', + 'BRIGHT_GREEN': '\033[92m', + 'BRIGHT_YELLOW': '\033[93m', + 'BRIGHT_BLUE': '\033[94m', + 'BRIGHT_MAGENTA': '\033[95m', + 'BRIGHT_CYAN': '\033[96m', + 'BRIGHT_WHITE': '\033[97m', + + # 배경 컬러 + 'BG_RED': '\033[41m', + 'BG_GREEN': '\033[42m', + 'BG_YELLOW': '\033[43m', + 'BG_BLUE': '\033[44m', + } + + def __init__(self, enable_colors: bool = True): + """ + 컬러 로거 초기화 + Args: + enable_colors: Windows CMD에서는 False로 설정 가능 + """ + self.enable_colors = enable_colors and self._supports_color() + + def _supports_color(self) -> bool: + """컬러 지원 여부 확인""" + # Windows에서 colorama가 없으면 컬러 비활성화 + if os.name == 'nt': + try: + import colorama + colorama.init() + return True + except ImportError: + return False + return True + + def _colorize(self, text: str, color: str) -> str: + """텍스트에 컬러 적용""" + if not self.enable_colors: + return text + return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}" + + def header(self, text: str): + """헤더 로그 (굵은 파란색)""" + colored_text = self._colorize(text, 'BOLD') + colored_text = self._colorize(colored_text, 'BRIGHT_BLUE') + print(colored_text) + + def info(self, text: str): + """정보 로그 (파란색)""" + colored_text = self._colorize(text, 'BLUE') + print(colored_text) + + def success(self, text: str): + """성공 로그 (초록색)""" + colored_text = self._colorize(text, 'BRIGHT_GREEN') + print(colored_text) + + def warning(self, text: str): + """경고 로그 (노란색)""" + colored_text = self._colorize(text, 'BRIGHT_YELLOW') + print(colored_text) + + def error(self, text: str): + """에러 로그 (빨간색)""" + colored_text = self._colorize(text, 'BRIGHT_RED') + print(colored_text) + + def debug(self, text: str): + """디버그 로그 (회색)""" + colored_text = self._colorize(text, 'BRIGHT_BLACK') + print(colored_text) + + def table_info(self, text: str): + """테이블 정보 로그 (시안색)""" + colored_text = self._colorize(text, 'CYAN') + print(colored_text) + + def field_info(self, text: str): + """필드 정보 로그 (마젠타)""" + colored_text = self._colorize(text, 'MAGENTA') + print(colored_text) + + def separator(self, char: str = "=", length: int = 80): + """구분선 출력 (굵은 흰색)""" + line = char * length + colored_line = self._colorize(line, 'BOLD') + print(colored_line) + +# 전역 로거 인스턴스 +logger = ColorLogger() + +class WSDLAnalyzer: + def __init__(self, wsdl_file: str, table_prefix: Optional[str] = None): + """ + WSDL 파일 분석기 초기화 + Args: + wsdl_file: 분석할 WSDL 파일 경로 + table_prefix: 테이블 접두사 (옵션) + """ + self.wsdl_file = Path(wsdl_file) + self.table_prefix = table_prefix + self.tables = defaultdict(dict) # table_name -> {field_name: field_info} + self.table_hierarchy = defaultdict(list) # parent -> [children] + + # 필드명 매핑 규칙 정의 (필요시 확장 가능) + self.field_name_mappings = {} + + def analyze_wsdl(self) -> Tuple[Dict, Dict]: + """WSDL 파일을 분석하고 테이블 정보 반환""" + if not self.wsdl_file.exists(): + raise FileNotFoundError(f"WSDL file not found: {self.wsdl_file}") + + logger.info(f"Analyzing {self.wsdl_file.name}...") + + try: + with open(self.wsdl_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출) + regex_count = self._extract_tables_from_regex(content, self.wsdl_file.name) + + # 정규식으로 찾지 못했을 때만 XML 파싱 시도 + if regex_count == 0: + try: + # XML 네임스페이스 등록 + namespaces = { + 'xsd': 'http://www.w3.org/2001/XMLSchema', + 'wsdl': 'http://schemas.xmlsoap.org/wsdl/' + } + + root = ET.fromstring(content) + self._extract_tables_from_xml(root, self.wsdl_file.name, namespaces) + except ET.ParseError as e: + logger.error(f" XML parsing failed: {e}") + except Exception as e: + logger.error(f" XML analysis error: {e}") + + # 테이블별 필드 합집합 처리 + self._merge_table_fields() + + return self.tables, self.table_hierarchy + + except Exception as e: + logger.error(f" Error analyzing {self.wsdl_file.name}: {e}") + raise + + def _merge_table_fields(self): + """테이블별 필드 합집합 처리""" + merged_tables = defaultdict(dict) + + for table_name, fields in self.tables.items(): + # 테이블별 필드를 실제 필드명 기준으로 그룹화 + field_groups = defaultdict(list) # actual_field_name -> [field_infos] + + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + field_groups[actual_field_name].append(field_info) + + # 각 필드 그룹을 병합 + for actual_field_name, field_infos in field_groups.items(): + # 첫 번째 필드 정보를 기준으로 시작 + merged_field = field_infos[0].copy() + + # 모든 WSDL 소스 수집 + all_sources = set() + all_descriptions = set() + + for field_info in field_infos: + all_sources.add(field_info['wsdl_source']) + if field_info['description'].strip(): + all_descriptions.add(field_info['description'].strip()) + + # 필수 필드인 경우 유지 + if field_info['mandatory'] == 'M': + merged_field['mandatory'] = 'M' + + # 병합된 정보 설정 + merged_field['wsdl_sources'] = all_sources + + # 설명 병합 (첫 번째 설명 사용) + if all_descriptions: + merged_field['description'] = list(all_descriptions)[0] + else: + merged_field['description'] = f'From {self.wsdl_file.name}' + + # 테이블에 추가 (실제 필드명 사용) + merged_tables[table_name][actual_field_name] = merged_field + + # 병합된 테이블 정보로 업데이트 + self.tables = merged_tables + + def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict): + """XML에서 테이블 정보 추출""" + # complexType 요소들에서 테이블 구조 추출 + for complex_type in root.findall(".//xsd:complexType", namespaces): + table_name = complex_type.get('name') + if table_name: + self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces) + + def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int: + """정규식으로 테이블 정보 추출""" + + # 1단계: 모든 SEQ 주석 찾기 + all_comments = re.findall(r'<!-- SEQ:\d+.*?-->', content, re.DOTALL) + + matches = [] + for comment in all_comments: + # 2단계: 단순한 파싱 방법 (콤마로 분할) + comment = comment.strip() + + # 콤마로 분할해서 각 부분을 분석 + parts = comment.split(', ') + + if len(parts) >= 7: + try: + table = parts[1].split(':')[1] if ':' in parts[1] else '' + field = parts[2].split(':')[1] if ':' in parts[2] else '' + mo = parts[3].split(':')[1] if ':' in parts[3] else '' + type_val = parts[4].split(':')[1] if ':' in parts[4] else '' + size = parts[5].split(':')[1] if ':' in parts[5] else '' + desc = parts[6].split(':')[1].replace(' -->', '') if ':' in parts[6] else '' + + matches.append((table, field, mo, type_val, size, desc)) + except (IndexError, ValueError): + # 파싱 실패 시 무시 + continue + + for match in matches: + table_path, field_name, mandatory, field_type, size, description = match + + # 필드명 매핑 적용 + original_field_name = field_name.strip() + mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name) + + # 테이블 경로에서 실제 테이블명 추출 + table_parts = table_path.split('/') + + # 계층 구조 기록 + if len(table_parts) > 1: + for i in range(len(table_parts) - 1): + parent = '/'.join(table_parts[:i+1]) + child = '/'.join(table_parts[:i+2]) + if child not in self.table_hierarchy[parent]: + self.table_hierarchy[parent].append(child) + + # 필드 정보 저장 + field_info = { + 'field_name': mapped_field_name, + 'original_field_name': original_field_name, + 'mandatory': mandatory.strip(), + 'type': field_type.strip(), + 'size': size.strip(), + 'description': description.strip(), + 'table_path': table_path, + 'wsdl_source': wsdl_name + } + + # 테이블명 생성 + table_name = self._generate_table_name(table_path, wsdl_name) + field_key = f"{mapped_field_name}||{table_path}" + self.tables[table_name][field_key] = field_info + + logger.success(f" Found {len(matches)} field definitions") + return len(matches) + + def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict): + """complexType에서 필드 정보 추출""" + for element in complex_type.findall(".//xsd:element", namespaces): + field_name = element.get('name') + field_type = element.get('type', 'unknown') + min_occurs = element.get('minOccurs', '1') + + if field_name: + field_info = { + 'field_name': field_name, + 'original_field_name': field_name, + 'mandatory': 'M' if min_occurs != '0' else 'O', + 'type': field_type, + 'size': 'unknown', + 'description': f'From {table_name}', + 'table_path': table_name, + 'wsdl_source': wsdl_name + } + + # 테이블명 생성 + generated_table_name = self._generate_table_name(table_name, wsdl_name) + field_key = f"{field_name}||{table_name}" + self.tables[generated_table_name][field_key] = field_info + + def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str: + """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경""" + for wsdl_pattern, mappings in self.field_name_mappings.items(): + if wsdl_pattern in wsdl_name.upper(): + if field_name in mappings: + original_name = field_name + mapped_name = mappings[field_name] + logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})") + return mapped_name + return field_name + + def _generate_table_name(self, table_path: str, wsdl_name: str) -> str: + """테이블명 생성""" + # 테이블 접두사 생성 + if self.table_prefix: + prefix = self.table_prefix + else: + # WSDL 파일명에서 접두사 추출 + prefix = self._get_table_prefix_from_wsdl_name(wsdl_name) + + # 테이블 경로를 테이블명으로 변환 + table_suffix = table_path.replace('/', '_').upper() + + return f"{prefix}_{table_suffix}" + + def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str: + """WSDL 파일명에서 테이블 prefix 추출""" + # IF_XXX_EVCP_ 접두사 제거 + prefix = wsdl_name.replace('.wsdl', '') + # 일반적인 접두사 패턴 제거 + for pattern in ['IF_MDZ_EVCP_', 'IF_ECC_EVCP_', 'IF_']: + if prefix.startswith(pattern): + prefix = prefix[len(pattern):] + break + return prefix if prefix else 'COMMON' + +def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str: + """WSDL 타입을 Drizzle 타입으로 매핑""" + # 기본 길이 설정 + default_length = 100 + min_length = 10 + max_length = 2000 + + # LCHR 타입은 text()로 처리 + if 'LCHR' in wsdl_type.upper(): + return "text()" + + # 사이즈 처리 + if size and size.strip(): + try: + size_clean = size.strip() + + # "n,m" 형태 처리 (소수점 있는 숫자 타입 또는 numeric 타입) + if ',' in size_clean: + parts = size_clean.split(',') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) + decimal_places = int(parts[1]) + + # numeric 타입 처리 + if 'NUMERIC' in wsdl_type.upper() or 'CURR' in wsdl_type.upper() or 'NUMC' in wsdl_type.upper() or 'NUMB' in wsdl_type.upper() or 'DEC' in wsdl_type.upper(): + # numeric 타입은 decimal 또는 varchar로 처리 + if decimal_places > 0: + # 소수점이 있는 경우 decimal 타입 사용 + return f"decimal({{ precision: {total_digits}, scale: {decimal_places} }})" + else: + # 소수점이 없는 경우 integer 또는 varchar 사용 + if total_digits <= 10: + return "integer()" + else: + return f"varchar({{ length: {total_digits + 2} }})" + else: + # 기타 타입은 방어적 계산 + safe_length = total_digits + 5 + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 단순 숫자 처리 + elif size_clean.isdigit(): + original_length = int(size_clean) + safe_length = max(min_length, min(original_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # "n.m" 형태 처리 + elif '.' in size_clean: + parts = size_clean.split('.') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) + decimal_places = int(parts[1]) + + # numeric 타입 처리 + if 'NUMERIC' in wsdl_type.upper() or 'CURR' in wsdl_type.upper() or 'NUMC' in wsdl_type.upper() or 'NUMB' in wsdl_type.upper() or 'DEC' in wsdl_type.upper(): + if decimal_places > 0: + return f"decimal({{ precision: {total_digits}, scale: {decimal_places} }})" + else: + if total_digits <= 10: + return "integer()" + else: + return f"varchar({{ length: {total_digits + 2} }})" + else: + safe_length = total_digits + 5 + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 기타 형태는 기본값 사용 + else: + logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용") + return f"varchar({{ length: {default_length} }})" + + except Exception as e: + logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}") + return f"varchar({{ length: {default_length} }})" + + # 사이즈가 없거나 비어있는 경우 기본값 + return f"varchar({{ length: {default_length} }})" + +def generate_drizzle_schema(wsdl_tables: Dict, wsdl_file: str) -> str: + """Drizzle 스키마 코드 생성""" + wsdl_name = Path(wsdl_file).stem + + schema_code = [ + "import { integer, varchar, text, timestamp, decimal } from 'drizzle-orm/pg-core';", + "import { mdgSchema } from '../../../db/schema/MDG/mdg';", + "", + f"// WSDL 파일: {wsdl_name}.wsdl", + f"// 생성일시: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + "// 자동 생성된 스키마 파일 - 인터페이스 정의서가 비정형인 만큼, 스케치 용도로 사용하고, 실제 구현을 위해선 점검이 필수입니다.", + "", + ] + + # 테이블 코드 생성 + for table_name, fields in sorted(wsdl_tables.items()): + schema_code.append(f"// Table: {table_name}") + schema_code.append(f"export const {table_name} = mdgSchema.table('{table_name}', {{") + schema_code.append(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),") + + for field_name, field_info in sorted(fields.items()): + drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) + mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" + + comment = f" // {field_info['description']}" if field_info['description'] else "" + wsdl_source = f" // From: {field_info['wsdl_source']}" + mandatory_comment = f" // Required" if field_info['mandatory'] == 'M' else "" + + schema_code.append(f" {field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}") + + schema_code.append(" ") + schema_code.append(" createdAt: timestamp('created_at').defaultNow().notNull(),") + schema_code.append(" updatedAt: timestamp('updated_at').defaultNow().notNull(),") + schema_code.append("});") + schema_code.append("") + + return '\n'.join(schema_code) + +def print_analysis_summary(wsdl_tables: Dict, detailed: bool = False): + """분석 결과 요약 출력""" + logger.separator() + logger.header("분석 결과 요약") + logger.separator() + + logger.info(f"총 테이블 수: {len(wsdl_tables)}") + + total_fields = 0 + for table_name, fields in wsdl_tables.items(): + field_count = len(fields) + total_fields += field_count + logger.table_info(f" - {table_name}: {field_count} fields") + + logger.info(f"총 필드 수: {total_fields}") + + if detailed: + logger.separator() + logger.header("상세 필드 정보") + logger.separator() + + for table_name, fields in wsdl_tables.items(): + logger.table_info(f"\n### {table_name}") + for field_name, field_info in fields.items(): + logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}") + +def main(): + """메인 함수""" + parser = argparse.ArgumentParser( + description="WSDL 파일을 Drizzle 스키마로 변환", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +사용 예시: + %(prog)s --wsdl IF_ECC_EVCP_PR_INFORMATION.wsdl + %(prog)s --wsdl path/to/file.wsdl --output-dir ./schemas + %(prog)s --wsdl file.wsdl --table-prefix CUSTOM --detailed + """ + ) + + parser.add_argument('--wsdl', required=True, help='분석할 WSDL 파일 경로') + parser.add_argument('--output-dir', help='출력 디렉토리 (기본값: WSDL 파일과 같은 디렉토리)') + parser.add_argument('--table-prefix', help='테이블 접두사 (기본값: WSDL 파일명에서 추출)') + parser.add_argument('--detailed', action='store_true', help='상세 분석 결과 출력') + parser.add_argument('--no-colors', action='store_true', help='컬러 출력 비활성화') + + args = parser.parse_args() + + # 컬러 설정 + global logger + logger = ColorLogger(enable_colors=not args.no_colors) + + try: + # WSDL 파일 경로 처리 + wsdl_file = Path(args.wsdl) + if not wsdl_file.is_absolute(): + wsdl_file = Path.cwd() / wsdl_file + + if not wsdl_file.exists(): + logger.error(f"WSDL 파일을 찾을 수 없습니다: {wsdl_file}") + return 1 + + # 출력 디렉토리 설정 + if args.output_dir: + output_dir = Path(args.output_dir) + else: + output_dir = wsdl_file.parent + + output_dir.mkdir(parents=True, exist_ok=True) + + # 분석 시작 + logger.header(f"WSDL 분석 시작: {wsdl_file.name}") + logger.info(f"입력 파일: {wsdl_file}") + logger.info(f"출력 디렉토리: {output_dir}") + + # WSDL 분석 + analyzer = WSDLAnalyzer(str(wsdl_file), args.table_prefix) + wsdl_tables, table_hierarchy = analyzer.analyze_wsdl() + + if not wsdl_tables: + logger.warning("테이블이 발견되지 않았습니다.") + return 1 + + # 스키마 코드 생성 + schema_code = generate_drizzle_schema(wsdl_tables, str(wsdl_file)) + + # 출력 파일 생성 + output_file = output_dir / f"{wsdl_file.stem}.ts" + with open(output_file, 'w', encoding='utf-8') as f: + f.write(schema_code) + + logger.success(f"스키마 파일이 생성되었습니다: {output_file}") + + # 분석 결과 요약 + print_analysis_summary(wsdl_tables, args.detailed) + + return 0 + + except Exception as e: + logger.error(f"오류 발생: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main())
\ No newline at end of file |
