summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/api/src/trading_api/routers/orders.py11
-rw-r--r--services/api/src/trading_api/routers/portfolio.py11
-rw-r--r--services/api/src/trading_api/routers/strategies.py5
-rw-r--r--services/data-collector/src/data_collector/main.py12
-rw-r--r--services/news-collector/src/news_collector/main.py26
-rw-r--r--services/order-executor/src/order_executor/main.py32
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py26
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py15
8 files changed, 114 insertions, 24 deletions
diff --git a/services/api/src/trading_api/routers/orders.py b/services/api/src/trading_api/routers/orders.py
index c69dc10..a29ae2f 100644
--- a/services/api/src/trading_api/routers/orders.py
+++ b/services/api/src/trading_api/routers/orders.py
@@ -5,6 +5,7 @@ import logging
from fastapi import APIRouter, HTTPException, Request
from shared.sa_models import OrderRow, SignalRow
from sqlalchemy import select
+from sqlalchemy.exc import OperationalError
logger = logging.getLogger(__name__)
@@ -35,8 +36,11 @@ async def get_orders(request: Request, limit: int = 50):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching orders: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable")
except Exception as exc:
- logger.error("Failed to get orders: %s", exc)
+ logger.error("Failed to get orders: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to retrieve orders")
@@ -62,6 +66,9 @@ async def get_signals(request: Request, limit: int = 50):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching signals: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable")
except Exception as exc:
- logger.error("Failed to get signals: %s", exc)
+ logger.error("Failed to get signals: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to retrieve signals")
diff --git a/services/api/src/trading_api/routers/portfolio.py b/services/api/src/trading_api/routers/portfolio.py
index d76d85d..3907a86 100644
--- a/services/api/src/trading_api/routers/portfolio.py
+++ b/services/api/src/trading_api/routers/portfolio.py
@@ -5,6 +5,7 @@ import logging
from fastapi import APIRouter, HTTPException, Request
from shared.sa_models import PositionRow
from sqlalchemy import select
+from sqlalchemy.exc import OperationalError
logger = logging.getLogger(__name__)
@@ -29,8 +30,11 @@ async def get_positions(request: Request):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching positions: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable")
except Exception as exc:
- logger.error("Failed to get positions: %s", exc)
+ logger.error("Failed to get positions: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to retrieve positions")
@@ -49,6 +53,9 @@ async def get_snapshots(request: Request, days: int = 30):
}
for s in snapshots
]
+ except OperationalError as exc:
+ logger.error("Database error fetching snapshots: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable")
except Exception as exc:
- logger.error("Failed to get snapshots: %s", exc)
+ logger.error("Failed to get snapshots: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to retrieve snapshots")
diff --git a/services/api/src/trading_api/routers/strategies.py b/services/api/src/trading_api/routers/strategies.py
index 7ddd54e..5db7320 100644
--- a/services/api/src/trading_api/routers/strategies.py
+++ b/services/api/src/trading_api/routers/strategies.py
@@ -42,6 +42,9 @@ async def list_strategies():
}
for s in strategies
]
+ except (ImportError, FileNotFoundError) as exc:
+ logger.error("Strategy loading error: %s", exc)
+ raise HTTPException(status_code=503, detail="Strategy engine unavailable")
except Exception as exc:
- logger.error("Failed to list strategies: %s", exc)
+ logger.error("Failed to list strategies: %s", exc, exc_info=True)
raise HTTPException(status_code=500, detail="Failed to list strategies")
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index 171db52..8b9f301 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -2,6 +2,8 @@
import asyncio
+import aiohttp
+
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -46,8 +48,10 @@ async def fetch_latest_bars(
volume=Decimal(str(bar["v"])),
)
candles.append(candle)
- except Exception as exc:
- log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
+ log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
return candles
@@ -94,7 +98,7 @@ async def run() -> None:
# Check if market is open
try:
is_open = await alpaca.is_market_open()
- except Exception:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError):
is_open = False
if is_open:
@@ -113,7 +117,7 @@ async def run() -> None:
await asyncio.sleep(poll_interval)
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "data-collector")
raise
finally:
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
index 837a397..af0cd20 100644
--- a/services/news-collector/src/news_collector/main.py
+++ b/services/news-collector/src/news_collector/main.py
@@ -3,6 +3,8 @@
import asyncio
from datetime import datetime, timezone
+import aiohttp
+
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
@@ -54,9 +56,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log)
collector=collector.name,
count=count,
)
- except Exception as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
+ log.warning(
+ "collector_network_error",
+ collector=collector.name,
+ error=str(exc),
+ )
+ except (ValueError, KeyError, TypeError) as exc:
log.warning(
- "collector_error",
+ "collector_parse_error",
collector=collector.name,
error=str(exc),
)
@@ -83,8 +91,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
value=result.fear_greed,
label=result.fear_greed_label,
)
- except Exception as exc:
- log.warning("fear_greed_error", error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
+ log.warning("fear_greed_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fear_greed_parse_error", error=str(exc))
await asyncio.sleep(collector.poll_interval)
@@ -100,8 +110,10 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None:
for score in scores.values():
await db.upsert_symbol_score(score)
log.info("aggregation_complete", symbols=len(scores))
- except Exception as exc:
- log.warning("aggregator_error", error=str(exc))
+ except (ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
+ log.warning("aggregator_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("aggregator_parse_error", error=str(exc))
def _determine_regime(fear_greed: int, vix: float | None) -> str:
@@ -177,7 +189,7 @@ async def run() -> None:
)
await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "news-collector")
raise
finally:
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 63d93bc..d9e2373 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -3,6 +3,8 @@
import asyncio
from decimal import Decimal
+import aiohttp
+
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -98,8 +100,18 @@ async def run() -> None:
if event.type == EventType.SIGNAL:
await executor.execute(event.data)
await broker.ack(stream, GROUP, msg_id)
+ except (
+ aiohttp.ClientError,
+ ConnectionError,
+ TimeoutError,
+ asyncio.TimeoutError,
+ ) as exc:
+ log.warning("pending_network_error", error=str(exc), msg_id=msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("pending_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True)
while not shutdown.is_shutting_down:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
@@ -114,8 +126,24 @@ async def run() -> None:
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
+ except (
+ aiohttp.ClientError,
+ ConnectionError,
+ TimeoutError,
+ asyncio.TimeoutError,
+ ) as exc:
+ log.warning("process_network_error", error=str(exc))
+ metrics.errors_total.labels(
+ service="order-executor", error_type="network"
+ ).inc()
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("process_parse_error", error=str(exc))
+ await broker.ack(stream, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="order-executor", error_type="validation"
+ ).inc()
except Exception as exc:
- log.error("process_failed", error=str(exc))
+ log.error("process_failed", error=str(exc), exc_info=True)
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index 6cf248f..6ca7b1b 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -2,6 +2,8 @@
import asyncio
+import sqlalchemy.exc
+
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
@@ -52,8 +54,12 @@ async def snapshot_loop(
while True:
try:
await save_snapshot(db, tracker, notifier, log)
+ except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc:
+ log.warning("snapshot_db_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("snapshot_data_error", error=str(exc))
except Exception as exc:
- log.error("snapshot_failed", error=str(exc))
+ log.error("snapshot_failed", error=str(exc), exc_info=True)
await asyncio.sleep(interval_hours * 3600)
@@ -112,8 +118,12 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True)
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
@@ -138,13 +148,21 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("message_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="validation"
+ ).inc()
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error(
+ "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True
+ )
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 411c54b..2852b53 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -5,6 +5,8 @@ from datetime import datetime
from pathlib import Path
import zoneinfo
+import aiohttp
+
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -64,8 +66,17 @@ async def run_stock_selector(
log.info("stock_selector_complete", picks=[s.symbol for s in selections])
else:
log.info("stock_selector_no_picks")
+ except (
+ aiohttp.ClientError,
+ ConnectionError,
+ TimeoutError,
+ asyncio.TimeoutError,
+ ) as exc:
+ log.warning("stock_selector_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("stock_selector_data_error", error=str(exc))
except Exception as exc:
- log.error("stock_selector_error", error=str(exc))
+ log.error("stock_selector_error", error=str(exc), exc_info=True)
await asyncio.sleep(120) # Sleep past this minute
else:
await asyncio.sleep(30)
@@ -137,7 +148,7 @@ async def run() -> None:
await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "strategy-engine")
raise
finally: