diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 10:12:06 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 10:12:06 +0900 |
| commit | 35120795147adf53de59b7f2a3c8aa14adec9a56 (patch) | |
| tree | fb747eae72d40fbf0520d6bca917cd0e0ba87b66 /services/order-executor | |
| parent | 47465828d839c460a6af12894451539908d76c26 (diff) | |
refactor: update data-collector and order-executor for Alpaca API
Diffstat (limited to 'services/order-executor')
| -rw-r--r-- | services/order-executor/pyproject.toml | 2 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/executor.py | 14 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 72 | ||||
| -rw-r--r-- | services/order-executor/tests/test_executor.py | 20 |
4 files changed, 44 insertions, 64 deletions
diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml index eed4fef..7bb1030 100644 --- a/services/order-executor/pyproject.toml +++ b/services/order-executor/pyproject.toml @@ -3,7 +3,7 @@ name = "order-executor" version = "0.1.0" description = "Order execution service with risk management" requires-python = ">=3.12" -dependencies = ["ccxt>=4.0", "trading-shared"] +dependencies = ["trading-shared"] [project.optional-dependencies] dev = ["pytest>=8.0", "pytest-asyncio>=0.23"] diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py index 80f441d..a71e762 100644 --- a/services/order-executor/src/order_executor/executor.py +++ b/services/order-executor/src/order_executor/executor.py @@ -37,12 +37,8 @@ class OrderExecutor: async def execute(self, signal: Signal) -> Optional[Order]: """Run risk checks and place an order for the given signal.""" - # Fetch current balance from exchange - balance_data = await self.exchange.fetch_balance() - # Use USDT (or quote currency) free balance as available capital - free_balances = balance_data.get("free", {}) - quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT" - balance = Decimal(str(free_balances.get(quote_currency, 0))) + # Fetch buying power from Alpaca + balance = await self.exchange.get_buying_power() # Fetch current positions positions = {} @@ -84,11 +80,11 @@ class OrderExecutor: ) else: try: - await self.exchange.create_order( + await self.exchange.submit_order( symbol=signal.symbol, - type="market", + qty=float(signal.quantity), side=signal.side.value.lower(), - amount=float(signal.quantity), + type="market", ) order.status = OrderStatus.FILLED order.filled_at = datetime.now(timezone.utc) diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 68e14aa..3e098c3 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -1,13 +1,12 @@ """Order Executor Service entry point.""" - import asyncio from decimal import Decimal +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer -from shared.exchange import create_exchange from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier @@ -16,9 +15,7 @@ from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,21 +23,21 @@ async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - exchange = create_exchange( - exchange_id=config.exchange_id, - api_key=config.binance_api_key, - api_secret=config.binance_api_secret, - sandbox=config.exchange_sandbox, + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) risk_manager = RiskManager( @@ -63,7 +60,7 @@ async def run() -> None: ) executor = OrderExecutor( - exchange=exchange, + exchange=alpaca, risk_manager=risk_manager, broker=broker, db=db, @@ -71,41 +68,34 @@ async def run() -> None: dry_run=config.dry_run, ) - GROUP = "order-executor" - CONSUMER = "executor-1" - stream = "signals" - health = HealthCheckServer( "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) - log.info("service_started", stream=stream, dry_run=config.dry_run) + GROUP = "order-executor" + CONSUMER = "executor-1" + stream = "signals" await broker.ensure_group(stream, GROUP) - # Process pending messages first (from previous crash) - pending = await broker.read_pending(stream, GROUP, CONSUMER) - for msg_id, msg in pending: - try: - event = Event.from_dict(msg) - if event.type == EventType.SIGNAL: - signal = event.data - log.info( - "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol - ) - await executor.execute(signal) - metrics.events_processed.labels(service="order-executor", event_type="signal").inc() - await broker.ack(stream, GROUP, msg_id) - except Exception as exc: - log.error("pending_process_failed", error=str(exc), msg_id=msg_id) - metrics.errors_total.labels(service="order-executor", error_type="processing").inc() + log.info("started", stream=stream, dry_run=config.dry_run) try: + # Process pending messages first + pending = await broker.read_pending(stream, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + await executor.execute(event.data) + await broker.ack(stream, GROUP, msg_id) + except Exception as exc: + log.error("pending_failed", error=str(exc), msg_id=msg_id) + while True: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: @@ -113,29 +103,23 @@ async def run() -> None: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data - log.info( - "processing_signal", signal_id=str(signal.id), symbol=signal.symbol - ) + log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("message_processing_failed", error=str(exc), msg_id=msg_id) + log.error("process_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() - except Exception as exc: - log.error("fatal_error", error=str(exc)) - await notifier.send_error(str(exc), "order-executor") - raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() - await exchange.close() + await alpaca.close() def main() -> None: diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py index e64b6c0..dd823d7 100644 --- a/services/order-executor/tests/test_executor.py +++ b/services/order-executor/tests/test_executor.py @@ -13,7 +13,7 @@ from order_executor.risk_manager import RiskCheckResult, RiskManager def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal: return Signal( strategy="test", - symbol="BTC/USDT", + symbol="AAPL", side=side, price=Decimal(price), quantity=Decimal(quantity), @@ -21,10 +21,10 @@ def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: s ) -def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock: +def make_mock_exchange(buying_power: str = "10000") -> AsyncMock: exchange = AsyncMock() - exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}} - exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"}) + exchange.get_buying_power = AsyncMock(return_value=Decimal(buying_power)) + exchange.submit_order = AsyncMock(return_value={"id": "alpaca-order-123"}) return exchange @@ -48,7 +48,7 @@ def make_mock_db() -> AsyncMock: @pytest.mark.asyncio async def test_executor_places_order_when_risk_passes(): - """When risk check passes, create_order is called and order status is FILLED.""" + """When risk check passes, submit_order is called and order status is FILLED.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=True) broker = make_mock_broker() @@ -68,14 +68,14 @@ async def test_executor_places_order_when_risk_passes(): assert order is not None assert order.status == OrderStatus.FILLED - exchange.create_order.assert_called_once() + exchange.submit_order.assert_called_once() db.insert_order.assert_called_once_with(order) broker.publish.assert_called_once() @pytest.mark.asyncio async def test_executor_rejects_when_risk_fails(): - """When risk check fails, create_order is not called and None is returned.""" + """When risk check fails, submit_order is not called and None is returned.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded") broker = make_mock_broker() @@ -94,14 +94,14 @@ async def test_executor_rejects_when_risk_fails(): order = await executor.execute(signal) assert order is None - exchange.create_order.assert_not_called() + exchange.submit_order.assert_not_called() db.insert_order.assert_not_called() broker.publish.assert_not_called() @pytest.mark.asyncio async def test_executor_dry_run_does_not_call_exchange(): - """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called.""" + """In dry-run mode, risk passes, order is FILLED, but exchange.submit_order is NOT called.""" exchange = make_mock_exchange() risk_manager = make_mock_risk_manager(allowed=True) broker = make_mock_broker() @@ -121,6 +121,6 @@ async def test_executor_dry_run_does_not_call_exchange(): assert order is not None assert order.status == OrderStatus.FILLED - exchange.create_order.assert_not_called() + exchange.submit_order.assert_not_called() db.insert_order.assert_called_once_with(order) broker.publish.assert_called_once() |
