Skip to content

Instantly share code, notes, and snippets.

@RicterZ
Created November 13, 2025 02:32
Show Gist options
  • Select an option

  • Save RicterZ/cd449fcce46adec81bc5a582bf2bc31c to your computer and use it in GitHub Desktop.

Select an option

Save RicterZ/cd449fcce46adec81bc5a582bf2bc31c to your computer and use it in GitHub Desktop.
#!/home/bgmi-docker/.venv/bin/python
import transmission_rpc
import requests
import os
import sys
import shutil
import logging
from typing import Optional, List
import json
import xml.etree.ElementTree as ET
VIDEO_EXTS = ['mp4', 'mkv', 'mov', 'wmv', 'rmvb', 'avi', 'mpeg']
SUBTITLE_EXTS = ['ass', 'srt', 'sub', 'vtt']
BGMI_BASE_PATH = '/bgmi/bangumi/'
ENV_TORRENT_DIR = 'TR_TORRENT_DIR'
ENV_TORRENT_HASH = 'TR_TORRENT_HASH'
TR_HOST = os.getenv('TR_HOST', '127.0.0.1')
TR_PORT = int(os.getenv('TR_PORT', '9091'))
TR_USERNAME = os.getenv('TR_USERNAME', 'your_username')
TR_PASSWORD = os.getenv('TR_PASSWORD', 'your_password')
TR_RPC_PATH = os.getenv('TR_RPC_PATH', '/transmission/rpc')
TMM_API_URL = os.getenv('TMM_API_URL', 'http://tmm.home.ricterz.me/api/tvshow')
TMM_API_KEY = os.getenv('TMM_API_KEY', 'tmmkey')
TMDB_API_KEY = os.getenv('TMDB_API_KEY', 'tmdb_api_key')
LOG_FILE = os.getenv('LOG_FILE', '/tmp/transmission.log')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE, encoding='utf-8'),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger(__name__)
def validate_environment() -> None:
if not os.getenv(ENV_TORRENT_DIR):
raise Exception(f'{ENV_TORRENT_DIR} environment variable is not set')
def get_abs_episode_from_env() -> int:
path = os.getenv(ENV_TORRENT_DIR, '').rstrip('/')
rel = path.replace(BGMI_BASE_PATH, '')
parts = rel.rstrip('/').split('/') if rel else []
for i in range(len(parts) - 1, -1, -1):
if parts[i].isdigit():
return int(parts[i])
raise Exception('Cannot extract absolute episode number from TR_TORRENT_DIR')
def read_show_ids_from_nfo(possible_paths: List[str]) -> tuple[str, str, str]:
tvdbid, tmdbid, title = '', '', ''
for nfo_path in possible_paths:
try:
if nfo_path and os.path.exists(nfo_path):
tree = ET.parse(nfo_path)
root = tree.getroot()
t = root.findtext('title') or ''
tvdb = root.findtext('tvdbid') or ''
tmdb = root.findtext('tmdbid') or ''
if not tvdb:
uid_nodes = root.findall('uniqueid')
for uid in uid_nodes:
if uid.get('type') == 'tvdb' and (uid.text or '').strip():
tvdb = uid.text.strip()
break
if not tmdb:
uid_nodes = root.findall('uniqueid')
for uid in uid_nodes:
if uid.get('type') == 'tmdb' and (uid.text or '').strip():
tmdb = uid.text.strip()
break
title = (t or title).strip()
tvdbid = (tvdb or tvdbid).strip()
tmdbid = (tmdb or tmdbid).strip()
if title or tvdbid or tmdbid:
break
except Exception:
continue
return tvdbid, tmdbid, title
def build_abs_map_via_tmdb(tmdbid: str) -> List[List[int]]:
flat: List[List[int]] = []
if not tmdbid or not TMDB_API_KEY:
return flat
try:
show_url = f'https://api.themoviedb.org/3/tv/{tmdbid}?api_key={TMDB_API_KEY}'
rs = requests.get(show_url, timeout=30)
rs.raise_for_status()
js = rs.json()
seasons = js.get('seasons') or []
season_numbers = sorted([s.get('season_number') for s in seasons if isinstance(s.get('season_number'), int) and s.get('season_number') > 0])
items_with_date = []
items_without_date = []
for sn in season_numbers:
sd_url = f'https://api.themoviedb.org/3/tv/{tmdbid}/season/{sn}?api_key={TMDB_API_KEY}'
rse = requests.get(sd_url, timeout=30)
rse.raise_for_status()
jse = rse.json()
eps = jse.get('episodes') or []
for ep in eps:
s = sn
e = ep.get('episode_number')
air_date = ep.get('air_date') or ''
if isinstance(e, int):
if air_date:
items_with_date.append((air_date, s, e))
else:
items_without_date.append((s, e))
items_with_date.sort(key=lambda x: (x[0], x[1], x[2]))
items_without_date.sort(key=lambda x: (x[0], x[1]))
flat = [[s, e] for _, s, e in items_with_date] + [[s, e] for s, e in items_without_date]
return flat
except Exception as e:
logger.warning(f'TMDB mapping failed: {e}')
return []
def get_season_episode_from_cache(abs_ep: int, tmdbid: str, show_dir: str) -> Optional[tuple[int, int]]:
if not show_dir:
return None
cache_path = os.path.join(show_dir, '.abs_map.json')
try:
if os.path.exists(cache_path):
with open(cache_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict) and data.get('tmdbid') == tmdbid and isinstance(data.get('flat'), list):
flat = data['flat']
idx = abs_ep - 1
if 0 <= idx < len(flat) and isinstance(flat[idx], list) and len(flat[idx]) == 2:
s, e = flat[idx]
if isinstance(s, int) and isinstance(e, int):
return s, e
except Exception:
pass
flat = build_abs_map_via_tmdb(tmdbid)
if flat:
try:
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump({'tmdbid': tmdbid, 'flat': flat}, f, ensure_ascii=False)
except Exception:
pass
idx = abs_ep - 1
if 0 <= idx < len(flat):
s, e = flat[idx]
if isinstance(s, int) and isinstance(e, int):
return s, e
return None
def get_files(path: Optional[str] = None) -> List[str]:
if not path:
path = os.getenv(ENV_TORRENT_DIR, '')
if not path:
raise Exception('Path is not specified and TR_TORRENT_DIR is not set')
if not os.path.exists(path):
raise Exception(f'Path does not exist: {path}')
if not os.path.isdir(path):
raise Exception(f'Path is not a directory: {path}')
try:
return [os.path.join(path, item) for item in os.listdir(path)]
except OSError as e:
raise Exception(f'Cannot list directory {path}: {e}')
def find_video_file(path: Optional[str] = None) -> Optional[str]:
if not path:
path = os.getenv(ENV_TORRENT_DIR, '')
if not path or not os.path.exists(path):
logger.warning(f'Invalid path for find_video_file: {path}')
return None
try:
files = get_files(path)
except Exception as e:
logger.error(f'Error getting files: {e}')
return None
for file_path in files:
if os.path.isdir(file_path):
found_file = find_video_file(file_path)
if found_file:
try:
parent_dir = os.path.realpath(os.path.join(found_file, '..', '..'))
if os.path.exists(parent_dir):
new_location = os.path.join(parent_dir, os.path.basename(found_file))
shutil.move(found_file, new_location)
return new_location
else:
logger.warning(f'Parent directory does not exist: {parent_dir}')
return found_file
except (OSError, shutil.Error) as e:
logger.error(f'Failed to move file {found_file}: {e}')
return found_file
continue
if os.path.isfile(file_path):
try:
_, ext = os.path.splitext(file_path)
ext = ext.lstrip('.').lower()
if ext in VIDEO_EXTS:
return file_path
except Exception as e:
logger.warning(f'Error checking file {file_path}: {e}')
continue
return None
def find_subtitle_files(video_path: str) -> List[str]:
subtitle_files = []
video_dir = os.path.dirname(video_path)
video_name = os.path.splitext(os.path.basename(video_path))[0]
if not os.path.exists(video_dir):
return subtitle_files
try:
files = get_files(video_dir)
for file_path in files:
if os.path.isfile(file_path):
try:
file_name = os.path.splitext(os.path.basename(file_path))[0]
_, ext = os.path.splitext(file_path)
ext = ext.lstrip('.')
ext = ext.lower()
if ext in SUBTITLE_EXTS and file_name == video_name:
subtitle_files.append(file_path)
except Exception as e:
logger.warning(f'Error checking subtitle file {file_path}: {e}')
continue
except Exception as e:
logger.warning(f'Error finding subtitle files: {e}')
return subtitle_files
def scrape_new() -> bool:
try:
headers = {'api-key': TMM_API_KEY}
payload = [
{"action": "update", "scope": {"name": "all"}},
{"action": "scrape", "scope": {"name": "new"}},
]
response = requests.post(TMM_API_URL, headers=headers, json=payload, timeout=60)
response.raise_for_status()
return True
except requests.RequestException as e:
logger.error(f'Failed to call TMM API: {e}')
return False
except Exception as e:
logger.error(f'Unexpected error during scraping: {e}')
return False
def remove_transmission_seed() -> None:
try:
client = transmission_rpc.Client(
host=TR_HOST,
port=TR_PORT,
username=TR_USERNAME,
password=TR_PASSWORD,
path=TR_RPC_PATH,
)
torrent_hash = os.getenv(ENV_TORRENT_HASH, '')
if torrent_hash:
try:
client.remove_torrent(torrent_hash)
except Exception as e:
logger.warning(f'Failed to remove current torrent {torrent_hash}: {e}')
else:
logger.warning('TR_TORRENT_HASH not set, skipping current torrent removal')
try:
torrents = client.get_torrents()
remove_list = []
for torrent in torrents:
if hasattr(torrent, 'status') and 'seeding' in str(torrent.status).lower():
remove_list.append(torrent)
for torrent in remove_list:
try:
client.remove_torrent(torrent.info_hash)
except Exception as e:
logger.warning(f'Failed to remove seeding torrent {torrent.info_hash}: {e}')
except Exception as e:
logger.warning(f'Failed to get or remove seeding torrents: {e}')
except Exception as e:
logger.error(f'Failed to connect to Transmission RPC: {e}')
raise
def remove_directory_safely(directory: str) -> None:
try:
if os.path.exists(directory):
shutil.rmtree(directory)
except OSError as e:
logger.error(f'Failed to remove directory {directory}: {e}')
raise
def main() -> None:
try:
validate_environment()
video_file = find_video_file()
if not video_file:
raise Exception('No video file found in torrent directory')
_, video_ext = os.path.splitext(video_file)
video_ext = video_ext.lstrip('.')
abs_ep = get_abs_episode_from_env()
torrent_dir_env = os.getenv(ENV_TORRENT_DIR, '').rstrip('/')
parent_show_dir = os.path.normpath(os.path.join(torrent_dir_env, '..'))
nfo_path = os.path.join(parent_show_dir, 'tvshow.nfo') if parent_show_dir else ''
tvdbid, tmdbid, _ = read_show_ids_from_nfo([nfo_path])
if not tvdbid and not tmdbid:
logger.warning(f'tvshow.nfo not found or missing ids at: {nfo_path}')
mapped = get_season_episode_from_cache(abs_ep, tmdbid, parent_show_dir)
if mapped:
season_num, ep_num = mapped
else:
season_num, ep_num = 1, abs_ep
new_path_base = os.path.join(os.path.normpath(os.path.join(os.getenv(ENV_TORRENT_DIR, '').rstrip('/'), '..')), f'S{str(season_num).zfill(2)}E{str(ep_num).zfill(2)}')
new_video_path = f'{new_path_base}.{video_ext}'
new_dir = os.path.dirname(new_video_path)
if not os.path.exists(new_dir):
raise Exception(f'Parent directory does not exist: {new_dir}')
if os.path.exists(new_video_path):
logger.warning(f'Target file already exists: {new_video_path}, overwriting...')
subtitle_files = find_subtitle_files(video_file)
shutil.move(video_file, new_video_path)
for subtitle_file in subtitle_files:
try:
_, subtitle_ext = os.path.splitext(subtitle_file)
subtitle_ext = subtitle_ext.lstrip('.')
new_subtitle_path = f'{new_path_base}.{subtitle_ext}'
shutil.move(subtitle_file, new_subtitle_path)
except Exception as e:
logger.warning(f'Failed to move subtitle file {subtitle_file}: {e}')
if not scrape_new():
logger.warning('TMM scrape for new items failed, continuing...')
torrent_dir = os.getenv(ENV_TORRENT_DIR, '').rstrip('/')
if torrent_dir and os.path.exists(torrent_dir):
try:
remove_directory_safely(torrent_dir)
except Exception as e:
logger.warning(f'Failed to remove torrent directory {torrent_dir}: {e}')
try:
remove_transmission_seed()
except Exception as e:
logger.warning(f'Failed to remove transmission seeds (non-critical): {e}')
except Exception as e:
logger.error(f'Unexpected error: {e}', exc_info=True)
sys.exit(1)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
logger.info('Interrupted by user')
sys.exit(130)
except Exception as e:
logger.error(f'Fatal error: {e}', exc_info=True)
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment