|
#!/usr/bin/env python3 |
|
""" |
|
RomM Scan Monitor Script |
|
|
|
This script monitors RomM's websockets for connection errors to metadata providers |
|
and automatically restarts the scan when internet connectivity is restored. |
|
|
|
Usage: |
|
python romm_scan_monitor.py [--config CONFIG_FILE] [--log-level LEVEL] |
|
|
|
Configuration: |
|
Create a config.yml file with the following settings: |
|
|
|
romm: |
|
host: 127.0.0.1 |
|
port: 8090 |
|
username: your_username |
|
password: your_password |
|
|
|
monitoring: |
|
check_interval: 30 # seconds between connectivity checks |
|
max_retries: 0 # immediate action on first error |
|
ping_hosts: # hosts to ping for connectivity verification |
|
- google.com |
|
- 8.8.8.8 |
|
- 1.1.1.1 |
|
|
|
log_level: INFO |
|
""" |
|
|
|
import asyncio |
|
import logging |
|
import os |
|
import sys |
|
import yaml |
|
from datetime import datetime |
|
from typing import Optional |
|
|
|
import aiohttp |
|
import socketio |
|
|
|
# RomM-style logging formatter |
|
class RomMLogFormatter(logging.Formatter):
    """Custom formatter to match RomM's logging style.

    Output shape: ``LEVEL:<pad>[RomM][<module_name>][YYYY-mm-dd HH:MM:SS] message``
    with ANSI colours. Records without a ``module_name`` attribute are tagged
    ``scan-monitor``. Exception and stack information attached to the record
    is appended on the following lines, as ``logging.Formatter.format`` does.
    """

    # ANSI color codes
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    LIGHTMAGENTA = '\033[95m'
    CYAN = '\033[96m'
    RESET = '\033[0m'

    def _level_style(self, levelno: int) -> tuple:
        """Return the ``(colour, padding)`` pair used to render a level name."""
        styles = {
            logging.INFO: (self.GREEN, " "),
            logging.WARNING: (self.YELLOW, " "),
            logging.ERROR: (self.RED, " "),
            logging.DEBUG: (self.LIGHTMAGENTA, " "),
        }
        # CRITICAL and any custom level fall back to red.
        return styles.get(levelno, (self.RED, " "))

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* in RomM style, including any attached traceback."""
        # Set module name for consistent formatting
        if not hasattr(record, 'module_name'):
            record.module_name = 'scan-monitor'

        level_color, level_padding = self._level_style(record.levelno)

        # Use the record's creation time rather than "now", so the timestamp
        # is correct even if the record is formatted some time after the event.
        timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%d %H:%M:%S")

        formatted = (
            f"{level_color}{record.levelname}:{level_padding}{self.RESET}"
            f"{self.BLUE}[RomM]{self.LIGHTMAGENTA}[{record.module_name}]"
            f"{self.CYAN}[{timestamp}]{self.RESET} {record.getMessage()}"
        )

        # Append traceback / stack info the same way logging.Formatter.format
        # does; otherwise logger.exception() output loses its traceback.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            formatted += "\n" + record.exc_text
        if record.stack_info:
            formatted += "\n" + self.formatStack(record.stack_info)

        return formatted
|
|
|
# Configure logging with RomM style |
|
def setup_logging(log_level: str = 'INFO', log_file: str = 'romm_monitor.log') -> logging.Logger:
    """Setup logging with RomM-style formatting.

    Configures the ``romm-scan-monitor`` logger with a coloured console
    handler on stdout and a plain-text file handler. Calling this again
    replaces the handlers attached by the previous call instead of stacking
    duplicates: ``logging.getLogger`` returns the same object every time,
    so naive re-calls would emit every message twice.

    Args:
        log_level: Level name such as ``"DEBUG"`` or ``"info"`` (case-insensitive).
        log_file: Path of the plain-text log file (relative to the current
            working directory by default).

    Returns:
        The configured logger.

    Raises:
        ValueError: If *log_level* is not a known logging level name.
    """
    # Validate before touching the logger so a bad level leaves it unchanged.
    level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(level, int):
        raise ValueError(f"Unknown log level: {log_level!r}")

    logger = logging.getLogger('romm-scan-monitor')
    logger.setLevel(level)
    logger.propagate = False

    # Drop handlers from an earlier call so output is not duplicated.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    # Console handler with RomM formatter
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(RomMLogFormatter())
    logger.addHandler(console_handler)

    # File handler with simple format for log files; explicit UTF-8 so
    # non-ASCII ROM/platform names don't depend on the platform locale.
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(file_handler)

    return logger
|
|
|
class RomMMonitor:
    """Watch RomM scan events over Socket.IO and restart scans that fail on connection errors.

    The monitor logs in to RomM over HTTP and listens for ``scan:*`` websocket
    events. It classifies ``scan:done_ko`` payloads, and for connection-type
    failures it (optionally) verifies connectivity by pinging configured hosts
    before emitting a new ``scan`` event. When ``error_threshold`` connection
    errors occur within ``error_time_window`` seconds, it sends a Pushover
    notification (if configured) instead of restarting.
    """

    # Case-insensitive substrings that mark a scan failure as a connection
    # problem worth retrying rather than a permanent error.
    CONNECTION_ERROR_KEYWORDS = (
        "Can't connect to", "check your internet connection",
        "timeout", "connection refused", "502", "503", "504",
        "Try again", "No such file or directory", "Unable to fetch cover",
        "Errno -3", "Errno 2", "neoclone.screenscraper.fr",
    )

    # Heartbeat METADATA_SOURCES flags -> API identifiers sent with the scan event.
    METADATA_SOURCE_FLAGS = {
        "IGDB_API_ENABLED": "igdb",
        "SS_API_ENABLED": "ss",
        "MOBY_API_ENABLED": "moby",
        "RA_API_ENABLED": "ra",
        "LAUNCHBOX_API_ENABLED": "lb",
        "HASHEOUS_API_ENABLED": "hasheous",
        "STEAMGRIDDB_API_ENABLED": "sgdb",
        "TGDB_API_ENABLED": "tgdb",
    }

    # Seconds between iterations of monitor_loop.
    LOOP_POLL_SECONDS = 10

    def __init__(self, config_path: str = "romm_monitor_config.yml"):
        """Initialize the RomM monitor.

        Sets up logging, loads and validates the configuration, and resets all
        runtime state. Exits the process with status 1 if the configuration is
        missing, unreadable, malformed, or lacks required settings/credentials.
        """
        # The logger must exist before load_config(), which reports errors through it.
        self.logger = self.setup_logging()
        self.config = self.load_config(config_path)
        self._apply_configured_log_level()

        try:
            romm_cfg = self.config['romm']
            monitoring_cfg = self.config['monitoring']

            # RomM connection settings
            self.base_url = f"http://{romm_cfg['host']}:{romm_cfg['port']}"
            self.username = romm_cfg.get('username')
            self.password = romm_cfg.get('password')

            # Monitoring settings
            self.check_interval = monitoring_cfg['check_interval']
            self.max_retries = monitoring_cfg['max_retries']
            ping_hosts = monitoring_cfg['ping_hosts'] or []
            # Accept a single host written as a scalar instead of a list.
            self.ping_hosts = [ping_hosts] if isinstance(ping_hosts, str) else list(ping_hosts)
            self.skip_internet_check = monitoring_cfg.get('skip_internet_check', False)
            self.error_threshold = monitoring_cfg.get('error_threshold', 3)
            self.error_time_window = monitoring_cfg.get('error_time_window', 120)
            self.ping_timeout = monitoring_cfg.get('ping_timeout', 10)

            # Notification settings
            pushover_cfg = (self.config.get('notifications') or {}).get('pushover') or {}
            self.pushover_enabled = pushover_cfg.get('enabled', False)
            self.pushover_api_token = pushover_cfg.get('api_token', '')
            self.pushover_user_key = pushover_cfg.get('user_key', '')
            self.pushover_priority = pushover_cfg.get('priority', 1)
        except (KeyError, TypeError, AttributeError) as e:
            self.logger.error(f"Missing or invalid setting in configuration file: {e}")
            sys.exit(1)

        # State variables
        self.session = None
        self.socket = None
        self.is_connected = False
        self.scan_in_progress = False
        self.connection_errors = 0
        self.last_scan_time = None
        self.running = False
        self.error_timestamps = []  # Track error timestamps for threshold checking

        # Configuration validation
        if not self.username or not self.password:
            self.logger.error("Username and password not configured in config file")
            sys.exit(1)

        self.logger.info("RomM scan monitor initialized")

    def _apply_configured_log_level(self):
        """Apply the top-level ``log_level`` config key (default INFO) to the logger."""
        level_name = str(self.config.get('log_level', 'INFO')).upper()
        level = getattr(logging, level_name, None)
        if isinstance(level, int):
            self.logger.setLevel(level)
        else:
            self.logger.warning(f"Unknown log_level '{level_name}' in configuration, using INFO")

    def load_config(self, config_path: str) -> dict:
        """Load the YAML configuration file and return it as a dict.

        Exits with status 1 if the file is missing, unreadable, not valid YAML,
        or does not contain a top-level mapping. An empty file yields ``{}``.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f) or {}
        except FileNotFoundError:
            self.logger.error(f"Configuration file not found: {config_path}")
            sys.exit(1)
        except OSError as e:
            self.logger.error(f"Unable to read configuration file {config_path}: {e}")
            sys.exit(1)
        except yaml.YAMLError as e:
            self.logger.error(f"Error parsing configuration file: {e}")
            sys.exit(1)

        if not isinstance(config, dict):
            self.logger.error(f"Configuration file must contain a YAML mapping: {config_path}")
            sys.exit(1)
        return config

    def setup_logging(self, log_level: str = 'INFO') -> logging.Logger:
        """Setup logging with RomM-style formatting.

        Delegates to the module-level ``setup_logging`` function so the handler
        setup lives in one place. Inside a method, the bare name resolves to
        the module global, not to this method, so this is not recursion.
        """
        return setup_logging(log_level)

    @staticmethod
    def _build_ping_command(host: str, is_windows: Optional[bool] = None) -> list:
        """Return argv for a single-packet ping to *host*.

        Windows ``ping`` takes the count via ``-n``; Unix ``ping`` uses ``-c``.
        *is_windows* defaults to ``os.name == 'nt'``.
        """
        if is_windows is None:
            is_windows = os.name == 'nt'
        count_flag = '-n' if is_windows else '-c'
        return ['ping', count_flag, '1', host]

    async def _ping(self, host: str) -> bool:
        """Ping *host* once; return True on exit status 0, False on failure or timeout."""
        process = await asyncio.create_subprocess_exec(
            *self._build_ping_command(host),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        try:
            await asyncio.wait_for(process.wait(), timeout=self.ping_timeout)
        except asyncio.TimeoutError:
            # Don't leave a hung ping behind; reap it before reporting failure.
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()
            self.logger.debug(f"Ping to {host} timed out after {self.ping_timeout}s")
            return False
        return process.returncode == 0

    async def check_internet_connectivity(self) -> bool:
        """Return True as soon as any configured ping host answers, else False."""
        if not self.ping_hosts:
            self.logger.warning("No ping_hosts configured, cannot verify connectivity")
            return False

        for host in self.ping_hosts:
            try:
                if await self._ping(str(host)):
                    self.logger.debug(f"Ping to {host} successful")
                    return True
                self.logger.debug(f"Ping to {host} failed")
            except Exception as e:
                # e.g. the ping binary is not installed in this environment
                self.logger.debug(f"Ping to {host} failed: {e}")

        self.logger.warning("All ping attempts failed")
        return False

    async def send_pushover_notification(self, title: str, message: str) -> bool:
        """Send a notification via Pushover; return True only on HTTP 200.

        Returns False without sending when Pushover is disabled, credentials
        are missing, or no HTTP session is open.
        """
        if not self.pushover_enabled or not self.pushover_api_token or not self.pushover_user_key:
            return False
        if self.session is None or self.session.closed:
            self.logger.warning("No HTTP session available, cannot send Pushover notification")
            return False

        notification_data = {
            'token': self.pushover_api_token,
            'user': self.pushover_user_key,
            'title': title,
            'message': message,
            # Pushover priorities: -2 lowest, -1 low, 0 normal, 1 high, 2 emergency.
            # Defaults to 1 (high); set notifications.pushover.priority to change it.
            # Priority 2 also needs retry/expire parameters, which are not sent here.
            'priority': self.pushover_priority,
        }

        try:
            async with self.session.post('https://api.pushover.net/1/messages.json',
                                         data=notification_data) as response:
                if response.status == 200:
                    self.logger.info("Pushover notification sent successfully")
                    return True
                self.logger.warning(f"Failed to send Pushover notification: {response.status}")
                return False
        except Exception as e:
            self.logger.error(f"Error sending Pushover notification: {e}")
            return False

    def _ensure_session(self):
        """Create the shared HTTP session if there is no open one, and return it."""
        if self.session is None or self.session.closed:
            # aiohttp's default cookie jar ignores cookies set by IP-address hosts
            # (e.g. the default 127.0.0.1), which would silently drop RomM's
            # CSRF/session cookies; unsafe=True allows them.
            self.session = aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True))
        return self.session

    async def authenticate(self) -> bool:
        """Authenticate with RomM via HTTP basic auth on the shared session.

        First GETs ``/api/heartbeat`` to pick up the ``romm_csrftoken`` cookie
        when RomM sets one, then POSTs to ``/api/login``. Returns True on HTTP 200.
        """
        if not self.username or not self.password:
            self.logger.error("Username and password not configured")
            return False

        try:
            session = self._ensure_session()
            headers = {}

            # First get CSRF token and session
            async with session.get(f"{self.base_url}/api/heartbeat") as response:
                if response.status != 200:
                    self.logger.error(f"Unable to get heartbeat: {response.status}")
                    return False

                csrf_cookie = response.cookies.get('romm_csrftoken')
                if csrf_cookie:
                    self.logger.debug("CSRF token obtained")
                    # NOTE(review): header name assumed to match RomM's CSRF
                    # middleware (x-csrftoken); confirm against the server.
                    headers['x-csrftoken'] = csrf_cookie.value

            # Perform login via basic authentication
            auth = aiohttp.BasicAuth(self.username, self.password)
            async with session.post(f"{self.base_url}/api/login", auth=auth, headers=headers) as response:
                if response.status == 200:
                    self.logger.info("Authentication completed successfully")
                    return True
                self.logger.error(f"Authentication failed: {response.status}")
                return False

        except Exception as e:
            self.logger.error(f"Error during authentication: {e}")
            return False

    async def connect_websocket(self):
        """Authenticate, register scan event handlers and connect the Socket.IO client.

        Returns True once connected, False on any failure.
        """
        try:
            self._ensure_session()

            # Authentication
            if not await self.authenticate():
                return False

            # NOTE(review): the Socket.IO client opens its own HTTP connection and
            # does not reuse self.session's cookies; confirm RomM's scan events
            # don't require an authenticated socket.
            self.socket = socketio.AsyncClient()

            @self.socket.on('connect')
            async def on_connect():
                self.logger.info("Connected to RomM websockets")
                self.is_connected = True

            @self.socket.on('disconnect')
            async def on_disconnect(*_args):
                # Recent python-socketio versions pass a disconnect reason; ignore it.
                self.logger.warning("Disconnected from RomM websockets")
                self.is_connected = False

            @self.socket.on('scan:scanning_platform')
            async def on_scanning_platform(data):
                self.logger.info(f"Scan in progress for platform: {data.get('name', 'Unknown')}")
                self.scan_in_progress = True

            @self.socket.on('scan:scanning_rom')
            async def on_scanning_rom(data):
                self.logger.debug(f"Scanning ROM: {data.get('name', 'Unknown')}")

            @self.socket.on('scan:done')
            async def on_scan_done(data):
                self.logger.info("Scan completed successfully")
                self.scan_in_progress = False
                self.last_scan_time = datetime.now()
                self.connection_errors = 0  # Reset errors

            @self.socket.on('scan:done_ko')
            async def on_scan_error(data):
                self.logger.error(f"Scan failed: {data}")
                self.scan_in_progress = False
                await self.handle_scan_error(data)

            await self.socket.connect(
                f"{self.base_url}",
                socketio_path="/ws/socket.io/",
                transports=['websocket', 'polling'],
                wait_timeout=10
            )
            return True

        except Exception as e:
            self.logger.error(f"Error connecting to websocket: {e}")
            return False

    @classmethod
    def _is_connection_error(cls, message: str) -> bool:
        """Return True if *message* contains any CONNECTION_ERROR_KEYWORDS (case-insensitive)."""
        lowered = message.lower()
        return any(keyword.lower() in lowered for keyword in cls.CONNECTION_ERROR_KEYWORDS)

    async def _restart_and_reset(self, current_time) -> None:
        """Restart the scan; on success clear the retry counter and record *current_time*."""
        if await self.restart_scan():
            self.connection_errors = 0
            self.last_scan_time = current_time
            # error_timestamps is deliberately NOT cleared here: the errors must
            # accumulate across restarts for error_threshold to ever trigger.
        else:
            self.logger.error("Failed to restart scan")

    async def handle_scan_error(self, error_data):
        """Handle a ``scan:done_ko`` payload and restart the scan if appropriate.

        Non-connection errors are only logged at DEBUG. Each connection error
        is timestamped. If ``error_threshold`` of them fall within
        ``error_time_window`` seconds, a notification is sent and no restart
        happens. Otherwise, once ``max_retries + 1`` connection errors have
        accumulated, the scan is restarted, after a successful ping check
        unless ``skip_internet_check`` is set.
        """
        error_message = str(error_data)

        if not self._is_connection_error(error_message):
            # Not a connection error, just log it
            self.logger.debug(f"Non-connection error: {error_message}")
            return

        current_time = datetime.now()
        self.error_timestamps.append(current_time)

        # Forget errors older than the configured time window.
        self.error_timestamps = [
            ts for ts in self.error_timestamps
            if (current_time - ts).total_seconds() < self.error_time_window
        ]

        # NOTE: if max_retries + 1 >= error_threshold and the errors arrive within
        # error_time_window, this check fires before any restart is attempted.
        if len(self.error_timestamps) >= self.error_threshold:
            self.logger.error(
                f"Error threshold exceeded ({len(self.error_timestamps)} errors in {self.error_time_window}s)"
            )

            notification_title = "RomM Scan Monitor - Too Many Errors"
            notification_message = (
                f"RomM has encountered {len(self.error_timestamps)} connection errors in the last "
                f"{self.error_time_window} seconds. Scan will not be automatically restarted to "
                f"prevent further issues."
            )
            await self.send_pushover_notification(notification_title, notification_message)

            # Reset error tracking and don't restart
            self.error_timestamps = []
            self.connection_errors = 0
            return

        self.connection_errors += 1
        required_errors = self.max_retries + 1
        self.logger.warning(f"Connection error detected ({self.connection_errors}/{required_errors})")

        if self.connection_errors < required_errors:
            return

        if self.skip_internet_check:
            self.logger.info("Internet check skipped, restarting scan immediately...")
        else:
            self.logger.warning("Multiple connection errors detected, checking connectivity...")
            if not await self.check_internet_connectivity():
                self.logger.warning("Internet connectivity check failed, scan will not be restarted")
                return
            self.logger.info("Internet connection restored, restarting scan...")

        await self._restart_and_reset(current_time)

    async def restart_scan(self):
        """Ask RomM for a new quick scan of all platforms via the websocket.

        Reads ``/api/heartbeat`` to confirm filesystem platforms exist and to
        collect the enabled metadata sources, then emits a ``scan`` event.
        Returns True if the event was emitted, False otherwise.
        """
        try:
            session = self._ensure_session()

            # Get platforms and metadata sources from heartbeat endpoint
            async with session.get(f"{self.base_url}/api/heartbeat") as response:
                if response.status != 200:
                    self.logger.error(f"Failed to get heartbeat: {response.status}")
                    return False
                heartbeat_data = await response.json()

            # Get filesystem platforms (these are the ones that can be scanned)
            fs_platforms = (heartbeat_data.get("FILESYSTEM") or {}).get("FS_PLATFORMS") or []
            if not fs_platforms:
                self.logger.warning("No filesystem platforms found")
                return False

            metadata_info = heartbeat_data.get("METADATA_SOURCES") or {}
            metadata_sources = [
                api_value for api_flag, api_value in self.METADATA_SOURCE_FLAGS.items()
                if metadata_info.get(api_flag, False)
            ]
            if not metadata_sources:
                self.logger.warning("No metadata sources enabled")
                return False

            self.logger.info(
                f"Restarting scan with {len(fs_platforms)} platforms and {len(metadata_sources)} metadata sources"
            )

            if not (self.socket and self.is_connected):
                self.logger.error("Websocket not connected, cannot restart scan")
                return False

            # Emit scan event with the same parameters as the frontend
            await self.socket.emit("scan", {
                "platforms": [],  # Empty list means scan all platforms
                "type": "quick",  # Use quick scan type
                "apis": metadata_sources
            })
            self.logger.info("Scan restart command sent via websocket")
            return True

        except Exception as e:
            self.logger.error(f"Error restarting scan: {e}")
            return False

    async def monitor_loop(self):
        """Main monitoring loop; runs until ``self.running`` is cleared.

        Every LOOP_POLL_SECONDS it checks for pending connection errors. If
        more than ``check_interval`` seconds have passed since
        ``last_scan_time`` (set when a scan finishes or is restarted), it runs
        a connectivity check and, on success, resets the error counter.
        """
        self.running = True
        self.logger.info("Monitor started")

        try:
            while self.running:
                if self.connection_errors > 0 and self.last_scan_time:
                    elapsed = (datetime.now() - self.last_scan_time).total_seconds()
                    if elapsed > self.check_interval:
                        self.logger.info("Periodic connectivity check...")
                        if await self.check_internet_connectivity():
                            self.logger.info("Connectivity verified, resetting errors")
                            self.connection_errors = 0
                            self.last_scan_time = None

                await asyncio.sleep(self.LOOP_POLL_SECONDS)

        except Exception as e:
            self.logger.error(f"Error in monitor loop: {e}")
        finally:
            self.running = False

    async def start(self):
        """Connect to RomM and run the monitor loop, always cleaning up afterwards."""
        self.logger.info("Starting RomM monitor...")

        try:
            if not await self.connect_websocket():
                self.logger.error("Cannot connect to RomM")
                return
            await self.monitor_loop()
        except Exception as e:
            self.logger.error(f"Error in monitor: {e}")
        finally:
            await self.stop()

    async def stop(self):
        """Disconnect the websocket and close the HTTP session; safe to call repeatedly."""
        self.logger.info("Stopping monitor...")
        self.running = False

        # Release each resource in its own try so a failing websocket
        # disconnect cannot leave the HTTP session open.
        if self.socket:
            try:
                await self.socket.disconnect()
            except Exception as e:
                self.logger.error(f"Error during cleanup: {e}")
            finally:
                self.socket = None

        if self.session:
            try:
                await self.session.close()
            except Exception as e:
                self.logger.error(f"Error during cleanup: {e}")
            finally:
                self.session = None

        self.is_connected = False
        self.logger.info("Monitor stopped")
|
|
|
async def main():
    """Parse command-line arguments and run the monitor until it stops.

    ``--help-config`` prints an example configuration and returns.
    ``--log-level`` overrides the logger level after the monitor is created.
    Errors raised while building or running the monitor are printed, and the
    process then exits with status 1.
    """
    import argparse

    parser = argparse.ArgumentParser(description="RomM Scan Monitor")
    parser.add_argument(
        "--config",
        default="romm_monitor_config.yml",
        help="Path to configuration file (default: romm_monitor_config.yml)"
    )
    parser.add_argument(
        "--log-level",
        type=str.upper,
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Override the log level set in the configuration file"
    )
    parser.add_argument(
        "--help-config",
        action="store_true",
        help="Show configuration file format and exit"
    )

    args = parser.parse_args()

    if args.help_config:
        # Kept flush-left so the printed YAML has correct indentation.
        config_example = """
romm:
  host: 127.0.0.1
  port: 8090
  username: your_username_here
  password: your_password_here

monitoring:
  check_interval: 30          # seconds between connectivity checks
  max_retries: 0              # immediate action on first error
  ping_hosts:                 # hosts to ping for connectivity verification
    - google.com
    - 8.8.8.8
    - 1.1.1.1
  # Optional:
  skip_internet_check: false  # restart without pinging first
  error_threshold: 3          # connection errors in the window that stop auto-restart
  error_time_window: 120      # window length in seconds

# Optional Pushover alert when error_threshold is reached
notifications:
  pushover:
    enabled: false
    api_token: your_app_token
    user_key: your_user_key

log_level: INFO
"""
        print("Configuration file format (YAML):")
        print(config_example)
        return

    try:
        monitor = RomMMonitor(args.config)
        if args.log_level:
            # The command-line flag wins over the configuration file.
            monitor.logger.setLevel(args.log_level)
        await monitor.start()
    except KeyboardInterrupt:
        print("\nMonitor stopped by user")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |