Created
February 21, 2026 00:09
-
-
Save evandhoffman/6b9f9e13d8d1e7101ac0ae4957664707 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Manages quality definitions and profile assignments in Radarr and Sonarr.

Usage:
    python3 set-quality-limits.py                       # set size limits
    python3 set-quality-limits.py --dry-run             # preview size limit changes
    python3 set-quality-limits.py --assign-profile Any  # assign profile to all content
    python3 set-quality-limits.py --check-scores Any    # verify custom format scores in a profile
    python3 set-quality-limits.py --refresh             # trigger Refresh & Scan for all content
    python3 set-quality-limits.py --expire-bad          # delete files with negative CF scores and re-search
    python3 set-quality-limits.py --assign-profile Any --dry-run
"""
| import argparse | |
| import logging | |
| import sys | |
| import math | |
| import json | |
| import urllib.request | |
| import urllib.error | |
# Root logger at DEBUG so per-item decisions (skipped qualities, score checks,
# planned updates) are visible; drop to INFO if the output is too chatty.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
| INSTANCES = [ | |
| { | |
| "label": "Radarr", | |
| "url": "http://radarr:7878", | |
| "api_key": "<key>", | |
| }, | |
| { | |
| "label": "Sonarr", | |
| "url": "http://sonarr.168.1.179:8989", | |
| "api_key": "<key>", | |
| }, | |
| ] | |
# MB/min limits per quality. Covers both Radarr and Sonarr naming conventions.
# Preferred size is computed as 75% of max, clamped to >= min.
# 2-hour movie reference: 500 MB/min = 60 GB, 250 = 30 GB, 50 = 6 GB
# Keys must match the quality names returned by /api/v3/qualitydefinition;
# qualities absent from this table are left untouched by process_instance().
MAX_LIMITS = {
    # 2160p tiers
    "Remux-2160p": 500,
    "Bluray-2160p Remux": 500,
    "Bluray-2160p": 500,
    "WEBDL-2160p": 500,
    "WEBRip-2160p": 500,
    "HDTV-2160p": 500,
    # 1080p tiers
    "Remux-1080p": 250,
    "Bluray-1080p Remux": 250,
    "Bluray-1080p": 250,
    "WEBDL-1080p": 250,
    "WEBRip-1080p": 250,
    "HDTV-1080p": 250,
    # 720p tiers
    "Bluray-720p": 50,
    "WEBDL-720p": 50,
    "WEBRip-720p": 50,
    "HDTV-720p": 50,
}
# Minimums: just enough to reject obviously garbage/mislabelled files.
# 2-hour movie reference: 100 MB/min = 12 GB, 40 = 5 GB, 20 = 2.4 GB, 5 = 600 MB
# Must contain every key present in MAX_LIMITS — process_instance() indexes
# both tables by the same quality name.
MIN_LIMITS = {
    # 2160p tiers
    "Remux-2160p": 100,
    "Bluray-2160p Remux": 100,
    "Bluray-2160p": 50,
    "WEBDL-2160p": 15,
    "WEBRip-2160p": 15,
    "HDTV-2160p": 15,
    # 1080p tiers
    "Remux-1080p": 40,
    "Bluray-1080p Remux": 40,
    "Bluray-1080p": 20,
    "WEBDL-1080p": 5,
    "WEBRip-1080p": 5,
    "HDTV-1080p": 10,
    # 720p tiers
    "Bluray-720p": 10,
    "WEBDL-720p": 3,
    "WEBRip-720p": 3,
    "HDTV-720p": 5,
}
def api_request(url, api_key, method="GET", body=None):
    """Issue a JSON request against a Radarr/Sonarr API and decode the reply.

    Returns the parsed JSON payload, or None when the server sends an empty
    body. HTTP and connection errors are logged, then re-raised so the caller
    can decide whether to abort the instance.
    """
    payload = None if body is None else json.dumps(body).encode()
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
        method=method,
    )
    log.debug("%s %s", method, url)
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            content = response.read()
            log.debug("Response %d, %d bytes", response.status, len(content))
    except urllib.error.HTTPError as err:
        log.error("HTTP %d from %s: %s", err.code, url, err.read().decode(errors="replace"))
        raise
    except urllib.error.URLError as err:
        log.error("Connection failed to %s: %s", url, err.reason)
        raise
    return json.loads(content) if content else None
def process_instance(label, base_url, api_key, dry_run):
    """Fetch an instance's quality definitions and enforce the size limits.

    For every quality named in MAX_LIMITS/MIN_LIMITS, set the min/max
    MB-per-minute limits and a preferred size of 75% of the max, clamped so it
    never falls below the min. Qualities with no configured limit are left
    untouched. The complete definition list is PUT back only when at least one
    entry changed and dry_run is False.
    """
    log.info("=== %s ===", label)
    log.info("Fetching quality definitions from %s", base_url)
    definitions = api_request(f"{base_url}/api/v3/qualitydefinition", api_key)
    log.info("Got %d quality definitions", len(definitions))
    updated = []
    changed = []
    for qd in definitions:
        name = qd["quality"]["name"]
        old_min = qd.get("minSize")
        old_max = qd.get("maxSize")
        old_pref = qd.get("preferredSize")
        if name in MAX_LIMITS:
            new_max = MAX_LIMITS[name]
            new_min = MIN_LIMITS[name]
            # Preferred must sit between min and max. The limit tables hold
            # plain ints, so no ceil of new_min is needed (the original also
            # redundantly re-read minSize here).
            new_pref = max(math.floor(new_max * 0.75), new_min)
            if old_max != new_max or old_min != new_min or old_pref != new_pref:
                log.debug(
                    " %-25s min: %s -> %s max: %s -> %s preferred: %s -> %s",
                    name, old_min, new_min, old_max, new_max, old_pref, new_pref,
                )
                changed.append(name)
                qd["minSize"] = new_min
                qd["maxSize"] = new_max
                qd["preferredSize"] = new_pref
            else:
                log.debug(" %-25s already correct (min=%s max=%s)", name, old_min, old_max)
        else:
            log.debug(" %-25s no limit defined, skipping (max=%s)", name, qd.get("maxSize"))
        # Every definition goes back in the PUT body, changed or not — the
        # update endpoint expects the full list.
        updated.append(qd)
    if not changed:
        log.info("All limits already correct, nothing to do.")
        return
    log.info("%d qualities to update: %s", len(changed), ", ".join(changed))
    if dry_run:
        log.info("DRY RUN — skipping PUT")
        return
    log.info("Sending updated definitions to %s", base_url)
    api_request(
        f"{base_url}/api/v3/qualitydefinition/update",
        api_key,
        method="PUT",
        body=updated,
    )
    log.info("Done.")
# Expected custom format scores, split by which instance type they apply to.
# Format names must match exactly what Sonarr/Radarr show in the UI (set by recyclarr).
# check_scores() merges the common table with the instance-specific one.
EXPECTED_SCORES_COMMON = {
    # --- Blocked ---
    "BR-DISK": -10000,
    "LQ": -10000,
    "DV (w/o HDR fallback)": -10000,
    # --- HDR ---
    "HDR": 500,
    "DV (Disk)": 500,
    "DV Boost": 250,
    "HDR10+ Boost": 100,
    # --- Release groups ---
    "WEB Tier 01": 100,
    "WEB Tier 02": 100,
    "WEB Tier 03": 100,
    # --- Repack ---
    "Repack/Proper": 5,
    "Repack2": 5,
}

# Formats that only exist on the Radarr side.
EXPECTED_SCORES_RADARR = {
    # --- Blocked ---
    "3D": -10000,
    "Upscaled": -10000,
    # --- Release groups ---
    "Remux Tier 01": 100,
    "Remux Tier 02": 100,
    "HD Bluray Tier 01": 100,
    "HD Bluray Tier 02": 100,
    # --- Movie versions ---
    "Hybrid": 150,
    "Remaster": 150,
    "4K Remaster": 150,
    "Criterion Collection": 150,
    "Masters of Cinema": 150,
    "Special Edition": 150,
    "IMAX": 150,
    "IMAX Enhanced": 150,
}

# Formats that only exist on the Sonarr side.
EXPECTED_SCORES_SONARR = {
    "WEB Scene": 100,
    "Repack3": 5,
}
def check_scores(label, base_url, api_key, profile_name):
    """Compare a quality profile's custom-format scores against expectations.

    Read-only: fetches the named profile, merges the common score table with
    the Radarr- or Sonarr-specific one, and logs every format that is missing
    from the profile or carries the wrong score.
    """
    log.info("=== %s: checking scores in profile '%s' ===", label, profile_name)
    all_profiles = api_request(f"{base_url}/api/v3/qualityprofile", api_key)
    profile = None
    for candidate in all_profiles:
        if candidate["name"] == profile_name:
            profile = candidate
            break
    if not profile:
        log.error("Profile '%s' not found. Available: %s",
                  profile_name, [p["name"] for p in all_profiles])
        return

    # name -> score as the profile actually has it
    profile_scores = {}
    for item in profile.get("formatItems", []):
        profile_scores[item["name"]] = item["score"]

    # Radarr instances are identified by their conventional port in the URL.
    expected = dict(EXPECTED_SCORES_COMMON)
    if "7878" in base_url:
        expected.update(EXPECTED_SCORES_RADARR)
    else:
        expected.update(EXPECTED_SCORES_SONARR)

    ok, errors, missing = 0, 0, 0
    for fmt_name in sorted(expected):
        want = expected[fmt_name]
        if fmt_name not in profile_scores:
            log.warning(" %-30s NOT FOUND in profile (not synced yet?)", fmt_name)
            missing += 1
        elif profile_scores[fmt_name] != want:
            log.error(
                " %-30s WRONG SCORE: got %d, expected %d",
                fmt_name, profile_scores[fmt_name], want,
            )
            errors += 1
        else:
            log.debug(" %-30s OK (%d)", fmt_name, profile_scores[fmt_name])
            ok += 1
    log.info(
        "Result: %d OK, %d wrong, %d not found in profile",
        ok, errors, missing,
    )
    if errors:
        log.error("Score mismatches detected — re-run recyclarr sync to fix.")
def assign_profile(label, base_url, api_key, profile_name, dry_run):
    """Bulk-assign the named quality profile to every movie (Radarr) or
    series (Sonarr) on the instance, via the bulk /editor endpoint."""
    log.info("=== %s: assign profile '%s' ===", label, profile_name)
    # Resolve the profile name to its numeric id.
    profiles = api_request(f"{base_url}/api/v3/qualityprofile", api_key)
    target = None
    for profile in profiles:
        if profile["name"] == profile_name:
            target = profile
            break
    if not target:
        log.error("Profile '%s' not found. Available: %s",
                  profile_name, [p["name"] for p in profiles])
        return
    profile_id = target["id"]
    log.info("Found profile '%s' with id=%d", profile_name, profile_id)

    # Radarr (port 7878) serves /movie; Sonarr serves /series.
    is_radarr = "7878" in base_url
    kind = "movie" if is_radarr else "series"
    content = api_request(f"{base_url}/api/v3/{kind}", api_key)

    # Single pass: count items already correct, collect the rest.
    already = 0
    to_update = []
    for item in content:
        if item.get("qualityProfileId") == profile_id:
            already += 1
        else:
            to_update.append(item)
    log.info(
        "%d items already on '%s', %d to update",
        already, profile_name, len(to_update),
    )
    if not to_update:
        log.info("Nothing to do.")
        return

    for item in to_update:
        log.debug(" will update: %s (current profileId=%s)",
                  item.get("title", item.get("id")), item.get("qualityProfileId"))

    editor_endpoint = f"{base_url}/api/v3/{kind}/editor"
    if dry_run:
        log.info("DRY RUN — skipping PUT to %s", editor_endpoint)
        return

    ids = [item["id"] for item in to_update]
    payload = {"qualityProfileId": profile_id}
    payload["movieIds" if is_radarr else "seriesIds"] = ids
    api_request(editor_endpoint, api_key, method="PUT", body=payload)
    log.info("Done — updated %d items.", len(ids))
def _expire_bad_movie_files(base_url, api_key, dry_run):
    """Radarr half of expire_bad_files: delete movie files with a negative
    custom-format score and queue one MoviesSearch for all affected movies."""
    movies = api_request(f"{base_url}/api/v3/movie", api_key)
    bad = [
        m for m in movies
        if m.get("hasFile") and m.get("movieFile", {}).get("customFormatScore", 0) < 0
    ]
    if not bad:
        log.info("No movies with negative custom format scores found.")
        return
    for m in bad:
        score = m["movieFile"]["customFormatScore"]
        formats = [cf["name"] for cf in m["movieFile"].get("customFormats", [])]
        log.warning(" BAD: %-50s score=%-8d formats=%s", m["title"], score, formats)
    log.info("Found %d movie(s) with negative scores.", len(bad))
    if dry_run:
        log.info("DRY RUN — would delete %d file(s) and trigger search.", len(bad))
        return
    for m in bad:
        file_id = m["movieFile"]["id"]
        log.info(" Deleting file id=%d for '%s'", file_id, m["title"])
        api_request(f"{base_url}/api/v3/moviefile/{file_id}", api_key, method="DELETE")
    movie_ids = [m["id"] for m in bad]
    log.info("Triggering search for %d movie(s)...", len(movie_ids))
    result = api_request(
        f"{base_url}/api/v3/command", api_key,
        method="POST", body={"name": "MoviesSearch", "movieIds": movie_ids},
    )
    # api_request returns None on an empty body; don't crash on .get().
    log.info("Search command queued: id=%s", (result or {}).get("id"))


def _expire_bad_episode_files(base_url, api_key, dry_run):
    """Sonarr half of expire_bad_files: delete episode files with a negative
    custom-format score and queue a SeriesSearch per affected series."""
    series_list = api_request(f"{base_url}/api/v3/series", api_key)
    # Only series that actually have files on disk are worth scanning.
    series_with_files = [
        s for s in series_list
        if s.get("statistics", {}).get("episodeFileCount", 0) > 0
    ]
    log.info("Scanning episode files for %d series...", len(series_with_files))
    bad_series = {}  # series_id -> {"series": <series>, "files": [<episodefile>, ...]}
    for series in series_with_files:
        sid = series["id"]
        ep_files = api_request(
            f"{base_url}/api/v3/episodefile?seriesId={sid}", api_key
        )
        bad_files = [f for f in ep_files if f.get("customFormatScore", 0) < 0]
        if bad_files:
            bad_series[sid] = {"series": series, "files": bad_files}
    if not bad_series:
        log.info("No episode files with negative custom format scores found.")
        return
    total_files = sum(len(d["files"]) for d in bad_series.values())
    for sid, data in bad_series.items():
        for f in data["files"]:
            score = f.get("customFormatScore", 0)
            formats = [cf["name"] for cf in f.get("customFormats", [])]
            log.warning(
                " BAD: %-40s %-50s score=%-8d formats=%s",
                data["series"]["title"], f.get("relativePath", "?"), score, formats,
            )
    log.info(
        "Found %d file(s) across %d series with negative scores.",
        total_files, len(bad_series),
    )
    if dry_run:
        log.info("DRY RUN — would delete %d file(s) and trigger search for %d series.",
                 total_files, len(bad_series))
        return
    # Delete all files first, then queue the searches, so no search can race
    # a file that is still pending deletion.
    for sid, data in bad_series.items():
        for f in data["files"]:
            log.info(" Deleting episode file id=%d '%s'", f["id"], f.get("relativePath"))
            api_request(f"{base_url}/api/v3/episodefile/{f['id']}", api_key, method="DELETE")
    for sid, data in bad_series.items():
        log.info(" Triggering search for series '%s'", data["series"]["title"])
        api_request(
            f"{base_url}/api/v3/command", api_key,
            method="POST", body={"name": "SeriesSearch", "seriesId": sid},
        )
    log.info("Done — deleted %d file(s), triggered search for %d series.",
             total_files, len(bad_series))


def expire_bad_files(label, base_url, api_key, dry_run):
    """Find files with negative custom format scores, delete them, and trigger a search."""
    log.info("=== %s: expiring files with negative custom format scores ===", label)
    # Radarr is identified by its conventional port; everything else is Sonarr.
    if "7878" in base_url:
        _expire_bad_movie_files(base_url, api_key, dry_run)
    else:
        _expire_bad_episode_files(base_url, api_key, dry_run)
def refresh_all(label, base_url, api_key, dry_run):
    """Queue a Refresh & Scan command covering all movies (Radarr) or all
    series (Sonarr) on the instance; with dry_run, only log what would run."""
    log.info("=== %s: Refresh & Scan all content ===", label)
    is_radarr = "7878" in base_url  # Radarr's conventional port
    command = "RefreshMovie" if is_radarr else "RefreshSeries"
    if dry_run:
        log.info("DRY RUN — would POST command '%s'", command)
        return
    result = api_request(
        f"{base_url}/api/v3/command",
        api_key,
        method="POST",
        body={"name": command},
    )
    # BUG FIX: the message labelled this "state=" while reading the "status"
    # key; the label now matches the field logged. Also guard against a None
    # result (api_request returns None on an empty body).
    result = result or {}
    log.info("Command queued: id=%s name=%s status=%s",
             result.get("id"), result.get("name"), result.get("status"))
def main():
    """Parse CLI flags and run the selected action against every instance.

    Exactly one action runs per invocation; when several action flags are
    given, precedence is assign-profile > check-scores > refresh > expire-bad,
    and with no action flag the default is setting the size limits.
    A failure on one instance is logged and the next instance still runs.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the usage examples in the module docstring readable in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--dry-run", action="store_true", help="Preview without applying")
    parser.add_argument("--assign-profile", metavar="NAME",
                        help="Bulk-assign a quality profile to all movies/series")
    parser.add_argument("--check-scores", metavar="NAME",
                        help="Verify custom format scores in the named quality profile")
    parser.add_argument("--refresh", action="store_true",
                        help="Trigger Refresh & Scan for all movies/series")
    parser.add_argument("--expire-bad", action="store_true",
                        help="Delete files with negative CF scores and trigger a new search")
    args = parser.parse_args()

    if args.dry_run:
        log.info("DRY RUN mode — no changes will be made")

    for instance in INSTANCES:
        label = instance["label"]
        url = instance["url"]
        key = instance["api_key"]
        try:
            if args.assign_profile:
                assign_profile(label, url, key, args.assign_profile, args.dry_run)
            elif args.check_scores:
                # Read-only check; --dry-run is irrelevant here.
                check_scores(label, url, key, args.check_scores)
            elif args.refresh:
                refresh_all(label, url, key, args.dry_run)
            elif args.expire_bad:
                expire_bad_files(label, url, key, args.dry_run)
            else:
                process_instance(label, url, key, args.dry_run)
        except Exception:
            # BUG FIX: was log.error("... %s", e), which dropped the traceback;
            # log.exception keeps the message and appends the full stack.
            log.exception("Failed to process %s", label)
    log.info("All done.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment