From 33b14aaa2344b0fd95d1629627c3d135b24ae102 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:56:35 +0900 Subject: feat: initial trading platform implementation Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules --- .../src/strategy_engine/__init__.py | 0 .../strategy-engine/src/strategy_engine/config.py | 8 ++++ .../strategy-engine/src/strategy_engine/engine.py | 54 +++++++++++++++++++++ .../strategy-engine/src/strategy_engine/main.py | 56 ++++++++++++++++++++++ .../src/strategy_engine/plugin_loader.py | 36 ++++++++++++++ 5 files changed, 154 insertions(+) create mode 100644 services/strategy-engine/src/strategy_engine/__init__.py create mode 100644 services/strategy-engine/src/strategy_engine/config.py create mode 100644 services/strategy-engine/src/strategy_engine/engine.py create mode 100644 services/strategy-engine/src/strategy_engine/main.py create mode 100644 services/strategy-engine/src/strategy_engine/plugin_loader.py (limited to 'services/strategy-engine/src/strategy_engine') diff --git a/services/strategy-engine/src/strategy_engine/__init__.py b/services/strategy-engine/src/strategy_engine/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py new file mode 100644 index 0000000..2864b09 --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/config.py @@ -0,0 +1,8 @@ +"""Strategy Engine configuration.""" +from shared.config import Settings + + +class StrategyConfig(Settings): + symbols: list[str] = ["BTC/USDT"] + timeframes: list[str] = ["1m"] + strategy_params: dict = {} diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py new file mode 100644 index 0000000..09dbf65 --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/engine.py @@ -0,0 +1,54 @@ +"""Strategy Engine: consumes candle events and publishes signals.""" +import logging + +from shared.broker import RedisBroker +from shared.events import CandleEvent, SignalEvent, Event + +from strategies.base import BaseStrategy + +logger = logging.getLogger(__name__) + + +class StrategyEngine: + def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None: + self._broker = broker + self._strategies = strategies + + async def process_once(self, stream: str, last_id: str) -> str: + """Read one batch of messages from the stream, process candles, publish signals. + + Returns the updated last_id for the next call. + """ + messages = await self._broker.read(stream, last_id=last_id, count=10, block=100) + + for raw in messages: + try: + event = Event.from_dict(raw) + except Exception as exc: + logger.warning("Failed to parse event: %s – %s", raw, exc) + continue + + if not isinstance(event, CandleEvent): + continue + + candle = event.data + for strategy in self._strategies: + try: + signal = strategy.on_candle(candle) + except Exception as exc: + logger.error( + "Strategy %s raised on candle: %s", strategy.name, exc + ) + continue + + if signal is not None: + signal_event = SignalEvent(data=signal) + await self._broker.publish("signals", signal_event.to_dict()) + logger.info( + "Signal published: strategy=%s symbol=%s side=%s", + signal.strategy, + signal.symbol, + signal.side, + ) + + return last_id diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py new file mode 100644 index 0000000..83bb867 --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -0,0 +1,56 @@ +"""Strategy Engine Service entry point.""" +import asyncio +import logging +from pathlib import Path + +from shared.broker import RedisBroker + +from strategy_engine.config import StrategyConfig +from strategy_engine.engine import StrategyEngine +from strategy_engine.plugin_loader import load_strategies + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# The strategies directory lives alongside the installed package +STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" + + +async def run() -> None: + config = StrategyConfig() + broker = RedisBroker(config.redis_url) + + strategies_dir = STRATEGIES_DIR + strategies = load_strategies(strategies_dir) + + # Configure each strategy with params from config + for strategy in strategies: + params = config.strategy_params.get(strategy.name, {}) + strategy.configure(params) + + logger.info( + "Loaded %d strategies: %s", + len(strategies), + [s.name for s in strategies], + ) + + engine = StrategyEngine(broker=broker, strategies=strategies) + + try: + for symbol in config.symbols: + stream = f"candles.{symbol.replace('/', '_')}" + last_id = "$" + logger.info("Starting engine loop for stream=%s", stream) + + while True: + last_id = await engine.process_once(stream, last_id) + finally: + await broker.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py new file mode 100644 index 0000000..719dc6d --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py @@ -0,0 +1,36 @@ +"""Dynamic plugin loader for strategy modules.""" +import importlib.util +import sys +from pathlib import Path + +from strategies.base import BaseStrategy + + +def load_strategies(strategies_dir: Path) -> list[BaseStrategy]: + """Scan strategies_dir for *.py files and load all BaseStrategy subclasses.""" + loaded: list[BaseStrategy] = [] + + for path in sorted(strategies_dir.glob("*.py")): + # Skip dunder files and base + if path.name.startswith("__") or path.name == "base.py": + continue + + module_name = f"_strategy_plugin_{path.stem}" + spec = importlib.util.spec_from_file_location(module_name, path) + if spec is None or spec.loader is None: + continue + + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + + for attr_name in dir(module): + obj = getattr(module, attr_name) + if ( + isinstance(obj, type) + and issubclass(obj, BaseStrategy) + and obj is not BaseStrategy + ): + loaded.append(obj()) + + return loaded -- cgit v1.2.3