Skip to content

Instantly share code, notes, and snippets.

@NickCrews
Last active January 14, 2026 17:22
Show Gist options
  • Select an option

  • Save NickCrews/cb2ff440bcf34f951bb245a239368eb6 to your computer and use it in GitHub Desktop.

Select an option

Save NickCrews/cb2ff440bcf34f951bb245a239368eb6 to your computer and use it in GitHub Desktop.
Usage `uv run https://gist.github.com/NickCrews/cb2ff440bcf34f951bb245a239368eb6 sync --pg-url 'postgresql://user:pass@host:port/db' --where "schema IN ('crm','ai','districts','users',) and table_name <> 'donations_raw'" --on-exists=ignore`. This is very susceptible to SQL injection attacks via the --where option; use with caution!
"""
Inspect and sync PostgreSQL databases with DuckDB.
This is very susceptible to SQL injection attacks via the --where option; use with caution!
"""
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "duckdb",
# "python-dotenv",
# ]
# ///
import argparse
import json
import logging
import os
from typing import Literal, TypedDict
import duckdb
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
# Dictionary shape describing where one PostgreSQL table lives.
# `_get_tables` builds one of these per row of its discovery query.
TableInfo = TypedDict(
    "TableInfo",
    {
        "database": str,  # filled from information_schema's table_catalog
        "schema": str,  # filled from information_schema's table_schema
        "table": str,  # filled from information_schema's table_name
    },
)
def sync(
pg_url: str,
duckdb_path: str = "local.duckdb",
*,
on_exists: Literal["error", "ignore", "replace"] = "error",
wheres: list[str] | None = None,
):
"""
Syncs tables from a PostgreSQL database to a local DuckDB file.
Args:
pg_url: PostgreSQL connection URL.
duckdb_path: Path to the local DuckDB file.
on_exists: How to handle tables that already exist in DuckDB
('error', 'ignore', 'replace').
wheres: SQL WHERE clauses to filter tables (combined with AND).
"""
logger.debug("Connecting to DuckDB: %s", duckdb_path)
duckdb_conn = duckdb.connect(database=duckdb_path)
_attach_postgres(duckdb_conn, pg_url)
tables = _get_tables(duckdb_conn, wheres=wheres)
logger.debug("Found %d tables in PostgreSQL: %s", len(tables), tables)
for table in tables:
logger.debug("Processing table: %s", table["table"])
if on_exists == "error":
create_table_clause = "CREATE TABLE"
elif on_exists == "ignore":
create_table_clause = "CREATE TABLE IF NOT EXISTS"
elif on_exists == "replace":
create_table_clause = "CREATE OR REPLACE TABLE"
else:
raise ValueError(f"Invalid on_exists_mode: {on_exists}")
# Sync the table from PostgreSQL to DuckDB
logger.info(
"""Creating table "%s"."%s" in DuckDB from PostgreSQL...""",
table["schema"],
table["table"],
)
duckdb_conn.execute(f"""CREATE SCHEMA IF NOT EXISTS "{table["schema"]}";""")
duckdb_conn.execute(
f'{create_table_clause} "{table["schema"]}"."{table["table"]}" AS '
f'SELECT * FROM postgres.{table["schema"]}."{table["table"]}";'
)
logger.info(
'Table "%s"."%s" synced successfully.',
table["schema"],
table["table"],
)
def list_tables(
    pg_url: str,
    *,
    format: Literal["markdown", "json"] = "markdown",
    wheres: list[str] | None = None,
):
    """
    Lists all tables in the PostgreSQL database.

    The result is printed to standard output; nothing is returned.

    Args:
        pg_url: PostgreSQL connection URL.
        format: Output format ('markdown' or 'json'). Any value other than
            'json' is rendered as markdown.
        wheres: SQL WHERE clauses to filter tables (combined with AND).
    """
    # An in-memory DuckDB is only used as a bridge to PostgreSQL; close it
    # (and the attached database with it) as soon as the table list is read.
    with duckdb.connect(":memory:") as duckdb_conn:
        _attach_postgres(duckdb_conn, pg_url)
        tables = _get_tables(duckdb_conn, wheres=wheres)

    if format == "json":
        print(json.dumps(tables, indent=2))
        return

    if not tables:
        print("No tables found.")
        return

    print("| database | schema | table |")
    print("|----------|--------|-------|")
    for table in tables:
        db = table["database"]
        sch = table["schema"]
        tbl = table["table"]
        print(f"| {db} | {sch} | {tbl} |")
def _attach_postgres(
    conn: duckdb.DuckDBPyConnection,
    pg_url: str,
    *,
    alias: str = "postgres",
) -> None:
    """
    Attach a PostgreSQL database to a DuckDB connection.

    Installs and loads DuckDB's ``postgres`` extension, then issues an
    ``ATTACH ... (TYPE POSTGRES)`` statement for ``pg_url``.

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        Active DuckDB connection.
    pg_url : str
        PostgreSQL connection URL.
    alias : str, optional
        Alias for the attached database (default is "postgres").
    """
    logger.debug("Installing and loading DuckDB PostgreSQL extension...")
    conn.execute("INSTALL postgres;")
    conn.execute("LOAD postgres;")
    logger.debug("Attaching to PostgreSQL database...")
    # The URL is embedded in a single-quoted SQL literal, so any quote in it
    # (e.g. inside a password) must be doubled to keep the statement valid.
    url_literal = pg_url.replace("'", "''")
    # Quote the alias as an identifier so unusual alias names are accepted.
    alias_ident = '"' + alias.replace('"', '""') + '"'
    conn.execute(f"ATTACH '{url_literal}' AS {alias_ident} (TYPE POSTGRES);")
def _get_tables(
    conn: duckdb.DuckDBPyConnection, wheres: list[str] | None = None
) -> list[TableInfo]:
    """
    Retrieve table information from the attached PostgreSQL database.

    Queries ``postgres.information_schema.tables`` for rows whose
    ``table_type`` is ``'BASE TABLE'``, optionally narrowed by extra filters.

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        Active DuckDB connection.
    wheres : list[str] | None
        Optional WHERE clauses combined with AND. Each clause is inserted
        into the query text verbatim (no escaping), so it must come from a
        trusted source.

    Returns
    -------
    list[TableInfo]
        One TableInfo dict (``database``, ``schema``, ``table``) per row.
    """
    base_query = (
        "SELECT table_catalog AS database, "
        "table_schema AS schema, table_name AS table "
        "FROM postgres.information_schema.tables"
    )
    where_clauses = ["table_type = 'BASE TABLE'"]
    # Parenthesize each user clause: without this, a clause containing OR
    # would bind looser than the AND joins and bypass the BASE TABLE filter.
    where_clauses.extend(f"({clause})" for clause in wheres or ())
    query = f"{base_query} WHERE {' AND '.join(where_clauses)};"
    logger.debug("Executing table retrieval query: %s", query)
    rows = conn.execute(query).fetchall()
    return [
        TableInfo(database=database, schema=schema, table=table)
        for database, schema, table in rows
    ]
def cli():
parser = argparse.ArgumentParser(
description="Manage PostgreSQL and DuckDB databases."
)
subparsers = parser.add_subparsers(dest="command", help="Command to run")
list_parser = subparsers.add_parser("list", help="List all tables in PostgreSQL")
list_parser.add_argument(
"--pg-url",
type=str,
help="PostgreSQL connection URL (e.g., postgresql://user:pass@host:port/db)",
)
list_parser.add_argument(
"--format",
type=str,
choices=["markdown", "json"],
default="markdown",
help="Output format (default: markdown)",
)
list_parser.add_argument(
"--where",
action="append",
dest="wheres",
help=(
"SQL WHERE clause to filter tables "
"(can be used multiple times, combined with AND). "
"Example: --where \"schema = 'public'\" "
"--where \"table LIKE 'user%'\""
),
)
sync_parser = subparsers.add_parser("sync", help="Sync PostgreSQL tables to DuckDB")
sync_parser.add_argument(
"--pg-url",
type=str,
help="PostgreSQL connection URL (e.g., postgresql://user:pass@host:port/db)",
)
sync_parser.add_argument(
"--duckdb",
type=str,
default="local.duckdb",
help="Path to the local DuckDB file (default: local.duckdb)",
)
sync_parser.add_argument(
"--on-exists",
type=str,
choices=["error", "ignore", "replace"],
default="error",
help="How to handle tables that already exist in DuckDB (default: error)",
)
sync_parser.add_argument(
"--where",
action="append",
dest="wheres",
help=(
"SQL WHERE clause to filter tables "
"(can be used multiple times, combined with AND). "
"Example: --where \"schema = 'public'\" "
"--where \"table LIKE 'user%'\""
),
)
args = parser.parse_args()
# Load .env file
load_dotenv()
# Configure logging for CLI runs
logging.basicConfig(level=logging.INFO)
if not args.command:
parser.print_help()
return
if args.command == "sync":
pg_url = args.pg_url or os.getenv("PG_URL")
if not pg_url:
logger.error(
"PostgreSQL URL not provided. Use --pg-url or set PG_URL env var."
)
return
logger.debug("Starting sync CLI")
sync(
pg_url=pg_url,
duckdb_path=args.duckdb,
on_exists=args.on_exists,
wheres=args.wheres,
)
elif args.command == "list":
pg_url = args.pg_url or os.getenv("PG_URL")
if not pg_url:
logger.error(
"PostgreSQL URL not provided. Use --pg-url or set PG_URL env var."
)
return
logger.debug("Listing tables")
list_tables(pg_url=pg_url, format=args.format, wheres=args.wheres)
if __name__ == "__main__":
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment