summaryrefslogtreecommitdiff
path: root/public/wsdl/_util/update_wsdl_with_csv.py
diff options
context:
space:
mode:
Diffstat (limited to 'public/wsdl/_util/update_wsdl_with_csv.py')
-rw-r--r--public/wsdl/_util/update_wsdl_with_csv.py342
1 files changed, 223 insertions, 119 deletions
diff --git a/public/wsdl/_util/update_wsdl_with_csv.py b/public/wsdl/_util/update_wsdl_with_csv.py
index 91a9d4dc..b914d411 100644
--- a/public/wsdl/_util/update_wsdl_with_csv.py
+++ b/public/wsdl/_util/update_wsdl_with_csv.py
@@ -3,6 +3,7 @@ import csv
import re
import shutil
import os
+import argparse
from datetime import datetime
# 컬러 로그를 위한 색상 코드 추가
@@ -142,8 +143,8 @@ def get_complex_type_info(wsdl_content):
type_stack = [] # 중첩된 complexType을 추적하기 위한 스택
for line in wsdl_content:
- # complexType 시작 태그 찾기
- type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ # complexType 시작 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ type_match = re.search(r'<(?:xsd:|xs:)complexType\s+name="([^"]+)"', line)
if type_match:
if current_type:
type_stack.append(current_type)
@@ -151,8 +152,8 @@ def get_complex_type_info(wsdl_content):
current_fields = []
continue
- # complexType 종료 태그 찾기
- if '</xsd:complexType>' in line:
+ # complexType 종료 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ if re.search(r'</(?:xsd:|xs:)complexType>', line):
if current_type:
complex_types[current_type] = current_fields
if type_stack:
@@ -161,25 +162,19 @@ def get_complex_type_info(wsdl_content):
current_type = None
continue
- # element 태그 찾기
- element_match = re.search(r'<xsd:element\s+name="([^"]+)"', line)
+ # element 태그 찾기 (xsd: 또는 xs: 네임스페이스 지원)
+ element_match = re.search(r'<(?:xsd:|xs:)element\s+name="([^"]+)"', line)
if element_match and current_type:
+ type_match = re.search(r'type="([^"]+)"', line) if 'type="' in line else None
field_info = {
'name': element_match.group(1),
- 'type': re.search(r'type="([^"]+)"', line).group(1) if 'type="' in line else None,
+ 'type': type_match.group(1) if type_match else None,
'is_array': 'maxOccurs="unbounded"' in line
}
current_fields.append(field_info)
return complex_types
-def get_table_for_complex_type(table_name, complex_type):
- """테이블 이름에서 complexType에 해당하는 부분 추출"""
- # 테이블 이름이 '/'로 구분되어 있다면 마지막 부분을 반환
- if '/' in table_name:
- return table_name.split('/')[-1].upper()
- return table_name.upper()
-
def load_csv_data(csv_file):
"""CSV 파일에서 필드 정보를 딕셔너리로 로드"""
csv_data = {}
@@ -230,16 +225,61 @@ def load_csv_data(csv_file):
return csv_data
+def load_csv_data_from_path(csv_file_path):
+ """CSV 파일 경로에서 필드 정보를 딕셔너리로 로드"""
+ csv_data = {}
+ csv_filename = os.path.basename(csv_file_path)
+
+ try:
+ with open(csv_file_path, 'r', encoding='utf-8-sig') as f: # BOM 처리
+ reader = csv.DictReader(f)
+ for row_num, row in enumerate(reader, start=2): # 헤더 다음부터 2행
+ try:
+ field_name = row['Field']
+ table_name = row['Table']
+
+ # 매핑된 필드명이 있으면 사용, 없으면 원래 필드명 사용
+ wsdl_field_name = FIELD_MAPPING.get(field_name, field_name)
+
+ # 테이블 정보를 키에 포함 (구분자를 || 로 변경)
+ key = f"{wsdl_field_name}||{table_name}"
+
+ # 타입과 사이즈 정규화
+ normalized_type, normalized_size = normalize_sap_type_and_size(
+ row.get('Type', ''), row.get('Size', '')
+ )
+
+ # Description 안전 처리
+ safe_desc = safe_description_escape(row.get('Description', ''))
+
+ csv_data[key] = {
+ 'seq': row.get('SEQ', ''),
+ 'table': table_name,
+ 'field': row.get('Field', ''), # 원래 CSV 필드명 저장
+ 'mo': row.get('M/O', ''),
+ 'type': normalized_type, # 정규화된 타입
+ 'size': normalized_size, # 정규화된 사이즈
+ 'description': safe_desc, # 안전 처리된 Description
+ 'original_type': row.get('Type', ''), # 원본 타입 보존
+ 'original_size': row.get('Size', '') # 원본 사이즈 보존
+ }
+
+ except Exception as e:
+ print_error(f"CSV 행 {row_num} 처리 실패 - {csv_filename}: {str(e)}")
+ print_error(f"문제 행 데이터: {row}")
+ continue
+
+ except Exception as e:
+ print_error(f"CSV 파일 로딩 실패 - {csv_file_path}: {str(e)}")
+ return {}
+
+ return csv_data
+
def extract_field_name_from_line(line):
"""라인에서 name="필드명" 추출"""
match = re.search(r'name="([^"]+)"', line)
return match.group(1) if match else None
-def extract_field_from_comment(comment_line):
- """주석에서 Field: 부분의 필드명 추출"""
- match = re.search(r'Field:([^,]+)', comment_line)
- return match.group(1).strip() if match else None
-
def has_seq_in_comment(comment_line):
"""주석에 SEQ가 있는지 확인"""
return 'SEQ:' in comment_line
@@ -318,12 +358,13 @@ def comments_are_equal(existing_comment, expected_comment):
def should_process_line(line, csv_data):
"""라인이 처리 대상인지 확인"""
# 네 조건을 모두 만족해야 함:
- # 1. <xsd:element 태그
+ # 1. <xsd:element 또는 <xs:element 태그
# 2. name=" 속성이 있는 태그
# 3. maxOccurs=" 속성이 없는 태그 (배열 데이터 제외)
# 4. CSV에 해당 필드가 있는 경우
- if not ('<xsd:element' in line and 'name="' in line):
+ # xsd:element 또는 xs:element 태그 검사
+ if not (re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line):
return False
# maxOccurs=" 가 있으면 배열 데이터이므로 제외 (모든 maxOccurs 속성)
@@ -339,7 +380,8 @@ def should_process_line(line, csv_data):
def get_skip_reason(line, csv_data):
"""필드를 건너뛰는 이유를 반환"""
- if not ('<xsd:element' in line and 'name="' in line):
+ # xsd:element 또는 xs:element 태그 검사
+ if not (re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line):
return None
field_name = extract_field_name_from_line(line)
@@ -365,36 +407,6 @@ def get_skip_reason(line, csv_data):
return None
-def get_table_prefix_from_csv_name(csv_name: str) -> str:
- """CSV 파일명에서 테이블 prefix 추출"""
- csv_upper = csv_name.upper()
-
- # CSV 파일명 패턴에서 마스터 타입 추출
- if 'CUSTOMER_MASTER' in csv_upper:
- return 'CUSTOMER'
- elif 'VENDOR_MASTER' in csv_upper:
- return 'VENDOR'
- elif 'EMPLOYEE_MASTER' in csv_upper:
- return 'EMPLOYEE'
- elif 'PROJECT_MASTER' in csv_upper:
- return 'PROJECT'
- elif 'DEPARTMENT_CODE' in csv_upper:
- return 'DEPARTMENT'
- elif 'ORGANIZATION_MASTER' in csv_upper:
- return 'ORGANIZATION'
- elif 'EQUP_MASTER' in csv_upper:
- return 'EQUP'
- elif 'MODEL_MASTER' in csv_upper:
- return 'MODEL'
- elif 'MATERIAL_MASTER' in csv_upper:
- return 'MATERIAL'
- elif 'EMPLOYEE_REFERENCE' in csv_upper:
- return 'EMPLOYEE_REF'
- else:
- # 기본적으로 MDZ 부분 제거 후 첫 번째 단어 사용
- parts = csv_name.replace('IF_MDZ_EVCP_', '').split('_')
- return parts[0] if parts else 'COMMON'
-
def backup_file(filepath):
"""파일을 백업"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -403,18 +415,19 @@ def backup_file(filepath):
print(f"백업 파일 생성: {backup_path}")
return backup_path
-def process_wsdl_file(target):
- """WSDL 파일 처리"""
- csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
- wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
-
+def process_files(csv_file_path, wsdl_file_path):
+ """개별 CSV와 WSDL 파일 처리"""
try:
# 백업 생성
backup_path = backup_file(wsdl_file_path)
- print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ # 파일명 추출 (로깅용)
+ csv_filename = os.path.basename(csv_file_path)
+ wsdl_filename = os.path.basename(wsdl_file_path)
+
+ print_color(f"\n🚀 처리 시작: {csv_filename} → {wsdl_filename}", Colors.BOLD)
print_info("CSV 데이터 로딩 중...")
- csv_data = load_csv_data(target)
+ csv_data = load_csv_data_from_path(csv_file_path)
print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
# WSDL 파일 읽기
@@ -425,7 +438,7 @@ def process_wsdl_file(target):
complex_types = get_complex_type_info(lines)
print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
except Exception as e:
- print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ print_error(f"파일 초기화 실패 - {csv_filename} → {wsdl_filename}: {str(e)}")
return
# complexType 구조 출력 (디버깅용)
@@ -454,23 +467,23 @@ def process_wsdl_file(target):
line_processed = False
try:
- # complexType 시작 태그 확인
- type_match = re.search(r'<xsd:complexType\s+name="([^"]+)"', line)
+ # complexType 시작 태그 확인 (xsd: 또는 xs: 네임스페이스 지원)
+ type_match = re.search(r'<(?:xsd:|xs:)complexType\s+name="([^"]+)"', line)
if type_match:
if current_complex_type:
type_stack.append(current_complex_type)
current_complex_type = type_match.group(1)
print_color(f"현재 complexType: {current_complex_type}", Colors.BLUE)
- # complexType 종료 태그 확인
- if '</xsd:complexType>' in line:
+ # complexType 종료 태그 확인 (xsd: 또는 xs: 네임스페이스 지원)
+ if re.search(r'</(?:xsd:|xs:)complexType>', line):
if type_stack:
current_complex_type = type_stack.pop()
print_color(f"이전 complexType으로 복귀: {current_complex_type}", Colors.BLUE)
else:
current_complex_type = None
- # CSV에 있는 xsd:element 필드인지 확인
+ # CSV에 있는 xsd:element 또는 xs:element 필드인지 확인
if should_process_line(line, csv_data):
field_name = extract_field_name_from_line(line)
@@ -527,7 +540,7 @@ def process_wsdl_file(target):
print_info(f" 주석 추가: {field_name}")
line_processed = True
- elif '<xsd:element' in line and 'name="' in line:
+ elif re.search(r'<(?:xsd:|xs:)element\s+', line) and 'name="' in line:
field_name = extract_field_name_from_line(line)
if field_name:
skip_reason = get_skip_reason(line, csv_data)
@@ -567,7 +580,7 @@ def process_wsdl_file(target):
# 결과 출력
print_color(f"\n{'='*50}", Colors.BOLD)
- print_color(f"처리 완료: {target}", Colors.BOLD)
+ print_color(f"처리 완료: {csv_filename} → {wsdl_filename}", Colors.BOLD)
print_color(f"{'='*50}", Colors.BOLD)
print_info(f"CSV 파일: {csv_file_path}")
@@ -608,66 +621,157 @@ def process_wsdl_file(target):
if changes_made - corrected_seq > 0:
print(f" - 새로 추가/교체된 주석: {changes_made - corrected_seq}개")
-if __name__ == "__main__":
+def process_wsdl_file(target):
+ """WSDL 파일 처리 (기존 방식, 일괄 처리용)"""
+ csv_file_path = os.path.join(CSV_DIR, f'{target}.csv')
+ wsdl_file_path = os.path.join(WSDL_DIR, f'{target}.wsdl')
+
try:
- csv_files = get_csv_files()
- print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
- print_info(f"처리할 파일 목록: {csv_files}")
+ # 백업 생성
+ backup_path = backup_file(wsdl_file_path)
- total_files = len(csv_files)
- success_count = 0
- error_count = 0
+ print_color(f"\n🚀 처리 시작: {target}", Colors.BOLD)
+ print_info("CSV 데이터 로딩 중...")
+ csv_data = load_csv_data(target)
+ print_success(f"CSV에서 {len(csv_data)}개 필드 정보 로드됨")
+
+ # WSDL 파일 읽기
+ with open(wsdl_file_path, 'r', encoding='utf-8') as f:
+ lines = f.readlines()
- for i, target in enumerate(csv_files, 1):
+ # complexType 정보 추출
+ complex_types = get_complex_type_info(lines)
+ print_success(f"WSDL에서 {len(complex_types)}개 complexType 정보 추출됨")
+ except Exception as e:
+ print_error(f"파일 초기화 실패 - {target}: {str(e)}")
+ return
+
+def parse_arguments():
+ """커맨드라인 아규먼트 파싱"""
+ parser = argparse.ArgumentParser(
+ description="WSDL 파일에 CSV 정보를 기반으로 주석을 추가하는 도구",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+사용 예시:
+ python3 update_wsdl_with_csv.py --csv ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.csv --wsdl ./public/wsdl/IF_ECC_EVCP_PR_INFORMATION.wsdl
+ python3 update_wsdl_with_csv.py --batch # 기존 일괄 처리 모드
+ """
+ )
+
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument(
+ '--csv',
+ type=str,
+ help='처리할 CSV 파일 경로'
+ )
+ group.add_argument(
+ '--batch',
+ action='store_true',
+ help='일괄 처리 모드 (기존 방식)'
+ )
+
+ parser.add_argument(
+ '--wsdl',
+ type=str,
+ help='처리할 WSDL 파일 경로 (--csv와 함께 사용)'
+ )
+
+ args = parser.parse_args()
+
+ # --csv 옵션 사용 시 --wsdl도 필수
+ if args.csv and not args.wsdl:
+ parser.error("--csv 옵션을 사용할 때는 --wsdl 옵션도 필요합니다.")
+
+ # 파일 존재 여부 확인
+ if args.csv:
+ if not os.path.exists(args.csv):
+ parser.error(f"CSV 파일을 찾을 수 없습니다: {args.csv}")
+ if not args.csv.endswith('.csv'):
+ parser.error(f"CSV 파일이 아닙니다: {args.csv}")
+
+ if args.wsdl:
+ if not os.path.exists(args.wsdl):
+ parser.error(f"WSDL 파일을 찾을 수 없습니다: {args.wsdl}")
+ if not args.wsdl.endswith('.wsdl'):
+ parser.error(f"WSDL 파일이 아닙니다: {args.wsdl}")
+
+ return args
+
+if __name__ == "__main__":
+ try:
+ args = parse_arguments()
+
+ if args.batch:
+ csv_files = get_csv_files()
+ print_color(f"\n🎯 발견된 CSV 파일: {len(csv_files)}개", Colors.BOLD)
+ print_info(f"처리할 파일 목록: {csv_files}")
+
+ total_files = len(csv_files)
+ success_count = 0
+ error_count = 0
+
+ for i, target in enumerate(csv_files, 1):
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ try:
+ process_wsdl_file(target)
+ success_count += 1
+ except Exception as e:
+ print_error(f"파일 처리 실패 - {target}: {str(e)}")
+ error_count += 1
+
+ # 최종 통계
print_color(f"\n{'='*60}", Colors.BOLD)
- print_color(f"진행률: {i}/{total_files} - {target}", Colors.BOLD)
+ print_color("🏁 전체 처리 완료", Colors.BOLD)
print_color(f"{'='*60}", Colors.BOLD)
+ print_success(f"성공: {success_count}개 파일")
+ if error_count > 0:
+ print_error(f"실패: {error_count}개 파일")
+ else:
+ print_success("모든 파일이 성공적으로 처리되었습니다!")
+
+ # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
+ print_color(f"\n{'='*60}", Colors.BOLD)
+ print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
+ print_color(f"{'='*60}", Colors.BOLD)
+
+ print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
+ for sap_type in sorted(discovered_sap_types):
+ print(f" - {sap_type}")
+
+ print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
+ for combination in sorted(type_size_combinations):
+ print(f" - {combination}")
+
+ print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
+ print(" 🎯 실용적 접근법:")
+ print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
+ print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
+ print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
+ print("")
+ print(" 📋 SAP 타입별 상세:")
+ print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
+ print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
+ print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
+ print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
+ print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
+ print("")
+ print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+
+ else: # --csv 옵션 사용 시
+ print_color(f"\n🎯 개별 파일 처리 모드", Colors.BOLD)
+ print_info(f"CSV 파일: {args.csv}")
+ print_info(f"WSDL 파일: {args.wsdl}")
+
try:
- process_wsdl_file(target)
- success_count += 1
+ process_files(args.csv, args.wsdl)
+ print_success("개별 파일 처리 완료!")
except Exception as e:
- print_error(f"파일 처리 실패 - {target}: {str(e)}")
- error_count += 1
-
- # 최종 통계
- print_color(f"\n{'='*60}", Colors.BOLD)
- print_color("🏁 전체 처리 완료", Colors.BOLD)
- print_color(f"{'='*60}", Colors.BOLD)
-
- print_success(f"성공: {success_count}개 파일")
- if error_count > 0:
- print_error(f"실패: {error_count}개 파일")
- else:
- print_success("모든 파일이 성공적으로 처리되었습니다!")
-
- # 발견된 SAP 타입들 출력 (PostgreSQL 매핑용)
- print_color(f"\n{'='*60}", Colors.BOLD)
- print_color("📊 발견된 SAP 타입 통계 (PostgreSQL 매핑용)", Colors.MAGENTA)
- print_color(f"{'='*60}", Colors.BOLD)
-
- print_color(f"\n🔤 고유 SAP 타입 ({len(discovered_sap_types)}개):", Colors.CYAN)
- for sap_type in sorted(discovered_sap_types):
- print(f" - {sap_type}")
-
- print_color(f"\n📏 타입-사이즈 조합 ({len(type_size_combinations)}개):", Colors.YELLOW)
- for combination in sorted(type_size_combinations):
- print(f" - {combination}")
-
- print_color(f"\n💡 PostgreSQL 타입 매핑 가이드 (XML 파싱/조회용):", Colors.GREEN)
- print(" 🎯 실용적 접근법:")
- print(" - 대부분 → VARCHAR(500) 또는 TEXT (XML에서 모든 데이터가 문자열로 전송)")
- print(" - 숫자 검색/정렬이 필요한 경우만 → NUMERIC")
- print(" - 날짜 검색/정렬이 필요한 경우만 → DATE/TIMESTAMP")
- print("")
- print(" 📋 SAP 타입별 상세:")
- print(" - CHAR, VARC, LCHR → VARCHAR(해당사이즈) 또는 TEXT")
- print(" - DATS (날짜) → VARCHAR(8) 또는 DATE (YYYYMMDD 형식)")
- print(" - TIMS (시간) → VARCHAR(6) 또는 TIME (HHMMSS 형식)")
- print(" - CURR, DEC, QUAN, NUMB, NUMC, FLTP → VARCHAR 또는 NUMERIC")
- print(" - CUKY (통화), UNIT (단위), LANG (언어) → VARCHAR(10)")
- print("")
- print(" ⚡ 권장: 초기에는 모두 VARCHAR/TEXT로 시작하고 필요시 변환")
+ print_error(f"개별 파일 처리 실패: {str(e)}")
+ exit(1)
except Exception as e:
print_error(f"스크립트 실행 중 치명적 오류 발생: {str(e)}")