Skip to content

Instantly share code, notes, and snippets.

@vredesbyyrd
Created August 20, 2025 22:04
Show Gist options
  • Select an option

  • Save vredesbyyrd/2ae9f748429c30c959995a7788def0fb to your computer and use it in GitHub Desktop.

Select an option

Save vredesbyyrd/2ae9f748429c30c959995a7788def0fb to your computer and use it in GitHub Desktop.
my jellyfin_daemon.py
#!/usr/bin/env python3
import json
import os
import sys
import time
import signal
import asyncio
import logging
import threading
import hashlib
import requests
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List, Set
import argparse
# Image processing imports
try:
from PIL import Image, ImageOps
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
print("Warning: PIL (Pillow) not available. Image processing will be disabled.")
print("Install with: pip install Pillow")
# Import Jellyfin API client
try:
from jellyfin_apiclient_python import JellyfinClient
from jellyfin_apiclient_python.ws_client import WSClient
except ImportError:
print("Error: jellyfin-apiclient-python is not installed")
print("Please install it with: pip install jellyfin-apiclient-python")
sys.exit(1)
# Configuration
JELLYFIN_API_KEY = "07a9962817324bf5872e97ae28141fa3"
JELLYFIN_USERNAME = "clu"
JELLYFIN_PASSWORD = "rinzler" # Leave empty if using API key authentication
JELLYFIN_SERVER = "http://192.168.1.175:8096"
# Cache configuration
CACHE_DIR = Path("/home/clu/scripts/rofi_jellyfin/cache")
CACHE_FILE = CACHE_DIR / "jellyfin_data.json"
IMAGE_CACHE_DIR = Path("/home/clu/scripts/rofi_jellyfin/image_cache")
DAEMON_PIDFILE = CACHE_DIR / "jellyfin_daemon.pid"
LOG_FILE = CACHE_DIR.parent / "jellyfin_daemon.log"
# Image processing configuration
PRIMARY_IMAGE_WIDTH = 170
PRIMARY_IMAGE_HEIGHT = 240
SCREENSHOT_IMAGE_WIDTH = 170
SCREENSHOT_IMAGE_HEIGHT = 240
# Library filtering configuration - user configurable
# Set to None to include all libraries, or specify a list of library names to include
# Example: ["Movies", "TV Shows"] or ["Movies", "Series", "Documentaries"]
INCLUDED_LIBRARIES = ["Movies", "Shows"] # Only include Movies and Series libraries
# Alternative: Set to None to include all libraries
# INCLUDED_LIBRARIES = None
# Cache for library information
LIBRARY_CACHE = {}
# Global state
class DaemonState:
def __init__(self):
self.client = None
self.ws_client = None
self.running = False
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.reconnect_delay = 5
self.user_id = None
self.access_token = None
self.device_id = "rofi-jellyfin-daemon"
self.client_name = "Rofi Jellyfin Daemon"
self.version = "1.0.0"
self._callback_ref = None
# Cache state for selective updates
self.cache_data = None
self.cache_lock = threading.Lock()
# Image session for downloads
self.image_session = None
# Delta update tracking
self.failed_updates = 0
self.total_updates = 0
self.last_playback_update = {} # Track recent playback updates
daemon_state = DaemonState()
class CustomWSClient(WSClient):
def __init__(self, client):
super().__init__(client)
self.logger = logging.getLogger()
def on_message(self, message_type, data):
"""Handle incoming WebSocket messages"""
try:
self.logger.debug(f"WebSocket message received: {message_type}")
handle_websocket_message(message_type, data)
except Exception as e:
self.logger.error(f"Error processing WebSocket message: {e}")
# Setup logging
def setup_logging():
"""Configure logging to file and console"""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Create formatter
formatter = logging.Formatter(
'[%(asctime)s] [RT-DAEMON-%(levelname)s] %(message)s',
datefmt='%a %b %d %H:%M:%S %Y'
)
# Setup file handler
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
# Setup console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO if not os.getenv("DEBUG") else logging.DEBUG)
# Configure root logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
logger = setup_logging()
def iso_to_unix_timestamp(iso_str: Optional[str]) -> Optional[int]:
"""Convert ISO 8601 timestamp to Unix timestamp with proper timezone handling"""
if not iso_str:
return None
try:
# Handle different ISO 8601 formats
if iso_str.endswith('.0000000Z'):
iso_str = iso_str[:-10] + 'Z'
elif iso_str.endswith('Z'):
pass
elif '+' in iso_str[-6:] or iso_str[-6:-3] == '-' and iso_str[-3:].isdigit():
pass
else:
iso_str += 'Z'
try:
if iso_str.endswith('Z'):
iso_str_fixed = iso_str[:-1] + '+00:00'
else:
iso_str_fixed = iso_str
dt = datetime.fromisoformat(iso_str_fixed)
except (ValueError, AttributeError):
if iso_str.endswith('Z'):
dt = datetime.strptime(iso_str[:-1], "%Y-%m-%dT%H:%M:%S")
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = datetime.strptime(iso_str.split('.')[0], "%Y-%m-%dT%H:%M:%S")
dt = dt.replace(tzinfo=timezone.utc)
timestamp = int(dt.timestamp())
logger.debug(f"Converted timestamp: {iso_str} -> {timestamp}")
return timestamp
except Exception as e:
logger.warning(f"Failed to parse timestamp '{iso_str}': {e}")
return None
def show_library_filtering_status():
"""Show library filtering status in daemon status output"""
print(f"\nπŸ“š Library Filtering:")
if INCLUDED_LIBRARIES is None:
print(" Status: Disabled (all libraries included)")
else:
print(f" Status: Enabled")
print(f" Configured libraries: {INCLUDED_LIBRARIES}")
if LIBRARY_CACHE:
available_libs = list(LIBRARY_CACHE.keys())
print(f" Available libraries: {available_libs}")
valid_libs = [lib for lib in INCLUDED_LIBRARIES if lib in LIBRARY_CACHE]
invalid_libs = [lib for lib in INCLUDED_LIBRARIES if lib not in LIBRARY_CACHE]
if valid_libs:
print(f" βœ… Valid libraries: {valid_libs}")
if invalid_libs:
print(f" ❌ Invalid libraries: {invalid_libs}")
else:
print(" Library information not available")
def get_available_libraries() -> Dict[str, str]:
"""Get available libraries/views from Jellyfin server with name -> ID mapping"""
global LIBRARY_CACHE
if not daemon_state.client:
logger.error("Cannot get libraries: client not authenticated")
return {}
try:
logger.debug("Fetching available libraries from server...")
# Use the get_views() method from jellyfin-apiclient-python
views_response = daemon_state.client.jellyfin.get_views()
if not views_response or 'Items' not in views_response:
logger.warning("No library views returned from server")
return {}
libraries = {}
for view in views_response['Items']:
library_id = view.get('Id')
library_name = view.get('Name')
collection_type = view.get('CollectionType', 'mixed')
if library_id and library_name:
libraries[library_name] = {
'id': library_id,
'collection_type': collection_type
}
logger.debug(f"Found library: '{library_name}' (ID: {library_id}, Type: {collection_type})")
LIBRARY_CACHE = libraries
logger.info(f"Found {len(libraries)} available libraries: {list(libraries.keys())}")
return libraries
except Exception as e:
logger.error(f"Failed to get available libraries: {e}")
return {}
def get_filtered_library_ids() -> List[str]:
"""Get list of library IDs to include based on INCLUDED_LIBRARIES configuration"""
if INCLUDED_LIBRARIES is None:
logger.info("Including all libraries (no filter configured)")
return [] # Empty list means no filtering
if not LIBRARY_CACHE:
get_available_libraries()
if not LIBRARY_CACHE:
logger.error("No libraries available - cannot apply filtering")
return []
filtered_ids = []
not_found = []
for library_name in INCLUDED_LIBRARIES:
if library_name in LIBRARY_CACHE:
library_info = LIBRARY_CACHE[library_name]
filtered_ids.append(library_info['id'])
logger.debug(f"Including library: '{library_name}' (ID: {library_info['id']})")
else:
not_found.append(library_name)
if not_found:
available_names = list(LIBRARY_CACHE.keys())
logger.warning(f"Configured libraries not found: {not_found}")
logger.warning(f"Available libraries: {available_names}")
if filtered_ids:
included_names = [name for name in INCLUDED_LIBRARIES if name in LIBRARY_CACHE]
logger.info(f"Library filtering enabled - including {len(filtered_ids)} libraries: {included_names}")
else:
logger.warning("No valid libraries found in configuration - will include all libraries")
return filtered_ids
def should_include_library_item(item: Dict) -> bool:
"""Check if an item should be included based on library filtering"""
if INCLUDED_LIBRARIES is None:
return True # No filtering configured
# Get the parent library ID for this item
parent_id = item.get('ParentId')
# If no parent ID, we can't determine the library, so include it
if not parent_id:
return True
# Get filtered library IDs
filtered_ids = get_filtered_library_ids()
# If no filtering (empty list), include everything
if not filtered_ids:
return True
# Check if item's parent library is in our filtered list
# Note: We need to trace up the parent chain to find the root library
return is_item_in_filtered_libraries(item, filtered_ids)
def is_item_in_filtered_libraries(item: Dict, filtered_library_ids: List[str]) -> bool:
"""Check if an item belongs to one of the filtered libraries by tracing parent chain"""
if not filtered_library_ids:
return True
# Check direct parent first
parent_id = item.get('ParentId')
if parent_id in filtered_library_ids:
return True
# For more complex hierarchies, we might need to fetch parent info
# But for most cases, checking the item's path or using the library root should work
# Alternative approach: Check if any ancestor is in filtered libraries
# This is more robust but requires additional API calls
try:
# Use the path to determine library membership
path = item.get('Path', '')
if path:
# Extract library identifier from path if possible
# This is a heuristic approach since Jellyfin paths vary
pass
except:
pass
# Fallback: if we can't determine library membership, include the item
# This ensures we don't accidentally exclude content
return True
# Add this new function for filtered recently added items
def fetch_recently_added_ids_filtered(limit=25, library_ids=None) -> List[str]:
"""Fetch recently added item IDs with library filtering"""
try:
logger.info("Fetching recently added items with library filtering...")
all_recently_added_ids = []
if library_ids:
# Fetch from each library separately
for library_id in library_ids:
try:
library_name = next(
(name for name, info in LIBRARY_CACHE.items() if info['id'] == library_id),
library_id
)
logger.debug(f"Fetching recently added from library: {library_name}")
recently_added_data = daemon_state.client.jellyfin.get_recently_added(
media=None,
parent_id=library_id,
limit=limit
)
if recently_added_data and isinstance(recently_added_data, list):
library_ids_added = [item.get("Id") for item in recently_added_data if item.get("Id")]
all_recently_added_ids.extend(library_ids_added)
logger.debug(f"Found {len(library_ids_added)} recently added in {library_name}")
except Exception as e:
logger.warning(f"Failed to fetch recently added from library {library_id}: {e}")
else:
# Fallback to standard method
return fetch_recently_added_ids(limit)
# Remove duplicates and limit results
unique_ids = list(dict.fromkeys(all_recently_added_ids))[:limit]
logger.info(f"βœ… Fetched {len(unique_ids)} recently added item IDs from filtered libraries")
return unique_ids
except Exception as e:
logger.error(f"Error fetching filtered recently added items: {e}")
return []
def get_image_cache_path(item_id: str, image_type: str = 'primary', size: str = 'small') -> Path:
"""Get local image cache path for an item with size variant support"""
hash_name = hashlib.md5(item_id.encode()).hexdigest()
if size == 'large':
filename = f"{hash_name}_{image_type}_large.jpg"
else:
filename = f"{hash_name}_{image_type}.jpg"
return IMAGE_CACHE_DIR / filename
def resize_and_pad_image(image: Image.Image, target_width: int, target_height: int,
background_color: tuple = (0, 0, 0), preserve_original_size: bool = False) -> Image.Image:
"""Resize image to fit within target dimensions while maintaining aspect ratio
Args:
image: PIL Image object
target_width: Target width in pixels
target_height: Target height in pixels
background_color: RGB tuple for padding color
preserve_original_size: If True, don't resize but pad to maintain ratio
"""
if preserve_original_size:
# For large version: don't resize, just pad to maintain the same aspect ratio
target_ratio = target_width / target_height
image_ratio = image.width / image.height
if image_ratio > target_ratio:
# Image is wider than target ratio - pad top/bottom
new_height = int(image.width / target_ratio)
padded_image = Image.new('RGB', (image.width, new_height), background_color)
y_offset = (new_height - image.height) // 2
padded_image.paste(image, (0, y_offset))
else:
# Image is taller than target ratio - pad left/right
new_width = int(image.height * target_ratio)
padded_image = Image.new('RGB', (new_width, image.height), background_color)
x_offset = (new_width - image.width) // 2
padded_image.paste(image, (x_offset, 0))
if image.mode != 'RGB':
padded_image = padded_image.convert('RGB')
logger.debug(f"Large image padded from {image.width}x{image.height} to {padded_image.width}x{padded_image.height}")
return padded_image
else:
# Original resizing logic for small version
scale_factor = min(target_width / image.width, target_height / image.height)
new_width = int(image.width * scale_factor)
new_height = int(image.height * scale_factor)
resized_image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
if resized_image.mode != 'RGB':
resized_image = resized_image.convert('RGB')
padded_image = Image.new('RGB', (target_width, target_height), background_color)
x_offset = (target_width - new_width) // 2
y_offset = (target_height - new_height) // 2
padded_image.paste(resized_image, (x_offset, y_offset))
logger.debug(f"Small image resized from {image.width}x{image.height} to {new_width}x{new_height}, padded to {target_width}x{target_height}")
return padded_image
def download_and_process_image_from_url(image_url: str, cache_path_small: Path, cache_path_large: Path,
image_type: str = 'primary') -> tuple[bool, bool]:
"""Download image from URL and process it into both small and large versions
Returns:
tuple: (small_success, large_success)
"""
if not PIL_AVAILABLE:
logger.warning("PIL not available, skipping image processing")
return False, False
try:
if not daemon_state.image_session:
daemon_state.image_session = requests.Session()
daemon_state.image_session.headers.update({
'Authorization': f'MediaBrowser Token="{daemon_state.access_token}"'
})
response = daemon_state.image_session.get(image_url, stream=True, timeout=10)
response.raise_for_status()
cache_path_small.parent.mkdir(parents=True, exist_ok=True)
cache_path_large.parent.mkdir(parents=True, exist_ok=True)
image = Image.open(response.raw)
small_success = False
large_success = False
# Process small version (existing logic)
try:
if image_type == 'primary':
processed_image_small = resize_and_pad_image(
image, PRIMARY_IMAGE_WIDTH, PRIMARY_IMAGE_HEIGHT, (0, 0, 0), preserve_original_size=False
)
elif image_type in ['screenshot', 'thumb']:
processed_image_small = resize_and_pad_image(
image, SCREENSHOT_IMAGE_WIDTH, SCREENSHOT_IMAGE_HEIGHT, (0, 0, 0), preserve_original_size=False
)
else:
processed_image_small = image.convert('RGB')
processed_image_small.save(cache_path_small, 'JPEG', quality=90, optimize=True)
small_success = True
logger.debug(f"Successfully processed and cached small {image_type} image")
except Exception as e:
logger.error(f"Failed to process small {image_type} image: {e}")
# Process large version (new logic)
try:
if image_type == 'primary':
processed_image_large = resize_and_pad_image(
image, PRIMARY_IMAGE_WIDTH, PRIMARY_IMAGE_HEIGHT, (0, 0, 0), preserve_original_size=True
)
elif image_type in ['screenshot', 'thumb']:
processed_image_large = resize_and_pad_image(
image, SCREENSHOT_IMAGE_WIDTH, SCREENSHOT_IMAGE_HEIGHT, (0, 0, 0), preserve_original_size=True
)
else:
processed_image_large = image.convert('RGB')
processed_image_large.save(cache_path_large, 'JPEG', quality=95, optimize=True)
large_success = True
logger.debug(f"Successfully processed and cached large {image_type} image")
except Exception as e:
logger.error(f"Failed to process large {image_type} image: {e}")
logger.debug(f"Image processing completed for {image_url}: small={small_success}, large={large_success}")
return small_success, large_success
except Exception as e:
logger.error(f"Failed to download and process image {image_url}: {e}")
return False, False
def cache_item_images(item_data: Dict) -> Dict[str, str]:
"""Cache images for an item based on its type using jellyfin client (both small and large versions)"""
if not PIL_AVAILABLE:
return {}
item_id = item_data.get('id')
item_type = item_data.get('type')
cached_images = {}
if not item_id or not item_type:
return cached_images
try:
# Handle playlists - they can have Primary images
if item_type == "Playlist":
try:
primary_image_url = daemon_state.client.jellyfin.artwork(item_id, 'Primary', 765)
if primary_image_url:
cache_path_small = get_image_cache_path(item_id, 'primary', 'small')
cache_path_large = get_image_cache_path(item_id, 'primary', 'large')
# Check if both versions exist
if not cache_path_small.exists() or not cache_path_large.exists():
small_success, large_success = download_and_process_image_from_url(
primary_image_url, cache_path_small, cache_path_large, 'primary'
)
if small_success:
cached_images['primary_image_path'] = str(cache_path_small)
cached_images['has_primary_image'] = True
if large_success:
cached_images['primary_image_path_large'] = str(cache_path_large)
cached_images['has_primary_image_large'] = True
else:
# Both versions already exist
cached_images['primary_image_path'] = str(cache_path_small)
cached_images['primary_image_path_large'] = str(cache_path_large)
cached_images['has_primary_image'] = True
cached_images['has_primary_image_large'] = True
except Exception as e:
logger.debug(f"No primary image available for playlist {item_id}: {e}")
# Handle movies and series - existing logic
elif item_type in ['Movie', 'Series']:
try:
primary_image_url = daemon_state.client.jellyfin.artwork(item_id, 'Primary', 765)
if primary_image_url:
cache_path_small = get_image_cache_path(item_id, 'primary', 'small')
cache_path_large = get_image_cache_path(item_id, 'primary', 'large')
# Check if both versions exist
if not cache_path_small.exists() or not cache_path_large.exists():
small_success, large_success = download_and_process_image_from_url(
primary_image_url, cache_path_small, cache_path_large, 'primary'
)
if small_success:
cached_images['primary_image_path'] = str(cache_path_small)
cached_images['has_primary_image'] = True
if large_success:
cached_images['primary_image_path_large'] = str(cache_path_large)
cached_images['has_primary_image_large'] = True
else:
# Both versions already exist
cached_images['primary_image_path'] = str(cache_path_small)
cached_images['primary_image_path_large'] = str(cache_path_large)
cached_images['has_primary_image'] = True
cached_images['has_primary_image_large'] = True
except Exception as e:
logger.debug(f"No primary image available for {item_type} {item_id}: {e}")
elif item_type == 'Episode':
# Try primary image first (screenshot)
try:
primary_image_url = daemon_state.client.jellyfin.artwork(item_id, 'Primary', 765)
if primary_image_url:
cache_path_small = get_image_cache_path(item_id, 'screenshot', 'small')
cache_path_large = get_image_cache_path(item_id, 'screenshot', 'large')
if not cache_path_small.exists() or not cache_path_large.exists():
small_success, large_success = download_and_process_image_from_url(
primary_image_url, cache_path_small, cache_path_large, 'screenshot'
)
if small_success:
cached_images['screenshot_image_path'] = str(cache_path_small)
cached_images['has_screenshot_image'] = True
if large_success:
cached_images['screenshot_image_path_large'] = str(cache_path_large)
cached_images['has_screenshot_image_large'] = True
else:
cached_images['screenshot_image_path'] = str(cache_path_small)
cached_images['screenshot_image_path_large'] = str(cache_path_large)
cached_images['has_screenshot_image'] = True
cached_images['has_screenshot_image_large'] = True
except Exception:
# Fall back to thumb image
try:
thumb_image_url = daemon_state.client.jellyfin.artwork(item_id, 'Thumb', 765)
if thumb_image_url:
cache_path_small = get_image_cache_path(item_id, 'thumb', 'small')
cache_path_large = get_image_cache_path(item_id, 'thumb', 'large')
if not cache_path_small.exists() or not cache_path_large.exists():
small_success, large_success = download_and_process_image_from_url(
thumb_image_url, cache_path_small, cache_path_large, 'screenshot'
)
if small_success:
cached_images['thumb_image_path'] = str(cache_path_small)
cached_images['has_thumb_image'] = True
if large_success:
cached_images['thumb_image_path_large'] = str(cache_path_large)
cached_images['has_thumb_image_large'] = True
else:
cached_images['thumb_image_path'] = str(cache_path_small)
cached_images['thumb_image_path_large'] = str(cache_path_large)
cached_images['has_thumb_image'] = True
cached_images['has_thumb_image_large'] = True
except Exception as e:
logger.debug(f"No thumbnail images available for episode {item_id}: {e}")
except Exception as e:
logger.error(f"Error caching images for item {item_id}: {e}")
return cached_images
# Authentication functions
def authenticate_with_jellyfin():
"""Authenticate with Jellyfin server using credentials"""
try:
# Create client
client = JellyfinClient()
# Set client info
client.config.app(daemon_state.client_name, daemon_state.version, daemon_state.device_id, daemon_state.device_id)
# Connect to server
client.config.data['auth.ssl'] = JELLYFIN_SERVER.startswith('https')
# Try username/password authentication first (more reliable)
if JELLYFIN_PASSWORD:
logger.info("Attempting username/password authentication...")
try:
client.auth.connect_to_address(JELLYFIN_SERVER)
result = client.auth.login(JELLYFIN_SERVER, JELLYFIN_USERNAME, JELLYFIN_PASSWORD)
logger.info("βœ… Username/password authentication successful")
daemon_state.client = client
# Get access token and user ID from the client's auth state
credentials = client.auth.credentials.get_credentials()
if credentials and credentials.get('Servers'):
server_info = credentials['Servers'][0]
daemon_state.access_token = server_info.get('AccessToken')
daemon_state.user_id = server_info.get('UserId')
logger.info(f"User ID: {daemon_state.user_id}")
logger.info(f"Access Token obtained: {'βœ…' if daemon_state.access_token else '❌'}")
return True
else:
logger.error("No credentials found after successful login")
return False
except Exception as e:
logger.error(f"Username/password authentication failed: {e}")
# Try API key authentication as fallback
if JELLYFIN_API_KEY:
logger.info("Attempting API key authentication...")
try:
# Use the API key as an access token
auth_data = {
"Servers": [{
"AccessToken": JELLYFIN_API_KEY,
"address": JELLYFIN_SERVER,
"UserId": None # Will be populated after connection
}]
}
client.authenticate(auth_data, discover=False)
# Test the connection
system_info = client.jellyfin.get_system_info()
if system_info:
logger.info("βœ… API key authentication successful")
daemon_state.client = client
daemon_state.access_token = JELLYFIN_API_KEY
# Try to get user info using direct API call
try:
users = client.jellyfin.get_users()
for user in users:
if user['Name'] == JELLYFIN_USERNAME:
daemon_state.user_id = user['Id']
logger.info(f"User ID: {daemon_state.user_id}")
break
except Exception as e:
logger.warning(f"Could not get user ID: {e}")
return True
except Exception as e:
logger.warning(f"API key authentication failed: {e}")
logger.error("All authentication methods failed")
return False
except Exception as e:
logger.error(f"Authentication error: {e}")
return False
# Enhanced data processing functions
def extract_media_metadata(item: Dict) -> Dict:
"""Extract enhanced media metadata including video codec, size, subtitles, etc."""
metadata = {
"video_codec": None,
"video_width": None,
"file_size": None,
"has_subtitles": False,
"audio_language": None,
"trailer_url": None,
"imdb_url": None
}
media_sources = item.get('MediaSources', [])
if media_sources:
source = media_sources[0]
if source.get('Size'):
metadata["file_size"] = source.get('Size')
if source.get('Width'):
metadata["video_width"] = source.get('Width')
for stream in source.get('MediaStreams', []):
stream_type = stream.get('Type', '')
if stream_type == 'Video' and not metadata["video_codec"]:
metadata["video_codec"] = stream.get('Codec')
if not metadata["video_width"] and stream.get('Width'):
metadata["video_width"] = stream.get('Width')
elif stream_type == 'Audio' and not metadata["audio_language"]:
language = stream.get('Language')
if language:
metadata["audio_language"] = language
elif stream_type == 'Subtitle':
metadata["has_subtitles"] = True
provider_ids = item.get('ProviderIds', {})
imdb_id = provider_ids.get('Imdb')
if imdb_id:
metadata["imdb_url"] = f"https://www.imdb.com/title/{imdb_id}/"
trailers = item.get('RemoteTrailers', [])
if trailers:
trailer = trailers[0]
metadata["trailer_url"] = trailer.get('Url')
return metadata
def process_jellyfin_item(item: Dict) -> Dict:
"""Process a Jellyfin item into simplified format with enhanced metadata and image caching"""
processed = {
"id": item.get("Id"),
"name": item.get("Name"),
"type": item.get("Type"),
"year": item.get("ProductionYear"),
"community_rating": item.get("CommunityRating"),
"critic_rating": item.get("CriticRating"),
"date_added": item.get("DateCreated"),
"overview": item.get("Overview"),
"premiere_date": item.get("PremiereDate"),
"runtime_ticks": item.get("RunTimeTicks"),
"runtime_minutes": item.get("RunTimeTicks", 0) // 600000000 if item.get("RunTimeTicks") else None,
"genres": ", ".join(item.get("Genres", [])) if item.get("Genres") else None,
}
if item.get("Type") == "Episode":
processed.update({
"series_name": item.get("SeriesName"),
"series_id": item.get("SeriesId"),
"season_number": item.get("ParentIndexNumber"),
"episode_number": item.get("IndexNumber"),
"season_id": item.get("SeasonId"),
"parent_id": item.get("ParentId"),
"container": item.get("Container"),
})
media_metadata = extract_media_metadata(item)
processed.update(media_metadata)
elif item.get("Type") == "Movie":
processed["container"] = item.get("Container")
media_metadata = extract_media_metadata(item)
processed.update(media_metadata)
elif item.get("Type") == "Playlist":
# OPTIMIZED: Only store playlist-specific metadata, no redundant fields
processed.update({
"playlist_type": item.get("MediaType"), # Audio, Video, etc.
"child_count": item.get("ChildCount", 0),
"cumulative_runtime_ticks": item.get("CumulativeRunTimeTicks"),
"cumulative_runtime_minutes": (item.get("CumulativeRunTimeTicks", 0) // 600000000
if item.get("CumulativeRunTimeTicks") else None),
"is_folder": item.get("IsFolder", True),
"media_type": item.get("MediaType"),
})
# Remove fields that don't apply to playlists
for field in ["year", "critic_rating", "overview", "premiere_date", "runtime_ticks", "runtime_minutes"]:
processed.pop(field, None)
provider_ids = item.get("ProviderIds", {})
processed["imdb_id"] = provider_ids.get("Imdb")
processed["tmdb_id"] = provider_ids.get("Tmdb")
user_data = item.get("UserData", {})
processed.update({
"played": user_data.get("Played", False),
"is_favorite": user_data.get("IsFavorite", False),
})
if item.get("Type") in ["Movie", "Episode"]:
processed.update({
"playback_position_ticks": user_data.get("PlaybackPositionTicks", 0),
"played_percentage": user_data.get("PlayedPercentage", 0),
"last_played_date": iso_to_unix_timestamp(user_data.get("LastPlayedDate")),
})
people = item.get("People", [])
if people:
directors = [p["Name"] for p in people if p.get("Type") == "Director"]
writers = [p["Name"] for p in people if p.get("Type") == "Writer"]
cast = [p["Name"] for p in people if p.get("Type") == "Actor"]
if directors:
processed["directors"] = ", ".join(directors)
if writers:
processed["writers"] = ", ".join(writers)
if cast:
processed["cast"] = ", ".join(cast)
# Cache images (including Primary images for playlists)
image_info = cache_item_images(processed)
processed.update(image_info)
return processed
def fetch_episodes_for_series(series_id: str) -> List[Dict]:
"""Fetch all episodes for a specific series with complete metadata"""
try:
logger.debug(f"Fetching episodes for series {series_id}")
from jellyfin_apiclient_python.api import info
episode_params = {
'Recursive': True,
'Fields': info(),
'ImageTypeLimit': 1,
'EnableImageTypes': 'Primary,Backdrop,Screenshot,Thumb',
'IncludeItemTypes': 'Episode',
'ParentId': series_id,
'SortBy': 'ParentIndexNumber,IndexNumber',
'SortOrder': 'Ascending'
}
episodes_data = daemon_state.client.jellyfin.user_items(params=episode_params)
episodes = []
if episodes_data and 'Items' in episodes_data:
for episode in episodes_data['Items']:
processed_episode = process_jellyfin_item(episode)
episodes.append(processed_episode)
logger.info(f"Fetched {len(episodes_data['Items'])} episodes for series {series_id}")
logger.info(f"Total episodes fetched for series {series_id}: {len(episodes)}")
return episodes
except Exception as e:
logger.error(f"Error fetching episodes for series {series_id}: {e}")
return []
def fetch_recently_added_ids(limit=25) -> List[str]:
"""Fetch recently added item IDs using the correct Jellyfin API endpoint"""
try:
logger.info("Fetching recently added items...")
# Use the get_recently_added method from jellyfin-apiclient-python
# This corresponds to /Users/{UserId}/Items/Latest endpoint
recently_added_data = daemon_state.client.jellyfin.get_recently_added(
media=None, # Include all media types (movies, series, episodes)
parent_id=None, # From all libraries
limit=limit
)
if recently_added_data and isinstance(recently_added_data, list):
recently_added_ids = [item.get("Id") for item in recently_added_data if item.get("Id")]
logger.info(f"βœ… Fetched {len(recently_added_ids)} recently added item IDs")
return recently_added_ids
else:
logger.warning("No recently added data returned from API")
return []
except Exception as e:
logger.error(f"Error fetching recently added items: {e}")
return []
def fetch_next_up_ids(limit=25) -> List[str]:
"""Fetch next up episode IDs using the Jellyfin API"""
try:
logger.info("Fetching next up episodes...")
# Use the get_next method from jellyfin-apiclient-python
# This corresponds to /Shows/NextUp endpoint
next_up_data = daemon_state.client.jellyfin.get_next(
index=None, # Start from beginning
limit=limit
)
if next_up_data and 'Items' in next_up_data and isinstance(next_up_data['Items'], list):
next_up_ids = [item.get("Id") for item in next_up_data['Items'] if item.get("Id")]
logger.info(f"βœ… Fetched {len(next_up_ids)} next up episode IDs")
return next_up_ids
else:
logger.warning("No next up data returned from API")
return []
except Exception as e:
logger.error(f"Error fetching next up episodes: {e}")
return []
def fetch_all_jellyfin_data() -> Dict:
"""Fetch all Jellyfin data using optimized structure with ID-based filtered views and library filtering"""
logger.info("Fetching all Jellyfin data with optimized structure and library filtering...")
if not daemon_state.client or not daemon_state.user_id:
raise Exception("Client not authenticated")
# Get available libraries and apply filtering
available_libraries = get_available_libraries()
filtered_library_ids = get_filtered_library_ids()
if INCLUDED_LIBRARIES is not None:
if not filtered_library_ids:
logger.warning("No valid libraries found for filtering - falling back to all libraries")
else:
included_names = [name for name in INCLUDED_LIBRARIES if name in LIBRARY_CACHE]
logger.info(f"Using library filtering: {included_names}")
all_data = {
"timestamp": int(time.time()),
"total_items": 0,
"library_filter": {
"enabled": INCLUDED_LIBRARIES is not None,
"included_libraries": INCLUDED_LIBRARIES or [],
"available_libraries": list(available_libraries.keys()),
"filtered_library_ids": filtered_library_ids
},
"all_items": {
"movies": [],
"series": [],
"episodes": {},
"playlists": [],
"playlist_items": {},
"favorites_ids": [],
"continue_watching_ids": [],
"recently_added_ids": [],
"next_up_ids": []
}
}
try:
from jellyfin_apiclient_python.api import info
base_params = {
'Recursive': True,
'Fields': info(),
'ImageTypeLimit': 1,
'EnableImageTypes': 'Primary,Backdrop,Screenshot,Thumb'
}
# Add library filtering to params if configured
if filtered_library_ids:
# Use ParentId to filter by specific libraries
logger.debug(f"Applying library filter with IDs: {filtered_library_ids}")
# Fetch movies, series, and playlists with library filtering
main_categories = [
("movies", {"IncludeItemTypes": "Movie", **base_params}),
("series", {"IncludeItemTypes": "Series", **base_params}),
("playlists", {"IncludeItemTypes": "Playlist", **base_params})
]
for category, params in main_categories:
try:
logger.info(f"Fetching {category}...")
if filtered_library_ids and category != "playlists":
# For movies and series, apply library filtering
# We'll fetch from each library separately to ensure proper filtering
category_items = []
for library_id in filtered_library_ids:
library_name = next(
(name for name, info in LIBRARY_CACHE.items() if info['id'] == library_id),
library_id
)
logger.debug(f"Fetching {category} from library: {library_name}")
library_params = params.copy()
library_params['ParentId'] = library_id
try:
data = daemon_state.client.jellyfin.user_items(params=library_params)
library_items = data.get("Items", [])
category_items.extend(library_items)
logger.debug(f"Fetched {len(library_items)} {category} from {library_name}")
except Exception as e:
logger.warning(f"Failed to fetch {category} from library {library_name}: {e}")
logger.info(f"Total {category} from filtered libraries: {len(category_items)}")
else:
# For playlists or when no filtering, fetch normally
data = daemon_state.client.jellyfin.user_items(params=params)
category_items = data.get("Items", [])
for item in category_items:
processed_item = process_jellyfin_item(item)
all_data["all_items"][category].append(processed_item)
logger.info(f"βœ… Loaded {len(category_items)} {category}")
except Exception as e:
logger.error(f"Failed to fetch {category}: {e}")
# Fetch episodes for all series (episodes inherit library filtering from their series)
logger.info("Fetching episodes for all series...")
episode_count = 0
for series in all_data["all_items"]["series"]:
series_id = series.get("id")
if series_id:
episodes = fetch_episodes_for_series(series_id)
if episodes:
all_data["all_items"]["episodes"][series_id] = episodes
episode_count += len(episodes)
logger.info(f"βœ… Loaded {episode_count} episodes across {len(all_data['all_items']['episodes'])} series")
# Fetch playlist items for all playlists (minimal data)
logger.info("Fetching items for all playlists (optimized)...")
playlist_items_count = 0
for playlist in all_data["all_items"]["playlists"]:
playlist_id = playlist.get("id")
if playlist_id:
playlist_items = fetch_playlist_items(playlist_id)
if playlist_items:
all_data["all_items"]["playlist_items"][playlist_id] = playlist_items
playlist_items_count += len(playlist_items)
logger.info(f"βœ… Loaded {playlist_items_count} items across {len(all_data['all_items']['playlists'])} playlists")
# Collect IDs for filtered views efficiently
all_items_for_filtering = []
all_items_for_filtering.extend(all_data["all_items"]["movies"])
all_items_for_filtering.extend(all_data["all_items"]["series"])
# Add all episodes
for episodes_list in all_data["all_items"]["episodes"].values():
all_items_for_filtering.extend(episodes_list)
# Extract favorites IDs
favorites_ids = [item["id"] for item in all_items_for_filtering if item.get("is_favorite")]
all_data["all_items"]["favorites_ids"] = favorites_ids
# Extract continue watching IDs (items with progress > 5% and < 95%)
continue_watching_ids = [
item["id"] for item in all_items_for_filtering
if (item.get("playback_position_ticks", 0) > 0 and
not item.get("played", False) and
item.get("played_percentage", 0) > 5 and
item.get("played_percentage", 0) < 95)
]
all_data["all_items"]["continue_watching_ids"] = continue_watching_ids
# Fetch recently added and next up with library filtering
if filtered_library_ids:
# For library filtering, we need to fetch recently added from specific libraries
recently_added_ids = fetch_recently_added_ids_filtered(50, filtered_library_ids)
else:
recently_added_ids = fetch_recently_added_ids(50)
all_data["all_items"]["recently_added_ids"] = recently_added_ids
# Next up episodes (filtered by series that are in filtered libraries)
next_up_ids = fetch_next_up_ids(50)
if filtered_library_ids:
# Filter next up episodes to only include those from series in filtered libraries
series_ids_in_filtered_libs = {series["id"] for series in all_data["all_items"]["series"]}
filtered_next_up_ids = []
# We'd need to check each next up episode to see if its series is in filtered libraries
# For now, we'll use the next_up_ids as-is since the series filtering should handle this
filtered_next_up_ids = next_up_ids
all_data["all_items"]["next_up_ids"] = filtered_next_up_ids
else:
all_data["all_items"]["next_up_ids"] = next_up_ids
# Calculate total items
all_data["total_items"] = (
len(all_data["all_items"]["movies"]) +
len(all_data["all_items"]["series"]) +
len(all_data["all_items"]["playlists"]) +
episode_count
)
# Log filtering results
if INCLUDED_LIBRARIES is not None:
included_names = [name for name in INCLUDED_LIBRARIES if name in LIBRARY_CACHE]
logger.info(f"πŸ“š Library filtering applied - included: {included_names}")
logger.info(f"βœ… Optimized structure created:")
logger.info(f" Total items: {all_data['total_items']}")
logger.info(f" Movies: {len(all_data['all_items']['movies'])}")
logger.info(f" Series: {len(all_data['all_items']['series'])}")
logger.info(f" Playlists: {len(all_data['all_items']['playlists'])}")
logger.info(f" Episodes: {episode_count}")
logger.info(f" Favorites: {len(favorites_ids)} items")
logger.info(f" Continue watching: {len(continue_watching_ids)} items")
logger.info(f" Recently added: {len(all_data['all_items']['recently_added_ids'])} items")
logger.info(f" Next up: {len(all_data['all_items']['next_up_ids'])} episodes")
with daemon_state.cache_lock:
daemon_state.cache_data = all_data
return all_data
except Exception as e:
logger.error(f"Error fetching Jellyfin data: {e}")
raise
def fetch_playlist_items(playlist_id: str) -> List[Dict]:
"""Fetch playlist items optimally - store only essential data for lookups"""
try:
logger.debug(f"Fetching items for playlist {playlist_id}")
# Get playlist items using the standard user_items endpoint with the playlist ID as ParentId
playlist_params = {
'ParentId': playlist_id,
'Recursive': False, # Playlist items are direct children
'Fields': 'Etag', # Minimal fields - we only need basic info for ID lookup
'SortBy': 'SortName', # Maintain playlist order
'SortOrder': 'Ascending'
}
playlist_data = daemon_state.client.jellyfin.user_items(params=playlist_params)
playlist_items = []
if playlist_data and 'Items' in playlist_data:
for index, item in enumerate(playlist_data['Items']):
# OPTIMIZED: Store only essential data for playlist items
playlist_item = {
'id': item.get('Id'),
'playlist_id': playlist_id,
'playlist_index': index # Track order in playlist
}
playlist_items.append(playlist_item)
logger.info(f"Fetched {len(playlist_data['Items'])} items for playlist {playlist_id}")
return playlist_items
except Exception as e:
logger.error(f"Error fetching items for playlist {playlist_id}: {e}")
return []
def fetch_specific_item(item_id: str) -> Optional[Dict]:
"""Fetch a specific item from Jellyfin with enhanced fields and image caching"""
try:
item_data = daemon_state.client.jellyfin.get_item(item_id)
if item_data:
return process_jellyfin_item(item_data)
else:
logger.warning(f"Failed to fetch item {item_id}")
return None
except Exception as e:
logger.error(f"Error fetching item {item_id}: {e}")
return None
def fetch_multiple_items(item_ids: List[str]) -> Dict[str, Dict]:
"""Fetch multiple items efficiently in batches"""
items = {}
batch_size = 50 # Jellyfin API limit
try:
for i in range(0, len(item_ids), batch_size):
batch = item_ids[i:i + batch_size]
logger.debug(f"Fetching batch of {len(batch)} items")
try:
items_data = daemon_state.client.jellyfin.get_items(batch)
if items_data and 'Items' in items_data:
for item_data in items_data['Items']:
processed_item = process_jellyfin_item(item_data)
items[processed_item['id']] = processed_item
except Exception as e:
logger.warning(f"Batch fetch failed, falling back to individual fetches: {e}")
for item_id in batch:
item = fetch_specific_item(item_id)
if item:
items[item_id] = item
except Exception as e:
logger.error(f"Error in batch fetch: {e}")
return items
def update_user_data_in_cache(user_data_list: List[Dict]) -> bool:
"""Update user data and maintain optimized ID lists with proper position data clearing"""
if not daemon_state.cache_data:
logger.warning("No cache data available for user data update")
return False
updated_items = 0
favorites_changes = []
continue_watching_changes = []
next_up_changes = []
next_up_needs_refresh = False
with daemon_state.cache_lock:
# Get current ID sets for easy lookup
current_favorites = set(daemon_state.cache_data["all_items"].get("favorites_ids", []))
current_continue_watching = set(daemon_state.cache_data["all_items"].get("continue_watching_ids", []))
for user_data in user_data_list:
item_id = user_data.get("ItemId")
if not item_id:
continue
# Get the raw values from the API response
raw_playback_position = user_data.get("PlaybackPositionTicks", 0)
raw_played_percentage = user_data.get("PlayedPercentage", 0)
is_played = user_data.get("Played", False)
is_favorite = user_data.get("IsFavorite", False)
# FIXED: Determine if item should be in continue watching based on raw values
should_be_in_continue_watching = (
raw_playback_position > 0 and
not is_played and
raw_played_percentage > 5 and
raw_played_percentage < 95
)
# FIXED: When an item is marked as played or falls outside continue watching range,
# use the actual API values but ensure proper continue watching logic
playback_position_to_store = raw_playback_position
played_percentage_to_store = raw_played_percentage
# If item is marked as played, it should have position data cleared in continue watching logic
# but we store the actual API values in the item data for consistency
if is_played:
logger.debug(f"Item {item_id} marked as played - storing actual position data but removing from continue watching")
item_found = False
updated_item_data = None
# Update in main data structures
for category in ["movies", "series"]:
items_list = daemon_state.cache_data["all_items"].get(category, [])
for item in items_list:
if item.get("id") == item_id:
# Update user-specific fields with the actual API values
item["played_percentage"] = played_percentage_to_store
item["playback_position_ticks"] = playback_position_to_store
item["is_favorite"] = is_favorite
item["played"] = is_played
# Update other fields if present
if "PlayCount" in user_data:
item["play_count"] = user_data["PlayCount"]
if "LastPlayedDate" in user_data:
timestamp = iso_to_unix_timestamp(user_data["LastPlayedDate"])
item["last_played_date"] = timestamp
updated_items += 1
updated_item_data = item.copy()
item_found = True
logger.debug(f"Updated {category} item {item_id}: played={is_played}, percentage={played_percentage_to_store:.1f}%, position={playback_position_to_store}")
break
# Update episodes
for series_id, episodes_list in daemon_state.cache_data["all_items"].get("episodes", {}).items():
for episode in episodes_list:
if episode.get("id") == item_id:
# Update user-specific fields with the actual API values
episode["played_percentage"] = played_percentage_to_store
episode["playback_position_ticks"] = playback_position_to_store
episode["is_favorite"] = is_favorite
episode["played"] = is_played
# Update other fields if present
if "PlayCount" in user_data:
episode["play_count"] = user_data["PlayCount"]
if "LastPlayedDate" in user_data:
timestamp = iso_to_unix_timestamp(user_data["LastPlayedDate"])
episode["last_played_date"] = timestamp
updated_items += 1
updated_item_data = episode.copy()
item_found = True
logger.debug(f"Updated episode {item_id}: played={is_played}, percentage={played_percentage_to_store:.1f}%, position={playback_position_to_store}")
break
if item_found:
# Update favorites ID list
if is_favorite and item_id not in current_favorites:
current_favorites.add(item_id)
favorites_changes.append(f"Added {updated_item_data.get('name', item_id)}")
elif not is_favorite and item_id in current_favorites:
current_favorites.discard(item_id)
favorites_changes.append(f"Removed {updated_item_data.get('name', item_id)}")
# FIXED: Update continue watching ID list with proper logic
# This is the key fix - the continue watching list follows the 5%-95% rule
if should_be_in_continue_watching and item_id not in current_continue_watching:
current_continue_watching.add(item_id)
logger.info(f"Added item {updated_item_data.get('name', item_id)} to continue watching (progress: {raw_played_percentage:.1f}%)")
continue_watching_changes.append(f"Added {updated_item_data.get('name', item_id)}")
elif not should_be_in_continue_watching and item_id in current_continue_watching:
current_continue_watching.discard(item_id)
reason = "played" if is_played else f"progress {raw_played_percentage:.1f}% outside range"
logger.info(f"Removed item {updated_item_data.get('name', item_id)} from continue watching ({reason})")
continue_watching_changes.append(f"Removed {updated_item_data.get('name', item_id)}")
# Check if this affects next_up (episodes that are completed)
if (updated_item_data and updated_item_data.get("type") == "Episode" and
(is_played or raw_played_percentage > 90)):
next_up_needs_refresh = True
logger.debug(f"Episode {item_id} completed - next_up list may need refresh")
# Update the ID lists in cache
daemon_state.cache_data["all_items"]["favorites_ids"] = list(current_favorites)
daemon_state.cache_data["all_items"]["continue_watching_ids"] = list(current_continue_watching)
# Refresh next_up if needed (lightweight operation)
if next_up_needs_refresh:
try:
logger.debug("Refreshing next_up list due to episode completion")
new_next_up_ids = fetch_next_up_ids(50)
old_next_up_ids = set(daemon_state.cache_data["all_items"].get("next_up_ids", []))
new_next_up_set = set(new_next_up_ids)
if old_next_up_ids != new_next_up_set:
daemon_state.cache_data["all_items"]["next_up_ids"] = new_next_up_ids
added_count = len(new_next_up_set - old_next_up_ids)
removed_count = len(old_next_up_ids - new_next_up_set)
next_up_changes.append(f"Refreshed: +{added_count} -{removed_count}")
logger.info(f"Next up list updated: +{added_count} -{removed_count} episodes")
except Exception as e:
logger.warning(f"Failed to refresh next_up list: {e}")
next_up_changes.append("Refresh failed")
if updated_items > 0:
daemon_state.cache_data["timestamp"] = int(time.time())
log_msg = f"Updated user data for {len(user_data_list)} items ({updated_items} cache entries)"
if favorites_changes:
log_msg += f" - Favorites: {', '.join(favorites_changes)}"
if continue_watching_changes:
log_msg += f" - Continue watching: {', '.join(continue_watching_changes)}"
if next_up_changes:
log_msg += f" - Next up: {', '.join(next_up_changes)}"
logger.info(log_msg)
return True
else:
logger.warning(f"No cache entries found for user data updates")
return False
def handle_library_changes(library_data: Dict) -> bool:
"""Handle library changes with comprehensive delta updates (UPDATED for playlists)"""
items_added = library_data.get("ItemsAdded", [])
items_removed = library_data.get("ItemsRemoved", [])
items_updated = library_data.get("ItemsUpdated", [])
if not daemon_state.cache_data:
logger.warning("No cache data available for library updates")
return False
changes_made = False
failed_operations = 0
total_operations = len(items_added) + len(items_removed) + len(items_updated)
with daemon_state.cache_lock:
# Handle removed items first
if items_removed:
logger.info(f"Removing {len(items_removed)} items from cache")
for item_id in items_removed:
try:
# Remove cached images for all image types
for image_type in ['primary', 'screenshot', 'thumb']:
for size in ['small', 'large']:
cache_path = get_image_cache_path(item_id, image_type, size)
if cache_path.exists():
try:
cache_path.unlink()
logger.debug(f"Removed cached {image_type} ({size}) image for deleted item {item_id}")
except Exception as e:
logger.warning(f"Failed to remove cached image {cache_path}: {e}")
removed_count = 0
# Remove from main data structures
for category in ["movies", "series", "playlists"]: # Added playlists
items_list = daemon_state.cache_data["all_items"].get(category, [])
original_count = len(items_list)
daemon_state.cache_data["all_items"][category] = [
item for item in items_list if item.get("id") != item_id
]
removed_count += original_count - len(daemon_state.cache_data["all_items"][category])
# Remove from playlist items if it's a playlist being deleted
if item_id in daemon_state.cache_data["all_items"]["playlist_items"]:
del daemon_state.cache_data["all_items"]["playlist_items"][item_id]
removed_count += 1
# Remove from ID-based lists
for id_list_name in ["favorites_ids", "continue_watching_ids", "recently_added_ids"]:
id_list = daemon_state.cache_data["all_items"].get(id_list_name, [])
if item_id in id_list:
daemon_state.cache_data["all_items"][id_list_name].remove(item_id)
removed_count += 1
# Remove from episodes
for series_id in list(daemon_state.cache_data["all_items"].get("episodes", {}).keys()):
episodes_list = daemon_state.cache_data["all_items"]["episodes"][series_id]
original_count = len(episodes_list)
daemon_state.cache_data["all_items"]["episodes"][series_id] = [
episode for episode in episodes_list if episode.get("id") != item_id
]
removed_count += original_count - len(daemon_state.cache_data["all_items"]["episodes"][series_id])
# Remove empty series from episodes
if not daemon_state.cache_data["all_items"]["episodes"][series_id]:
del daemon_state.cache_data["all_items"]["episodes"][series_id]
if removed_count > 0:
logger.debug(f"Removed item {item_id} from {removed_count} locations")
changes_made = True
except Exception as e:
logger.error(f"Failed to remove item {item_id}: {e}")
failed_operations += 1
# Handle added and updated items with batch processing
items_to_fetch = items_added + items_updated
if items_to_fetch:
logger.info(f"Processing {len(items_to_fetch)} items (added: {len(items_added)}, updated: {len(items_updated)})")
try:
# Batch fetch items for efficiency
fetched_items = fetch_multiple_items(items_to_fetch)
for item_id in items_to_fetch:
try:
item_data = fetched_items.get(item_id)
if not item_data:
logger.warning(f"Failed to fetch item {item_id}")
failed_operations += 1
continue
item_type = item_data.get("type")
# For updated items, remove old versions first
if item_id in items_updated:
for category in ["movies", "series", "playlists"]: # Added playlists
items_list = daemon_state.cache_data["all_items"].get(category, [])
daemon_state.cache_data["all_items"][category] = [
item for item in items_list if item.get("id") != item_id
]
# Remove from ID-based lists
for id_list_name in ["favorites_ids", "continue_watching_ids", "recently_added_ids"]:
id_list = daemon_state.cache_data["all_items"].get(id_list_name, [])
if item_id in id_list:
daemon_state.cache_data["all_items"][id_list_name].remove(item_id)
# Remove from episodes if it's an episode
if item_type == "Episode":
for series_id in daemon_state.cache_data["all_items"].get("episodes", {}).keys():
episodes_list = daemon_state.cache_data["all_items"]["episodes"][series_id]
daemon_state.cache_data["all_items"]["episodes"][series_id] = [
episode for episode in episodes_list if episode.get("id") != item_id
]
# Add to appropriate categories based on type
if item_type == "Movie":
daemon_state.cache_data["all_items"]["movies"].append(item_data)
changes_made = True
elif item_type == "Series":
daemon_state.cache_data["all_items"]["series"].append(item_data)
# For new series, fetch all episodes
if item_id in items_added:
episodes = fetch_episodes_for_series(item_id)
if episodes:
daemon_state.cache_data["all_items"]["episodes"][item_id] = episodes
changes_made = True
elif item_type == "Episode":
series_id = item_data.get("series_id")
if series_id:
if series_id not in daemon_state.cache_data["all_items"]["episodes"]:
daemon_state.cache_data["all_items"]["episodes"][series_id] = []
# Insert episode in correct position (sorted by season/episode number)
episodes_list = daemon_state.cache_data["all_items"]["episodes"][series_id]
# Find correct insertion point
season_num = item_data.get("season_number", 0)
episode_num = item_data.get("episode_number", 0)
inserted = False
for i, existing_episode in enumerate(episodes_list):
existing_season = existing_episode.get("season_number", 0)
existing_episode_num = existing_episode.get("episode_number", 0)
if (season_num < existing_season or
(season_num == existing_season and episode_num < existing_episode_num)):
episodes_list.insert(i, item_data)
inserted = True
break
if not inserted:
episodes_list.append(item_data)
changes_made = True
# OPTIMIZED: Handle playlist changes with minimal data storage
elif item_type == "Playlist":
# Remove old version if updating
if item_id in items_updated:
daemon_state.cache_data["all_items"]["playlists"] = [
playlist for playlist in daemon_state.cache_data["all_items"]["playlists"]
if playlist.get("id") != item_id
]
# Remove old playlist items
if item_id in daemon_state.cache_data["all_items"]["playlist_items"]:
del daemon_state.cache_data["all_items"]["playlist_items"][item_id]
# Add new/updated playlist
daemon_state.cache_data["all_items"]["playlists"].append(item_data)
# Fetch playlist items (minimal data)
playlist_items = fetch_playlist_items(item_id)
if playlist_items:
daemon_state.cache_data["all_items"]["playlist_items"][item_id] = playlist_items
changes_made = True
logger.debug(f"Processed playlist {item_id} with {len(playlist_items)} items")
# Update ID-based lists based on item properties
# Add to favorites_ids if applicable
if item_data.get("is_favorite"):
favorites_ids = daemon_state.cache_data["all_items"].get("favorites_ids", [])
if item_id not in favorites_ids:
daemon_state.cache_data["all_items"]["favorites_ids"].append(item_id)
# Add to recently_added_ids if it's a new item and is Movie, Series, or Playlist
if item_id in items_added and item_type in ["Movie", "Series", "Playlist"]:
recently_added_ids = daemon_state.cache_data["all_items"].get("recently_added_ids", [])
# Check if not already in recently added
if item_id not in recently_added_ids:
# Add to the beginning (newest first)
recently_added_ids.insert(0, item_id)
# Keep only the most recent 50 items
daemon_state.cache_data["all_items"]["recently_added_ids"] = recently_added_ids[:50]
logger.debug(f"Added item {item_id} to recently_added_ids")
# Add to continue_watching_ids if applicable (not for playlists)
if (item_type in ["Movie", "Episode"] and
item_data.get("playback_position_ticks", 0) > 0 and
not item_data.get("played", False) and
item_data.get("played_percentage", 0) > 5 and
item_data.get("played_percentage", 0) < 95):
continue_watching_ids = daemon_state.cache_data["all_items"].get("continue_watching_ids", [])
if item_id not in continue_watching_ids:
daemon_state.cache_data["all_items"]["continue_watching_ids"].append(item_id)
logger.debug(f"Processed item {item_id} ({item_type})")
except Exception as e:
logger.error(f"Failed to process item {item_id}: {e}")
failed_operations += 1
except Exception as e:
logger.error(f"Batch processing failed: {e}")
failed_operations += len(items_to_fetch)
# Calculate success rate and decide on fallback
success_rate = 1.0 - (failed_operations / max(total_operations, 1))
if changes_made and success_rate >= 0.5: # At least 50% success rate
# Update total count and timestamp (include playlists)
episode_count = sum(len(episodes) for episodes in daemon_state.cache_data["all_items"]["episodes"].values())
daemon_state.cache_data["total_items"] = (
len(daemon_state.cache_data["all_items"]["movies"]) +
len(daemon_state.cache_data["all_items"]["series"]) +
len(daemon_state.cache_data["all_items"]["playlists"]) +
episode_count
)
daemon_state.cache_data["timestamp"] = int(time.time())
logger.info(f"Library changes applied successfully: +{len(items_added)} -{len(items_removed)} ~{len(items_updated)} items (success rate: {success_rate:.1%})")
# Update tracking stats
daemon_state.total_updates += 1
if failed_operations > 0:
daemon_state.failed_updates += 1
return True
else:
logger.warning(f"Library update had low success rate ({success_rate:.1%}), recommending full refresh")
daemon_state.failed_updates += 1
daemon_state.total_updates += 1
return False
def handle_playback_events(message_type: str, data: Dict) -> bool:
"""Handle playback events with smart delta updates"""
try:
# Extract relevant information from playback data
if not data:
return False
# Get session info to find the item being played
session_id = None
item_id = None
user_id = None
# Different message types have different data structures
if message_type == "PlaybackStart":
session_id = data.get("SessionId")
item_id = data.get("ItemId")
user_id = data.get("UserId")
elif message_type == "PlaybackStopped":
session_id = data.get("SessionId")
item_id = data.get("ItemId")
user_id = data.get("UserId")
elif message_type == "PlaybackProgress":
# For progress updates, we can be more selective
session_id = data.get("SessionId")
item_id = data.get("ItemId")
user_id = data.get("UserId")
# Skip frequent progress updates for the same item
current_time = time.time()
last_update_time = daemon_state.last_playback_update.get(item_id, 0)
if current_time - last_update_time < 30: # Skip updates within 30 seconds
logger.debug(f"Skipping frequent progress update for item {item_id}")
return True
daemon_state.last_playback_update[item_id] = current_time
elif message_type == "Sessions":
# Sessions data contains multiple sessions, extract playing items
if isinstance(data, list):
playing_items = []
for session in data:
now_playing = session.get("NowPlayingItem")
if now_playing:
playing_items.append(now_playing.get("Id"))
# Update user data for all currently playing items
if playing_items:
logger.debug(f"Updating user data for {len(playing_items)} playing items")
return update_playing_items_user_data(playing_items)
return True
# For specific item playback events, fetch updated user data
if item_id and user_id == daemon_state.user_id:
logger.debug(f"Handling {message_type} for item {item_id}")
return update_single_item_user_data(item_id)
return True
except Exception as e:
logger.error(f"Error handling playback event {message_type}: {e}")
return False
except Exception as e:
logger.error(f"Error handling playback event {message_type}: {e}")
return False
def update_single_item_user_data(item_id: str) -> bool:
"""Update user data for a single item efficiently using the correct API method"""
try:
# Use the correct API method to fetch user data for the item
user_data_response = daemon_state.client.jellyfin.get_userdata_for_item(item_id)
if user_data_response:
# Convert to the format expected by update_user_data_in_cache
user_data_list = [{
"ItemId": item_id,
"PlayedPercentage": user_data_response.get("PlayedPercentage", 0),
"PlaybackPositionTicks": user_data_response.get("PlaybackPositionTicks", 0),
"Played": user_data_response.get("Played", False),
"IsFavorite": user_data_response.get("IsFavorite", False),
"LastPlayedDate": user_data_response.get("LastPlayedDate"),
"PlayCount": user_data_response.get("PlayCount", 0)
}]
logger.debug(f"Updating user data for item {item_id}: played_percentage={user_data_response.get('PlayedPercentage', 0):.1f}%, position_ticks={user_data_response.get('PlaybackPositionTicks', 0)}")
return update_user_data_in_cache(user_data_list)
else:
logger.warning(f"No user data returned for item {item_id}")
return False
except Exception as e:
logger.error(f"Failed to update user data for item {item_id}: {e}")
return False
def update_playing_items_user_data(item_ids: List[str]) -> bool:
"""Update user data for multiple playing items using the correct API method"""
try:
user_data_list = []
for item_id in item_ids:
try:
user_data_response = daemon_state.client.jellyfin.get_userdata_for_item(item_id)
if user_data_response:
user_data_list.append({
"ItemId": item_id,
"PlayedPercentage": user_data_response.get("PlayedPercentage", 0),
"PlaybackPositionTicks": user_data_response.get("PlaybackPositionTicks", 0),
"Played": user_data_response.get("Played", False),
"IsFavorite": user_data_response.get("IsFavorite", False),
"LastPlayedDate": user_data_response.get("LastPlayedDate"),
"PlayCount": user_data_response.get("PlayCount", 0)
})
logger.debug(f"Adding user data for playing item {item_id}: played_percentage={user_data_response.get('PlayedPercentage', 0):.1f}%")
except Exception as e:
logger.warning(f"Failed to fetch user data for playing item {item_id}: {e}")
if user_data_list:
return update_user_data_in_cache(user_data_list)
except Exception as e:
logger.error(f"Failed to update user data for playing items: {e}")
return False
def should_do_full_refresh() -> bool:
"""Determine if a full refresh is needed based on failure rate"""
if daemon_state.total_updates == 0:
return False
failure_rate = daemon_state.failed_updates / daemon_state.total_updates
# Do full refresh if failure rate is too high
if failure_rate > 0.3: # More than 30% failures
logger.warning(f"High failure rate detected ({failure_rate:.1%}), recommending full refresh")
return True
# Reset counters periodically
if daemon_state.total_updates > 100:
daemon_state.failed_updates = max(0, daemon_state.failed_updates - 10)
daemon_state.total_updates = max(10, daemon_state.total_updates - 10)
return False
def fallback_full_refresh():
"""Fallback to full cache refresh when selective updates fail repeatedly"""
try:
# Small delay to let server finish processing
time.sleep(0.5)
logger.info("Performing full cache refresh...")
cache_data = fetch_all_jellyfin_data()
if save_cache(cache_data):
logger.info("βœ… Full cache refresh completed successfully")
# Reset failure counters after successful full refresh
daemon_state.failed_updates = 0
daemon_state.total_updates = 1
else:
logger.error("❌ Failed to save full cache refresh")
except Exception as e:
logger.error(f"❌ Full cache refresh failed: {e}")
# Cache management functions
def save_cache(cache_data: Dict = None) -> bool:
"""Save cache data to file atomically"""
try:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
temp_file = CACHE_FILE.with_suffix('.tmp')
data_to_save = cache_data if cache_data is not None else daemon_state.cache_data
if not data_to_save:
logger.error("No cache data to save")
return False
with daemon_state.cache_lock:
with open(temp_file, 'w') as f:
json.dump(data_to_save, f, indent=2)
temp_file.rename(CACHE_FILE)
total_items = data_to_save.get('total_items', 0)
logger.info(f"βœ… Cache updated successfully ({total_items} items)")
return True
except Exception as e:
logger.error(f"Failed to save cache: {e}")
return False
def load_cache() -> bool:
"""Load existing cache data into daemon state"""
try:
if not CACHE_FILE.exists():
return False
with open(CACHE_FILE, 'r') as f:
cache_data = json.load(f)
with daemon_state.cache_lock:
daemon_state.cache_data = cache_data
total_items = cache_data.get('total_items', 0)
logger.info(f"βœ… Loaded existing cache ({total_items} items)")
return True
except Exception as e:
logger.error(f"Failed to load cache: {e}")
return False
# Enhanced WebSocket message handling
def handle_websocket_message(message_type: str, data: Any = None):
"""Handle incoming WebSocket messages with comprehensive delta updates"""
logger.info(f"πŸ“‘ WebSocket message: {message_type}")
if logger.isEnabledFor(logging.DEBUG) and data:
logger.debug(f"Message data: {json.dumps(data, indent=2, default=str)}")
try:
success = False
# Handle different message types with appropriate delta update strategies
if message_type == "UserDataChanged":
logger.info("πŸ”„ User data changed - performing delta update")
user_data_list = data.get("UserDataList", []) if data else []
if user_data_list:
success = update_user_data_in_cache(user_data_list)
if success:
success = save_cache()
if success:
logger.info("βœ… User data updated successfully")
elif message_type == "LibraryChanged":
logger.info("πŸ”„ Library changed - performing delta update")
success = handle_library_changes(data if data else {})
if success:
success = save_cache()
if success:
logger.info("βœ… Library changes applied successfully")
elif message_type in ["PlaybackStart", "PlaybackStopped", "PlaybackProgress", "Sessions"]:
logger.info(f"πŸ”„ {message_type} - performing delta playback update")
success = handle_playback_events(message_type, data if data else {})
if success:
success = save_cache()
if success:
logger.debug(f"βœ… {message_type} processed successfully")
elif message_type == "RefreshProgress":
# For refresh progress, we can be more selective
# Only do full refresh if it's a complete library refresh
if data and data.get("Progress") == 100:
logger.info("πŸ”„ Library refresh completed - performing full cache refresh")
fallback_full_refresh()
return
else:
logger.debug("πŸ’“ Refresh in progress - skipping update")
return
elif message_type == "ForceKeepAlive":
logger.debug("πŸ’“ Keep-alive message")
return
elif message_type in ["RestartRequired", "ServerShuttingDown", "ServerRestarting"]:
logger.warning(f"⚠️ Server message: {message_type}")
if message_type == "ServerShuttingDown":
daemon_state.running = False
return
else:
logger.debug(f"❓ Message type '{message_type}' not handled")
return
# Handle failures and decide on full refresh
if not success:
logger.warning(f"❌ Delta update failed for {message_type}")
# Check if we should do a full refresh based on failure patterns
if should_do_full_refresh():
logger.warning("High failure rate detected, performing full refresh")
fallback_full_refresh()
else:
logger.info("Continuing with delta updates despite failure")
except Exception as e:
logger.error(f"❌ Error handling WebSocket message {message_type}: {e}")
daemon_state.failed_updates += 1
daemon_state.total_updates += 1
# For critical errors, consider full refresh
if should_do_full_refresh():
logger.warning("Critical error detected, performing full refresh")
fallback_full_refresh()
# WebSocket client setup
def setup_websocket():
"""Setup WebSocket connection using jellyfin-apiclient-python"""
try:
if not daemon_state.client:
logger.error("Cannot setup WebSocket: client not authenticated")
return False
logger.info("Setting up WebSocket connection...")
ws_client = WSClient(daemon_state.client)
daemon_state.ws_client = ws_client
def handle_callback(message_type, data):
if message_type in ['WebSocketConnect', 'WebSocketDisconnect', 'WebSocketError']:
logger.info(f"WebSocket {message_type}")
else:
handle_websocket_message(message_type, data)
daemon_state.client.callback = handle_callback
ws_client.start()
logger.info("βœ… WebSocket client configured")
return True
except Exception as e:
logger.error(f"Failed to setup WebSocket: {e}")
return False
def websocket_worker():
"""Worker function to maintain WebSocket connection"""
try:
while daemon_state.running:
if not daemon_state.ws_client or not daemon_state.ws_client.is_alive():
logger.info("WebSocket not connected, attempting to setup...")
if not setup_websocket():
logger.error("Failed to setup WebSocket, retrying in 10 seconds...")
time.sleep(10)
continue
time.sleep(5)
except Exception as e:
logger.error(f"WebSocket worker error: {e}")
if daemon_state.running:
logger.info("Restarting WebSocket worker in 10 seconds...")
time.sleep(10)
websocket_worker()
# Image cache management functions
def get_image_cache_stats() -> Dict[str, int]:
"""Get statistics about cached images (updated to include large versions)"""
stats = {
'total_images': 0,
'primary_images': 0,
'primary_images_large': 0,
'screenshot_images': 0,
'screenshot_images_large': 0,
'thumb_images': 0,
'thumb_images_large': 0,
'cache_size_mb': 0
}
if not IMAGE_CACHE_DIR.exists():
return stats
try:
total_size = 0
for image_file in IMAGE_CACHE_DIR.glob('*.jpg'):
stats['total_images'] += 1
total_size += image_file.stat().st_size
filename = image_file.name
if '_primary_large.jpg' in filename:
stats['primary_images_large'] += 1
elif '_primary.jpg' in filename:
stats['primary_images'] += 1
elif '_screenshot_large.jpg' in filename:
stats['screenshot_images_large'] += 1
elif '_screenshot.jpg' in filename:
stats['screenshot_images'] += 1
elif '_thumb_large.jpg' in filename:
stats['thumb_images_large'] += 1
elif '_thumb.jpg' in filename:
stats['thumb_images'] += 1
stats['cache_size_mb'] = round(total_size / (1024 * 1024), 2)
except Exception as e:
logger.error(f"Failed to get image cache stats: {e}")
return stats
def cleanup_orphaned_images():
"""Remove cached images for items that no longer exist in cache data (updated for large versions)"""
if not daemon_state.cache_data or not IMAGE_CACHE_DIR.exists():
return 0
current_item_ids = set()
for category in ["movies", "series", "favorites", "continue_watching"]:
for item in daemon_state.cache_data.get('all_items', {}).get(category, []):
if item.get('id'):
current_item_ids.add(item['id'])
for episodes_list in daemon_state.cache_data.get('all_items', {}).get('episodes', {}).values():
for episode in episodes_list:
if episode.get('id'):
current_item_ids.add(episode['id'])
removed_count = 0
try:
for image_file in IMAGE_CACHE_DIR.glob('*.jpg'):
filename_parts = image_file.stem.split('_')
if len(filename_parts) >= 2:
hash_part = filename_parts[0]
found = False
for item_id in current_item_ids:
if hashlib.md5(item_id.encode()).hexdigest() == hash_part:
found = True
break
if not found:
image_file.unlink()
removed_count += 1
logger.debug(f"Removed orphaned image: {image_file.name}")
except Exception as e:
logger.error(f"Error during image cleanup: {e}")
if removed_count > 0:
logger.info(f"Cleaned up {removed_count} orphaned images")
return removed_count
# Daemon management functions
def write_pidfile() -> bool:
"""Write process ID to PID file"""
try:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
with open(DAEMON_PIDFILE, 'w') as f:
f.write(str(os.getpid()))
return True
except Exception as e:
logger.error(f"Failed to write PID file: {e}")
return False
def remove_pidfile():
"""Remove PID file"""
try:
DAEMON_PIDFILE.unlink(missing_ok=True)
except Exception as e:
logger.error(f"Failed to remove PID file: {e}")
def is_daemon_running() -> bool:
"""Check if daemon is already running"""
if not DAEMON_PIDFILE.exists():
return False
try:
with open(DAEMON_PIDFILE, 'r') as f:
pid = int(f.read().strip())
os.kill(pid, 0)
return True
except (ValueError, OSError):
remove_pidfile()
return False
def signal_handler(signum, frame):
"""Handle shutdown signals"""
logger.info(f"Received signal {signum}, shutting down...")
daemon_state.running = False
if daemon_state.ws_client:
try:
daemon_state.ws_client.stop()
except:
pass
if daemon_state.image_session:
try:
daemon_state.image_session.close()
except:
pass
def run_daemon():
"""Main daemon function"""
if is_daemon_running():
logger.error("Daemon is already running")
return False
if not write_pidfile():
logger.error("Could not write PID file")
return False
# Setup signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
daemon_state.running = True
logger.info(f"πŸš€ Jellyfin real-time cache daemon with comprehensive delta updates started (PID: {os.getpid()})")
# Check PIL availability
if not PIL_AVAILABLE:
logger.warning("⚠️ PIL (Pillow) not available - image processing disabled")
logger.warning("Install with: pip install Pillow")
try:
# Create image cache directory
IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Authenticate with Jellyfin
if not authenticate_with_jellyfin():
logger.error("❌ Authentication failed")
return False
# Try to load existing cache first
if not load_cache():
logger.info("πŸ“Š No existing cache found, performing initial cache population with image caching...")
initial_cache = fetch_all_jellyfin_data()
if not save_cache(initial_cache):
logger.error("❌ Failed to save initial cache")
return False
else:
logger.info("πŸ“Š Using existing cache data")
# Setup WebSocket connection
if not setup_websocket():
logger.error("❌ Failed to setup WebSocket")
return False
# Start WebSocket in a separate thread
ws_thread = threading.Thread(target=websocket_worker, daemon=True)
ws_thread.start()
# Show initial image cache stats
if PIL_AVAILABLE:
image_stats = get_image_cache_stats()
logger.info(f"πŸ–ΌοΈ Image cache: {image_stats['total_images']} images "
f"({image_stats['primary_images']} primary, {image_stats['screenshot_images']} screenshots) "
f"- {image_stats['cache_size_mb']} MB")
# Main daemon loop
logger.info("βœ… Daemon fully operational - monitoring for changes...")
cleanup_counter = 0
while daemon_state.running:
time.sleep(1)
# Check if WebSocket thread is still alive
if not ws_thread.is_alive() and daemon_state.running:
logger.warning("WebSocket thread died, attempting restart...")
if setup_websocket():
ws_thread = threading.Thread(target=websocket_worker, daemon=True)
ws_thread.start()
# Periodic cleanup of orphaned images (every 30 minutes)
cleanup_counter += 1
if cleanup_counter >= 1800 and PIL_AVAILABLE: # 30 minutes * 60 seconds
cleanup_counter = 0
try:
cleanup_orphaned_images()
except Exception as e:
logger.error(f"Image cleanup failed: {e}")
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
except Exception as e:
logger.error(f"Daemon error: {e}")
return False
finally:
# Cleanup
logger.info("πŸ›‘ Daemon shutting down...")
daemon_state.running = False
# Stop WebSocket client
if daemon_state.ws_client:
try:
daemon_state.ws_client.stop()
except:
pass
# Close image session
if daemon_state.image_session:
try:
daemon_state.image_session.close()
except:
pass
remove_pidfile()
return True
# Command line interface
def main():
parser = argparse.ArgumentParser(description="Jellyfin Real-time Cache Daemon with Comprehensive Delta Updates")
parser.add_argument("command", choices=["start", "stop", "status", "restart", "test", "refresh", "images", "cleanup"],
default="start", nargs="?", help="Daemon command")
args = parser.parse_args()
if args.command == "start":
if is_daemon_running():
print("❌ Daemon is already running")
sys.exit(1)
try:
success = run_daemon()
if success:
print("βœ… Daemon started successfully")
else:
print("❌ Failed to start daemon")
sys.exit(1)
except KeyboardInterrupt:
print("⏹️ Daemon interrupted")
except Exception as e:
print(f"❌ Failed to start daemon: {e}")
sys.exit(1)
elif args.command == "stop":
if not DAEMON_PIDFILE.exists():
print("❌ Daemon is not running")
sys.exit(1)
try:
with open(DAEMON_PIDFILE, 'r') as f:
pid = int(f.read().strip())
os.kill(pid, signal.SIGTERM)
print("πŸ“€ Stop signal sent to daemon")
for _ in range(10):
if not is_daemon_running():
print("βœ… Daemon stopped successfully")
break
time.sleep(1)
else:
print("⚠️ Daemon may still be running")
except (ValueError, OSError) as e:
print(f"❌ Failed to stop daemon: {e}")
elif args.command == "status":
if is_daemon_running():
print("βœ… Daemon is running")
if CACHE_FILE.exists():
try:
with open(CACHE_FILE, 'r') as f:
cache_data = json.load(f)
if "timestamp" in cache_data:
age = int(time.time()) - cache_data["timestamp"]
print(f"πŸ“Š Cache last updated: {age} seconds ago")
print(f"πŸ“ˆ Total items: {cache_data.get('total_items', 0)}")
items = cache_data.get('all_items', {})
print(f" Movies: {len(items.get('movies', []))}")
print(f" Series: {len(items.get('series', []))}")
print(f" Playlists: {len(items.get('playlists', []))}") # NEW
print(f" Favorites: {len(items.get('favorites', []))}")
print(f" Recently Added: {len(items.get('recently_added', []))}")
print(f" Continue Watching: {len(items.get('continue_watching', []))}")
print(f" Next Up: {len(items.get('next_up_ids', []))}")
episodes = items.get('episodes', {})
total_episodes = sum(len(eps) for eps in episodes.values())
print(f" Episodes: {total_episodes} across {len(episodes)} series")
all_items = []
for category in ['movies', 'series', 'favorites', 'continue_watching']:
all_items.extend(items.get(category, []))
for series_episodes in episodes.values():
all_items.extend(series_episodes)
if all_items:
items_with_video_codec = sum(1 for item in all_items if item.get('video_codec'))
items_with_subtitles = sum(1 for item in all_items if item.get('has_subtitles'))
items_with_imdb = sum(1 for item in all_items if item.get('imdb_url'))
items_with_trailers = sum(1 for item in all_items if item.get('trailer_url'))
items_with_images = sum(1 for item in all_items
if item.get('has_primary_image') or item.get('has_screenshot_image'))
items_with_large_images = sum(1 for item in all_items
if item.get('has_primary_image_large') or item.get('has_screenshot_image_large'))
print(f"\n🎬 Enhanced Metadata:")
print(f" Items with video codec: {items_with_video_codec}")
print(f" Items with subtitles: {items_with_subtitles}")
print(f" Items with IMDB URLs: {items_with_imdb}")
print(f" Items with trailers: {items_with_trailers}")
print(f" Items with cached images (small): {items_with_images}")
print(f" Items with cached images (large): {items_with_large_images}")
if PIL_AVAILABLE:
image_stats = get_image_cache_stats()
print(f"\nπŸ–ΌοΈ Image Cache:")
print(f" Total images: {image_stats['total_images']}")
print(f" Primary images (small): {image_stats['primary_images']}")
print(f" Primary images (large): {image_stats['primary_images_large']}")
print(f" Screenshot images (small): {image_stats['screenshot_images']}")
print(f" Screenshot images (large): {image_stats['screenshot_images_large']}")
print(f" Thumbnail images (small): {image_stats['thumb_images']}")
print(f" Thumbnail images (large): {image_stats['thumb_images_large']}")
print(f" Cache size: {image_stats['cache_size_mb']} MB")
# NEW: Show playlist details
playlists = items.get('playlists', [])
playlist_items = items.get('playlist_items', {})
if playlists:
total_playlist_items = sum(len(items) for items in playlist_items.values())
print(f" Playlist items: {total_playlist_items} across {len(playlists)} playlists")
# Show sample playlist info
print(f"\n🎡 Playlist Details:")
for playlist in playlists[:5]: # Show first 5 playlists
playlist_id = playlist.get('id')
item_count = len(playlist_items.get(playlist_id, []))
print(f" {playlist.get('name', 'Unknown')}: {item_count} items")
# Show delta update statistics
print(f"\n⚑ Delta Update Stats:")
if daemon_state.total_updates > 0:
success_rate = 1.0 - (daemon_state.failed_updates / daemon_state.total_updates)
print(f" Success rate: {success_rate:.1%}")
print(f" Total updates: {daemon_state.total_updates}")
print(f" Failed updates: {daemon_state.failed_updates}")
else:
print(" No updates processed yet")
# library filtering info
show_library_filtering_status()
except Exception as e:
print(f"❌ Failed to read cache: {e}")
else:
print("❌ Daemon is not running")
elif args.command == "images":
if not PIL_AVAILABLE:
print("❌ PIL (Pillow) not available - image processing disabled")
print("Install with: pip install Pillow")
sys.exit(1)
print("πŸ–ΌοΈ Image Cache Analysis:")
image_stats = get_image_cache_stats()
print(f"Total images: {image_stats['total_images']}")
print(f"Primary images (small): {image_stats['primary_images']}")
print(f"Primary images (large): {image_stats['primary_images_large']}")
print(f"Screenshot images (small): {image_stats['screenshot_images']}")
print(f"Screenshot images (large): {image_stats['screenshot_images_large']}")
print(f"Thumbnail images (small): {image_stats['thumb_images']}")
print(f"Thumbnail images (large): {image_stats['thumb_images_large']}")
print(f"Total cache size: {image_stats['cache_size_mb']} MB")
if IMAGE_CACHE_DIR.exists():
print(f"Cache directory: {IMAGE_CACHE_DIR}")
print(f"Cache directory exists: βœ…")
else:
print(f"Cache directory: {IMAGE_CACHE_DIR}")
print(f"Cache directory exists: ❌")
elif args.command == "cleanup":
if not PIL_AVAILABLE:
print("❌ PIL (Pillow) not available")
sys.exit(1)
print("🧹 Cleaning up orphaned images...")
try:
if CACHE_FILE.exists():
with open(CACHE_FILE, 'r') as f:
daemon_state.cache_data = json.load(f)
removed = cleanup_orphaned_images()
print(f"βœ… Removed {removed} orphaned images")
image_stats = get_image_cache_stats()
print(f"πŸ–ΌοΈ Cache now contains {image_stats['total_images']} images ({image_stats['cache_size_mb']} MB)")
else:
print("❌ No cache file found - cannot determine which images are orphaned")
except Exception as e:
print(f"❌ Cleanup failed: {e}")
elif args.command == "test":
print("πŸ§ͺ Testing Jellyfin connection with comprehensive delta update capabilities...")
if not PIL_AVAILABLE:
print("⚠️ PIL (Pillow) not available - image processing will be disabled")
try:
IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
if authenticate_with_jellyfin():
print("βœ… Authentication successful")
print("πŸ“š Testing library filtering...")
available_libs = get_available_libraries()
print(f"Available libraries: {list(available_libs.keys())}")
if INCLUDED_LIBRARIES:
filtered_ids = get_filtered_library_ids()
print(f"Filtered library IDs: {filtered_ids}")
print("πŸ“Š Testing comprehensive data fetch with delta update infrastructure...")
cache_data = fetch_all_jellyfin_data()
print(f"βœ… Fetched {cache_data.get('total_items', 0)} total items")
episodes = cache_data.get('all_items', {}).get('episodes', {})
total_episodes = sum(len(eps) for eps in episodes.values())
print(f"βœ… Fetched {total_episodes} episodes across {len(episodes)} series")
# Test delta update functions
print("\nπŸ§ͺ Testing delta update functions...")
# Test batch item fetching
all_items = []
for category in ['movies', 'series']:
all_items.extend(cache_data.get('all_items', {}).get(category, []))
if all_items:
sample_ids = [item['id'] for item in all_items[:3]]
print(f" Testing batch fetch of {len(sample_ids)} items...")
fetched_items = fetch_multiple_items(sample_ids)
print(f" βœ… Batch fetch returned {len(fetched_items)} items")
# Test user data update simulation
print(" Testing user data update simulation...")
if all_items:
test_user_data = [{
"ItemId": all_items[0]['id'],
"PlayedPercentage": 25.0,
"PlaybackPositionTicks": 5000000,
"IsFavorite": True
}]
update_result = update_user_data_in_cache(test_user_data)
print(f" βœ… User data update test: {'Success' if update_result else 'Failed'}")
# Test library change simulation
print(" Testing library change handling...")
test_library_data = {
"ItemsAdded": [],
"ItemsRemoved": [],
"ItemsUpdated": []
}
library_result = handle_library_changes(test_library_data)
print(f" βœ… Library change test: {'Success' if library_result else 'No changes'}")
# Test playback event handling
print(" Testing playback event handling...")
test_playback_data = {"SessionId": "test", "ItemId": "test", "UserId": daemon_state.user_id}
playback_result = handle_playback_events("PlaybackProgress", test_playback_data)
print(f" βœ… Playback event test: {'Success' if playback_result else 'Expected failure'}")
if save_cache(cache_data):
print("\nβœ… Cache saved successfully")
else:
print("\n❌ Failed to save cache")
print("\nπŸ”Œ Testing WebSocket setup...")
if setup_websocket():
print("βœ… WebSocket setup successful")
else:
print("❌ WebSocket setup failed")
# Show sample of enhanced metadata
if all_items:
sample_items = all_items[:3]
print(f"\nπŸ” Sample enhanced metadata (showing {len(sample_items)} items):")
for item in sample_items:
print(f"\n πŸ“½οΈ {item.get('name', 'Unknown')} ({item.get('type', 'Unknown')})")
print(f" Video Codec: {item.get('video_codec', 'Not available')}")
print(f" Video Width: {item.get('video_width', 'Not available')}")
print(f" IMDB URL: {item.get('imdb_url', 'Not available')}")
print(f" Primary Image: {'βœ… Cached' if item.get('has_primary_image') else '❌ Not cached'}")
if PIL_AVAILABLE:
image_stats = get_image_cache_stats()
print(f"\nπŸ–ΌοΈ Image cache statistics:")
print(f" Total images cached: {image_stats['total_images']}")
print(f" Cache size: {image_stats['cache_size_mb']} MB")
else:
print("❌ Authentication failed")
except Exception as e:
print(f"❌ Test failed: {e}")
import traceback
traceback.print_exc()
elif args.command == "refresh":
print("πŸ”„ Forcing full cache refresh with comprehensive delta update infrastructure...")
if not PIL_AVAILABLE:
print("⚠️ PIL (Pillow) not available - image processing will be disabled")
try:
IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
if authenticate_with_jellyfin():
cache_data = fetch_all_jellyfin_data()
if save_cache(cache_data):
print(f"βœ… Cache refreshed successfully ({cache_data.get('total_items', 0)} items)")
episodes = cache_data.get('all_items', {}).get('episodes', {})
total_episodes = sum(len(eps) for eps in episodes.values())
print(f"βœ… Refreshed {total_episodes} episodes across {len(episodes)} series")
all_items = []
for category in ['movies', 'series', 'favorites', 'continue_watching']:
all_items.extend(cache_data.get('all_items', {}).get(category, []))
for series_episodes in episodes.values():
all_items.extend(series_episodes)
if all_items:
items_with_video_codec = sum(1 for item in all_items if item.get('video_codec'))
items_with_subtitles = sum(1 for item in all_items if item.get('has_subtitles'))
items_with_imdb = sum(1 for item in all_items if item.get('imdb_url'))
items_with_trailers = sum(1 for item in all_items if item.get('trailer_url'))
items_with_images = sum(1 for item in all_items
if item.get('has_primary_image') or item.get('has_screenshot_image'))
print(f"🎬 Enhanced metadata collected:")
print(f" Items with video codec: {items_with_video_codec}")
print(f" Items with subtitles: {items_with_subtitles}")
print(f" Items with IMDB URLs: {items_with_imdb}")
print(f" Items with trailers: {items_with_trailers}")
print(f" Items with cached images: {items_with_images}")
if PIL_AVAILABLE:
image_stats = get_image_cache_stats()
print(f"πŸ–ΌοΈ Image cache:")
print(f" Total images: {image_stats['total_images']}")
print(f" Primary images: {image_stats['primary_images']}")
print(f" Screenshot images: {image_stats['screenshot_images']}")
print(f" Cache size: {image_stats['cache_size_mb']} MB")
# Reset delta update counters after successful full refresh
daemon_state.failed_updates = 0
daemon_state.total_updates = 1
print("πŸ”„ Delta update counters reset")
else:
print("❌ Failed to save refreshed cache")
else:
print("❌ Authentication failed")
except Exception as e:
print(f"❌ Cache refresh failed: {e}")
elif args.command == "restart":
if is_daemon_running():
print("πŸ›‘ Stopping daemon...")
os.system(f"{sys.argv[0]} stop")
time.sleep(3)
print("πŸš€ Starting daemon...")
os.system(f"{sys.argv[0]} start")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment