diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 15:46:18 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 15:46:18 +0900 |
| commit | 776376dda8005635c4c3365905ca7df857789fec (patch) | |
| tree | 09a8f134929007b58981e6c5e756c3f3b4d3cbda | |
| parent | 8da5fb843856bb6585c6753f44d422beaa4a8204 (diff) | |
refactor: specialize exception handling across all services
| -rw-r--r-- | services/api/src/trading_api/routers/orders.py | 11 | ||||
| -rw-r--r-- | services/api/src/trading_api/routers/portfolio.py | 11 | ||||
| -rw-r--r-- | services/api/src/trading_api/routers/strategies.py | 5 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 12 | ||||
| -rw-r--r-- | services/news-collector/src/news_collector/main.py | 26 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 32 | ||||
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 26 | ||||
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/main.py | 15 |
8 files changed, 114 insertions, 24 deletions
diff --git a/services/api/src/trading_api/routers/orders.py b/services/api/src/trading_api/routers/orders.py index c69dc10..a29ae2f 100644 --- a/services/api/src/trading_api/routers/orders.py +++ b/services/api/src/trading_api/routers/orders.py @@ -5,6 +5,7 @@ import logging from fastapi import APIRouter, HTTPException, Request from shared.sa_models import OrderRow, SignalRow from sqlalchemy import select +from sqlalchemy.exc import OperationalError logger = logging.getLogger(__name__) @@ -35,8 +36,11 @@ async def get_orders(request: Request, limit: int = 50): } for r in rows ] + except OperationalError as exc: + logger.error("Database error fetching orders: %s", exc) + raise HTTPException(status_code=503, detail="Database unavailable") except Exception as exc: - logger.error("Failed to get orders: %s", exc) + logger.error("Failed to get orders: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve orders") @@ -62,6 +66,9 @@ async def get_signals(request: Request, limit: int = 50): } for r in rows ] + except OperationalError as exc: + logger.error("Database error fetching signals: %s", exc) + raise HTTPException(status_code=503, detail="Database unavailable") except Exception as exc: - logger.error("Failed to get signals: %s", exc) + logger.error("Failed to get signals: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve signals") diff --git a/services/api/src/trading_api/routers/portfolio.py b/services/api/src/trading_api/routers/portfolio.py index d76d85d..3907a86 100644 --- a/services/api/src/trading_api/routers/portfolio.py +++ b/services/api/src/trading_api/routers/portfolio.py @@ -5,6 +5,7 @@ import logging from fastapi import APIRouter, HTTPException, Request from shared.sa_models import PositionRow from sqlalchemy import select +from sqlalchemy.exc import OperationalError logger = logging.getLogger(__name__) @@ -29,8 +30,11 @@ async def get_positions(request: Request): } for r in rows ] + except OperationalError as exc: + logger.error("Database error fetching positions: %s", exc) + raise HTTPException(status_code=503, detail="Database unavailable") except Exception as exc: - logger.error("Failed to get positions: %s", exc) + logger.error("Failed to get positions: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve positions") @@ -49,6 +53,9 @@ async def get_snapshots(request: Request, days: int = 30): } for s in snapshots ] + except OperationalError as exc: + logger.error("Database error fetching snapshots: %s", exc) + raise HTTPException(status_code=503, detail="Database unavailable") except Exception as exc: - logger.error("Failed to get snapshots: %s", exc) + logger.error("Failed to get snapshots: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail="Failed to retrieve snapshots") diff --git a/services/api/src/trading_api/routers/strategies.py b/services/api/src/trading_api/routers/strategies.py index 7ddd54e..5db7320 100644 --- a/services/api/src/trading_api/routers/strategies.py +++ b/services/api/src/trading_api/routers/strategies.py @@ -42,6 +42,9 @@ async def list_strategies(): } for s in strategies ] + except (ImportError, FileNotFoundError) as exc: + logger.error("Strategy loading error: %s", exc) + raise HTTPException(status_code=503, detail="Strategy engine unavailable") except Exception as exc: - logger.error("Failed to list strategies: %s", exc) + logger.error("Failed to list strategies: %s", exc, exc_info=True) raise HTTPException(status_code=500, detail="Failed to list strategies") diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index 171db52..8b9f301 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -2,6 +2,8 @@ import asyncio +import aiohttp + from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -46,8 +48,10 @@ async def fetch_latest_bars( volume=Decimal(str(bar["v"])), ) candles.append(candle) - except Exception as exc: - log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: + log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc)) return candles @@ -94,7 +98,7 @@ async def run() -> None: # Check if market is open try: is_open = await alpaca.is_market_open() - except Exception: + except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError): is_open = False if is_open: @@ -113,7 +117,7 @@ async def run() -> None: await asyncio.sleep(poll_interval) except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "data-collector") raise finally: diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py index 837a397..af0cd20 100644 --- a/services/news-collector/src/news_collector/main.py +++ b/services/news-collector/src/news_collector/main.py @@ -3,6 +3,8 @@ import asyncio from datetime import datetime, timezone +import aiohttp + from shared.broker import RedisBroker from shared.db import Database from shared.events import NewsEvent @@ -54,9 +56,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) collector=collector.name, count=count, ) - except Exception as exc: + except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: + log.warning( + "collector_network_error", + collector=collector.name, + error=str(exc), + ) + except (ValueError, KeyError, TypeError) as exc: log.warning( - "collector_error", + "collector_parse_error", collector=collector.name, error=str(exc), ) @@ -83,8 +91,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) value=result.fear_greed, label=result.fear_greed_label, ) - except Exception as exc: - log.warning("fear_greed_error", error=str(exc)) + except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: + log.warning("fear_greed_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fear_greed_parse_error", error=str(exc)) await asyncio.sleep(collector.poll_interval) @@ -100,8 +110,10 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None: for score in scores.values(): await db.upsert_symbol_score(score) log.info("aggregation_complete", symbols=len(scores)) - except Exception as exc: - log.warning("aggregator_error", error=str(exc)) + except (ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: + log.warning("aggregator_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("aggregator_parse_error", error=str(exc)) def _determine_regime(fear_greed: int, vix: float | None) -> str: @@ -177,7 +189,7 @@ async def run() -> None: ) await shutdown.wait() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "news-collector") raise finally: diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 63d93bc..d9e2373 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -3,6 +3,8 @@ import asyncio from decimal import Decimal +import aiohttp + from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -98,8 +100,18 @@ async def run() -> None: if event.type == EventType.SIGNAL: await executor.execute(event.data) await broker.ack(stream, GROUP, msg_id) + except ( + aiohttp.ClientError, + ConnectionError, + TimeoutError, + asyncio.TimeoutError, + ) as exc: + log.warning("pending_network_error", error=str(exc), msg_id=msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("pending_failed", error=str(exc), msg_id=msg_id) + log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True) while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) @@ -114,8 +126,24 @@ async def run() -> None: service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) + except ( + aiohttp.ClientError, + ConnectionError, + TimeoutError, + asyncio.TimeoutError, + ) as exc: + log.warning("process_network_error", error=str(exc)) + metrics.errors_total.labels( + service="order-executor", error_type="network" + ).inc() + except (ValueError, KeyError, TypeError) as exc: + log.warning("process_parse_error", error=str(exc)) + await broker.ack(stream, GROUP, msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="validation" + ).inc() except Exception as exc: - log.error("process_failed", error=str(exc)) + log.error("process_failed", error=str(exc), exc_info=True) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index 6cf248f..6ca7b1b 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -2,6 +2,8 @@ import asyncio +import sqlalchemy.exc + from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, OrderEvent @@ -52,8 +54,12 @@ async def snapshot_loop( while True: try: await save_snapshot(db, tracker, notifier, log) + except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc: + log.warning("snapshot_db_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("snapshot_data_error", error=str(exc)) except Exception as exc: - log.error("snapshot_failed", error=str(exc)) + log.error("snapshot_failed", error=str(exc), exc_info=True) await asyncio.sleep(interval_hours * 3600) @@ -112,8 +118,12 @@ async def run() -> None: service="portfolio-manager", event_type="order" ).inc() await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc() except Exception as exc: - log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True) metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc() try: @@ -138,13 +148,21 @@ async def run() -> None: service="portfolio-manager", event_type="order" ).inc() await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("message_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + metrics.errors_total.labels( + service="portfolio-manager", error_type="validation" + ).inc() except Exception as exc: - log.exception("message_processing_failed", error=str(exc), msg_id=msg_id) + log.error( + "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True + ) metrics.errors_total.labels( service="portfolio-manager", error_type="processing" ).inc() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "portfolio-manager") raise finally: diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 411c54b..2852b53 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -5,6 +5,8 @@ from datetime import datetime from pathlib import Path import zoneinfo +import aiohttp + from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -64,8 +66,17 @@ async def run_stock_selector( log.info("stock_selector_complete", picks=[s.symbol for s in selections]) else: log.info("stock_selector_no_picks") + except ( + aiohttp.ClientError, + ConnectionError, + TimeoutError, + asyncio.TimeoutError, + ) as exc: + log.warning("stock_selector_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("stock_selector_data_error", error=str(exc)) except Exception as exc: - log.error("stock_selector_error", error=str(exc)) + log.error("stock_selector_error", error=str(exc), exc_info=True) await asyncio.sleep(120) # Sleep past this minute else: await asyncio.sleep(30) @@ -137,7 +148,7 @@ async def run() -> None: await shutdown.wait() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "strategy-engine") raise finally: |
