diff options
Diffstat (limited to 'shared')
| -rw-r--r-- | shared/src/shared/config.py | 2 | ||||
| -rw-r--r-- | shared/src/shared/sentiment.py | 219 | ||||
| -rw-r--r-- | shared/tests/test_sentiment.py | 144 |
3 files changed, 365 insertions, 0 deletions
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py index 6023755..def70b9 100644 --- a/shared/src/shared/config.py +++ b/shared/src/shared/config.py @@ -36,5 +36,7 @@ class Settings(BaseSettings): circuit_breaker_threshold: int = 5 circuit_breaker_timeout: int = 60 metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token + cryptopanic_api_key: str = "" # Free key from cryptopanic.com + cryptoquant_api_key: str = "" # Free key from cryptoquant.com model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} diff --git a/shared/src/shared/sentiment.py b/shared/src/shared/sentiment.py new file mode 100644 index 0000000..bc62efe --- /dev/null +++ b/shared/src/shared/sentiment.py @@ -0,0 +1,219 @@ +"""Market sentiment data from free APIs. + +Supports: +- Fear & Greed Index (alternative.me) — no API key needed +- CryptoPanic news sentiment — free API key from cryptopanic.com +- CryptoQuant exchange netflow — free API key from cryptoquant.com + +All providers are optional. If API key is missing, the provider is disabled. +""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone + +import aiohttp + +logger = logging.getLogger(__name__) + + +@dataclass +class SentimentData: + """Aggregated sentiment snapshot.""" + + fear_greed_value: int | None = None # 0-100 + fear_greed_label: str | None = ( + None # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed" + ) + news_sentiment: float | None = None # -1.0 (bearish) to 1.0 (bullish) + news_count: int = 0 + exchange_netflow: float | None = ( + None # Positive = inflow (bearish), Negative = outflow (bullish) + ) + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + @property + def should_buy(self) -> bool: + """Simple aggregated buy signal from sentiment.""" + score = 0 + checks = 0 + + if self.fear_greed_value is not None: + checks += 1 + if self.fear_greed_value < 30: + score += 1 # Fear = buy opportunity + elif self.fear_greed_value > 70: + score -= 1 # Greed = avoid buying + + if self.news_sentiment is not None: + checks += 1 + if self.news_sentiment > 0.1: + score += 1 # Positive news + elif self.news_sentiment < -0.3: + score -= 1 # Very negative news = avoid + + if self.exchange_netflow is not None: + checks += 1 + if self.exchange_netflow < 0: + score += 1 # Outflow = bullish (coins leaving exchanges) + elif self.exchange_netflow > 0: + score -= 1 # Inflow = bearish (coins entering exchanges to sell) + + if checks == 0: + return True # No data, allow by default + + return score >= 0 # Net neutral or positive = allow + + @property + def should_block(self) -> bool: + """Strong bearish signal — block all buying.""" + # Block on extreme greed + if self.fear_greed_value is not None and self.fear_greed_value > 80: + return True + # Block on very negative news + if self.news_sentiment is not None and self.news_sentiment < -0.5: + return True + return False + + +class SentimentProvider: + """Fetches sentiment data from multiple free APIs.""" + + def __init__( + self, + cryptopanic_api_key: str = "", + cryptoquant_api_key: str = "", + ) -> None: + self._cryptopanic_key = cryptopanic_api_key + self._cryptoquant_key = cryptoquant_api_key + self._session: aiohttp.ClientSession | None = None + self._cached: SentimentData | None = None + self._cache_ttl: int = 300 # 5 minutes cache + + async def _ensure_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def fetch_fear_greed(self) -> tuple[int | None, str | None]: + """Fetch Fear & Greed Index from alternative.me (no key needed).""" + try: + session = await self._ensure_session() + url = "https://api.alternative.me/fng/?limit=1" + async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: + if resp.status != 200: + logger.warning("Fear & Greed API returned %d", resp.status) + return None, None + data = await resp.json() + entry = data.get("data", [{}])[0] + value = int(entry.get("value", 0)) + label = entry.get("value_classification", "") + return value, label + except Exception as exc: + logger.warning("Fear & Greed fetch failed: %s", exc) + return None, None + + async def fetch_news_sentiment(self, currency: str = "SOL") -> tuple[float | None, int]: + """Fetch news sentiment from CryptoPanic. + + Returns (sentiment_score, news_count). + Sentiment score: -1.0 (all bearish) to 1.0 (all bullish). + """ + if not self._cryptopanic_key: + return None, 0 + + try: + session = await self._ensure_session() + url = ( + f"https://cryptopanic.com/api/v1/posts/" + f"?auth_token={self._cryptopanic_key}" + f"¤cies={currency}" + f"&kind=news" + f"&filter=hot" + f"&limit=10" + ) + async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: + if resp.status != 200: + logger.warning("CryptoPanic API returned %d", resp.status) + return None, 0 + data = await resp.json() + results = data.get("results", []) + if not results: + return None, 0 + + # CryptoPanic provides votes: positive, negative, important, etc. + total_positive = 0 + total_negative = 0 + count = 0 + for post in results: + votes = post.get("votes", {}) + pos = votes.get("positive", 0) + neg = votes.get("negative", 0) + total_positive += pos + total_negative += neg + count += 1 + + total_votes = total_positive + total_negative + if total_votes == 0: + return 0.0, count + + sentiment = (total_positive - total_negative) / total_votes + return sentiment, count + except Exception as exc: + logger.warning("CryptoPanic fetch failed: %s", exc) + return None, 0 + + async def fetch_exchange_netflow(self, symbol: str = "sol") -> float | None: + """Fetch exchange netflow from CryptoQuant. + + Returns netflow value. Positive = inflow (bearish), Negative = outflow (bullish). + """ + if not self._cryptoquant_key: + return None + + try: + session = await self._ensure_session() + url = ( + f"https://api.cryptoquant.com/v1/{symbol}/exchange-flows/netflow?window=day&limit=1" + ) + headers = {"Authorization": f"Bearer {self._cryptoquant_key}"} + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("CryptoQuant API returned %d", resp.status) + return None + data = await resp.json() + result = data.get("result", {}).get("data", []) + if result: + return float(result[0].get("netflow", 0)) + return None + except Exception as exc: + logger.warning("CryptoQuant fetch failed: %s", exc) + return None + + async def get_sentiment(self, currency: str = "SOL") -> SentimentData: + """Fetch all sentiment data and return aggregated result.""" + fg_value, fg_label = await self.fetch_fear_greed() + news_score, news_count = await self.fetch_news_sentiment(currency) + netflow = await self.fetch_exchange_netflow(currency.lower()) + + sentiment = SentimentData( + fear_greed_value=fg_value, + fear_greed_label=fg_label, + news_sentiment=news_score, + news_count=news_count, + exchange_netflow=netflow, + ) + + self._cached = sentiment + return sentiment + + @property + def cached(self) -> SentimentData | None: + """Return last fetched sentiment data.""" + return self._cached + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() diff --git a/shared/tests/test_sentiment.py b/shared/tests/test_sentiment.py new file mode 100644 index 0000000..2caa266 --- /dev/null +++ b/shared/tests/test_sentiment.py @@ -0,0 +1,144 @@ +"""Tests for market sentiment module.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock +from shared.sentiment import SentimentData, SentimentProvider + + +# --- SentimentData tests --- + + +def test_sentiment_should_buy_on_fear(): + s = SentimentData(fear_greed_value=15) # Extreme fear + assert s.should_buy is True + + +def test_sentiment_should_not_buy_on_greed(): + s = SentimentData(fear_greed_value=75) # Greed + assert s.should_buy is False + + +def test_sentiment_should_block_extreme_greed(): + s = SentimentData(fear_greed_value=85) + assert s.should_block is True + + +def test_sentiment_should_block_very_negative_news(): + s = SentimentData(news_sentiment=-0.6) + assert s.should_block is True + + +def test_sentiment_no_block_on_neutral(): + s = SentimentData(fear_greed_value=50, news_sentiment=0.0) + assert s.should_block is False + + +def test_sentiment_should_buy_default_no_data(): + s = SentimentData() + assert s.should_buy is True + assert s.should_block is False + + +def test_sentiment_positive_news_allows_buy(): + s = SentimentData(fear_greed_value=50, news_sentiment=0.3) + assert s.should_buy is True + + +def test_sentiment_outflow_bullish(): + s = SentimentData(exchange_netflow=-100.0) # Outflow = bullish + assert s.should_buy is True + + +def test_sentiment_inflow_bearish(): + s = SentimentData(fear_greed_value=50, exchange_netflow=100.0) # Inflow = bearish + assert s.should_buy is False + + +# --- SentimentProvider tests --- + + +@pytest.mark.asyncio +async def test_provider_fetch_fear_greed(): + provider = SentimentProvider() + + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock( + return_value={"data": [{"value": "25", "value_classification": "Extreme Fear"}]} + ) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.closed = False + mock_session.get = MagicMock(return_value=mock_response) + mock_session.close = AsyncMock() + provider._session = mock_session + + value, label = await provider.fetch_fear_greed() + assert value == 25 + assert label == "Extreme Fear" + + await provider.close() + + +@pytest.mark.asyncio +async def test_provider_fetch_fear_greed_failure(): + provider = SentimentProvider() + + mock_response = AsyncMock() + mock_response.status = 500 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.closed = False + mock_session.get = MagicMock(return_value=mock_response) + mock_session.close = AsyncMock() + provider._session = mock_session + + value, label = await provider.fetch_fear_greed() + assert value is None + + await provider.close() + + +@pytest.mark.asyncio +async def test_provider_news_disabled_without_key(): + provider = SentimentProvider(cryptopanic_api_key="") + score, count = await provider.fetch_news_sentiment() + assert score is None + assert count == 0 + + +@pytest.mark.asyncio +async def test_provider_netflow_disabled_without_key(): + provider = SentimentProvider(cryptoquant_api_key="") + result = await provider.fetch_exchange_netflow() + assert result is None + + +@pytest.mark.asyncio +async def test_provider_get_sentiment_aggregates(): + provider = SentimentProvider() + + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock( + return_value={"data": [{"value": "20", "value_classification": "Extreme Fear"}]} + ) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.closed = False + mock_session.get = MagicMock(return_value=mock_response) + mock_session.close = AsyncMock() + provider._session = mock_session + + sentiment = await provider.get_sentiment("SOL") + assert sentiment.fear_greed_value == 20 + assert sentiment.fear_greed_label == "Extreme Fear" + assert provider.cached is sentiment + + await provider.close() |
