@jac18281828
Created November 25, 2025 17:26
Dump a Parquet file and try to interpret its schema.
#!/usr/bin/env python3
"""
parquet_schema.py — Quick column (schema) lister for Parquet files or partitioned folders.

Usage:
    python parquet_schema.py /path/to/file.parquet
    python parquet_schema.py /path/to/folder        # infers dataset schema across files
    python parquet_schema.py /path --raw            # print raw Arrow schema
    python parquet_schema.py /path --json           # JSON output
    python parquet_schema.py /path --sample 5       # show a few rows just to check
    python parquet_schema.py /path --dump           # stream the whole file as a JSON array

Requirements:
    pip install pyarrow
    (Optional) pip install pandas for --sample
"""
import argparse
import json
import os
import sys
from typing import List, Tuple

try:
    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq
except Exception:
    sys.stderr.write("Error: pyarrow is required. Try `pip install pyarrow`.\n")
    raise

def _flatten_fields(fields: List[pa.Field], prefix: str = "") -> List[Tuple[str, str]]:
    """
    Flatten nested/struct/list types to 'dot' notation for easy viewing.
    Returns a list of (name, type_string).
    """
    flat: List[Tuple[str, str]] = []
    for f in fields:
        name = f.name if not prefix else f"{prefix}.{f.name}"
        t = f.type
        if pa.types.is_struct(t):
            # Show the struct as a whole, then its children
            flat.append((name, str(t)))
            flat.extend(_flatten_fields(list(t), prefix=name))
        elif pa.types.is_list(t) or pa.types.is_large_list(t) or pa.types.is_fixed_size_list(t):
            flat.append((name, str(t)))
            # If the list contains a struct, expose its child fields too
            value_type = t.value_type if hasattr(t, "value_type") else getattr(t, "value_field", None)
            vt = value_type if isinstance(value_type, pa.DataType) else (value_type.type if value_type else None)
            if vt and pa.types.is_struct(vt):
                flat.extend(_flatten_fields(list(vt), prefix=name + "[]"))
        else:
            flat.append((name, str(t)))
    return flat
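# Illustration only (not from the original gist): given a schema field such as
#   user: struct<id: int64, tags: list<item: string>>
# the flattener yields rows roughly like
#   ("user",      "struct<id: int64, tags: list<item: string>>")
#   ("user.id",   "int64")
#   ("user.tags", "list<item: string>")
# and, when a list's element type is itself a struct, children named "user.tags[].<field>".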

def _is_dir(path: str) -> bool:
    return os.path.isdir(path)


def _infer_schema_from_dir(path: str) -> pa.Schema:
    """
    Use pyarrow.dataset to infer the *unified* schema across a Parquet dataset directory,
    including typical Hive-style partitioning.
    """
    dataset = ds.dataset(path, format="parquet", partitioning="hive")
    return dataset.schema
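# Example Hive-style layout this handles (paths are hypothetical):
#   events/year=2024/month=01/part-0.parquet
#   events/year=2024/month=02/part-0.parquet
# The partition keys (year, month) appear as extra columns in the unified schema.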

def _schema_from_file(path: str) -> pa.Schema:
    pf = pq.ParquetFile(path)
    return pf.schema_arrow

def print_table(rows: List[Tuple[str, str]]):
    if not rows:
        print("No columns detected.")
        return
    # Compute column widths from the longest name and type strings
    name_w = max(len("column"), max((len(r[0]) for r in rows), default=0))
    type_w = max(len("arrow_type"), max((len(r[1]) for r in rows), default=0))
    header = f"{'column'.ljust(name_w)} {'arrow_type'.ljust(type_w)}"
    bar = f"{'-' * name_w} {'-' * type_w}"
    print(header)
    print(bar)
    for n, t in rows:
        print(f"{n.ljust(name_w)} {t.ljust(type_w)}")

def sample_rows(path: str, n: int):
    try:
        import pandas as pd
    except Exception:
        print("Install pandas to enable --sample (pip install pandas).")
        return
    if _is_dir(path):
        dataset = ds.dataset(path, format="parquet", partitioning="hive")
        tbl = dataset.head(n)  # first n rows across the dataset
        df = tbl.to_pandas(types_mapper=pd.ArrowDtype)
    else:
        tbl = pq.read_table(path, use_threads=True)
        df = tbl.slice(0, n).to_pandas(types_mapper=pd.ArrowDtype)
    # Display a compact sample
    print("\nSample rows:")
    with pd.option_context("display.max_columns", None, "display.width", 160):
        print(df.head(n))

def dump_file(path: str):
    """Dump the entire Parquet file (or dataset directory) as JSON, streaming batch by batch."""
    if _is_dir(path):
        dataset = ds.dataset(path, format="parquet", partitioning="hive")
        batches = dataset.to_batches(batch_size=1000)
    else:
        batches = pq.ParquetFile(path).iter_batches(batch_size=1000)
    # Open the JSON array
    print("[")
    first_row = True
    # Stream batches to avoid loading the entire file into memory
    for batch in batches:
        # Convert the batch to a dict of column name -> list of values
        batch_dict = batch.to_pydict()
        for i in range(batch.num_rows):
            if not first_row:
                print(",")
            first_row = False
            # Build the row dict for this index
            row = {k: v[i] for k, v in batch_dict.items()}
            # Print without a trailing newline so a comma can precede the next row
            print(json.dumps(row, default=str), end="")
    # Close the JSON array
    print("\n]")

def main():
    ap = argparse.ArgumentParser(description="List columns (schema) for a Parquet file or directory.")
    ap.add_argument("path", help="Path to a .parquet file OR a directory containing a Parquet dataset")
    ap.add_argument("--raw", action="store_true", help="Print raw Arrow schema string")
    ap.add_argument("--json", action="store_true", help="Output as JSON instead of a table")
    ap.add_argument("--sample", type=int, default=0, help="Show N sample rows (requires pandas)")
    ap.add_argument("--dump", action="store_true", help="Dump the entire file as JSON (streams data to avoid memory issues)")
    args = ap.parse_args()

    path = args.path
    if not os.path.exists(path):
        print(f"Path not found: {path}", file=sys.stderr)
        sys.exit(2)

    # Handle --dump first (dumps the entire file as JSON and exits)
    if args.dump:
        try:
            dump_file(path)
        except Exception as e:
            print(f"Failed to dump file: {e}", file=sys.stderr)
            sys.exit(1)
        return

    try:
        if _is_dir(path):
            schema = _infer_schema_from_dir(path)
        else:
            schema = _schema_from_file(path)
    except Exception as e:
        print(f"Failed to read schema: {e}", file=sys.stderr)
        sys.exit(1)

    if args.raw:
        print(schema)
    else:
        flat = _flatten_fields(list(schema))
        if args.json:
            print(json.dumps([{"column": n, "arrow_type": t} for n, t in flat], indent=2))
        else:
            print_table(flat)

    if args.sample > 0:
        try:
            sample_rows(path, args.sample)
        except Exception as e:
            print(f"Failed to sample rows: {e}", file=sys.stderr)

if __name__ == "__main__":
    main()
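A quick way to exercise the script is to write a tiny nested Parquet file first. The snippet below is an illustrative sketch, not part of parquet_schema.py; the file name example.parquet and the column names are made up.

# make_example.py — illustrative helper (assumes pyarrow is installed and the directory is writable)
import pyarrow as pa
import pyarrow.parquet as pq

# One struct column and one plain column, so the nested-flattening path gets exercised
table = pa.table({
    "user": [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}],
    "amount": [3.5, 7.25],
})
pq.write_table(table, "example.parquet")

# Then, from a shell:
#   python parquet_schema.py example.parquet              # column table
#   python parquet_schema.py example.parquet --json       # JSON schema listing
#   python parquet_schema.py example.parquet --sample 2   # needs pandas
#   python parquet_schema.py example.parquet --dump       # stream rows as a JSON array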