Skip to content

Instantly share code, notes, and snippets.

@FiveBoroughs
Created February 28, 2026 18:15
Show Gist options
  • Select an option
  • Save FiveBoroughs/6d8171f18de28cc7947331ca8ba4384f to your computer and use it in GitHub Desktop.
Test scripts for interleaved XMLTV index (Dispatcharr PR #938)
"""Re-order an existing XMLTV file so programmes are sorted by start time.
Uses a two-pass approach to avoid loading the full DOM into memory:
1. First pass: collect (start_time, byte_offset, length) for each programme
2. Sort by start_time
3. Second pass: seek to each offset and write in sorted order
Usage:
python reorder_xmltv_by_time.py input.xml output.xml
"""
import re
import os
import sys
import time
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <input.xml> <output.xml>")
sys.exit(1)
src, dst = sys.argv[1], sys.argv[2]
PROG_OPEN = b"<programme"
PROG_CLOSE = b"</programme>"
CLOSE_LEN = len(PROG_CLOSE)
START_RE = re.compile(rb'start="([^"]+)"')
print(f"Pass 1: scanning {src} for programme offsets...")
t0 = time.monotonic()
entries = [] # (start_attr, file_offset, length)
header_end = 0
with open(src, "rb") as f:
buf = bytearray()
file_pos = 0
CHUNK = 8 * 1024 * 1024
while True:
data = f.read(CHUNK)
if not data and not buf:
break
buf.extend(data)
search_from = 0
while True:
idx = buf.find(PROG_OPEN, search_from)
if idx == -1:
break
close = buf.find(PROG_CLOSE, idx)
if close == -1:
break
close_end = close + CLOSE_LEN
if not entries and not header_end:
header_end = file_pos + idx
tag_bytes = buf[idx:close_end]
m = START_RE.search(tag_bytes[:500])
start_attr = m.group(1) if m else b""
abs_offset = file_pos + idx
length = close_end - idx
entries.append((start_attr, abs_offset, length))
search_from = close_end
keep = max(search_from, len(buf) - 4096) if data else len(buf)
file_pos += keep
del buf[:keep]
if not data:
break
elapsed = time.monotonic() - t0
print(f" {len(entries)} programmes found in {elapsed:.1f}s")
print("Sorting by start time...")
entries.sort(key=lambda e: e[0])
unique_20 = set()
with open(src, "rb") as f:
for start_attr, offset, length in entries[:20]:
f.seek(offset)
chunk = f.read(min(length, 500))
ch_m = re.search(rb'channel="([^"]+)"', chunk)
if ch_m:
unique_20.add(ch_m.group(1))
print(f" First 20 programmes span {len(unique_20)} channels (confirms time-sorted)")
print(f"Pass 2: writing {dst}...")
t0 = time.monotonic()
with open(src, "rb") as fin, open(dst, "wb") as fout:
fin.seek(0)
fout.write(fin.read(header_end))
for _, offset, length in entries:
fin.seek(offset)
fout.write(fin.read(length))
fout.write(b"\n")
fout.write(b"</tv>\n")
elapsed = time.monotonic() - t0
size_mb = os.path.getsize(dst) / (1024 * 1024)
print(f" Wrote {size_mb:.0f}MB in {elapsed:.1f}s")
"""Test the byte-offset index against an interleaved XMLTV file.
Point it at a time-sorted XMLTV file (use reorder_xmltv_by_time.py to
create one) and an EPGSource ID. It builds the index, verifies the
interleaved flag is set, and runs lookups on 5 channels.
Usage (inside the container):
python test_interleaved_index.py <source_id> <path_to_timesorted.xml>
"""
import django
import os
import sys
import time
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dispatcharr.settings")
django.setup()
from apps.epg.tasks import (
build_programme_index,
_read_programs_at_offsets,
_scan_from_offset_for_tvg_id,
)
from apps.epg.models import EPGSource
from django.utils import timezone
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <source_id> <path_to_timesorted.xml>")
sys.exit(1)
source_id = int(sys.argv[1])
xml_path = sys.argv[2]
source = EPGSource.objects.get(id=source_id)
orig_extracted = source.extracted_file_path
orig_index = source.programme_index
try:
source.extracted_file_path = xml_path
source.programme_index = None
source.save(update_fields=["extracted_file_path", "programme_index"])
print("Building byte-offset index...")
t0 = time.monotonic()
build_programme_index(source.id)
source.refresh_from_db(fields=["programme_index"])
elapsed = time.monotonic() - t0
idx = source.programme_index
ch = idx.get("channels", {})
print(f" {len(ch)} channels indexed in {elapsed:.1f}s, interleaved={idx.get('interleaved')}")
print("\nLookups:")
now = timezone.now()
for tvg_id in list(ch.keys())[:5]:
offsets = ch[tvg_id]
t0 = time.monotonic()
result = _read_programs_at_offsets(xml_path, tvg_id, offsets, now)
if not result:
result = _scan_from_offset_for_tvg_id(xml_path, tvg_id, offsets[-1], now)
elapsed = (time.monotonic() - t0) * 1000
title = result["title"][:50] if isinstance(result, dict) else str(result)
print(f" {tvg_id}: {title} ({elapsed:.1f}ms)")
finally:
source.extracted_file_path = orig_extracted
source.programme_index = orig_index
source.save(update_fields=["extracted_file_path", "programme_index"])
print("\nSource restored.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment