diff options
Diffstat (limited to 'shared/src')
| -rw-r--r-- | shared/src/shared/config.py | 7 | ||||
| -rw-r--r-- | shared/src/shared/notifier.py | 134 |
2 files changed, 141 insertions, 0 deletions
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py index 1304c5e..511654f 100644 --- a/shared/src/shared/config.py +++ b/shared/src/shared/config.py @@ -12,5 +12,12 @@ class Settings(BaseSettings): risk_stop_loss_pct: float = 5.0 risk_daily_loss_limit_pct: float = 10.0 dry_run: bool = True + telegram_bot_token: str = "" + telegram_chat_id: str = "" + telegram_enabled: bool = False + log_format: str = "json" + health_port: int = 8080 + circuit_breaker_threshold: int = 5 + circuit_breaker_timeout: int = 60 model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} diff --git a/shared/src/shared/notifier.py b/shared/src/shared/notifier.py new file mode 100644 index 0000000..de86f87 --- /dev/null +++ b/shared/src/shared/notifier.py @@ -0,0 +1,134 @@ +"""Telegram notification service for the trading platform.""" +import asyncio +import logging +from decimal import Decimal +from typing import Optional, Sequence + +import aiohttp + +from shared.models import Signal, Order, Position + +logger = logging.getLogger(__name__) + +TELEGRAM_API_URL = "https://api.telegram.org/bot{token}/sendMessage" +MAX_RETRIES = 3 + + +class TelegramNotifier: + """Sends notifications via Telegram Bot API.""" + + def __init__(self, bot_token: str, chat_id: str) -> None: + self._bot_token = bot_token + self._chat_id = chat_id + self._semaphore = asyncio.Semaphore(1) + self._session: Optional[aiohttp.ClientSession] = None + + @property + def enabled(self) -> bool: + """Return True if a bot token has been configured.""" + return bool(self._bot_token) + + def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def send(self, message: str, parse_mode: str = "HTML") -> None: + """Send a message via Telegram with retries and rate limiting. + + Does nothing if the notifier is not enabled. + """ + if not self.enabled: + return + + url = TELEGRAM_API_URL.format(token=self._bot_token) + payload = { + "chat_id": self._chat_id, + "text": message, + "parse_mode": parse_mode, + } + + session = self._session or self._get_session() + async with self._semaphore: + for attempt in range(1, MAX_RETRIES + 1): + try: + async with session.post(url, json=payload) as resp: + if resp.status == 200: + return + body = await resp.json() + logger.warning( + "Telegram API error (attempt %d/%d): %s", + attempt, + MAX_RETRIES, + body, + ) + except Exception: + logger.exception( + "Telegram send failed (attempt %d/%d)", attempt, MAX_RETRIES + ) + if attempt < MAX_RETRIES: + await asyncio.sleep(attempt) + + async def send_signal(self, signal: Signal) -> None: + """Format and send a trading signal notification.""" + msg = ( + "<b>📊 Signal</b>\n" + f"Side: <b>{signal.side.value}</b>\n" + f"Strategy: {signal.strategy}\n" + f"Symbol: {signal.symbol}\n" + f"Price: {signal.price}\n" + f"Quantity: {signal.quantity}\n" + f"Reason: {signal.reason}" + ) + await self.send(msg) + + async def send_order(self, order: Order) -> None: + """Format and send an order notification.""" + msg = ( + "<b>📝 Order</b>\n" + f"Status: <b>{order.status.value}</b>\n" + f"Symbol: {order.symbol}\n" + f"Side: {order.side.value}\n" + f"Price: {order.price}\n" + f"Quantity: {order.quantity}" + ) + await self.send(msg) + + async def send_error(self, error: str, service: str) -> None: + """Format and send an error alert.""" + msg = ( + "<b>🚨 Error Alert</b>\n" + f"Service: <b>{service}</b>\n" + f"Error: {error}" + ) + await self.send(msg) + + async def send_daily_summary( + self, + positions: Sequence[Position], + total_value: Decimal, + daily_pnl: Decimal, + ) -> None: + """Format and send a daily portfolio summary.""" + lines = [ + "<b>📈 Daily Summary</b>", + f"Total Value: {total_value}", + f"Daily P&L: {daily_pnl}", + "", + "<b>Positions:</b>", + ] + for pos in positions: + lines.append( + f" {pos.symbol}: qty={pos.quantity} " + f"entry={pos.avg_entry_price} " + f"current={pos.current_price} " + f"pnl={pos.unrealized_pnl}" + ) + if not positions: + lines.append(" No open positions") + await self.send("\n".join(lines)) + + async def close(self) -> None: + """Close the underlying aiohttp session.""" + if self._session is not None: + await self._session.close() |
