"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
import asyncio
from datetime import UTC, datetime
import aiohttp
from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.config import NewsCollectorConfig
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment import SentimentAggregator
from shared.sentiment_models import MarketSentiment
from shared.shutdown import GracefulShutdown
# Health check port: base + 4
HEALTH_PORT_OFFSET = 4
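
# Illustrative sketch only: the loops below never reference a collector base
# class, so this Protocol spells out the duck-typed interface they rely on
# (`name`, `poll_interval`, and an async `collect()`). The Protocol and its
# name are documentation assumptions, not part of the real collector classes.
# Note that FearGreedCollector differs: its collect() returns a single index
# reading rather than a list of NewsItem, which is why it gets its own loop.
from typing import Protocol  # would normally live with the imports above


class _NewsCollectorLike(Protocol):
    name: str
    poll_interval: float

    async def collect(self) -> list[NewsItem]: ...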


async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
    """Run a single collector, store results in DB, publish to Redis.

    Returns the number of items collected.
    """
    items: list[NewsItem] = await collector.collect()
    count = 0
    for item in items:
        await db.insert_news_item(item)
        event = NewsEvent(data=item)
        stream = f"news.{item.category.value}"
        await broker.publish(stream, event.to_dict())
        count += 1
    return count
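
# Example of the resulting routing (the category value "macro" is hypothetical):
# a NewsItem whose category.value == "macro" is stored via insert_news_item and
# then published on the Redis stream "news.macro".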


async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
    """Run a collector repeatedly on its configured poll_interval."""
    while True:
        # Catch only expected, transient errors so one bad poll doesn't kill
        # the loop; anything unexpected propagates and fails the task.
        try:
            count = await run_collector_once(collector, db, broker)
            log.info(
                "collector_ran",
                collector=collector.name,
                count=count,
            )
        except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
            log.warning(
                "collector_network_error",
                collector=collector.name,
                error=str(exc),
            )
        except (ValueError, KeyError, TypeError) as exc:
            log.warning(
                "collector_parse_error",
                collector=collector.name,
                error=str(exc),
            )
        await asyncio.sleep(collector.poll_interval)


async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
    """Fetch Fear & Greed index on its interval and update MarketSentiment in DB."""
    while True:
        try:
            result = await collector.collect()
            if result is not None:
                # VIX is not collected by this service, so regime
                # classification runs on the fear/greed index alone.
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    vix=None,
                    fed_stance="neutral",
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(UTC),
                )
                await db.upsert_market_sentiment(ms)
                log.info(
                    "fear_greed_updated",
                    value=result.fear_greed,
                    label=result.fear_greed_label,
                )
        except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
            log.warning("fear_greed_network_error", error=str(exc))
        except (ValueError, KeyError, TypeError) as exc:
            log.warning("fear_greed_parse_error", error=str(exc))
        await asyncio.sleep(collector.poll_interval)


async def run_aggregator_loop(db: Database, interval: int, log) -> None:
    """Run SentimentAggregator every `interval` seconds and persist scores."""
    aggregator = SentimentAggregator()
    while True:
        # Sleep first: this gives the collector loops a chance to populate
        # the DB before the initial aggregation pass.
        await asyncio.sleep(interval)
        try:
            now = datetime.now(UTC)
            news_items = await db.get_recent_news(hours=24)
            scores = aggregator.aggregate(news_items, now)
            for score in scores.values():
                await db.upsert_symbol_score(score)
            log.info("aggregation_complete", symbols=len(scores))
        except (ConnectionError, TimeoutError) as exc:
            log.warning("aggregator_network_error", error=str(exc))
        except (ValueError, KeyError, TypeError) as exc:
            log.warning("aggregator_parse_error", error=str(exc))


def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Classify market regime from fear/greed index and optional VIX."""
    # Assumes SentimentAggregator is cheap to construct for this one call.
    aggregator = SentimentAggregator()
    return aggregator.determine_regime(fear_greed, vix)


async def run() -> None:
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )
    db = Database(config.database_url.get_secret_value())
    await db.connect()
    broker = RedisBroker(config.redis_url.get_secret_value())
    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Build collectors
    finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
    rss = RSSCollector()
    sec = SecEdgarCollector()
    truth = TruthSocialCollector()
    reddit = RedditCollector()
    fear_greed = FearGreedCollector()
    fed = FedCollector()
    # fear_greed is polled by its own loop below; the rest share the generic loop.
    news_collectors = [finnhub, rss, sec, truth, reddit, fed]

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        poll_interval=config.news_poll_interval,
        aggregate_interval=config.sentiment_aggregate_interval,
    )

    # Defined before the try block so the finally clause can always reference it,
    # even if task creation itself fails.
    tasks: list[asyncio.Task] = []
    try:
        tasks = [
            asyncio.create_task(
                run_collector_loop(collector, db, broker, log),
                name=f"collector-{collector.name}",
            )
            for collector in news_collectors
        ]
        tasks.append(
            asyncio.create_task(
                run_fear_greed_loop(fear_greed, db, log),
                name="fear-greed-loop",
            )
        )
        tasks.append(
            asyncio.create_task(
                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
                name="aggregator-loop",
            )
        )
        await shutdown.wait()
    except Exception as exc:
        log.error("fatal_error", error=str(exc), exc_info=True)
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        metrics.service_up.labels(service="news-collector").set(0)
        for task in tasks:
            task.cancel()
        # Let the cancelled tasks unwind before closing the connections they use.
        await asyncio.gather(*tasks, return_exceptions=True)
        await notifier.close()
        await broker.close()
        await db.close()


def main() -> None:
    asyncio.run(run())


if __name__ == "__main__":
    main()
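
# Typical invocation (the exact module path is an assumption; adjust to the
# repo layout):
#   python -m news_collector.main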