summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
commit33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch)
tree90b214758bc3b076baa7711226a1a1be6268e72e /services/strategy-engine/src/strategy_engine
parent9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff)
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules
Diffstat (limited to 'services/strategy-engine/src/strategy_engine')
-rw-r--r--services/strategy-engine/src/strategy_engine/__init__.py0
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py8
-rw-r--r--services/strategy-engine/src/strategy_engine/engine.py54
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py56
-rw-r--r--services/strategy-engine/src/strategy_engine/plugin_loader.py36
5 files changed, 154 insertions, 0 deletions
diff --git a/services/strategy-engine/src/strategy_engine/__init__.py b/services/strategy-engine/src/strategy_engine/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/__init__.py
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
new file mode 100644
index 0000000..2864b09
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -0,0 +1,8 @@
+"""Strategy Engine configuration."""
+from shared.config import Settings
+
+
+class StrategyConfig(Settings):
+ symbols: list[str] = ["BTC/USDT"]
+ timeframes: list[str] = ["1m"]
+ strategy_params: dict = {}
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py
new file mode 100644
index 0000000..09dbf65
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/engine.py
@@ -0,0 +1,54 @@
+"""Strategy Engine: consumes candle events and publishes signals."""
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import CandleEvent, SignalEvent, Event
+
+from strategies.base import BaseStrategy
+
+logger = logging.getLogger(__name__)
+
+
+class StrategyEngine:
+ def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None:
+ self._broker = broker
+ self._strategies = strategies
+
+ async def process_once(self, stream: str, last_id: str) -> str:
+ """Read one batch of messages from the stream, process candles, publish signals.
+
+ Returns the updated last_id for the next call.
+ """
+ messages = await self._broker.read(stream, last_id=last_id, count=10, block=100)
+
+ for raw in messages:
+ try:
+ event = Event.from_dict(raw)
+ except Exception as exc:
+ logger.warning("Failed to parse event: %s – %s", raw, exc)
+ continue
+
+ if not isinstance(event, CandleEvent):
+ continue
+
+ candle = event.data
+ for strategy in self._strategies:
+ try:
+ signal = strategy.on_candle(candle)
+ except Exception as exc:
+ logger.error(
+ "Strategy %s raised on candle: %s", strategy.name, exc
+ )
+ continue
+
+ if signal is not None:
+ signal_event = SignalEvent(data=signal)
+ await self._broker.publish("signals", signal_event.to_dict())
+ logger.info(
+ "Signal published: strategy=%s symbol=%s side=%s",
+ signal.strategy,
+ signal.symbol,
+ signal.side,
+ )
+
+ return last_id
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
new file mode 100644
index 0000000..83bb867
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -0,0 +1,56 @@
+"""Strategy Engine Service entry point."""
+import asyncio
+import logging
+from pathlib import Path
+
+from shared.broker import RedisBroker
+
+from strategy_engine.config import StrategyConfig
+from strategy_engine.engine import StrategyEngine
+from strategy_engine.plugin_loader import load_strategies
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# The strategies directory lives alongside the installed package
+STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
+
+
+async def run() -> None:
+ config = StrategyConfig()
+ broker = RedisBroker(config.redis_url)
+
+ strategies_dir = STRATEGIES_DIR
+ strategies = load_strategies(strategies_dir)
+
+ # Configure each strategy with params from config
+ for strategy in strategies:
+ params = config.strategy_params.get(strategy.name, {})
+ strategy.configure(params)
+
+ logger.info(
+ "Loaded %d strategies: %s",
+ len(strategies),
+ [s.name for s in strategies],
+ )
+
+ engine = StrategyEngine(broker=broker, strategies=strategies)
+
+ try:
+ for symbol in config.symbols:
+ stream = f"candles.{symbol.replace('/', '_')}"
+ last_id = "$"
+ logger.info("Starting engine loop for stream=%s", stream)
+
+ while True:
+ last_id = await engine.process_once(stream, last_id)
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py
new file mode 100644
index 0000000..719dc6d
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py
@@ -0,0 +1,36 @@
+"""Dynamic plugin loader for strategy modules."""
+import importlib.util
+import sys
+from pathlib import Path
+
+from strategies.base import BaseStrategy
+
+
+def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
+ """Scan strategies_dir for *.py files and load all BaseStrategy subclasses."""
+ loaded: list[BaseStrategy] = []
+
+ for path in sorted(strategies_dir.glob("*.py")):
+ # Skip dunder files and base
+ if path.name.startswith("__") or path.name == "base.py":
+ continue
+
+ module_name = f"_strategy_plugin_{path.stem}"
+ spec = importlib.util.spec_from_file_location(module_name, path)
+ if spec is None or spec.loader is None:
+ continue
+
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[module_name] = module
+ spec.loader.exec_module(module)
+
+ for attr_name in dir(module):
+ obj = getattr(module, attr_name)
+ if (
+ isinstance(obj, type)
+ and issubclass(obj, BaseStrategy)
+ and obj is not BaseStrategy
+ ):
+ loaded.append(obj())
+
+ return loaded