Skip to content

Instantly share code, notes, and snippets.

@AgentLoneStar007
Last active September 25, 2025 15:16
Show Gist options
  • Select an option

  • Save AgentLoneStar007/f2d6299c3487f62141c3bae4d4648f9f to your computer and use it in GitHub Desktop.

Select an option

Save AgentLoneStar007/f2d6299c3487f62141c3bae4d4648f9f to your computer and use it in GitHub Desktop.
A very simple, partially finished websocket client for connecting to Streamer.bot, written in Python.
# Imports
import websockets
import json
import logging
from typing import Optional
from enum import Enum
import asyncio
# TODO: What's left for this websocket helper:
# - Maybe add unique subscription IDs
# - Maybe change out handle_event to instead be able to hook to a callback
# - MAKE WEBSOCKET CHILL WITH THE LOGS! Files are already getting huge over just minutes!
class StreamerBotWebsocket:
    """
    A small asyncio websocket client for interfacing with Streamer.bot.

    The client keeps one connection open, listens for events in a background
    task, pings the connection periodically and reconnects (with a capped
    exponential back-off) when a ping fails. Subscriptions are remembered
    locally so they can be re-sent after a reconnect.
    """

    class TwitchEvents(str, Enum):
        """Event names that can be requested from the ``Twitch`` event source."""

        # The ChatMessage event type for the Twitch events sub. Will fire upon chat messages.
        ChatMessage = "ChatMessage"

    # Delay before the first reconnect attempt, in seconds.
    _RECONNECT_DELAY: float = 5.0
    # Upper bound for the reconnect delay once the back-off has grown, in seconds.
    _MAX_RECONNECT_DELAY: float = 60.0

    def __init__(
        self,
        url: str,
        port: int = 8080,
        keep_subscriptions_upon_disconnect: bool = True
    ) -> None:
        """
        Create a (not yet connected) Streamer.bot websocket client.

        :param url: The IP address or URL Streamer.bot is at.
        :param port: The port that Streamer.bot is listening on.
        :param keep_subscriptions_upon_disconnect: Whether subscriptions should retain across disconnects.
        :returns: ``None``
        """
        # Make the URL and port class-accessible
        self.url: str = url
        self.port: int = port

        # Use a module-scoped logger so applications can tune this helper's
        # verbosity without touching the root logger
        self._log: logging.Logger = logging.getLogger(__name__)

        # Whether the client is supposed to be connected (drives the background loops)
        self._running: bool = False

        # The websocket itself; ``None`` until connect() succeeds
        self._websocket: Optional[websockets.ClientConnection] = None

        # How often the connection should be checked, in seconds
        self._ping_interval: int = 5

        # Background tasks. They are tracked individually so a reconnect can
        # replace the listener without spawning a second ping loop.
        self._listen_task: Optional[asyncio.Task] = None
        self._ping_task: Optional[asyncio.Task] = None

        # Remembered subscriptions: event source -> list of event names
        self._subscriptions: dict[str, list[str]] = {}

        # Whether subscriptions should be re-sent after a reconnect
        self._keep_subscriptions: bool = keep_subscriptions_upon_disconnect

        # Guards ``_subscriptions`` against concurrent modification
        self._subscriptions_lock: asyncio.Lock = asyncio.Lock()

    @staticmethod
    def _event_names(events) -> list[str]:
        """Normalise one event or an iterable of events into unique event-name strings."""
        if events is None:
            return []
        # A lone member of a ``str``-based Enum is itself a string; wrap it so
        # it is not iterated character by character.
        if isinstance(events, str):
            events = [events]

        names: list[str] = []
        for event in events:
            name = event.value if isinstance(event, Enum) else str(event)
            if name not in names:
                names.append(name)
        return names

    def _collect_events(self, twitch=None) -> dict[str, list[str]]:
        """Build the ``{source: [event names]}`` mapping used in request payloads."""
        events: dict[str, list[str]] = {}
        twitch_names = self._event_names(twitch)
        if twitch_names:
            events["Twitch"] = twitch_names
        return events

    async def _close_quietly(self, websocket: "websockets.ClientConnection") -> None:
        """Close a websocket, logging (not raising) any error from the close itself."""
        try:
            await websocket.close()
        except Exception as error:
            self._log.debug(f"Ignoring error while closing a Streamer.bot connection: {error}")

    async def _restore_subscriptions(self, websocket: "websockets.ClientConnection") -> None:
        """Re-send remembered subscriptions on a fresh connection, or forget them if not kept."""
        async with self._subscriptions_lock:
            if not self._keep_subscriptions:
                # A new connection starts without server-side subscriptions, so
                # the local record must not claim otherwise.
                self._subscriptions.clear()
                return
            # Snapshot under the lock; sending happens outside of it so that
            # subscribe()/unsubscribe() are never blocked on network I/O.
            snapshot = {source: list(names) for source, names in self._subscriptions.items()}

        for source, names in snapshot.items():
            subscription_message: dict = {
                "request": "Subscribe",
                "id": "reconnect",
                "events": {source: names},
            }
            await websocket.send(json.dumps(subscription_message))
            self._log.debug(f"Re-subscribed to {source}: {names}")

    async def _open_connection(self) -> bool:
        """
        Open a new connection, restore subscriptions and (re)start background tasks.

        :returns: ``True`` if the connection was established, ``False`` otherwise.
        """
        websocket = None
        established = False
        try:
            self._log.debug("Attempting connection to Streamer.bot...")
            websocket = await websockets.connect(f"ws://{self.url}:{self.port}")
            self._log.info("Connected to Streamer.bot.")
            await self._restore_subscriptions(websocket)
            established = True
        # TODO: Maybe add a different handler for critical errors and non-critical errors
        except Exception as error:
            self._log.error(f"The following error occurred in the Streamer.bot connection: {error}.")
            return False
        finally:
            # Also runs on cancellation: never leak a half-initialised socket
            if not established and websocket is not None:
                await self._close_quietly(websocket)

        previous = self._websocket
        self._websocket = websocket

        # Retire the listener bound to the previous socket before starting a new one
        stale_listener = self._listen_task
        if stale_listener is not None and not stale_listener.done():
            stale_listener.cancel()
            await asyncio.gather(stale_listener, return_exceptions=True)
        if previous is not None and previous is not websocket:
            await self._close_quietly(previous)

        # Keep listening to the socket in the background
        self._listen_task = asyncio.create_task(self._listen_loop(websocket))
        # Keep exactly one ping loop alive. During a reconnect this method runs
        # inside the existing ping task, which simply carries on afterwards.
        if self._ping_task is None or self._ping_task.done():
            self._ping_task = asyncio.create_task(self._ping_loop())
        return True

    async def _listen_loop(self, websocket: "websockets.ClientConnection"):
        """Main loop to receive events from ``websocket`` and hand them to the event handler."""
        try:
            async for message in websocket:
                try:
                    # Parse the data
                    data: dict = json.loads(message)
                    # Send the data to the event handler
                    await self._handle_event(data)
                except Exception as error:
                    self._log.error(f"The following error occurred while processing an event: {error}")
        except Exception as error:
            # Iteration raises when the connection closes abnormally; log it
            # instead of letting the task die silently. The ping loop is what
            # triggers the actual reconnect.
            self._log.warning(f"Stopped listening to Streamer.bot: {error}")

    async def _reconnect(self):
        """Retry the connection with a capped exponential back-off until it succeeds or the client stops."""
        delay = self._RECONNECT_DELAY
        while self._running:
            await asyncio.sleep(delay)
            # disconnect() may have been requested while sleeping
            if not self._running:
                return
            if await self._open_connection():
                return
            delay = min(delay * 2, self._MAX_RECONNECT_DELAY)

    async def _ping_loop(self):
        """Send pings periodically and trigger a reconnect when pinging fails."""
        while self._running and self._websocket:
            try:
                # Ping the websocket every interval
                await self._websocket.ping()
                await asyncio.sleep(self._ping_interval)
            # Pass the error up if it's an Asyncio cancelled error
            except asyncio.CancelledError:
                raise
            # Handle if there's an error pinging
            except Exception as error:
                self._log.warning(
                    f"Failed to ping Streamer.bot with the following error: {error}. Attempting reconnect...")
                await self._reconnect()

    async def _handle_event(self, payload: dict):
        """Process one decoded message from the websocket."""
        event = payload.get("event") or {}

        ## Processor for Twitch chat messages
        if event.get("source", "") == "Twitch" and event.get("type", "") == self.TwitchEvents.ChatMessage:
            # A malformed payload raises here; the listen loop logs it.
            chat = payload["data"]["message"]
            self._log.info(f"Received message from {chat['displayName']}: {chat['message']}")

    async def connect(self) -> None:
        """
        Starts the websocket connection to Streamer.bot.

        Makes a single attempt. A failure is logged rather than raised, and
        leaves the client disconnected; automatic retries only happen for a
        connection that was established and later lost.

        :returns: ``None``
        """
        self._running = True
        await self._open_connection()

    async def subscribe(
        self,
        twitch: list[TwitchEvents]
    ) -> None:
        """
        Subscribe to events from the Streamer.bot websocket.

        Usage:
            # Subscribe to Twitch chat messages
            await websocket.subscribe(twitch=[websocket.TwitchEvents.ChatMessage])

        :param twitch: All Twitch-related events to subscribe to. A single
            event is accepted as well as a list.
        :returns: ``None``
        :raises ConnectionError: If the websocket is not connected.
        """
        events = self._collect_events(twitch=twitch)

        # Don't do anything if no events were provided
        if not events:
            return

        if not self._websocket:
            raise ConnectionError("Websocket is not connected!")

        async with self._subscriptions_lock:
            # Remember the subscriptions for use upon reconnect
            for source, names in events.items():
                known = self._subscriptions.setdefault(source, [])
                known.extend(name for name in names if name not in known)

        # Create the payload to be sent
        payload: dict = {
            "request": "Subscribe",
            "id": "1",  # TODO: Check if this needs to be unique
            "events": events
        }

        self._log.debug(f"Attempting to subscribe to the following events in Streamer.bot: {events}")
        await self._websocket.send(json.dumps(payload))
        self._log.debug("Subscribed.")

    async def unsubscribe(
        self,
        unsubscribe_from_all: bool = False,
        twitch: Optional[list[TwitchEvents]] = None
    ) -> None:
        """
        Unsubscribe from events received from the Streamer.bot websocket.

        Usage:
            # Unsubscribe from Twitch chat messages
            await websocket.unsubscribe(twitch=[websocket.TwitchEvents.ChatMessage])
            # Or unsub from all events
            await websocket.unsubscribe(unsubscribe_from_all=True)

        :param unsubscribe_from_all: A boolean value that, if set to true,
            unsubscribes from every remembered subscription and ignores the
            other arguments. Default is false.
        :param twitch: All Twitch-related events to unsubscribe from. A single
            event is accepted as well as a list.
        :returns: ``None``
        :raises ConnectionError: If the websocket is not connected.
        """
        requested = self._collect_events(twitch=twitch)

        # Don't do anything if no arguments were provided
        if not requested and not unsubscribe_from_all:
            return

        if not self._websocket:
            raise ConnectionError("Websocket is not connected!")

        async with self._subscriptions_lock:
            if unsubscribe_from_all:
                # Send everything currently remembered, then forget it
                events: dict = {source: list(names) for source, names in self._subscriptions.items()}
                self._subscriptions.clear()
            else:
                events = requested
                # Forget the matching events so they are not re-sent upon reconnect
                for source, names in events.items():
                    known = self._subscriptions.get(source)
                    if known is None:
                        continue
                    remaining = [name for name in known if name not in names]
                    if remaining:
                        self._subscriptions[source] = remaining
                    else:
                        # No events left for that source, remove it entirely
                        del self._subscriptions[source]

        # Create the payload to be sent
        # NOTE(review): the Streamer.bot request reference appears to spell this
        # request "UnSubscribe" and to expect an "id" field - confirm against the
        # server before changing the wire format.
        payload: dict = {
            "request": "Unsubscribe",
            "events": events,
        }

        self._log.debug(
            "Attempting to unsubscribe from all events from Streamer.bot..." if unsubscribe_from_all else (
                "Attempting to unsubscribe from the following events in Streamer.bot: " + str(events))
        )
        await self._websocket.send(json.dumps(payload))
        self._log.debug("Unsubscribed.")

    async def disconnect(self) -> None:
        """
        Disconnects from Streamer.bot and stops all background tasks.

        :returns: ``None``
        """
        # Stop the background loops from reconnecting
        self._running = False

        # Cancel background tasks, never the task that is calling disconnect()
        current = asyncio.current_task()
        pending = [
            task for task in (self._listen_task, self._ping_task)
            if task is not None and task is not current
        ]
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        self._listen_task = None
        self._ping_task = None

        # Disconnect the websocket if it's still active
        if self._websocket:
            try:
                await self._websocket.close()
            finally:
                # Drop the reference even if the close itself failed
                self._websocket = None

        self._log.debug("Closed connection to Streamer.bot.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment