diff options
| author | joonhoekim <26rote@gmail.com> | 2025-06-27 01:25:48 +0000 |
|---|---|---|
| committer | joonhoekim <26rote@gmail.com> | 2025-06-27 01:25:48 +0000 |
| commit | 15b2d4ff61d0339385edd8cc67bf7579fcc2af08 (patch) | |
| tree | f0c36724855abccf705a9cdcae6fa3efd54d996d /public/wsdl/_util/analyze_mdz_wsdl.py | |
| parent | e9897d416b3e7327bbd4d4aef887eee37751ae82 (diff) | |
(김준회) MDG SOAP 수신 유틸리티 및 API 엔드포인트, 스키마
Diffstat (limited to 'public/wsdl/_util/analyze_mdz_wsdl.py')
| -rwxr-xr-x | public/wsdl/_util/analyze_mdz_wsdl.py | 847 |
1 files changed, 847 insertions, 0 deletions
diff --git a/public/wsdl/_util/analyze_mdz_wsdl.py b/public/wsdl/_util/analyze_mdz_wsdl.py new file mode 100755 index 00000000..216d867b --- /dev/null +++ b/public/wsdl/_util/analyze_mdz_wsdl.py @@ -0,0 +1,847 @@ +#!/usr/bin/env python3 +""" +MDZ WSDL 파일 분석 스크립트 +- WSDL 파일에서 테이블 구조 추출 +- 현재 Drizzle 스키마와 비교 +- 누락된 테이블/필드 확인 +""" + +import os +import re +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Dict, List, Set, Tuple +from collections import defaultdict +import sys +from datetime import datetime + +class ColorLogger: + """컬러 로깅을 위한 클래스""" + + # ANSI 컬러 코드 + COLORS = { + 'RESET': '\033[0m', + 'BOLD': '\033[1m', + 'DIM': '\033[2m', + + # 기본 컬러 + 'BLACK': '\033[30m', + 'RED': '\033[31m', + 'GREEN': '\033[32m', + 'YELLOW': '\033[33m', + 'BLUE': '\033[34m', + 'MAGENTA': '\033[35m', + 'CYAN': '\033[36m', + 'WHITE': '\033[37m', + + # 밝은 컬러 + 'BRIGHT_BLACK': '\033[90m', + 'BRIGHT_RED': '\033[91m', + 'BRIGHT_GREEN': '\033[92m', + 'BRIGHT_YELLOW': '\033[93m', + 'BRIGHT_BLUE': '\033[94m', + 'BRIGHT_MAGENTA': '\033[95m', + 'BRIGHT_CYAN': '\033[96m', + 'BRIGHT_WHITE': '\033[97m', + + # 배경 컬러 + 'BG_RED': '\033[41m', + 'BG_GREEN': '\033[42m', + 'BG_YELLOW': '\033[43m', + 'BG_BLUE': '\033[44m', + } + + def __init__(self, enable_colors: bool = True): + """ + 컬러 로거 초기화 + Args: + enable_colors: Windows CMD에서는 False로 설정 가능 + """ + self.enable_colors = enable_colors and self._supports_color() + + def _supports_color(self) -> bool: + """컬러 지원 여부 확인""" + # Windows에서 colorama가 없으면 컬러 비활성화 + if os.name == 'nt': + try: + import colorama + colorama.init() + return True + except ImportError: + return False + return True + + def _colorize(self, text: str, color: str) -> str: + """텍스트에 컬러 적용""" + if not self.enable_colors: + return text + return f"{self.COLORS.get(color, '')}{text}{self.COLORS['RESET']}" + + def header(self, text: str): + """헤더 로그 (굵은 파란색)""" + colored_text = self._colorize(text, 'BOLD') + colored_text = self._colorize(colored_text, 'BRIGHT_BLUE') + print(colored_text) + + def info(self, text: str): + """정보 로그 (파란색)""" + colored_text = self._colorize(text, 'BLUE') + print(colored_text) + + def success(self, text: str): + """성공 로그 (초록색)""" + colored_text = self._colorize(text, 'BRIGHT_GREEN') + print(colored_text) + + def warning(self, text: str): + """경고 로그 (노란색)""" + colored_text = self._colorize(text, 'BRIGHT_YELLOW') + print(colored_text) + + def error(self, text: str): + """에러 로그 (빨간색)""" + colored_text = self._colorize(text, 'BRIGHT_RED') + print(colored_text) + + def debug(self, text: str): + """디버그 로그 (회색)""" + colored_text = self._colorize(text, 'BRIGHT_BLACK') + print(colored_text) + + def table_info(self, text: str): + """테이블 정보 로그 (시안색)""" + colored_text = self._colorize(text, 'CYAN') + print(colored_text) + + def field_info(self, text: str): + """필드 정보 로그 (마젠타)""" + colored_text = self._colorize(text, 'MAGENTA') + print(colored_text) + + def separator(self, char: str = "=", length: int = 80): + """구분선 출력 (굵은 흰색)""" + line = char * length + colored_line = self._colorize(line, 'BOLD') + print(colored_line) + +# 전역 로거 인스턴스 +logger = ColorLogger() + +class WSDLAnalyzer: + def __init__(self, wsdl_directory: str): + self.wsdl_directory = Path(wsdl_directory) + self.tables = defaultdict(dict) # table_name -> {field_name: field_info} + self.table_hierarchy = defaultdict(list) # parent -> [children] + self.table_sources = defaultdict(set) # table_name -> {wsdl_file_names} + + # 필드명 매핑 규칙 정의 (개별 WSDL을 존중해 테이블 분리하기로 했으므로 사용하지 않음.) + self.field_name_mappings = {} + + # 사용법 + # self.field_name_mappings = { + # 'CUSTOMER_MASTER': { # WSDL 파일명에 이 문자열이 포함되면 + # 'ADDRNO': 'ADR_NO' # ADDRNO를 ADR_NO로 변경 + # } + # } + + def analyze_all_mdz_wsdls(self): + """MDZ가 포함된 모든 WSDL 파일 분석""" + wsdl_files = list(self.wsdl_directory.glob("*MDZ*.wsdl")) + + logger.info(f"Found {len(wsdl_files)} MDZ WSDL files:") + for wsdl_file in wsdl_files: + logger.table_info(f" - {wsdl_file.name}") + logger.info("") + + for wsdl_file in wsdl_files: + self._analyze_wsdl_file(wsdl_file) + + # 테이블별 필드 합집합 처리 + self._merge_table_fields() + + return self.tables, self.table_hierarchy + + def _merge_table_fields(self): + """테이블별 필드 합집합 처리 - 개선된 버전""" + merged_tables = defaultdict(dict) + + for table_name, fields in self.tables.items(): + # MATL_PLNT 테이블의 경우 디버깅 정보 출력 + if table_name == 'MATL_PLNT': + logger.debug(f"\n=== MATL_PLNT 테이블 디버깅 ===") + logger.debug(f" 병합 전 필드 수: {len(fields)}") + logger.debug(f" 필드 목록:") + for field_key, field_info in fields.items(): + logger.debug(f" {field_key} -> {field_info['field_name']} (from {field_info['wsdl_source']})") + + # 테이블별 필드를 실제 필드명 기준으로 그룹화 + field_groups = defaultdict(list) # actual_field_name -> [field_infos] + + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + field_groups[actual_field_name].append(field_info) + + # MATL_PLNT 테이블의 경우 그룹화 결과 출력 + if table_name == 'MATL_PLNT': + logger.debug(f" 그룹화 후 필드 수: {len(field_groups)}") + logger.debug(f" 그룹별 필드:") + for actual_field_name, field_infos in field_groups.items(): + sources = [info['wsdl_source'] for info in field_infos] + logger.debug(f" {actual_field_name}: {len(field_infos)}개 소스 - {sources}") + + # 각 필드 그룹을 병합 + for actual_field_name, field_infos in field_groups.items(): + # 첫 번째 필드 정보를 기준으로 시작 + merged_field = field_infos[0].copy() + + # 모든 WSDL 소스 수집 + all_sources = set() + all_descriptions = set() + + for field_info in field_infos: + all_sources.add(field_info['wsdl_source']) + if field_info['description'].strip(): + all_descriptions.add(field_info['description'].strip()) + + # 필수 필드인 경우 유지 + if field_info['mandatory'] == 'M': + merged_field['mandatory'] = 'M' + + # 병합된 정보 설정 + merged_field['wsdl_sources'] = all_sources + + # 설명 병합 (첫 번째 설명 사용, WSDL 소스 정보는 주석에 추가) + if all_descriptions: + merged_field['description'] = list(all_descriptions)[0] + else: + merged_field['description'] = f'From multiple sources' + + # 테이블에 추가 (실제 필드명 사용) + merged_tables[table_name][actual_field_name] = merged_field + + # 병합된 테이블 정보로 업데이트 + self.tables = merged_tables + + # 테이블별 WSDL 소스 정보 출력 + logger.info("\n테이블별 WSDL 소스 정보 (필드 중복 제거 후):") + for table_name, fields in self.tables.items(): + sources = set() + for field_info in fields.values(): + sources.update(field_info['wsdl_sources']) + logger.table_info(f"\n{table_name}:") + for source in sorted(sources): + logger.table_info(f" - {source}") + logger.table_info(f" 총 필드 수: {len(fields)}") + + # MATL_PLNT 테이블의 경우 최종 필드 목록 출력 + if table_name == 'MATL_PLNT': + logger.debug(f" 최종 필드 목록:") + for field_name in sorted(fields.keys()): + logger.debug(f" - {field_name}") + + def _analyze_wsdl_file(self, wsdl_file: Path): + """단일 WSDL 파일 분석""" + logger.info(f"Analyzing {wsdl_file.name}...") + + try: + with open(wsdl_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 우선 정규식으로 분석 시도 (주석에서 테이블 정보 추출) + regex_count = self._extract_tables_from_regex(content, wsdl_file.name) + + # 정규식으로 찾지 못했을 때만 XML 파싱 시도 + if regex_count == 0: + try: + # XML 네임스페이스 등록 + namespaces = { + 'xsd': 'http://www.w3.org/2001/XMLSchema', + 'wsdl': 'http://schemas.xmlsoap.org/wsdl/' + } + + root = ET.fromstring(content) + self._extract_tables_from_xml(root, wsdl_file.name, namespaces) + except ET.ParseError as e: + logger.error(f" XML parsing failed: {e}") + except Exception as e: + logger.error(f" XML analysis error: {e}") + + except Exception as e: + logger.error(f" Error analyzing {wsdl_file.name}: {e}") + + def _extract_tables_from_xml(self, root: ET.Element, wsdl_name: str, namespaces: dict): + """XML에서 테이블 정보 추출""" + # complexType 요소들에서 테이블 구조 추출 + for complex_type in root.findall(".//xsd:complexType", namespaces): + table_name = complex_type.get('name') + if table_name: + self._extract_fields_from_complex_type(complex_type, table_name, wsdl_name, namespaces) + + def _extract_tables_from_regex(self, content: str, wsdl_name: str) -> int: + """정규식으로 테이블 정보 추출""" + + # Table 정보가 포함된 주석 패턴 (Description에서 --> 전까지 모든 문자 매칭) + table_pattern = r'<!-- SEQ:\d+, Table:([^,]+), Field:([^,]+), M/O:([^,]*), Type:([^,]+), Size:([^,]+), Description:(.*?) -->' + + matches = re.findall(table_pattern, content) + + # # MATL/PLNT 관련 필드 디버깅 + # matl_plnt_matches = [match for match in matches if 'MATL/PLNT' in match[0]] + # if matl_plnt_matches: + # print(f" {wsdl_name}에서 MATL/PLNT 필드 발견: {len(matl_plnt_matches)}개") + # for match in matl_plnt_matches: + # table_path, field_name = match[0], match[1] + # print(f" {field_name} (Table: {table_path})") + + for match in matches: + table_path, field_name, mandatory, field_type, size, description = match + + # 필드명 매핑 적용 + original_field_name = field_name.strip() + mapped_field_name = self._apply_field_name_mapping(original_field_name, wsdl_name) + + # 테이블 경로에서 실제 테이블명 추출 + # 예: "BP_HEADER/ADDRESS/AD_POSTAL" -> ["BP_HEADER", "ADDRESS", "AD_POSTAL"] + table_parts = table_path.split('/') + main_table = table_parts[0] + + # 계층 구조 기록 + if len(table_parts) > 1: + for i in range(len(table_parts) - 1): + parent = '/'.join(table_parts[:i+1]) + child = '/'.join(table_parts[:i+2]) + if child not in self.table_hierarchy[parent]: + self.table_hierarchy[parent].append(child) + + # 필드 정보 저장 (매핑된 필드명 사용) + field_info = { + 'field_name': mapped_field_name, # 매핑된 필드명 사용 + 'original_field_name': original_field_name, # 원본 필드명도 보존 + 'mandatory': mandatory.strip(), + 'type': field_type.strip(), + 'size': size.strip(), + 'description': description.strip(), + 'table_path': table_path, + 'wsdl_source': wsdl_name + } + + # 테이블별로 필드 저장 (|| 구분자 사용으로 충돌 방지, 매핑된 필드명 사용) + # CSV 파일명 기반 테이블 prefix 추가 + table_prefix = self._get_table_prefix_from_wsdl_name(wsdl_name) + full_table_name = f"{table_prefix}_{table_path.replace('/', '_').upper()}" + field_key = f"{mapped_field_name}||{table_path}" + self.tables[full_table_name][field_key] = field_info + + # # MATL_PLNT 테이블에 필드 추가 시 디버깅 + # if 'MATL_PLNT' in full_table_name: + # print(f" {full_table_name}에 필드 추가: {mapped_field_name} (from {wsdl_name})") + + logger.success(f" Found {len(matches)} field definitions") + return len(matches) + + def _extract_fields_from_complex_type(self, complex_type, table_name: str, wsdl_name: str, namespaces: dict): + """complexType에서 필드 정보 추출""" + for element in complex_type.findall(".//xsd:element", namespaces): + field_name = element.get('name') + field_type = element.get('type', 'unknown') + min_occurs = element.get('minOccurs', '1') + max_occurs = element.get('maxOccurs', '1') + + if field_name: + field_info = { + 'field_name': field_name, + 'mandatory': 'M' if min_occurs != '0' else 'O', + 'type': field_type, + 'size': 'unknown', + 'description': f'From {table_name}', + 'table_path': table_name, + 'wsdl_source': wsdl_name + } + + field_key = f"{field_name}||{table_name}" + self.tables[table_name.upper()][field_key] = field_info + + def _apply_field_name_mapping(self, field_name: str, wsdl_name: str) -> str: + """특정 WSDL 파일의 필드명을 매핑 규칙에 따라 변경""" + for wsdl_pattern, mappings in self.field_name_mappings.items(): + if wsdl_pattern in wsdl_name.upper(): + if field_name in mappings: + original_name = field_name + mapped_name = mappings[field_name] + logger.debug(f" Field mapping: {original_name} -> {mapped_name} (from {wsdl_name})") + return mapped_name + return field_name + + def _get_table_prefix_from_wsdl_name(self, wsdl_name: str) -> str: + """WSDL 파일명에서 테이블 prefix 추출""" + # 단순히 IF_MDZ_EVCP_ 접두사만 제거하고 나머지 그대로 사용 + # 예: IF_MDZ_EVCP_MATERIAL_PART_RETURN.wsdl -> MATERIAL_PART_RETURN + prefix = wsdl_name.replace('IF_MDZ_EVCP_', '').replace('.wsdl', '') + return prefix if prefix else 'COMMON' + +def analyze_current_drizzle_schema(schema_file: str) -> Set[str]: + """현재 Drizzle 스키마에서 테이블 목록 추출""" + try: + with open(schema_file, 'r', encoding='utf-8') as f: + content = f.read() + + # export const 테이블명 패턴 찾기 + table_pattern = r'export const (\w+) = mdgSchema\.table\(' + matches = re.findall(table_pattern, content) + + return set(matches) + + except FileNotFoundError: + logger.error(f"Schema file not found: {schema_file}") + return set() + +def compare_wsdl_vs_schema(wsdl_tables: Dict, schema_tables: Set[str]): + """WSDL 테이블과 스키마 테이블 비교""" + logger.separator() + logger.header("WSDL vs Drizzle Schema 비교 결과") + logger.separator() + + # WSDL에서 추출한 테이블명 (이미 대문자로 변환됨) + wsdl_table_names = set(wsdl_tables.keys()) + + logger.info(f"\nWSDL에서 발견된 테이블: {len(wsdl_tables)}개") + for table in sorted(wsdl_tables.keys()): + field_count = len(wsdl_tables[table]) + logger.table_info(f" - {table} ({field_count} fields)") + + logger.info(f"\nDrizzle 스키마의 테이블: {len(schema_tables)}개") + for table in sorted(schema_tables): + logger.table_info(f" - {table}") + + # 테이블명 직접 비교 (대문자로 통일) + schema_tables_upper = {table.upper() for table in schema_tables} + wsdl_tables_upper = {table.upper() for table in wsdl_table_names} + + # 누락된 테이블 찾기 + missing_in_schema = wsdl_tables_upper - schema_tables_upper + extra_in_schema = schema_tables_upper - wsdl_tables_upper + + if missing_in_schema: + logger.warning(f"\n⚠️ 스키마에 누락된 테이블 ({len(missing_in_schema)}개):") + for table in sorted(missing_in_schema): + logger.warning(f" - {table}") + + if extra_in_schema: + logger.success(f"\n✅ 스키마에 추가로 정의된 테이블 ({len(extra_in_schema)}개):") + for table in sorted(extra_in_schema): + logger.success(f" - {table}") + + return missing_in_schema, extra_in_schema + +def generate_missing_tables_schema(wsdl_tables: Dict, missing_tables: Set[str]): + """누락된 테이블들의 Drizzle 스키마 코드 생성""" + if not missing_tables: + return + + logger.separator() + logger.header("누락된 테이블들의 Drizzle 스키마 코드") + logger.separator() + + for missing_table in sorted(missing_tables): + # WSDL 테이블명에서 해당하는 테이블 찾기 (대문자로 직접 매칭) + wsdl_table_key = missing_table.upper() + + if wsdl_table_key in wsdl_tables and wsdl_tables[wsdl_table_key]: + logger.field_info(f"\n// {wsdl_table_key}") + logger.field_info(f"export const {wsdl_table_key} = mdgSchema.table('{wsdl_table_key}', {{") + logger.field_info(" id: integer('id').primaryKey().generatedByDefaultAsIdentity(),") + + for field_key, field_info in wsdl_tables[wsdl_table_key].items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + if '||' in field_key: + actual_field_name = field_key.split('||')[0] + else: + actual_field_name = field_key + + # 필드 타입 매핑 + drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) + mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" + # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 + + # 주석으로 설명 추가 + comment = f" // {field_info['description']}" if field_info['description'] else "" + wsdl_source = f" // From: {field_info['wsdl_source']}" + mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" + + logger.field_info(f" {actual_field_name}: {drizzle_type}{mandatory},{comment}{wsdl_source}{mandatory_comment}") + + logger.field_info(" ") + logger.field_info(" createdAt: timestamp('created_at').defaultNow().notNull(),") + logger.field_info(" updatedAt: timestamp('updated_at').defaultNow().notNull(),") + logger.field_info("});") + +def map_wsdl_type_to_drizzle(wsdl_type: str, size: str) -> str: + """WSDL 타입을 Drizzle 타입으로 매핑 (모든 필드를 VARCHAR로 통일, 방어적 사이즈 계산)""" + # 기본 길이 설정 + default_length = 100 + min_length = 10 # 최소 길이 + max_length = 2000 # 최대 길이 (PostgreSQL VARCHAR 권장 최대) + + # LCHR 타입은 text()로 처리 (큰 텍스트) + if 'LCHR' in wsdl_type.upper(): + return "text()" + + # 사이즈 처리 + if size and size.strip(): + try: + size_clean = size.strip() + + # "n,m" 형태 처리 (소수점 있는 숫자 타입) + if ',' in size_clean: + parts = size_clean.split(',') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) # 전체 자릿수 + decimal_places = int(parts[1]) # 소수점 이하 자릿수 + + # 방어적 계산: 전체 자릿수 + 부호(1) + 소수점(1) + 여유분(3) = +5 + safe_length = total_digits + 5 + logger.debug(f" 📏 소수점 타입 사이즈 계산: {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") + + # 최소/최대 길이 제한 + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 단순 숫자 처리 + elif size_clean.isdigit(): + original_length = int(size_clean) + # 단순 숫자는 그대로 사용 (여유분 없음) + safe_length = max(min_length, min(original_length, max_length)) + + if safe_length != original_length: + logger.debug(f" 📏 단순 사이즈 조정: {original_length} -> {safe_length} (min/max 제한)") + else: + logger.debug(f" 📏 단순 사이즈 사용: {safe_length}") + + return f"varchar({{ length: {safe_length} }})" + + # "n.m" 형태 처리 (점으로 구분된 경우도 있을 수 있음) + elif '.' in size_clean: + parts = size_clean.split('.') + if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit(): + total_digits = int(parts[0]) + decimal_places = int(parts[1]) + + # 방어적 계산 + safe_length = total_digits + 5 + logger.debug(f" 📏 소수점 타입 사이즈 계산 (점 구분): {size_clean} -> {safe_length} (원본: {total_digits}, 여유: +5)") + + safe_length = max(min_length, min(safe_length, max_length)) + return f"varchar({{ length: {safe_length} }})" + + # 기타 형태는 기본값 사용 + else: + logger.warning(f" ⚠️ 알 수 없는 사이즈 형태: '{size_clean}' -> 기본값 {default_length} 사용") + return f"varchar({{ length: {default_length} }})" + + except Exception as e: + logger.error(f" ❌ 사이즈 파싱 오류: '{size}' -> 기본값 {default_length} 사용, 오류: {e}") + return f"varchar({{ length: {default_length} }})" + + # 사이즈가 없거나 비어있는 경우 기본값 + return f"varchar({{ length: {default_length} }})" + +def validate_schema(wsdl_tables: Dict, schema_tables: Set[str]) -> Dict[str, List[str]]: + """스키마 검증""" + validation_results = { + 'missing_tables': [], + 'missing_fields': [], + 'type_mismatches': [], + 'duplicate_fields': [] + } + + for table_name, fields in wsdl_tables.items(): + # 테이블 존재 여부 검증 + if table_name not in schema_tables: + validation_results['missing_tables'].append(table_name) + continue + + # 필드 검증 + field_names = set() + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + + # 중복 필드 검사 + if actual_field_name in field_names: + validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") + field_names.add(actual_field_name) + + # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) + if actual_field_name not in existing_fields: + validation_results['missing_fields'].append(f"{table_name}.{actual_field_name}") + + # 타입 호환성 검증 + # ? 기존 스키마의 필드 타입과 비교 + # ! VARCHAR로 처리하기로 했으니 타입 호환성 검사는 필요 없음 + + return validation_results + +def analyze_existing_schema(schema_file: str) -> Dict[str, Dict[str, str]]: + """기존 스키마 파일 분석""" + existing_schema = {} + try: + with open(schema_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 테이블 정의 찾기 (변경된 패턴) + table_pattern = r'export const (\w+) = mdgSchema\.table\([\'"](\w+)[\'"]' + tables = re.findall(table_pattern, content) + + for table_const, table_name in tables: + # 테이블의 필드 정의 찾기 + field_pattern = rf'{table_const} = mdgSchema\.table\([\'"]{table_name}[\'"].*?{{(.*?)}}' + table_match = re.search(field_pattern, content, re.DOTALL) + + if table_match: + fields = {} + field_defs = table_match.group(1) + + # 각 필드 정의 파싱 (변경된 패턴) + field_pattern = r'(\w+):\s*(\w+)\([\'"](\w+)[\'"]' + field_matches = re.findall(field_pattern, field_defs) + + for field_name, field_type, field_db_name in field_matches: + fields[field_name] = { + 'type': field_type, + 'db_name': field_db_name + } + + existing_schema[table_name] = fields + + except Exception as e: + logger.error(f"스키마 파일 분석 중 오류 발생: {e}") + + return existing_schema + +def compare_field_types(wsdl_type: str, existing_type: str) -> bool: + """필드 타입 호환성 검사""" + type_mapping = { + 'varchar': ['CHAR', 'VARC', 'LCHR'], + 'integer': ['NUMB', 'NUMC'], + 'decimal': ['CURR'], + 'date': ['DATS'], + 'time': ['TIMS'], + 'text': ['LCHR'] + } + + wsdl_type = wsdl_type.upper() + existing_type = existing_type.lower() + + # 타입 매핑 확인 + for drizzle_type, wsdl_types in type_mapping.items(): + if existing_type == drizzle_type: + return any(t in wsdl_type for t in wsdl_types) + + return False + +def validate_schema(wsdl_tables: Dict, schema_tables: Set[str], existing_schema: Dict[str, Dict[str, str]]) -> Dict[str, List[str]]: + """스키마 검증 (개선된 버전)""" + validation_results = { + 'missing_tables': [], + 'missing_fields': [], + 'type_mismatches': [], + 'duplicate_fields': [] + } + + for table_name, fields in wsdl_tables.items(): + # 테이블 존재 여부 검증 + if table_name not in schema_tables: + validation_results['missing_tables'].append(table_name) + continue + + # 기존 테이블의 필드 정보 가져오기 + existing_fields = existing_schema.get(table_name, {}) + + # 필드 검증 + field_names = set() + for field_key, field_info in fields.items(): + # field_key에서 실제 필드명 추출 (|| 구분자 사용) + actual_field_name = field_key.split('||')[0] if '||' in field_key else field_key + + # 중복 필드 검사 + if actual_field_name in field_names: + validation_results['duplicate_fields'].append(f"{table_name}.{actual_field_name}") + field_names.add(actual_field_name) + + # 누락된 필드 검증 (WSDL의 모든 필드가 스키마에 있는지 확인) + # Note: existing_fields는 기존 validate_schema에서는 정의되지 않았으므로 스킵 + + # 타입 호환성 검증 + if actual_field_name in existing_fields: + existing_type = existing_fields[actual_field_name]['type'] + if not compare_field_types(field_info['type'], existing_type): + validation_results['type_mismatches'].append( + f"{table_name}.{actual_field_name}: WSDL={field_info['type']}, Existing={existing_type}" + ) + + return validation_results + +def generate_schema_code(wsdl_tables: Dict, validation_results: Dict[str, List[str]], existing_schema: Dict[str, Dict[str, str]]) -> str: + """스키마 코드 생성 (개선된 버전)""" + schema_code = [] + + # 누락된 테이블 생성 + for table_name in validation_results['missing_tables']: + table_code = generate_table_code(wsdl_tables[table_name], table_name) + schema_code.append(table_code) + + # 누락된 필드 추가 + for field_info in validation_results['missing_fields']: + table_name, field_name = field_info.split('.') + if table_name in existing_schema: + field_code = generate_field_code(wsdl_tables[table_name][field_name]) + # 기존 테이블에 필드 추가하는 코드 생성 + table_code = f"// {table_name}에 추가할 필드:\n{field_code}" + schema_code.append(table_code) + + return '\n\n'.join(schema_code) + +def generate_table_code(fields: Dict, table_name: str) -> str: + """테이블 코드 생성""" + code = [ + f"export const {table_name} = mdgSchema.table('{table_name}', {{", + " id: integer('id').primaryKey().generatedByDefaultAsIdentity()," + ] + + # fields에서 실제 필드명과 필드 정보 가져오기 + # _merge_table_fields에서 이미 actual_field_name을 키로 사용하므로 그대로 사용 + for actual_field_name, field_info in sorted(fields.items()): + # 필드 코드 생성 (actual_field_name 사용) + field_code = generate_field_code(field_info) + code.append(f" {field_code}") + + code.extend([ + " ", + " createdAt: timestamp('created_at').defaultNow().notNull(),", + " updatedAt: timestamp('updated_at').defaultNow().notNull(),", + "});" + ]) + + return '\n'.join(code) + +def generate_field_code(field_info: Dict) -> str: + """필드 코드 생성""" + drizzle_type = map_wsdl_type_to_drizzle(field_info['type'], field_info['size']) + mandatory = ".notNull()" if field_info['mandatory'] == 'M' else "" + # NOTE: WSDL별로 개별 테이블을 만들기로 했으므로 notNull() 제약조건 복구 + + comment = f" // {field_info['description']}" if field_info['description'] else "" + + # 여러 WSDL 소스 정보 추가 + if 'wsdl_sources' in field_info and len(field_info['wsdl_sources']) > 1: + sources_comment = f" // From: {', '.join(sorted(field_info['wsdl_sources']))}" + else: + wsdl_source = field_info.get('wsdl_source', list(field_info.get('wsdl_sources', ['Unknown']))[0]) + sources_comment = f" // From: {wsdl_source}" + + # 필수 필드 정보는 주석으로만 표시 + mandatory_comment = f" // WSDL에서 필수 필드" if field_info['mandatory'] == 'M' else "" + + # 필드명 매핑이 적용된 경우 원본 필드명 표시 + mapping_comment = "" + if 'original_field_name' in field_info and field_info['original_field_name'] != field_info['field_name']: + mapping_comment = f" // Original: {field_info['original_field_name']}" + + return f"{field_info['field_name']}: {drizzle_type}{mandatory},{comment}{sources_comment}{mandatory_comment}{mapping_comment}" + +def generate_complete_schema(wsdl_tables: Dict) -> str: + """완전한 스키마 코드 생성""" + schema_code = [ + "import { integer, varchar, text, timestamp } from 'drizzle-orm/pg-core';", + "import { mdgSchema } from '../../../db/schema/MDG/mdg';", + "", + "// WSDL 기반 자동 생성된 스키마", + "// 생성일시: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " (UTC로 9시간 빼야 한국 시간)", + "// 개선사항:", + "// 1. WSDL별로 테이블 만들었음. 인터페이스 정의서에 문제가 많아서 어쩔 수 없었음.", + "// 2. 타입은 varchar를 사용하도록 했음. 숫자관련된 건 부호, 소수점 대비 방어적으로 처리함 (사이즈)", + "// 3. 테이블명에서 '/' 문자를 '_'로 변경하여 PostgreSQL/TypeScript 호환성 확보함", + "", + ] + + # 테이블 코드 생성 + for table_name, fields in sorted(wsdl_tables.items()): + table_code = generate_table_code(fields, table_name) + schema_code.append(table_code) + schema_code.append("") # 빈 줄 추가 + + return '\n'.join(schema_code) + +def main(): + # 현재 스크립트 위치에서 프로젝트 루트 찾기 + script_dir = Path(__file__).parent + project_root = script_dir.parent.parent.parent # public/wsdl/_util -> project_root + + wsdl_dir = script_dir.parent # public/wsdl + schema_file = project_root / "db" / "schema" / "MDG" / "mdg.ts" + + logger.header("MDZ WSDL 분석 시작...") + logger.info(f"WSDL 디렉토리: {wsdl_dir}") + logger.info(f"스키마 파일: {schema_file}") + logger.info("") + + # WSDL 분석 + analyzer = WSDLAnalyzer(wsdl_dir) + wsdl_tables, table_hierarchy = analyzer.analyze_all_mdz_wsdls() + + # 현재 스키마 분석 + schema_tables = analyze_current_drizzle_schema(schema_file) + + # 기존 스키마 분석 + existing_schema = analyze_existing_schema(schema_file) + + # 비교 결과 출력 + missing_tables, extra_tables = compare_wsdl_vs_schema(wsdl_tables, schema_tables) + + # 누락된 테이블 스키마 생성 + generate_missing_tables_schema(wsdl_tables, missing_tables) + + # 스키마 검증 + validation_results = validate_schema(wsdl_tables, schema_tables, existing_schema) + + # 검증 결과 출력 + logger.separator() + logger.header("스키마 검증 결과") + logger.separator() + + for category, items in validation_results.items(): + if items: + logger.warning(f"\n{category}:") + for item in items: + logger.warning(f" - {item}") + + # 완전한 스키마 코드 생성 + complete_schema = generate_complete_schema(wsdl_tables) + + # 스키마 파일 저장 + output_file = script_dir / "generated_schema.ts" + with open(output_file, 'w', encoding='utf-8') as f: + f.write(complete_schema) + + logger.success(f"\n생성된 스키마가 {output_file}에 저장되었습니다.") + + # 상세 필드 정보 출력 (옵션) + if len(sys.argv) > 1 and sys.argv[1] == "--detailed": + logger.separator() + logger.header("상세 필드 정보") + logger.separator() + + for table_name, fields in wsdl_tables.items(): + logger.table_info(f"\n### {table_name}") + for field_name, field_info in fields.items(): + logger.field_info(f" {field_name}: {field_info['type']}({field_info['size']}) - {field_info['description']}") + + logger.success("\n분석 완료!") + logger.info(f"- 총 WSDL 테이블: {len(wsdl_tables)}개") + logger.info(f"- 현재 스키마 테이블: {len(schema_tables)}개") + logger.warning(f"- 누락 테이블: {len(missing_tables)}개") if missing_tables else logger.success(f"- 누락 테이블: {len(missing_tables)}개") + logger.info(f"- 추가 테이블: {len(extra_tables)}개") + +if __name__ == "__main__": + main()
\ No newline at end of file |
