@bonny1992
Created August 10, 2025 16:30
ROMM scan monitor + restart in case of failed connection to metadata providers

RomM Scan Monitor

A Python script that monitors RomM's websocket connections for metadata provider connection errors, automatically restarts scans when internet connectivity is restored, and sends notifications when too many errors occur.

Features

  • Real-time monitoring of RomM websocket connections
  • Automatic scan restart when metadata provider connection errors occur
  • Internet connectivity verification via ping to multiple hosts
  • Configurable error thresholds with Pushover notifications
  • Session-based authentication to avoid 403 errors
  • RomM-style logging with color-coded output

Configuration

Edit romm_monitor_config.yml:

romm:
  host: 127.0.0.1
  port: 8090
  username: your_username_here
  password: your_password_here

monitoring:
  check_interval: 30              # seconds between connectivity checks
  max_retries: 0                  # immediate action on first error
  ping_hosts:                     # hosts to ping for connectivity verification
    - google.com
    - 8.8.8.8
    - 1.1.1.1
  skip_internet_check: false      # skip internet check and restart immediately
  error_threshold: 3              # max errors before notification (default: 3)
  error_time_window: 120          # time window in seconds (default: 2 minutes)

notifications:
  pushover:
    enabled: false                # enable/disable Pushover notifications
    api_token: your_api_token_here
    user_key: your_user_key_here

log_level: INFO
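
As a minimal sketch of how the optional monitoring keys fall back to their documented defaults, the helper below merges a parsed config dictionary with those defaults. The names `DEFAULTS` and `monitoring_settings` are hypothetical and exist only for illustration; the script itself reads each key with `.get()`. Note that `check_interval`, `max_retries`, and `ping_hosts` have no default in the script and must be present in the file.

```python
# Defaults taken from the comments in the config file above.
DEFAULTS = {
    "skip_internet_check": False,
    "error_threshold": 3,
    "error_time_window": 120,
}


def monitoring_settings(config: dict) -> dict:
    """Return the monitoring section with the optional keys filled in."""
    section = config.get("monitoring") or {}
    # Values from the config file override the defaults.
    return {**DEFAULTS, **section}
```

For example, `monitoring_settings({"monitoring": {"error_threshold": 1}})` keeps the overridden threshold and fills in the other two keys.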

Key Configuration Options

  • skip_internet_check: When true, skips internet connectivity verification and restarts scan immediately on connection errors
  • error_threshold: Number of connection errors within the time window that triggers a notification; when it is reached, the scan is not restarted for that failure and the error tracking is reset
  • error_time_window: Time window in seconds to count errors (default: 2 minutes)
  • pushover.enabled: Enable/disable Pushover notifications for error threshold alerts
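
The interaction between error_threshold and error_time_window can be sketched as a sliding window over error timestamps. This is an illustrative stand-alone version, assuming timestamps are plain floats in seconds; the class name `ErrorWindow` is hypothetical, and the script keeps a list of `datetime` objects instead.

```python
from collections import deque


class ErrorWindow:
    """Counts errors that fall inside a sliding time window."""

    def __init__(self, threshold: int = 3, window_seconds: float = 120.0):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def record(self, now: float) -> bool:
        """Record an error at time `now`; return True once the threshold is reached."""
        self._timestamps.append(now)
        cutoff = now - self.window_seconds
        # Drop errors that are no longer inside the window.
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()
        return len(self._timestamps) >= self.threshold
```

With the defaults, three errors within two minutes reach the threshold, while three errors spread over a longer period do not.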

Installation

  1. Install Python dependencies:

    pip install -r requirements.txt
  2. Configure romm_monitor_config.yml with your RomM credentials and settings

  3. Run the monitor:

    python romm_scan_monitor.py

How It Works

  1. Authentication: Establishes HTTP session with RomM using basic auth
  2. Websocket Connection: Connects to RomM's Socket.IO endpoint
  3. Error Monitoring: Listens for scan:done_ko events indicating scan failures
  4. Connection Error Detection: Identifies metadata provider connection errors
  5. Internet Verification: Pings configured hosts to verify connectivity (unless skipped)
  6. Auto-restart: Restarts scan via websocket if internet is restored
  7. Error Threshold: Sends Pushover notification and stops auto-restart if too many errors occur
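
Steps 4 to 7 amount to a small decision table. The sketch below is a simplified illustration, assuming `max_retries: 0` so that the first connection error triggers action; the function name `next_action` and its string results are hypothetical and do not appear in the script.

```python
def next_action(connection_error: bool,
                threshold_reached: bool,
                skip_internet_check: bool,
                internet_ok: bool) -> str:
    """Decide what to do after a scan:done_ko event."""
    if not connection_error:
        return "ignore"      # unrelated failure: only logged
    if threshold_reached:
        return "notify"      # too many errors: notify, do not restart
    if skip_internet_check or internet_ok:
        return "restart"     # safe to restart the scan
    return "wait"            # still offline: leave the scan stopped
```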

Error Handling

  • Connection Errors: Automatically detected and handled
  • Internet Check: Verifies connectivity before restarting (configurable)
  • Error Threshold: Prevents infinite restart loops with notifications
  • Authentication: Handles session management and CSRF tokens
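
Connection errors are detected by matching keywords against the text of the failure payload. A minimal sketch of that idea follows, using a subset of the keywords from the script; the function name `is_connection_error` is hypothetical.

```python
# A subset of the keywords the script searches for (case-insensitive).
CONNECTION_ERROR_KEYWORDS = (
    "can't connect to",
    "check your internet connection",
    "timeout",
    "connection refused",
    "502", "503", "504",
)


def is_connection_error(payload: object) -> bool:
    """Return True if the payload text looks like a provider connection error."""
    message = str(payload).lower()
    return any(keyword in message for keyword in CONNECTION_ERROR_KEYWORDS)
```

Substring matching is deliberately loose, so short keywords such as "502" can also match unrelated text that happens to contain those digits.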

Logging

The script uses RomM's logging format:

LEVEL: [RomM][module][timestamp] message

Available log levels: DEBUG, INFO, WARNING, ERROR
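
As an illustration of the format above, here is a stripped-down formatter without the ANSI colors. The class name `PlainRomMFormatter` is hypothetical, and the module name is fixed to `scan-monitor`, the default the script uses.

```python
import logging
from datetime import datetime


class PlainRomMFormatter(logging.Formatter):
    """Formats records as: LEVEL: [RomM][module][timestamp] message"""

    def format(self, record: logging.LogRecord) -> str:
        timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S")
        return f"{record.levelname}: [RomM][scan-monitor][{timestamp}] {record.getMessage()}"
```

Attach it with `handler.setFormatter(PlainRomMFormatter())` on any `logging.Handler`.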

Troubleshooting

Common Issues

  1. 403 Authentication Error: Verify username/password in config
  2. Websocket Connection Failed: Check if RomM is running and accessible
  3. Scan Not Restarting: Verify platforms and metadata sources are available

Testing

  • Set skip_internet_check: true to test scan restart without internet verification
  • Use error_threshold: 1 for immediate notification testing
  • Monitor logs for detailed operation information

Dependencies

  • aiohttp: Async HTTP client/server
  • python-socketio: Socket.IO client
  • PyYAML: YAML configuration parsing

requirements.txt

aiohttp>=3.8.0
python-socketio>=5.7.0
PyYAML>=6.0

romm_monitor_config.yml

romm:
  host: 127.0.0.1
  port: 8090                      # My ROMM server port
  username: your_username_here
  password: your_password_here

monitoring:
  check_interval: 30              # seconds between connectivity checks
  max_retries: 0                  # immediate action on first error
  ping_hosts:                     # hosts to ping for connectivity verification
    - google.com
    - 8.8.8.8
    - 1.1.1.1
  skip_internet_check: false      # skip internet connectivity check and restart scan immediately
  error_threshold: 3              # max errors before sending notification (default: 3)
  error_time_window: 120          # time window in seconds to count errors (default: 2 minutes)

notifications:
  pushover:
    enabled: false                # enable/disable Pushover notifications
    api_token: your_api_token_here
    user_key: your_user_key_here

log_level: INFO

romm_scan_monitor.py

#!/usr/bin/env python3
"""
RomM Scan Monitor Script

This script monitors RomM's websockets for connection errors to metadata providers
and automatically restarts the scan when internet connectivity is restored.

Usage:
    python romm_scan_monitor.py [--config CONFIG_FILE] [--help-config]

Configuration:
    Create a romm_monitor_config.yml file with the following settings:

    romm:
      host: 127.0.0.1
      port: 8090
      username: your_username
      password: your_password

    monitoring:
      check_interval: 30    # seconds between connectivity checks
      max_retries: 0        # immediate action on first error
      ping_hosts:           # hosts to ping for connectivity verification
        - google.com
        - 8.8.8.8
        - 1.1.1.1

    log_level: INFO
"""

import asyncio
import logging
import os
import sys
from datetime import datetime
from typing import Optional

import aiohttp
import socketio
import yaml


# RomM-style logging formatter
class RomMLogFormatter(logging.Formatter):
    """Custom formatter to match RomM's logging style"""

    # ANSI color codes
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    LIGHTMAGENTA = '\033[95m'
    CYAN = '\033[96m'
    RESET = '\033[0m'

    def format(self, record):
        # Set module name for consistent formatting
        if not hasattr(record, 'module_name'):
            record.module_name = 'scan-monitor'

        # Color coding based on level
        if record.levelno == logging.INFO:
            level_color = self.GREEN
            level_padding = " "
        elif record.levelno == logging.WARNING:
            level_color = self.YELLOW
            level_padding = " "
        elif record.levelno == logging.ERROR:
            level_color = self.RED
            level_padding = " "
        elif record.levelno == logging.DEBUG:
            level_color = self.LIGHTMAGENTA
            level_padding = " "
        else:
            level_color = self.RED
            level_padding = " "

        # Format: LEVEL: [RomM][module][timestamp] message
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        formatted = f"{level_color}{record.levelname}:{level_padding}{self.RESET}{self.BLUE}[RomM]{self.LIGHTMAGENTA}[{record.module_name}]{self.CYAN}[{timestamp}]{self.RESET} {record.getMessage()}"
        return formatted


# Configure logging with RomM style
def setup_logging(log_level: str = 'INFO'):
    """Setup logging with RomM-style formatting"""
    logger = logging.getLogger('romm-scan-monitor')
    logger.setLevel(getattr(logging, log_level.upper()))
    logger.propagate = False

    # Console handler with RomM formatter
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(RomMLogFormatter())
    logger.addHandler(console_handler)

    # File handler with simple format for log files
    file_handler = logging.FileHandler('romm_monitor.log')
    file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    return logger


class RomMMonitor:
    def __init__(self, config_path: str = "romm_monitor_config.yml"):
        """Initialize the RomM monitor"""
        # Set up logging first: load_config() reports its errors through self.logger
        self.logger = self.setup_logging()
        self.config = self.load_config(config_path)

        # Apply the log level from the config file (falls back to INFO)
        log_level = str(self.config.get('log_level', 'INFO')).upper()
        self.logger.setLevel(getattr(logging, log_level, logging.INFO))

        # RomM connection settings
        self.base_url = f"http://{self.config['romm']['host']}:{self.config['romm']['port']}"
        self.username = self.config['romm']['username']
        self.password = self.config['romm']['password']

        # Monitoring settings
        self.check_interval = self.config['monitoring']['check_interval']
        self.max_retries = self.config['monitoring']['max_retries']
        self.ping_hosts = self.config['monitoring']['ping_hosts']
        self.skip_internet_check = self.config['monitoring'].get('skip_internet_check', False)
        self.error_threshold = self.config['monitoring'].get('error_threshold', 3)
        self.error_time_window = self.config['monitoring'].get('error_time_window', 120)

        # Notification settings
        self.pushover_enabled = self.config.get('notifications', {}).get('pushover', {}).get('enabled', False)
        self.pushover_api_token = self.config.get('notifications', {}).get('pushover', {}).get('api_token', '')
        self.pushover_user_key = self.config.get('notifications', {}).get('pushover', {}).get('user_key', '')

        # State variables
        self.session = None
        self.socket = None
        self.is_connected = False
        self.scan_in_progress = False
        self.connection_errors = 0
        self.last_scan_time = None
        self.running = False
        self.error_timestamps = []  # Track error timestamps for threshold checking

        # Configuration validation
        if not self.username or not self.password:
            self.logger.error("Username and password not configured in config file")
            sys.exit(1)

        self.logger.info("RomM scan monitor initialized")

    def load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file"""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f) or {}
        except FileNotFoundError:
            self.logger.error(f"Configuration file not found: {config_path}")
            sys.exit(1)
        except yaml.YAMLError as e:
            self.logger.error(f"Error parsing configuration file: {e}")
            sys.exit(1)

    def setup_logging(self, log_level: str = 'INFO') -> logging.Logger:
        """Setup logging with RomM-style formatting"""
        logger = logging.getLogger('romm-scan-monitor')
        logger.setLevel(getattr(logging, log_level.upper()))
        logger.propagate = False

        # Console handler with RomM formatter
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(RomMLogFormatter())
        logger.addHandler(console_handler)

        # File handler with simple format for log files
        file_handler = logging.FileHandler('romm_monitor.log')
        file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)

        return logger

    async def check_internet_connectivity(self) -> bool:
        """Check internet connectivity by pinging configured hosts"""
        # Windows ping takes the packet count with -n; Unix-like systems use -c
        count_flag = '-n' if os.name == 'nt' else '-c'
        for host in self.ping_hosts:
            try:
                # Use asyncio.create_subprocess_exec for async ping
                process = await asyncio.create_subprocess_exec(
                    'ping', count_flag, '1', host,
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.DEVNULL
                )
                await process.wait()
                if process.returncode == 0:
                    self.logger.debug(f"Ping to {host} successful")
                    return True
            except Exception as e:
                self.logger.debug(f"Ping to {host} failed: {e}")
                continue

        self.logger.warning("All ping attempts failed")
        return False

    async def send_pushover_notification(self, title: str, message: str) -> bool:
        """Send notification via Pushover"""
        if not self.pushover_enabled or not self.pushover_api_token or not self.pushover_user_key:
            return False

        try:
            notification_data = {
                'token': self.pushover_api_token,
                'user': self.pushover_user_key,
                'title': title,
                'message': message,
                'priority': 1  # High priority (0 is normal)
            }
            async with self.session.post('https://api.pushover.net/1/messages.json',
                                         data=notification_data) as response:
                if response.status == 200:
                    self.logger.info("Pushover notification sent successfully")
                    return True
                else:
                    self.logger.warning(f"Failed to send Pushover notification: {response.status}")
                    return False
        except Exception as e:
            self.logger.error(f"Error sending Pushover notification: {e}")
            return False

    async def authenticate(self) -> bool:
        """Authenticate with RomM via HTTP sessions"""
        try:
            if not self.username or not self.password:
                self.logger.error("Username and password not configured")
                return False

            # Create new HTTP session
            if not self.session:
                self.session = aiohttp.ClientSession()

            # First get CSRF token and session
            async with self.session.get(f"{self.base_url}/api/heartbeat") as response:
                if response.status != 200:
                    self.logger.error(f"Unable to get heartbeat: {response.status}")
                    return False

                # Extract CSRF token from cookies
                csrf_cookie = response.cookies.get('romm_csrftoken')
                if csrf_cookie:
                    self.logger.debug("CSRF token obtained")

            # Perform login via basic authentication
            auth = aiohttp.BasicAuth(self.username, self.password)
            async with self.session.post(
                f"{self.base_url}/api/login",
                auth=auth
            ) as response:
                if response.status == 200:
                    self.logger.info("Authentication completed successfully")
                    return True
                else:
                    self.logger.error(f"Authentication failed: {response.status}")
                    return False
        except Exception as e:
            self.logger.error(f"Error during authentication: {e}")
            return False

    async def connect_websocket(self):
        """Connect to RomM websockets"""
        try:
            if not self.session:
                self.session = aiohttp.ClientSession()

            # Authentication
            if not await self.authenticate():
                return False

            # Websocket connection
            self.socket = socketio.AsyncClient()

            @self.socket.on('connect')
            async def on_connect():
                self.logger.info("Connected to RomM websockets")
                self.is_connected = True

            @self.socket.on('disconnect')
            async def on_disconnect():
                self.logger.warning("Disconnected from RomM websockets")
                self.is_connected = False

            @self.socket.on('scan:scanning_platform')
            async def on_scanning_platform(data):
                self.logger.info(f"Scan in progress for platform: {data.get('name', 'Unknown')}")
                self.scan_in_progress = True

            @self.socket.on('scan:scanning_rom')
            async def on_scanning_rom(data):
                self.logger.debug(f"Scanning ROM: {data.get('name', 'Unknown')}")

            @self.socket.on('scan:done')
            async def on_scan_done(data):
                self.logger.info("Scan completed successfully")
                self.scan_in_progress = False
                self.last_scan_time = datetime.now()
                self.connection_errors = 0  # Reset errors

            @self.socket.on('scan:done_ko')
            async def on_scan_error(data):
                self.logger.error(f"Scan failed: {data}")
                self.scan_in_progress = False
                await self.handle_scan_error(data)

            # Connect to websocket
            await self.socket.connect(
                f"{self.base_url}",
                socketio_path="/ws/socket.io/",
                transports=['websocket', 'polling'],
                wait_timeout=10
            )
            return True
        except Exception as e:
            self.logger.error(f"Error connecting to websocket: {e}")
            return False

    async def handle_scan_error(self, error_data):
        """Handle scan errors and restart if needed"""
        error_message = str(error_data)

        # Check if it's a connection error to metadata providers
        connection_error_keywords = [
            "Can't connect to", "check your internet connection",
            "timeout", "connection refused", "502", "503", "504",
            "Try again", "No such file or directory", "Unable to fetch cover",
            "Errno -3", "Errno 2", "neoclone.screenscraper.fr"
        ]
        is_connection_error = any(keyword.lower() in error_message.lower()
                                  for keyword in connection_error_keywords)

        if is_connection_error:
            current_time = datetime.now()
            self.error_timestamps.append(current_time)

            # Clean old timestamps outside the time window
            cutoff_time = current_time.timestamp() - self.error_time_window
            self.error_timestamps = [ts for ts in self.error_timestamps
                                     if ts.timestamp() > cutoff_time]

            # Check if we've exceeded the error threshold
            if len(self.error_timestamps) >= self.error_threshold:
                self.logger.error(f"Error threshold exceeded ({len(self.error_timestamps)} errors in {self.error_time_window}s)")

                # Send Pushover notification
                notification_title = "RomM Scan Monitor - Too Many Errors"
                notification_message = f"RomM has encountered {len(self.error_timestamps)} connection errors in the last {self.error_time_window} seconds. Scan will not be automatically restarted to prevent further issues."
                await self.send_pushover_notification(notification_title, notification_message)

                # Reset error tracking and don't restart
                self.error_timestamps = []
                self.connection_errors = 0
                return

            self.connection_errors += 1
            self.logger.warning(f"Connection error detected ({self.connection_errors}/{self.config['monitoring']['max_retries'] + 1})")

            # If we have multiple connection errors, check internet connectivity (unless skipped)
            if self.connection_errors >= (self.config['monitoring']['max_retries'] + 1):
                if self.skip_internet_check:
                    self.logger.info("Internet check skipped, restarting scan immediately...")
                    if await self.restart_scan():
                        self.connection_errors = 0
                        self.last_scan_time = current_time
                        self.error_timestamps = []  # Reset error tracking
                    else:
                        self.logger.error("Failed to restart scan")
                else:
                    self.logger.warning("Multiple connection errors detected, checking connectivity...")
                    if await self.check_internet_connectivity():
                        self.logger.info("Internet connection restored, restarting scan...")
                        if await self.restart_scan():
                            self.connection_errors = 0
                            self.last_scan_time = current_time
                            self.error_timestamps = []  # Reset error tracking
                        else:
                            self.logger.error("Failed to restart scan")
                    else:
                        self.logger.warning("Internet connectivity check failed, scan will not be restarted")
        else:
            # Not a connection error, just log it
            self.logger.debug(f"Non-connection error: {error_message}")

    async def restart_scan(self):
        """Restart the scan using websocket"""
        try:
            # Get platforms and metadata sources from heartbeat endpoint
            async with self.session.get(f"{self.base_url}/api/heartbeat") as response:
                if response.status != 200:
                    self.logger.error(f"Failed to get heartbeat: {response.status}")
                    return False
                heartbeat_data = await response.json()

            # Get filesystem platforms (these are the ones that can be scanned)
            fs_platforms = heartbeat_data.get("FILESYSTEM", {}).get("FS_PLATFORMS", [])
            if not fs_platforms:
                self.logger.warning("No filesystem platforms found")
                return False

            # Get enabled metadata sources
            metadata_sources = []
            metadata_info = heartbeat_data.get("METADATA_SOURCES", {})

            # Map the enabled sources to their API values
            source_mapping = {
                "IGDB_API_ENABLED": "igdb",
                "SS_API_ENABLED": "ss",
                "MOBY_API_ENABLED": "moby",
                "RA_API_ENABLED": "ra",
                "LAUNCHBOX_API_ENABLED": "lb",
                "HASHEOUS_API_ENABLED": "hasheous",
                "STEAMGRIDDB_API_ENABLED": "sgdb",
                "TGDB_API_ENABLED": "tgdb"
            }
            for api_flag, api_value in source_mapping.items():
                if metadata_info.get(api_flag, False):
                    metadata_sources.append(api_value)

            if not metadata_sources:
                self.logger.warning("No metadata sources enabled")
                return False

            self.logger.info(f"Restarting scan with {len(fs_platforms)} platforms and {len(metadata_sources)} metadata sources")

            # Start scan via websocket
            if self.socket and self.is_connected:
                # Emit scan event with the same parameters as the frontend
                await self.socket.emit("scan", {
                    "platforms": [],  # Empty list means scan all platforms
                    "type": "quick",  # Use quick scan type
                    "apis": metadata_sources
                })
                self.logger.info("Scan restart command sent via websocket")
                return True
            else:
                self.logger.error("Websocket not connected, cannot restart scan")
                return False
        except Exception as e:
            self.logger.error(f"Error restarting scan: {e}")
            return False

    async def monitor_loop(self):
        """Main monitoring loop"""
        self.running = True
        self.logger.info("Monitor started")
        try:
            while self.running:
                # Check if we need to reset connection errors
                if self.connection_errors > 0:
                    current_time = datetime.now()
                    if (self.last_scan_time and
                            (current_time - self.last_scan_time).total_seconds() > self.check_interval):
                        self.logger.info("Periodic connectivity check...")
                        if await self.check_internet_connectivity():
                            self.logger.info("Connectivity verified, resetting errors")
                            self.connection_errors = 0
                            self.last_scan_time = None

                await asyncio.sleep(10)  # Check every 10 seconds
        except Exception as e:
            self.logger.error(f"Error in monitor loop: {e}")
        finally:
            self.running = False

    async def start(self):
        """Start the monitor"""
        self.logger.info("Starting RomM monitor...")
        try:
            # Connect to websocket
            if not await self.connect_websocket():
                self.logger.error("Cannot connect to RomM")
                return

            # Start monitoring loop
            await self.monitor_loop()
        except Exception as e:
            self.logger.error(f"Error in monitor: {e}")
        finally:
            await self.stop()

    async def stop(self):
        """Stop the monitor"""
        self.logger.info("Stopping monitor...")
        self.running = False
        try:
            if self.socket:
                await self.socket.disconnect()
                self.socket = None
            if self.session:
                await self.session.close()
                self.session = None
            self.is_connected = False
            self.logger.info("Monitor stopped")
        except Exception as e:
            self.logger.error(f"Error during cleanup: {e}")


async def main():
    """Main function"""
    import argparse

    parser = argparse.ArgumentParser(description="RomM Scan Monitor")
    parser.add_argument(
        "--config",
        default="romm_monitor_config.yml",
        help="Path to configuration file (default: romm_monitor_config.yml)"
    )
    parser.add_argument(
        "--help-config",
        action="store_true",
        help="Show configuration file format and exit"
    )
    args = parser.parse_args()

    if args.help_config:
        print("Configuration file format (YAML):")
        print("""
romm:
  host: 127.0.0.1
  port: 8090
  username: your_username_here
  password: your_password_here

monitoring:
  check_interval: 30    # seconds between connectivity checks
  max_retries: 0        # immediate action on first error
  ping_hosts:           # hosts to ping for connectivity verification
    - google.com
    - 8.8.8.8
    - 1.1.1.1

log_level: INFO
""")
        return

    try:
        monitor = RomMMonitor(args.config)
        await monitor.start()
    except KeyboardInterrupt:
        print("\nMonitor stopped by user")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl+C after cancelling the main task
        print("\nMonitor stopped by user")