summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/api/Dockerfile15
-rw-r--r--services/api/pyproject.toml6
-rw-r--r--services/api/src/trading_api/dependencies/__init__.py0
-rw-r--r--services/api/src/trading_api/dependencies/auth.py29
-rw-r--r--services/api/src/trading_api/main.py50
-rw-r--r--services/api/src/trading_api/routers/orders.py29
-rw-r--r--services/api/src/trading_api/routers/portfolio.py22
-rw-r--r--services/api/src/trading_api/routers/strategies.py7
-rw-r--r--services/api/tests/test_api.py1
-rw-r--r--services/api/tests/test_orders_router.py6
-rw-r--r--services/api/tests/test_portfolio_router.py6
-rw-r--r--services/backtester/Dockerfile9
-rw-r--r--services/backtester/pyproject.toml2
-rw-r--r--services/backtester/src/backtester/engine.py5
-rw-r--r--services/backtester/src/backtester/main.py6
-rw-r--r--services/backtester/src/backtester/metrics.py2
-rw-r--r--services/backtester/src/backtester/simulator.py19
-rw-r--r--services/backtester/src/backtester/walk_forward.py4
-rw-r--r--services/backtester/tests/test_engine.py9
-rw-r--r--services/backtester/tests/test_metrics.py9
-rw-r--r--services/backtester/tests/test_simulator.py13
-rw-r--r--services/backtester/tests/test_walk_forward.py10
-rw-r--r--services/data-collector/Dockerfile9
-rw-r--r--services/data-collector/src/data_collector/main.py31
-rw-r--r--services/data-collector/tests/test_storage.py9
-rw-r--r--services/news-collector/Dockerfile12
-rw-r--r--services/news-collector/pyproject.toml7
-rw-r--r--services/news-collector/src/news_collector/collectors/fear_greed.py5
-rw-r--r--services/news-collector/src/news_collector/collectors/fed.py6
-rw-r--r--services/news-collector/src/news_collector/collectors/finnhub.py4
-rw-r--r--services/news-collector/src/news_collector/collectors/reddit.py4
-rw-r--r--services/news-collector/src/news_collector/collectors/rss.py6
-rw-r--r--services/news-collector/src/news_collector/collectors/sec_edgar.py10
-rw-r--r--services/news-collector/src/news_collector/collectors/truth_social.py4
-rw-r--r--services/news-collector/src/news_collector/config.py3
-rw-r--r--services/news-collector/src/news_collector/main.py81
-rw-r--r--services/news-collector/tests/test_fear_greed.py2
-rw-r--r--services/news-collector/tests/test_fed.py3
-rw-r--r--services/news-collector/tests/test_finnhub.py2
-rw-r--r--services/news-collector/tests/test_main.py8
-rw-r--r--services/news-collector/tests/test_reddit.py3
-rw-r--r--services/news-collector/tests/test_rss.py2
-rw-r--r--services/news-collector/tests/test_sec_edgar.py8
-rw-r--r--services/news-collector/tests/test_truth_social.py3
-rw-r--r--services/order-executor/Dockerfile9
-rw-r--r--services/order-executor/src/order_executor/executor.py16
-rw-r--r--services/order-executor/src/order_executor/main.py45
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py26
-rw-r--r--services/order-executor/tests/test_executor.py4
-rw-r--r--services/order-executor/tests/test_risk_manager.py2
-rw-r--r--services/portfolio-manager/Dockerfile9
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py43
-rw-r--r--services/portfolio-manager/tests/test_portfolio.py3
-rw-r--r--services/portfolio-manager/tests/test_snapshot.py3
-rw-r--r--services/strategy-engine/Dockerfile9
-rw-r--r--services/strategy-engine/pyproject.toml6
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py6
-rw-r--r--services/strategy-engine/src/strategy_engine/engine.py8
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py33
-rw-r--r--services/strategy-engine/src/strategy_engine/plugin_loader.py1
-rw-r--r--services/strategy-engine/src/strategy_engine/stock_selector.py210
-rw-r--r--services/strategy-engine/strategies/base.py5
-rw-r--r--services/strategy-engine/strategies/bollinger_strategy.py2
-rw-r--r--services/strategy-engine/strategies/combined_strategy.py2
-rw-r--r--services/strategy-engine/strategies/ema_crossover_strategy.py2
-rw-r--r--services/strategy-engine/strategies/grid_strategy.py5
-rw-r--r--services/strategy-engine/strategies/indicators/__init__.py16
-rw-r--r--services/strategy-engine/strategies/indicators/momentum.py2
-rw-r--r--services/strategy-engine/strategies/indicators/trend.py2
-rw-r--r--services/strategy-engine/strategies/indicators/volatility.py2
-rw-r--r--services/strategy-engine/strategies/indicators/volume.py2
-rw-r--r--services/strategy-engine/strategies/macd_strategy.py2
-rw-r--r--services/strategy-engine/strategies/moc_strategy.py4
-rw-r--r--services/strategy-engine/strategies/rsi_strategy.py2
-rw-r--r--services/strategy-engine/strategies/volume_profile_strategy.py4
-rw-r--r--services/strategy-engine/strategies/vwap_strategy.py4
-rw-r--r--services/strategy-engine/tests/test_base_filters.py7
-rw-r--r--services/strategy-engine/tests/test_bollinger_strategy.py6
-rw-r--r--services/strategy-engine/tests/test_combined_strategy.py11
-rw-r--r--services/strategy-engine/tests/test_ema_crossover_strategy.py6
-rw-r--r--services/strategy-engine/tests/test_engine.py8
-rw-r--r--services/strategy-engine/tests/test_grid_strategy.py6
-rw-r--r--services/strategy-engine/tests/test_indicators.py9
-rw-r--r--services/strategy-engine/tests/test_macd_strategy.py6
-rw-r--r--services/strategy-engine/tests/test_moc_strategy.py7
-rw-r--r--services/strategy-engine/tests/test_multi_symbol.py10
-rw-r--r--services/strategy-engine/tests/test_plugin_loader.py2
-rw-r--r--services/strategy-engine/tests/test_rsi_strategy.py6
-rw-r--r--services/strategy-engine/tests/test_stock_selector.py37
-rw-r--r--services/strategy-engine/tests/test_strategy_validation.py8
-rw-r--r--services/strategy-engine/tests/test_volume_profile_strategy.py15
-rw-r--r--services/strategy-engine/tests/test_vwap_strategy.py12
92 files changed, 679 insertions, 447 deletions
diff --git a/services/api/Dockerfile b/services/api/Dockerfile
index b942075..93d2b75 100644
--- a/services/api/Dockerfile
+++ b/services/api/Dockerfile
@@ -1,11 +1,18 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/api/ services/api/
RUN pip install --no-cache-dir ./services/api
-COPY services/strategy-engine/strategies/ /app/strategies/
COPY services/strategy-engine/ services/strategy-engine/
RUN pip install --no-cache-dir ./services/strategy-engine
-ENV PYTHONPATH=/app
-CMD ["uvicorn", "trading_api.main:app", "--host", "0.0.0.0", "--port", "8000"]
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY services/strategy-engine/strategies/ /app/strategies/
+ENV PYTHONPATH=/app STRATEGIES_DIR=/app/strategies
+USER appuser
+CMD ["uvicorn", "trading_api.main:app", "--host", "0.0.0.0", "--port", "8000", "--timeout-graceful-shutdown", "30"]
diff --git a/services/api/pyproject.toml b/services/api/pyproject.toml
index fd2598d..95099d2 100644
--- a/services/api/pyproject.toml
+++ b/services/api/pyproject.toml
@@ -3,11 +3,7 @@ name = "trading-api"
version = "0.1.0"
description = "REST API for the trading platform"
requires-python = ">=3.12"
-dependencies = [
- "fastapi>=0.110",
- "uvicorn>=0.27",
- "trading-shared",
-]
+dependencies = ["fastapi>=0.110,<1", "uvicorn>=0.27,<1", "slowapi>=0.1.9,<1", "trading-shared"]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "httpx>=0.27"]
diff --git a/services/api/src/trading_api/dependencies/__init__.py b/services/api/src/trading_api/dependencies/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/api/src/trading_api/dependencies/__init__.py
diff --git a/services/api/src/trading_api/dependencies/auth.py b/services/api/src/trading_api/dependencies/auth.py
new file mode 100644
index 0000000..a5e76c1
--- /dev/null
+++ b/services/api/src/trading_api/dependencies/auth.py
@@ -0,0 +1,29 @@
+"""Bearer token authentication dependency."""
+
+import logging
+
+from fastapi import Depends, HTTPException, status
+from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
+
+from shared.config import Settings
+
+logger = logging.getLogger(__name__)
+
+_security = HTTPBearer(auto_error=False)
+_settings = Settings()
+
+
+async def verify_token(
+ credentials: HTTPAuthorizationCredentials | None = Depends(_security),
+) -> None:
+ """Verify Bearer token. Skip auth if API_AUTH_TOKEN is not configured."""
+ token = _settings.api_auth_token.get_secret_value()
+ if not token:
+ return # Auth disabled in dev mode
+
+ if credentials is None or credentials.credentials != token:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Invalid or missing authentication token",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
diff --git a/services/api/src/trading_api/main.py b/services/api/src/trading_api/main.py
index 39f7b43..05c6d2f 100644
--- a/services/api/src/trading_api/main.py
+++ b/services/api/src/trading_api/main.py
@@ -1,33 +1,71 @@
"""Trading Platform REST API."""
+import logging
from contextlib import asynccontextmanager
-from fastapi import FastAPI
+from fastapi import Depends, FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+from slowapi import Limiter, _rate_limit_exceeded_handler
+from slowapi.errors import RateLimitExceeded
+from slowapi.util import get_remote_address
from shared.config import Settings
from shared.db import Database
+from trading_api.dependencies.auth import verify_token
+from trading_api.routers import orders, portfolio, strategies
-from trading_api.routers import portfolio, orders, strategies
+logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = Settings()
- app.state.db = Database(settings.database_url)
+ if not settings.api_auth_token.get_secret_value():
+ logger.warning("API_AUTH_TOKEN not set — authentication is disabled")
+ app.state.db = Database(settings.database_url.get_secret_value())
await app.state.db.connect()
yield
await app.state.db.close()
+cfg = Settings()
+
+limiter = Limiter(key_func=get_remote_address)
+
app = FastAPI(
title="Trading Platform API",
version="0.1.0",
lifespan=lifespan,
)
-app.include_router(portfolio.router, prefix="/api/v1/portfolio", tags=["portfolio"])
-app.include_router(orders.router, prefix="/api/v1/orders", tags=["orders"])
-app.include_router(strategies.router, prefix="/api/v1/strategies", tags=["strategies"])
+app.state.limiter = limiter
+app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
+
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=cfg.cors_origins.split(","),
+ allow_methods=["GET", "POST"],
+ allow_headers=["Authorization", "Content-Type"],
+)
+
+app.include_router(
+ portfolio.router,
+ prefix="/api/v1/portfolio",
+ tags=["portfolio"],
+ dependencies=[Depends(verify_token)],
+)
+app.include_router(
+ orders.router,
+ prefix="/api/v1/orders",
+ tags=["orders"],
+ dependencies=[Depends(verify_token)],
+)
+app.include_router(
+ strategies.router,
+ prefix="/api/v1/strategies",
+ tags=["strategies"],
+ dependencies=[Depends(verify_token)],
+)
@app.get("/health")
diff --git a/services/api/src/trading_api/routers/orders.py b/services/api/src/trading_api/routers/orders.py
index c69dc10..b664e2a 100644
--- a/services/api/src/trading_api/routers/orders.py
+++ b/services/api/src/trading_api/routers/orders.py
@@ -2,17 +2,23 @@
import logging
-from fastapi import APIRouter, HTTPException, Request
-from shared.sa_models import OrderRow, SignalRow
+from fastapi import APIRouter, HTTPException, Query, Request
+from slowapi import Limiter
+from slowapi.util import get_remote_address
from sqlalchemy import select
+from sqlalchemy.exc import OperationalError
+
+from shared.sa_models import OrderRow, SignalRow
logger = logging.getLogger(__name__)
router = APIRouter()
+limiter = Limiter(key_func=get_remote_address)
@router.get("/")
-async def get_orders(request: Request, limit: int = 50):
+@limiter.limit("60/minute")
+async def get_orders(request: Request, limit: int = Query(50, ge=1, le=1000)):
"""Get recent orders."""
try:
db = request.app.state.db
@@ -35,13 +41,17 @@ async def get_orders(request: Request, limit: int = 50):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching orders: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable") from exc
except Exception as exc:
- logger.error("Failed to get orders: %s", exc)
- raise HTTPException(status_code=500, detail="Failed to retrieve orders")
+ logger.error("Failed to get orders: %s", exc, exc_info=True)
+ raise HTTPException(status_code=500, detail="Failed to retrieve orders") from exc
@router.get("/signals")
-async def get_signals(request: Request, limit: int = 50):
+@limiter.limit("60/minute")
+async def get_signals(request: Request, limit: int = Query(50, ge=1, le=1000)):
"""Get recent signals."""
try:
db = request.app.state.db
@@ -62,6 +72,9 @@ async def get_signals(request: Request, limit: int = 50):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching signals: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable") from exc
except Exception as exc:
- logger.error("Failed to get signals: %s", exc)
- raise HTTPException(status_code=500, detail="Failed to retrieve signals")
+ logger.error("Failed to get signals: %s", exc, exc_info=True)
+ raise HTTPException(status_code=500, detail="Failed to retrieve signals") from exc
diff --git a/services/api/src/trading_api/routers/portfolio.py b/services/api/src/trading_api/routers/portfolio.py
index d76d85d..56bee7c 100644
--- a/services/api/src/trading_api/routers/portfolio.py
+++ b/services/api/src/trading_api/routers/portfolio.py
@@ -2,9 +2,11 @@
import logging
-from fastapi import APIRouter, HTTPException, Request
-from shared.sa_models import PositionRow
+from fastapi import APIRouter, HTTPException, Query, Request
from sqlalchemy import select
+from sqlalchemy.exc import OperationalError
+
+from shared.sa_models import PositionRow
logger = logging.getLogger(__name__)
@@ -29,13 +31,16 @@ async def get_positions(request: Request):
}
for r in rows
]
+ except OperationalError as exc:
+ logger.error("Database error fetching positions: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable") from exc
except Exception as exc:
- logger.error("Failed to get positions: %s", exc)
- raise HTTPException(status_code=500, detail="Failed to retrieve positions")
+ logger.error("Failed to get positions: %s", exc, exc_info=True)
+ raise HTTPException(status_code=500, detail="Failed to retrieve positions") from exc
@router.get("/snapshots")
-async def get_snapshots(request: Request, days: int = 30):
+async def get_snapshots(request: Request, days: int = Query(30, ge=1, le=365)):
"""Get portfolio snapshots for the last N days."""
try:
db = request.app.state.db
@@ -49,6 +54,9 @@ async def get_snapshots(request: Request, days: int = 30):
}
for s in snapshots
]
+ except OperationalError as exc:
+ logger.error("Database error fetching snapshots: %s", exc)
+ raise HTTPException(status_code=503, detail="Database unavailable") from exc
except Exception as exc:
- logger.error("Failed to get snapshots: %s", exc)
- raise HTTPException(status_code=500, detail="Failed to retrieve snapshots")
+ logger.error("Failed to get snapshots: %s", exc, exc_info=True)
+ raise HTTPException(status_code=500, detail="Failed to retrieve snapshots") from exc
diff --git a/services/api/src/trading_api/routers/strategies.py b/services/api/src/trading_api/routers/strategies.py
index 7ddd54e..157094c 100644
--- a/services/api/src/trading_api/routers/strategies.py
+++ b/services/api/src/trading_api/routers/strategies.py
@@ -42,6 +42,9 @@ async def list_strategies():
}
for s in strategies
]
+ except (ImportError, FileNotFoundError) as exc:
+ logger.error("Strategy loading error: %s", exc)
+ raise HTTPException(status_code=503, detail="Strategy engine unavailable") from exc
except Exception as exc:
- logger.error("Failed to list strategies: %s", exc)
- raise HTTPException(status_code=500, detail="Failed to list strategies")
+ logger.error("Failed to list strategies: %s", exc, exc_info=True)
+ raise HTTPException(status_code=500, detail="Failed to list strategies") from exc
diff --git a/services/api/tests/test_api.py b/services/api/tests/test_api.py
index 669143b..f3b0a47 100644
--- a/services/api/tests/test_api.py
+++ b/services/api/tests/test_api.py
@@ -1,6 +1,7 @@
"""Tests for the REST API."""
from unittest.mock import AsyncMock, patch
+
from fastapi.testclient import TestClient
diff --git a/services/api/tests/test_orders_router.py b/services/api/tests/test_orders_router.py
index 0658619..52252c5 100644
--- a/services/api/tests/test_orders_router.py
+++ b/services/api/tests/test_orders_router.py
@@ -1,10 +1,10 @@
"""Tests for orders API router."""
-import pytest
from unittest.mock import AsyncMock, MagicMock
-from fastapi.testclient import TestClient
-from fastapi import FastAPI
+import pytest
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
from trading_api.routers.orders import router
diff --git a/services/api/tests/test_portfolio_router.py b/services/api/tests/test_portfolio_router.py
index 3bd1b2c..8cd8ff8 100644
--- a/services/api/tests/test_portfolio_router.py
+++ b/services/api/tests/test_portfolio_router.py
@@ -1,11 +1,11 @@
"""Tests for portfolio API router."""
-import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
-from fastapi.testclient import TestClient
-from fastapi import FastAPI
+import pytest
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
from trading_api.routers.portfolio import router
diff --git a/services/backtester/Dockerfile b/services/backtester/Dockerfile
index 9a4f439..1108e42 100644
--- a/services/backtester/Dockerfile
+++ b/services/backtester/Dockerfile
@@ -1,10 +1,17 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/backtester/ services/backtester/
RUN pip install --no-cache-dir ./services/backtester
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
COPY services/strategy-engine/strategies/ /app/strategies/
ENV STRATEGIES_DIR=/app/strategies
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "backtester.main"]
diff --git a/services/backtester/pyproject.toml b/services/backtester/pyproject.toml
index 2601d04..034bcf6 100644
--- a/services/backtester/pyproject.toml
+++ b/services/backtester/pyproject.toml
@@ -3,7 +3,7 @@ name = "backtester"
version = "0.1.0"
description = "Strategy backtesting engine"
requires-python = ">=3.12"
-dependencies = ["pandas>=2.0", "numpy>=1.20", "rich>=13.0", "trading-shared"]
+dependencies = ["pandas>=2.1,<3", "numpy>=1.26,<3", "rich>=13.0,<14", "trading-shared"]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
diff --git a/services/backtester/src/backtester/engine.py b/services/backtester/src/backtester/engine.py
index b03715d..fcf48f1 100644
--- a/services/backtester/src/backtester/engine.py
+++ b/services/backtester/src/backtester/engine.py
@@ -6,10 +6,9 @@ from dataclasses import dataclass, field
from decimal import Decimal
from typing import Protocol
-from shared.models import Candle, Signal
-
from backtester.metrics import DetailedMetrics, TradeRecord, compute_detailed_metrics
from backtester.simulator import OrderSimulator, SimulatedTrade
+from shared.models import Candle, Signal
class StrategyProtocol(Protocol):
@@ -101,7 +100,7 @@ class BacktestEngine:
final_balance = simulator.balance
if candles:
last_price = candles[-1].close
- for symbol, qty in simulator.positions.items():
+ for qty in simulator.positions.values():
if qty > Decimal("0"):
final_balance += qty * last_price
elif qty < Decimal("0"):
diff --git a/services/backtester/src/backtester/main.py b/services/backtester/src/backtester/main.py
index a4cea76..dbde00b 100644
--- a/services/backtester/src/backtester/main.py
+++ b/services/backtester/src/backtester/main.py
@@ -17,11 +17,11 @@ _STRATEGIES_DIR = Path(
if _STRATEGIES_DIR.parent not in [Path(p) for p in sys.path]:
sys.path.insert(0, str(_STRATEGIES_DIR.parent))
-from shared.db import Database # noqa: E402
-from shared.models import Candle # noqa: E402
from backtester.config import BacktestConfig # noqa: E402
from backtester.engine import BacktestEngine # noqa: E402
from backtester.reporter import format_report # noqa: E402
+from shared.db import Database # noqa: E402
+from shared.models import Candle # noqa: E402
async def run_backtest() -> str:
@@ -45,7 +45,7 @@ async def run_backtest() -> str:
except Exception as exc:
raise RuntimeError(f"Failed to load strategy '{config.strategy_name}': {exc}") from exc
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
try:
rows = await db.get_candles(config.symbol, config.timeframe, config.candle_limit)
diff --git a/services/backtester/src/backtester/metrics.py b/services/backtester/src/backtester/metrics.py
index 239cb6f..c7b032b 100644
--- a/services/backtester/src/backtester/metrics.py
+++ b/services/backtester/src/backtester/metrics.py
@@ -266,7 +266,7 @@ def compute_detailed_metrics(
largest_win=largest_win,
largest_loss=largest_loss,
avg_holding_period=avg_holding,
- trade_pairs=[p for p in pairs],
+ trade_pairs=list(pairs),
risk_free_rate=risk_free_rate,
recovery_factor=recovery_factor,
max_consecutive_losses=max_consec_losses,
diff --git a/services/backtester/src/backtester/simulator.py b/services/backtester/src/backtester/simulator.py
index 64c88dd..6bce18b 100644
--- a/services/backtester/src/backtester/simulator.py
+++ b/services/backtester/src/backtester/simulator.py
@@ -1,9 +1,8 @@
"""Simulated order executor for backtesting."""
from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from typing import Optional
from shared.models import OrderSide, Signal
@@ -16,7 +15,7 @@ class SimulatedTrade:
quantity: Decimal
balance_after: Decimal
fee: Decimal = Decimal("0")
- timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
+ timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass
@@ -27,8 +26,8 @@ class OpenPosition:
side: OrderSide # BUY = long, SELL = short
entry_price: Decimal
quantity: Decimal
- stop_loss: Optional[Decimal] = None
- take_profit: Optional[Decimal] = None
+ stop_loss: Decimal | None = None
+ take_profit: Decimal | None = None
class OrderSimulator:
@@ -70,7 +69,7 @@ class OrderSimulator:
remaining: list[OpenPosition] = []
for pos in self.open_positions:
triggered = False
- exit_price: Optional[Decimal] = None
+ exit_price: Decimal | None = None
if pos.side == OrderSide.BUY: # Long position
if pos.stop_loss is not None and candle_low <= pos.stop_loss:
@@ -125,12 +124,12 @@ class OrderSimulator:
def execute(
self,
signal: Signal,
- timestamp: Optional[datetime] = None,
- stop_loss: Optional[Decimal] = None,
- take_profit: Optional[Decimal] = None,
+ timestamp: datetime | None = None,
+ stop_loss: Decimal | None = None,
+ take_profit: Decimal | None = None,
) -> bool:
"""Execute a signal with slippage and fees. Returns True if accepted."""
- ts = timestamp or datetime.now(timezone.utc)
+ ts = timestamp or datetime.now(UTC)
exec_price = self._apply_slippage(signal.price, signal.side)
fee = self._calculate_fee(exec_price, signal.quantity)
diff --git a/services/backtester/src/backtester/walk_forward.py b/services/backtester/src/backtester/walk_forward.py
index c7b7fd8..720ad5e 100644
--- a/services/backtester/src/backtester/walk_forward.py
+++ b/services/backtester/src/backtester/walk_forward.py
@@ -1,11 +1,11 @@
"""Walk-forward analysis for strategy parameter optimization."""
+from collections.abc import Callable
from dataclasses import dataclass, field
from decimal import Decimal
-from typing import Callable
-from shared.models import Candle
from backtester.engine import BacktestEngine, BacktestResult, StrategyProtocol
+from shared.models import Candle
@dataclass
diff --git a/services/backtester/tests/test_engine.py b/services/backtester/tests/test_engine.py
index 4794e63..f789831 100644
--- a/services/backtester/tests/test_engine.py
+++ b/services/backtester/tests/test_engine.py
@@ -1,20 +1,19 @@
"""Tests for the BacktestEngine."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import MagicMock
-
-from shared.models import Candle, Signal, OrderSide
-
from backtester.engine import BacktestEngine
+from shared.models import Candle, OrderSide, Signal
+
def make_candle(symbol: str, price: float, timeframe: str = "1h") -> Candle:
return Candle(
symbol=symbol,
timeframe=timeframe,
- open_time=datetime.now(timezone.utc),
+ open_time=datetime.now(UTC),
open=Decimal(str(price)),
high=Decimal(str(price * 1.01)),
low=Decimal(str(price * 0.99)),
diff --git a/services/backtester/tests/test_metrics.py b/services/backtester/tests/test_metrics.py
index 55f5b6c..13e545e 100644
--- a/services/backtester/tests/test_metrics.py
+++ b/services/backtester/tests/test_metrics.py
@@ -1,17 +1,16 @@
"""Tests for detailed backtest metrics."""
import math
-from datetime import datetime, timedelta, timezone
+from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
-
from backtester.metrics import TradeRecord, compute_detailed_metrics
def _make_trade(side: str, price: str, minutes_offset: int = 0) -> TradeRecord:
return TradeRecord(
- time=datetime(2025, 1, 1, tzinfo=timezone.utc) + timedelta(minutes=minutes_offset),
+ time=datetime(2025, 1, 1, tzinfo=UTC) + timedelta(minutes=minutes_offset),
symbol="AAPL",
side=side,
price=Decimal(price),
@@ -124,7 +123,7 @@ def test_consecutive_losses():
def test_risk_free_rate_affects_sharpe():
"""Higher risk-free rate should lower Sharpe ratio."""
- base = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ base = datetime(2025, 1, 1, tzinfo=UTC)
trades = [
TradeRecord(
time=base, symbol="AAPL", side="BUY", price=Decimal("100"), quantity=Decimal("1")
@@ -184,7 +183,7 @@ def test_daily_returns_populated():
def test_fee_subtracted_from_pnl():
"""Fees should be subtracted from trade PnL."""
- base = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ base = datetime(2025, 1, 1, tzinfo=UTC)
trades_with_fees = [
TradeRecord(
time=base,
diff --git a/services/backtester/tests/test_simulator.py b/services/backtester/tests/test_simulator.py
index 62e2cdb..f85594f 100644
--- a/services/backtester/tests/test_simulator.py
+++ b/services/backtester/tests/test_simulator.py
@@ -1,11 +1,12 @@
"""Tests for the OrderSimulator."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from shared.models import OrderSide, Signal
from backtester.simulator import OrderSimulator
+from shared.models import OrderSide, Signal
+
def make_signal(
symbol: str,
@@ -135,7 +136,7 @@ def test_stop_loss_triggers():
signal = make_signal("AAPL", OrderSide.BUY, "50000", "0.1")
sim.execute(signal, stop_loss=Decimal("48000"))
- ts = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ ts = datetime(2025, 1, 1, tzinfo=UTC)
closed = sim.check_stops(
candle_high=Decimal("50500"),
candle_low=Decimal("47500"), # below stop_loss
@@ -153,7 +154,7 @@ def test_take_profit_triggers():
signal = make_signal("AAPL", OrderSide.BUY, "50000", "0.1")
sim.execute(signal, take_profit=Decimal("55000"))
- ts = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ ts = datetime(2025, 1, 1, tzinfo=UTC)
closed = sim.check_stops(
candle_high=Decimal("56000"), # above take_profit
candle_low=Decimal("50000"),
@@ -171,7 +172,7 @@ def test_stop_not_triggered_within_range():
signal = make_signal("AAPL", OrderSide.BUY, "50000", "0.1")
sim.execute(signal, stop_loss=Decimal("48000"), take_profit=Decimal("55000"))
- ts = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ ts = datetime(2025, 1, 1, tzinfo=UTC)
closed = sim.check_stops(
candle_high=Decimal("52000"),
candle_low=Decimal("49000"),
@@ -212,7 +213,7 @@ def test_short_stop_loss():
signal = make_signal("AAPL", OrderSide.SELL, "50000", "0.1")
sim.execute(signal, stop_loss=Decimal("52000"))
- ts = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ ts = datetime(2025, 1, 1, tzinfo=UTC)
closed = sim.check_stops(
candle_high=Decimal("53000"), # above stop_loss
candle_low=Decimal("49000"),
diff --git a/services/backtester/tests/test_walk_forward.py b/services/backtester/tests/test_walk_forward.py
index 96abb6e..b1aa12c 100644
--- a/services/backtester/tests/test_walk_forward.py
+++ b/services/backtester/tests/test_walk_forward.py
@@ -1,18 +1,18 @@
"""Tests for walk-forward analysis."""
import sys
-from pathlib import Path
+from datetime import UTC, datetime, timedelta
from decimal import Decimal
-from datetime import datetime, timedelta, timezone
-
+from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "strategy-engine"))
-from shared.models import Candle
from backtester.walk_forward import WalkForwardEngine, WalkForwardResult
from strategies.rsi_strategy import RsiStrategy
+from shared.models import Candle
+
def _generate_candles(n=100, base_price=100.0):
candles = []
@@ -23,7 +23,7 @@ def _generate_candles(n=100, base_price=100.0):
Candle(
symbol="AAPL",
timeframe="1h",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc) + timedelta(hours=i),
+ open_time=datetime(2025, 1, 1, tzinfo=UTC) + timedelta(hours=i),
open=Decimal(str(price)),
high=Decimal(str(price + 5)),
low=Decimal(str(price - 5)),
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile
index 8cb8af4..4d154c5 100644
--- a/services/data-collector/Dockerfile
+++ b/services/data-collector/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/data-collector/ services/data-collector/
RUN pip install --no-cache-dir ./services/data-collector
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "data_collector.main"]
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index b42b34c..2d44848 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -2,6 +2,9 @@
import asyncio
+import aiohttp
+
+from data_collector.config import CollectorConfig
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -11,8 +14,7 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
-
-from data_collector.config import CollectorConfig
+from shared.shutdown import GracefulShutdown
# Health check port: base + 0
HEALTH_PORT_OFFSET = 0
@@ -45,8 +47,10 @@ async def fetch_latest_bars(
volume=Decimal(str(bar["v"])),
)
candles.append(candle)
- except Exception as exc:
- log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
return candles
@@ -56,18 +60,18 @@ async def run() -> None:
metrics = ServiceMetrics("data_collector")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -83,14 +87,17 @@ async def run() -> None:
symbols = config.symbols
timeframe = config.timeframes[0] if config.timeframes else "1Day"
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- while True:
+ while not shutdown.is_shutting_down:
# Check if market is open
try:
is_open = await alpaca.is_market_open()
- except Exception:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError):
is_open = False
if is_open:
@@ -109,7 +116,7 @@ async def run() -> None:
await asyncio.sleep(poll_interval)
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "data-collector")
raise
finally:
diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py
index ffffa40..51f3aee 100644
--- a/services/data-collector/tests/test_storage.py
+++ b/services/data-collector/tests/test_storage.py
@@ -1,19 +1,20 @@
"""Tests for storage module."""
-import pytest
+from datetime import UTC, datetime
from decimal import Decimal
-from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
-from shared.models import Candle
+import pytest
from data_collector.storage import CandleStorage
+from shared.models import Candle
+
def _make_candle(symbol: str = "AAPL") -> Candle:
return Candle(
symbol=symbol,
timeframe="1m",
- open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
open=Decimal("30000"),
high=Decimal("30100"),
low=Decimal("29900"),
diff --git a/services/news-collector/Dockerfile b/services/news-collector/Dockerfile
index a8e5902..7accee2 100644
--- a/services/news-collector/Dockerfile
+++ b/services/news-collector/Dockerfile
@@ -1,9 +1,17 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/news-collector/ services/news-collector/
RUN pip install --no-cache-dir ./services/news-collector
-RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')"
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "news_collector.main"]
diff --git a/services/news-collector/pyproject.toml b/services/news-collector/pyproject.toml
index 14c856a..6e62b70 100644
--- a/services/news-collector/pyproject.toml
+++ b/services/news-collector/pyproject.toml
@@ -3,12 +3,7 @@ name = "news-collector"
version = "0.1.0"
description = "News and sentiment data collector service"
requires-python = ">=3.12"
-dependencies = [
- "trading-shared",
- "feedparser>=6.0",
- "nltk>=3.8",
- "aiohttp>=3.9",
-]
+dependencies = ["trading-shared", "feedparser>=6.0,<7", "nltk>=3.8,<4", "aiohttp>=3.9,<4"]
[project.optional-dependencies]
dev = [
diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py
index f79f716..42e8f88 100644
--- a/services/news-collector/src/news_collector/collectors/fear_greed.py
+++ b/services/news-collector/src/news_collector/collectors/fear_greed.py
@@ -2,7 +2,6 @@
import logging
from dataclasses import dataclass
-from typing import Optional
import aiohttp
@@ -26,7 +25,7 @@ class FearGreedCollector(BaseCollector):
async def is_available(self) -> bool:
return True
- async def _fetch_index(self) -> Optional[dict]:
+ async def _fetch_index(self) -> dict | None:
headers = {"User-Agent": "Mozilla/5.0"}
try:
async with aiohttp.ClientSession() as session:
@@ -50,7 +49,7 @@ class FearGreedCollector(BaseCollector):
return "Greed"
return "Extreme Greed"
- async def collect(self) -> Optional[FearGreedResult]:
+ async def collect(self) -> FearGreedResult | None:
data = await self._fetch_index()
if data is None:
return None
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py
index fce4842..52128e5 100644
--- a/services/news-collector/src/news_collector/collectors/fed.py
+++ b/services/news-collector/src/news_collector/collectors/fed.py
@@ -3,7 +3,7 @@
import asyncio
import logging
from calendar import timegm
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -76,10 +76,10 @@ class FedCollector(BaseCollector):
if published_parsed:
try:
ts = timegm(published_parsed)
- return datetime.fromtimestamp(ts, tz=timezone.utc)
+ return datetime.fromtimestamp(ts, tz=UTC)
except Exception:
pass
- return datetime.now(timezone.utc)
+ return datetime.now(UTC)
async def collect(self) -> list[NewsItem]:
try:
diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py
index 13e3602..67cb455 100644
--- a/services/news-collector/src/news_collector/collectors/finnhub.py
+++ b/services/news-collector/src/news_collector/collectors/finnhub.py
@@ -1,7 +1,7 @@
"""Finnhub news collector with VADER sentiment analysis."""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -64,7 +64,7 @@ class FinnhubCollector(BaseCollector):
sentiment = sentiment_scores["compound"]
ts = article.get("datetime", 0)
- published_at = datetime.fromtimestamp(ts, tz=timezone.utc)
+ published_at = datetime.fromtimestamp(ts, tz=UTC)
related = article.get("related", "")
symbols = [t.strip() for t in related.split(",") if t.strip()] if related else []
diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py
index 226a2f9..4e9d6f5 100644
--- a/services/news-collector/src/news_collector/collectors/reddit.py
+++ b/services/news-collector/src/news_collector/collectors/reddit.py
@@ -2,7 +2,7 @@
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -78,7 +78,7 @@ class RedditCollector(BaseCollector):
symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
created_utc = post_data.get("created_utc", 0)
- published_at = datetime.fromtimestamp(created_utc, tz=timezone.utc)
+ published_at = datetime.fromtimestamp(created_utc, tz=UTC)
items.append(
NewsItem(
diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py
index ddf8503..bca0e9f 100644
--- a/services/news-collector/src/news_collector/collectors/rss.py
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -3,7 +3,7 @@
import asyncio
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from time import mktime
import feedparser
@@ -56,10 +56,10 @@ class RSSCollector(BaseCollector):
if parsed_time:
try:
ts = mktime(parsed_time)
- return datetime.fromtimestamp(ts, tz=timezone.utc)
+ return datetime.fromtimestamp(ts, tz=UTC)
except Exception:
pass
- return datetime.now(timezone.utc)
+ return datetime.now(UTC)
async def collect(self) -> list[NewsItem]:
try:
diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py
index ca1d070..d88518f 100644
--- a/services/news-collector/src/news_collector/collectors/sec_edgar.py
+++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py
@@ -1,13 +1,13 @@
"""SEC EDGAR filing collector (free, no API key required)."""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
-from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
+from shared.models import NewsCategory, NewsItem
logger = logging.getLogger(__name__)
@@ -58,7 +58,7 @@ class SecEdgarCollector(BaseCollector):
async def collect(self) -> list[NewsItem]:
filings_data = await self._fetch_recent_filings()
items = []
- today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+ today = datetime.now(UTC).strftime("%Y-%m-%d")
for company_data in filings_data:
tickers = [t["ticker"] for t in company_data.get("tickers", [])]
@@ -87,9 +87,7 @@ class SecEdgarCollector(BaseCollector):
headline=headline,
summary=desc,
url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
- published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(
- tzinfo=timezone.utc
- ),
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC),
symbols=tickers,
sentiment=self._vader.polarity_scores(headline)["compound"],
category=NewsCategory.FILING,
diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py
index 33ebc86..e2acd88 100644
--- a/services/news-collector/src/news_collector/collectors/truth_social.py
+++ b/services/news-collector/src/news_collector/collectors/truth_social.py
@@ -2,7 +2,7 @@
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -67,7 +67,7 @@ class TruthSocialCollector(BaseCollector):
try:
published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
except Exception:
- published_at = datetime.now(timezone.utc)
+ published_at = datetime.now(UTC)
items.append(
NewsItem(
diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py
index 70d98f1..6e78eba 100644
--- a/services/news-collector/src/news_collector/config.py
+++ b/services/news-collector/src/news_collector/config.py
@@ -5,6 +5,3 @@ from shared.config import Settings
class NewsCollectorConfig(Settings):
health_port: int = 8084
- finnhub_api_key: str = ""
- news_poll_interval: int = 300
- sentiment_aggregate_interval: int = 900
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
index 3493f7c..c39fa67 100644
--- a/services/news-collector/src/news_collector/main.py
+++ b/services/news-collector/src/news_collector/main.py
@@ -1,8 +1,18 @@
"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
import asyncio
-from datetime import datetime, timezone
+from datetime import UTC, datetime
+import aiohttp
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.config import NewsCollectorConfig
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
@@ -11,20 +21,9 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
-from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator
-
-from news_collector.config import NewsCollectorConfig
-from news_collector.collectors.finnhub import FinnhubCollector
-from news_collector.collectors.rss import RSSCollector
-from news_collector.collectors.sec_edgar import SecEdgarCollector
-from news_collector.collectors.truth_social import TruthSocialCollector
-from news_collector.collectors.reddit import RedditCollector
-from news_collector.collectors.fear_greed import FearGreedCollector
-from news_collector.collectors.fed import FedCollector
-
-# Health check port: base + 4
-HEALTH_PORT_OFFSET = 4
+from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
@@ -53,9 +52,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log)
collector=collector.name,
count=count,
)
- except Exception as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
log.warning(
- "collector_error",
+ "collector_network_error",
+ collector=collector.name,
+ error=str(exc),
+ )
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning(
+ "collector_parse_error",
collector=collector.name,
error=str(exc),
)
@@ -74,7 +79,7 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
vix=None,
fed_stance="neutral",
market_regime=_determine_regime(result.fear_greed, None),
- updated_at=datetime.now(timezone.utc),
+ updated_at=datetime.now(UTC),
)
await db.upsert_market_sentiment(ms)
log.info(
@@ -82,8 +87,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
value=result.fear_greed,
label=result.fear_greed_label,
)
- except Exception as exc:
- log.warning("fear_greed_error", error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fear_greed_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fear_greed_parse_error", error=str(exc))
await asyncio.sleep(collector.poll_interval)
@@ -93,14 +100,16 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None:
while True:
await asyncio.sleep(interval)
try:
- now = datetime.now(timezone.utc)
+ now = datetime.now(UTC)
news_items = await db.get_recent_news(hours=24)
scores = aggregator.aggregate(news_items, now)
for score in scores.values():
await db.upsert_symbol_score(score)
log.info("aggregation_complete", symbols=len(scores))
- except Exception as exc:
- log.warning("aggregator_error", error=str(exc))
+ except (ConnectionError, TimeoutError) as exc:
+ log.warning("aggregator_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("aggregator_parse_error", error=str(exc))
def _determine_regime(fear_greed: int, vix: float | None) -> str:
@@ -115,14 +124,14 @@ async def run() -> None:
metrics = ServiceMetrics("news_collector")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
health = HealthCheckServer(
"news-collector",
@@ -133,7 +142,7 @@ async def run() -> None:
metrics.service_up.labels(service="news-collector").set(1)
# Build collectors
- finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
+ finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
rss = RSSCollector()
sec = SecEdgarCollector()
truth = TruthSocialCollector()
@@ -143,6 +152,9 @@ async def run() -> None:
news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info(
"starting",
collectors=[c.name for c in news_collectors],
@@ -151,14 +163,13 @@ async def run() -> None:
)
try:
- tasks = []
- for collector in news_collectors:
- tasks.append(
- asyncio.create_task(
- run_collector_loop(collector, db, broker, log),
- name=f"collector-{collector.name}",
- )
+ tasks = [
+ asyncio.create_task(
+ run_collector_loop(collector, db, broker, log),
+ name=f"collector-{collector.name}",
)
+ for collector in news_collectors
+ ]
tasks.append(
asyncio.create_task(
run_fear_greed_loop(fear_greed, db, log),
@@ -171,9 +182,9 @@ async def run() -> None:
name="aggregator-loop",
)
)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "news-collector")
raise
finally:
diff --git a/services/news-collector/tests/test_fear_greed.py b/services/news-collector/tests/test_fear_greed.py
index d483aa6..e8bd8f0 100644
--- a/services/news-collector/tests/test_fear_greed.py
+++ b/services/news-collector/tests/test_fear_greed.py
@@ -1,8 +1,8 @@
"""Tests for CNN Fear & Greed Index collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.fear_greed import FearGreedCollector
diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py
index d1a736b..7f1c46c 100644
--- a/services/news-collector/tests/test_fed.py
+++ b/services/news-collector/tests/test_fed.py
@@ -1,7 +1,8 @@
"""Tests for Federal Reserve collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.fed import FedCollector
diff --git a/services/news-collector/tests/test_finnhub.py b/services/news-collector/tests/test_finnhub.py
index a4cf169..3af65b8 100644
--- a/services/news-collector/tests/test_finnhub.py
+++ b/services/news-collector/tests/test_finnhub.py
@@ -1,8 +1,8 @@
"""Tests for Finnhub news collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.finnhub import FinnhubCollector
diff --git a/services/news-collector/tests/test_main.py b/services/news-collector/tests/test_main.py
index 66190dc..f85569a 100644
--- a/services/news-collector/tests/test_main.py
+++ b/services/news-collector/tests/test_main.py
@@ -1,16 +1,18 @@
"""Tests for news collector scheduler."""
+from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-from shared.models import NewsCategory, NewsItem
+
from news_collector.main import run_collector_once
+from shared.models import NewsCategory, NewsItem
+
async def test_run_collector_once_stores_and_publishes():
mock_item = NewsItem(
source="test",
headline="Test news",
- published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ published_at=datetime(2026, 4, 2, tzinfo=UTC),
sentiment=0.5,
category=NewsCategory.MACRO,
)
diff --git a/services/news-collector/tests/test_reddit.py b/services/news-collector/tests/test_reddit.py
index 440b173..31b1dc1 100644
--- a/services/news-collector/tests/test_reddit.py
+++ b/services/news-collector/tests/test_reddit.py
@@ -1,7 +1,8 @@
"""Tests for Reddit collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.reddit import RedditCollector
diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py
index e03250a..7242c75 100644
--- a/services/news-collector/tests/test_rss.py
+++ b/services/news-collector/tests/test_rss.py
@@ -1,8 +1,8 @@
"""Tests for RSS news collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.rss import RSSCollector
diff --git a/services/news-collector/tests/test_sec_edgar.py b/services/news-collector/tests/test_sec_edgar.py
index 5d4f69f..b0faf18 100644
--- a/services/news-collector/tests/test_sec_edgar.py
+++ b/services/news-collector/tests/test_sec_edgar.py
@@ -1,9 +1,9 @@
"""Tests for SEC EDGAR filing collector."""
-import pytest
-from datetime import datetime, timezone
-from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import UTC, datetime
+from unittest.mock import AsyncMock, MagicMock, patch
+import pytest
from news_collector.collectors.sec_edgar import SecEdgarCollector
@@ -37,7 +37,7 @@ async def test_collect_parses_filings(collector):
}
mock_datetime = MagicMock(spec=datetime)
- mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=timezone.utc)
+ mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=UTC)
mock_datetime.strptime = datetime.strptime
with patch.object(
diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py
index 91ddb9d..52f1e46 100644
--- a/services/news-collector/tests/test_truth_social.py
+++ b/services/news-collector/tests/test_truth_social.py
@@ -1,7 +1,8 @@
"""Tests for Truth Social collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.truth_social import TruthSocialCollector
diff --git a/services/order-executor/Dockerfile b/services/order-executor/Dockerfile
index bc8b21c..376afec 100644
--- a/services/order-executor/Dockerfile
+++ b/services/order-executor/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/order-executor/ services/order-executor/
RUN pip install --no-cache-dir ./services/order-executor
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "order_executor.main"]
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index a71e762..fd502cd 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -1,18 +1,18 @@
"""Order execution logic."""
-import structlog
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from typing import Any, Optional
+from typing import Any
+
+import structlog
+from order_executor.risk_manager import RiskManager
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderStatus, OrderType, Signal
from shared.notifier import TelegramNotifier
-from order_executor.risk_manager import RiskManager
-
logger = structlog.get_logger()
@@ -35,7 +35,7 @@ class OrderExecutor:
self.notifier = notifier
self.dry_run = dry_run
- async def execute(self, signal: Signal) -> Optional[Order]:
+ async def execute(self, signal: Signal) -> Order | None:
"""Run risk checks and place an order for the given signal."""
# Fetch buying power from Alpaca
balance = await self.exchange.get_buying_power()
@@ -71,7 +71,7 @@ class OrderExecutor:
if self.dry_run:
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled_dry_run",
side=str(order.side),
@@ -87,7 +87,7 @@ class OrderExecutor:
type="market",
)
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled",
side=str(order.side),
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 51ab286..99f88e1 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -3,6 +3,11 @@
import asyncio
from decimal import Decimal
+import aiohttp
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
+from shared.shutdown import GracefulShutdown
# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -26,18 +28,18 @@ async def run() -> None:
metrics = ServiceMetrics("order_executor")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -83,6 +85,9 @@ async def run() -> None:
await broker.ensure_group(stream, GROUP)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("started", stream=stream, dry_run=config.dry_run)
try:
@@ -94,10 +99,15 @@ async def run() -> None:
if event.type == EventType.SIGNAL:
await executor.execute(event.data)
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("pending_network_error", error=str(exc), msg_id=msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("pending_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True)
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
try:
@@ -110,8 +120,19 @@ async def run() -> None:
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("process_network_error", error=str(exc))
+ metrics.errors_total.labels(
+ service="order-executor", error_type="network"
+ ).inc()
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("process_parse_error", error=str(exc))
+ await broker.ack(stream, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="order-executor", error_type="validation"
+ ).inc()
except Exception as exc:
- log.error("process_failed", error=str(exc))
+ log.error("process_failed", error=str(exc), exc_info=True)
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
index 5a05746..811a862 100644
--- a/services/order-executor/src/order_executor/risk_manager.py
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -1,12 +1,12 @@
"""Risk management for order execution."""
+import math
+from collections import deque
from dataclasses import dataclass
-from datetime import datetime, timezone, timedelta
+from datetime import UTC, datetime, timedelta
from decimal import Decimal
-from collections import deque
-import math
-from shared.models import Signal, OrderSide, Position
+from shared.models import OrderSide, Position, Signal
@dataclass
@@ -123,15 +123,13 @@ class RiskManager:
else:
self._consecutive_losses += 1
if self._consecutive_losses >= self._max_consecutive_losses:
- self._paused_until = datetime.now(timezone.utc) + timedelta(
- minutes=self._loss_pause_minutes
- )
+ self._paused_until = datetime.now(UTC) + timedelta(minutes=self._loss_pause_minutes)
def is_paused(self) -> bool:
"""Check if trading is paused due to consecutive losses."""
if self._paused_until is None:
return False
- if datetime.now(timezone.utc) >= self._paused_until:
+ if datetime.now(UTC) >= self._paused_until:
self._paused_until = None
self._consecutive_losses = 0
return False
@@ -233,9 +231,9 @@ class RiskManager:
mean_a = sum(returns_a) / len(returns_a)
mean_b = sum(returns_b) / len(returns_b)
- cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len(
- returns_a
- )
+ cov = sum(
+ (a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b, strict=True)
+ ) / len(returns_a)
std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a))
std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b))
@@ -280,7 +278,11 @@ class RiskManager:
min_len = min(len(r) for r in all_returns)
portfolio_returns = []
for i in range(min_len):
- pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i)
+ pr = sum(
+ w * r[-(min_len - i)]
+ for w, r in zip(weights, all_returns, strict=False)
+ if len(r) > i
+ )
portfolio_returns.append(pr)
if not portfolio_returns:
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
index dd823d7..cda6b72 100644
--- a/services/order-executor/tests/test_executor.py
+++ b/services/order-executor/tests/test_executor.py
@@ -4,11 +4,11 @@ from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
-
-from shared.models import OrderSide, OrderStatus, Signal
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskCheckResult, RiskManager
+from shared.models import OrderSide, OrderStatus, Signal
+
def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal:
return Signal(
diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py
index 3d5175b..66e769c 100644
--- a/services/order-executor/tests/test_risk_manager.py
+++ b/services/order-executor/tests/test_risk_manager.py
@@ -2,9 +2,9 @@
from decimal import Decimal
+from order_executor.risk_manager import RiskManager
from shared.models import OrderSide, Position, Signal
-from order_executor.risk_manager import RiskManager
def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "AAPL") -> Signal:
diff --git a/services/portfolio-manager/Dockerfile b/services/portfolio-manager/Dockerfile
index b1a7681..0fa3f35 100644
--- a/services/portfolio-manager/Dockerfile
+++ b/services/portfolio-manager/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/portfolio-manager/ services/portfolio-manager/
RUN pip install --no-cache-dir ./services/portfolio-manager
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "portfolio_manager.main"]
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a6823ae..f885aa8 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -2,6 +2,10 @@
import asyncio
+import sqlalchemy.exc
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
@@ -9,9 +13,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
+from shared.shutdown import GracefulShutdown
ORDERS_STREAM = "orders"
@@ -51,8 +53,12 @@ async def snapshot_loop(
while True:
try:
await save_snapshot(db, tracker, notifier, log)
+ except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc:
+ log.warning("snapshot_db_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("snapshot_data_error", error=str(exc))
except Exception as exc:
- log.error("snapshot_failed", error=str(exc))
+ log.error("snapshot_failed", error=str(exc), exc_info=True)
await asyncio.sleep(interval_hours * 3600)
@@ -61,10 +67,10 @@ async def run() -> None:
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
metrics = ServiceMetrics("portfolio_manager")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
tracker = PortfolioTracker()
health = HealthCheckServer(
@@ -76,13 +82,16 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
snapshot_task = asyncio.create_task(
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
GROUP = "portfolio-manager"
CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
@@ -108,12 +117,16 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True)
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000)
for msg_id, msg in messages:
try:
@@ -134,13 +147,21 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("message_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="validation"
+ ).inc()
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error(
+ "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True
+ )
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
diff --git a/services/portfolio-manager/tests/test_portfolio.py b/services/portfolio-manager/tests/test_portfolio.py
index 365dc1a..c8a6894 100644
--- a/services/portfolio-manager/tests/test_portfolio.py
+++ b/services/portfolio-manager/tests/test_portfolio.py
@@ -2,9 +2,10 @@
from decimal import Decimal
-from shared.models import Order, OrderSide, OrderStatus, OrderType
from portfolio_manager.portfolio import PortfolioTracker
+from shared.models import Order, OrderSide, OrderStatus, OrderType
+
def make_order(side: OrderSide, price: str, quantity: str) -> Order:
"""Helper to create a filled Order."""
diff --git a/services/portfolio-manager/tests/test_snapshot.py b/services/portfolio-manager/tests/test_snapshot.py
index ec5e92d..f2026e2 100644
--- a/services/portfolio-manager/tests/test_snapshot.py
+++ b/services/portfolio-manager/tests/test_snapshot.py
@@ -1,9 +1,10 @@
"""Tests for save_snapshot in portfolio-manager."""
-import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
+import pytest
+
from shared.models import Position
diff --git a/services/strategy-engine/Dockerfile b/services/strategy-engine/Dockerfile
index de635dc..f1484e9 100644
--- a/services/strategy-engine/Dockerfile
+++ b/services/strategy-engine/Dockerfile
@@ -1,9 +1,16 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/strategy-engine/ services/strategy-engine/
RUN pip install --no-cache-dir ./services/strategy-engine
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
COPY services/strategy-engine/strategies/ /app/strategies/
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "strategy_engine.main"]
diff --git a/services/strategy-engine/pyproject.toml b/services/strategy-engine/pyproject.toml
index 4f5b6be..e4bfb12 100644
--- a/services/strategy-engine/pyproject.toml
+++ b/services/strategy-engine/pyproject.toml
@@ -3,11 +3,7 @@ name = "strategy-engine"
version = "0.1.0"
description = "Plugin-based strategy execution engine"
requires-python = ">=3.12"
-dependencies = [
- "pandas>=2.0",
- "numpy>=1.20",
- "trading-shared",
-]
+dependencies = ["pandas>=2.1,<3", "numpy>=1.26,<3", "trading-shared"]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
index 2a9cb43..9fd9c49 100644
--- a/services/strategy-engine/src/strategy_engine/config.py
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -7,9 +7,3 @@ class StrategyConfig(Settings):
symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
timeframes: list[str] = ["1m"]
strategy_params: dict = {}
- selector_candidates_time: str = "15:00"
- selector_filter_time: str = "15:15"
- selector_final_time: str = "15:30"
- selector_max_picks: int = 3
- anthropic_api_key: str = ""
- anthropic_model: str = "claude-sonnet-4-20250514"
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py
index d401aee..4b2c468 100644
--- a/services/strategy-engine/src/strategy_engine/engine.py
+++ b/services/strategy-engine/src/strategy_engine/engine.py
@@ -2,11 +2,11 @@
import logging
-from shared.broker import RedisBroker
-from shared.events import CandleEvent, SignalEvent, Event
-
from strategies.base import BaseStrategy
+from shared.broker import RedisBroker
+from shared.events import CandleEvent, Event, SignalEvent
+
logger = logging.getLogger(__name__)
@@ -26,7 +26,7 @@ class StrategyEngine:
try:
event = Event.from_dict(raw)
except Exception as exc:
- logger.warning("Failed to parse event: %s – %s", raw, exc)
+ logger.warning("Failed to parse event: %s - %s", raw, exc)
continue
if not isinstance(event, CandleEvent):
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 5a30766..3d73058 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -1,9 +1,11 @@
"""Strategy Engine Service entry point."""
import asyncio
+import zoneinfo
from datetime import datetime
from pathlib import Path
-import zoneinfo
+
+import aiohttp
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
@@ -13,7 +15,7 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
-
+from shared.shutdown import GracefulShutdown
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
@@ -63,8 +65,12 @@ async def run_stock_selector(
log.info("stock_selector_complete", picks=[s.symbol for s in selections])
else:
log.info("stock_selector_no_picks")
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("stock_selector_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("stock_selector_data_error", error=str(exc))
except Exception as exc:
- log.error("stock_selector_error", error=str(exc))
+ log.error("stock_selector_error", error=str(exc), exc_info=True)
await asyncio.sleep(120) # Sleep past this minute
else:
await asyncio.sleep(30)
@@ -76,18 +82,18 @@ async def run() -> None:
metrics = ServiceMetrics("strategy_engine")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -97,6 +103,9 @@ async def run() -> None:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -117,12 +126,12 @@ async def run() -> None:
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- if config.anthropic_api_key:
+ if config.anthropic_api_key.get_secret_value():
selector = StockSelector(
db=db,
broker=broker,
alpaca=alpaca,
- anthropic_api_key=config.anthropic_api_key,
+ anthropic_api_key=config.anthropic_api_key.get_secret_value(),
anthropic_model=config.anthropic_model,
max_picks=config.selector_max_picks,
)
@@ -131,9 +140,9 @@ async def run() -> None:
)
log.info("stock_selector_enabled", time=config.selector_final_time)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py
index 62e4160..57680db 100644
--- a/services/strategy-engine/src/strategy_engine/plugin_loader.py
+++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py
@@ -5,7 +5,6 @@ import sys
from pathlib import Path
import yaml
-
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/src/strategy_engine/stock_selector.py b/services/strategy-engine/src/strategy_engine/stock_selector.py
index 268d557..8657b93 100644
--- a/services/strategy-engine/src/strategy_engine/stock_selector.py
+++ b/services/strategy-engine/src/strategy_engine/stock_selector.py
@@ -1,9 +1,10 @@
"""3-stage stock selector engine: sentiment → technical → LLM."""
+import asyncio
import json
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
@@ -18,18 +19,12 @@ logger = logging.getLogger(__name__)
ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"
-def _parse_llm_selections(text: str) -> list[SelectedStock]:
- """Parse LLM response into SelectedStock list.
-
- Handles both bare JSON arrays and markdown code blocks.
- Returns empty list on any parse error.
- """
- # Try to extract JSON from markdown code block first
+def _extract_json_array(text: str) -> list[dict] | None:
+ """Extract a JSON array from text that may contain markdown code blocks."""
code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
if code_block:
raw = code_block.group(1)
else:
- # Try to find a bare JSON array
array_match = re.search(r"\[.*\]", text, re.DOTALL)
if array_match:
raw = array_match.group(0)
@@ -38,27 +33,38 @@ def _parse_llm_selections(text: str) -> list[SelectedStock]:
try:
data = json.loads(raw)
- if not isinstance(data, list):
- return []
- selections = []
- for item in data:
- if not isinstance(item, dict):
- continue
- try:
- selection = SelectedStock(
- symbol=item["symbol"],
- side=OrderSide(item["side"]),
- conviction=float(item["conviction"]),
- reason=item.get("reason", ""),
- key_news=item.get("key_news", []),
- )
- selections.append(selection)
- except (KeyError, ValueError) as e:
- logger.warning("Skipping invalid selection item: %s", e)
- return selections
+ if isinstance(data, list):
+ return [item for item in data if isinstance(item, dict)]
+ return None
except (json.JSONDecodeError, TypeError):
+ return None
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list.
+
+ Handles both bare JSON arrays and markdown code blocks.
+ Returns empty list on any parse error.
+ """
+ items = _extract_json_array(text)
+ if items is None:
return []
+ selections = []
+ for item in items:
+ try:
+ selection = SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item["side"]),
+ conviction=float(item["conviction"]),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ selections.append(selection)
+ except (KeyError, ValueError) as e:
+ logger.warning("Skipping invalid selection item: %s", e)
+ return selections
+
class SentimentCandidateSource:
"""Generates candidates from DB sentiment scores."""
@@ -92,7 +98,7 @@ class LLMCandidateSource:
self._api_key = api_key
self._model = model
- async def get_candidates(self) -> list[Candidate]:
+ async def get_candidates(self, session: aiohttp.ClientSession | None = None) -> list[Candidate]:
news_items = await self._db.get_recent_news(hours=24)
if not news_items:
return []
@@ -110,26 +116,29 @@ class LLMCandidateSource:
"Headlines:\n" + "\n".join(headlines)
)
+ own_session = session is None
+ if own_session:
+ session = aiohttp.ClientSession()
+
try:
- async with aiohttp.ClientSession() as session:
- async with session.post(
- ANTHROPIC_API_URL,
- headers={
- "x-api-key": self._api_key,
- "anthropic-version": "2023-06-01",
- "content-type": "application/json",
- },
- json={
- "model": self._model,
- "max_tokens": 1024,
- "messages": [{"role": "user", "content": prompt}],
- },
- ) as resp:
- if resp.status != 200:
- body = await resp.text()
- logger.error("LLM candidate source error %d: %s", resp.status, body)
- return []
- data = await resp.json()
+ async with session.post(
+ ANTHROPIC_API_URL,
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.text()
+ logger.error("LLM candidate source error %d: %s", resp.status, body)
+ return []
+ data = await resp.json()
content = data.get("content", [])
text = ""
@@ -141,40 +150,32 @@ class LLMCandidateSource:
except Exception as e:
logger.error("LLMCandidateSource error: %s", e)
return []
+ finally:
+ if own_session:
+ await session.close()
def _parse_candidates(self, text: str) -> list[Candidate]:
- code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
- if code_block:
- raw = code_block.group(1)
- else:
- array_match = re.search(r"\[.*\]", text, re.DOTALL)
- raw = array_match.group(0) if array_match else text.strip()
+ items = _extract_json_array(text)
+ if items is None:
+ return []
- try:
- items = json.loads(raw)
- if not isinstance(items, list):
- return []
- candidates = []
- for item in items:
- if not isinstance(item, dict):
- continue
- try:
- direction_str = item.get("direction", "BUY")
- direction = OrderSide(direction_str)
- except ValueError:
- direction = None
- candidates.append(
- Candidate(
- symbol=item["symbol"],
- source="llm",
- direction=direction,
- score=float(item.get("score", 0.5)),
- reason=item.get("reason", ""),
- )
+ candidates = []
+ for item in items:
+ try:
+ direction_str = item.get("direction", "BUY")
+ direction = OrderSide(direction_str)
+ except ValueError:
+ direction = None
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", ""),
)
- return candidates
- except (json.JSONDecodeError, TypeError, KeyError):
- return []
+ )
+ return candidates
def _compute_rsi(closes: list[float], period: int = 14) -> float:
@@ -217,6 +218,18 @@ class StockSelector:
self._api_key = anthropic_api_key
self._model = anthropic_model
self._max_picks = max_picks
+ self._http_session: aiohttp.ClientSession | None = None
+ self._session_lock = asyncio.Lock()
+
+ async def _ensure_session(self) -> aiohttp.ClientSession:
+ async with self._session_lock:
+ if self._http_session is None or self._http_session.closed:
+ self._http_session = aiohttp.ClientSession()
+ return self._http_session
+
+ async def close(self) -> None:
+ if self._http_session and not self._http_session.closed:
+ await self._http_session.close()
async def select(self) -> list[SelectedStock]:
"""Run the full 3-stage pipeline and return selected stocks."""
@@ -235,8 +248,9 @@ class StockSelector:
sentiment_source = SentimentCandidateSource(self._db)
llm_source = LLMCandidateSource(self._db, self._api_key, self._model)
+ session = await self._ensure_session()
sentiment_candidates = await sentiment_source.get_candidates()
- llm_candidates = await llm_source.get_candidates()
+ llm_candidates = await llm_source.get_candidates(session=session)
candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
if not candidates:
@@ -253,7 +267,7 @@ class StockSelector:
selections = await self._llm_final_select(filtered, market_sentiment)
# Persist and publish
- today = datetime.now(timezone.utc).date()
+ today = datetime.now(UTC).date()
sentiment_snapshot = {
"fear_greed": market_sentiment.fear_greed,
"market_regime": market_sentiment.market_regime,
@@ -372,25 +386,25 @@ class StockSelector:
)
try:
- async with aiohttp.ClientSession() as session:
- async with session.post(
- ANTHROPIC_API_URL,
- headers={
- "x-api-key": self._api_key,
- "anthropic-version": "2023-06-01",
- "content-type": "application/json",
- },
- json={
- "model": self._model,
- "max_tokens": 1024,
- "messages": [{"role": "user", "content": prompt}],
- },
- ) as resp:
- if resp.status != 200:
- body = await resp.text()
- logger.error("LLM final select error %d: %s", resp.status, body)
- return []
- data = await resp.json()
+ session = await self._ensure_session()
+ async with session.post(
+ ANTHROPIC_API_URL,
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.text()
+ logger.error("LLM final select error %d: %s", resp.status, body)
+ return []
+ data = await resp.json()
content = data.get("content", [])
text = ""
diff --git a/services/strategy-engine/strategies/base.py b/services/strategy-engine/strategies/base.py
index d5be675..1d9d289 100644
--- a/services/strategy-engine/strategies/base.py
+++ b/services/strategy-engine/strategies/base.py
@@ -1,7 +1,6 @@
from abc import ABC, abstractmethod
from collections import deque
from decimal import Decimal
-from typing import Optional
import pandas as pd
@@ -102,7 +101,7 @@ class BaseStrategy(ABC):
def _calculate_atr_stops(
self, entry_price: Decimal, side: str
- ) -> tuple[Optional[Decimal], Optional[Decimal]]:
+ ) -> tuple[Decimal | None, Decimal | None]:
"""Calculate ATR-based stop-loss and take-profit.
Returns (stop_loss, take_profit) as Decimal or (None, None) if not enough data.
@@ -131,7 +130,7 @@ class BaseStrategy(ABC):
return sl, tp
- def _apply_filters(self, signal: Signal) -> Optional[Signal]:
+ def _apply_filters(self, signal: Signal) -> Signal | None:
"""Apply all filters to a signal. Returns signal with SL/TP or None if filtered out."""
if signal is None:
return None
diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py
index ebe7967..02ff09a 100644
--- a/services/strategy-engine/strategies/bollinger_strategy.py
+++ b/services/strategy-engine/strategies/bollinger_strategy.py
@@ -3,7 +3,7 @@ from decimal import Decimal
import pandas as pd
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py
index ba92485..f562918 100644
--- a/services/strategy-engine/strategies/combined_strategy.py
+++ b/services/strategy-engine/strategies/combined_strategy.py
@@ -2,7 +2,7 @@
from decimal import Decimal
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/ema_crossover_strategy.py b/services/strategy-engine/strategies/ema_crossover_strategy.py
index 68d0ba3..9c181f3 100644
--- a/services/strategy-engine/strategies/ema_crossover_strategy.py
+++ b/services/strategy-engine/strategies/ema_crossover_strategy.py
@@ -3,7 +3,7 @@ from decimal import Decimal
import pandas as pd
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py
index 283bfe5..491252e 100644
--- a/services/strategy-engine/strategies/grid_strategy.py
+++ b/services/strategy-engine/strategies/grid_strategy.py
@@ -1,9 +1,8 @@
from decimal import Decimal
-from typing import Optional
import numpy as np
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
@@ -17,7 +16,7 @@ class GridStrategy(BaseStrategy):
self._grid_count: int = 5
self._quantity: Decimal = Decimal("0.01")
self._grid_levels: list[float] = []
- self._last_zone: Optional[int] = None
+ self._last_zone: int | None = None
self._exit_threshold_pct: float = 5.0
self._out_of_range: bool = False
self._in_position: bool = False # Track if we have any grid positions
diff --git a/services/strategy-engine/strategies/indicators/__init__.py b/services/strategy-engine/strategies/indicators/__init__.py
index 3c713e6..01637b7 100644
--- a/services/strategy-engine/strategies/indicators/__init__.py
+++ b/services/strategy-engine/strategies/indicators/__init__.py
@@ -1,21 +1,21 @@
"""Reusable technical indicator functions."""
-from strategies.indicators.trend import ema, sma, macd, adx
-from strategies.indicators.volatility import atr, bollinger_bands, keltner_channels
from strategies.indicators.momentum import rsi, stochastic
-from strategies.indicators.volume import volume_sma, volume_ratio, obv
+from strategies.indicators.trend import adx, ema, macd, sma
+from strategies.indicators.volatility import atr, bollinger_bands, keltner_channels
+from strategies.indicators.volume import obv, volume_ratio, volume_sma
__all__ = [
- "ema",
- "sma",
- "macd",
"adx",
"atr",
"bollinger_bands",
+ "ema",
"keltner_channels",
+ "macd",
+ "obv",
"rsi",
+ "sma",
"stochastic",
- "volume_sma",
"volume_ratio",
- "obv",
+ "volume_sma",
]
diff --git a/services/strategy-engine/strategies/indicators/momentum.py b/services/strategy-engine/strategies/indicators/momentum.py
index c479452..a82210b 100644
--- a/services/strategy-engine/strategies/indicators/momentum.py
+++ b/services/strategy-engine/strategies/indicators/momentum.py
@@ -1,7 +1,7 @@
"""Momentum indicators: RSI, Stochastic."""
-import pandas as pd
import numpy as np
+import pandas as pd
def rsi(closes: pd.Series, period: int = 14) -> pd.Series:
diff --git a/services/strategy-engine/strategies/indicators/trend.py b/services/strategy-engine/strategies/indicators/trend.py
index c94a071..1085199 100644
--- a/services/strategy-engine/strategies/indicators/trend.py
+++ b/services/strategy-engine/strategies/indicators/trend.py
@@ -1,7 +1,7 @@
"""Trend indicators: EMA, SMA, MACD, ADX."""
-import pandas as pd
import numpy as np
+import pandas as pd
def sma(series: pd.Series, period: int) -> pd.Series:
diff --git a/services/strategy-engine/strategies/indicators/volatility.py b/services/strategy-engine/strategies/indicators/volatility.py
index c16143e..da82f26 100644
--- a/services/strategy-engine/strategies/indicators/volatility.py
+++ b/services/strategy-engine/strategies/indicators/volatility.py
@@ -1,7 +1,7 @@
"""Volatility indicators: ATR, Bollinger Bands, Keltner Channels."""
-import pandas as pd
import numpy as np
+import pandas as pd
def atr(
diff --git a/services/strategy-engine/strategies/indicators/volume.py b/services/strategy-engine/strategies/indicators/volume.py
index 502f1ce..d7c6471 100644
--- a/services/strategy-engine/strategies/indicators/volume.py
+++ b/services/strategy-engine/strategies/indicators/volume.py
@@ -1,7 +1,7 @@
"""Volume indicators: Volume SMA, Volume Ratio, OBV."""
-import pandas as pd
import numpy as np
+import pandas as pd
def volume_sma(volumes: pd.Series, period: int = 20) -> pd.Series:
diff --git a/services/strategy-engine/strategies/macd_strategy.py b/services/strategy-engine/strategies/macd_strategy.py
index 356a42b..b5aea07 100644
--- a/services/strategy-engine/strategies/macd_strategy.py
+++ b/services/strategy-engine/strategies/macd_strategy.py
@@ -3,7 +3,7 @@ from decimal import Decimal
import pandas as pd
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/moc_strategy.py b/services/strategy-engine/strategies/moc_strategy.py
index 7eaa59e..cbc8440 100644
--- a/services/strategy-engine/strategies/moc_strategy.py
+++ b/services/strategy-engine/strategies/moc_strategy.py
@@ -8,12 +8,12 @@ Rules:
"""
from collections import deque
-from decimal import Decimal
from datetime import datetime
+from decimal import Decimal
import pandas as pd
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/rsi_strategy.py b/services/strategy-engine/strategies/rsi_strategy.py
index 0646d8c..2df080d 100644
--- a/services/strategy-engine/strategies/rsi_strategy.py
+++ b/services/strategy-engine/strategies/rsi_strategy.py
@@ -3,7 +3,7 @@ from decimal import Decimal
import pandas as pd
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py
index ef2ae14..67b5c23 100644
--- a/services/strategy-engine/strategies/volume_profile_strategy.py
+++ b/services/strategy-engine/strategies/volume_profile_strategy.py
@@ -3,7 +3,7 @@ from decimal import Decimal
import numpy as np
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
@@ -137,7 +137,7 @@ class VolumeProfileStrategy(BaseStrategy):
if result is None:
return None
- poc, va_low, va_high, hvn_levels, lvn_levels = result
+ poc, va_low, va_high, hvn_levels, _lvn_levels = result
if close < va_low:
self._was_below_va = True
diff --git a/services/strategy-engine/strategies/vwap_strategy.py b/services/strategy-engine/strategies/vwap_strategy.py
index d64950e..4ee4952 100644
--- a/services/strategy-engine/strategies/vwap_strategy.py
+++ b/services/strategy-engine/strategies/vwap_strategy.py
@@ -1,7 +1,7 @@
from collections import deque
from decimal import Decimal
-from shared.models import Candle, Signal, OrderSide
+from shared.models import Candle, OrderSide, Signal
from strategies.base import BaseStrategy
@@ -107,7 +107,7 @@ class VwapStrategy(BaseStrategy):
# Standard deviation of (TP - VWAP) for bands
std_dev = 0.0
if len(self._tp_values) >= 2:
- diffs = [tp - v for tp, v in zip(self._tp_values, self._vwap_values)]
+ diffs = [tp - v for tp, v in zip(self._tp_values, self._vwap_values, strict=True)]
mean_diff = sum(diffs) / len(diffs)
variance = sum((d - mean_diff) ** 2 for d in diffs) / len(diffs)
std_dev = variance**0.5
diff --git a/services/strategy-engine/tests/test_base_filters.py b/services/strategy-engine/tests/test_base_filters.py
index ae9ca05..66adec7 100644
--- a/services/strategy-engine/tests/test_base_filters.py
+++ b/services/strategy-engine/tests/test_base_filters.py
@@ -5,12 +5,13 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from datetime import UTC, datetime
from decimal import Decimal
-from datetime import datetime, timezone
-from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
+from shared.models import Candle, OrderSide, Signal
+
class DummyStrategy(BaseStrategy):
name = "dummy"
@@ -45,7 +46,7 @@ def _candle(price=100.0, volume=10.0, high=None, low=None):
return Candle(
symbol="AAPL",
timeframe="1h",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2025, 1, 1, tzinfo=UTC),
open=Decimal(str(price)),
high=Decimal(str(h)),
low=Decimal(str(lo)),
diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py
index 8261377..70ec66e 100644
--- a/services/strategy-engine/tests/test_bollinger_strategy.py
+++ b/services/strategy-engine/tests/test_bollinger_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the Bollinger Bands strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.bollinger_strategy import BollingerStrategy
from shared.models import Candle, OrderSide
-from strategies.bollinger_strategy import BollingerStrategy
def make_candle(close: float) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(close)),
high=Decimal(str(close)),
low=Decimal(str(close)),
diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py
index 8a4dc74..6a15250 100644
--- a/services/strategy-engine/tests/test_combined_strategy.py
+++ b/services/strategy-engine/tests/test_combined_strategy.py
@@ -5,13 +5,14 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from datetime import UTC, datetime
from decimal import Decimal
-from datetime import datetime, timezone
-import pytest
-from shared.models import Candle, Signal, OrderSide
-from strategies.combined_strategy import CombinedStrategy
+import pytest
from strategies.base import BaseStrategy
+from strategies.combined_strategy import CombinedStrategy
+
+from shared.models import Candle, OrderSide, Signal
class AlwaysBuyStrategy(BaseStrategy):
@@ -74,7 +75,7 @@ def _candle(price=100.0):
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2025, 1, 1, tzinfo=UTC),
open=Decimal(str(price)),
high=Decimal(str(price + 10)),
low=Decimal(str(price - 10)),
diff --git a/services/strategy-engine/tests/test_ema_crossover_strategy.py b/services/strategy-engine/tests/test_ema_crossover_strategy.py
index 7028eb0..af2b587 100644
--- a/services/strategy-engine/tests/test_ema_crossover_strategy.py
+++ b/services/strategy-engine/tests/test_ema_crossover_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the EMA Crossover strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.ema_crossover_strategy import EmaCrossoverStrategy
from shared.models import Candle, OrderSide
-from strategies.ema_crossover_strategy import EmaCrossoverStrategy
def make_candle(close: float) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(close)),
high=Decimal(str(close)),
low=Decimal(str(close)),
diff --git a/services/strategy-engine/tests/test_engine.py b/services/strategy-engine/tests/test_engine.py
index 2623027..fa888b5 100644
--- a/services/strategy-engine/tests/test_engine.py
+++ b/services/strategy-engine/tests/test_engine.py
@@ -1,21 +1,21 @@
"""Tests for the StrategyEngine."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
+from strategy_engine.engine import StrategyEngine
-from shared.models import Candle, Signal, OrderSide
from shared.events import CandleEvent
-from strategy_engine.engine import StrategyEngine
+from shared.models import Candle, OrderSide, Signal
def make_candle_event() -> dict:
candle = Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal("50000"),
high=Decimal("50100"),
low=Decimal("49900"),
diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py
index 878b900..f697012 100644
--- a/services/strategy-engine/tests/test_grid_strategy.py
+++ b/services/strategy-engine/tests/test_grid_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the Grid strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.grid_strategy import GridStrategy
from shared.models import Candle, OrderSide
-from strategies.grid_strategy import GridStrategy
def make_candle(close: float) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(close)),
high=Decimal(str(close)),
low=Decimal(str(close)),
diff --git a/services/strategy-engine/tests/test_indicators.py b/services/strategy-engine/tests/test_indicators.py
index 481569b..3147fc4 100644
--- a/services/strategy-engine/tests/test_indicators.py
+++ b/services/strategy-engine/tests/test_indicators.py
@@ -5,14 +5,13 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
-import pandas as pd
import numpy as np
+import pandas as pd
import pytest
-
-from strategies.indicators.trend import sma, ema, macd, adx
-from strategies.indicators.volatility import atr, bollinger_bands
from strategies.indicators.momentum import rsi, stochastic
-from strategies.indicators.volume import volume_sma, volume_ratio, obv
+from strategies.indicators.trend import adx, ema, macd, sma
+from strategies.indicators.volatility import atr, bollinger_bands
+from strategies.indicators.volume import obv, volume_ratio, volume_sma
class TestTrend:
diff --git a/services/strategy-engine/tests/test_macd_strategy.py b/services/strategy-engine/tests/test_macd_strategy.py
index 556fd4c..7fac16f 100644
--- a/services/strategy-engine/tests/test_macd_strategy.py
+++ b/services/strategy-engine/tests/test_macd_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the MACD strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.macd_strategy import MacdStrategy
from shared.models import Candle, OrderSide
-from strategies.macd_strategy import MacdStrategy
def _candle(price: float) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(price)),
high=Decimal(str(price)),
low=Decimal(str(price)),
diff --git a/services/strategy-engine/tests/test_moc_strategy.py b/services/strategy-engine/tests/test_moc_strategy.py
index 1928a28..076e846 100644
--- a/services/strategy-engine/tests/test_moc_strategy.py
+++ b/services/strategy-engine/tests/test_moc_strategy.py
@@ -5,19 +5,20 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from shared.models import Candle, OrderSide
from strategies.moc_strategy import MocStrategy
+from shared.models import Candle, OrderSide
+
def _candle(price, hour=20, minute=0, volume=100.0, day=1, open_price=None):
op = open_price if open_price is not None else price - 1 # Default: bullish
return Candle(
symbol="AAPL",
timeframe="5Min",
- open_time=datetime(2025, 1, day, hour, minute, tzinfo=timezone.utc),
+ open_time=datetime(2025, 1, day, hour, minute, tzinfo=UTC),
open=Decimal(str(op)),
high=Decimal(str(price + 1)),
low=Decimal(str(min(op, price) - 1)),
diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py
index 671a9d3..922bfc2 100644
--- a/services/strategy-engine/tests/test_multi_symbol.py
+++ b/services/strategy-engine/tests/test_multi_symbol.py
@@ -9,11 +9,13 @@ import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from datetime import UTC, datetime
+from decimal import Decimal
+
from strategy_engine.engine import StrategyEngine
+
from shared.events import CandleEvent
from shared.models import Candle
-from decimal import Decimal
-from datetime import datetime, timezone
@pytest.mark.asyncio
@@ -24,7 +26,7 @@ async def test_engine_processes_multiple_streams():
candle_btc = Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2025, 1, 1, tzinfo=UTC),
open=Decimal("50000"),
high=Decimal("51000"),
low=Decimal("49000"),
@@ -34,7 +36,7 @@ async def test_engine_processes_multiple_streams():
candle_eth = Candle(
symbol="MSFT",
timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2025, 1, 1, tzinfo=UTC),
open=Decimal("3000"),
high=Decimal("3100"),
low=Decimal("2900"),
diff --git a/services/strategy-engine/tests/test_plugin_loader.py b/services/strategy-engine/tests/test_plugin_loader.py
index 5191fc3..7bd450f 100644
--- a/services/strategy-engine/tests/test_plugin_loader.py
+++ b/services/strategy-engine/tests/test_plugin_loader.py
@@ -2,10 +2,8 @@
from pathlib import Path
-
from strategy_engine.plugin_loader import load_strategies
-
STRATEGIES_DIR = Path(__file__).parent.parent / "strategies"
diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py
index 6d31fd5..6c74f0b 100644
--- a/services/strategy-engine/tests/test_rsi_strategy.py
+++ b/services/strategy-engine/tests/test_rsi_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the RSI strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.rsi_strategy import RsiStrategy
from shared.models import Candle, OrderSide
-from strategies.rsi_strategy import RsiStrategy
def make_candle(close: float, idx: int = 0) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(close)),
high=Decimal(str(close)),
low=Decimal(str(close)),
diff --git a/services/strategy-engine/tests/test_stock_selector.py b/services/strategy-engine/tests/test_stock_selector.py
index ff9d09c..76b8541 100644
--- a/services/strategy-engine/tests/test_stock_selector.py
+++ b/services/strategy-engine/tests/test_stock_selector.py
@@ -1,12 +1,12 @@
"""Tests for stock selector engine."""
+from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-
from strategy_engine.stock_selector import (
SentimentCandidateSource,
StockSelector,
+ _extract_json_array,
_parse_llm_selections,
)
@@ -60,6 +60,37 @@ def test_parse_llm_selections_with_markdown():
assert selections[0].symbol == "TSLA"
+def test_extract_json_array_from_markdown():
+ text = '```json\n[{"symbol": "AAPL", "score": 0.9}]\n```'
+ result = _extract_json_array(text)
+ assert result == [{"symbol": "AAPL", "score": 0.9}]
+
+
+def test_extract_json_array_bare():
+ text = '[{"symbol": "TSLA"}]'
+ result = _extract_json_array(text)
+ assert result == [{"symbol": "TSLA"}]
+
+
+def test_extract_json_array_invalid():
+ assert _extract_json_array("not json") is None
+
+
+def test_extract_json_array_filters_non_dicts():
+ text = '[{"symbol": "AAPL"}, "bad", 42]'
+ result = _extract_json_array(text)
+ assert result == [{"symbol": "AAPL"}]
+
+
+async def test_selector_close():
+ selector = StockSelector(
+ db=MagicMock(), broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test"
+ )
+ # No session yet - close should be safe
+ await selector.close()
+ assert selector._http_session is None
+
+
async def test_selector_blocks_on_risk_off():
mock_db = MagicMock()
mock_db.get_latest_market_sentiment = AsyncMock(
@@ -69,7 +100,7 @@ async def test_selector_blocks_on_risk_off():
"vix": 35.0,
"fed_stance": "neutral",
"market_regime": "risk_off",
- "updated_at": datetime.now(timezone.utc),
+ "updated_at": datetime.now(UTC),
}
)
diff --git a/services/strategy-engine/tests/test_strategy_validation.py b/services/strategy-engine/tests/test_strategy_validation.py
index debab1f..0d9607a 100644
--- a/services/strategy-engine/tests/test_strategy_validation.py
+++ b/services/strategy-engine/tests/test_strategy_validation.py
@@ -1,13 +1,11 @@
import pytest
-
-from strategies.rsi_strategy import RsiStrategy
-from strategies.macd_strategy import MacdStrategy
from strategies.bollinger_strategy import BollingerStrategy
from strategies.ema_crossover_strategy import EmaCrossoverStrategy
from strategies.grid_strategy import GridStrategy
-from strategies.vwap_strategy import VwapStrategy
+from strategies.macd_strategy import MacdStrategy
+from strategies.rsi_strategy import RsiStrategy
from strategies.volume_profile_strategy import VolumeProfileStrategy
-
+from strategies.vwap_strategy import VwapStrategy
# ── RSI ──────────────────────────────────────────────────────────────────
diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py
index 65ee2e8..f47898c 100644
--- a/services/strategy-engine/tests/test_volume_profile_strategy.py
+++ b/services/strategy-engine/tests/test_volume_profile_strategy.py
@@ -1,18 +1,18 @@
"""Tests for the Volume Profile strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.volume_profile_strategy import VolumeProfileStrategy
from shared.models import Candle, OrderSide
-from strategies.volume_profile_strategy import VolumeProfileStrategy
def make_candle(close: float, volume: float = 1.0) -> Candle:
return Candle(
symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, tzinfo=UTC),
open=Decimal(str(close)),
high=Decimal(str(close)),
low=Decimal(str(close)),
@@ -134,13 +134,10 @@ def test_volume_profile_hvn_detection():
# Create a profile with very high volume at price ~100 and low volume elsewhere
# Prices range from 90 to 110, heavy volume concentrated at 100
- candles_data = []
# Low volume at extremes
- for p in [90, 91, 92, 109, 110]:
- candles_data.append((p, 1.0))
+ candles_data = [(p, 1.0) for p in [90, 91, 92, 109, 110]]
# Very high volume around 100
- for _ in range(15):
- candles_data.append((100, 100.0))
+ candles_data.extend((100, 100.0) for _ in range(15))
for price, vol in candles_data:
strategy.on_candle(make_candle(price, vol))
@@ -148,7 +145,7 @@ def test_volume_profile_hvn_detection():
# Access the internal method to verify HVN detection
result = strategy._compute_value_area()
assert result is not None
- poc, va_low, va_high, hvn_levels, lvn_levels = result
+ _poc, _va_low, _va_high, hvn_levels, _lvn_levels = result
# The bin containing price ~100 should have very high volume -> HVN
assert len(hvn_levels) > 0
diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py
index 2c34b01..078d0cf 100644
--- a/services/strategy-engine/tests/test_vwap_strategy.py
+++ b/services/strategy-engine/tests/test_vwap_strategy.py
@@ -1,11 +1,11 @@
"""Tests for the VWAP strategy."""
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
+from strategies.vwap_strategy import VwapStrategy
from shared.models import Candle, OrderSide
-from strategies.vwap_strategy import VwapStrategy
def make_candle(
@@ -20,7 +20,7 @@ def make_candle(
if low is None:
low = close
if open_time is None:
- open_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
+ open_time = datetime(2024, 1, 1, tzinfo=UTC)
return Candle(
symbol="AAPL",
timeframe="1m",
@@ -111,11 +111,11 @@ def test_vwap_daily_reset():
"""Candles from two different dates cause VWAP to reset."""
strategy = _configured_strategy()
- day1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
- day2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
+ day1 = datetime(2024, 1, 1, tzinfo=UTC)
+ day2 = datetime(2024, 1, 2, tzinfo=UTC)
# Feed 35 candles on day 1 to build VWAP state
- for i in range(35):
+ for _i in range(35):
strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day1))
# Verify state is built up