Bulk import CSV -> D1 via Cloudflare D1 REST API (/import).
#!/usr/bin/env python3
"""
Bulk import CSV -> D1 via Cloudflare D1 REST API (/import).

Flow per chunk:
  1) POST /import {action:"init", etag:<md5>}
  2) PUT upload_url (raw SQL file contents)
  3) POST /import {action:"ingest", etag:<md5>, filename:<filename>}
  4) POST /import {action:"poll", current_bookmark:<bookmark>} until done

Ref: Cloudflare tutorial + API reference:
https://developers.cloudflare.com/d1/tutorials/import-to-d1-with-rest-api/
"""
from __future__ import annotations

import csv
import hashlib
import os
import tempfile
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Tuple

import requests

# =========================
# Update these constants
# =========================
ACCOUNT_ID = "YOUR_ACCOUNT_ID"
DATABASE_ID = "YOUR_DATABASE_ID"  # UUID
API_TOKEN = "YOUR_CLOUDFLARE_API_TOKEN"

INPUT_CSV_PATH = "./input.csv"
TARGET_TABLE = "your_table_name"

CHUNK_SIZE = 1000
CSV_DELIMITER = ","
CSV_ENCODING = "utf-8"

# If your CSV has a header row (recommended)
CSV_HAS_HEADER = True

# Optional: skip these columns when generating SQL (e.g., an autoincrement id)
SKIP_COLUMNS: Tuple[str, ...] = ()

# Poll settings
POLL_INTERVAL_SECONDS = 1.0
POLL_TIMEOUT_SECONDS = 60 * 10  # 10 minutes per chunk (tune as needed)

# =========================
# Implementation
# =========================
D1_IMPORT_URL = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/d1/database/{DATABASE_ID}/import"

@dataclass(frozen=True)
class D1InitResult:
    upload_url: str
    filename: str


def _headers_json() -> Dict[str, str]:
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_TOKEN}",
    }


def _md5_hex(data: bytes) -> str:
    return hashlib.md5(data).hexdigest()

def _sql_escape(value: str) -> str:
    """
    Basic single-quote escaping for SQLite string literals:
    - Single quotes are escaped by doubling them.
    - Empty strings are kept as '' here (they are not converted to NULL).
    """
    return value.replace("'", "''")

def make_insert_sql(table: str, rows: Sequence[Dict[str, str]], skip_columns: Sequence[str] = ()) -> str:
    if not rows:
        return ""
    skip = set(skip_columns)
    # Determine columns from the first row, preserving order.
    cols = [c for c in rows[0].keys() if c not in skip]
    if not cols:
        raise ValueError("After applying SKIP_COLUMNS, no columns remain to insert.")
    col_list = ",".join(cols)
    values_sql_parts: List[str] = []
    for r in rows:
        vals: List[str] = []
        for c in cols:
            v = r.get(c, "")
            # To treat blank values as NULL instead, switch this on:
            # if v is None or v == "":
            #     vals.append("NULL")
            # else:
            vals.append("'" + _sql_escape(str(v)) + "'")
        values_sql_parts.append("(" + ",".join(vals) + ")")
    return f"INSERT INTO {table} ({col_list}) VALUES " + ",".join(values_sql_parts) + ";\n"

def d1_import_init(etag_hex: str) -> D1InitResult:
    resp = requests.post(
        D1_IMPORT_URL,
        headers=_headers_json(),
        json={"action": "init", "etag": etag_hex},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success", False):
        raise RuntimeError(f"init failed: {data}")
    result = data.get("result") or {}
    upload_url = result.get("upload_url")
    filename = result.get("filename")
    if not upload_url or not filename:
        raise RuntimeError(f"init missing upload_url/filename: {data}")
    return D1InitResult(upload_url=upload_url, filename=filename)

def d1_upload_to_r2(upload_url: str, sql_bytes: bytes, expected_etag: str) -> None:
    # Upload is a raw PUT to the pre-signed URL (see the Cloudflare tutorial linked in the module docstring).
    r2_resp = requests.put(upload_url, data=sql_bytes, timeout=120)
    r2_resp.raise_for_status()
    # The Cloudflare tutorial verifies the returned ETag against the md5 of the uploaded file.
    got_etag = (r2_resp.headers.get("ETag") or "").replace('"', "")
    if got_etag and got_etag != expected_etag:
        raise RuntimeError(f"ETag mismatch (expected {expected_etag}, got {got_etag})")

def d1_import_ingest(etag_hex: str, filename: str) -> str:
    resp = requests.post(
        D1_IMPORT_URL,
        headers=_headers_json(),
        json={"action": "ingest", "etag": etag_hex, "filename": filename},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success", False):
        raise RuntimeError(f"ingest failed: {data}")
    result = data.get("result") or {}
    bookmark = result.get("at_bookmark")
    if not bookmark:
        raise RuntimeError(f"ingest missing at_bookmark: {data}")
    return bookmark

def d1_import_poll(bookmark: str, timeout_seconds: float) -> None:
    """
    Poll until the import completes (see the Cloudflare tutorial linked in the module docstring).
    """
    deadline = time.time() + timeout_seconds
    payload = {"action": "poll", "current_bookmark": bookmark}
    while True:
        if time.time() > deadline:
            raise TimeoutError("poll timed out waiting for D1 import to complete")
        resp = requests.post(
            D1_IMPORT_URL,
            headers=_headers_json(),
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        if not data.get("success", False):
            raise RuntimeError(f"poll failed: {data}")
        result = data.get("result") or {}
        # The tutorial checks `result.success` and also stops on
        # "Not currently importing anything."
        success = bool(result.get("success"))
        error = result.get("error")
        if success:
            return
        if error == "Not currently importing anything.":
            return
        time.sleep(POLL_INTERVAL_SECONDS)

def iter_csv_chunks(path: str, chunk_size: int) -> Iterable[List[Dict[str, str]]]:
    with open(path, "r", encoding=CSV_ENCODING, newline="") as f:
        reader = csv.reader(f, delimiter=CSV_DELIMITER)
        chunk: List[Dict[str, str]] = []
        header: List[str]
        if CSV_HAS_HEADER:
            header = next(reader)
        else:
            # If no header, create generic columns: c0, c1, ...
            first_row = next(reader)
            header = [f"c{i}" for i in range(len(first_row))]
            # The first row is data, so it becomes part of the first chunk.
            chunk.append(dict(zip(header, first_row)))
        for row in reader:
            chunk.append(dict(zip(header, row)))
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

def main() -> None:
    if not os.path.exists(INPUT_CSV_PATH):
        raise FileNotFoundError(f"INPUT_CSV_PATH not found: {INPUT_CSV_PATH}")
    if not (ACCOUNT_ID and DATABASE_ID and API_TOKEN and TARGET_TABLE):
        raise ValueError("Set ACCOUNT_ID, DATABASE_ID, API_TOKEN, and TARGET_TABLE at the top of the file.")

    total = 0
    chunk_idx = 0
    for rows in iter_csv_chunks(INPUT_CSV_PATH, CHUNK_SIZE):
        chunk_idx += 1
        total += len(rows)
        sql_text = make_insert_sql(TARGET_TABLE, rows, SKIP_COLUMNS)
        sql_bytes = sql_text.encode("utf-8")
        etag_hex = _md5_hex(sql_bytes)

        # Write the chunk to a temp .sql file before uploading.
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode="wb", suffix=".sql", delete=False) as tmp:
                tmp.write(sql_bytes)
                tmp_path = tmp.name
            print(f"[chunk {chunk_idx}] rows={len(rows)} tmp={tmp_path} md5={etag_hex}")

            # 1) init
            init_res = d1_import_init(etag_hex)

            # 2) PUT to the pre-signed upload_url
            # (Read back from the temp file so the uploaded bytes are exactly what was written.)
            with open(tmp_path, "rb") as r:
                file_bytes = r.read()
            d1_upload_to_r2(init_res.upload_url, file_bytes, expected_etag=etag_hex)

            # 3) ingest
            bookmark = d1_import_ingest(etag_hex, init_res.filename)

            # 4) poll
            d1_import_poll(bookmark, timeout_seconds=POLL_TIMEOUT_SECONDS)
            print(f"[chunk {chunk_idx}] import complete ✅ (cumulative rows={total})")
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass


if __name__ == "__main__":
    main()
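
# One-off import sketch (reuses the helpers above; the SQL string below is hypothetical
# and assumes the target table already exists in D1):
#   sql_bytes = "INSERT INTO your_table_name (name) VALUES ('example');\n".encode("utf-8")
#   etag = _md5_hex(sql_bytes)
#   init = d1_import_init(etag)
#   d1_upload_to_r2(init.upload_url, sql_bytes, expected_etag=etag)
#   d1_import_poll(d1_import_ingest(etag, init.filename), timeout_seconds=POLL_TIMEOUT_SECONDS)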