#!/usr/bin/env python3 import csv import re import shutil import os import argparse from datetime import datetime # 컬러 로그를 위한 색상 코드 추가 class Colors: RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' MAGENTA = '\033[95m' CYAN = '\033[96m' WHITE = '\033[97m' ENDC = '\033[0m' # End color BOLD = '\033[1m' def print_color(message, color=Colors.WHITE): """컬러 출력 함수""" print(f"{color}{message}{Colors.ENDC}") def print_error(message): """에러 메시지 출력""" print_color(f"❌ ERROR: {message}", Colors.RED) def print_warning(message): """경고 메시지 출력""" print_color(f"⚠️ WARNING: {message}", Colors.YELLOW) def print_success(message): """성공 메시지 출력""" print_color(f"✅ SUCCESS: {message}", Colors.GREEN) def print_info(message): """정보 메시지 출력""" print_color(f"ℹ️ INFO: {message}", Colors.CYAN) """ 실제 CSV 파일들 IF_MDZ_EVCP_CUSTOMER_MASTER.csv IF_MDZ_EVCP_EMPLOYEE_REFERENCE.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART_RETURN.csv IF_MDZ_EVCP_PROJECT_MASTER.csv IF_MDZ_EVCP_DEPARTMENT_CODE.csv IF_MDZ_EVCP_EQUP_MASTER.csv IF_MDZ_EVCP_MODEL_MASTER.csv IF_MDZ_EVCP_VENDOR_MASTER.csv IF_MDZ_EVCP_EMPLOYEE_MASTER.csv IF_MDZ_EVCP_MATERIAL_MASTER_PART.csv IF_MDZ_EVCP_ORGANIZATION_MASTER.csv """ # ===== 설정 ===== CSV_DIR = './public/wsdl/_csv' WSDL_DIR = './public/wsdl' # 발견된 SAP 타입들을 수집하기 위한 전역 SET discovered_sap_types = set() type_size_combinations = set() # 타입-사이즈 조합도 수집 # 필드명 매핑 테이블 (CSV -> WSDL) FIELD_MAPPING = { # 개별 WSDL 별 테이블 만들기로 했으므로 사용하지 않고 WSDL 그대로 사용 # 'ADR_NO': 'ADDRNO', # 필요한 경우 더 추가 } # 테이블 매핑 테이블 (complexType -> CSV Table) TABLE_MAPPING = { # 'MATL': 'MATL', # 'UNIT': 'MATL/UNIT', # 필요한 경우 더 추가 } def normalize_sap_type_and_size(sap_type, size_str): """SAP 타입과 사이즈를 정규화""" global discovered_sap_types, type_size_combinations try: # 타입을 대문자로 변환 normalized_type = sap_type.upper().strip() if sap_type else 'CHAR' # 사이즈 처리 normalized_size = size_str.strip() if size_str else '' original_size = normalized_size # 원본 사이즈 보존 (로깅용) # 빈 사이즈인 경우 기본값 설정 if not normalized_size: normalized_size = '255' else: # 따옴표로 감싸진 경우 제거 (예: "1,0") quote_removed = False if normalized_size.startswith('"') and normalized_size.endswith('"'): before_quote_removal = normalized_size normalized_size = normalized_size[1:-1] quote_removed = True print_color(f"🔍 SIZE 파싱: 따옴표 제거 - '{before_quote_removal}' -> '{normalized_size}' (Type: {normalized_type})", Colors.YELLOW) # 로깅: 최종 결과 (따옴표가 없는 경우만) if not quote_removed and original_size: print_color(f"🔍 SIZE 파싱: 따옴표 없음 - '{original_size}' 그대로 사용 (Type: {normalized_type})", Colors.BLUE) # 발견된 타입들을 SET에 추가 discovered_sap_types.add(normalized_type) type_size_combinations.add(f"{normalized_type}({normalized_size})") # 컬럼 구분자나 특수문자가 있는 경우 그대로 유지 # DEC, QUAN, NUMB 등에서 "1,0" 형태의 사이즈는 정상 return normalized_type, normalized_size except Exception as e: print_error(f"타입/사이즈 정규화 실패 - Type: {sap_type}, Size: {size_str}, Error: {str(e)}") return 'CHAR', '255' # 기본값 반환 def safe_description_escape(description): """Description 필드의 특수문자를 안전하게 처리""" try: if not description: return '' # HTML/XML 특수문자 이스케이프 description = description.replace('&', '&') description = description.replace('<', '<') description = description.replace('>', '>') description = description.replace('"', '"') description = description.replace("'", ''') return description except Exception as e: print_error(f"Description 이스케이프 실패: {description}, Error: {str(e)}") return str(description) if description else '' def get_csv_files(): """CSV 디렉토리에서 모든 CSV 파일 목록을 가져옴""" csv_files = [] for file in os.listdir(CSV_DIR): if file.endswith('.csv'): csv_files.append(file.replace('.csv', '')) return csv_files def get_complex_type_info(wsdl_content): """WSDL 파일에서 complexType 정보를 추출""" complex_types = {} current_type = None current_fields = [] type_stack = [] # 중첩된 complexType을 추적하기 위한 스택 for line in wsdl_content: # complexType 시작 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원) type_match = re.search(r'<(?:xsd:|xs:)complexType\s+name="([^"]+)"', line) if type_match: if current_type: type_stack.append(current_type) current_type = type_match.group(1) current_fields = [] continue # complexType 종료 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원) if re.search(r'', line): if current_type: complex_types[current_type] = current_fields if type_stack: current_type = type_stack.pop() else: current_type = None continue # element 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원) element_match = re.search(r'<(?:xsd:|xs:)element\s+name="([^"]+)"', line) if element_match and current_type: type_match = re.search(r'type="([^"]+)"', line) if 'type="' in line else None field_info = { 'name': element_match.group(1), 'type': type_match.group(1) if type_match else None, 'is_array': 'maxOccurs="unbounded"' in line } current_fields.append(field_info) return complex_types def load_csv_data(csv_file): """CSV 파일에서 필드 정보를 딕셔너리로 로드""" csv_data = {} csv_path = os.path.join(CSV_DIR, f'{csv_file}.csv') try: with open(csv_path, 'r', encoding='utf-8-sig') as f: # BOM 처리 reader = csv.DictReader(f) for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행 try: field_name = row['Field'] table_name = row['Table'] # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용 wsdl_field_name = FIELD_MAPPING.get(field_name, field_name) # 테이블 정보를 키에 포함 (구분자를 || 로 변경) key = f"{wsdl_field_name}||{table_name}" # 타입과 사이즈 정규화 normalized_type, normalized_size = normalize_sap_type_and_size( row.get('Type', ''), row.get('Size', '') ) # Description 안전 처리 safe_desc = safe_description_escape(row.get('Description', '')) csv_data[key] = { 'seq': row.get('SEQ', ''), 'table': table_name, 'field': row.get('Field', ''), # 원래 CSV 필드명 저장 'mo': row.get('M/O', ''), 'type': normalized_type, # 정규화된 타입 'size': normalized_size, # 정규화된 사이즈 'description': safe_desc, # 안전 처리된 Description 'original_type': row.get('Type', ''), # 원본 타입 보존 'original_size': row.get('Size', '') # 원본 사이즈 보존 } except Exception as e: print_error(f"CSV 행 {row_num} 처리 실패 - {csv_file}: {str(e)}") print_error(f"문제 행 데이터: {row}") continue except Exception as e: print_error(f"CSV 파일 로딩 실패 - {csv_path}: {str(e)}") return {} return csv_data def load_csv_data_from_path(csv_file_path): """CSV 파일 경로에서 필드 정보를 딕셔너리로 로드""" csv_data = {} csv_filename = os.path.basename(csv_file_path) try: with open(csv_file_path, 'r', encoding='utf-8-sig') as f: # BOM 처리 reader = csv.DictReader(f) for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행 try: field_name = row['Field'] table_name = row['Table'] # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용 wsdl_field_name = FIELD_MAPPING.get(field_name, field_name) # 테이블 정보를 키에 포함 (구분자를 || 로 변경) key = f"{wsdl_field_name}||{table_name}" # 타입과 사이즈 정규화 normalized_type, normalized_size = normalize_sap_type_and_size( row.get('Type', ''), row.get('Size', '') ) # Description 안전 처리 safe_desc = safe_description_escape(row.get('Description', '')) csv_data[key] = { 'seq': row.get('SEQ', ''), 'table': table_name, 'field': row.get('Field', ''), # 원래 CSV 필드명 저장 'mo': row.get('M/O', ''), 'type': normalized_type, # 정규화된 타입 'size': normalized_size, # 정규화된 사이즈 'description': safe_desc, # 안전 처리된 Description 'original_type': row.get('Type', ''), # 원본 타입 보존 'original_size': row.get('Size', '') # 원본 사이즈 보존 } except Exception as e: print_error(f"CSV 행 {row_num} 처리 실패 - {csv_filename}: {str(e)}") print_error(f"문제 행 데이터: {row}") continue except Exception as e: print_error(f"CSV 파일 로딩 실패 - {csv_file_path}: {str(e)}") return {} return csv_data def extract_field_name_from_line(line): """라인에서 name="필드명" 추출""" match = re.search(r'name="([^"]+)"', line) return match.group(1) if match else None def has_seq_in_comment(comment_line): """주석에 SEQ가 있는지 확인""" return 'SEQ:' in comment_line def get_indentation(line): """라인의 들여쓰기 반환""" return len(line) - len(line.lstrip()) def create_comment(field_name, csv_data, indentation, complex_type): """CSV 데이터를 기반으로 주석 생성""" try: # 필드명으로 시작하는 키들을 찾음 (대소문자 구분 없이) matching_keys = [key for key in csv_data.keys() if key.split('||')[0].upper() == field_name.upper()] if not matching_keys: indent = ' ' * indentation print_warning(f"매칭되지 않은 필드: {field_name}") return f"{indent}" # complexType과 일치하는 테이블 정보 찾기 matching_data = None # 1. complexType 이름과 완전히 일치하는 테이블 찾기 for key in matching_keys: table_name = key.split('||', 1)[1] if complex_type.upper() == table_name.upper(): matching_data = csv_data[key] break # 2. CSV 테이블명을 '/'로 스플릿한 마지막 부분이 complexType과 일치하는 경우 if not matching_data: for key in matching_keys: table_name = key.split('||', 1)[1] if '/' in table_name: last_part = table_name.split('/')[-1] if complex_type.upper() == last_part.upper(): matching_data = csv_data[key] break # 3. 필드명만 일치하는 경우 (첫 번째 매칭 데이터 사용) if not matching_data: matching_data = csv_data[matching_keys[0]] # 4. 매칭된 데이터가 있으면 주석 생성, 없으면 매칭 실패 주석 if matching_data: indent = ' ' * indentation # CSV의 실제 타입과 사이즈 사용 comment = f"{indent}" print_info(f"주석 생성 완료: {field_name} -> Type:{matching_data['type']}, Size:{matching_data['size']}") return comment else: indent = ' ' * indentation print_warning(f"매칭 데이터를 찾을 수 없음: {field_name}") return f"{indent}" except Exception as e: indent = ' ' * indentation print_error(f"주석 생성 실패 - 필드: {field_name}, 에러: {str(e)}") return f"{indent}" def normalize_comment(comment_line): """주석을 정규화 (공백 제거, 소문자 변환 등)""" # 제거하고 내용만 추출 content = re.sub(r'^\s*\s*$', '', comment_line.strip()) # 여러 공백을 하나로 통합 content = re.sub(r'\s+', ' ', content) return content.strip() def comments_are_equal(existing_comment, expected_comment): """두 주석이 같은 내용인지 비교""" existing_normalized = normalize_comment(existing_comment) expected_normalized = normalize_comment(expected_comment) return existing_normalized == expected_normalized def should_process_line(line, csv_data): """라인이 처리 대상인지 확인""" # 네 조건을 모두 만족해야 함: # 1. ', line): if type_stack: current_complex_type = type_stack.pop() print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE) else: current_complex_type = None # CSV에 있는 xsd:element 또는 xs:element 필드인지 확인 if should_process_line(line, csv_data): field_name = extract_field_name_from_line(line) if field_name and current_complex_type: processed_fields.append(field_name) print_color(f"처리 중인 필드: {field_name} (complexType: {current_complex_type})", Colors.CYAN) # 바로 위 라인이 주석인지 확인 (공백 라인 건너뛰면서) comment_line_index = -1 j = len(new_lines) - 1 while j >= 0: prev_line = new_lines[j].strip() if prev_line == '': j -= 1 continue elif prev_line.startswith(''): comment_line_index = j break else: break if comment_line_index >= 0: existing_comment = new_lines[comment_line_index] if has_seq_in_comment(existing_comment): indentation = get_indentation(line) expected_comment = create_comment(field_name, csv_data, indentation, current_complex_type) if expected_comment: if comments_are_equal(existing_comment, expected_comment): verified_correct += 1 print_success(f" 주석 검증 통과") else: new_lines[comment_line_index] = expected_comment + '\n' changes_made += 1 corrected_seq += 1 print_warning(f" SEQ 주석 수정: {field_name}") print(f" 기존: {existing_comment.strip()}") print(f" 수정: {expected_comment}") else: indentation = get_indentation(line) new_comment = create_comment(field_name, csv_data, indentation, current_complex_type) if new_comment: new_lines[comment_line_index] = new_comment + '\n' changes_made += 1 print_warning(f" 주석 교체: {field_name}") else: indentation = get_indentation(line) new_comment = create_comment(field_name, csv_data, indentation, current_complex_type) if new_comment: new_lines.append(new_comment + '\n') changes_made += 1 print_info(f" 주석 추가: {field_name}") line_processed = True elif re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line: field_name = extract_field_name_from_line(line) if field_name: skip_reason = get_skip_reason(line, csv_data) if skip_reason == "ARRAY_TYPE": skipped_array_fields.append(field_name) skipped_fields.append(field_name) print_color(f"건너뛴 필드: {field_name} (배열 타입 - maxOccurs 속성)", Colors.YELLOW) elif skip_reason == "REQ_WRAPPER_TYPE": skipped_req_wrapper_fields.append(field_name) skipped_fields.append(field_name) print_color(f"건너뛴 필드: {field_name} (요청 래퍼 타입 - 정상)", Colors.BLUE) elif skip_reason == "NO_CSV_DATA": skipped_no_csv_fields.append(field_name) skipped_fields.append(field_name) print_error(f"건너뛴 필드: {field_name} (CSV에 데이터 없음 - 확인 필요!)") else: # 기타 이유로 건너뛴 경우 skipped_fields.append(field_name) print_warning(f"건너뛴 필드: {field_name} (기타 이유)") except Exception as e: print_error(f"라인 처리 중 오류 발생 (라인 {i+1}): {str(e)}") print_error(f"문제 라인: {line.strip()}") error_count += 1 new_lines.append(line) i += 1 # 결과 저장 try: with open(wsdl_file_path, 'w', encoding='utf-8') as f: f.writelines(new_lines) print_success("WSDL 파일 저장 완료") except Exception as e: print_error(f"WSDL 파일 저장 실패: {str(e)}") return # 결과 출력 print_color(f"\n{'='*50}", Colors.BOLD) print_color(f"처리 완료: {csv_filename} → {wsdl_filename}", Colors.BOLD) print_color(f"{'='*50}", Colors.BOLD) print_info(f"CSV 파일: {csv_file_path}") print_info(f"WSDL 파일: {wsdl_file_path}") print_info(f"백업 파일: {backup_path}") print_color(f"\n📊 처리 통계:", Colors.MAGENTA) print(f" 총 변경사항: {changes_made}개") print(f" 처리된 CSV 필드 수: {len(processed_fields)}") print(f" 건너뛴 필드 총계: {len(skipped_fields)}") print(f" ├─ 배열 타입 (정상): {len(skipped_array_fields)}개") print(f" ├─ 요청 래퍼 타입 (정상): {len(skipped_req_wrapper_fields)}개") print_color(f" └─ CSV 누락 (문제): {len(skipped_no_csv_fields)}개", Colors.RED if len(skipped_no_csv_fields) > 0 else Colors.WHITE) print(f" 검증 통과한 SEQ 주석: {verified_correct}개") print(f" 수정된 SEQ 주석: {corrected_seq}개") print(f" 오류 발생 횟수: {error_count}개") # CSV 누락 필드 상세 표시 if len(skipped_no_csv_fields) > 0: print_error(f"\n⚠️ CSV에 누락된 필드 목록 (확인 필요):") for field in skipped_no_csv_fields: print_error(f" - {field}") # 최종 결과 if error_count > 0: print_error(f"\n⚠️ {error_count}개의 오류가 발생했습니다. 로그를 확인해주세요.") if len(skipped_no_csv_fields) > 0: print_error(f"\n🚨 주의: {len(skipped_no_csv_fields)}개의 필드가 CSV에 누락되어 있습니다!") print_error("이 필드들은 WSDL에 정의되어 있지만 CSV 스펙에 없어 주석이 생성되지 않았습니다.") if changes_made == 0: print_success(f"\n모든 주석이 정확합니다! (검증된 SEQ 주석: {verified_correct}개)") else: print_success(f"\n{changes_made}개의 주석이 수정되었습니다.") if corrected_seq > 0: print(f" - 기존 SEQ 주석 수정: {corrected_seq}개") if changes_made - corrected_seq > 0: print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개") def process_wsdl_file(target): """WSDL 파일 처리 (기존 방식, 일괄 처리용)""" csv_file_path = os.path.join(CSV_DIR, f'{target}.csv') wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl') try: # 백업 생성 backup_path = backup_file(wsdl_file_path) print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD) print_info("CSV 데이터 로딩 중...") csv_data = load_csv_data(target) print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨") # WSDL 파일 읽기 with open(wsdl_file_path, 'r', encoding='utf-8') as f: lines = f.readlines() # complexType 정보 추출 complex_types = get_complex_type_info(lines) print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨") except Exception as e: print_error(f"파일 초기화 실패 - {target}: {str(e)}") return def parse_arguments(): """커맨드라인 아규먼트 파싱""" parser = argparse.ArgumentParser( description="WSDL 파일에 CSV 정보를 기반으로 주석을 추가하는 도구", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 사용 예시: python3 update_wsdl_with_csv.py --csv ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.csv --wsdl ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.wsdl python3 update_wsdl_with_csv.py --batch # 기존 일괄 처리 모드 """ ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( '--csv', type=str, help='처리할 CSV 파일 경로' ) group.add_argument( '--batch', action='store_true', help='일괄 처리 모드 (기존 방식)' ) parser.add_argument( '--wsdl', type=str, help='처리할 WSDL 파일 경로 (--csv와 함께 사용)' ) args = parser.parse_args() # --csv 옵션 사용 시 --wsdl도 필수 if args.csv and not args.wsdl: parser.error("--csv 옵션을 사용할 때는 --wsdl 옵션도 필요합니다.") # 파일 존재 여부 확인 if args.csv: if not os.path.exists(args.csv): parser.error(f"CSV 파일을 찾을 수 없습니다: {args.csv}") if not args.csv.endswith('.csv'): parser.error(f"CSV 파일이 아닙니다: {args.csv}") if args.wsdl: if not os.path.exists(args.wsdl): parser.error(f"WSDL 파일을 찾을 수 없습니다: {args.wsdl}") if not args.wsdl.endswith('.wsdl'): parser.error(f"WSDL 파일이 아닙니다: {args.wsdl}") return args if __name__ == "__main__": try: args = parse_arguments() if args.batch: csv_files = get_csv_files() print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD) print_info(f"처리할 파일 목록: {csv_files}") total_files = len(csv_files) success_count = 0 error_count = 0 for i, target in enumerate(csv_files, 1): print_color(f"\n{'='*60}", Colors.BOLD) print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD) print_color(f"{'='*60}", Colors.BOLD) try: process_wsdl_file(target) success_count += 1 except Exception as e: print_error(f"파일 처리 실패 - {target}: {str(e)}") error_count += 1 # 최종 통계 print_color(f"\n{'='*60}", Colors.BOLD) print_color("🏁 전체 처리 완료", Colors.BOLD) print_color(f"{'='*60}", Colors.BOLD) print_success(f"성공: {success_count}개 파일") if error_count > 0: print_error(f"실패: {error_count}개 파일") else: print_success("모든 파일이 성공적으로 처리되었습니다!") # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용) print_color(f"\n{'='*60}", Colors.BOLD) print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA) print_color(f"{'='*60}", Colors.BOLD) print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN) for sap_type in sorted(discovered_sap_types): print(f" - {sap_type}") print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW) for combination in sorted(type_size_combinations): print(f" - {combination}") print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN) print(" 🎯 실용적 접근법:") print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)") print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC") print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP") print("") print(" 📋 SAP 타입별 상세:") print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT") print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)") print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)") print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC") print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)") print("") print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환") else: # --csv 옵션 사용 시 print_color(f"\n🎯 개별 파일 처리 모드", Colors.BOLD) print_info(f"CSV 파일: {args.csv}") print_info(f"WSDL 파일: {args.wsdl}") try: process_files(args.csv, args.wsdl) print_success("개별 파일 처리 완료!") except Exception as e: print_error(f"개별 파일 처리 실패: {str(e)}") exit(1) except Exception as e: print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}") exit(1)