Created
February 27, 2026 14:03
-
-
Save Enzime/0a3f6dc9552885128e6c44553943805b to your computer and use it in GitHub Desktop.
Instead of using this script to inject autounattend.xml into the ISO, you can simply create a second ISO or USB drive with the file in its root, as described at https://schneegans.de/windows/unattend-generator/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env nix-shell
#! nix-shell -i python3 -p "python3.withPackages (ps: [ ps.pycdlib ])"
"""Inject autounattend.xml into a Windows ISO, preserving UDF structure and boot records."""
import hashlib
import io
import struct
import sys
import pycdlib
# Logical sector size (bytes) shared by ISO 9660 and UDF on optical media.
SECTOR = 2048
def decode_udf_name(name):
    """Decode a UDF file identifier, handling both UTF-8 and UTF-16BE encodings.

    Already-decoded strings pass through unchanged.  Bytes are first tried
    as UTF-8; a NUL character in the result indicates the identifier was
    really OSTA Compressed Unicode with compression ID 16 (UTF-16BE), so
    that decoding is attempted next.  As a last resort the bytes are
    decoded as UTF-8 with replacement characters.
    """
    if isinstance(name, str):
        return name
    try:
        as_utf8 = name.decode("utf-8")
    except UnicodeDecodeError:
        as_utf8 = None
    if as_utf8 is not None and "\x00" not in as_utf8:
        return as_utf8
    try:
        return name.decode("utf-16-be")
    except UnicodeDecodeError:
        return name.decode("utf-8", errors="replace")
def walk_udf(iso, path="/"):
    """Recursively walk the UDF tree, returning a dict of path -> (size, md5).

    Directories are descended into; only regular files appear in the
    result.  Files whose data cannot be extracted get the placeholder
    digest "<no data>" instead of an MD5 hex string.
    """
    result = {}
    prefix = path.rstrip("/")
    for entry in iso.list_children(udf_path=path):
        if entry is None:
            continue
        child_path = prefix + "/" + decode_udf_name(entry.file_identifier())
        if entry.is_dir():
            result.update(walk_udf(iso, child_path))
            continue
        size = entry.info_len
        try:
            sink = io.BytesIO()
            iso.get_file_from_iso_fp(sink, udf_path=child_path)
            digest = hashlib.md5(sink.getvalue()).hexdigest()
        except Exception:
            # Best-effort: record the file even when its data is unreadable.
            digest = "<no data>"
        result[child_path] = (size, digest)
    return result
def check_avdps(path, label):
    """Check UDF Anchor Volume Descriptor Pointers at standard locations.

    Probes sector 256 and the final sector of the image; a sector whose
    descriptor tag id is 2 is an AVDP, and its main/reserve volume
    descriptor extents are printed.  Purely informational — returns None.
    """
    import os
    sector_total = os.path.getsize(path) // SECTOR
    probe_points = [
        ("Sector 256", 256),
        (f"N-1 (sector {sector_total - 1})", sector_total - 1),
    ]
    print(f"\n {label} AVDPs:")
    with open(path, "rb") as handle:
        for loc_label, sector in probe_points:
            handle.seek(sector * SECTOR)
            raw = handle.read(SECTOR)
            tag_id = struct.unpack_from("<H", raw, 0)[0]
            is_avdp = tag_id == 2
            print(f" {loc_label}: {'AVDP' if is_avdp else f'NOT AVDP (tag={tag_id})'}")
            if not is_avdp:
                continue
            # Each extent descriptor is (length, location), 32 bits each.
            main_len, main_loc = struct.unpack_from("<II", raw, 16)
            res_len, res_loc = struct.unpack_from("<II", raw, 24)
            print(f" Main VD: loc={main_loc}, len={main_len}")
            print(f" Reserve VD: loc={res_loc}, len={res_len}")
def timestamp_to_tuple(ts):
    """Flatten a UDFTimestamp into a comparable tuple; None passes through."""
    if ts is None:
        return None
    fields = ("year", "month", "day", "hour", "minute", "second",
              "centiseconds", "hundreds_microseconds", "microseconds")
    return tuple(getattr(ts, f) for f in fields)
def get_file_metadata(iso, path="/"):
    """Recursively collect UDF file metadata (timestamps, permissions).

    Returns a dict mapping each path (files AND directories) to a record
    of size, link count, the three UDF timestamps flattened to tuples,
    and — when pycdlib exposes it — the permissions field.
    """
    meta = {}
    prefix = path.rstrip("/")
    for node in iso.list_children(udf_path=path):
        if node is None:
            continue
        node_path = prefix + "/" + decode_udf_name(node.file_identifier())
        record = {
            "info_len": getattr(node, "info_len", None),
            "file_link_count": getattr(node, "file_link_count", None),
            "mod_time": timestamp_to_tuple(getattr(node, "mod_time", None)),
            "access_time": timestamp_to_tuple(getattr(node, "access_time", None)),
            "attr_time": timestamp_to_tuple(getattr(node, "attr_time", None)),
        }
        if hasattr(node, "permissions"):
            record["permissions"] = node.permissions
        meta[node_path] = record
        if node.is_dir():
            meta.update(get_file_metadata(iso, node_path))
    return meta
def compare_eltorito(orig, mod):
    """Compare El Torito boot catalog structures.

    Args:
        orig: opened pycdlib.PyCdlib instance for the original image.
        mod: opened pycdlib.PyCdlib instance for the modified image.

    Returns:
        A list of human-readable failure descriptions; empty when the
        validation entry, initial entry, section headers, and boot image
        bytes all match.
    """
    failures = []
    print("\n--- El Torito Boot Catalog ---")
    orig_cat = orig.eltorito_boot_catalog
    mod_cat = mod.eltorito_boot_catalog
    if not orig_cat or not mod_cat:
        # Without both catalogs there is nothing further to compare.
        msg = "Missing boot catalog"
        print(f" FAIL: {msg}")
        failures.append(msg)
        return failures
    # Validation entry: compared as raw record bytes.
    ve_match = orig_cat.validation_entry.record() == mod_cat.validation_entry.record()
    print(f" Validation entry identical: {ve_match}")
    if not ve_match:
        failures.append("El Torito validation entry differs")
    # Initial entry fields (excluding RBA which shifts)
    orig_ie = orig_cat.initial_entry
    mod_ie = mod_cat.initial_entry
    for field in ("boot_indicator", "boot_media_type", "load_segment",
                  "system_type", "sector_count"):
        orig_val = getattr(orig_ie, field)
        mod_val = getattr(mod_ie, field)
        match = orig_val == mod_val
        print(f" Initial entry {field}: orig={orig_val}, mod={mod_val}, match={match}")
        if not match:
            failures.append(f"El Torito initial entry {field} differs: {orig_val} vs {mod_val}")
    # load_rba is reported but never a failure: rewriting the image moves
    # data, so the boot image's sector address is expected to change.
    print(f" Initial entry load_rba: orig={orig_ie.load_rba}, mod={mod_ie.load_rba} (shift expected)")
    # Section entries
    if len(orig_cat.sections) != len(mod_cat.sections):
        msg = f"Section count differs: {len(orig_cat.sections)} vs {len(mod_cat.sections)}"
        print(f" FAIL: {msg}")
        failures.append(msg)
    else:
        print(f" Section count: {len(orig_cat.sections)} (match)")
    # zip() truncates to the shorter list, so only the common prefix of
    # sections is compared when the counts differ.
    for i, (orig_sec, mod_sec) in enumerate(zip(orig_cat.sections, mod_cat.sections)):
        sec_match = orig_sec.record() == mod_sec.record()
        print(f" Section {i} header identical: {sec_match}")
        if not sec_match:
            failures.append(f"El Torito section {i} header differs")
    # Boot image content: read sector_count 512-byte virtual sectors from
    # each image at its own load_rba and compare the raw bytes.
    # NOTE(review): _cdfp is a pycdlib-private file object; this may break
    # across pycdlib versions — confirm against the installed release.
    print("\n--- El Torito Boot Image Content ---")
    orig_fp = orig._cdfp
    orig_fp.seek(orig_ie.load_rba * SECTOR)
    orig_data = orig_fp.read(orig_ie.sector_count * 512)
    mod_fp = mod._cdfp
    mod_fp.seek(mod_ie.load_rba * SECTOR)
    mod_data = mod_fp.read(mod_ie.sector_count * 512)
    orig_md5 = hashlib.md5(orig_data).hexdigest()
    mod_md5 = hashlib.md5(mod_data).hexdigest()
    img_match = orig_data == mod_data
    print(f" Original: {len(orig_data)} bytes, MD5={orig_md5}")
    print(f" Modified: {len(mod_data)} bytes, MD5={mod_md5}")
    print(f" Identical: {img_match}")
    if not img_match:
        failures.append("El Torito boot image content differs")
    return failures
def compare_iso9660_pvd(input_iso, output_iso):
    """Compare ISO 9660 Primary Volume Descriptor fields.

    Identifier and creation-date fields are compared byte-for-byte; the
    volume modification date is reported but allowed to differ.  Returns
    a list of human-readable failure strings (empty on full match).
    """
    failures = []
    print("\n--- ISO 9660 Primary Volume Descriptor ---")

    def read_pvd(path):
        # The PVD occupies sector 16 of the image.
        with open(path, "rb") as handle:
            handle.seek(16 * SECTOR)
            return handle.read(SECTOR)

    orig_pvd = read_pvd(input_iso)
    mod_pvd = read_pvd(output_iso)
    # (name, start, end, display format) for fields that must be identical.
    fields = [
        ("System Identifier", 8, 40, "str"),
        ("Volume Identifier", 40, 72, "str"),
        ("Volume Set Identifier", 190, 318, "str"),
        ("Publisher Identifier", 318, 446, "str"),
        ("Application Identifier", 574, 702, "str"),
        ("Volume Creation Date", 813, 830, "raw"),
    ]
    for name, start, end, fmt in fields:
        before = orig_pvd[start:end]
        after = mod_pvd[start:end]
        same = before == after
        if fmt == "str":
            shown_before = before.rstrip(b"\x00 ").decode("ascii", errors="replace")
            shown_after = after.rstrip(b"\x00 ").decode("ascii", errors="replace")
        else:
            shown_before = before.hex()
            shown_after = after.hex()
        print(f" {name}: {'match' if same else 'DIFFERS'}")
        if not same:
            print(f" orig: {shown_before}")
            print(f" mod: {shown_after}")
            failures.append(f"ISO 9660 PVD {name} differs")
    # Volume Modification Date (expected to change)
    orig_mdate = orig_pvd[830:847]
    mod_mdate = mod_pvd[830:847]
    print(f" Volume Modification Date: {'match' if orig_mdate == mod_mdate else 'differs (expected)'}")
    # Logical Block Size (should be 2048)
    orig_bs, mod_bs = (struct.unpack_from("<H", pvd, 128)[0] for pvd in (orig_pvd, mod_pvd))
    print(f" Logical Block Size: orig={orig_bs}, mod={mod_bs}, match={orig_bs == mod_bs}")
    if orig_bs != mod_bs:
        failures.append(f"ISO 9660 block size differs: {orig_bs} vs {mod_bs}")
    # Path Table Size
    orig_pts, mod_pts = (struct.unpack_from("<I", pvd, 132)[0] for pvd in (orig_pvd, mod_pvd))
    print(f" Path Table Size: orig={orig_pts}, mod={mod_pts}, match={orig_pts == mod_pts}")
    if orig_pts != mod_pts:
        failures.append(f"ISO 9660 path table size differs: {orig_pts} vs {mod_pts}")
    return failures
def read_avdp(path, sector):
    """Read an AVDP and return (main_vd_loc, main_vd_len, reserve_vd_loc, reserve_vd_len) or None."""
    with open(path, "rb") as handle:
        handle.seek(sector * SECTOR)
        raw = handle.read(SECTOR)
    # Descriptor tag id 2 marks an Anchor Volume Descriptor Pointer.
    if struct.unpack_from("<H", raw, 0)[0] != 2:
        return None
    # Each extent descriptor stores (length, location) as 32-bit values.
    main_len, main_loc = struct.unpack_from("<II", raw, 16)
    res_len, res_loc = struct.unpack_from("<II", raw, 24)
    return (main_loc, main_len, res_loc, res_len)
def _parse_vd_descriptors(data):
    """Split a volume descriptor sequence into its descriptors.

    Returns (descriptors, order): a dict mapping descriptor tag id to the
    raw sector bytes (a later duplicate overwrites an earlier one) and
    the encounter order as a list of (tag_id, name) tuples.  Sectors
    whose tag id is 0 are treated as padding and skipped.
    """
    tag_names = {1: "PVD", 2: "AVDP", 4: "IUVD", 5: "PD", 6: "LVD", 7: "USD", 8: "TD"}
    descriptors = {}
    order = []
    for offset in range(0, len(data) - len(data) % SECTOR, SECTOR):
        sector_data = data[offset:offset + SECTOR]
        tag_id = struct.unpack_from("<H", sector_data, 0)[0]
        if tag_id == 0:
            continue  # empty/padding sector
        descriptors[tag_id] = sector_data
        order.append((tag_id, tag_names.get(tag_id, f"unknown({tag_id})")))
    return descriptors, order
def compare_udf_volume_descriptors(input_iso, output_iso):
    """Compare UDF volume descriptor sequences semantically.

    Reads the AVDP at sector 256 of each image, loads the main volume
    descriptor sequence it points to, and compares the descriptor types
    present plus selected fields of the PVD (volume identifier), the
    Partition Descriptor (number, access type, start, length), and the
    Logical Volume Descriptor (logical block size).

    Returns:
        A list of human-readable failure strings; relocations and length
        changes that are expected from a rewrite are printed but not
        treated as failures.
    """
    failures = []
    print("\n--- UDF Volume Descriptors ---")
    orig_avdp = read_avdp(input_iso, 256)
    mod_avdp = read_avdp(output_iso, 256)
    if not orig_avdp or not mod_avdp:
        failures.append("Could not read AVDP at sector 256")
        return failures
    orig_main_loc, orig_main_len, orig_res_loc, orig_res_len = orig_avdp
    mod_main_loc, mod_main_len, mod_res_loc, mod_res_len = mod_avdp
    # Relocation of the VD sequence is expected when pycdlib rewrites.
    if orig_main_loc != mod_main_loc:
        print(f" VD sequence relocated: sector {orig_main_loc} -> {mod_main_loc}")
    with open(input_iso, "rb") as f:
        f.seek(orig_main_loc * SECTOR)
        orig_vds_raw = f.read(orig_main_len)
    with open(output_iso, "rb") as f:
        f.seek(mod_main_loc * SECTOR)
        mod_vds_raw = f.read(mod_main_len)
    # Display names for the tag ids compared here (AVDPs are handled above).
    tag_names = {1: "PVD", 4: "IUVD", 5: "PD", 6: "LVD", 7: "USD", 8: "TD"}
    orig_descs, orig_order = _parse_vd_descriptors(orig_vds_raw)
    mod_descs, mod_order = _parse_vd_descriptors(mod_vds_raw)
    orig_types = {t for t, _ in orig_order}
    mod_types = {t for t, _ in mod_order}
    print(f" Original descriptor order: {', '.join(n for _, n in orig_order)}")
    print(f" Modified descriptor order: {', '.join(n for _, n in mod_order)}")
    if orig_types != mod_types:
        missing = orig_types - mod_types
        extra = mod_types - orig_types
        if missing:
            msg = f"Missing descriptor types: {[tag_names.get(t, t) for t in missing]}"
            print(f" FAIL: {msg}")
            failures.append(msg)
        if extra:
            # Extra descriptors are noted but not counted as failures.
            print(f" NOTE: Extra descriptor types: {[tag_names.get(t, t) for t in extra]}")
    else:
        print(f" Same descriptor types present: {[tag_names.get(t, t) for t in sorted(orig_types)]}")
    # Compare key semantic fields from each descriptor type
    for tag_id in sorted(orig_types & mod_types):
        name = tag_names.get(tag_id, f"unknown({tag_id})")
        orig_data = orig_descs[tag_id]
        mod_data = mod_descs[tag_id]
        if tag_id == 1:  # Primary Volume Descriptor
            # Compare volume identifier (bytes 24-55, dstring)
            orig_vol_id = orig_data[24:56].rstrip(b"\x00")
            mod_vol_id = mod_data[24:56].rstrip(b"\x00")
            match = orig_vol_id == mod_vol_id
            print(f" {name} volume identifier: {'match' if match else 'DIFFERS'}")
            if not match:
                print(f" orig: {orig_vol_id!r}")
                print(f" mod: {mod_vol_id!r}")
                failures.append(f"UDF {name} volume identifier differs")
        elif tag_id == 5:  # Partition Descriptor
            # Compare partition number (bytes 22-23)
            orig_part_num = struct.unpack_from("<H", orig_data, 22)[0]
            mod_part_num = struct.unpack_from("<H", mod_data, 22)[0]
            print(f" {name} partition number: orig={orig_part_num}, mod={mod_part_num}, "
                  f"{'match' if orig_part_num == mod_part_num else 'DIFFERS'}")
            if orig_part_num != mod_part_num:
                failures.append(f"UDF partition number differs")
            # Access type (bytes 184-187) — informational only.
            orig_access = struct.unpack_from("<I", orig_data, 184)[0]
            mod_access = struct.unpack_from("<I", mod_data, 184)[0]
            access_names = {0: "unspecified", 1: "read-only", 2: "write-once",
                            3: "rewritable", 4: "overwritable"}
            print(f" {name} access type: orig={access_names.get(orig_access, orig_access)}, "
                  f"mod={access_names.get(mod_access, mod_access)}")
            # Partition start + length — a length change is expected after
            # adding a file, so these are printed but not failures.
            orig_start = struct.unpack_from("<I", orig_data, 188)[0]
            mod_start = struct.unpack_from("<I", mod_data, 188)[0]
            orig_plen = struct.unpack_from("<I", orig_data, 192)[0]
            mod_plen = struct.unpack_from("<I", mod_data, 192)[0]
            print(f" {name} partition start: orig={orig_start}, mod={mod_start}")
            print(f" {name} partition length: orig={orig_plen}, mod={mod_plen} "
                  f"(diff={mod_plen - orig_plen})")
        elif tag_id == 6:  # Logical Volume Descriptor
            # Logical block size (bytes 212-215)
            orig_lbs = struct.unpack_from("<I", orig_data, 212)[0]
            mod_lbs = struct.unpack_from("<I", mod_data, 212)[0]
            match = orig_lbs == mod_lbs
            print(f" {name} logical block size: orig={orig_lbs}, mod={mod_lbs}, "
                  f"{'match' if match else 'DIFFERS'}")
            if not match:
                failures.append(f"UDF logical block size differs")
    # Original had a duplicate, unreachable `return failures` here — removed.
    return failures
def main():
    """Inject an autounattend.xml into a Windows ISO, then verify the result.

    Usage: <script> <input.iso> <autounattend.xml> <output.iso>

    The file is added at the UDF root of the image; the original and
    modified images are then compared across eight checks (file contents,
    file metadata, directory structure, El Torito, system area, ISO 9660
    PVD, UDF volume descriptors, UDF anchors) and a summary is printed.
    """
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <input.iso> <autounattend.xml> <output.iso>")
        sys.exit(1)
    input_iso = sys.argv[1]
    unattend_file = sys.argv[2]
    output_iso = sys.argv[3]

    # Step 1: Inject the file
    print(f"Opening {input_iso}...")
    iso = pycdlib.PyCdlib()
    iso.open(input_iso)
    print(f"Adding {unattend_file} to ISO root...")
    iso.add_file(unattend_file, udf_path="/autounattend.xml")
    print(f"Writing {output_iso}...")
    iso.write(output_iso)
    iso.close()

    # Open both for comparison
    print("\n=== Verification ===")
    failures = []
    orig = pycdlib.PyCdlib()
    orig.open(input_iso)
    mod = pycdlib.PyCdlib()
    mod.open(output_iso)

    # Test 1: UDF file contents
    print("\n--- UDF File Contents ---")
    print("Walking original ISO...")
    orig_files = walk_udf(orig, "/")
    print(f" {len(orig_files)} files")
    print("Walking modified ISO...")
    mod_files = walk_udf(mod, "/")
    print(f" {len(mod_files)} files")
    only_in_orig = set(orig_files.keys()) - set(mod_files.keys())
    only_in_mod = set(mod_files.keys()) - set(orig_files.keys())
    common = set(orig_files.keys()) & set(mod_files.keys())
    if only_in_orig:
        print(f"\nOnly in original ({len(only_in_orig)}):")
        for f in sorted(only_in_orig):
            print(f" {f} ({orig_files[f][0]} bytes)")
        failures.append(f"{len(only_in_orig)} files missing from modified ISO")
    if only_in_mod:
        print(f"\nOnly in modified ({len(only_in_mod)}):")
        for f in sorted(only_in_mod):
            print(f" {f} ({mod_files[f][0]} bytes)")
        # Only the injected file itself is allowed to be new.
        expected_new = {"/autounattend.xml"}
        unexpected = only_in_mod - expected_new
        if unexpected:
            failures.append(f"Unexpected new files in modified ISO: {unexpected}")
    mismatched = [f for f in sorted(common) if orig_files[f] != mod_files[f]]
    if mismatched:
        print(f"\nFiles with different content ({len(mismatched)}):")
        for f in mismatched:
            print(f" {f}")
            print(f" orig: size={orig_files[f][0]} md5={orig_files[f][1]}")
            print(f" mod: size={mod_files[f][0]} md5={mod_files[f][1]}")
        failures.append(f"{len(mismatched)} files have different content")
    else:
        print(f"\nAll {len(common)} common files are identical.")

    # Test 2: UDF file metadata
    print("\n--- UDF File Metadata (timestamps, permissions) ---")
    orig_meta = get_file_metadata(orig, "/")
    mod_meta = get_file_metadata(mod, "/")
    meta_common = set(orig_meta.keys()) & set(mod_meta.keys())
    meta_diffs = [f for f in sorted(meta_common) if orig_meta[f] != mod_meta[f]]
    if meta_diffs:
        print(f" {len(meta_diffs)} files with different metadata:")
        for f in meta_diffs[:10]:
            print(f" {f}")
            print(f" orig: {orig_meta[f]}")
            print(f" mod: {mod_meta[f]}")
        if len(meta_diffs) > 10:
            print(f" ... and {len(meta_diffs) - 10} more")
        failures.append(f"{len(meta_diffs)} files have different metadata")
    else:
        print(f" All {len(meta_common)} common files have identical metadata.")

    # Test 3: UDF directory structure — every original path (dirs + files)
    # must still be present in the modified image.  (The original version
    # also built unused orig_dirs/mod_dirs sets here; dead code removed.)
    print("\n--- UDF Directory Structure ---")
    orig_all_paths = set(orig_meta.keys())
    mod_all_paths = set(mod_meta.keys())
    missing_paths = orig_all_paths - mod_all_paths
    if missing_paths:
        print(f" FAIL: {len(missing_paths)} paths missing from modified ISO")
        failures.append(f"{len(missing_paths)} paths missing from modified ISO")
    else:
        print(f" All {len(orig_all_paths)} original paths present in modified ISO.")

    # Test 4: El Torito
    failures.extend(compare_eltorito(orig, mod))
    orig.close()
    mod.close()

    # Test 5: System area — the 16 sectors preceding the PVD.
    print("\n--- System Area (first 32KB) ---")
    with open(input_iso, "rb") as f:
        orig_sa = f.read(16 * SECTOR)
    with open(output_iso, "rb") as f:
        mod_sa = f.read(16 * SECTOR)
    sa_match = orig_sa == mod_sa
    print(f" Identical: {sa_match}")
    if not sa_match:
        failures.append("System area differs")

    # Test 6: ISO 9660 PVD
    failures.extend(compare_iso9660_pvd(input_iso, output_iso))

    # Test 7: UDF volume descriptors
    failures.extend(compare_udf_volume_descriptors(input_iso, output_iso))

    # Test 8: UDF anchors
    print("\n--- UDF Anchors ---")
    check_avdps(input_iso, "Original")
    check_avdps(output_iso, "Modified")
    # Compare AVDP VD pointers
    orig_avdp = read_avdp(input_iso, 256)
    mod_avdp = read_avdp(output_iso, 256)
    if orig_avdp and mod_avdp:
        if orig_avdp[0] != mod_avdp[0] or orig_avdp[2] != mod_avdp[2]:
            print(f"\n NOTE: VD locations were relocated by pycdlib:")
            print(f" Main VD: sector {orig_avdp[0]} -> {mod_avdp[0]}")
            print(f" Reserve VD: sector {orig_avdp[2]} -> {mod_avdp[2]}")

    # Summary
    print("\n" + "=" * 50)
    if failures:
        print(f"ISSUES FOUND ({len(failures)}):")
        for f in failures:
            print(f" - {f}")
    else:
        print("ALL CHECKS PASSED")
    print("=" * 50)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment