@nsa-yoda · Created January 14, 2026 07:28
Bulk import CSV -> D1 via Cloudflare D1 REST API (/import).
#!/usr/bin/env python3
"""
Bulk import CSV -> D1 via Cloudflare D1 REST API (/import).
Flow per chunk:
1) POST /import {action:"init", etag:<md5>}
2) PUT upload_url (raw SQL file contents)
3) POST /import {action:"ingest", etag:<md5>, filename:<filename>}
4) POST /import {action:"poll", current_bookmark:<bookmark>} until done
Ref: Cloudflare tutorial + API reference: https://developers.cloudflare.com/d1/tutorials/import-to-d1-with-rest-api/
"""
from __future__ import annotations
import csv
import hashlib
import os
import tempfile
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Tuple
import requests
# =========================
# Update these constants
# =========================
ACCOUNT_ID = "YOUR_ACCOUNT_ID"
DATABASE_ID = "YOUR_DATABASE_ID" # UUID
API_TOKEN = "YOUR_CLOUDFLARE_API_TOKEN"
INPUT_CSV_PATH = "./input.csv"
TARGET_TABLE = "your_table_name"
CHUNK_SIZE = 1000
CSV_DELIMITER = ","
CSV_ENCODING = "utf-8"
# If your CSV has a header row (recommended)
CSV_HAS_HEADER = True
# Optional: skip these columns when generating SQL (e.g., autoincrement id)
SKIP_COLUMNS: Tuple[str, ...] = ()
# Poll settings
POLL_INTERVAL_SECONDS = 1.0
POLL_TIMEOUT_SECONDS = 60 * 10 # 10 minutes per chunk (tune as needed)
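
# NOTE: the generated SQL contains INSERT statements only, so TARGET_TABLE must
# already exist in the target D1 database. A hypothetical way to create it
# (table name and columns are placeholders, not taken from this script):
#   npx wrangler d1 execute <database_name> \
#     --command "CREATE TABLE your_table_name (id INTEGER PRIMARY KEY, name TEXT);"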
# =========================
# Implementation
# =========================
D1_IMPORT_URL = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/d1/database/{DATABASE_ID}/import"


@dataclass(frozen=True)
class D1InitResult:
    upload_url: str
    filename: str


def _headers_json() -> Dict[str, str]:
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_TOKEN}",
    }


def _md5_hex(data: bytes) -> str:
    return hashlib.md5(data).hexdigest()


def _sql_escape(value: str) -> str:
    """
    Basic single-quote escaping for SQLite string literals:
    - Single quotes are escaped by doubling them.
    - Empty strings are kept as '' (they are not converted to NULL here).
    """
    return value.replace("'", "''")


def make_insert_sql(table: str, rows: Sequence[Dict[str, str]], skip_columns: Sequence[str] = ()) -> str:
    if not rows:
        return ""
    skip = set(skip_columns)
    # Determine columns from the first row, preserving order.
    cols = [c for c in rows[0].keys() if c not in skip]
    if not cols:
        raise ValueError("After applying SKIP_COLUMNS, no columns remain to insert.")
    col_list = ",".join(cols)
    values_sql_parts: List[str] = []
    for r in rows:
        vals: List[str] = []
        for c in cols:
            v = r.get(c, "")
            # To treat blank values as NULL instead of '', uncomment the next two lines:
            # if v is None or v == "":
            #     vals.append("NULL"); continue
            vals.append("'" + _sql_escape(str(v)) + "'")
        values_sql_parts.append("(" + ",".join(vals) + ")")
    return f"INSERT INTO {table} ({col_list}) VALUES " + ",".join(values_sql_parts) + ";\n"


def d1_import_init(etag_hex: str) -> D1InitResult:
    resp = requests.post(
        D1_IMPORT_URL,
        headers=_headers_json(),
        json={"action": "init", "etag": etag_hex},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success", False):
        raise RuntimeError(f"init failed: {data}")
    result = data.get("result") or {}
    upload_url = result.get("upload_url")
    filename = result.get("filename")
    if not upload_url or not filename:
        raise RuntimeError(f"init missing upload_url/filename: {data}")
    return D1InitResult(upload_url=upload_url, filename=filename)


def d1_upload_to_r2(upload_url: str, sql_bytes: bytes, expected_etag: str) -> None:
    # Upload is a raw PUT to the pre-signed URL (see the Cloudflare tutorial linked above).
    r2_resp = requests.put(upload_url, data=sql_bytes, timeout=120)
    r2_resp.raise_for_status()
    # The Cloudflare tutorial verifies the returned ETag against the local MD5.
    got_etag = (r2_resp.headers.get("ETag") or "").replace('"', "")
    if got_etag and got_etag != expected_etag:
        raise RuntimeError(f"ETag mismatch (expected {expected_etag}, got {got_etag})")


def d1_import_ingest(etag_hex: str, filename: str) -> str:
    resp = requests.post(
        D1_IMPORT_URL,
        headers=_headers_json(),
        json={"action": "ingest", "etag": etag_hex, "filename": filename},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success", False):
        raise RuntimeError(f"ingest failed: {data}")
    result = data.get("result") or {}
    bookmark = result.get("at_bookmark")
    if not bookmark:
        raise RuntimeError(f"ingest missing at_bookmark: {data}")
    return bookmark


def d1_import_poll(bookmark: str, timeout_seconds: float) -> None:
    """
    Poll until the import completes (see the Cloudflare tutorial linked above).
    """
    deadline = time.time() + timeout_seconds
    payload = {"action": "poll", "current_bookmark": bookmark}
    while True:
        if time.time() > deadline:
            raise TimeoutError("poll timed out waiting for D1 import to complete")
        resp = requests.post(
            D1_IMPORT_URL,
            headers=_headers_json(),
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        if not data.get("success", False):
            raise RuntimeError(f"poll failed: {data}")
        result = data.get("result") or {}
        # The tutorial checks `success` and also stops on "Not currently importing anything."
        success = bool(result.get("success"))
        error = result.get("error")
        if success:
            return
        if error == "Not currently importing anything.":
            return
        time.sleep(POLL_INTERVAL_SECONDS)


def iter_csv_chunks(path: str, chunk_size: int) -> Iterable[List[Dict[str, str]]]:
    with open(path, "r", encoding=CSV_ENCODING, newline="") as f:
        reader = csv.reader(f, delimiter=CSV_DELIMITER)
        try:
            first_row = next(reader)
        except StopIteration:
            return  # empty file: nothing to yield
        header: List[str]
        if CSV_HAS_HEADER:
            header = first_row
            chunk: List[Dict[str, str]] = []
        else:
            # No header row: create generic column names c0, c1, ...
            header = [f"c{i}" for i in range(len(first_row))]
            # The first row is data, so it becomes part of the first chunk.
            chunk = [dict(zip(header, first_row))]
        for row in reader:
            chunk.append(dict(zip(header, row)))
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
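
# Worked example (hypothetical numbers): with CHUNK_SIZE = 1000, a 2,500-row CSV
# with a header row yields three chunks of 1000, 1000, and 500 row-dicts keyed by
# the header columns; each chunk becomes one multi-row INSERT and one /import cycle.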


def main() -> None:
    if not os.path.exists(INPUT_CSV_PATH):
        raise FileNotFoundError(f"INPUT_CSV_PATH not found: {INPUT_CSV_PATH}")
    if not (ACCOUNT_ID and DATABASE_ID and API_TOKEN and TARGET_TABLE):
        raise ValueError("Set ACCOUNT_ID, DATABASE_ID, API_TOKEN, and TARGET_TABLE at the top of the file.")
    total = 0
    chunk_idx = 0
    for rows in iter_csv_chunks(INPUT_CSV_PATH, CHUNK_SIZE):
        chunk_idx += 1
        total += len(rows)
        sql_text = make_insert_sql(TARGET_TABLE, rows, SKIP_COLUMNS)
        sql_bytes = sql_text.encode("utf-8")
        etag_hex = _md5_hex(sql_bytes)
        # Write the chunk to a temporary .sql file before uploading.
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode="wb", suffix=".sql", delete=False) as tmp:
                tmp.write(sql_bytes)
                tmp_path = tmp.name
            print(f"[chunk {chunk_idx}] rows={len(rows)} tmp={tmp_path} md5={etag_hex}")
            # 1) init
            init_res = d1_import_init(etag_hex)
            # 2) PUT to the pre-signed upload_url (re-read from the temp file on disk)
            with open(tmp_path, "rb") as r:
                file_bytes = r.read()
            d1_upload_to_r2(init_res.upload_url, file_bytes, expected_etag=etag_hex)
            # 3) ingest
            bookmark = d1_import_ingest(etag_hex, init_res.filename)
            # 4) poll
            d1_import_poll(bookmark, timeout_seconds=POLL_TIMEOUT_SECONDS)
            print(f"[chunk {chunk_idx}] import complete ✅ (cumulative rows={total})")
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass


if __name__ == "__main__":
    main()