From a8a3d737b81e8e9ba8fbec8f52d0a5e4258b4dda Mon Sep 17 00:00:00 2001 From: "1568018+davassi@users.noreply.github.com" Date: Fri, 28 Nov 2025 08:53:52 +0100 Subject: [PATCH] Adding sqlite --- hypertrade.db | Bin 0 -> 45056 bytes hypertrade/config.py | 4 + hypertrade/daemon.py | 17 +- hypertrade/database.py | 376 ++++++++++++++++++++++++++++++++++ hypertrade/routes/webhooks.py | 220 ++++++++++++++++++++ 5 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 hypertrade.db create mode 100644 hypertrade/database.py diff --git a/hypertrade.db b/hypertrade.db new file mode 100644 index 0000000000000000000000000000000000000000..99b861af984d99aba5ef8f88b065be686e24dcb2 GIT binary patch literal 45056 zcmeI&O>5&u7{GC9H)}giLl*X6dN=rR=Ho?kd5pSJ^l=-9SNA?Af4VOO7;J z;zAFlw9v28dp||LL~s2HJytK4V_R{&hdqhEz?L-{J)`+Ok8FnI{H*ILDc<_Q#8KjN z?Xjlo+Sfv8npTTnd+{sZlH;BHAl~aspErH1X&-<0&HllETD|s+3oLVC9-&#a4oZU*}%R~L=;wXMFl7a1x@{8)52g$pxjFC4Q zb}b&Fximz2CbR>29mVsAhsYXs3j-87CIe)Rt=p@O^J=B>NmHLJ<}kl5Hd0ZT9n>}j z&pIsy&KivK%7&n`$)!Kef>t*MO*<_IO&g5wA5|J(H1*l~oi6f`#jR|LyXdzRdeLV5 zdbU$-93JYwN6NVz%RF)U{?R;Yzx}*xE^0xv>SAqkM{yovW>#9i`-Cs`44!a|3}?!OR}|k$2nsrFq;Qbo=7d;{6_<_RP+Sl`^(+C`0tj zW3y*kFU<42s@;*X0&Bc}SZN#`=@)56!|QRp=CQ-%!SRN2?Qmx$ziTI3o->h8<13Bd z4)kiH+0_4-r7^RL%ibTXM9A7cSbeNzUkt3y*}yDXy>}6W*+Z6;(mk09-O<*(hwhc< zjJK82b>w);RkJv4?e0AzP6Kx+w{;rJcQQyya{i;LlSm>D8~00IagfB*srAb sqlite3.Connection: + """Get a database connection with row factory.""" + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + return conn + + def _ensure_db_exists(self) -> None: + """Create database tables if they don't exist.""" + conn = self._get_connection() + cursor = conn.cursor() + + # Orders table: tracks all executed orders (successful and failed) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS orders ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + request_id TEXT UNIQUE, + timestamp TEXT NOT NULL, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + signal TEXT NOT NULL, + quantity REAL NOT NULL, + price REAL NOT NULL, + leverage INTEGER, + subaccount TEXT, + status TEXT NOT NULL, + order_id TEXT, + avg_price REAL, + total_size REAL, + response_json TEXT, + execution_ms REAL + ) + """) + + # Failures table: detailed failure information + cursor.execute(""" + CREATE TABLE IF NOT EXISTS failures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + order_id INTEGER, + request_id TEXT, + timestamp TEXT NOT NULL, + error_type TEXT NOT NULL, + error_message TEXT NOT NULL, + attempt INTEGER NOT NULL, + retry_count INTEGER DEFAULT 0, + FOREIGN KEY (order_id) REFERENCES orders(id) + ) + """) + + # Create indices for faster queries + cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_timestamp ON orders(timestamp)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_symbol ON orders(symbol)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_request_id ON orders(request_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_failures_order_id ON failures(order_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_failures_timestamp ON failures(timestamp)") + + conn.commit() + conn.close() + + # Restrict database file permissions to owner only (rw-------) + try: + os.chmod(self.db_path, 0o600) + log.debug("Database file permissions restricted to owner only") + except OSError as e: + log.warning("Could not restrict database file permissions: %s", e) + + log.info("Database initialized: %s", self.db_path) + + def log_order( + self, + request_id: str, + symbol: str, + side: str, + signal: str, + quantity: float, + price: float, + status: str, + leverage: Optional[int] = None, + subaccount: Optional[str] = None, + order_id: Optional[str] = None, + avg_price: Optional[float] = None, + total_size: Optional[float] = None, + response_json: Optional[str] = None, + execution_ms: Optional[float] = None, + ) -> int: + """Log an order execution to the database. + + Args: + request_id: Unique request identifier + symbol: Trading symbol (e.g., 'ETHUSDT') + side: Order side (BUY/SELL) + signal: Signal type (OPEN_LONG, CLOSE_LONG, etc.) + quantity: Order quantity + price: Order price + status: Status (PLACED, FILLED, FAILED, REJECTED) + leverage: Order leverage multiplier + subaccount: Subaccount address if trading on subaccount + order_id: Exchange order ID + avg_price: Average execution price + total_size: Total size executed + response_json: Full response JSON from exchange + execution_ms: Execution time in milliseconds + + Returns: + Order ID in database + """ + conn = self._get_connection() + cursor = conn.cursor() + + try: + cursor.execute(""" + INSERT INTO orders ( + request_id, timestamp, symbol, side, signal, quantity, price, + leverage, subaccount, status, order_id, avg_price, total_size, + response_json, execution_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + request_id, + datetime.now(timezone.utc).isoformat(), + symbol, + side, + signal, + quantity, + price, + leverage, + subaccount, + status, + order_id, + avg_price, + total_size, + response_json, + execution_ms, + )) + conn.commit() + order_pk = cursor.lastrowid + log.debug( + "Order logged: id=%d request_id=%s symbol=%s side=%s status=%s", + order_pk, request_id, symbol, side, status + ) + return order_pk + except sqlite3.IntegrityError: + log.warning("Duplicate request_id: %s", request_id) + raise + finally: + conn.close() + + def log_failure( + self, + request_id: str, + error_type: str, + error_message: str, + attempt: int = 1, + retry_count: int = 0, + order_id: Optional[int] = None, + ) -> int: + """Log an order failure or error. + + Args: + request_id: Unique request identifier + error_type: Error class name (HyperliquidValidationError, etc.) + error_message: Human-readable error message + attempt: Current attempt number + retry_count: Number of retries performed + order_id: Foreign key to orders table (if available) + + Returns: + Failure log ID in database + """ + conn = self._get_connection() + cursor = conn.cursor() + + try: + cursor.execute(""" + INSERT INTO failures ( + order_id, request_id, timestamp, error_type, error_message, + attempt, retry_count + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + order_id, + request_id, + datetime.now(timezone.utc).isoformat(), + error_type, + error_message, + attempt, + retry_count, + )) + conn.commit() + failure_id = cursor.lastrowid + log.debug( + "Failure logged: id=%d request_id=%s error_type=%s attempt=%d", + failure_id, request_id, error_type, attempt + ) + return failure_id + finally: + conn.close() + + def get_orders( + self, + limit: int = 100, + offset: int = 0, + symbol: Optional[str] = None, + status: Optional[str] = None, + side: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """Query orders from database. + + Args: + limit: Maximum number of records to return + offset: Number of records to skip + symbol: Filter by symbol + status: Filter by status + side: Filter by side (BUY/SELL) + + Returns: + List of order dictionaries + """ + conn = self._get_connection() + cursor = conn.cursor() + + query = "SELECT * FROM orders WHERE 1=1" + params: List[Any] = [] + + if symbol: + query += " AND symbol = ?" + params.append(symbol) + if status: + query += " AND status = ?" + params.append(status) + if side: + query += " AND side = ?" + params.append(side) + + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + cursor.execute(query, params) + rows = cursor.fetchall() + conn.close() + + return [dict(row) for row in rows] + + def get_failures( + self, + limit: int = 100, + offset: int = 0, + error_type: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """Query failures from database. + + Args: + limit: Maximum number of records to return + offset: Number of records to skip + error_type: Filter by error type + + Returns: + List of failure dictionaries + """ + conn = self._get_connection() + cursor = conn.cursor() + + query = "SELECT * FROM failures WHERE 1=1" + params: List[Any] = [] + + if error_type: + query += " AND error_type = ?" + params.append(error_type) + + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + cursor.execute(query, params) + rows = cursor.fetchall() + conn.close() + + return [dict(row) for row in rows] + + def get_order_by_request_id(self, request_id: str) -> Optional[Dict[str, Any]]: + """Get a single order by request ID. + + Args: + request_id: Request identifier + + Returns: + Order dictionary or None if not found + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT * FROM orders WHERE request_id = ?", (request_id,)) + row = cursor.fetchone() + conn.close() + + return dict(row) if row else None + + def get_failures_by_order_id(self, order_id: int) -> List[Dict[str, Any]]: + """Get all failures for a specific order. + + Args: + order_id: Order ID in database + + Returns: + List of failure dictionaries + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT * FROM failures WHERE order_id = ? ORDER BY timestamp", (order_id,)) + rows = cursor.fetchall() + conn.close() + + return [dict(row) for row in rows] + + def get_statistics(self) -> Dict[str, Any]: + """Get summary statistics about orders and failures. + + Returns: + Dictionary with stats + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT COUNT(*) as total_orders FROM orders") + total_orders = cursor.fetchone()["total_orders"] + + cursor.execute("SELECT COUNT(*) as failed_orders FROM orders WHERE status IN ('FAILED', 'REJECTED')") + failed_orders = cursor.fetchone()["failed_orders"] + + cursor.execute("SELECT COUNT(*) as total_failures FROM failures") + total_failures = cursor.fetchone()["total_failures"] + + cursor.execute(""" + SELECT symbol, COUNT(*) as count FROM orders + GROUP BY symbol ORDER BY count DESC LIMIT 5 + """) + top_symbols = [dict(row) for row in cursor.fetchall()] + + cursor.execute(""" + SELECT error_type, COUNT(*) as count FROM failures + GROUP BY error_type ORDER BY count DESC LIMIT 5 + """) + top_errors = [dict(row) for row in cursor.fetchall()] + + conn.close() + + return { + "total_orders": total_orders, + "failed_orders": failed_orders, + "success_rate": (total_orders - failed_orders) / total_orders * 100 if total_orders > 0 else 0, + "total_failures": total_failures, + "top_symbols": top_symbols, + "top_errors": top_errors, + } diff --git a/hypertrade/routes/webhooks.py b/hypertrade/routes/webhooks.py index 98a00bb..32bcc36 100644 --- a/hypertrade/routes/webhooks.py +++ b/hypertrade/routes/webhooks.py @@ -4,6 +4,7 @@ import logging import hmac import time +import json from datetime import datetime, timezone from typing import Optional @@ -31,6 +32,7 @@ ) router = APIRouter(tags=["webhooks"]) +history_router = APIRouter(tags=["history"]) log = logging.getLogger("uvicorn.error") def _place_order_with_retry(client: HyperliquidService, order_request: OrderRequest, max_retries: int = 2) -> dict: @@ -189,23 +191,105 @@ async def hypertrade_webhook( # EXECUTION: Place the order with retry logic. # =================================================================== + req_id = getattr(request.state, "request_id", None) + db = getattr(request.app.state, "db", None) + try: log.info("Attempting to place order on Hyperliquid: symbol=%s side=%s", symbol, side.value) result = _place_order_with_retry(client, order_request, max_retries=2) except HyperliquidValidationError as e: log.warning("Order validation error: %s", e) + if db and req_id: + db.log_order( + request_id=req_id, + symbol=symbol, + side=side.value, + signal=signal.value, + quantity=contracts, + price=price, + leverage=leverage, + subaccount=vault_address, + status="REJECTED", + execution_ms=(time.perf_counter() - start_time) * 1000, + ) + db.log_failure( + request_id=req_id, + error_type=e.__class__.__name__, + error_message=str(e), + attempt=1, + retry_count=0, + ) raise HTTPException(status_code=400, detail=f"Invalid order: {e}") from e except HyperliquidNetworkError as e: log.error("Network error placing order (after retries): %s", e) + if db and req_id: + db.log_order( + request_id=req_id, + symbol=symbol, + side=side.value, + signal=signal.value, + quantity=contracts, + price=price, + leverage=leverage, + subaccount=vault_address, + status="FAILED", + execution_ms=(time.perf_counter() - start_time) * 1000, + ) + db.log_failure( + request_id=req_id, + error_type=e.__class__.__name__, + error_message=str(e), + attempt=3, + retry_count=2, + ) raise HTTPException( status_code=503, detail="Temporary service unavailable - order may have been placed, check manually" ) from e except HyperliquidAPIError as e: log.error("API error placing order (after retries): %s", e) + if db and req_id: + db.log_order( + request_id=req_id, + symbol=symbol, + side=side.value, + signal=signal.value, + quantity=contracts, + price=price, + leverage=leverage, + subaccount=vault_address, + status="FAILED", + execution_ms=(time.perf_counter() - start_time) * 1000, + ) + db.log_failure( + request_id=req_id, + error_type=e.__class__.__name__, + error_message=str(e), + attempt=3, + retry_count=2, + ) raise HTTPException(status_code=502, detail=f"Exchange error: {e}") from e log.info("Order placed successfully: %s", result) + + # Log successful order + if db and req_id: + db.log_order( + request_id=req_id, + symbol=symbol, + side=side.value, + signal=signal.value, + quantity=contracts, + price=price, + leverage=leverage, + subaccount=vault_address, + status="PLACED", + order_id=result.get("orderId"), + avg_price=result.get("avgPx"), + total_size=result.get("totalSz"), + response_json=json.dumps(result) if result else None, + execution_ms=(time.perf_counter() - start_time) * 1000, + ) # Finally: build a response. response = _build_response(payload, signal=signal, side=side, symbol=symbol) @@ -445,3 +529,139 @@ def require_env(var_name: str) -> str: log.info("Missing required environment variable: %s", var_name) raise ValueError(var_name + " must be provided") return value + + +# ═══════════════════════════════════════════════════════════════════════════ +# History & Analytics Endpoints +# ═══════════════════════════════════════════════════════════════════════════ + +@history_router.get( + "/history/orders", + summary="Get order execution history", +) +async def get_orders_history( + request: Request, + limit: int = 100, + offset: int = 0, + symbol: Optional[str] = None, + status: Optional[str] = None, + side: Optional[str] = None, +) -> dict: + """Retrieve order execution history from database. + + Args: + limit: Maximum number of orders to return (default 100, max 1000) + offset: Number of orders to skip for pagination + symbol: Filter by trading symbol (e.g., 'ETHUSDT') + status: Filter by status (PLACED, FAILED, REJECTED) + side: Filter by side (BUY, SELL) + + Returns: + Dictionary with orders list and metadata + """ + db = getattr(request.app.state, "db", None) + if not db: + raise HTTPException(status_code=503, detail="Database not available") + + limit = min(max(1, limit), 1000) + offset = max(0, offset) + + orders = db.get_orders(limit=limit, offset=offset, symbol=symbol, status=status, side=side) + return { + "status": "ok", + "count": len(orders), + "limit": limit, + "offset": offset, + "orders": orders, + } + + +@history_router.get( + "/history/failures", + summary="Get order failure logs", +) +async def get_failures_history( + request: Request, + limit: int = 100, + offset: int = 0, + error_type: Optional[str] = None, +) -> dict: + """Retrieve order failure logs from database. + + Args: + limit: Maximum number of failures to return (default 100, max 1000) + offset: Number of failures to skip for pagination + error_type: Filter by error type + + Returns: + Dictionary with failures list and metadata + """ + db = getattr(request.app.state, "db", None) + if not db: + raise HTTPException(status_code=503, detail="Database not available") + + limit = min(max(1, limit), 1000) + offset = max(0, offset) + + failures = db.get_failures(limit=limit, offset=offset, error_type=error_type) + return { + "status": "ok", + "count": len(failures), + "limit": limit, + "offset": offset, + "failures": failures, + } + + +@history_router.get( + "/history/order/{request_id}", + summary="Get order details by request ID", +) +async def get_order_details( + request: Request, + request_id: str, +) -> dict: + """Get detailed order information and associated failures. + + Args: + request_id: Unique request identifier + + Returns: + Dictionary with order details and failure logs + """ + db = getattr(request.app.state, "db", None) + if not db: + raise HTTPException(status_code=503, detail="Database not available") + + order = db.get_order_by_request_id(request_id) + if not order: + raise HTTPException(status_code=404, detail=f"Order not found: {request_id}") + + failures = db.get_failures_by_order_id(order["id"]) if order.get("id") else [] + + return { + "status": "ok", + "order": order, + "failures": failures, + } + + +@history_router.get( + "/history/stats", + summary="Get order statistics", +) +async def get_statistics(request: Request) -> dict: + """Get summary statistics about orders and failures. + + Returns: + Dictionary with various statistics + """ + db = getattr(request.app.state, "db", None) + if not db: + raise HTTPException(status_code=503, detail="Database not available") + + stats = db.get_statistics() + return { + "status": "ok", + "statistics": stats, + }