Skip to content

Instantly share code, notes, and snippets.

@ngi
Created March 4, 2026 08:50
Show Gist options
  • Select an option

  • Save ngi/b7eb253f40b1727b41cd8116c9c6e7a1 to your computer and use it in GitHub Desktop.

Select an option

Save ngi/b7eb253f40b1727b41cd8116c9c6e7a1 to your computer and use it in GitHub Desktop.
Polymarket RTDS trade listener (aiohttp)
#!/usr/bin/env python3
"""Polymarket RTDS trade listener (aiohttp)."""
import asyncio, json, sys
from datetime import datetime
import aiohttp
URI = "wss://ws-live-data.polymarket.com"
SLUG = sys.argv[1] if len(sys.argv) > 1 else None
def ts():
return datetime.now().strftime("%H:%M:%S")
async def main():
while True:
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(URI) as ws:
await ws.send_str("ping")
print(f"[{ts()}] ping →")
sub = {"topic": "activity", "type": "orders_matched"}
if SLUG:
sub["filters"] = json.dumps({"market_slug": SLUG}, separators=(",", ":"))
await ws.send_str(json.dumps({
"action": "subscribe",
"subscriptions": [sub],
}))
print(f"[{ts()}] Connected — filter: {SLUG or 'all'}")
ping = asyncio.create_task(keep_alive(ws))
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
raw = msg.data
if raw == "pong":
print(f"[{ts()}] ← pong")
continue
print(f"[{ts()}] {raw}")
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
break
finally:
ping.cancel()
except (aiohttp.ClientError, OSError) as e:
print(f"[{ts()}] Disconnected ({e}), reconnecting in 2s...")
await asyncio.sleep(2)
async def keep_alive(ws):
while True:
await asyncio.sleep(10)
await ws.send_str("ping")
print(f"[{ts()}] ping →")
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment