Skip to content

Instantly share code, notes, and snippets.

@cybermaxs
Created March 3, 2026 09:31
Show Gist options
  • Select an option

  • Save cybermaxs/cbed714bec0e5cca20d2615a61d419a2 to your computer and use it in GitHub Desktop.

Select an option

Save cybermaxs/cbed714bec0e5cca20d2615a61d419a2 to your computer and use it in GitHub Desktop.
Use DuckDB and regexes to scan Parquet files on S3 for PII.
import json, sys, duckdb, typer
# Regexes for each PII category, keyed by the label reported in the findings.
# The patterns are embedded verbatim into DuckDB `regexp_matches` calls, so
# they must not contain single quotes.
PATTERNS = {
    # local-part@domain.tld
    "email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}",
    # "+" followed by 7 to 15 digits, the first one non-zero
    "phone": r"\+[1-9]\d{6,14}",
    # NNN-NN-NNNN
    "ssn": r"\d{3}-\d{2}-\d{4}",
    # 4 + 12 or 15 digits | 51-55 + 14 digits | 34/37 + 13 digits
    "credit_card": r"4[0-9]{12}([0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13}",
    # four dot-separated groups of 1-3 digits (octet range is not checked)
    "ipv4": r"(([0-9]{1,3})\.){3}([0-9]{1,3})",
    # decimal number with at least five fractional digits
    "geolocation": r"-?\d{1,3}\.\d{5,}",
}
def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal (quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def _scan_query(column: str, src: str, sample: int) -> str:
    """Build the query that counts and collects PATTERNS matches for one column.

    The result row holds one match count per pattern, then one list of
    matching values per pattern (same order), then the number of sampled rows.
    """
    ident = '"' + column.replace('"', '""') + '"'
    text = f"{ident}::VARCHAR"
    counts = []
    samples = []
    for label, pattern in PATTERNS.items():
        matched = f"FILTER (WHERE regexp_matches({text}, {_sql_literal(pattern)}, 'i'))"
        counts.append(f'COUNT(*) {matched} AS "{label}"')
        samples.append(f'list({text}) {matched} AS "{label}_samples"')
    return (
        f"SELECT {', '.join(counts + samples)}, COUNT(*) AS total "
        f"FROM (SELECT * FROM read_parquet({_sql_literal(src)}) WHERE {ident} IS NOT NULL) "
        f"USING SAMPLE {int(sample)} ROWS"
    )


def main(
    s3_path: str,
    sample: int = typer.Option(5000, help="Rows to sample per column"),
    region: str = typer.Option("eu-west-3", help="AWS region"),
):
    """Scan Parquet files on S3 for PII-looking values and print findings as JSON.

    S3_PATH is either a single .parquet object or a prefix; a prefix is
    searched recursively for *.parquet files. For every column, up to
    --sample non-null rows are tested against each pattern in PATTERNS.
    Findings go to stdout; columns that could not be scanned are reported
    on stderr and skipped.
    """
    if sample < 1:
        sys.exit("--sample must be at least 1")

    base = s3_path.rstrip("/")
    src = base if base.endswith(".parquet") else f"{base}/**/*.parquet"

    findings = []
    n = len(PATTERNS)
    # The context manager closes the connection even when sys.exit() is raised.
    with duckdb.connect() as conn:
        conn.install_extension("httpfs")
        conn.load_extension("httpfs")
        # User-supplied values are escaped before being embedded in SQL text.
        conn.execute(
            f"CREATE SECRET (TYPE S3, PROVIDER CREDENTIAL_CHAIN, REGION {_sql_literal(region)})"
        )
        try:
            described = conn.execute(
                f"DESCRIBE SELECT * FROM read_parquet({_sql_literal(src)}) LIMIT 0"
            ).fetchall()
        except Exception as e:
            sys.exit(str(e))

        for col in (r[0] for r in described):
            try:
                row = conn.execute(_scan_query(col, src, sample)).fetchone()
            except Exception as e:
                # Best effort: keep scanning the other columns, but make the
                # gap visible instead of silently reporting nothing.
                print(f"warning: skipping column {col!r}: {e}", file=sys.stderr)
                continue
            total = row[-1]
            for i, label in enumerate(PATTERNS):
                if row[i]:
                    findings.append(
                        {
                            "column": col,
                            "pii_type": label,
                            "matches": row[i],
                            "sample": total,
                            "rate": f"{row[i]/total:.1%}",
                            # row[n + i] is the list of matching values for this pattern.
                            "examples": (row[n + i] or [])[:3],
                        }
                    )
    print(json.dumps(findings, indent=2))
# Launch the CLI only when executed as a script, so importing this module
# does not parse sys.argv and start a scan as a side effect.
if __name__ == "__main__":
    typer.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment