Created
November 13, 2025 02:32
-
-
Save RicterZ/cd449fcce46adec81bc5a582bf2bc31c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/home/bgmi-docker/.venv/bin/python | |
| import transmission_rpc | |
| import requests | |
| import os | |
| import sys | |
| import shutil | |
| import logging | |
| from typing import Optional, List | |
| import json | |
| import xml.etree.ElementTree as ET | |
| VIDEO_EXTS = ['mp4', 'mkv', 'mov', 'wmv', 'rmvb', 'avi', 'mpeg'] | |
| SUBTITLE_EXTS = ['ass', 'srt', 'sub', 'vtt'] | |
| BGMI_BASE_PATH = '/bgmi/bangumi/' | |
| ENV_TORRENT_DIR = 'TR_TORRENT_DIR' | |
| ENV_TORRENT_HASH = 'TR_TORRENT_HASH' | |
| TR_HOST = os.getenv('TR_HOST', '127.0.0.1') | |
| TR_PORT = int(os.getenv('TR_PORT', '9091')) | |
| TR_USERNAME = os.getenv('TR_USERNAME', 'your_username') | |
| TR_PASSWORD = os.getenv('TR_PASSWORD', 'your_password') | |
| TR_RPC_PATH = os.getenv('TR_RPC_PATH', '/transmission/rpc') | |
| TMM_API_URL = os.getenv('TMM_API_URL', 'http://tmm.home.ricterz.me/api/tvshow') | |
| TMM_API_KEY = os.getenv('TMM_API_KEY', 'tmmkey') | |
| TMDB_API_KEY = os.getenv('TMDB_API_KEY', 'tmdb_api_key') | |
| LOG_FILE = os.getenv('LOG_FILE', '/tmp/transmission.log') | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(LOG_FILE, encoding='utf-8'), | |
| logging.StreamHandler(sys.stderr) | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def validate_environment() -> None: | |
| if not os.getenv(ENV_TORRENT_DIR): | |
| raise Exception(f'{ENV_TORRENT_DIR} environment variable is not set') | |
| def get_abs_episode_from_env() -> int: | |
| path = os.getenv(ENV_TORRENT_DIR, '').rstrip('/') | |
| rel = path.replace(BGMI_BASE_PATH, '') | |
| parts = rel.rstrip('/').split('/') if rel else [] | |
| for i in range(len(parts) - 1, -1, -1): | |
| if parts[i].isdigit(): | |
| return int(parts[i]) | |
| raise Exception('Cannot extract absolute episode number from TR_TORRENT_DIR') | |
| def read_show_ids_from_nfo(possible_paths: List[str]) -> tuple[str, str, str]: | |
| tvdbid, tmdbid, title = '', '', '' | |
| for nfo_path in possible_paths: | |
| try: | |
| if nfo_path and os.path.exists(nfo_path): | |
| tree = ET.parse(nfo_path) | |
| root = tree.getroot() | |
| t = root.findtext('title') or '' | |
| tvdb = root.findtext('tvdbid') or '' | |
| tmdb = root.findtext('tmdbid') or '' | |
| if not tvdb: | |
| uid_nodes = root.findall('uniqueid') | |
| for uid in uid_nodes: | |
| if uid.get('type') == 'tvdb' and (uid.text or '').strip(): | |
| tvdb = uid.text.strip() | |
| break | |
| if not tmdb: | |
| uid_nodes = root.findall('uniqueid') | |
| for uid in uid_nodes: | |
| if uid.get('type') == 'tmdb' and (uid.text or '').strip(): | |
| tmdb = uid.text.strip() | |
| break | |
| title = (t or title).strip() | |
| tvdbid = (tvdb or tvdbid).strip() | |
| tmdbid = (tmdb or tmdbid).strip() | |
| if title or tvdbid or tmdbid: | |
| break | |
| except Exception: | |
| continue | |
| return tvdbid, tmdbid, title | |
| def build_abs_map_via_tmdb(tmdbid: str) -> List[List[int]]: | |
| flat: List[List[int]] = [] | |
| if not tmdbid or not TMDB_API_KEY: | |
| return flat | |
| try: | |
| show_url = f'https://api.themoviedb.org/3/tv/{tmdbid}?api_key={TMDB_API_KEY}' | |
| rs = requests.get(show_url, timeout=30) | |
| rs.raise_for_status() | |
| js = rs.json() | |
| seasons = js.get('seasons') or [] | |
| season_numbers = sorted([s.get('season_number') for s in seasons if isinstance(s.get('season_number'), int) and s.get('season_number') > 0]) | |
| items_with_date = [] | |
| items_without_date = [] | |
| for sn in season_numbers: | |
| sd_url = f'https://api.themoviedb.org/3/tv/{tmdbid}/season/{sn}?api_key={TMDB_API_KEY}' | |
| rse = requests.get(sd_url, timeout=30) | |
| rse.raise_for_status() | |
| jse = rse.json() | |
| eps = jse.get('episodes') or [] | |
| for ep in eps: | |
| s = sn | |
| e = ep.get('episode_number') | |
| air_date = ep.get('air_date') or '' | |
| if isinstance(e, int): | |
| if air_date: | |
| items_with_date.append((air_date, s, e)) | |
| else: | |
| items_without_date.append((s, e)) | |
| items_with_date.sort(key=lambda x: (x[0], x[1], x[2])) | |
| items_without_date.sort(key=lambda x: (x[0], x[1])) | |
| flat = [[s, e] for _, s, e in items_with_date] + [[s, e] for s, e in items_without_date] | |
| return flat | |
| except Exception as e: | |
| logger.warning(f'TMDB mapping failed: {e}') | |
| return [] | |
| def get_season_episode_from_cache(abs_ep: int, tmdbid: str, show_dir: str) -> Optional[tuple[int, int]]: | |
| if not show_dir: | |
| return None | |
| cache_path = os.path.join(show_dir, '.abs_map.json') | |
| try: | |
| if os.path.exists(cache_path): | |
| with open(cache_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| if isinstance(data, dict) and data.get('tmdbid') == tmdbid and isinstance(data.get('flat'), list): | |
| flat = data['flat'] | |
| idx = abs_ep - 1 | |
| if 0 <= idx < len(flat) and isinstance(flat[idx], list) and len(flat[idx]) == 2: | |
| s, e = flat[idx] | |
| if isinstance(s, int) and isinstance(e, int): | |
| return s, e | |
| except Exception: | |
| pass | |
| flat = build_abs_map_via_tmdb(tmdbid) | |
| if flat: | |
| try: | |
| with open(cache_path, 'w', encoding='utf-8') as f: | |
| json.dump({'tmdbid': tmdbid, 'flat': flat}, f, ensure_ascii=False) | |
| except Exception: | |
| pass | |
| idx = abs_ep - 1 | |
| if 0 <= idx < len(flat): | |
| s, e = flat[idx] | |
| if isinstance(s, int) and isinstance(e, int): | |
| return s, e | |
| return None | |
| def get_files(path: Optional[str] = None) -> List[str]: | |
| if not path: | |
| path = os.getenv(ENV_TORRENT_DIR, '') | |
| if not path: | |
| raise Exception('Path is not specified and TR_TORRENT_DIR is not set') | |
| if not os.path.exists(path): | |
| raise Exception(f'Path does not exist: {path}') | |
| if not os.path.isdir(path): | |
| raise Exception(f'Path is not a directory: {path}') | |
| try: | |
| return [os.path.join(path, item) for item in os.listdir(path)] | |
| except OSError as e: | |
| raise Exception(f'Cannot list directory {path}: {e}') | |
| def find_video_file(path: Optional[str] = None) -> Optional[str]: | |
| if not path: | |
| path = os.getenv(ENV_TORRENT_DIR, '') | |
| if not path or not os.path.exists(path): | |
| logger.warning(f'Invalid path for find_video_file: {path}') | |
| return None | |
| try: | |
| files = get_files(path) | |
| except Exception as e: | |
| logger.error(f'Error getting files: {e}') | |
| return None | |
| for file_path in files: | |
| if os.path.isdir(file_path): | |
| found_file = find_video_file(file_path) | |
| if found_file: | |
| try: | |
| parent_dir = os.path.realpath(os.path.join(found_file, '..', '..')) | |
| if os.path.exists(parent_dir): | |
| new_location = os.path.join(parent_dir, os.path.basename(found_file)) | |
| shutil.move(found_file, new_location) | |
| return new_location | |
| else: | |
| logger.warning(f'Parent directory does not exist: {parent_dir}') | |
| return found_file | |
| except (OSError, shutil.Error) as e: | |
| logger.error(f'Failed to move file {found_file}: {e}') | |
| return found_file | |
| continue | |
| if os.path.isfile(file_path): | |
| try: | |
| _, ext = os.path.splitext(file_path) | |
| ext = ext.lstrip('.').lower() | |
| if ext in VIDEO_EXTS: | |
| return file_path | |
| except Exception as e: | |
| logger.warning(f'Error checking file {file_path}: {e}') | |
| continue | |
| return None | |
| def find_subtitle_files(video_path: str) -> List[str]: | |
| subtitle_files = [] | |
| video_dir = os.path.dirname(video_path) | |
| video_name = os.path.splitext(os.path.basename(video_path))[0] | |
| if not os.path.exists(video_dir): | |
| return subtitle_files | |
| try: | |
| files = get_files(video_dir) | |
| for file_path in files: | |
| if os.path.isfile(file_path): | |
| try: | |
| file_name = os.path.splitext(os.path.basename(file_path))[0] | |
| _, ext = os.path.splitext(file_path) | |
| ext = ext.lstrip('.') | |
| ext = ext.lower() | |
| if ext in SUBTITLE_EXTS and file_name == video_name: | |
| subtitle_files.append(file_path) | |
| except Exception as e: | |
| logger.warning(f'Error checking subtitle file {file_path}: {e}') | |
| continue | |
| except Exception as e: | |
| logger.warning(f'Error finding subtitle files: {e}') | |
| return subtitle_files | |
| def scrape_new() -> bool: | |
| try: | |
| headers = {'api-key': TMM_API_KEY} | |
| payload = [ | |
| {"action": "update", "scope": {"name": "all"}}, | |
| {"action": "scrape", "scope": {"name": "new"}}, | |
| ] | |
| response = requests.post(TMM_API_URL, headers=headers, json=payload, timeout=60) | |
| response.raise_for_status() | |
| return True | |
| except requests.RequestException as e: | |
| logger.error(f'Failed to call TMM API: {e}') | |
| return False | |
| except Exception as e: | |
| logger.error(f'Unexpected error during scraping: {e}') | |
| return False | |
| def remove_transmission_seed() -> None: | |
| try: | |
| client = transmission_rpc.Client( | |
| host=TR_HOST, | |
| port=TR_PORT, | |
| username=TR_USERNAME, | |
| password=TR_PASSWORD, | |
| path=TR_RPC_PATH, | |
| ) | |
| torrent_hash = os.getenv(ENV_TORRENT_HASH, '') | |
| if torrent_hash: | |
| try: | |
| client.remove_torrent(torrent_hash) | |
| except Exception as e: | |
| logger.warning(f'Failed to remove current torrent {torrent_hash}: {e}') | |
| else: | |
| logger.warning('TR_TORRENT_HASH not set, skipping current torrent removal') | |
| try: | |
| torrents = client.get_torrents() | |
| remove_list = [] | |
| for torrent in torrents: | |
| if hasattr(torrent, 'status') and 'seeding' in str(torrent.status).lower(): | |
| remove_list.append(torrent) | |
| for torrent in remove_list: | |
| try: | |
| client.remove_torrent(torrent.info_hash) | |
| except Exception as e: | |
| logger.warning(f'Failed to remove seeding torrent {torrent.info_hash}: {e}') | |
| except Exception as e: | |
| logger.warning(f'Failed to get or remove seeding torrents: {e}') | |
| except Exception as e: | |
| logger.error(f'Failed to connect to Transmission RPC: {e}') | |
| raise | |
| def remove_directory_safely(directory: str) -> None: | |
| try: | |
| if os.path.exists(directory): | |
| shutil.rmtree(directory) | |
| except OSError as e: | |
| logger.error(f'Failed to remove directory {directory}: {e}') | |
| raise | |
| def main() -> None: | |
| try: | |
| validate_environment() | |
| video_file = find_video_file() | |
| if not video_file: | |
| raise Exception('No video file found in torrent directory') | |
| _, video_ext = os.path.splitext(video_file) | |
| video_ext = video_ext.lstrip('.') | |
| abs_ep = get_abs_episode_from_env() | |
| torrent_dir_env = os.getenv(ENV_TORRENT_DIR, '').rstrip('/') | |
| parent_show_dir = os.path.normpath(os.path.join(torrent_dir_env, '..')) | |
| nfo_path = os.path.join(parent_show_dir, 'tvshow.nfo') if parent_show_dir else '' | |
| tvdbid, tmdbid, _ = read_show_ids_from_nfo([nfo_path]) | |
| if not tvdbid and not tmdbid: | |
| logger.warning(f'tvshow.nfo not found or missing ids at: {nfo_path}') | |
| mapped = get_season_episode_from_cache(abs_ep, tmdbid, parent_show_dir) | |
| if mapped: | |
| season_num, ep_num = mapped | |
| else: | |
| season_num, ep_num = 1, abs_ep | |
| new_path_base = os.path.join(os.path.normpath(os.path.join(os.getenv(ENV_TORRENT_DIR, '').rstrip('/'), '..')), f'S{str(season_num).zfill(2)}E{str(ep_num).zfill(2)}') | |
| new_video_path = f'{new_path_base}.{video_ext}' | |
| new_dir = os.path.dirname(new_video_path) | |
| if not os.path.exists(new_dir): | |
| raise Exception(f'Parent directory does not exist: {new_dir}') | |
| if os.path.exists(new_video_path): | |
| logger.warning(f'Target file already exists: {new_video_path}, overwriting...') | |
| subtitle_files = find_subtitle_files(video_file) | |
| shutil.move(video_file, new_video_path) | |
| for subtitle_file in subtitle_files: | |
| try: | |
| _, subtitle_ext = os.path.splitext(subtitle_file) | |
| subtitle_ext = subtitle_ext.lstrip('.') | |
| new_subtitle_path = f'{new_path_base}.{subtitle_ext}' | |
| shutil.move(subtitle_file, new_subtitle_path) | |
| except Exception as e: | |
| logger.warning(f'Failed to move subtitle file {subtitle_file}: {e}') | |
| if not scrape_new(): | |
| logger.warning('TMM scrape for new items failed, continuing...') | |
| torrent_dir = os.getenv(ENV_TORRENT_DIR, '').rstrip('/') | |
| if torrent_dir and os.path.exists(torrent_dir): | |
| try: | |
| remove_directory_safely(torrent_dir) | |
| except Exception as e: | |
| logger.warning(f'Failed to remove torrent directory {torrent_dir}: {e}') | |
| try: | |
| remove_transmission_seed() | |
| except Exception as e: | |
| logger.warning(f'Failed to remove transmission seeds (non-critical): {e}') | |
| except Exception as e: | |
| logger.error(f'Unexpected error: {e}', exc_info=True) | |
| sys.exit(1) | |
| if __name__ == '__main__': | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| logger.info('Interrupted by user') | |
| sys.exit(130) | |
| except Exception as e: | |
| logger.error(f'Fatal error: {e}', exc_info=True) | |
| sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment