Last active
January 14, 2026 17:22
-
-
Save NickCrews/cb2ff440bcf34f951bb245a239368eb6 to your computer and use it in GitHub Desktop.
Usage `uv run https://gist.github.com/NickCrews/cb2ff440bcf34f951bb245a239368eb6 sync --pg-url 'postgresql://user:pass@host:port/db' --where "schema IN ('crm','ai','districts','users',) and table_name <> 'donations_raw'" --on-exists=ignore`. This is very susceptible to SQL injection acttacks with the --where option, use with caution!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Inspect and sync PostgreSQL databases with DuckDB. | |
| This is very susceptible to SQL injection acttacks with the --where option, use with caution! | |
| """ | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "duckdb", | |
| # "python-dotenv", | |
| # ] | |
| # /// | |
| import argparse | |
| import json | |
| import logging | |
| import os | |
| from typing import Literal, TypedDict | |
| import duckdb | |
| from dotenv import load_dotenv | |
| logger = logging.getLogger(__name__) | |
| class TableInfo(TypedDict): | |
| database: str | |
| schema: str | |
| table: str | |
| def sync( | |
| pg_url: str, | |
| duckdb_path: str = "local.duckdb", | |
| *, | |
| on_exists: Literal["error", "ignore", "replace"] = "error", | |
| wheres: list[str] | None = None, | |
| ): | |
| """ | |
| Syncs tables from a PostgreSQL database to a local DuckDB file. | |
| Args: | |
| pg_url: PostgreSQL connection URL. | |
| duckdb_path: Path to the local DuckDB file. | |
| on_exists: How to handle tables that already exist in DuckDB | |
| ('error', 'ignore', 'replace'). | |
| wheres: SQL WHERE clauses to filter tables (combined with AND). | |
| """ | |
| logger.debug("Connecting to DuckDB: %s", duckdb_path) | |
| duckdb_conn = duckdb.connect(database=duckdb_path) | |
| _attach_postgres(duckdb_conn, pg_url) | |
| tables = _get_tables(duckdb_conn, wheres=wheres) | |
| logger.debug("Found %d tables in PostgreSQL: %s", len(tables), tables) | |
| for table in tables: | |
| logger.debug("Processing table: %s", table["table"]) | |
| if on_exists == "error": | |
| create_table_clause = "CREATE TABLE" | |
| elif on_exists == "ignore": | |
| create_table_clause = "CREATE TABLE IF NOT EXISTS" | |
| elif on_exists == "replace": | |
| create_table_clause = "CREATE OR REPLACE TABLE" | |
| else: | |
| raise ValueError(f"Invalid on_exists_mode: {on_exists}") | |
| # Sync the table from PostgreSQL to DuckDB | |
| logger.info( | |
| """Creating table "%s"."%s" in DuckDB from PostgreSQL...""", | |
| table["schema"], | |
| table["table"], | |
| ) | |
| duckdb_conn.execute(f"""CREATE SCHEMA IF NOT EXISTS "{table["schema"]}";""") | |
| duckdb_conn.execute( | |
| f'{create_table_clause} "{table["schema"]}"."{table["table"]}" AS ' | |
| f'SELECT * FROM postgres.{table["schema"]}."{table["table"]}";' | |
| ) | |
| logger.info( | |
| 'Table "%s"."%s" synced successfully.', | |
| table["schema"], | |
| table["table"], | |
| ) | |
| def list_tables( | |
| pg_url: str, | |
| *, | |
| format: Literal["markdown", "json"] = "markdown", | |
| wheres: list[str] | None = None, | |
| ): | |
| """ | |
| Lists all tables in the PostgreSQL database. | |
| Args: | |
| pg_url: PostgreSQL connection URL. | |
| format: Output format ('markdown' or 'json'). | |
| wheres: SQL WHERE clauses to filter tables (combined with AND). | |
| """ | |
| duckdb_conn = duckdb.connect(":memory:") | |
| _attach_postgres(duckdb_conn, pg_url) | |
| tables = _get_tables(duckdb_conn, wheres=wheres) | |
| if format == "json": | |
| print(json.dumps(tables, indent=2)) | |
| else: | |
| if tables: | |
| print("| database | schema | table |") | |
| print("|----------|--------|-------|") | |
| for table in tables: | |
| db = table["database"] | |
| sch = table["schema"] | |
| tbl = table["table"] | |
| print(f"| {db} | {sch} | {tbl} |") | |
| else: | |
| print("No tables found.") | |
| def _attach_postgres( | |
| conn: duckdb.DuckDBPyConnection, | |
| pg_url: str, | |
| *, | |
| alias: str = "postgres", | |
| ) -> None: | |
| """ | |
| Attach a PostgreSQL database to a DuckDB connection. | |
| Parameters | |
| ---------- | |
| conn : duckdb.DuckDBPyConnection | |
| Active DuckDB connection. | |
| pg_url : str | |
| PostgreSQL connection URL. | |
| alias : str, optional | |
| Alias for the attached database (default is "postgres"). | |
| """ | |
| logger.debug("Installing and loading DuckDB PostgreSQL extension...") | |
| conn.execute("INSTALL postgres;") | |
| conn.execute("LOAD postgres;") | |
| logger.debug("Attaching to PostgreSQL database...") | |
| conn.execute(f"ATTACH '{pg_url}' AS {alias} (TYPE POSTGRES);") | |
| def _get_tables( | |
| conn: duckdb.DuckDBPyConnection, wheres: list[str] | None = None | |
| ) -> list[TableInfo]: | |
| """ | |
| Retrieve table information from the attached PostgreSQL database. | |
| Parameters | |
| ---------- | |
| conn : duckdb.DuckDBPyConnection | |
| Active DuckDB connection. | |
| wheres : list[str] | None | |
| Optional WHERE clauses combined with AND. | |
| Returns | |
| ------- | |
| list[TableInfo] | |
| List of TableInfo named tuples. | |
| """ | |
| base_query = ( | |
| "SELECT table_catalog AS database, " | |
| "table_schema AS schema, table_name AS table " | |
| "FROM postgres.information_schema.tables" | |
| ) | |
| where_clauses = ["table_type = 'BASE TABLE'"] | |
| if wheres: | |
| where_clauses.extend(wheres) | |
| if where_clauses: | |
| query = f"{base_query} WHERE {' AND '.join(where_clauses)};" | |
| else: | |
| query = f"{base_query};" | |
| logger.debug("Executing table retrieval query: %s", query) | |
| result = conn.execute(query).fetchall() | |
| tables = [TableInfo(database=row[0], schema=row[1], table=row[2]) for row in result] | |
| return tables | |
| def cli(): | |
| parser = argparse.ArgumentParser( | |
| description="Manage PostgreSQL and DuckDB databases." | |
| ) | |
| subparsers = parser.add_subparsers(dest="command", help="Command to run") | |
| list_parser = subparsers.add_parser("list", help="List all tables in PostgreSQL") | |
| list_parser.add_argument( | |
| "--pg-url", | |
| type=str, | |
| help="PostgreSQL connection URL (e.g., postgresql://user:pass@host:port/db)", | |
| ) | |
| list_parser.add_argument( | |
| "--format", | |
| type=str, | |
| choices=["markdown", "json"], | |
| default="markdown", | |
| help="Output format (default: markdown)", | |
| ) | |
| list_parser.add_argument( | |
| "--where", | |
| action="append", | |
| dest="wheres", | |
| help=( | |
| "SQL WHERE clause to filter tables " | |
| "(can be used multiple times, combined with AND). " | |
| "Example: --where \"schema = 'public'\" " | |
| "--where \"table LIKE 'user%'\"" | |
| ), | |
| ) | |
| sync_parser = subparsers.add_parser("sync", help="Sync PostgreSQL tables to DuckDB") | |
| sync_parser.add_argument( | |
| "--pg-url", | |
| type=str, | |
| help="PostgreSQL connection URL (e.g., postgresql://user:pass@host:port/db)", | |
| ) | |
| sync_parser.add_argument( | |
| "--duckdb", | |
| type=str, | |
| default="local.duckdb", | |
| help="Path to the local DuckDB file (default: local.duckdb)", | |
| ) | |
| sync_parser.add_argument( | |
| "--on-exists", | |
| type=str, | |
| choices=["error", "ignore", "replace"], | |
| default="error", | |
| help="How to handle tables that already exist in DuckDB (default: error)", | |
| ) | |
| sync_parser.add_argument( | |
| "--where", | |
| action="append", | |
| dest="wheres", | |
| help=( | |
| "SQL WHERE clause to filter tables " | |
| "(can be used multiple times, combined with AND). " | |
| "Example: --where \"schema = 'public'\" " | |
| "--where \"table LIKE 'user%'\"" | |
| ), | |
| ) | |
| args = parser.parse_args() | |
| # Load .env file | |
| load_dotenv() | |
| # Configure logging for CLI runs | |
| logging.basicConfig(level=logging.INFO) | |
| if not args.command: | |
| parser.print_help() | |
| return | |
| if args.command == "sync": | |
| pg_url = args.pg_url or os.getenv("PG_URL") | |
| if not pg_url: | |
| logger.error( | |
| "PostgreSQL URL not provided. Use --pg-url or set PG_URL env var." | |
| ) | |
| return | |
| logger.debug("Starting sync CLI") | |
| sync( | |
| pg_url=pg_url, | |
| duckdb_path=args.duckdb, | |
| on_exists=args.on_exists, | |
| wheres=args.wheres, | |
| ) | |
| elif args.command == "list": | |
| pg_url = args.pg_url or os.getenv("PG_URL") | |
| if not pg_url: | |
| logger.error( | |
| "PostgreSQL URL not provided. Use --pg-url or set PG_URL env var." | |
| ) | |
| return | |
| logger.debug("Listing tables") | |
| list_tables(pg_url=pg_url, format=args.format, wheres=args.wheres) | |
| if __name__ == "__main__": | |
| cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment