Last active
September 30, 2025 14:50
-
-
Save Exieros/5ba45874c90964833bc922d00c52d74e to your computer and use it in GitHub Desktop.
This script allows you to extract layer tiles from the proprietary *.pgd format used by the AlpineQuest and Offline Maps applications and combine them into a single PNG image. The script was written using GitHub Copilot (ChatGPT 5.0), based on source code obtained from jadx. Time spent: 10 minutes. Wow.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import io | |
| import os | |
| import csv | |
| import struct | |
| import argparse | |
| from collections import deque | |
| try: | |
| from PIL import Image | |
| except Exception as e: | |
| Image = None | |
# Try: python pgd_assemble.py --help
# PGD container signatures (recovered from the decompiled Java sources).
MAGIC_FILE = 0x5047443A  # ASCII 'PGD:' read as a big-endian u32
MAGIC_NODE = 87381  # node header signature
MAGIC_ENTRIES_LIST = 152917
MAGIC_ENTRIES_TABLE = 283989
MAGIC_DATA_MAIN = 1070421
MAGIC_DATA_ADD = 2118997
def be_u32(b):
    """Decode 4 bytes as a big-endian unsigned 32-bit integer."""
    (value,) = struct.unpack('>I', b)
    return value
def be_u64(b):
    """Decode 8 bytes as a big-endian unsigned 64-bit integer."""
    (value,) = struct.unpack('>Q', b)
    return value
class FileView:
    """Thin random-access reader over a PGD file.

    Provides positioned reads plus big-endian u32/u64 helpers used by the
    node and data-chain parsers. Also usable as a context manager so the
    underlying file handle is released deterministically.
    """

    def __init__(self, path):
        self.path = path
        self.f = open(path, 'rb')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Close the underlying file; safe to call more than once."""
        try:
            self.f.close()
        except Exception:
            pass

    def read_at(self, pos, size):
        """Return up to *size* bytes starting at absolute offset *pos*."""
        self.f.seek(pos)
        return self.f.read(size)

    def u32_at(self, pos):
        """Big-endian unsigned 32-bit integer at absolute offset *pos*."""
        return be_u32(self.read_at(pos, 4))

    def u64_at(self, pos):
        """Big-endian unsigned 64-bit integer at absolute offset *pos*."""
        return be_u64(self.read_at(pos, 8))
class NodeListPage:
    """One page of a chained entry list: 24-byte header + 12-byte slots."""

    def __init__(self, base, capacity, child_count, data_count, next_ptr):
        # Slot area starts right after the 24-byte page header.
        self.base, self.entries_pos = base, base + 24
        self.capacity = capacity
        self.child_count = child_count
        self.data_count = data_count
        self.next_ptr = next_ptr
class NodeList:
    """Chained-list entry storage.

    Child slots fill each page from the front; data slots are packed in
    reverse from the page tail. Each slot is 12 bytes wide.
    """

    def __init__(self, pages):
        self.pages = pages

    def total_children(self):
        """Number of child entries across every page in the chain."""
        return sum(page.child_count for page in self.pages)

    def total_data(self):
        """Number of data entries across every page in the chain."""
        return sum(page.data_count for page in self.pages)

    def child_slot_pos(self, idx):
        """Absolute file offset of child slot *idx*, or None if out of range."""
        remaining = idx
        for page in self.pages:
            if remaining < page.child_count:
                return page.entries_pos + 12 * remaining
            remaining -= page.child_count
        return None

    def data_slot_pos(self, idx):
        """Absolute offset of data slot *idx*; data slots fill pages tail-first."""
        remaining = idx
        for page in self.pages:
            if remaining < page.data_count:
                # Data entries occupy the last slots of the page, contiguous.
                slot = (page.capacity - 1) - remaining
                return page.entries_pos + 12 * slot
            remaining -= page.data_count
        return None
class NodeTable:
    """Fixed table entry storage: 12-byte header + 12-byte slots.

    Children occupy the first slots; tiles (data) are stored reversed at the
    end: slot_index = total_slots - 1 - idx.
    """

    def __init__(self, base, child_count, data_count):
        self.base = base
        self.child_count = child_count
        self.data_count = data_count
        self.entries_pos = base + 12

    def child_slot_pos(self, idx):
        """Absolute offset of child slot *idx*, or None if out of range."""
        if not (0 <= idx < self.child_count):
            return None
        return self.entries_pos + 12 * idx

    def data_slot_pos(self, idx):
        """Absolute offset of data slot *idx* (tail-packed), or None."""
        if not (0 <= idx < self.data_count):
            return None
        total_slots = self.child_count + self.data_count
        return self.entries_pos + 12 * ((total_slots - 1) - idx)
class Node:
    """Parsed node header: file offset, flag word, and its entry storage."""

    def __init__(self, ptr, flags, entries):
        self.ptr = ptr
        self.flags = flags
        # Either a NodeList (chained pages) or a NodeTable (fixed table).
        self.entries = entries
| # -------------------- Metadata (aux) decoding -------------------- | |
class _BytesReader:
    """Sequential big-endian reader over an in-memory aux-map payload."""

    def __init__(self, data: bytes):
        self._b = data
        self._i = 0

    def read(self, n) -> bytes:
        """Return the next *n* bytes, advancing the cursor.

        Raises EOFError on overrun and also on a negative *n* (which can
        arise from a corrupt length prefix; the old code silently produced
        an empty/garbage slice in that case).
        """
        if n < 0 or self._i + n > len(self._b):
            raise EOFError('Unexpected EOF in aux reader')
        out = self._b[self._i:self._i + n]
        self._i += n
        return out

    def read_int(self) -> int:
        """Signed big-endian 32-bit integer."""
        return struct.unpack('>i', self.read(4))[0]

    def read_long(self) -> int:
        """Signed big-endian 64-bit integer."""
        return struct.unpack('>q', self.read(8))[0]

    def read_double(self) -> float:
        """Big-endian IEEE-754 double."""
        return struct.unpack('>d', self.read(8))[0]

    def read_bool(self) -> bool:
        """Single byte interpreted as a boolean (non-zero is True)."""
        return self.read(1)[0] != 0

    def read_string(self) -> str | None:
        """Length-prefixed UTF-8 string; a zero length decodes to None."""
        ln = self.read_int()
        if ln == 0:
            return None
        return self.read(ln).decode('utf-8')
def read_node_metadata(fv: FileView, node: Node) -> dict:
    """Decode the aux key/value map attached to *node*; {} when absent.

    The node header stores a metadata pointer at ptr+8; its payload is a
    count-prefixed map of typed entries. Parse errors are tolerated: whatever
    was decoded before the failure is returned.
    """
    meta_ptr = fv.u64_at(node.ptr + 8)
    if meta_ptr == 0:
        return {}
    body = read_data_chain(fv, meta_ptr)
    if not body:
        return {}
    reader = _BytesReader(body)
    result = {}
    try:
        for _ in range(reader.read_int()):
            key = reader.read_string()
            tag = reader.read_int()
            if tag >= 0:
                # A non-negative tag doubles as a UTF-8 string length.
                value = reader.read(tag).decode('utf-8') if tag > 0 else ''
            elif tag == -1:
                value = reader.read_bool()
            elif tag == -2:
                value = reader.read_long()
            elif tag == -3:
                value = reader.read_double()
            elif tag == -4:
                # Blob: explicit int length followed by raw bytes.
                length = reader.read_int()
                value = reader.read(length) if length > 0 else b''
            else:
                value = None
            if key is not None and value is not None:
                result[key] = value
    except Exception:
        # Tolerate truncated/corrupt metadata; keep what we have so far.
        pass
    return result
def parse_node(fv: FileView, ptr):
    """Parse the node at *ptr* together with its entry storage.

    Header layout: [u32 magic][u32 flags][u64 meta_ptr][u64 entries_ptr].
    Returns None for a null pointer; raises IOError on a bad signature.
    """
    if ptr <= 0:
        return None
    magic = fv.u32_at(ptr)
    if magic != MAGIC_NODE:
        raise IOError(f'Bad node signature at #{ptr}: {magic}')
    flags = fv.u32_at(ptr + 4)
    # meta_ptr at ptr+8 is decoded elsewhere (read_node_metadata).
    entries_ptr = fv.u64_at(ptr + 16)
    sig = fv.u32_at(entries_ptr)
    if sig == MAGIC_ENTRIES_TABLE:
        n_children = fv.u32_at(entries_ptr + 4)
        n_data = fv.u32_at(entries_ptr + 8)
        return Node(ptr, flags, NodeTable(entries_ptr, n_children, n_data))
    if sig == MAGIC_ENTRIES_LIST:
        # Walk the chained list pages until a null next-pointer.
        pages = []
        cursor = entries_ptr
        while cursor:
            page_sig = fv.u32_at(cursor)
            if page_sig != MAGIC_ENTRIES_LIST:
                raise IOError(f'Bad list page signature at #{cursor}: {page_sig}')
            page = NodeListPage(
                cursor,
                fv.u32_at(cursor + 4),   # capacity
                fv.u32_at(cursor + 8),   # child_count
                fv.u32_at(cursor + 12),  # data_count
                fv.u64_at(cursor + 16),  # next_ptr
            )
            pages.append(page)
            cursor = page.next_ptr
        return Node(ptr, flags, NodeList(pages))
    raise IOError(f'Unrecognized entries signature #{sig} at {entries_ptr}')
def read_slot_ptr_uid(fv: "FileView", slot_pos):
    """Read a 12-byte slot as (u64 pointer, u32 uid); (0, 0) for a null slot."""
    if slot_pos is None:
        return 0, 0
    return fv.u64_at(slot_pos), fv.u32_at(slot_pos + 8)
def read_data_chain(fv: FileView, data_ptr):
    """Concatenate a chained data payload starting at *data_ptr*.

    Main block layout: [u32 1070421][u32 id][u64 total][u64 seg][u64 next]
    followed by seg bytes. Addition blocks: [u32 2118997][u64 seg][u64 next]
    followed by seg bytes. Returns b'' for a null pointer; raises IOError on
    a bad signature or a prematurely terminated chain.
    """
    if data_ptr <= 0:
        return b''
    pos = data_ptr
    sig = fv.u32_at(pos)
    if sig != MAGIC_DATA_MAIN:
        raise IOError(f'Bad data signature at #{pos}: {sig}')
    # The u32 id field at pos+4 is not needed for reassembly.
    total = fv.u64_at(pos + 8)
    seg = fv.u64_at(pos + 16)
    next_ptr = fv.u64_at(pos + 24)
    chunk = min(seg, total)
    payload = bytearray(fv.read_at(pos + 32, chunk))
    remain = total - chunk
    while remain > 0:
        if next_ptr <= 0:
            raise IOError('Corrupted archive: data chain cut')
        pos = next_ptr
        sig = fv.u32_at(pos)
        if sig != MAGIC_DATA_ADD:
            raise IOError(f'Bad data addition signature at #{pos}: {sig}')
        seg = fv.u64_at(pos + 4)
        next_ptr = fv.u64_at(pos + 12)
        chunk = min(seg, remain)
        payload += fv.read_at(pos + 20, chunk)
        remain -= chunk
    return bytes(payload)
def extract_tile_image_bytes(fv: FileView, slot_pos):
    """Return the raw image bytes for the tile at *slot_pos*, or None.

    The tile payload begins with a custom header: two bytes that encode how
    many further bytes to skip, as read = (b0 << 1) + b1.
    """
    data_ptr, _uid = read_slot_ptr_uid(fv, slot_pos)
    if data_ptr <= 0:
        return None
    raw = read_data_chain(fv, data_ptr)
    if len(raw) < 2:
        return None
    header_len = ((raw[0] & 0xFF) << 1) + (raw[1] & 0xFF)
    return raw[2 + header_len:]
def digits_to_int_lsd_first(digits):
    """Combine decimal digits given least-significant-first into an integer."""
    return sum(digit * 10 ** place for place, digit in enumerate(digits))
def get_child_node_from_list(fv: FileView, nlist: NodeList, idx):
    """Parse and return child *idx* of a list node, or None if absent/null."""
    slot = nlist.child_slot_pos(idx)
    if slot is None:
        return None
    child_ptr, _uid = read_slot_ptr_uid(fv, slot)
    return parse_node(fv, child_ptr) if child_ptr > 0 else None
def get_child_node_from_table(fv: FileView, ntable: NodeTable, idx):
    """Parse and return child *idx* of a table node, or None if absent/null."""
    slot = ntable.child_slot_pos(idx)
    if slot is None:
        return None
    child_ptr, _uid = read_slot_ptr_uid(fv, slot)
    return parse_node(fv, child_ptr) if child_ptr > 0 else None
def get_tile_from_table(fv: FileView, ntable: NodeTable, idx):
    """Return the tile bytes stored in data slot *idx* of a table, or None."""
    slot = ntable.data_slot_pos(idx)
    return None if slot is None else extract_tile_image_bytes(fv, slot)
def enumerate_tiles(fv: FileView, level_node: Node):
    """Collect every tile of a level as a list of (x, y, image_bytes).

    The level node is a list whose children 0..3 are quadrant roots. Within a
    quadrant, digit-indexed tables encode the X coordinate (children 0..9 for
    deeper digits, children 10..19 for the final digit that switches to the
    Y branch) and then the Y coordinate (children 0..9 for deeper digits,
    data slots 0..9 as tile leaves). Digits are accumulated
    least-significant-first.
    """
    if not isinstance(level_node.entries, NodeList):
        raise ValueError('Level node must be a list of quadrants')
    tiles = []

    def descend_y(table, x_digits, y_digits, x_sign, y_sign):
        # Deeper Y digits live in children 0..9.
        for digit in range(10):
            sub = get_child_node_from_table(fv, table, digit)
            if sub and isinstance(sub.entries, NodeTable):
                descend_y(sub.entries, x_digits, y_digits + [digit], x_sign, y_sign)
        # Leaf tiles occupy data slots 0..9.
        for digit in range(10):
            data = get_tile_from_table(fv, table, digit)
            if data:
                tiles.append((
                    x_sign * digits_to_int_lsd_first(x_digits),
                    y_sign * digits_to_int_lsd_first(y_digits + [digit]),
                    data,
                ))

    def descend_x(table, x_digits, x_sign, y_sign):
        # Deeper X digits live in children 0..9.
        for digit in range(10):
            sub = get_child_node_from_table(fv, table, digit)
            if sub and isinstance(sub.entries, NodeTable):
                descend_x(sub.entries, x_digits + [digit], x_sign, y_sign)
        # Children 10..19 hold the final X digit and start the Y branch.
        for digit in range(10):
            y_root = get_child_node_from_table(fv, table, 10 + digit)
            if y_root and isinstance(y_root.entries, NodeTable):
                descend_y(y_root.entries, x_digits + [digit], [], x_sign, y_sign)

    # Quadrant sign mapping: 0:(+,+), 1:(+,-), 2:(-,+), 3:(-,-)
    for idx, x_sign, y_sign in ((0, 1, 1), (1, 1, -1), (2, -1, 1), (3, -1, -1)):
        quadrant = get_child_node_from_list(fv, level_node.entries, idx)
        if quadrant is not None and isinstance(quadrant.entries, NodeTable):
            descend_x(quadrant.entries, [], x_sign, y_sign)
    return tiles
def enumerate_tile_slots(fv: FileView, level_node: Node):
    """Enumerate tile slot positions without reading image data.

    Returns a list of (x, y, slot_pos) tuples where slot_pos is the absolute
    file position of the corresponding data slot entry inside the level tree.
    Unlike enumerate_tiles this never materializes tile bytes, so callers can
    lazily fetch tiles later with extract_tile_image_bytes.
    """
    if not isinstance(level_node.entries, NodeList):
        raise ValueError('Level node must be a list of quadrants')
    found = []

    def descend_y(table, x_digits, y_digits, x_sign, y_sign):
        # Deeper Y digits live in children 0..9.
        for digit in range(10):
            sub = get_child_node_from_table(fv, table, digit)
            if sub and isinstance(sub.entries, NodeTable):
                descend_y(sub.entries, x_digits, y_digits + [digit], x_sign, y_sign)
        # Data slots 0..9 are the leaves; keep only non-null pointers.
        for digit in range(10):
            slot = table.data_slot_pos(digit)
            if slot is None:
                continue
            data_ptr, _uid = read_slot_ptr_uid(fv, slot)
            if data_ptr and data_ptr > 0:
                found.append((
                    x_sign * digits_to_int_lsd_first(x_digits),
                    y_sign * digits_to_int_lsd_first(y_digits + [digit]),
                    slot,
                ))

    def descend_x(table, x_digits, x_sign, y_sign):
        # Deeper X digits live in children 0..9.
        for digit in range(10):
            sub = get_child_node_from_table(fv, table, digit)
            if sub and isinstance(sub.entries, NodeTable):
                descend_x(sub.entries, x_digits + [digit], x_sign, y_sign)
        # Children 10..19 hold the final X digit and start the Y branch.
        for digit in range(10):
            y_root = get_child_node_from_table(fv, table, 10 + digit)
            if y_root and isinstance(y_root.entries, NodeTable):
                descend_y(y_root.entries, x_digits + [digit], [], x_sign, y_sign)

    # Quadrant sign mapping consistent with enumerate_tiles.
    for idx, x_sign, y_sign in ((0, 1, 1), (1, 1, -1), (2, -1, 1), (3, -1, -1)):
        quadrant = get_child_node_from_list(fv, level_node.entries, idx)
        if quadrant is not None and isinstance(quadrant.entries, NodeTable):
            descend_x(quadrant.entries, [], x_sign, y_sign)
    return found
def list_level_nodes(fv: FileView, root_node: Node):
    """Return all level (LOD) nodes stored under the first channel.

    Many PGD archives keep multiple resolution levels as a list beneath the
    first channel; enumerating them lets callers choose a particular one
    (e.g. the last entry, which tends to be the highest detail).
    """
    if not isinstance(root_node.entries, NodeList):
        return []
    channel = get_child_node_from_list(fv, root_node.entries, 0)
    if channel is None or not isinstance(channel.entries, NodeList):
        return []
    candidates = (
        get_child_node_from_list(fv, channel.entries, i)
        for i in range(channel.entries.total_children())
    )
    return [node for node in candidates if node is not None]
def find_first_level_node(fv: FileView, root_node: Node):
    """Backward-compatible helper: the first available level node, or None."""
    levels = list_level_nodes(fv, root_node)
    if not levels:
        return None
    return levels[0]
def find_highest_detail_level_node(fv: FileView, root_node: Node):
    """Heuristic: the last listed level is usually the highest detail."""
    levels = list_level_nodes(fv, root_node)
    if not levels:
        return None
    return levels[-1]
def assemble_image(tiles, out_path):
    """Paste decodable tiles onto one RGBA canvas and save it to *out_path*.

    The tile size is taken from the first decodable tile; tiles that fail to
    decode are skipped. Raises RuntimeError when Pillow is missing, when no
    tiles were supplied, or when nothing could be decoded.
    """
    if Image is None:
        raise RuntimeError('Pillow not available. Please install pillow to decode tiles.')
    if not tiles:
        raise RuntimeError('No tiles found')
    decoded = []
    tile_w = tile_h = None
    min_x = min_y = 10**9
    max_x = max_y = -10**9
    for x, y, data in tiles:
        try:
            img = Image.open(io.BytesIO(data)).convert('RGBA')
        except Exception:
            continue  # skip undecodable tiles
        if tile_w is None:
            tile_w, tile_h = img.size
        min_x, max_x = min(min_x, x), max(max_x, x)
        min_y, max_y = min(min_y, y), max(max_y, y)
        decoded.append((x, y, img))
    if tile_w is None:
        raise RuntimeError('Failed to decode any tile (is WebP support enabled in Pillow?)')
    width = (max_x - min_x + 1) * tile_w
    height = (max_y - min_y + 1) * tile_h
    canvas = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    for x, y, img in decoded:
        canvas.paste(img, ((x - min_x) * tile_w, (y - min_y) * tile_h))
    canvas.save(out_path)
    return out_path
def load_root_node(fv: FileView):
    """Validate the PGD file header and parse the root node.

    Header layout: [u32 magic][u32 version][u64 root_ptr].
    Raises IOError for a wrong magic or an unsupported version.
    """
    magic = fv.u32_at(0)
    if magic != MAGIC_FILE:
        raise IOError(f'Not a PGD file (magic={hex(magic)})')
    version = fv.u32_at(4)
    if version > 1:
        raise IOError(f'Unsupported PGD version {version}')
    return parse_node(fv, fv.u64_at(8))
def _select_level(fv, root, level_idx):
    """Resolve the level node: explicit index when given, else highest detail."""
    if level_idx is not None:
        levels = list_level_nodes(fv, root)
        if not levels:
            raise RuntimeError('No level nodes found')
        if level_idx < 0 or level_idx >= len(levels):
            raise RuntimeError(f'--level out of range. Found {len(levels)} levels (0..{len(levels)-1})')
        print(f"Using level {level_idx}/{len(levels)-1}")
        return levels[level_idx]
    level = find_highest_detail_level_node(fv, root)
    if level is None:
        raise RuntimeError('Failed to locate any level node')
    return level


def _write_tiles_csv(base, tiles):
    """Write pgd_tiles.csv with grid and pixel positions for every tile."""
    tiles_csv = os.path.join(base, 'pgd_tiles.csv')
    # Decode one sample tile to learn the tile size (falls back to 256).
    sample_img = None
    for _, _, data in tiles:
        try:
            sample_img = Image.open(io.BytesIO(data)) if Image else None
            if sample_img:
                break
        except Exception:
            continue
    if sample_img:
        tile_w, tile_h = sample_img.size
    else:
        # default to 256 if decoding is disabled
        tile_w = tile_h = 256
    if tiles:
        min_x = min(t[0] for t in tiles)
        min_y = min(t[1] for t in tiles)
    else:
        min_x = min_y = 0
    with open(tiles_csv, 'w', newline='', encoding='utf-8') as fcsv:
        w = csv.writer(fcsv)
        w.writerow(['x', 'y', 'pixel_x', 'pixel_y', 'tile_width', 'tile_height'])
        for x, y, data in tiles:
            w.writerow([x, y, (x - min_x) * tile_w, (y - min_y) * tile_h, tile_w, tile_h])
    print(f'Wrote {tiles_csv}')


def _export_outline(base, meta):
    """Export WGS84 outline CSV and bounds from PGD_CO metadata when present."""
    co = meta.get('PGD_CO') if isinstance(meta, dict) else None
    if isinstance(co, str) and co.strip():
        # PGD_CO is a whitespace-separated list of "lon,lat" tokens.
        pts = []
        for token in co.strip().split():
            if ',' in token:
                lon_s, lat_s = token.split(',', 1)
                try:
                    pts.append((float(lon_s), float(lat_s)))
                except ValueError:
                    pass
        if pts:
            outline_csv = os.path.join(base, 'pgd_outline_wgs84.csv')
            with open(outline_csv, 'w', newline='', encoding='utf-8') as fco:
                w = csv.writer(fco)
                w.writerow(['lon', 'lat'])
                for lon, lat in pts:
                    w.writerow([lon, lat])
            min_lon = min(p[0] for p in pts)
            max_lon = max(p[0] for p in pts)
            min_lat = min(p[1] for p in pts)
            max_lat = max(p[1] for p in pts)
            print(f'WGS84 bounds: lon [{min_lon}, {max_lon}] lat [{min_lat}, {max_lat}]')
            with open(os.path.join(base, 'pgd_outline_bounds_wgs84.txt'), 'w', encoding='utf-8') as fb:
                fb.write(f'min_lon={min_lon}\nmax_lon={max_lon}\nmin_lat={min_lat}\nmax_lat={max_lat}\n')
            print(f'Wrote {outline_csv} and pgd_outline_bounds_wgs84.txt')
        return
    cb = meta.get('PGD_CB') if isinstance(meta, dict) else None
    if isinstance(cb, str) and cb.count(',') == 3:
        print('PGD_CO not found; PGD_CB is present but in projected coordinates; WGS84 outline not available without CRS info')
    else:
        print('No outline metadata (PGD_CO/PGD_CB) found in level node')


def main():
    """CLI entry point: select a level, dump the tile CSV, assemble the PNG
    mosaic, and export WGS84 outline metadata when available."""
    parser = argparse.ArgumentParser(description='Assemble a raster mosaic from a PGD level')
    parser.add_argument('--pgd', default=None, help='Path to PGD file (default: ./test.pgd)')
    parser.add_argument('--out', default=None, help='Output PNG path (default: ./pgd_mosaic.png)')
    parser.add_argument('--level', type=int, default=None, help='Level index under first channel; default highest detail')
    args = parser.parse_args()
    base = os.path.dirname(__file__)
    pgd_path = args.pgd or os.path.join(base, 'test.pgd')
    out_path = args.out or os.path.join(base, 'pgd_mosaic.png')
    fv = FileView(pgd_path)
    try:
        root = load_root_node(fv)
        # Prefer the highest-detail level by default; --level overrides.
        level = _select_level(fv, root, args.level)
        # Parse metadata (PGD_CO / PGD_CB) from the level node.
        meta = read_node_metadata(fv, level)
        tiles = enumerate_tiles(fv, level)
        print(f'Found {len(tiles)} tile candidates')
        _write_tiles_csv(base, tiles)
        saved = assemble_image(tiles, out_path)
        print(f'Wrote {saved}')
        _export_outline(base, meta)
    finally:
        fv.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment