summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitea/workflows/ci.yaml6
-rwxr-xr-xscripts/ci.sh7
-rw-r--r--services/backtester/src/backtester/config.py7
-rw-r--r--services/data-collector/src/data_collector/main.py13
-rw-r--r--services/data-collector/src/data_collector/ws_factory.py33
-rw-r--r--services/data-collector/tests/test_ws_factory.py18
-rw-r--r--services/order-executor/src/order_executor/main.py7
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py7
8 files changed, 85 insertions, 13 deletions
diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml
index ab7c1c5..95b51c1 100644
--- a/.gitea/workflows/ci.yaml
+++ b/.gitea/workflows/ci.yaml
@@ -36,7 +36,5 @@ jobs:
if: github.ref == 'refs/heads/master'
steps:
- uses: actions/checkout@v4
-
- - name: Build Docker images
- run: |
- docker compose build
+ - name: Build all Docker images
+ run: docker compose build
diff --git a/scripts/ci.sh b/scripts/ci.sh
index c89dd0d..45296a0 100755
--- a/scripts/ci.sh
+++ b/scripts/ci.sh
@@ -12,4 +12,11 @@ ruff format --check .
echo "=== Running tests ==="
pytest -v
+echo "=== Building Docker images ==="
+if command -v docker &> /dev/null; then
+ docker compose build --quiet 2>&1 || echo "WARNING: Docker build failed (Docker may not be available in CI)"
+else
+ echo "SKIP: Docker not available"
+fi
+
echo "=== All checks passed ==="
diff --git a/services/backtester/src/backtester/config.py b/services/backtester/src/backtester/config.py
index 5a912f3..0d759f8 100644
--- a/services/backtester/src/backtester/config.py
+++ b/services/backtester/src/backtester/config.py
@@ -1,14 +1,13 @@
"""Configuration for the backtester service."""
-from pydantic_settings import BaseSettings
+from shared.config import Settings
-class BacktestConfig(BaseSettings):
+class BacktestConfig(Settings):
backtest_initial_balance: float = 10000.0
- database_url: str = "postgresql://trading:trading@localhost:5432/trading"
symbol: str = "BTCUSDT"
timeframe: str = "1h"
- strategy_name: str = "sma_crossover"
+ strategy_name: str = "rsi_strategy"
candle_limit: int = 500
model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index 4384985..b393cc2 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -9,9 +9,15 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-from data_collector.binance_ws import BinanceWebSocket
from data_collector.config import CollectorConfig
from data_collector.storage import CandleStorage
+from data_collector.ws_factory import create_websocket
+
+
+# Health check port: base (HEALTH_PORT, default 8080) + offset
+# data-collector: +0 (8080), strategy-engine: +1 (8081)
+# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+HEALTH_PORT_OFFSET = 0
async def run() -> None:
@@ -43,14 +49,15 @@ async def run() -> None:
# Use the first configured timeframe for the WebSocket subscription.
timeframe = config.timeframes[0] if config.timeframes else "1m"
- ws = BinanceWebSocket(
+ ws = create_websocket(
+ exchange_id=config.exchange_id,
symbols=config.symbols,
timeframe=timeframe,
on_candle=on_candle,
)
health = HealthCheckServer(
- "data-collector", port=config.health_port, auth_token=config.metrics_auth_token
+ "data-collector", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token
)
health.register_check("redis", broker.ping)
await health.start()
diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py
new file mode 100644
index 0000000..b8e2719
--- /dev/null
+++ b/services/data-collector/src/data_collector/ws_factory.py
@@ -0,0 +1,33 @@
+"""WebSocket factory for exchange-specific connections."""
+import logging
+
+from data_collector.binance_ws import BinanceWebSocket
+
+logger = logging.getLogger(__name__)
+
+# Supported exchanges for WebSocket streaming
+SUPPORTED_WS = {"binance": BinanceWebSocket}
+
+
+def create_websocket(exchange_id: str, **kwargs):
+ """Create an exchange-specific WebSocket handler.
+
+ Args:
+ exchange_id: Exchange identifier (e.g. 'binance')
+ **kwargs: Passed to the WebSocket constructor (symbols, timeframe, on_candle)
+
+ Returns:
+ WebSocket handler instance
+
+ Raises:
+ ValueError: If exchange is not supported for WebSocket streaming
+ """
+ ws_cls = SUPPORTED_WS.get(exchange_id)
+ if ws_cls is None:
+ supported = ", ".join(sorted(SUPPORTED_WS.keys()))
+ raise ValueError(
+ f"WebSocket streaming not supported for '{exchange_id}'. "
+ f"Supported: {supported}. "
+ f"Use REST polling as fallback for unsupported exchanges."
+ )
+ return ws_cls(**kwargs)
diff --git a/services/data-collector/tests/test_ws_factory.py b/services/data-collector/tests/test_ws_factory.py
new file mode 100644
index 0000000..ef0449c
--- /dev/null
+++ b/services/data-collector/tests/test_ws_factory.py
@@ -0,0 +1,18 @@
+"""Tests for WebSocket factory."""
+import pytest
+from data_collector.ws_factory import create_websocket, SUPPORTED_WS
+from data_collector.binance_ws import BinanceWebSocket
+
+
+def test_create_binance_ws():
+ ws = create_websocket("binance", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None)
+ assert isinstance(ws, BinanceWebSocket)
+
+
+def test_create_unsupported_exchange():
+ with pytest.raises(ValueError, match="not supported"):
+ create_websocket("unsupported_exchange", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None)
+
+
+def test_supported_exchanges():
+ assert "binance" in SUPPORTED_WS
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 1eeee7b..f111c75 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -16,6 +16,11 @@ from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
+# Health check port: base (HEALTH_PORT, default 8080) + offset
+# data-collector: +0 (8080), strategy-engine: +1 (8081)
+# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+HEALTH_PORT_OFFSET = 2
+
async def run() -> None:
config = ExecutorConfig()
@@ -61,7 +66,7 @@ async def run() -> None:
stream = "signals"
health = HealthCheckServer(
- "order-executor", port=config.health_port + 2, auth_token=config.metrics_auth_token
+ "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token
)
health.register_check("redis", broker.ping)
await health.start()
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index efd79e9..1ccef6e 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -16,6 +16,11 @@ from strategy_engine.plugin_loader import load_strategies
# The strategies directory lives alongside the installed package
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
+# Health check port: base (HEALTH_PORT, default 8080) + offset
+# data-collector: +0 (8080), strategy-engine: +1 (8081)
+# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+HEALTH_PORT_OFFSET = 1
+
async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
"""Process candles for a single symbol stream."""
@@ -47,7 +52,7 @@ async def run() -> None:
engine = StrategyEngine(broker=broker, strategies=strategies)
health = HealthCheckServer(
- "strategy-engine", port=config.health_port + 1, auth_token=config.metrics_auth_token
+ "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token
)
health.register_check("redis", broker.ping)
await health.start()