diff options
Diffstat (limited to 'services')
| -rw-r--r-- | services/data-collector/pyproject.toml | 8 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/config.py | 6 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 111 | ||||
| -rw-r--r-- | services/order-executor/pyproject.toml | 2 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/executor.py | 14 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 72 | ||||
| -rw-r--r-- | services/order-executor/tests/test_executor.py | 20 |
7 files changed, 123 insertions, 110 deletions
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml index 5fba78f..48282c3 100644 --- a/services/data-collector/pyproject.toml +++ b/services/data-collector/pyproject.toml @@ -1,13 +1,9 @@ [project] name = "data-collector" version = "0.1.0" -description = "Binance market data collector service" +description = "Alpaca market data collector service" requires-python = ">=3.12" -dependencies = [ - "ccxt>=4.0", - "websockets>=12.0", - "trading-shared", -] +dependencies = ["trading-shared"] [project.optional-dependencies] dev = [ diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py index 1e080e5..4761013 100644 --- a/services/data-collector/src/data_collector/config.py +++ b/services/data-collector/src/data_collector/config.py @@ -1,6 +1,8 @@ +"""Data Collector configuration.""" from shared.config import Settings class CollectorConfig(Settings): - symbols: list[str] = ["BTC/USDT"] - timeframes: list[str] = ["1m"] + symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"] + timeframes: list[str] = ["5Min"] + poll_interval_seconds: int = 60 diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index eebe14a..38f8759 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,59 +1,73 @@ -"""Data Collector Service entry point.""" - +"""Data Collector Service — fetches US stock data from Alpaca.""" import asyncio +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker +from shared.config import Settings from shared.db import Database +from shared.events import CandleEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics +from shared.models import Candle from shared.notifier import TelegramNotifier from data_collector.config import CollectorConfig -from data_collector.storage import CandleStorage -from data_collector.ws_factory import create_websocket - -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 0 HEALTH_PORT_OFFSET = 0 +async def fetch_latest_bars( + alpaca: AlpacaClient, + symbols: list[str], + timeframe: str, + log, +) -> list[Candle]: + """Fetch latest bar for each symbol from Alpaca.""" + candles = [] + for symbol in symbols: + try: + bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1) + if bars: + bar = bars[-1] + from datetime import datetime + from decimal import Decimal + candle = Candle( + symbol=symbol, + timeframe=timeframe, + open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")), + open=Decimal(str(bar["o"])), + high=Decimal(str(bar["h"])), + low=Decimal(str(bar["l"])), + close=Decimal(str(bar["c"])), + volume=Decimal(str(bar["v"])), + ) + candles.append(candle) + except Exception as exc: + log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + return candles + + async def run() -> None: - """Initialise all components and start the WebSocket collector.""" config = CollectorConfig() log = setup_logging("data-collector", config.log_level, config.log_format) metrics = ServiceMetrics("data_collector") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - storage = CandleStorage(db=db, broker=broker) - - async def on_candle(candle): - log.info( - "candle_received", - symbol=candle.symbol, - timeframe=candle.timeframe, - open_time=str(candle.open_time), - ) - await storage.store(candle) - metrics.events_processed.labels(service="data-collector", event_type="candle").inc() - - # Use the first configured timeframe for the WebSocket subscription. - timeframe = config.timeframes[0] if config.timeframes else "1m" - - ws = create_websocket( - exchange_id=config.exchange_id, - symbols=config.symbols, - timeframe=timeframe, - on_candle=on_candle, + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) health = HealthCheckServer( @@ -61,18 +75,38 @@ async def run() -> None: port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="data-collector").set(1) - log.info( - "service_started", - symbols=config.symbols, - timeframe=timeframe, - ) + poll_interval = int(getattr(config, "poll_interval_seconds", 60)) + symbols = config.symbols + timeframe = config.timeframes[0] if config.timeframes else "1Day" + + log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: - await ws.start() + while True: + # Check if market is open + try: + is_open = await alpaca.is_market_open() + except Exception: + is_open = False + + if is_open: + candles = await fetch_latest_bars(alpaca, symbols, timeframe, log) + for candle in candles: + await db.insert_candle(candle) + event = CandleEvent(data=candle) + stream = f"candles.{candle.symbol}" + await broker.publish(stream, event.to_dict()) + metrics.events_processed.labels( + service="data-collector", event_type="candle" + ).inc() + log.info("candle_stored", symbol=candle.symbol, close=str(candle.close)) + else: + log.debug("market_closed") + + await asyncio.sleep(poll_interval) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "data-collector") @@ -80,6 +114,7 @@ async def run() -> None: finally: metrics.service_up.labels(service="data-collector").set(0) await notifier.close() + await alpaca.close() await broker.close() await db.close() diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml index eed4fef..7bb1030 100644 --- a/services/order-executor/pyproject.toml +++ b/services/order-executor/pyproject.toml @@ -3,7 +3,7 @@ name = "order-executor" version = "0.1.0" description = "Order execution service with risk management" requires-python = ">=3.12" -dependencies = ["ccxt>=4.0", "trading-shared"] +dependencies = ["trading-shared"] [project.optional-dependencies] dev = ["pytest>=8.0", "pytest-asyncio>=0.23"] diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py index 80f441d..a71e762 100644 --- a/services/order-executor/src/order_executor/executor.py +++ b/services/order-executor/src/order_executor/executor.py @@ -37,12 +37,8 @@ class OrderExecutor: async def execute(self, signal: Signal) -> Optional[Order]: """Run risk checks and place an order for the given signal.""" - # Fetch current balance from exchange - balance_data = await self.exchange.fetch_balance() - # Use USDT (or quote currency) free balance as available capital - free_balances = balance_data.get("free", {}) - quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT" - balance = Decimal(str(free_balances.get(quote_currency, 0))) + # Fetch buying power from Alpaca + balance = await self.exchange.get_buying_power() # Fetch current positions positions = {} @@ -84,11 +80,11 @@ class OrderExecutor: ) else: try: - await self.exchange.create_order( + await self.exchange.submit_order( symbol=signal.symbol, - type="market", + qty=float(signal.quantity), side=signal.side.value.lower(), - amount=float(signal.quantity), + type="market", ) order.status = OrderStatus.FILLED order.filled_at = datetime.now(timezone.utc) diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 68e14aa..3e098c3 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -1,13 +1,12 @@ """Order Executor Service entry point.""" - import asyncio from decimal import Decimal +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer -from shared.exchange import create_exchange from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier @@ -16,9 +15,7 @@ from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,21 +23,21 @@ async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - exchange = create_exchange( - exchange_id=config.exchange_id, - api_key=config.binance_api_key, - api_secret=config.binance_api_secret, - sandbox=config.exchange_sandbox, + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) risk_manager = RiskManager( @@ -63,7 +60,7 @@ async def run() -> None: ) executor = OrderExecutor( - exchange=exchange, + exchange=alpaca, risk_manager=risk_manager, broker=broker, db=db, @@ -71,41 +68,34 @@ async def run() -> None: dry_run=config.dry_run, ) - GROUP = "order-executor" - CONSUMER = "executor-1" - stream = "signals" - health = HealthCheckServer( "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) - log.info("service_started", stream=stream, dry_run=config.dry_run) + GROUP = "order-executor" + CONSUMER = "executor-1" + stream = "signals" await broker.ensure_group(stream, GROUP) - # Process pending messages first (from previous crash) - pending = await broker.read_pending(stream, GROUP, CONSUMER) - for msg_id, msg in pending: - try: - event = Event.from_dict(msg) - if event.type == EventType.SIGNAL: - signal = event.data - log.info( - "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol - ) - await executor.execute(signal) - metrics.events_processed.labels(service="order-executor", event_type="signal").inc() - await broker.ack(stream, GROUP, msg_id) - except Exception as exc: - log.error("pending_process_failed", error=str(exc), msg_id=msg_id) - metrics.errors_total.labels(service="order-executor", error_type="processing").inc() + log.info("started", stream=stream, dry_run=config.dry_run) try: + # Process pending messages first + pending = await broker.read_pending(stream, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + await executor.execute(event.data) + await broker.ack(stream, GROUP, msg_id) + except Exception as exc: + log.error("pending_failed", error=str(exc), msg_id=msg_id) + while True: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: @@ -113,29 +103,23 @@ async def run() -> None: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data - log.info( - "processing_signal", signal_id=str(signal.id), symbol=signal.symbol - ) + log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("message_processing_failed", error=str(exc), msg_id=msg_id) + log.error("process_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() - except Exception as exc: - log.error("fatal_error", error=str(exc)) - await notifier.send_error(str(exc), "order-executor") - raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() - await exchange.close() + await alpaca.close() def main() -> None: diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py index e64b6c0..dd823d7 100644 --- a/services/order-executor/tests/test_executor.py +++ b/services/order-executor/tests/test_executor.py @@ -13,7 +13,7 @@ from order_executor.risk_manager import RiskCheckResult, RiskManager def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal: return Signal( strategy="test", - symbol="BTC/USDT", + symbol="AAPL", side=side, price=Decimal(price), quantity=Decimal(quantity), @@ -21,10 +21,10 @@ def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: s ) -def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock: +def make_mock_exchange(buying_power: str = "10000") -> AsyncMock: exchange = AsyncMock() - exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}} - exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"}) + exchange.get_buying_power = AsyncMock(return_value=Decimal(buying_power)) + exchange.submit_order = AsyncMock(return_value={"id": "alpaca-order-123"}) return exchange @@ -48,7 +48,7 @@ def make_mock_db() -> AsyncMock: @pytest.mark.asyncio async def test_executor_places_order_when_risk_passes(): - """When risk check passes, create_order is called and order status is FILLED.""" + """When risk check passes, submit_order is called and order status is FILLED.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=True) broker = make_mock_broker() @@ -68,14 +68,14 @@ async def test_executor_places_order_when_risk_passes(): assert order is not None assert order.status == OrderStatus.FILLED - exchange.create_order.assert_called_once() + exchange.submit_order.assert_called_once() db.insert_order.assert_called_once_with(order) broker.publish.assert_called_once() @pytest.mark.asyncio async def test_executor_rejects_when_risk_fails(): - """When risk check fails, create_order is not called and None is returned.""" + """When risk check fails, submit_order is not called and None is returned.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded") broker = make_mock_broker() @@ -94,14 +94,14 @@ async def test_executor_rejects_when_risk_fails(): order = await executor.execute(signal) assert order is None - exchange.create_order.assert_not_called() + exchange.submit_order.assert_not_called() db.insert_order.assert_not_called() broker.publish.assert_not_called() @pytest.mark.asyncio async def test_executor_dry_run_does_not_call_exchange(): - """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called.""" + """In dry-run mode, risk passes, order is FILLED, but exchange.submit_order is NOT called.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=True) broker = make_mock_broker() @@ -121,6 +121,6 @@ async def test_executor_dry_run_does_not_call_exchange(): assert order is not None assert order.status == OrderStatus.FILLED - exchange.create_order.assert_not_called() + exchange.submit_order.assert_not_called() db.insert_order.assert_called_once_with(order) broker.publish.assert_called_once() |
