Skip to content

Instantly share code, notes, and snippets.

@Exieros
Last active September 30, 2025 14:50
Show Gist options
  • Select an option

  • Save Exieros/5ba45874c90964833bc922d00c52d74e to your computer and use it in GitHub Desktop.

Select an option

Save Exieros/5ba45874c90964833bc922d00c52d74e to your computer and use it in GitHub Desktop.
This script allows you to extract layer tiles from the proprietary *.pgd format used by the Alpine Quest and Offline Maps applications and combine them into a single PNG image. The script was written using GitHub Copilot with ChatGPT 5.0, based on source code obtained from jadx. Time spent: 10 minutes. Wow.
import io
import os
import csv
import struct
import argparse
from collections import deque
try:
from PIL import Image
except Exception as e:
Image = None
# Try python pgd_assemble.py --help
# PGD file format constants (from reverse-engineered Java)
MAGIC_FILE = 0x5047443A # 'PGD:' big-endian int — file header signature
MAGIC_NODE = 87381 # node header signature
MAGIC_ENTRIES_LIST = 152917 # chained entries-list page signature
MAGIC_ENTRIES_TABLE = 283989 # fixed-size entries-table signature
MAGIC_DATA_MAIN = 1070421 # first block of a data chain
MAGIC_DATA_ADD = 2118997 # continuation block of a data chain
def be_u32(b):
    """Decode 4 bytes as a big-endian unsigned 32-bit integer."""
    (value,) = struct.unpack('>I', b)
    return value
def be_u64(b):
    """Decode 8 bytes as a big-endian unsigned 64-bit integer."""
    (value,) = struct.unpack('>Q', b)
    return value
class FileView:
    """Thin random-access reader over a PGD file on disk.

    Offers positioned raw reads plus big-endian integer helpers used by the
    parsing routines. Also usable as a context manager so the handle is
    released even on errors (backward-compatible addition).
    """

    def __init__(self, path):
        self.path = path
        self.f = open(path, 'rb')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Close the underlying file handle; safe to call more than once."""
        try:
            self.f.close()
        except Exception:
            pass

    def read_at(self, pos, size):
        """Read *size* bytes starting at absolute offset *pos*."""
        self.f.seek(pos)
        return self.f.read(size)

    def u32_at(self, pos):
        """Read a big-endian unsigned 32-bit integer at *pos*."""
        return struct.unpack('>I', self.read_at(pos, 4))[0]

    def u64_at(self, pos):
        """Read a big-endian unsigned 64-bit integer at *pos*."""
        return struct.unpack('>Q', self.read_at(pos, 8))[0]
class NodeListPage:
    """One page of a chained entries list (24-byte header, then 12-byte slots)."""

    def __init__(self, base, capacity, child_count, data_count, next_ptr):
        self.base = base
        self.capacity = capacity
        self.child_count = child_count
        self.data_count = data_count
        self.next_ptr = next_ptr
        # Absolute file offset of the first 12-byte slot on this page.
        self.entries_pos = base + 24
class NodeList:
    """Chained list of entry pages: children fill the front of each page,
    data entries fill the tail (reversed)."""

    def __init__(self, pages):
        self.pages = pages

    def total_children(self):
        """Number of child-node slots across all pages of the chain."""
        total = 0
        for page in self.pages:
            total += page.child_count
        return total

    def total_data(self):
        """Number of data slots across all pages of the chain."""
        total = 0
        for page in self.pages:
            total += page.data_count
        return total

    def child_slot_pos(self, idx):
        """Absolute offset of child slot *idx* (0-based across the chain)."""
        remaining = idx
        for page in self.pages:
            if remaining >= page.child_count:
                remaining -= page.child_count
                continue
            return page.entries_pos + 12 * remaining
        return None

    def data_slot_pos(self, idx):
        """Absolute offset of data slot *idx* (0-based across the chain).

        Data entries occupy page slots from the end, contiguously.
        """
        remaining = idx
        for page in self.pages:
            if remaining >= page.data_count:
                remaining -= page.data_count
                continue
            tail_slot = (page.capacity - 1) - remaining
            return page.entries_pos + 12 * tail_slot
        return None
class NodeTable:
    """Fixed-size entries table: child slots first, data slots reversed at the end.
    Slots begin right after the 12-byte table header."""

    def __init__(self, base, child_count, data_count):
        self.base = base
        self.child_count = child_count
        self.data_count = data_count
        self.entries_pos = base + 12

    def child_slot_pos(self, idx):
        """Absolute offset of child slot *idx*, or None when out of range."""
        if idx < 0 or idx >= self.child_count:
            return None
        return self.entries_pos + 12 * idx

    def data_slot_pos(self, idx):
        """Absolute offset of data slot *idx*, or None when out of range.

        Tiles are stored reversed at the end: slot_index = total_slots - 1 - idx.
        """
        if idx < 0 or idx >= self.data_count:
            return None
        slots = self.child_count + self.data_count
        return self.entries_pos + 12 * ((slots - 1) - idx)
class Node:
    """A parsed tree node: file pointer, flags word, and its entries container."""

    def __init__(self, ptr, flags, entries):
        self.ptr = ptr
        self.flags = flags
        # Either a NodeList (chained pages) or a NodeTable (fixed table).
        self.entries = entries
# -------------------- Metadata (aux) decoding --------------------
class _BytesReader:
def __init__(self, data: bytes):
self._b = data
self._i = 0
def read(self, n) -> bytes:
if self._i + n > len(self._b):
raise EOFError('Unexpected EOF in aux reader')
out = self._b[self._i:self._i+n]
self._i += n
return out
def read_int(self) -> int:
return struct.unpack('>i', self.read(4))[0]
def read_long(self) -> int:
return struct.unpack('>q', self.read(8))[0]
def read_double(self) -> float:
return struct.unpack('>d', self.read(8))[0]
def read_bool(self) -> bool:
b = self.read(1)[0]
return b != 0
def read_string(self) -> str | None:
ln = self.read_int()
if ln == 0:
return None
s = self.read(ln).decode('utf-8')
return s
def read_node_metadata(fv: FileView, node: Node) -> dict:
    """Decode the aux metadata map attached to *node*; empty dict on absence.

    The node header stores a metadata pointer at ptr+8; the pointed-to data
    chain carries a count-prefixed map of typed key/value pairs.
    """
    meta_ptr = fv.u64_at(node.ptr + 8)
    if meta_ptr == 0:
        return {}
    body = read_data_chain(fv, meta_ptr)
    if not body:
        return {}
    reader = _BytesReader(body)
    result = {}
    try:
        for _ in range(reader.read_int()):
            key = reader.read_string()
            tag = reader.read_int()
            if tag >= 0:
                # Non-negative tag doubles as the UTF-8 string length.
                value = reader.read(tag).decode('utf-8') if tag > 0 else ''
            elif tag == -1:
                value = reader.read_bool()
            elif tag == -2:
                value = reader.read_long()
            elif tag == -3:
                value = reader.read_double()
            elif tag == -4:
                # Blob value: explicit int length prefix, then raw bytes.
                blob_len = reader.read_int()
                value = reader.read(blob_len) if blob_len > 0 else b''
            else:
                value = None
            if key is not None and value is not None:
                result[key] = value
    except Exception:
        # Best effort: tolerate truncated or malformed metadata rather than failing.
        pass
    return result
def parse_node(fv: FileView, ptr):
    """Parse the node at *ptr* into a Node, or return None for a null pointer.

    Header layout: [u32 magic][u32 flags][u64 meta_ptr][u64 entries_ptr];
    the entries block is either a fixed table or a chain of list pages.
    Raises IOError on a bad signature.
    """
    if ptr <= 0:
        return None
    magic = fv.u32_at(ptr)
    if magic != MAGIC_NODE:
        raise IOError(f'Bad node signature at #{ptr}: {magic}')
    flags = fv.u32_at(ptr + 4)
    # meta_ptr lives at ptr+8 but is only needed by read_node_metadata.
    entries_ptr = fv.u64_at(ptr + 16)
    sig = fv.u32_at(entries_ptr)
    if sig == MAGIC_ENTRIES_TABLE:
        n_children = fv.u32_at(entries_ptr + 4)
        n_data = fv.u32_at(entries_ptr + 8)
        return Node(ptr, flags, NodeTable(entries_ptr, n_children, n_data))
    if sig == MAGIC_ENTRIES_LIST:
        pages = []
        page_ptr = entries_ptr
        while page_ptr:
            page_sig = fv.u32_at(page_ptr)
            if page_sig != MAGIC_ENTRIES_LIST:
                raise IOError(f'Bad list page signature at #{page_ptr}: {page_sig}')
            page = NodeListPage(
                page_ptr,
                fv.u32_at(page_ptr + 4),   # capacity
                fv.u32_at(page_ptr + 8),   # child_count
                fv.u32_at(page_ptr + 12),  # data_count
                fv.u64_at(page_ptr + 16),  # next_ptr
            )
            pages.append(page)
            page_ptr = page.next_ptr
        return Node(ptr, flags, NodeList(pages))
    raise IOError(f'Unrecognized entries signature #{sig} at {entries_ptr}')
def read_slot_ptr_uid(fv: FileView, slot_pos):
    """Return the (pointer, uid) pair stored in the 12-byte slot at *slot_pos*.

    A None slot position yields the null pair (0, 0).
    """
    if slot_pos is None:
        return 0, 0
    return fv.u64_at(slot_pos), fv.u32_at(slot_pos + 8)
def read_data_chain(fv: FileView, data_ptr):
    """Concatenate the full payload of a chained data record at *data_ptr*.

    Main block layout:
      [u32 1070421][u32 id][u64 total_size][u64 seg_size][u64 next_ptr] + bytes
    Continuation blocks:
      [u32 2118997][u64 seg_size][u64 next_ptr] + bytes
    A null pointer yields b''; a broken chain raises IOError.
    """
    if data_ptr <= 0:
        return b''
    pos = data_ptr
    sig = fv.u32_at(pos)
    if sig != MAGIC_DATA_MAIN:
        raise IOError(f'Bad data signature at #{pos}: {sig}')
    # The block id at pos+4 is not needed for reassembly.
    total_size = fv.u64_at(pos + 8)
    seg_size = fv.u64_at(pos + 16)
    next_ptr = fv.u64_at(pos + 24)
    chunk = min(seg_size, total_size)
    payload = bytearray(fv.read_at(pos + 32, chunk))
    remaining = total_size - chunk
    while remaining > 0:
        if next_ptr <= 0:
            raise IOError('Corrupted archive: data chain cut')
        pos = next_ptr
        sig = fv.u32_at(pos)
        if sig != MAGIC_DATA_ADD:
            raise IOError(f'Bad data addition signature at #{pos}: {sig}')
        seg_size = fv.u64_at(pos + 4)
        next_ptr = fv.u64_at(pos + 12)
        chunk = min(seg_size, remaining)
        payload += fv.read_at(pos + 20, chunk)
        remaining -= chunk
    return bytes(payload)
def extract_tile_image_bytes(fv: FileView, slot_pos):
    """Return the encoded image bytes for the tile in *slot_pos*, or None.

    The stored payload carries a custom 2+N prefix whose length is
    (b0 << 1) + b1; the image stream starts right after it.
    """
    data_ptr, _uid = read_slot_ptr_uid(fv, slot_pos)
    if data_ptr <= 0:
        return None
    raw = read_data_chain(fv, data_ptr)
    if len(raw) < 2:
        return None
    prefix_len = ((raw[0] & 0xFF) << 1) + (raw[1] & 0xFF)
    return raw[2 + prefix_len:]
def digits_to_int_lsd_first(digits):
    """Combine decimal digits given least-significant-first into an integer."""
    return sum(d * 10 ** i for i, d in enumerate(digits))
def get_child_node_from_list(fv: FileView, nlist: NodeList, idx):
    """Parse and return child node *idx* of a list container, or None."""
    slot = nlist.child_slot_pos(idx)
    if slot is None:
        return None
    child_ptr, _ = read_slot_ptr_uid(fv, slot)
    return parse_node(fv, child_ptr) if child_ptr > 0 else None
def get_child_node_from_table(fv: FileView, ntable: NodeTable, idx):
    """Parse and return child node *idx* of a table container, or None."""
    slot = ntable.child_slot_pos(idx)
    if slot is None:
        return None
    child_ptr, _ = read_slot_ptr_uid(fv, slot)
    return parse_node(fv, child_ptr) if child_ptr > 0 else None
def get_tile_from_table(fv: FileView, ntable: NodeTable, idx):
    """Return the encoded tile bytes for data slot *idx* of a table, or None."""
    slot = ntable.data_slot_pos(idx)
    return None if slot is None else extract_tile_image_bytes(fv, slot)
def enumerate_tiles(fv: FileView, level_node: Node):
    """Collect every tile under *level_node* as (x, y, image_bytes) tuples.

    Tile coordinates are encoded as decimal digit paths inside four quadrant
    subtrees: X digits first (children 0..9 descend, 10..19 pick the final X
    digit and switch to the Y branch), then Y digits, least significant
    digit first. The quadrant index carries the signs of x and y.
    """
    # level_node is a TL (list of 4 quadrant TN roots at indices 0..3)
    if not isinstance(level_node.entries, NodeList):
        raise ValueError('Level node must be a list of quadrants')
    tiles = []  # list of (x, y, bytes)
    # Quadrant mapping: 0:(+,+), 1:(+,-), 2:(-,+), 3:(-,-)
    quadrants = [
        (0, 1, 1),
        (1, 1, -1),
        (2, -1, 1),
        (3, -1, -1),
    ]
    for idx, x_sign, y_sign in quadrants:
        qnode = get_child_node_from_list(fv, level_node.entries, idx)
        if qnode is None or not isinstance(qnode.entries, NodeTable):
            continue
        # Traverse X digits first
        def walk_x(node_table: NodeTable, x_digits):
            # deeper x digits (children 0..9)
            for d in range(10):
                child = get_child_node_from_table(fv, node_table, d)
                if child and isinstance(child.entries, NodeTable):
                    walk_x(child.entries, x_digits + [d])
            # last X digit to reach Y-branch (children 10..19)
            for d in range(10):
                y_node = get_child_node_from_table(fv, node_table, 10 + d)
                if y_node and isinstance(y_node.entries, NodeTable):
                    walk_y(y_node.entries, x_digits + [d], [])
        def walk_y(node_table: NodeTable, x_digits, y_digits):
            # deeper Y digits (children 0..9)
            for d in range(10):
                child = get_child_node_from_table(fv, node_table, d)
                if child and isinstance(child.entries, NodeTable):
                    walk_y(child.entries, x_digits, y_digits + [d])
            # leaf tiles (indices 0..9); the leaf index is the final Y digit
            for d in range(10):
                data = get_tile_from_table(fv, node_table, d)
                if data:
                    x = x_sign * digits_to_int_lsd_first(x_digits)
                    y = y_sign * digits_to_int_lsd_first(y_digits + [d])
                    tiles.append((x, y, data))
        walk_x(qnode.entries, [])
    return tiles
def enumerate_tile_slots(fv: FileView, level_node: Node):
    """Enumerate tile slot positions without reading image data.
    Returns list of tuples (x, y, slot_pos) where slot_pos is the absolute
    file position of the corresponding data slot entry inside the level tree.
    This avoids materializing tile bytes in memory and can be used to lazily
    fetch tiles with extract_tile_image_bytes when needed.
    """
    if not isinstance(level_node.entries, NodeList):
        raise ValueError('Level node must be a list of quadrants')
    out = []
    # Quadrant mapping consistent with enumerate_tiles
    quadrants = [
        (0, 1, 1),
        (1, 1, -1),
        (2, -1, 1),
        (3, -1, -1),
    ]
    for idx, x_sign, y_sign in quadrants:
        qnode = get_child_node_from_list(fv, level_node.entries, idx)
        if qnode is None or not isinstance(qnode.entries, NodeTable):
            continue
        def walk_x(node_table: NodeTable, x_digits):
            # deeper x digits (children 0..9)
            for d in range(10):
                child = get_child_node_from_table(fv, node_table, d)
                if child and isinstance(child.entries, NodeTable):
                    walk_x(child.entries, x_digits + [d])
            # last X digit to reach Y-branch (children 10..19)
            for d in range(10):
                y_node = get_child_node_from_table(fv, node_table, 10 + d)
                if y_node and isinstance(y_node.entries, NodeTable):
                    walk_y(y_node.entries, x_digits + [d], [])
        def walk_y(node_table: NodeTable, x_digits, y_digits):
            # deeper Y digits (children 0..9)
            for d in range(10):
                child = get_child_node_from_table(fv, node_table, d)
                if child and isinstance(child.entries, NodeTable):
                    walk_y(child.entries, x_digits, y_digits + [d])
            # leaf tiles (indices 0..9) — collect slot positions if present
            for d in range(10):
                slot = node_table.data_slot_pos(d)
                if slot is None:
                    continue
                # Only record slots whose data pointer is non-null.
                data_ptr, _ = read_slot_ptr_uid(fv, slot)
                if data_ptr and data_ptr > 0:
                    x = x_sign * digits_to_int_lsd_first(x_digits)
                    y = y_sign * digits_to_int_lsd_first(y_digits + [d])
                    out.append((x, y, slot))
        walk_x(qnode.entries, [])
    return out
def list_level_nodes(fv: FileView, root_node: Node):
    """Return a list of level nodes available under the first channel.

    Many PGD archives store multiple resolution levels (LODs) as a list
    under the first channel; callers can pick the desired one
    (e.g., highest detail = last). Returns [] when the structure is absent.
    """
    if not isinstance(root_node.entries, NodeList):
        return []
    channel = get_child_node_from_list(fv, root_node.entries, 0)
    if channel is None or not isinstance(channel.entries, NodeList):
        return []
    # Walk every child slot of the channel list, keeping the parsed nodes.
    candidates = (
        get_child_node_from_list(fv, channel.entries, i)
        for i in range(channel.entries.total_children())
    )
    return [node for node in candidates if node is not None]
def find_first_level_node(fv: FileView, root_node: Node):
    """Backward-compatible helper: the first available level node, or None."""
    levels = list_level_nodes(fv, root_node)
    if not levels:
        return None
    return levels[0]
def find_highest_detail_level_node(fv: FileView, root_node: Node):
    """Heuristic: return the last level in the list, which tends to be the highest detail."""
    levels = list_level_nodes(fv, root_node)
    if not levels:
        return None
    return levels[-1]
def assemble_image(tiles, out_path):
    """Decode all tiles and paste them into one RGBA mosaic saved at *out_path*.

    Tile size is taken from the first tile that decodes; tiles that fail to
    decode are skipped. Raises RuntimeError when Pillow is missing, when no
    tiles are supplied, or when nothing decodes.
    """
    if Image is None:
        raise RuntimeError('Pillow not available. Please install pillow to decode tiles.')
    if not tiles:
        raise RuntimeError('No tiles found')
    decoded = []
    tile_w = tile_h = None
    min_x = min_y = 10**9
    max_x = max_y = -10**9
    for x, y, data in tiles:
        try:
            img = Image.open(io.BytesIO(data)).convert('RGBA')
        except Exception:
            continue  # skip undecodable tiles
        if tile_w is None:
            # First successful decode fixes the mosaic's tile dimensions.
            tile_w, tile_h = img.size
        min_x = min(min_x, x)
        max_x = max(max_x, x)
        min_y = min(min_y, y)
        max_y = max(max_y, y)
        decoded.append((x, y, img))
    if tile_w is None:
        raise RuntimeError('Failed to decode any tile (is WebP support enabled in Pillow?)')
    cols = max_x - min_x + 1
    rows = max_y - min_y + 1
    canvas = Image.new('RGBA', (cols * tile_w, rows * tile_h), (0, 0, 0, 0))
    for x, y, img in decoded:
        canvas.paste(img, ((x - min_x) * tile_w, (y - min_y) * tile_h))
    canvas.save(out_path)
    return out_path
def load_root_node(fv: FileView):
    """Validate the PGD file header and parse the root node.

    Header layout: [u32 magic][u32 version][u64 root_ptr]. Raises IOError
    for a wrong magic or an unsupported version.
    """
    magic = fv.u32_at(0)
    if magic != MAGIC_FILE:
        raise IOError(f'Not a PGD file (magic={hex(magic)})')
    version = fv.u32_at(4)
    if version > 1:
        raise IOError(f'Unsupported PGD version {version}')
    return parse_node(fv, fv.u64_at(8))
def _select_level(fv, root, level_index):
    """Resolve which level node to use: explicit index, else highest detail."""
    if level_index is not None:
        levels = list_level_nodes(fv, root)
        if not levels:
            raise RuntimeError('No level nodes found')
        if level_index < 0 or level_index >= len(levels):
            raise RuntimeError(f'--level out of range. Found {len(levels)} levels (0..{len(levels)-1})')
        print(f"Using level {level_index}/{len(levels)-1}")
        return levels[level_index]
    # Default: highest detail (last listed level).
    level = find_highest_detail_level_node(fv, root)
    if level is None:
        raise RuntimeError('Failed to locate any level node')
    return level


def _dump_tiles_csv(tiles, tiles_csv):
    """Write per-tile grid and pixel positions to *tiles_csv*."""
    # First decode at least one tile to get the tile size.
    sample_img = None
    if Image is not None:
        # Fix: only attempt decoding when Pillow is importable; the old code
        # looped over every tile even though Image was None throughout.
        for _, _, data in tiles:
            try:
                sample_img = Image.open(io.BytesIO(data))
                break
            except Exception:
                continue
    if sample_img:
        tile_w, tile_h = sample_img.size
    else:
        # default to 256 if decoding is disabled
        tile_w = tile_h = 256
    if tiles:
        min_x = min(t[0] for t in tiles)
        min_y = min(t[1] for t in tiles)
    else:
        min_x = min_y = 0
    with open(tiles_csv, 'w', newline='', encoding='utf-8') as fcsv:
        w = csv.writer(fcsv)
        w.writerow(['x', 'y', 'pixel_x', 'pixel_y', 'tile_width', 'tile_height'])
        for x, y, _data in tiles:
            px = (x - min_x) * tile_w
            py = (y - min_y) * tile_h
            w.writerow([x, y, px, py, tile_w, tile_h])
    print(f'Wrote {tiles_csv}')


def _export_outline(meta, base):
    """Emit WGS84 outline CSV and bounds file from PGD_CO metadata when present."""
    co = meta.get('PGD_CO') if isinstance(meta, dict) else None
    if isinstance(co, str) and co.strip():
        pts = []
        for token in co.strip().split():
            if ',' in token:
                lon_s, lat_s = token.split(',', 1)
                try:
                    pts.append((float(lon_s), float(lat_s)))
                except ValueError:
                    pass  # tolerate malformed coordinate tokens
        if pts:
            outline_csv = os.path.join(base, 'pgd_outline_wgs84.csv')
            with open(outline_csv, 'w', newline='', encoding='utf-8') as fco:
                w = csv.writer(fco)
                w.writerow(['lon', 'lat'])
                for lon, lat in pts:
                    w.writerow([lon, lat])
            min_lon = min(p[0] for p in pts)
            max_lon = max(p[0] for p in pts)
            min_lat = min(p[1] for p in pts)
            max_lat = max(p[1] for p in pts)
            print(f'WGS84 bounds: lon [{min_lon}, {max_lon}] lat [{min_lat}, {max_lat}]')
            with open(os.path.join(base, 'pgd_outline_bounds_wgs84.txt'), 'w', encoding='utf-8') as fb:
                fb.write(f'min_lon={min_lon}\nmax_lon={max_lon}\nmin_lat={min_lat}\nmax_lat={max_lat}\n')
            print(f'Wrote {outline_csv} and pgd_outline_bounds_wgs84.txt')
        return
    cb = meta.get('PGD_CB') if isinstance(meta, dict) else None
    if isinstance(cb, str) and cb.count(',') == 3:
        print('PGD_CO not found; PGD_CB is present but in projected coordinates; WGS84 outline not available without CRS info')
    else:
        print('No outline metadata (PGD_CO/PGD_CB) found in level node')


def main():
    """CLI entry point: pick a level, export the tiles CSV, the mosaic PNG,
    and (when metadata allows) the WGS84 outline files."""
    parser = argparse.ArgumentParser(description='Assemble a raster mosaic from a PGD level')
    parser.add_argument('--pgd', default=None, help='Path to PGD file (default: ./test.pgd)')
    parser.add_argument('--out', default=None, help='Output PNG path (default: ./pgd_mosaic.png)')
    parser.add_argument('--level', type=int, default=None, help='Level index under first channel; default highest detail')
    args = parser.parse_args()
    base = os.path.dirname(__file__)
    pgd_path = args.pgd or os.path.join(base, 'test.pgd')
    out_path = args.out or os.path.join(base, 'pgd_mosaic.png')
    fv = FileView(pgd_path)
    try:
        root = load_root_node(fv)
        # Prefer the highest-detail level by default for better resolution, allow override
        level = _select_level(fv, root, args.level)
        # Parse metadata (PGD_CO / PGD_CB) from the level node
        meta = read_node_metadata(fv, level)
        tiles = enumerate_tiles(fv, level)
        print(f'Found {len(tiles)} tile candidates')
        # Write tiles CSV (grid and pixel positions)
        _dump_tiles_csv(tiles, os.path.join(base, 'pgd_tiles.csv'))
        # Save composite image
        saved = assemble_image(tiles, out_path)
        print(f'Wrote {saved}')
        # Try to output WGS84 outline and bounds from PGD_CO
        _export_outline(meta, base)
    finally:
        fv.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment