Created
February 28, 2026 18:15
-
-
Save FiveBoroughs/6d8171f18de28cc7947331ca8ba4384f to your computer and use it in GitHub Desktop.
Test scripts for interleaved XMLTV index (Dispatcharr PR #938)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Re-order an existing XMLTV file so programmes are sorted by start time. | |
| Uses a two-pass approach to avoid loading the full DOM into memory: | |
| 1. First pass: collect (start_time, byte_offset, length) for each programme | |
| 2. Sort by start_time | |
| 3. Second pass: seek to each offset and write in sorted order | |
| Usage: | |
| python reorder_xmltv_by_time.py input.xml output.xml | |
| """ | |
| import re | |
| import os | |
| import sys | |
| import time | |
| if len(sys.argv) < 3: | |
| print(f"Usage: {sys.argv[0]} <input.xml> <output.xml>") | |
| sys.exit(1) | |
| src, dst = sys.argv[1], sys.argv[2] | |
| PROG_OPEN = b"<programme" | |
| PROG_CLOSE = b"</programme>" | |
| CLOSE_LEN = len(PROG_CLOSE) | |
| START_RE = re.compile(rb'start="([^"]+)"') | |
| print(f"Pass 1: scanning {src} for programme offsets...") | |
| t0 = time.monotonic() | |
| entries = [] # (start_attr, file_offset, length) | |
| header_end = 0 | |
| with open(src, "rb") as f: | |
| buf = bytearray() | |
| file_pos = 0 | |
| CHUNK = 8 * 1024 * 1024 | |
| while True: | |
| data = f.read(CHUNK) | |
| if not data and not buf: | |
| break | |
| buf.extend(data) | |
| search_from = 0 | |
| while True: | |
| idx = buf.find(PROG_OPEN, search_from) | |
| if idx == -1: | |
| break | |
| close = buf.find(PROG_CLOSE, idx) | |
| if close == -1: | |
| break | |
| close_end = close + CLOSE_LEN | |
| if not entries and not header_end: | |
| header_end = file_pos + idx | |
| tag_bytes = buf[idx:close_end] | |
| m = START_RE.search(tag_bytes[:500]) | |
| start_attr = m.group(1) if m else b"" | |
| abs_offset = file_pos + idx | |
| length = close_end - idx | |
| entries.append((start_attr, abs_offset, length)) | |
| search_from = close_end | |
| keep = max(search_from, len(buf) - 4096) if data else len(buf) | |
| file_pos += keep | |
| del buf[:keep] | |
| if not data: | |
| break | |
| elapsed = time.monotonic() - t0 | |
| print(f" {len(entries)} programmes found in {elapsed:.1f}s") | |
| print("Sorting by start time...") | |
| entries.sort(key=lambda e: e[0]) | |
| unique_20 = set() | |
| with open(src, "rb") as f: | |
| for start_attr, offset, length in entries[:20]: | |
| f.seek(offset) | |
| chunk = f.read(min(length, 500)) | |
| ch_m = re.search(rb'channel="([^"]+)"', chunk) | |
| if ch_m: | |
| unique_20.add(ch_m.group(1)) | |
| print(f" First 20 programmes span {len(unique_20)} channels (confirms time-sorted)") | |
| print(f"Pass 2: writing {dst}...") | |
| t0 = time.monotonic() | |
| with open(src, "rb") as fin, open(dst, "wb") as fout: | |
| fin.seek(0) | |
| fout.write(fin.read(header_end)) | |
| for _, offset, length in entries: | |
| fin.seek(offset) | |
| fout.write(fin.read(length)) | |
| fout.write(b"\n") | |
| fout.write(b"</tv>\n") | |
| elapsed = time.monotonic() - t0 | |
| size_mb = os.path.getsize(dst) / (1024 * 1024) | |
| print(f" Wrote {size_mb:.0f}MB in {elapsed:.1f}s") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Test the byte-offset index against an interleaved XMLTV file.

Point it at a time-sorted XMLTV file (use reorder_xmltv_by_time.py to
create one) and an EPGSource ID. It builds the index, verifies the
interleaved flag is set, and runs lookups on 5 channels.

The EPGSource row is temporarily pointed at the given file and always
restored to its original values afterwards.

Usage (inside the container):
    python test_interleaved_index.py <source_id> <path_to_timesorted.xml>
"""
import os
import sys
import time

import django

# Validate the command line before booting Django so that a bare invocation
# prints the usage text instead of paying for (or failing in) django.setup().
if len(sys.argv) < 3:
    print(f"Usage: {sys.argv[0]} <source_id> <path_to_timesorted.xml>")
    sys.exit(1)

source_id = int(sys.argv[1])
xml_path = sys.argv[2]

# Refuse to repoint the source at a file that does not exist.
if not os.path.isfile(xml_path):
    print(f"Error: {xml_path} is not a file")
    sys.exit(1)

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dispatcharr.settings")
django.setup()

# Project imports must come after django.setup().
from apps.epg.tasks import (  # noqa: E402
    build_programme_index,
    _read_programs_at_offsets,
    _scan_from_offset_for_tvg_id,
)
from apps.epg.models import EPGSource  # noqa: E402
from django.utils import timezone  # noqa: E402

source = EPGSource.objects.get(id=source_id)
orig_extracted = source.extracted_file_path
orig_index = source.programme_index

try:
    # Point the source at the test file and clear any existing index so the
    # build below starts from scratch.
    source.extracted_file_path = xml_path
    source.programme_index = None
    source.save(update_fields=["extracted_file_path", "programme_index"])

    print("Building byte-offset index...")
    t0 = time.monotonic()
    build_programme_index(source.id)
    source.refresh_from_db(fields=["programme_index"])
    elapsed = time.monotonic() - t0

    idx = source.programme_index
    if not idx:
        # The build left no index behind; there is nothing to look up.
        print(" No programme index was built")
        sys.exit(1)  # the finally block below still restores the source

    ch = idx.get("channels", {})
    print(f" {len(ch)} channels indexed in {elapsed:.1f}s, interleaved={idx.get('interleaved')}")
    if not idx.get("interleaved"):
        print(" WARNING: interleaved flag is not set for this file")

    print("\nLookups:")
    now = timezone.now()
    for tvg_id in list(ch.keys())[:5]:
        offsets = ch[tvg_id]
        t0 = time.monotonic()
        result = _read_programs_at_offsets(xml_path, tvg_id, offsets, now)
        # Fall back to a forward scan from the last known offset; skip it
        # when the channel has no offsets to scan from.
        if not result and offsets:
            result = _scan_from_offset_for_tvg_id(xml_path, tvg_id, offsets[-1], now)
        elapsed = (time.monotonic() - t0) * 1000
        title = result["title"][:50] if isinstance(result, dict) else str(result)
        print(f" {tvg_id}: {title} ({elapsed:.1f}ms)")
finally:
    # Always put the source back the way it was found.
    source.extracted_file_path = orig_extracted
    source.programme_index = orig_index
    source.save(update_fields=["extracted_file_path", "programme_index"])
    print("\nSource restored.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment