summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:12:06 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:12:06 +0900
commit35120795147adf53de59b7f2a3c8aa14adec9a56 (patch)
treefb747eae72d40fbf0520d6bca917cd0e0ba87b66 /services
parent47465828d839c460a6af12894451539908d76c26 (diff)
refactor: update data-collector and order-executor for Alpaca API
Diffstat (limited to 'services')
-rw-r--r--services/data-collector/pyproject.toml8
-rw-r--r--services/data-collector/src/data_collector/config.py6
-rw-r--r--services/data-collector/src/data_collector/main.py111
-rw-r--r--services/order-executor/pyproject.toml2
-rw-r--r--services/order-executor/src/order_executor/executor.py14
-rw-r--r--services/order-executor/src/order_executor/main.py72
-rw-r--r--services/order-executor/tests/test_executor.py20
7 files changed, 123 insertions, 110 deletions
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml
index 5fba78f..48282c3 100644
--- a/services/data-collector/pyproject.toml
+++ b/services/data-collector/pyproject.toml
@@ -1,13 +1,9 @@
[project]
name = "data-collector"
version = "0.1.0"
-description = "Binance market data collector service"
+description = "Alpaca market data collector service"
requires-python = ">=3.12"
-dependencies = [
- "ccxt>=4.0",
- "websockets>=12.0",
- "trading-shared",
-]
+dependencies = ["trading-shared"]
[project.optional-dependencies]
dev = [
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
index 1e080e5..4761013 100644
--- a/services/data-collector/src/data_collector/config.py
+++ b/services/data-collector/src/data_collector/config.py
@@ -1,6 +1,8 @@
+"""Data Collector configuration."""
from shared.config import Settings
class CollectorConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
- timeframes: list[str] = ["1m"]
+ symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
+ timeframes: list[str] = ["5Min"]
+ poll_interval_seconds: int = 60
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index eebe14a..38f8759 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,59 +1,73 @@
-"""Data Collector Service entry point."""
-
+"""Data Collector Service — fetches US stock data from Alpaca."""
import asyncio
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
+from shared.config import Settings
from shared.db import Database
+from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
+from shared.models import Candle
from shared.notifier import TelegramNotifier
from data_collector.config import CollectorConfig
-from data_collector.storage import CandleStorage
-from data_collector.ws_factory import create_websocket
-
-# Health check port: base (HEALTH_PORT, default 8080) + offset
-# data-collector: +0 (8080), strategy-engine: +1 (8081)
-# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+# Health check port: base + 0
HEALTH_PORT_OFFSET = 0
+async def fetch_latest_bars(
+ alpaca: AlpacaClient,
+ symbols: list[str],
+ timeframe: str,
+ log,
+) -> list[Candle]:
+ """Fetch latest bar for each symbol from Alpaca."""
+ candles = []
+ for symbol in symbols:
+ try:
+ bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
+ if bars:
+ bar = bars[-1]
+ from datetime import datetime
+ from decimal import Decimal
+ candle = Candle(
+ symbol=symbol,
+ timeframe=timeframe,
+ open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
+ open=Decimal(str(bar["o"])),
+ high=Decimal(str(bar["h"])),
+ low=Decimal(str(bar["l"])),
+ close=Decimal(str(bar["c"])),
+ volume=Decimal(str(bar["v"])),
+ )
+ candles.append(candle)
+ except Exception as exc:
+ log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+ return candles
+
+
async def run() -> None:
- """Initialise all components and start the WebSocket collector."""
config = CollectorConfig()
log = setup_logging("data-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("data_collector")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
- await db.init_tables()
broker = RedisBroker(config.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- async def on_candle(candle):
- log.info(
- "candle_received",
- symbol=candle.symbol,
- timeframe=candle.timeframe,
- open_time=str(candle.open_time),
- )
- await storage.store(candle)
- metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
-
- # Use the first configured timeframe for the WebSocket subscription.
- timeframe = config.timeframes[0] if config.timeframes else "1m"
-
- ws = create_websocket(
- exchange_id=config.exchange_id,
- symbols=config.symbols,
- timeframe=timeframe,
- on_candle=on_candle,
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
)
health = HealthCheckServer(
@@ -61,18 +75,38 @@ async def run() -> None:
port=config.health_port + HEALTH_PORT_OFFSET,
auth_token=config.metrics_auth_token,
)
- health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="data-collector").set(1)
- log.info(
- "service_started",
- symbols=config.symbols,
- timeframe=timeframe,
- )
+ poll_interval = int(getattr(config, "poll_interval_seconds", 60))
+ symbols = config.symbols
+ timeframe = config.timeframes[0] if config.timeframes else "1Day"
+
+ log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- await ws.start()
+ while True:
+ # Check if market is open
+ try:
+ is_open = await alpaca.is_market_open()
+ except Exception:
+ is_open = False
+
+ if is_open:
+ candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
+ for candle in candles:
+ await db.insert_candle(candle)
+ event = CandleEvent(data=candle)
+ stream = f"candles.{candle.symbol}"
+ await broker.publish(stream, event.to_dict())
+ metrics.events_processed.labels(
+ service="data-collector", event_type="candle"
+ ).inc()
+ log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
+ else:
+ log.debug("market_closed")
+
+ await asyncio.sleep(poll_interval)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "data-collector")
@@ -80,6 +114,7 @@ async def run() -> None:
finally:
metrics.service_up.labels(service="data-collector").set(0)
await notifier.close()
+ await alpaca.close()
await broker.close()
await db.close()
diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml
index eed4fef..7bb1030 100644
--- a/services/order-executor/pyproject.toml
+++ b/services/order-executor/pyproject.toml
@@ -3,7 +3,7 @@ name = "order-executor"
version = "0.1.0"
description = "Order execution service with risk management"
requires-python = ">=3.12"
-dependencies = ["ccxt>=4.0", "trading-shared"]
+dependencies = ["trading-shared"]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index 80f441d..a71e762 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -37,12 +37,8 @@ class OrderExecutor:
async def execute(self, signal: Signal) -> Optional[Order]:
"""Run risk checks and place an order for the given signal."""
- # Fetch current balance from exchange
- balance_data = await self.exchange.fetch_balance()
- # Use USDT (or quote currency) free balance as available capital
- free_balances = balance_data.get("free", {})
- quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
- balance = Decimal(str(free_balances.get(quote_currency, 0)))
+ # Fetch buying power from Alpaca
+ balance = await self.exchange.get_buying_power()
# Fetch current positions
positions = {}
@@ -84,11 +80,11 @@ class OrderExecutor:
)
else:
try:
- await self.exchange.create_order(
+ await self.exchange.submit_order(
symbol=signal.symbol,
- type="market",
+ qty=float(signal.quantity),
side=signal.side.value.lower(),
- amount=float(signal.quantity),
+ type="market",
)
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 68e14aa..3e098c3 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -1,13 +1,12 @@
"""Order Executor Service entry point."""
-
import asyncio
from decimal import Decimal
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
-from shared.exchange import create_exchange
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
@@ -16,9 +15,7 @@ from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
-# Health check port: base (HEALTH_PORT, default 8080) + offset
-# data-collector: +0 (8080), strategy-engine: +1 (8081)
-# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -26,21 +23,21 @@ async def run() -> None:
config = ExecutorConfig()
log = setup_logging("order-executor", config.log_level, config.log_format)
metrics = ServiceMetrics("order_executor")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
- await db.init_tables()
broker = RedisBroker(config.redis_url)
- exchange = create_exchange(
- exchange_id=config.exchange_id,
- api_key=config.binance_api_key,
- api_secret=config.binance_api_secret,
- sandbox=config.exchange_sandbox,
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
)
risk_manager = RiskManager(
@@ -63,7 +60,7 @@ async def run() -> None:
)
executor = OrderExecutor(
- exchange=exchange,
+ exchange=alpaca,
risk_manager=risk_manager,
broker=broker,
db=db,
@@ -71,41 +68,34 @@ async def run() -> None:
dry_run=config.dry_run,
)
- GROUP = "order-executor"
- CONSUMER = "executor-1"
- stream = "signals"
-
health = HealthCheckServer(
"order-executor",
port=config.health_port + HEALTH_PORT_OFFSET,
auth_token=config.metrics_auth_token,
)
- health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="order-executor").set(1)
- log.info("service_started", stream=stream, dry_run=config.dry_run)
+ GROUP = "order-executor"
+ CONSUMER = "executor-1"
+ stream = "signals"
await broker.ensure_group(stream, GROUP)
- # Process pending messages first (from previous crash)
- pending = await broker.read_pending(stream, GROUP, CONSUMER)
- for msg_id, msg in pending:
- try:
- event = Event.from_dict(msg)
- if event.type == EventType.SIGNAL:
- signal = event.data
- log.info(
- "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol
- )
- await executor.execute(signal)
- metrics.events_processed.labels(service="order-executor", event_type="signal").inc()
- await broker.ack(stream, GROUP, msg_id)
- except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
- metrics.errors_total.labels(service="order-executor", error_type="processing").inc()
+ log.info("started", stream=stream, dry_run=config.dry_run)
try:
+ # Process pending messages first
+ pending = await broker.read_pending(stream, GROUP, CONSUMER)
+ for msg_id, msg in pending:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ await executor.execute(event.data)
+ await broker.ack(stream, GROUP, msg_id)
+ except Exception as exc:
+ log.error("pending_failed", error=str(exc), msg_id=msg_id)
+
while True:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
@@ -113,29 +103,23 @@ async def run() -> None:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
- log.info(
- "processing_signal", signal_id=str(signal.id), symbol=signal.symbol
- )
+ log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
await executor.execute(signal)
metrics.events_processed.labels(
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error("process_failed", error=str(exc))
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
- except Exception as exc:
- log.error("fatal_error", error=str(exc))
- await notifier.send_error(str(exc), "order-executor")
- raise
finally:
metrics.service_up.labels(service="order-executor").set(0)
await notifier.close()
await broker.close()
await db.close()
- await exchange.close()
+ await alpaca.close()
def main() -> None:
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
index e64b6c0..dd823d7 100644
--- a/services/order-executor/tests/test_executor.py
+++ b/services/order-executor/tests/test_executor.py
@@ -13,7 +13,7 @@ from order_executor.risk_manager import RiskCheckResult, RiskManager
def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal:
return Signal(
strategy="test",
- symbol="BTC/USDT",
+ symbol="AAPL",
side=side,
price=Decimal(price),
quantity=Decimal(quantity),
@@ -21,10 +21,10 @@ def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: s
)
-def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock:
+def make_mock_exchange(buying_power: str = "10000") -> AsyncMock:
exchange = AsyncMock()
- exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}}
- exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"})
+ exchange.get_buying_power = AsyncMock(return_value=Decimal(buying_power))
+ exchange.submit_order = AsyncMock(return_value={"id": "alpaca-order-123"})
return exchange
@@ -48,7 +48,7 @@ def make_mock_db() -> AsyncMock:
@pytest.mark.asyncio
async def test_executor_places_order_when_risk_passes():
- """When risk check passes, create_order is called and order status is FILLED."""
+ """When risk check passes, submit_order is called and order status is FILLED."""
exchange = make_mock_exchange()
risk_manager = make_mock_risk_manager(allowed=True)
broker = make_mock_broker()
@@ -68,14 +68,14 @@ async def test_executor_places_order_when_risk_passes():
assert order is not None
assert order.status == OrderStatus.FILLED
- exchange.create_order.assert_called_once()
+ exchange.submit_order.assert_called_once()
db.insert_order.assert_called_once_with(order)
broker.publish.assert_called_once()
@pytest.mark.asyncio
async def test_executor_rejects_when_risk_fails():
- """When risk check fails, create_order is not called and None is returned."""
+ """When risk check fails, submit_order is not called and None is returned."""
exchange = make_mock_exchange()
risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded")
broker = make_mock_broker()
@@ -94,14 +94,14 @@ async def test_executor_rejects_when_risk_fails():
order = await executor.execute(signal)
assert order is None
- exchange.create_order.assert_not_called()
+ exchange.submit_order.assert_not_called()
db.insert_order.assert_not_called()
broker.publish.assert_not_called()
@pytest.mark.asyncio
async def test_executor_dry_run_does_not_call_exchange():
- """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called."""
+ """In dry-run mode, risk passes, order is FILLED, but exchange.submit_order is NOT called."""
exchange = make_mock_exchange()
risk_manager = make_mock_risk_manager(allowed=True)
broker = make_mock_broker()
@@ -121,6 +121,6 @@ async def test_executor_dry_run_does_not_call_exchange():
assert order is not None
assert order.status == OrderStatus.FILLED
- exchange.create_order.assert_not_called()
+ exchange.submit_order.assert_not_called()
db.insert_order.assert_called_once_with(order)
broker.publish.assert_called_once()