summaryrefslogtreecommitdiff
path: root/shared/src
diff options
context:
space:
mode:
Diffstat (limited to 'shared/src')
-rw-r--r--shared/src/shared/db.py52
1 files changed, 42 insertions, 10 deletions
diff --git a/shared/src/shared/db.py b/shared/src/shared/db.py
index f9b7f56..515ba2c 100644
--- a/shared/src/shared/db.py
+++ b/shared/src/shared/db.py
@@ -1,5 +1,6 @@
"""Database layer using SQLAlchemy 2.0 async ORM for the trading platform."""
+from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
@@ -42,6 +43,17 @@ class Database:
"""Return a new async session from the factory."""
return self._session_factory()
+ @asynccontextmanager
+ async def transaction(self):
+ """Provide a transactional scope with automatic rollback on error."""
+ async with self.get_session() as session:
+ try:
+ yield session
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
async def insert_candle(self, candle: Candle) -> None:
"""Upsert a candle row using session.merge."""
row = CandleRow(
@@ -55,8 +67,12 @@ class Database:
volume=candle.volume,
)
async with self._session_factory() as session:
- session.merge(row)
- await session.commit()
+ try:
+ session.merge(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
async def insert_signal(self, signal: Signal) -> None:
"""Insert a signal row."""
@@ -71,8 +87,12 @@ class Database:
created_at=signal.created_at,
)
async with self._session_factory() as session:
- session.add(row)
- await session.commit()
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
async def insert_order(self, order: Order) -> None:
"""Insert an order row."""
@@ -89,8 +109,12 @@ class Database:
filled_at=order.filled_at,
)
async with self._session_factory() as session:
- session.add(row)
- await session.commit()
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
async def update_order_status(
self,
@@ -105,8 +129,12 @@ class Database:
.values(status=status.value, filled_at=filled_at)
)
async with self._session_factory() as session:
- await session.execute(stmt)
- await session.commit()
+ try:
+ await session.execute(stmt)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
async def get_candles(self, symbol: str, timeframe: str, limit: int = 500) -> list[dict]:
"""Retrieve candles ordered by open_time descending."""
@@ -117,6 +145,10 @@ class Database:
.limit(limit)
)
async with self._session_factory() as session:
- result = await session.execute(stmt)
- rows = result.fetchall()
+ try:
+ result = await session.execute(stmt)
+ rows = result.fetchall()
+ except Exception:
+ await session.rollback()
+ raise
return [dict(row._mapping) for row in rows]