Created
March 3, 2026 09:31
-
-
Save cybermaxs/cbed714bec0e5cca20d2615a61d419a2 to your computer and use it in GitHub Desktop.
Use duckDB and regexes to scan for PII on s3 files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json, sys, duckdb, typer | |
| PATTERNS = { | |
| "email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", | |
| "phone": r"\+[1-9]\d{6,14}", | |
| "ssn": r"\d{3}-\d{2}-\d{4}", | |
| "credit_card": r"4[0-9]{12}([0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13}", | |
| "ipv4": r"(([0-9]{1,3})\.){3}([0-9]{1,3})", | |
| "geolocation": r"-?\d{1,3}\.\d{5,}", | |
| } | |
| def main( | |
| s3_path: str, | |
| sample: int = typer.Option(5000, help="Rows to sample per column"), | |
| region: str = typer.Option("eu-west-3", help="AWS region"), | |
| ): | |
| conn = duckdb.connect() | |
| conn.install_extension("httpfs") | |
| conn.load_extension("httpfs") | |
| conn.execute(f"CREATE SECRET (TYPE S3, PROVIDER CREDENTIAL_CHAIN, REGION '{region}')") | |
| base = s3_path.rstrip("/") | |
| src = base if base.endswith(".parquet") else f"{base}/**/*.parquet" | |
| try: | |
| columns = [r[0] for r in conn.execute(f"DESCRIBE SELECT * FROM read_parquet('{src}') LIMIT 0").fetchall()] | |
| except Exception as e: | |
| sys.exit(str(e)) | |
| def scan_query(col): | |
| counts = ", ".join(f"COUNT(*) FILTER (WHERE regexp_matches(\"{col}\"::VARCHAR, '{pat}', 'i')) AS \"{label}\"" for label, pat in PATTERNS.items()) | |
| samples = ", ".join(f"list(\"{col}\"::VARCHAR) FILTER (WHERE regexp_matches(\"{col}\"::VARCHAR, '{pat}', 'i')) AS \"{label}_samples\"" for label, pat in PATTERNS.items()) | |
| return f'SELECT {counts}, {samples}, COUNT(*) AS total FROM (SELECT * FROM read_parquet(\'{src}\') WHERE "{col}" IS NOT NULL) USING SAMPLE {sample} ROWS' | |
| findings = [] | |
| n = len(PATTERNS) | |
| for col in columns: | |
| s = col.replace('"', '""') | |
| try: | |
| row = conn.execute(scan_query(s)).fetchone() | |
| except Exception: | |
| continue | |
| total = row[-1] | |
| for i, label in enumerate(PATTERNS): | |
| if row[i]: | |
| findings.append({"column": col, "pii_type": label, "matches": row[i], "sample": total, "rate": f"{row[i]/total:.1%}", "examples": (row[n + i] or [])[:3]}) | |
| print(json.dumps(findings, indent=2)) | |
| typer.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment