Skip to content

Instantly share code, notes, and snippets.

@evandhoffman
Created February 21, 2026 00:09
Show Gist options
  • Select an option

  • Save evandhoffman/6b9f9e13d8d1e7101ac0ae4957664707 to your computer and use it in GitHub Desktop.

Select an option

Save evandhoffman/6b9f9e13d8d1e7101ac0ae4957664707 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Manages quality definitions and profile assignments in Radarr and Sonarr.

Usage:
    python3 set-quality-limits.py                        # set size limits
    python3 set-quality-limits.py --dry-run              # preview size limit changes
    python3 set-quality-limits.py --assign-profile Any   # assign profile to all content
    python3 set-quality-limits.py --check-scores Any     # verify custom format scores in a profile
    python3 set-quality-limits.py --refresh              # trigger Refresh & Scan for all content
    python3 set-quality-limits.py --expire-bad           # delete files with negative CF scores and re-search
    python3 set-quality-limits.py --assign-profile Any --dry-run
"""
import argparse
import logging
import sys
import math
import json
import urllib.request
import urllib.error
# Configure the root logger once at import time: DEBUG verbosity with
# HH:MM:SS timestamps and a fixed-width level column.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.DEBUG,
)

# Module-level logger used by every function in this script.
log = logging.getLogger(__name__)
# Servers to manage. Each entry carries a display label, the base URL of the
# instance and the API key sent with every request.
# Several functions in this script treat a URL containing "7878" as Radarr and
# anything else as Sonarr, so the port in "url" matters.
# NOTE(review): the Sonarr host below looks like a partially redacted IP
# address — confirm the real hostname before running.
INSTANCES = [
    {"label": "Radarr", "url": "http://radarr:7878", "api_key": "<key>"},
    {"label": "Sonarr", "url": "http://sonarr.168.1.179:8989", "api_key": "<key>"},
]
# Upper size bounds in MB per minute, keyed by quality name. Both the Radarr
# and the Sonarr spelling of each quality are listed.
# The preferred size is derived elsewhere as 75% of the max, never below the min.
# For a 2-hour movie: 500 MB/min = 60 GB, 250 MB/min = 30 GB, 50 MB/min = 6 GB.
MAX_LIMITS = {
    # 2160p
    "Remux-2160p": 500,
    "Bluray-2160p Remux": 500,
    "Bluray-2160p": 500,
    "WEBDL-2160p": 500,
    "WEBRip-2160p": 500,
    "HDTV-2160p": 500,
    # 1080p
    "Remux-1080p": 250,
    "Bluray-1080p Remux": 250,
    "Bluray-1080p": 250,
    "WEBDL-1080p": 250,
    "WEBRip-1080p": 250,
    "HDTV-1080p": 250,
    # 720p
    "Bluray-720p": 50,
    "WEBDL-720p": 50,
    "WEBRip-720p": 50,
    "HDTV-720p": 50,
}
# Lower size bounds in MB per minute, keyed by quality name. These are only
# high enough to reject obviously garbage or mislabelled files.
# For a 2-hour movie: 100 MB/min = 12 GB, 40 MB/min = 4.8 GB,
# 20 MB/min = 2.4 GB, 5 MB/min = 600 MB.
MIN_LIMITS = {
    # 2160p
    "Remux-2160p": 100,
    "Bluray-2160p Remux": 100,
    "Bluray-2160p": 50,
    "WEBDL-2160p": 15,
    "WEBRip-2160p": 15,
    "HDTV-2160p": 15,
    # 1080p
    "Remux-1080p": 40,
    "Bluray-1080p Remux": 40,
    "Bluray-1080p": 20,
    "WEBDL-1080p": 5,
    "WEBRip-1080p": 5,
    "HDTV-1080p": 10,
    # 720p
    "Bluray-720p": 10,
    "WEBDL-720p": 3,
    "WEBRip-720p": 3,
    "HDTV-720p": 5,
}
def api_request(url, api_key, method="GET", body=None, timeout=10):
    """Send one JSON request to an API endpoint and decode the reply.

    Args:
        url: Full endpoint URL.
        api_key: Value sent in the ``X-Api-Key`` header.
        method: HTTP verb; defaults to ``"GET"``.
        body: Optional JSON-serialisable payload. ``None`` sends no request body.
        timeout: Socket timeout in seconds. Defaults to 10, the value that was
            previously hard-coded; raise it for endpoints that return large lists.

    Returns:
        The decoded JSON response, or ``None`` when the response body is
        empty or contains only whitespace.

    Raises:
        urllib.error.HTTPError: The server answered with an error status. The
            status and response body are logged before re-raising.
        urllib.error.URLError: The connection could not be established. The
            reason is logged before re-raising.
    """
    headers = {
        "X-Api-Key": api_key,
        "Content-Type": "application/json",
    }
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    log.debug("%s %s", method, url)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            log.debug("Response %d, %d bytes", resp.status, len(raw))
    # HTTPError is a subclass of URLError, so it must be handled first.
    except urllib.error.HTTPError as e:
        body_text = e.read().decode(errors="replace")
        log.error("HTTP %d from %s: %s", e.code, url, body_text)
        raise
    except urllib.error.URLError as e:
        log.error("Connection failed to %s: %s", url, e.reason)
        raise
    # A blank body (e.g. after a DELETE) is not valid JSON; report it as None.
    return json.loads(raw) if raw.strip() else None
def process_instance(label, base_url, api_key, dry_run):
    """Bring one instance's quality-definition size limits in line with the tables.

    Fetches every quality definition. For each quality named in ``MAX_LIMITS``
    the min and max sizes are taken from ``MIN_LIMITS`` / ``MAX_LIMITS`` and
    the preferred size is set to 75% of the max, never below the min.
    Qualities without an entry are left untouched. The complete list is sent
    back in a single PUT, and only when at least one value changed.

    Args:
        label: Display name of the instance, used in log output.
        base_url: Base URL of the instance.
        api_key: API key for the instance.
        dry_run: When true, log the pending changes but skip the PUT.

    Raises:
        Whatever ``api_request`` raises on HTTP or connection failure.
    """
    log.info("=== %s ===", label)
    log.info("Fetching quality definitions from %s", base_url)
    # api_request returns None for an empty body; treat that as no definitions.
    definitions = api_request(f"{base_url}/api/v3/qualitydefinition", api_key) or []
    log.info("Got %d quality definitions", len(definitions))

    changed = []
    for qd in definitions:
        name = qd["quality"]["name"]
        old_min = qd.get("minSize")
        old_max = qd.get("maxSize")
        old_pref = qd.get("preferredSize")

        if name not in MAX_LIMITS:
            log.debug(" %-25s no limit defined, skipping (max=%s)", name, old_max)
            continue

        new_max = MAX_LIMITS[name]
        new_min = MIN_LIMITS[name]
        # Preferred must sit between min and max
        new_pref = max(math.floor(new_max * 0.75), math.ceil(new_min))

        if old_max != new_max or old_min != new_min or old_pref != new_pref:
            log.debug(
                " %-25s min: %s -> %s max: %s -> %s preferred: %s -> %s",
                name, old_min, new_min, old_max, new_max, old_pref, new_pref,
            )
            changed.append(name)
            qd["minSize"] = new_min
            qd["maxSize"] = new_max
            qd["preferredSize"] = new_pref
        else:
            log.debug(" %-25s already correct (min=%s max=%s)", name, old_min, old_max)

    if not changed:
        log.info("All limits already correct, nothing to do.")
        return
    log.info("%d qualities to update: %s", len(changed), ", ".join(changed))
    if dry_run:
        log.info("DRY RUN — skipping PUT")
        return

    log.info("Sending updated definitions to %s", base_url)
    # The definitions were modified in place, so the fetched list is the payload.
    api_request(
        f"{base_url}/api/v3/qualitydefinition/update",
        api_key,
        method="PUT",
        body=definitions,
    )
    log.info("Done.")
# Expected custom format scores, split by which instance type they apply to.
# Format names must match exactly what Sonarr/Radarr show in the UI (set by recyclarr).
# This first table holds the scores expected on both Radarr and Sonarr.
EXPECTED_SCORES_COMMON = {
    # Blocked
    "BR-DISK": -10000,
    "LQ": -10000,
    "DV (w/o HDR fallback)": -10000,
    # HDR
    "HDR": 500,
    "DV (Disk)": 500,
    "DV Boost": 250,
    "HDR10+ Boost": 100,
    # Release groups
    "WEB Tier 01": 100,
    "WEB Tier 02": 100,
    "WEB Tier 03": 100,
    # Repack
    "Repack/Proper": 5,
    "Repack2": 5,
}
# Scores expected only on Radarr; check_scores merges them over the common
# table when the instance URL contains "7878".
EXPECTED_SCORES_RADARR = {
    # Blocked
    "3D": -10000,
    "Upscaled": -10000,
    # Release groups
    "Remux Tier 01": 100,
    "Remux Tier 02": 100,
    "HD Bluray Tier 01": 100,
    "HD Bluray Tier 02": 100,
    # Movie versions
    "Hybrid": 150,
    "Remaster": 150,
    "4K Remaster": 150,
    "Criterion Collection": 150,
    "Masters of Cinema": 150,
    "Special Edition": 150,
    "IMAX": 150,
    "IMAX Enhanced": 150,
}
# Scores expected only on Sonarr; check_scores merges them over the common
# table when the instance URL does not contain "7878".
EXPECTED_SCORES_SONARR = {
    "WEB Scene": 100,
    "Repack3": 5,
}
def check_scores(label, base_url, api_key, profile_name):
    """Compare the custom format scores of one quality profile with the expected tables.

    Looks up the profile called ``profile_name`` and checks every expected
    format: a format absent from the profile is logged as a warning, a
    differing score as an error, a matching score at debug level. A summary
    line is logged at the end. Nothing is modified on the server.

    Args:
        label: Display name of the instance, used in log output.
        base_url: Base URL of the instance; a URL containing "7878" selects
            the Radarr table, anything else the Sonarr table.
        api_key: API key for the instance.
        profile_name: Exact name of the quality profile to inspect.
    """
    log.info("=== %s: checking scores in profile '%s' ===", label, profile_name)
    profiles = api_request(f"{base_url}/api/v3/qualityprofile", api_key)

    profile = None
    for candidate in profiles:
        if candidate["name"] == profile_name:
            profile = candidate
            break
    if not profile:
        available = [p["name"] for p in profiles]
        log.error("Profile '%s' not found. Available: %s", profile_name, available)
        return

    # Map each custom format name in the profile to its configured score.
    actual = {}
    for item in profile.get("formatItems", []):
        actual[item["name"]] = item["score"]

    if "7878" in base_url:
        instance_specific = EXPECTED_SCORES_RADARR
    else:
        instance_specific = EXPECTED_SCORES_SONARR
    expected = dict(EXPECTED_SCORES_COMMON)
    expected.update(instance_specific)

    ok = 0
    errors = 0
    missing = 0
    for fmt_name in sorted(expected):
        expected_score = expected[fmt_name]
        if fmt_name not in actual:
            log.warning(" %-30s NOT FOUND in profile (not synced yet?)", fmt_name)
            missing += 1
            continue
        found_score = actual[fmt_name]
        if found_score == expected_score:
            log.debug(" %-30s OK (%d)", fmt_name, found_score)
            ok += 1
        else:
            log.error(
                " %-30s WRONG SCORE: got %d, expected %d",
                fmt_name, found_score, expected_score,
            )
            errors += 1

    log.info(
        "Result: %d OK, %d wrong, %d not found in profile",
        ok, errors, missing,
    )
    if errors:
        log.error("Score mismatches detected — re-run recyclarr sync to fix.")
def assign_profile(label, base_url, api_key, profile_name, dry_run):
    """Bulk-assign one quality profile to every movie or series of an instance.

    Resolves ``profile_name`` to its id, lists all content, and sends the ids
    of every item not already on that profile to the editor endpoint in a
    single PUT.

    Args:
        label: Display name of the instance, used in log output.
        base_url: Base URL of the instance; a URL containing "7878" is handled
            as Radarr (movies), anything else as Sonarr (series).
        api_key: API key for the instance.
        profile_name: Exact name of the quality profile to assign.
        dry_run: When true, log what would change but skip the PUT.
    """
    log.info("=== %s: assign profile '%s' ===", label, profile_name)

    # Resolve the profile name to its numeric id.
    profiles = api_request(f"{base_url}/api/v3/qualityprofile", api_key)
    match = None
    for candidate in profiles:
        if candidate["name"] == profile_name:
            match = candidate
            break
    if not match:
        available = [p["name"] for p in profiles]
        log.error("Profile '%s' not found. Available: %s", profile_name, available)
        return
    profile_id = match["id"]
    log.info("Found profile '%s' with id=%d", profile_name, profile_id)

    # Radarr exposes /movie, Sonarr exposes /series.
    is_radarr = "7878" in base_url
    if is_radarr:
        content_endpoint = "movie"
    else:
        content_endpoint = "series"
    editor_endpoint = f"{base_url}/api/v3/{content_endpoint}/editor"

    content = api_request(f"{base_url}/api/v3/{content_endpoint}", api_key)
    already_correct = []
    to_update = []
    for entry in content:
        if entry.get("qualityProfileId") == profile_id:
            already_correct.append(entry)
        else:
            to_update.append(entry)
    log.info(
        "%d items already on '%s', %d to update",
        len(already_correct), profile_name, len(to_update),
    )
    if not to_update:
        log.info("Nothing to do.")
        return

    for entry in to_update:
        title = entry.get("title", entry.get("id"))
        log.debug(" will update: %s (current profileId=%s)", title, entry.get("qualityProfileId"))
    if dry_run:
        log.info("DRY RUN — skipping PUT to %s", editor_endpoint)
        return

    ids = [entry["id"] for entry in to_update]
    ids_key = "movieIds" if is_radarr else "seriesIds"
    payload = {"qualityProfileId": profile_id, ids_key: ids}
    api_request(editor_endpoint, api_key, method="PUT", body=payload)
    log.info("Done — updated %d items.", len(ids))
def expire_bad_files(label, base_url, api_key, dry_run):
    """Find files with negative custom format scores, delete them, and trigger a search.

    Args:
        label: Display name of the instance, used in log output.
        base_url: Base URL of the instance; a URL containing "7878" is handled
            as Radarr (movie files), anything else as Sonarr (episode files).
        api_key: API key for the instance.
        dry_run: When true, only report the affected files; nothing is
            deleted and no search is queued.

    Raises:
        Whatever ``api_request`` raises on HTTP or connection failure.
    """
    log.info("=== %s: expiring files with negative custom format scores ===", label)
    if "7878" in base_url:
        _expire_bad_movies(base_url, api_key, dry_run)
    else:
        _expire_bad_episode_files(base_url, api_key, dry_run)


def _custom_format_score(file_resource):
    """Return a file resource's customFormatScore, counting a missing or null value as 0."""
    return (file_resource or {}).get("customFormatScore") or 0


def _custom_format_names(file_resource):
    """Return the names of the custom formats attached to a file resource."""
    return [cf["name"] for cf in file_resource.get("customFormats") or []]


def _expire_bad_movies(base_url, api_key, dry_run):
    """Radarr branch: delete negatively scored movie files and queue one search for them."""
    movies = api_request(f"{base_url}/api/v3/movie", api_key) or []
    bad = [
        m for m in movies
        if m.get("hasFile") and _custom_format_score(m.get("movieFile")) < 0
    ]
    if not bad:
        log.info("No movies with negative custom format scores found.")
        return

    for m in bad:
        movie_file = m["movieFile"]
        log.warning(
            " BAD: %-50s score=%-8d formats=%s",
            m["title"], _custom_format_score(movie_file), _custom_format_names(movie_file),
        )
    log.info("Found %d movie(s) with negative scores.", len(bad))
    if dry_run:
        log.info("DRY RUN — would delete %d file(s) and trigger search.", len(bad))
        return

    for m in bad:
        file_id = m["movieFile"]["id"]
        log.info(" Deleting file id=%d for '%s'", file_id, m["title"])
        api_request(f"{base_url}/api/v3/moviefile/{file_id}", api_key, method="DELETE")

    movie_ids = [m["id"] for m in bad]
    log.info("Triggering search for %d movie(s)...", len(movie_ids))
    result = api_request(
        f"{base_url}/api/v3/command", api_key,
        method="POST", body={"name": "MoviesSearch", "movieIds": movie_ids},
    )
    # api_request returns None for an empty body; still report the queueing.
    log.info("Search command queued: id=%s", (result or {}).get("id"))


def _expire_bad_episode_files(base_url, api_key, dry_run):
    """Sonarr branch: delete negatively scored episode files and queue a search per series."""
    series_list = api_request(f"{base_url}/api/v3/series", api_key) or []
    series_with_files = [
        s for s in series_list
        if (s.get("statistics") or {}).get("episodeFileCount", 0) > 0
    ]
    log.info("Scanning episode files for %d series...", len(series_with_files))

    bad_series = {}  # series_id -> {series, files}
    for series in series_with_files:
        sid = series["id"]
        ep_files = api_request(
            f"{base_url}/api/v3/episodefile?seriesId={sid}", api_key
        ) or []
        bad_files = [f for f in ep_files if _custom_format_score(f) < 0]
        if bad_files:
            bad_series[sid] = {"series": series, "files": bad_files}
    if not bad_series:
        log.info("No episode files with negative custom format scores found.")
        return

    total_files = sum(len(d["files"]) for d in bad_series.values())
    for data in bad_series.values():
        for f in data["files"]:
            log.warning(
                " BAD: %-40s %-50s score=%-8d formats=%s",
                data["series"]["title"], f.get("relativePath", "?"),
                _custom_format_score(f), _custom_format_names(f),
            )
    log.info(
        "Found %d file(s) across %d series with negative scores.",
        total_files, len(bad_series),
    )
    if dry_run:
        log.info("DRY RUN — would delete %d file(s) and trigger search for %d series.",
                 total_files, len(bad_series))
        return

    for data in bad_series.values():
        for f in data["files"]:
            log.info(" Deleting episode file id=%d '%s'", f["id"], f.get("relativePath"))
            api_request(f"{base_url}/api/v3/episodefile/{f['id']}", api_key, method="DELETE")

    # Searches are queued only after every deletion above has succeeded.
    for sid, data in bad_series.items():
        log.info(" Triggering search for series '%s'", data["series"]["title"])
        api_request(
            f"{base_url}/api/v3/command", api_key,
            method="POST", body={"name": "SeriesSearch", "seriesId": sid},
        )
    log.info("Done — deleted %d file(s), triggered search for %d series.",
             total_files, len(bad_series))
def refresh_all(label, base_url, api_key, dry_run):
    """Queue a Refresh & Scan command covering all content of one instance.

    Args:
        label: Display name of the instance, used in log output.
        base_url: Base URL of the instance; a URL containing "7878" gets the
            "RefreshMovie" command, anything else "RefreshSeries".
        api_key: API key for the instance.
        dry_run: When true, only log the command that would be posted.
    """
    log.info("=== %s: Refresh & Scan all content ===", label)

    command = "RefreshSeries"
    if "7878" in base_url:
        command = "RefreshMovie"

    if dry_run:
        log.info("DRY RUN — would POST command '%s'", command)
        return

    command_url = f"{base_url}/api/v3/command"
    response = api_request(command_url, api_key, method="POST", body={"name": command})
    queued_id = response.get("id")
    queued_name = response.get("name")
    queued_status = response.get("status")
    log.info("Command queued: id=%s name=%s state=%s", queued_id, queued_name, queued_status)
def main():
    """Parse the command line and run the selected operation on every instance.

    Exactly one operation runs per invocation: ``--assign-profile``,
    ``--check-scores``, ``--refresh`` or ``--expire-bad``; with none of them
    the size limits are set. ``--dry-run`` may be combined with any operation.
    A failure on one instance is logged and the remaining instances are still
    processed; the process then exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the usage examples of the module docstring on their own lines.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--dry-run", action="store_true", help="Preview without applying")
    # Only one operation is executed per run, so conflicting flags are
    # rejected instead of silently ignoring all but the first.
    modes = parser.add_mutually_exclusive_group()
    modes.add_argument("--assign-profile", metavar="NAME",
                       help="Bulk-assign a quality profile to all movies/series")
    modes.add_argument("--check-scores", metavar="NAME",
                       help="Verify custom format scores in the named quality profile")
    modes.add_argument("--refresh", action="store_true",
                       help="Trigger Refresh & Scan for all movies/series")
    modes.add_argument("--expire-bad", action="store_true",
                       help="Delete files with negative CF scores and trigger a new search")
    args = parser.parse_args()

    if args.dry_run:
        log.info("DRY RUN mode — no changes will be made")

    failed = []
    for instance in INSTANCES:
        label = instance["label"]
        url = instance["url"]
        api_key = instance["api_key"]
        try:
            # "is not None" so an empty profile name is reported as not found
            # rather than falling through to the default write operation.
            if args.assign_profile is not None:
                assign_profile(label, url, api_key, args.assign_profile, args.dry_run)
            elif args.check_scores is not None:
                check_scores(label, url, api_key, args.check_scores)
            elif args.refresh:
                refresh_all(label, url, api_key, args.dry_run)
            elif args.expire_bad:
                expire_bad_files(label, url, api_key, args.dry_run)
            else:
                process_instance(label, url, api_key, args.dry_run)
        except Exception as e:
            # Top-level boundary: one unreachable instance must not stop the others.
            log.error("Failed to process %s: %s", label, e)
            failed.append(label)

    log.info("All done.")
    if failed:
        # Report the failure to the caller (cron, shell scripts) via the exit status.
        sys.exit(1)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment