1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
"""SEC EDGAR filing collector (free, no API key required)."""
import logging
from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from news_collector.collectors.base import BaseCollector
from shared.models import NewsCategory, NewsItem
logger = logging.getLogger(__name__)
# Map of SEC CIK (Central Index Key, zero-padded to 10 digits) -> ticker
# symbol for the companies whose EDGAR filings this collector tracks.
TRACKED_CIKS = {
"0000320193": "AAPL",
"0000789019": "MSFT",
"0001652044": "GOOGL",
"0001018724": "AMZN",
"0001318605": "TSLA",
"0001045810": "NVDA",
"0001326801": "META",
"0000019617": "JPM",
"0000078003": "PFE",
"0000021344": "KO",
}
# SEC EDGAR requires a descriptive User-Agent with contact details on every
# request; anonymous requests are rejected/throttled.
SEC_USER_AGENT = "TradingPlatform research@example.com"
class SecEdgarCollector(BaseCollector):
    """Collect same-day 8-K filings for tracked large-cap companies.

    Uses SEC EDGAR's free ``data.sec.gov/submissions`` JSON API (no API key
    required) and emits one :class:`NewsItem` per 8-K filed today, with a
    VADER sentiment score computed on the generated headline.
    """

    name = "sec_edgar"
    poll_interval = 1800  # seconds between polls (30 minutes)

    def __init__(self) -> None:
        super().__init__()
        # VADER is adequate for short, formulaic filing headlines.
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """EDGAR is a public endpoint with no credentials; always available."""
        return True

    async def _fetch_recent_filings(self) -> list[dict]:
        """Fetch the submissions JSON for each tracked CIK.

        Returns a list of per-company payloads. A ``tickers`` key is injected
        into each payload in the shape ``collect()`` expects. Companies whose
        request fails or returns a non-200 status are skipped with a warning.
        """
        results: list[dict] = []
        headers = {"User-Agent": SEC_USER_AGENT}
        async with aiohttp.ClientSession() as session:
            for cik, ticker in TRACKED_CIKS.items():
                url = f"https://data.sec.gov/submissions/CIK{cik}.json"
                try:
                    async with session.get(
                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                    ) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            # Normalize to the [{"ticker": ...}] shape that
                            # collect() reads, regardless of what EDGAR returns.
                            data["tickers"] = [{"ticker": ticker}]
                            results.append(data)
                except Exception as exc:
                    # FIX: the original passed structlog-style kwargs
                    # (cik=..., error=...) to a stdlib logger, which raises
                    # TypeError inside the error handler. Use lazy %-args.
                    logger.warning("sec_fetch_failed cik=%s error=%s", cik, exc)
        return results

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per 8-K filing dated today (UTC).

        EDGAR's ``filings.recent`` structure is column-oriented (parallel
        lists); index guards protect against ragged columns.
        """
        filings_data = await self._fetch_recent_filings()
        items: list[NewsItem] = []
        today = datetime.now(UTC).strftime("%Y-%m-%d")
        for company_data in filings_data:
            tickers = [t["ticker"] for t in company_data.get("tickers", [])]
            company_name = company_data.get("name", "Unknown")
            recent = company_data.get("filings", {}).get("recent", {})
            forms = recent.get("form", [])
            dates = recent.get("filingDate", [])
            descriptions = recent.get("primaryDocDescription", [])
            accessions = recent.get("accessionNumber", [])
            for i, form in enumerate(forms):
                # Only material-event (8-K) filings are newsworthy here.
                if form != "8-K":
                    continue
                filing_date = dates[i] if i < len(dates) else ""
                if filing_date != today:
                    continue
                desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
                accession = accessions[i] if i < len(accessions) else ""
                headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
                items.append(
                    NewsItem(
                        source=self.name,
                        headline=headline,
                        summary=desc,
                        url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
                        # filingDate is a bare date; treat it as UTC midnight.
                        published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC),
                        symbols=tickers,
                        sentiment=self._vader.polarity_scores(headline)["compound"],
                        category=NewsCategory.FILING,
                        raw_data={"form": form, "accession": accession},
                    )
                )
        return items
|