Skip to content

Instantly share code, notes, and snippets.

@mdrakiburrahman
Created February 28, 2026 22:11
Show Gist options
  • Select an option

  • Save mdrakiburrahman/ae17342a35f5e2cd615600f89be25708 to your computer and use it in GitHub Desktop.

Select an option

Save mdrakiburrahman/ae17342a35f5e2cd615600f89be25708 to your computer and use it in GitHub Desktop.
"""Fabric Lakehouse ODBC query runner.
Loads query definitions from a YAML file, builds a WHERE clause from caller-
supplied scope (months, services, teams, severities), and executes queries
in parallel via ThreadPoolExecutor — all sharing a single ODBC connection
(``ReuseSession=true``) so only one Livy session is created.
A warm-up ``SELECT 1`` runs first to ensure the Livy session is alive.
Then all real queries fire in parallel using cursors from the same connection.
"""
from __future__ import annotations
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import pandas as pd
import pyodbc
import yaml
from incident.client import PROD_FABRIC_WORKSPACE_ID, PROD_FABRIC_INCIDENT_LAKEHOUSE_ID
log = logging.getLogger(__name__)
# ── Scope (the caller owns this, NOT the YAML) ────────────────────────────
@dataclass
class QueryScope:
    """Defines *what* data to query — partition months, services, severities.

    The caller owns the scope (NOT the YAML); ``common_where`` renders it as
    a SQL WHERE clause fragment for substitution into query templates.

    Attributes:
        months: ``YearMonth`` partition values to query. Must be non-empty.
        service_names: ``OwningServiceName`` values to match. Must be non-empty.
        extra_team_names: optional ``OwningTeamName`` values OR-ed with the
            service filter.
        severities: ``Severity`` levels to include. Must be non-empty.
    """

    months: list[str]
    service_names: list[str] = field(default_factory=lambda: ["Azure Hybrid Data"])
    extra_team_names: list[str] = field(default_factory=list)
    severities: list[int] = field(default_factory=lambda: [3, 4])

    @staticmethod
    def _quote(value: str) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Unlike ``repr()`` (which switches to double quotes when the value
        contains an apostrophe — invalid as a SQL string), this always uses
        single quotes and doubles embedded single quotes.
        """
        return "'" + value.replace("'", "''") + "'"

    @property
    def common_where(self) -> str:
        """Build the combined WHERE clause string.

        Returns:
            ``<partition> AND <service-or-team> AND <severity>``.

        Raises:
            ValueError: if ``months``, ``service_names``, or ``severities``
                is empty — an empty list would otherwise render invalid SQL
                such as ``YearMonth IN ()``.
        """
        if not self.months:
            raise ValueError("QueryScope.months must not be empty")
        if not self.service_names:
            raise ValueError("QueryScope.service_names must not be empty")
        if not self.severities:
            raise ValueError("QueryScope.severities must not be empty")
        partition = f"YearMonth IN ({', '.join(self._quote(m) for m in self.months)})"
        svc_list = ", ".join(self._quote(s) for s in self.service_names)
        clauses = [f"OwningServiceName IN ({svc_list})"]
        if self.extra_team_names:
            team_list = ", ".join(self._quote(t) for t in self.extra_team_names)
            clauses.append(f"OwningTeamName IN ({team_list})")
        service = f"({' OR '.join(clauses)})"
        sev_list = ", ".join(str(s) for s in self.severities)
        severity = f"Severity IN ({sev_list})"
        return f"{partition} AND {service} AND {severity}"
# ── Query definition (loaded from YAML) ───────────────────────────────────
@dataclass
class QueryDefinition:
    """A single named query from the YAML file."""

    # Query name: the key under ``queries:`` in the YAML file.
    name: str
    # Human-readable description (``from_yaml`` falls back to the name).
    description: str
    # CSV filename the query's result is written to under the output dir.
    output: str
    # SQL text containing a ``{common_where}`` placeholder filled in at
    # execution time from the caller-supplied QueryScope.
    sql_template: str
# ── Runner ─────────────────────────────────────────────────────────────────
class FabricQueryRunner:
    """Executes query definitions against a Fabric Lakehouse via ODBC.

    Uses a single shared ODBC connection (``ReuseSession=true``) so Livy
    doesn't spin up multiple sessions. A threading lock serialises cursor
    *creation only* — each query then runs on its own cursor so queries can
    actually overlap. (NOTE(review): concurrent statements on one ODBC
    connection are driver-dependent — confirmed working with the Fabric
    driver's session reuse; verify if the driver changes.)
    """

    def __init__(
        self,
        queries: list[QueryDefinition],
        scope: QueryScope,
        output_dir: Path,
        *,
        workspace_id: str = PROD_FABRIC_WORKSPACE_ID,
        lakehouse_id: str = PROD_FABRIC_INCIDENT_LAKEHOUSE_ID,
        max_workers: int = 6,
    ):
        """Create a runner.

        Args:
            queries: query definitions (normally loaded via ``from_yaml``).
            scope: caller-owned scope rendered into each query's WHERE clause.
            output_dir: directory CSV results are written to (created on run).
            workspace_id: Fabric workspace GUID.
            lakehouse_id: Fabric lakehouse GUID.
            max_workers: thread-pool size for parallel query execution.
        """
        self.queries = queries
        self.scope = scope
        self.output_dir = output_dir
        self.max_workers = max_workers
        self._connection_string = (
            "DRIVER={Microsoft Fabric ODBC Driver};"
            f"WorkspaceId={workspace_id};"
            f"LakehouseId={lakehouse_id};"
            "AuthFlow=AZURE_CLI;"
            "ReuseSession=true;"
            "LogLevel=Info;"
            r"LogFile=C:\.temp\odbc_driver_debug.log;"
        )
        # Shared connection, created lazily; guarded by _conn_lock for
        # cursor creation across worker threads.
        self._conn: pyodbc.Connection | None = None
        self._conn_lock = threading.Lock()

    # ── Factory ────────────────────────────────────────────────────────────
    @classmethod
    def from_yaml(
        cls,
        yaml_path: str | Path,
        scope: QueryScope,
        *,
        max_workers: int = 6,
        **kwargs: Any,
    ) -> FabricQueryRunner:
        """Load query definitions from a YAML file.

        The YAML must contain a ``queries:`` mapping of name → {description,
        output, sql}; ``output_dir`` (default ``.temp``) is resolved relative
        to the YAML file's directory.
        """
        yaml_path = Path(yaml_path)
        with open(yaml_path) as f:
            cfg = yaml.safe_load(f)
        queries = [
            QueryDefinition(
                name=name,
                description=qdef.get("description", name),
                output=qdef["output"],
                sql_template=qdef["sql"],
            )
            for name, qdef in cfg["queries"].items()
        ]
        output_dir = yaml_path.parent / cfg.get("output_dir", ".temp")
        return cls(queries, scope, output_dir, max_workers=max_workers, **kwargs)

    # ── Connection management ────────────────────────────────────────────
    def _get_conn(self) -> pyodbc.Connection:
        """Return the shared connection, creating it on first call.

        Not locked internally: the warm-up runs single-threaded before the
        pool starts, and worker threads only call this under ``_conn_lock``.
        """
        if self._conn is None:
            self._conn = pyodbc.connect(self._connection_string, timeout=120)
        return self._conn

    def close(self) -> None:
        """Close the shared connection (if open). Safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    # ── Execution ──────────────────────────────────────────────────────────
    def _warmup(self) -> None:
        """Run SELECT 1 on the shared connection to wake Livy up."""
        log.info("Warming up Livy session (SELECT 1)...")
        t0 = time.time()
        cur = self._get_conn().cursor()
        try:
            cur.execute("SELECT 1")
            cur.fetchall()
        finally:
            cur.close()
        log.info("Livy warm — ready in %.1fs", time.time() - t0)

    def _execute_one(self, qdef: QueryDefinition) -> tuple[str, pd.DataFrame, float]:
        """Run a single query using a cursor from the shared connection.

        Only cursor creation happens under the lock; holding it across
        execute/fetch (the long part) would serialise every query and defeat
        the thread pool. The cursor is closed even when the query fails.
        """
        sql = qdef.sql_template.format(common_where=self.scope.common_where)
        t0 = time.time()
        log.info("[%s] Executing: %s", qdef.name, qdef.description)
        # Serialise only cursor creation on the shared pyodbc connection.
        with self._conn_lock:
            cur = self._get_conn().cursor()
        try:
            cur.execute(sql)
            cols = [d[0] for d in cur.description]
            rows = cur.fetchall()
            df = pd.DataFrame.from_records([tuple(r) for r in rows], columns=cols)
        finally:
            cur.close()
        elapsed = time.time() - t0
        log.info("[%s] ✅ %d rows in %.0fs", qdef.name, len(df), elapsed)
        return qdef.name, df, elapsed

    def run(self) -> dict[str, pd.DataFrame]:
        """Warm up Livy, then execute all queries in parallel.

        Each successful result is written to ``output_dir / qdef.output`` as
        CSV and collected into the returned mapping.

        Returns:
            Mapping of query name → DataFrame for every successful query.

        Raises:
            RuntimeError: if any query failed, raised only after all queries
                have finished (successes are still written to disk first).
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Phase 1: warm up Livy so the session is alive before fan-out.
        self._warmup()
        # Phase 2: fire all queries in parallel (shared connection, one Livy session).
        total_t0 = time.time()
        results: dict[str, pd.DataFrame] = {}
        errors: dict[str, str] = {}
        total_queries = len(self.queries)
        completed_count = 0
        log.info("Invoking %d queries in parallel (max_workers=%d)...", total_queries, self.max_workers)
        try:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                # Map each future to its full definition so completions don't
                # re-scan self.queries by name (which would also pick the
                # wrong output file if two definitions shared a name).
                futures = {
                    pool.submit(self._execute_one, qdef): qdef
                    for qdef in self.queries
                }
                for future in as_completed(futures):
                    qdef = futures[future]
                    completed_count += 1
                    remaining = total_queries - completed_count
                    try:
                        _, df, elapsed = future.result()
                        df.to_csv(self.output_dir / qdef.output, index=False)
                        results[qdef.name] = df
                        log.info(
                            "Progress: %d/%d completed, %d remaining [%s ✅]",
                            completed_count, total_queries, remaining, qdef.name,
                        )
                    except Exception as exc:
                        log.error("[%s] ❌ FAILED: %s", qdef.name, exc)
                        log.info(
                            "Progress: %d/%d completed, %d remaining [%s ❌]",
                            completed_count, total_queries, remaining, qdef.name,
                        )
                        errors[qdef.name] = str(exc)
        finally:
            # Always release the shared connection, even on interrupt.
            self.close()
        log.info(
            "All %d queries complete in %.0fs (%d succeeded, %d failed)",
            len(self.queries),
            time.time() - total_t0,
            len(results),
            len(errors),
        )
        if errors:
            raise RuntimeError(
                f"{len(errors)} queries failed: "
                + ", ".join(f"{n}: {e}" for n, e in errors.items())
            )
        return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment