summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/src/strategy_engine/main.py')
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py85
1 files changed, 80 insertions, 5 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 30de528..3d73058 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -1,17 +1,25 @@
"""Strategy Engine Service entry point."""
import asyncio
+import zoneinfo
+from datetime import datetime
from pathlib import Path
+import aiohttp
+
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
+from shared.db import Database
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
+from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
+from strategy_engine.stock_selector import StockSelector
# The strategies directory lives alongside the installed package
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
@@ -30,23 +38,74 @@ async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
last_id = await engine.process_once(stream, last_id)
+async def run_stock_selector(
+ selector: StockSelector,
+ notifier: TelegramNotifier,
+ db: Database,
+ config: StrategyConfig,
+ log,
+) -> None:
+ """Run the stock selector once per day at the configured time."""
+ et = zoneinfo.ZoneInfo("America/New_York")
+
+ while True:
+ now_et = datetime.now(et)
+ target_hour, target_min = map(int, config.selector_final_time.split(":"))
+
+ if now_et.hour == target_hour and now_et.minute == target_min:
+ log.info("stock_selector_running")
+ try:
+ selections = await selector.select()
+ if selections:
+ ms_data = await db.get_latest_market_sentiment()
+ ms = None
+ if ms_data:
+ ms = MarketSentiment(**ms_data)
+ await notifier.send_stock_selection(selections, ms)
+ log.info("stock_selector_complete", picks=[s.symbol for s in selections])
+ else:
+ log.info("stock_selector_no_picks")
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("stock_selector_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("stock_selector_data_error", error=str(exc))
+ except Exception as exc:
+ log.error("stock_selector_error", error=str(exc), exc_info=True)
+ await asyncio.sleep(120) # Sleep past this minute
+ else:
+ await asyncio.sleep(30)
+
+
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
metrics = ServiceMetrics("strategy_engine")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
+
+ db = Database(config.database_url.get_secret_value())
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
+ paper=config.alpaca_paper,
+ )
+
strategies = load_strategies(STRATEGIES_DIR)
for strategy in strategies:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -67,9 +126,23 @@ async def run() -> None:
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- await asyncio.gather(*tasks)
+ if config.anthropic_api_key.get_secret_value():
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key.get_secret_value(),
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(
+ asyncio.create_task(run_stock_selector(selector, notifier, db, config, log))
+ )
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
@@ -78,6 +151,8 @@ async def run() -> None:
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()
+ await alpaca.close()
+ await db.close()
def main() -> None: