Skip to content

Instantly share code, notes, and snippets.

@kiranmaya
Last active November 10, 2025 18:12
Show Gist options
  • Select an option

  • Save kiranmaya/59764d7a8a322347cb8cabf973a5bba0 to your computer and use it in GitHub Desktop.

Select an option

Save kiranmaya/59764d7a8a322347cb8cabf973a5bba0 to your computer and use it in GitHub Desktop.
BinanceLiqudations api .
import json
import logging
import threading
import time
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Deque, Optional
try:
import websocket
except ImportError as exc: # pragma: no cover
raise ImportError("websocket-client is required: pip install websocket-client") from exc
if not hasattr(websocket, "WebSocketApp"): # pragma: no cover
raise ImportError(
"Expected websocket-client package, but found a different module named 'websocket'. "
"Install it with: pip install websocket-client"
)
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
STREAM_URL = "wss://fstream.binance.com/ws/!forceOrder@arr"
TARGET_SYMBOL = "BTCUSDT"
RETENTION_HOURS = 240
PING_INTERVAL = 20
PING_TIMEOUT = 10
RECONNECT_DELAY_SECONDS = 5
@dataclass(frozen=True)
class LiquidationEvent:
symbol: str
side: str
price: float
quantity: float
notional: float
timestamp: datetime
class LiquidationStore:
"""Thread-safe in-memory store retaining the most recent liquidation events."""
def __init__(self, retention_hours: int = RETENTION_HOURS) -> None:
self._retention = timedelta(hours=retention_hours)
self._events: Deque[LiquidationEvent] = deque()
self._lock = threading.Lock()
def add(self, event: LiquidationEvent) -> None:
with self._lock:
self._events.append(event)
self._prune_locked()
def summarize(
self,
start_time: datetime,
end_time: datetime,
side: Optional[str] = None,
) -> tuple[float, int]:
with self._lock:
self._prune_locked()
total = 0.0
count = 0
for event in self._events:
if event.timestamp < start_time or event.timestamp > end_time:
continue
if side and event.side != side:
continue
total += event.notional
count += 1
return round(total, 2), count
def bucketize(
self,
start_time: datetime,
end_time: datetime,
interval_minutes: int,
side: Optional[str] = None,
) -> list[dict]:
interval_seconds = interval_minutes * 60
with self._lock:
self._prune_locked()
buckets: dict[datetime, dict[str, float | int]] = {}
for event in self._events:
if event.timestamp < start_time or event.timestamp > end_time:
continue
if side and event.side != side:
continue
bucket_start = self._floor_to_interval(event.timestamp, interval_seconds)
entry = buckets.setdefault(
bucket_start,
{"total_notional": 0.0, "event_count": 0},
)
entry["total_notional"] += event.notional
entry["event_count"] += 1
delta = timedelta(seconds=interval_seconds)
series: list[dict] = []
for bucket_start in sorted(buckets.keys()):
entry = buckets[bucket_start]
if not entry["event_count"]:
continue
series.append(
{
"bucket_start": bucket_start.isoformat(),
"bucket_end": (bucket_start + delta).isoformat(),
"total_notional": round(float(entry["total_notional"]), 2),
"event_count": int(entry["event_count"]),
}
)
return series
@staticmethod
def _floor_to_interval(moment: datetime, interval_seconds: int) -> datetime:
floored_seconds = int(moment.timestamp() // interval_seconds * interval_seconds)
return datetime.fromtimestamp(floored_seconds, tz=timezone.utc)
def _prune_locked(self) -> None:
cutoff = datetime.now(timezone.utc) - self._retention
while self._events and self._events[0].timestamp < cutoff:
self._events.popleft()
class BinanceLiquidationStream:
"""Maintains a live connection to Binance liquidation stream and feeds the store."""
def __init__(self, store: LiquidationStore, url: str = STREAM_URL) -> None:
self._store = store
self._url = url
self._ws_app: Optional[websocket.WebSocketApp] = None
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, name="BinanceLiquidations", daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
if self._ws_app:
try:
self._ws_app.close()
except Exception:
logging.exception("Failed to close websocket cleanly")
if self._thread:
self._thread.join(timeout=10)
def _run(self) -> None:
while not self._stop_event.is_set():
self._ws_app = websocket.WebSocketApp(
self._url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open,
)
try:
self._ws_app.run_forever(ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT)
except Exception:
logging.exception("Websocket run encountered an error")
if self._stop_event.is_set():
break
logging.warning("Websocket disconnected, retrying in %s seconds", RECONNECT_DELAY_SECONDS)
time.sleep(RECONNECT_DELAY_SECONDS)
self._ws_app = None
def _on_message(self, _ws: websocket.WebSocketApp, message: str) -> None:
try:
payload = json.loads(message)
order = payload.get("o", {})
symbol = order.get("s")
if symbol != TARGET_SYMBOL:
return
side = order.get("S")
price = self._parse_float(order.get("ap")) or self._parse_float(order.get("p"))
quantity = self._parse_float(order.get("q")) or self._parse_float(order.get("l"))
timestamp_ms = order.get("T")
if not (side and price and quantity and timestamp_ms):
return
timestamp = datetime.fromtimestamp(int(timestamp_ms) / 1000, tz=timezone.utc)
notional = round(price * quantity, 2)
event = LiquidationEvent(
symbol=symbol,
side=side,
price=price,
quantity=quantity,
notional=notional,
timestamp=timestamp,
)
self._store.add(event)
print(f"Recorded liquidation: {event} on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except json.JSONDecodeError:
logging.exception("Failed to decode liquidation payload: %s", message)
except Exception:
logging.exception("Unexpected error while processing liquidation payload")
def _on_error(self, _ws: websocket.WebSocketApp, error: Exception) -> None:
logging.error("Websocket error: %s", error)
def _on_close(self, _ws: websocket.WebSocketApp, close_status_code: int, close_msg: str) -> None:
logging.info("Websocket closed: code=%s msg=%s", close_status_code, close_msg)
def _on_open(self, _ws: websocket.WebSocketApp) -> None:
logging.info("Connected to Binance liquidation stream")
@staticmethod
def _parse_float(value: Optional[str]) -> Optional[float]:
try:
result = float(value)
return result if result > 0 else None
except (TypeError, ValueError):
return None
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
store = LiquidationStore()
stream = BinanceLiquidationStream(store)
@asynccontextmanager
async def lifespan(_app: FastAPI):
stream.start()
try:
yield
finally:
stream.stop()
app = FastAPI(
title="BTCUSDT Liquidation Service",
description="Streams Binance liquidation data for BTCUSDT and exposes simple aggregates.",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/liquidations/total")
def liquidation_total(
minutes: int = Query(60, ge=1, le=RETENTION_HOURS * 60, description="Minutes to look back within the last 3 hours"),
side: Optional[str] = Query(None, pattern="^(BUY|SELL)$", description="Optional side filter"),
) -> dict:
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
total, count = store.summarize(start, now, side=side)
return {
"symbol": TARGET_SYMBOL,
"side": side or "ALL",
"window_start": start.isoformat(),
"window_end": now.isoformat(),
"minutes": minutes,
"total_notional": total,
"event_count": count,
}
@app.get("/liquidations/series")
def liquidation_series(
minutes: int = Query(60, ge=1, le=RETENTION_HOURS * 60, description="Minutes to look back within the last 3 hours"),
interval_minutes: int = Query(1, ge=1, description="Bucket size in minutes"),
side: Optional[str] = Query(None, pattern="^(BUY|SELL)$", description="Optional side filter"),
) -> dict:
if interval_minutes > minutes:
raise HTTPException(status_code=400, detail="interval_minutes cannot be greater than minutes")
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
series = store.bucketize(start, now, interval_minutes=interval_minutes, side=side)
return {
"symbol": TARGET_SYMBOL,
"side": side or "ALL",
"window_start": start.isoformat(),
"window_end": now.isoformat(),
"minutes": minutes,
"interval_minutes": interval_minutes,
"buckets": series,
}
@app.get("/health")
def health() -> dict:
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app,port=3000, reload=False)
'''
How to use it:
Run uvicorn Bianance.BinanceLiqudations:app --host 0.0.0.0 --port 8000,
confirm /health,
then call
your ip address:port/liquidations/series?minutes=60&interval_minutes=1 for per-minute
totals or /
liquidations/series?minutes=120&interval_minutes=5 for 5-minute buckets (optional &side=BUY/SELL).
Each bucket entry includes bucket_start, bucket_end, total_notional, and event_count.
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment