Last active
November 10, 2025 18:12
-
-
Save kiranmaya/59764d7a8a322347cb8cabf973a5bba0 to your computer and use it in GitHub Desktop.
Binance liquidations API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import logging | |
| import threading | |
| import time | |
| from collections import deque | |
| from contextlib import asynccontextmanager | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Deque, Optional | |
| try: | |
| import websocket | |
| except ImportError as exc: # pragma: no cover | |
| raise ImportError("websocket-client is required: pip install websocket-client") from exc | |
| if not hasattr(websocket, "WebSocketApp"): # pragma: no cover | |
| raise ImportError( | |
| "Expected websocket-client package, but found a different module named 'websocket'. " | |
| "Install it with: pip install websocket-client" | |
| ) | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
# --- Stream / retention configuration -------------------------------------

# Websocket endpoint the stream client connects to.
STREAM_URL = "wss://fstream.binance.com/ws/!forceOrder@arr"

# Only events whose symbol equals this value are stored.
TARGET_SYMBOL = "BTCUSDT"

# How long events stay in the in-memory store; also caps the `minutes`
# query parameter of the HTTP endpoints (RETENTION_HOURS * 60).
RETENTION_HOURS = 240

# Keep-alive settings (seconds) handed to WebSocketApp.run_forever().
PING_INTERVAL = 20
PING_TIMEOUT = 10

# Pause (seconds) between a dropped connection and the next attempt.
RECONNECT_DELAY_SECONDS = 5
@dataclass(frozen=True)
class LiquidationEvent:
    """Immutable record describing a single liquidation order.

    Instances are hashable and cannot be modified after creation because the
    dataclass is declared ``frozen``.
    """

    # Trading pair the order belongs to.
    symbol: str
    # Order side string as delivered by the stream.
    side: str
    # Price used to value the order.
    price: float
    # Order size.
    quantity: float
    # Monetary value of the order (the producer supplies price * quantity).
    notional: float
    # Moment of the order; the store compares it against timezone-aware
    # datetimes, so it is expected to be timezone-aware as well.
    timestamp: datetime
class LiquidationStore:
    """Thread-safe in-memory store retaining the most recent liquidation events.

    Events are kept in arrival order in a deque guarded by a single lock.
    Entries older than the retention window are dropped from the left end
    every time the store is written to or queried.
    """

    def __init__(self, retention_hours: int = RETENTION_HOURS) -> None:
        """Create an empty store that keeps events for ``retention_hours`` hours."""
        self._retention = timedelta(hours=retention_hours)
        self._events: Deque[LiquidationEvent] = deque()
        self._lock = threading.Lock()

    def add(self, event: LiquidationEvent) -> None:
        """Append ``event`` and discard entries that fell out of retention."""
        with self._lock:
            self._events.append(event)
            self._prune_locked()

    def summarize(
        self,
        start_time: datetime,
        end_time: datetime,
        side: Optional[str] = None,
    ) -> tuple[float, int]:
        """Aggregate the events inside ``[start_time, end_time]`` (inclusive).

        Args:
            start_time: Lower bound of the window.
            end_time: Upper bound of the window.
            side: When truthy, only events whose ``side`` equals it are counted.

        Returns:
            ``(total_notional, event_count)`` with the total rounded to 2 decimals.
        """
        total = 0.0
        count = 0
        with self._lock:
            self._prune_locked()
            for event in self._iter_matching_locked(start_time, end_time, side):
                total += event.notional
                count += 1
        return round(total, 2), count

    def bucketize(
        self,
        start_time: datetime,
        end_time: datetime,
        interval_minutes: int,
        side: Optional[str] = None,
    ) -> list[dict]:
        """Group the events inside the window into fixed-size time buckets.

        Buckets are aligned to multiples of the interval counted from the Unix
        epoch, so the first bucket may start before ``start_time``. Only
        buckets that contain at least one event are returned.

        Args:
            start_time: Lower bound of the window (inclusive).
            end_time: Upper bound of the window (inclusive).
            interval_minutes: Bucket width in minutes; must be positive.
            side: When truthy, only events whose ``side`` equals it are counted.

        Returns:
            Buckets in chronological order, each a dict with ``bucket_start``
            and ``bucket_end`` (ISO 8601 strings), ``total_notional`` (rounded
            to 2 decimals) and ``event_count``.

        Raises:
            ValueError: If ``interval_minutes`` is not positive.
        """
        # Zero would divide by zero when flooring; a negative width would
        # produce buckets whose end precedes their start.
        if interval_minutes <= 0:
            raise ValueError("interval_minutes must be a positive integer")

        interval_seconds = interval_minutes * 60
        buckets: dict[datetime, dict[str, float | int]] = {}
        with self._lock:
            self._prune_locked()
            for event in self._iter_matching_locked(start_time, end_time, side):
                bucket_start = self._floor_to_interval(event.timestamp, interval_seconds)
                entry = buckets.setdefault(
                    bucket_start,
                    {"total_notional": 0.0, "event_count": 0},
                )
                entry["total_notional"] += event.notional
                entry["event_count"] += 1

        # ``buckets`` is local, so the response can be built outside the lock.
        # Every entry was created by an event, hence none of them is empty.
        delta = timedelta(seconds=interval_seconds)
        return [
            {
                "bucket_start": bucket_start.isoformat(),
                "bucket_end": (bucket_start + delta).isoformat(),
                "total_notional": round(float(buckets[bucket_start]["total_notional"]), 2),
                "event_count": int(buckets[bucket_start]["event_count"]),
            }
            for bucket_start in sorted(buckets)
        ]

    def _iter_matching_locked(
        self,
        start_time: datetime,
        end_time: datetime,
        side: Optional[str],
    ):
        """Yield stored events inside the window that match ``side``.

        The caller must hold ``self._lock`` for as long as it consumes the
        generator, because it iterates the live deque.
        """
        for event in self._events:
            if event.timestamp < start_time or event.timestamp > end_time:
                continue
            if side and event.side != side:
                continue
            yield event

    @staticmethod
    def _floor_to_interval(moment: datetime, interval_seconds: int) -> datetime:
        """Round ``moment`` down to a multiple of ``interval_seconds`` since the epoch (UTC)."""
        floored_seconds = int(moment.timestamp() // interval_seconds * interval_seconds)
        return datetime.fromtimestamp(floored_seconds, tz=timezone.utc)

    def _prune_locked(self) -> None:
        """Drop expired events from the left end; the caller must hold the lock.

        Pruning stops at the first event that is still inside the retention
        window, so an older event queued behind a newer one is only removed
        once everything in front of it has expired.
        """
        cutoff = datetime.now(timezone.utc) - self._retention
        while self._events and self._events[0].timestamp < cutoff:
            self._events.popleft()
class BinanceLiquidationStream:
    """Maintains a live connection to Binance liquidation stream and feeds the store.

    The websocket runs on a daemon thread and is re-created after every
    disconnect until ``stop()`` is called.
    """

    def __init__(self, store: LiquidationStore, url: str = STREAM_URL) -> None:
        """Bind the stream to ``store``; no connection is opened until ``start()``."""
        self._store = store
        self._url = url
        self._ws_app: Optional[websocket.WebSocketApp] = None
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Launch the background worker thread unless one is already alive."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, name="BinanceLiquidations", daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the worker to exit, close the socket and wait up to 10 s for the thread."""
        self._stop_event.set()
        # Snapshot the attribute: the worker thread resets it to None when it
        # exits, which could otherwise happen between the check and the call.
        ws_app = self._ws_app
        if ws_app:
            try:
                ws_app.close()
            except Exception:
                logging.exception("Failed to close websocket cleanly")
        if self._thread:
            self._thread.join(timeout=10)

    def _run(self) -> None:
        """Worker loop: connect, block in ``run_forever``, then reconnect after a delay."""
        while not self._stop_event.is_set():
            self._ws_app = websocket.WebSocketApp(
                self._url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open,
            )
            try:
                self._ws_app.run_forever(ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT)
            except Exception:
                logging.exception("Websocket run encountered an error")
            if self._stop_event.is_set():
                break
            logging.warning("Websocket disconnected, retrying in %s seconds", RECONNECT_DELAY_SECONDS)
            # Event.wait() returns True as soon as stop() sets the event, so a
            # shutdown request is not held up for the whole reconnect delay.
            if self._stop_event.wait(RECONNECT_DELAY_SECONDS):
                break
        self._ws_app = None

    def _on_message(self, _ws: websocket.WebSocketApp, message: str) -> None:
        """Parse one stream message and store it when it is a valid target-symbol event.

        The order is read from the payload's ``"o"`` object. Price comes from
        ``"ap"`` with ``"p"`` as fallback and quantity from ``"q"`` with ``"l"``
        as fallback (presumably average/limit price and original/last-filled
        quantity; confirm against the Binance stream documentation). Messages
        for other symbols, or with a missing or non-positive field, are ignored.
        Errors are logged and never propagated to the websocket client.
        """
        try:
            payload = json.loads(message)
            order = payload.get("o", {})
            symbol = order.get("s")
            if symbol != TARGET_SYMBOL:
                return
            side = order.get("S")
            price = self._parse_float(order.get("ap")) or self._parse_float(order.get("p"))
            quantity = self._parse_float(order.get("q")) or self._parse_float(order.get("l"))
            timestamp_ms = order.get("T")
            if not (side and price and quantity and timestamp_ms):
                return
            # "T" is converted from milliseconds to a UTC-aware datetime.
            timestamp = datetime.fromtimestamp(int(timestamp_ms) / 1000, tz=timezone.utc)
            notional = round(price * quantity, 2)
            event = LiquidationEvent(
                symbol=symbol,
                side=side,
                price=price,
                quantity=quantity,
                notional=notional,
                timestamp=timestamp,
            )
            self._store.add(event)
            # Use logging like the rest of the module; the configured log
            # format already carries the wall-clock time.
            logging.info("Recorded liquidation: %s", event)
        except json.JSONDecodeError:
            logging.exception("Failed to decode liquidation payload: %s", message)
        except Exception:
            logging.exception("Unexpected error while processing liquidation payload")

    def _on_error(self, _ws: websocket.WebSocketApp, error: Exception) -> None:
        """Log an error reported by the websocket client."""
        logging.error("Websocket error: %s", error)

    def _on_close(
        self,
        _ws: websocket.WebSocketApp,
        close_status_code: Optional[int],
        close_msg: Optional[str],
    ) -> None:
        """Log the close code and message passed to the close callback."""
        logging.info("Websocket closed: code=%s msg=%s", close_status_code, close_msg)

    def _on_open(self, _ws: websocket.WebSocketApp) -> None:
        """Log that the connection has been established."""
        logging.info("Connected to Binance liquidation stream")

    @staticmethod
    def _parse_float(value: Optional[str]) -> Optional[float]:
        """Return ``value`` as a strictly positive float, or ``None`` otherwise.

        ``None``, unparsable input, zero and negative numbers all map to ``None``.
        """
        try:
            result = float(value)
            return result if result > 0 else None
        except (TypeError, ValueError):
            return None
# Root logger: timestamped INFO-level output for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Module-level singletons shared by the websocket worker and the HTTP handlers.
store = LiquidationStore()
stream = BinanceLiquidationStream(store)
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Keep the liquidation stream running for the lifetime of the application.

    The stream is started before the application begins serving and stopped
    when the context exits, including when it exits because of an error.
    """
    stream.start()
    try:
        yield
    finally:
        stream.stop()
# ASGI application; `lifespan` starts and stops the background stream.
app = FastAPI(
    title="BTCUSDT Liquidation Service",
    description="Streams Binance liquidation data for BTCUSDT and exposes simple aggregates.",
    version="1.0.0",
    lifespan=lifespan,
)

# Fully permissive CORS so the API can be called from any page.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# combination the FastAPI CORS documentation advises against - confirm
# whether credentialed requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/liquidations/total")
def liquidation_total(
    minutes: int = Query(
        60,
        ge=1,
        le=RETENTION_HOURS * 60,
        # Derived from the constant so the text cannot drift from the real
        # upper bound (it previously claimed a 3-hour limit).
        description=f"Minutes to look back, up to the {RETENTION_HOURS}-hour retention window",
    ),
    side: Optional[str] = Query(None, pattern="^(BUY|SELL)$", description="Optional side filter"),
) -> dict:
    """Return the total notional and event count for the trailing window.

    Args:
        minutes: Size of the look-back window ending now, in minutes.
        side: Optional ``BUY`` / ``SELL`` filter; all sides when omitted.

    Returns:
        A dict with the symbol, the applied side filter (``"ALL"`` when none),
        the window bounds as ISO 8601 strings, ``minutes``, ``total_notional``
        and ``event_count``.
    """
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)
    total, count = store.summarize(start, now, side=side)
    return {
        "symbol": TARGET_SYMBOL,
        "side": side or "ALL",
        "window_start": start.isoformat(),
        "window_end": now.isoformat(),
        "minutes": minutes,
        "total_notional": total,
        "event_count": count,
    }
@app.get("/liquidations/series")
def liquidation_series(
    minutes: int = Query(
        60,
        ge=1,
        le=RETENTION_HOURS * 60,
        # Derived from the constant so the text cannot drift from the real
        # upper bound (it previously claimed a 3-hour limit).
        description=f"Minutes to look back, up to the {RETENTION_HOURS}-hour retention window",
    ),
    interval_minutes: int = Query(1, ge=1, description="Bucket size in minutes"),
    side: Optional[str] = Query(None, pattern="^(BUY|SELL)$", description="Optional side filter"),
) -> dict:
    """Return per-bucket liquidation totals for the trailing window.

    Args:
        minutes: Size of the look-back window ending now, in minutes.
        interval_minutes: Bucket width in minutes; may not exceed ``minutes``.
        side: Optional ``BUY`` / ``SELL`` filter; all sides when omitted.

    Returns:
        A dict with the symbol, the applied side filter (``"ALL"`` when none),
        the window bounds as ISO 8601 strings, ``minutes``,
        ``interval_minutes`` and the list of ``buckets`` produced by the store.

    Raises:
        HTTPException: 400 when ``interval_minutes`` is greater than ``minutes``.
    """
    if interval_minutes > minutes:
        raise HTTPException(status_code=400, detail="interval_minutes cannot be greater than minutes")
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)
    series = store.bucketize(start, now, interval_minutes=interval_minutes, side=side)
    return {
        "symbol": TARGET_SYMBOL,
        "side": side or "ALL",
        "window_start": start.isoformat(),
        "window_end": now.isoformat(),
        "minutes": minutes,
        "interval_minutes": interval_minutes,
        "buckets": series,
    }
@app.get("/health")
def health() -> dict:
    """Liveness probe: always answers with a constant ``{"status": "ok"}`` payload."""
    payload = {"status": "ok"}
    return payload
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on port 3000, auto-reload off.
    import uvicorn

    uvicorn.run(app, port=3000, reload=False)
| ''' | |
How to use it:
1. Start the server:
   uvicorn Bianance.BinanceLiqudations:app --host 0.0.0.0 --port 8000
2. Confirm that GET /health returns {"status": "ok"}.
3. Query the series endpoint at http://<your-ip-address>:<port>:
   /liquidations/series?minutes=60&interval_minutes=1 for per-minute totals, or
   /liquidations/series?minutes=120&interval_minutes=5 for 5-minute buckets
   (optionally append &side=BUY or &side=SELL).
Each bucket entry includes bucket_start, bucket_end, total_notional, and event_count.
| ''' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment