summaryrefslogtreecommitdiff
path: root/shared/src
diff options
context:
space:
mode:
Diffstat (limited to 'shared/src')
-rw-r--r--shared/src/shared/config.py7
-rw-r--r--shared/src/shared/notifier.py134
2 files changed, 141 insertions, 0 deletions
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py
index 1304c5e..511654f 100644
--- a/shared/src/shared/config.py
+++ b/shared/src/shared/config.py
@@ -12,5 +12,12 @@ class Settings(BaseSettings):
risk_stop_loss_pct: float = 5.0
risk_daily_loss_limit_pct: float = 10.0
dry_run: bool = True
+ telegram_bot_token: str = ""
+ telegram_chat_id: str = ""
+ telegram_enabled: bool = False
+ log_format: str = "json"
+ health_port: int = 8080
+ circuit_breaker_threshold: int = 5
+ circuit_breaker_timeout: int = 60
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
diff --git a/shared/src/shared/notifier.py b/shared/src/shared/notifier.py
new file mode 100644
index 0000000..de86f87
--- /dev/null
+++ b/shared/src/shared/notifier.py
@@ -0,0 +1,134 @@
+"""Telegram notification service for the trading platform."""
+import asyncio
+import logging
+from decimal import Decimal
+from typing import Optional, Sequence
+
+import aiohttp
+
+from shared.models import Signal, Order, Position
+
+logger = logging.getLogger(__name__)
+
+TELEGRAM_API_URL = "https://api.telegram.org/bot{token}/sendMessage"
+MAX_RETRIES = 3
+
+
+class TelegramNotifier:
+ """Sends notifications via Telegram Bot API."""
+
+ def __init__(self, bot_token: str, chat_id: str) -> None:
+ self._bot_token = bot_token
+ self._chat_id = chat_id
+ self._semaphore = asyncio.Semaphore(1)
+ self._session: Optional[aiohttp.ClientSession] = None
+
+ @property
+ def enabled(self) -> bool:
+ """Return True if a bot token has been configured."""
+ return bool(self._bot_token)
+
+ def _get_session(self) -> aiohttp.ClientSession:
+ if self._session is None or self._session.closed:
+ self._session = aiohttp.ClientSession()
+ return self._session
+
+ async def send(self, message: str, parse_mode: str = "HTML") -> None:
+ """Send a message via Telegram with retries and rate limiting.
+
+ Does nothing if the notifier is not enabled.
+ """
+ if not self.enabled:
+ return
+
+ url = TELEGRAM_API_URL.format(token=self._bot_token)
+ payload = {
+ "chat_id": self._chat_id,
+ "text": message,
+ "parse_mode": parse_mode,
+ }
+
+ session = self._session or self._get_session()
+ async with self._semaphore:
+ for attempt in range(1, MAX_RETRIES + 1):
+ try:
+ async with session.post(url, json=payload) as resp:
+ if resp.status == 200:
+ return
+ body = await resp.json()
+ logger.warning(
+ "Telegram API error (attempt %d/%d): %s",
+ attempt,
+ MAX_RETRIES,
+ body,
+ )
+ except Exception:
+ logger.exception(
+ "Telegram send failed (attempt %d/%d)", attempt, MAX_RETRIES
+ )
+ if attempt < MAX_RETRIES:
+ await asyncio.sleep(attempt)
+
+ async def send_signal(self, signal: Signal) -> None:
+ """Format and send a trading signal notification."""
+ msg = (
+ "<b>📊 Signal</b>\n"
+ f"Side: <b>{signal.side.value}</b>\n"
+ f"Strategy: {signal.strategy}\n"
+ f"Symbol: {signal.symbol}\n"
+ f"Price: {signal.price}\n"
+ f"Quantity: {signal.quantity}\n"
+ f"Reason: {signal.reason}"
+ )
+ await self.send(msg)
+
+ async def send_order(self, order: Order) -> None:
+ """Format and send an order notification."""
+ msg = (
+ "<b>📝 Order</b>\n"
+ f"Status: <b>{order.status.value}</b>\n"
+ f"Symbol: {order.symbol}\n"
+ f"Side: {order.side.value}\n"
+ f"Price: {order.price}\n"
+ f"Quantity: {order.quantity}"
+ )
+ await self.send(msg)
+
+ async def send_error(self, error: str, service: str) -> None:
+ """Format and send an error alert."""
+ msg = (
+ "<b>🚨 Error Alert</b>\n"
+ f"Service: <b>{service}</b>\n"
+ f"Error: {error}"
+ )
+ await self.send(msg)
+
+ async def send_daily_summary(
+ self,
+ positions: Sequence[Position],
+ total_value: Decimal,
+ daily_pnl: Decimal,
+ ) -> None:
+ """Format and send a daily portfolio summary."""
+ lines = [
+ "<b>📈 Daily Summary</b>",
+ f"Total Value: {total_value}",
+ f"Daily P&amp;L: {daily_pnl}",
+ "",
+ "<b>Positions:</b>",
+ ]
+ for pos in positions:
+ lines.append(
+ f" {pos.symbol}: qty={pos.quantity} "
+ f"entry={pos.avg_entry_price} "
+ f"current={pos.current_price} "
+ f"pnl={pos.unrealized_pnl}"
+ )
+ if not positions:
+ lines.append(" No open positions")
+ await self.send("\n".join(lines))
+
+ async def close(self) -> None:
+ """Close the underlying aiohttp session."""
+ if self._session is not None:
+ await self._session.close()