Skip to content

Instantly share code, notes, and snippets.

@grobertson
Created February 25, 2026 03:14
Show Gist options
  • Select an option

  • Save grobertson/d82e5e2333080ab049e820d0978aa04b to your computer and use it in GitHub Desktop.

Select an option

Save grobertson/d82e5e2333080ab049e820d0978aa04b to your computer and use it in GitHub Desktop.
This tool can be used to undo the damage of a bug in MediaCMS which wipes out ownership information on item update.
#!/usr/bin/env python3
"""
restore_owners.py — Recover correct media ownership after enrichment
scripts incorrectly re-assigned every item to the admin account.

Root cause (MediaCMS bug)
-------------------------
PUT /api/v1/media/{token} always calls ``serializer.save(user=request.user)``
in the MediaDetail view, even though ``user`` is declared ReadOnlyField on
the serializer. Any admin-token PUT therefore silently overwrites the stored
owner with the admin account.

Recovery strategy
-----------------
1. Fetch every media item via GET /api/v1/manage_media (paginated).
2. For items whose reported ``user`` field matches the admin username
   (``--admin``), fetch the individual item via GET /api/v1/media/{token}
   to obtain the ``original_media_url`` field (only exposed on the
   single-item endpoint).
3. Parse the real uploader's username from that URL path.
   MediaCMS stores originals at:
       {MEDIA_UPLOAD_DIR}user/{username}/{uid}.{filename}
   so the username sits between ``/user/`` and the next ``/``.
4. Group all affected friendly_tokens by recovered username.
5. POST /api/v1/media/user/bulk_actions with action=change_owner for each
   username group to atomically restore ownership.

Usage
-----
    python restore_owners.py --token TOKEN
    python restore_owners.py --token TOKEN --admin mysite-admin --dry-run
    python restore_owners.py --token TOKEN --api-url https://example.com/api/v1
"""
from __future__ import annotations
import argparse
import re
import sys
import time
from collections import defaultdict
import requests
# ── Defaults ──────────────────────────────────────────────────────────────────
# Base URL of the MediaCMS REST API; the CLI can override it with --api-url.
API_BASE = "https://www.dropsugar.co/api/v1"
# Timeout, in seconds, handed to every HTTP request.
DEFAULT_TIMEOUT = 30
# Pause, in seconds, between individual media fetches.
REQUEST_DELAY = 0.30
# Number of tokens sent with each change_owner call.
BATCH_SIZE = 50

# Usernames that may get ownership restored; everyone else stays under admin.
# Extend this tuple, or pass names via --greenlist on the command line.
# NOTE: main() treats an empty greenlist as "no filtering".
_DEFAULT_GREENLIST: tuple[str, ...] = tuple()

# MediaCMS stores original files at .../user/{username}/...
# Two common path layouts are tried, in this order, for robustness.
_USERNAME_PATTERNS = [
    re.compile(regex)
    for regex in (r"/user/([^/]+)/", r"/user-data/([^/]+)/")
]
# ── HTTP helpers ───────────────────────────────────────────────────────────────
def _get_with_backoff(
    session: requests.Session,
    url: str,
    *,
    params: dict | None = None,
    max_retries: int = 4,
    delay: float = REQUEST_DELAY,
) -> requests.Response:
    """GET *url*, retrying while the server answers HTTP 429.

    Args:
        session: Session used to issue the request.
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        max_retries: Number of retries after the first attempt; negative
            values are treated as 0.
        delay: Lower bound for the initial backoff; the first wait is
            ``max(delay, 1.0)`` seconds and doubles after every 429.

    Returns:
        The first non-429 response, or the last 429 response once the
        retries are exhausted.
    """
    backoff = max(delay, 1.0)
    retries_left = max(max_retries, 0)
    while True:
        r = session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
        # Return immediately on the final attempt: sleeping before handing
        # back a response we will not retry only wastes time.
        if r.status_code != 429 or retries_left == 0:
            return r
        retries_left -= 1
        try:
            wait = float(r.headers.get("Retry-After", backoff))
        except (TypeError, ValueError):
            # Retry-After may be an HTTP-date instead of a number of seconds.
            wait = backoff
        if not wait >= 0:
            # Negative or NaN values are unusable; use our own backoff.
            wait = backoff
        wait = min(wait, 60)
        print(f" ** 429 rate-limited — waiting {wait:.0f}s **")
        time.sleep(wait)
        backoff *= 2
def _post_with_backoff(
    session: requests.Session,
    url: str,
    payload: dict,
    *,
    max_retries: int = 4,
    delay: float = REQUEST_DELAY,
) -> requests.Response:
    """POST *payload* as JSON to *url*, retrying while the server answers 429.

    Args:
        session: Session used to issue the request.
        url: Absolute URL to post to.
        payload: Object sent as the JSON request body.
        max_retries: Number of retries after the first attempt; negative
            values are treated as 0.
        delay: Lower bound for the initial backoff; the first wait is
            ``max(delay, 1.0)`` seconds and doubles after every 429.

    Returns:
        The first non-429 response, or the last 429 response once the
        retries are exhausted.
    """
    backoff = max(delay, 1.0)
    retries_left = max(max_retries, 0)
    while True:
        r = session.post(url, json=payload, timeout=DEFAULT_TIMEOUT)
        # Return immediately on the final attempt: sleeping before handing
        # back a response we will not retry only wastes time.
        if r.status_code != 429 or retries_left == 0:
            return r
        retries_left -= 1
        try:
            wait = float(r.headers.get("Retry-After", backoff))
        except (TypeError, ValueError):
            # Retry-After may be an HTTP-date instead of a number of seconds.
            wait = backoff
        if not wait >= 0:
            # Negative or NaN values are unusable; use our own backoff.
            wait = backoff
        wait = min(wait, 60)
        print(f" ** 429 rate-limited — waiting {wait:.0f}s **")
        time.sleep(wait)
        backoff *= 2
# ── Step 1 — fetch catalog ─────────────────────────────────────────────────────
def fetch_all_media(
    session: requests.Session,
    api_base: str,
) -> list[dict]:
    """Paginate through the media listing and return every item.

    ``/manage_media`` is tried first. If it answers 403, the listing falls
    back to ``/media`` and *every* subsequent page is read from that same
    endpoint.

    Args:
        session: Authenticated session used for all requests.
        api_base: API base URL without a trailing slash.

    Returns:
        The concatenated ``results`` entries of all pages.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` when a page request
            does not succeed.
    """
    all_items: list[dict] = []
    endpoint = f"{api_base}/manage_media"
    resp = _get_with_backoff(session, endpoint, params={"page": 1})
    if resp.status_code == 403:
        print(" ⚠ /manage_media returned 403 — falling back to /media "
              "(may be capped at ~1000)", file=sys.stderr)
        # Remember the fallback so later pages do not hit the endpoint
        # that just refused us.
        endpoint = f"{api_base}/media"
        resp = _get_with_backoff(session, endpoint, params={"page": 1})
    resp.raise_for_status()
    data = resp.json()
    total = data.get("count", 0)
    all_items.extend(data.get("results", []))
    print(f" Total media in CMS: {total}")

    page = 1
    # The "next" field is only used as a "more pages exist" signal; pages
    # are requested by number.
    while data.get("next"):
        page += 1
        time.sleep(REQUEST_DELAY)
        resp = _get_with_backoff(session, endpoint, params={"page": page})
        resp.raise_for_status()
        data = resp.json()
        all_items.extend(data.get("results", []))
        pct = int(len(all_items) / total * 100) if total else 0
        print(f"\r Fetched {len(all_items)}/{total} ({pct}%)",
              end="", flush=True)
    if total:
        print(f"\r Fetched {len(all_items)}/{total} (100%) ")
    return all_items
# ── Step 2 — resolve real owner from original_media_url ───────────────────────
def _parse_username_from_url(url: str) -> str | None:
    """Return the uploader's username embedded in an original_media_url.

    Each regex in ``_USERNAME_PATTERNS`` is tried in order and the first
    capture group of the first one that matches is returned. ``None`` is
    returned for an empty URL or when no pattern matches.
    """
    if url:
        candidates = (regex.search(url) for regex in _USERNAME_PATTERNS)
        hit = next((found for found in candidates if found), None)
        if hit is not None:
            return hit.group(1)
    return None
def fetch_original_owner(
    session: requests.Session,
    api_base: str,
    friendly_token: str,
    delay: float,
) -> str | None:
    """Look up one media item and return the username found in its original URL.

    Sleeps *delay* seconds first, then fetches ``{api_base}/media/{token}``.
    Returns ``None`` when the response status is not 200 or when no username
    can be parsed from the item's ``original_media_url``.
    """
    time.sleep(delay)
    response = _get_with_backoff(session, f"{api_base}/media/{friendly_token}")
    if response.status_code == 200:
        # A missing or null field is normalised to "" before parsing.
        original_url = response.json().get("original_media_url") or ""
        return _parse_username_from_url(original_url)
    return None
# ── Step 3 — group tokens by recovered username ────────────────────────────────
def build_ownership_map(
    session: requests.Session,
    api_base: str,
    affected: list[dict],
    delay: float,
    known_admin: str,
    greenlist: frozenset[str] | None = None,
) -> dict[str, list[str]]:
    """Resolve the real owner of every item in *affected* and group the tokens.

    Args:
        session: Authenticated session used for the per-item lookups.
        api_base: API base URL without a trailing slash.
        affected: Media items; each must carry a ``friendly_token`` key and
            may carry a ``title``.
        delay: Seconds to wait before each per-item lookup.
        known_admin: Username of the admin account.
        greenlist: If given, only these users receive ownership back.

    Returns:
        ``{real_username: [friendly_token, ...]}`` as a plain dict. Besides
        real usernames it may contain these special keys:

        * ``"__unknown__"`` — owner could not be determined; reported but
          not changed.
        * ``"__already_admin__"`` — the recovered owner is *known_admin*.
        * ``"__not_greenlisted__"`` — recovered owner is not in *greenlist*.
    """
    owner_map: dict[str, list[str]] = defaultdict(list)
    total = len(affected)
    unresolved: list[str] = []
    for i, item in enumerate(affected, 1):
        token = item["friendly_token"]
        # "or" also covers an explicit null title, which .get()'s default
        # would let through and then fail on slicing.
        title = (item.get("title") or "")[:60]
        print(f"\r [{i}/{total}] Resolving {token} … ", end="", flush=True)
        real_owner = fetch_original_owner(session, api_base, token, delay)
        if real_owner is None:
            unresolved.append(f"{token} {title}")
            owner_map["__unknown__"].append(token)
            continue
        if real_owner == known_admin:
            # File was uploaded by admin originally — ownership is correct.
            owner_map["__already_admin__"].append(token)
            continue
        if greenlist is not None and real_owner not in greenlist:
            owner_map["__not_greenlisted__"].append(token)
            continue
        owner_map[real_owner].append(token)
    # Terminate the carriage-return progress line.
    print()
    if unresolved:
        print(f"\n ⚠ {len(unresolved)} item(s) had no original_media_url "
              "(SHOW_ORIGINAL_MEDIA may be disabled on this instance):")
        for line in unresolved[:20]:
            print(f" {line}")
        if len(unresolved) > 20:
            print(f" … and {len(unresolved) - 20} more")
    return dict(owner_map)
# ── Step 4 — restore ownership ─────────────────────────────────────────────────
def restore_ownership(
    session: requests.Session,
    api_base: str,
    owner_map: dict[str, list[str]],
    dry_run: bool,
    delay: float,
) -> tuple[int, int]:
    """Call change_owner for each username group.

    Args:
        session: Authenticated session used for the POST requests.
        api_base: API base URL without a trailing slash.
        owner_map: ``{username: [friendly_token, ...]}``; the special keys
            ``__unknown__``, ``__already_admin__`` and
            ``__not_greenlisted__`` are skipped.
        dry_run: When true, only print what would be done and count every
            token as successful.
        delay: Backoff floor for the POST helper and the pause after each
            batch, in seconds.

    Returns:
        ``(ok_count, fail_count)`` in tokens. A batch counts as OK only when
        the server answers HTTP 200.
    """
    url = f"{api_base}/media/user/bulk_actions"
    ok = 0
    fail = 0
    skipped_keys = {"__unknown__", "__already_admin__", "__not_greenlisted__"}
    for username, tokens in owner_map.items():
        if username in skipped_keys:
            if username == "__already_admin__":
                print(f" ✓ {len(tokens)} item(s) were already admin-owned "
                      "— no change needed.")
            elif username == "__not_greenlisted__":
                print(f" ⏭ {len(tokens)} item(s) belong to non-greenlisted "
                      "user(s) — left with admin.")
            continue
        print(f"\n Restoring {len(tokens)} item(s) to @{username} …")
        if dry_run:
            for token in tokens[:5]:
                print(f" (dry-run) would change_owner {token} → {username}")
            if len(tokens) > 5:
                print(f" … and {len(tokens) - 5} more")
            ok += len(tokens)
            continue
        # Send in batches to stay well under any request-size limits
        for batch_start in range(0, len(tokens), BATCH_SIZE):
            batch = tokens[batch_start : batch_start + BATCH_SIZE]
            payload = {
                "action": "change_owner",
                "media_ids": batch,
                "owner": username,
            }
            r = _post_with_backoff(session, url, payload, delay=delay)
            if r.status_code == 200:
                ok += len(batch)
                # The batch is already applied server-side; an odd response
                # body must not abort the remaining batches.
                try:
                    body = r.json()
                except ValueError:
                    body = None
                detail = body.get("detail", "") if isinstance(body, dict) else ""
                print(f" ✅ [{batch_start + 1}–{batch_start + len(batch)}] {detail}")
            else:
                fail += len(batch)
                print(f" ❌ [{batch_start + 1}–{batch_start + len(batch)}] "
                      f"HTTP {r.status_code}: {r.text[:120]}")
            time.sleep(delay)
    return ok, fail
# ── CLI ────────────────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the restore_owners tool.

    Returns:
        A parser exposing ``--token`` (required), ``--admin``, ``--api-url``,
        ``--delay``, ``--dry-run``, ``--greenlist`` and ``--skip-fetch``.
    """
    p = argparse.ArgumentParser(
        prog="restore_owners",
        description=(
            "Recover correct media ownership after enrichment scripts "
            "accidentally re-assigned every item to the admin account."
        ),
        # Raw formatter keeps the line breaks of the examples epilog.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\nexamples:\n"
            "%(prog)s --token TOKEN\n"
            "%(prog)s --token TOKEN --dry-run\n"
            "%(prog)s --token TOKEN --admin mysite-admin\n"
            "%(prog)s --token TOKEN --api-url https://example.com/api/v1\n"
        ),
    )
    p.add_argument("--token", required=True,
                   help="MediaCMS API token (must be admin-level).")
    p.add_argument("--admin", default="admin", metavar="USERNAME",
                   help="Admin username whose items need recovery "
                        "(default: admin).")
    p.add_argument("--api-url", default=API_BASE,
                   help="MediaCMS API base URL.")
    p.add_argument("--delay", type=float, default=REQUEST_DELAY,
                   help="Delay in seconds between per-item fetches "
                        f"(default: {REQUEST_DELAY}).")
    p.add_argument("--dry-run", action="store_true",
                   help="Show what would change without modifying the CMS.")
    p.add_argument(
        "--greenlist",
        nargs="*",
        metavar="USERNAME",
        default=list(_DEFAULT_GREENLIST),
        # The text describes what main() does with the parsed value: an
        # empty list, whether defaulted or given explicitly, means no filter.
        help=(
            "Usernames allowed to have ownership restored back to them. "
            "Items belonging to any other user are left under admin. "
            "Pass --greenlist with no names to disable filtering entirely "
            "and restore all recoverable items. "
            "If the option is omitted, _DEFAULT_GREENLIST from the script "
            "is used; an empty list likewise means no filtering."
        ),
    )
    p.add_argument("--skip-fetch", action="store_true",
                   help="Skip fetching all media and only restore from a "
                        "previously saved owner map (not yet implemented).")
    return p
def main(argv: list[str] | None = None) -> int:
    """Run the ownership-recovery workflow from the command line.

    Steps: fetch the catalog, select items owned by the admin account, ask
    for confirmation (skipped for ``--dry-run``), resolve the original owner
    of each item, print a summary and finally restore ownership.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Process exit status: 1 when no media was found or at least one
        batch failed, otherwise 0 (including a user abort).
    """
    if sys.platform == "win32":
        # Re-encode the console streams as UTF-8 so the non-ASCII status
        # glyphs printed by this tool do not fail on Windows.
        if hasattr(sys.stdout, "reconfigure"):
            sys.stdout.reconfigure(encoding="utf-8", errors="replace")
        if hasattr(sys.stderr, "reconfigure"):
            sys.stderr.reconfigure(encoding="utf-8", errors="replace")
    parser = build_parser()
    args = parser.parse_args(argv)
    api_base = args.api_url.rstrip("/")
    mode = "DRY-RUN" if args.dry_run else "COMMIT"
    # An empty greenlist (default or explicit) means "no filtering".
    greenlist: frozenset[str] | None = (
        frozenset(args.greenlist) if args.greenlist else None
    )
    greenlist_label = (
        ", ".join(sorted(greenlist)) if greenlist else "ALL (no filtering)"
    )
    print(f"\n{'='*60}")
    print(f" restore_owners — Mode: {mode}")
    print(f" Admin username: {args.admin}")
    print(f" API base: {api_base}")
    print(f" Delay: {args.delay}s")
    print(f" Greenlist: {greenlist_label}")
    print(f"{'='*60}\n")
    if args.skip_fetch:
        # The flag is parsed but nothing consumes it; say so rather than
        # silently ignoring what the user asked for.
        print(" ⚠ --skip-fetch is not yet implemented and will be ignored.",
              file=sys.stderr)
    session = requests.Session()
    session.headers["Authorization"] = f"Token {args.token}"
    session.headers["Content-Type"] = "application/json"

    # ── Step 1: fetch catalog ──────────────────────────────────────────────
    print(" Fetching media catalog …")
    all_media = fetch_all_media(session, api_base)
    if not all_media:
        print(" No media found.")
        return 1

    # ── Step 2: identify affected items ────────────────────────────────────
    # Items hit by the bug now show user=admin. The real owner is recovered
    # from original_media_url, so every admin-owned item is processed.
    # If you know only a subset was affected, you can pre-filter here.
    affected = [
        item for item in all_media
        if item.get("user") == args.admin
    ]
    total_affected = len(affected)
    print(f"\n Items currently owned by '{args.admin}': {total_affected}")
    if total_affected == 0:
        print(" Nothing to restore — no items are owned by the admin account.")
        return 0
    if not args.dry_run:
        try:
            confirm = input(
                f"\n This will attempt to restore ownership for up to "
                f"{total_affected} items.\n"
                f" Type YES to continue: "
            ).strip()
        except EOFError:
            # stdin closed or non-interactive: treat as a refusal instead
            # of crashing with a traceback.
            confirm = ""
        if confirm != "YES":
            print(" Aborted.")
            return 0

    # ── Step 3: build {username → [tokens]} map ────────────────────────────
    print(f"\n Fetching individual item details to recover original owners …\n"
          f" (This will make up to {total_affected} API calls at "
          f"{args.delay}s each — estimated "
          f"{total_affected * args.delay / 60:.1f} min)\n")
    owner_map = build_ownership_map(
        session, api_base, affected, args.delay, args.admin,
        greenlist=greenlist,
    )

    # Summary
    special_keys = {"__unknown__", "__already_admin__", "__not_greenlisted__"}
    unknown = len(owner_map.get("__unknown__", []))
    already_admin = len(owner_map.get("__already_admin__", []))
    not_greenlisted = len(owner_map.get("__not_greenlisted__", []))
    recoverable = {
        u: toks for u, toks in owner_map.items()
        if u not in special_keys
    }
    total_recoverable = sum(len(v) for v in recoverable.values())
    print(f"\n Owner recovery summary:")
    print(f" Recoverable: {total_recoverable}")
    print(f" Originally admin: {already_admin} (no change needed)")
    print(f" Unknown (no URL): {unknown}")
    print(f" Not greenlisted: {not_greenlisted} (left with admin)")
    print()
    for username, tokens in sorted(recoverable.items()):
        print(f" @{username:<30} {len(tokens):5d} item(s)")
    if not recoverable:
        print(" No items to restore.")
        return 0

    # ── Step 4: restore ────────────────────────────────────────────────────
    print(f"\n {'[DRY-RUN] Would restore' if args.dry_run else 'Restoring'} "
          f"ownership …")
    # The full map is passed on; restore_ownership skips the special keys.
    ok, fail = restore_ownership(
        session, api_base, owner_map, args.dry_run, args.delay,
    )
    print(f"\n{'='*60}")
    action = "Would restore" if args.dry_run else "Restored"
    print(f" {action}: {ok} | Failed: {fail} | "
          f"Unknown: {unknown}")
    print(f"{'='*60}\n")
    return 0 if fail == 0 else 1
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment