summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--services/data-collector/src/data_collector/main.py33
-rw-r--r--services/order-executor/src/order_executor/executor.py18
-rw-r--r--services/order-executor/src/order_executor/main.py33
-rw-r--r--services/order-executor/tests/test_executor.py3
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py45
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py34
-rw-r--r--shared/src/shared/broker.py4
7 files changed, 124 insertions, 46 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index adf1e96..c778503 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,21 +1,24 @@
"""Data Collector Service entry point."""
import asyncio
-import logging
from shared.broker import RedisBroker
from shared.db import Database
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from data_collector.binance_ws import BinanceWebSocket
from data_collector.config import CollectorConfig
from data_collector.storage import CandleStorage
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
async def run() -> None:
"""Initialise all components and start the WebSocket collector."""
config = CollectorConfig()
+ log = setup_logging("data-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("data_collector")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
db = Database(config.database_url)
await db.connect()
@@ -25,8 +28,9 @@ async def run() -> None:
storage = CandleStorage(db=db, broker=broker)
async def on_candle(candle):
- logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time)
+ log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time))
await storage.store(candle)
+ metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
# Use the first configured timeframe for the WebSocket subscription.
timeframe = config.timeframes[0] if config.timeframes else "1m"
@@ -37,15 +41,26 @@ async def run() -> None:
on_candle=on_candle,
)
- logger.info(
- "Starting data collector for symbols=%s timeframe=%s",
- config.symbols,
- timeframe,
+ health = HealthCheckServer("data-collector", port=config.health_port)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="data-collector").set(1)
+
+ log.info(
+ "service_started",
+ symbols=config.symbols,
+ timeframe=timeframe,
)
try:
await ws.start()
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "data-collector")
+ raise
finally:
+ metrics.service_up.labels(service="data-collector").set(0)
+ await notifier.close()
await broker.close()
await db.close()
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index 16ae52c..099520d 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -1,5 +1,5 @@
"""Order execution logic."""
-import logging
+import structlog
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Optional
@@ -8,10 +8,11 @@ from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
+from shared.notifier import TelegramNotifier
from order_executor.risk_manager import RiskManager
-logger = logging.getLogger(__name__)
+logger = structlog.get_logger()
class OrderExecutor:
@@ -23,12 +24,14 @@ class OrderExecutor:
risk_manager: RiskManager,
broker: RedisBroker,
db: Database,
+ notifier: TelegramNotifier,
dry_run: bool = True,
) -> None:
self.exchange = exchange
self.risk_manager = risk_manager
self.broker = broker
self.db = db
+ self.notifier = notifier
self.dry_run = dry_run
async def execute(self, signal: Signal) -> Optional[Order]:
@@ -56,7 +59,7 @@ class OrderExecutor:
if not result.allowed:
logger.warning(
- "Risk check rejected signal %s: %s", signal.id, result.reason
+ "risk_check_rejected", signal_id=str(signal.id), reason=result.reason
)
return None
@@ -74,7 +77,7 @@ class OrderExecutor:
if self.dry_run:
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
- logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ logger.info("order_filled_dry_run", side=str(order.side), quantity=str(order.quantity), symbol=order.symbol)
else:
try:
await self.exchange.create_order(
@@ -85,10 +88,10 @@ class OrderExecutor:
)
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
- logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ logger.info("order_filled", side=str(order.side), quantity=str(order.quantity), symbol=order.symbol)
except Exception as exc:
order.status = OrderStatus.FAILED
- logger.error("Order failed for signal %s: %s", signal.id, exc)
+ logger.error("order_failed", signal_id=str(signal.id), error=str(exc))
# Persist to DB
await self.db.insert_order(order)
@@ -97,4 +100,7 @@ class OrderExecutor:
event = OrderEvent(data=order)
await self.broker.publish("orders", event.to_dict())
+ # Notify via Telegram
+ await self.notifier.send_order(order)
+
return order
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index b57c513..7f0578d 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -1,6 +1,5 @@
"""Order Executor Service entry point."""
import asyncio
-import logging
from decimal import Decimal
import ccxt.async_support as ccxt
@@ -8,18 +7,21 @@ import ccxt.async_support as ccxt
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
async def run() -> None:
config = ExecutorConfig()
- logging.getLogger().setLevel(config.log_level)
+ log = setup_logging("order-executor", config.log_level, config.log_format)
+ metrics = ServiceMetrics("order_executor")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
db = Database(config.database_url)
await db.connect()
@@ -45,12 +47,19 @@ async def run() -> None:
risk_manager=risk_manager,
broker=broker,
db=db,
+ notifier=notifier,
dry_run=config.dry_run,
)
last_id = "$"
stream = "signals"
- logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ health = HealthCheckServer("order-executor", port=config.health_port + 2)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="order-executor").set(1)
+
+ log.info("service_started", stream=stream, dry_run=config.dry_run)
try:
while True:
@@ -60,16 +69,24 @@ async def run() -> None:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
- logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ log.info("processing_signal", signal_id=str(signal.id), symbol=signal.symbol)
await executor.execute(signal)
+ metrics.events_processed.labels(service="order-executor", event_type="signal").inc()
except Exception as exc:
- logger.error("Failed to process message: %s", exc)
+ log.error("message_processing_failed", error=str(exc))
+ metrics.errors_total.labels(service="order-executor", error_type="processing").inc()
if messages:
# Advance last_id to avoid re-reading — broker.read returns decoded dicts,
# so we track progress by re-reading with "0" for replaying or "$" for new only.
# Since we block on "$" we get only new messages each iteration.
pass
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "order-executor")
+ raise
finally:
+ metrics.service_up.labels(service="order-executor").set(0)
+ await notifier.close()
await broker.close()
await db.close()
await exchange.close()
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
index 5b18992..4836ffb 100644
--- a/services/order-executor/tests/test_executor.py
+++ b/services/order-executor/tests/test_executor.py
@@ -58,6 +58,7 @@ async def test_executor_places_order_when_risk_passes():
risk_manager=risk_manager,
broker=broker,
db=db,
+ notifier=AsyncMock(),
dry_run=False,
)
@@ -84,6 +85,7 @@ async def test_executor_rejects_when_risk_fails():
risk_manager=risk_manager,
broker=broker,
db=db,
+ notifier=AsyncMock(),
dry_run=False,
)
@@ -109,6 +111,7 @@ async def test_executor_dry_run_does_not_call_exchange():
risk_manager=risk_manager,
broker=broker,
db=db,
+ notifier=AsyncMock(),
dry_run=True,
)
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index cb7e6a8..56624f7 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -1,26 +1,35 @@
"""Portfolio Manager Service entry point."""
import asyncio
-import logging
from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
ORDERS_STREAM = "orders"
async def run() -> None:
config = PortfolioConfig()
+ log = setup_logging("portfolio-manager", config.log_level, config.log_format)
+ metrics = ServiceMetrics("portfolio_manager")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
+
broker = RedisBroker(config.redis_url)
tracker = PortfolioTracker()
+ health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="portfolio-manager").set(1)
+
last_id = "$"
- logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+ log.info("service_started", stream=ORDERS_STREAM)
try:
while True:
@@ -31,20 +40,28 @@ async def run() -> None:
if isinstance(event, OrderEvent):
order = event.data
tracker.apply_order(order)
- logger.info(
- "Applied order symbol=%s side=%s qty=%s price=%s",
- order.symbol,
- order.side,
- order.quantity,
- order.price,
+ log.info(
+ "order_applied",
+ symbol=order.symbol,
+ side=str(order.side),
+ quantity=str(order.quantity),
+ price=str(order.price),
)
positions = tracker.get_all_positions()
- logger.info("Current positions count=%d", len(positions))
- except Exception:
- logger.exception("Failed to process message: %s", msg)
+ log.info("positions_updated", count=len(positions))
+ metrics.events_processed.labels(service="portfolio-manager", event_type="order").inc()
+ except Exception as exc:
+ log.exception("message_processing_failed", error=str(exc))
+ metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
# Update last_id to the latest processed message id if broker returns ids
# Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "portfolio-manager")
+ raise
finally:
+ metrics.service_up.labels(service="portfolio-manager").set(0)
+ await notifier.close()
await broker.close()
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 83bb867..2e3c4ac 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -1,23 +1,27 @@
"""Strategy Engine Service entry point."""
import asyncio
-import logging
from pathlib import Path
from shared.broker import RedisBroker
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
# The strategies directory lives alongside the installed package
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
async def run() -> None:
config = StrategyConfig()
+ log = setup_logging("strategy-engine", config.log_level, config.log_format)
+ metrics = ServiceMetrics("strategy_engine")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
+
broker = RedisBroker(config.redis_url)
strategies_dir = STRATEGIES_DIR
@@ -28,23 +32,35 @@ async def run() -> None:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
- logger.info(
- "Loaded %d strategies: %s",
- len(strategies),
- [s.name for s in strategies],
+ log.info(
+ "strategies_loaded",
+ count=len(strategies),
+ names=[s.name for s in strategies],
)
engine = StrategyEngine(broker=broker, strategies=strategies)
+ health = HealthCheckServer("strategy-engine", port=config.health_port + 1)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="strategy-engine").set(1)
+
try:
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
last_id = "$"
- logger.info("Starting engine loop for stream=%s", stream)
+ log.info("engine_loop_started", stream=stream)
while True:
last_id = await engine.process_once(stream, last_id)
+ metrics.events_processed.labels(service="strategy-engine", event_type="candle").inc()
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "strategy-engine")
+ raise
finally:
+ metrics.service_up.labels(service="strategy-engine").set(0)
+ await notifier.close()
await broker.close()
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py
index 9a50441..0f87b06 100644
--- a/shared/src/shared/broker.py
+++ b/shared/src/shared/broker.py
@@ -38,6 +38,10 @@ class RedisBroker:
messages.append(json.loads(payload))
return messages
+ async def ping(self) -> bool:
+ """Ping the Redis server; return True if reachable."""
+ return await self._redis.ping()
+
async def close(self) -> None:
"""Close the Redis connection."""
await self._redis.aclose()