Created
February 28, 2026 22:11
-
-
Save mdrakiburrahman/ae17342a35f5e2cd615600f89be25708 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Fabric Lakehouse ODBC query runner. | |
| Loads query definitions from a YAML file, builds a WHERE clause from caller- | |
| supplied scope (months, services, teams, severities), and executes queries | |
| in parallel via ThreadPoolExecutor — all sharing a single ODBC connection | |
| (``ReuseSession=true``) so only one Livy session is created. | |
| A warm-up ``SELECT 1`` runs first to ensure the Livy session is alive. | |
| Then all real queries fire in parallel using cursors from the same connection. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import threading | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
| import pyodbc | |
| import yaml | |
| from incident.client import PROD_FABRIC_WORKSPACE_ID, PROD_FABRIC_INCIDENT_LAKEHOUSE_ID | |
| log = logging.getLogger(__name__) | |
| # ── Scope (the caller owns this, NOT the YAML) ──────────────────────────── | |
@dataclass
class QueryScope:
    """Defines *what* data to query — partition months, services, severities.

    The caller owns the scope; the YAML file only defines the query shapes
    that this scope is interpolated into (via ``common_where``).
    """

    # YearMonth partition values, e.g. ["2026-01", "2026-02"].
    months: list[str]
    service_names: list[str] = field(default_factory=lambda: ["Azure Hybrid Data"])
    extra_team_names: list[str] = field(default_factory=list)
    severities: list[int] = field(default_factory=lambda: [3, 4])

    @staticmethod
    def _sql_quote(value: str) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Doubles embedded single quotes per the SQL standard. The previous
        ``repr()``-based quoting produced Python literals — for a value
        containing a single quote, repr switches to double quotes /
        backslash escapes, which is not valid SQL.
        """
        return "'" + value.replace("'", "''") + "'"

    @property
    def common_where(self) -> str:
        """Build the combined WHERE clause string from months, services/teams, severities."""
        quote = self._sql_quote
        partition = f"YearMonth IN ({', '.join(quote(m) for m in self.months)})"
        svc_list = ", ".join(quote(s) for s in self.service_names)
        clauses = [f"OwningServiceName IN ({svc_list})"]
        if self.extra_team_names:
            team_list = ", ".join(quote(t) for t in self.extra_team_names)
            clauses.append(f"OwningTeamName IN ({team_list})")
        # Service OR team match, grouped so the AND with severity binds correctly.
        service = f"({' OR '.join(clauses)})"
        # Severities are ints from the dataclass, so plain str() is safe here.
        sev_list = ", ".join(str(s) for s in self.severities)
        severity = f"Severity IN ({sev_list})"
        return f"{partition} AND {service} AND {severity}"
| # ── Query definition (loaded from YAML) ─────────────────────────────────── | |
@dataclass
class QueryDefinition:
    """A single named query from the YAML file.

    Instances are built by ``FabricQueryRunner.from_yaml`` from each entry
    under the YAML ``queries`` mapping.
    """

    # Unique key of the query (the YAML mapping key).
    name: str
    # Human-readable summary; the loader falls back to ``name`` when absent.
    description: str
    output: str  # CSV filename, written relative to the runner's output_dir
    sql_template: str  # contains {common_where} placeholder, filled from QueryScope.common_where
| # ── Runner ───────────────────────────────────────────────────────────────── | |
class FabricQueryRunner:
    """Executes query definitions against a Fabric Lakehouse via ODBC.

    Uses a single shared ODBC connection (``ReuseSession=true``) so Livy
    doesn't spin up multiple sessions. A threading lock guards lazy creation
    of the shared connection and serialises cursor creation to keep pyodbc's
    connection object safe across threads.
    """

    def __init__(
        self,
        queries: list[QueryDefinition],
        scope: QueryScope,
        output_dir: Path,
        *,
        workspace_id: str = PROD_FABRIC_WORKSPACE_ID,
        lakehouse_id: str = PROD_FABRIC_INCIDENT_LAKEHOUSE_ID,
        max_workers: int = 6,
    ):
        self.queries = queries
        self.scope = scope
        self.output_dir = output_dir
        self.max_workers = max_workers
        self._connection_string = (
            "DRIVER={Microsoft Fabric ODBC Driver};"
            f"WorkspaceId={workspace_id};"
            f"LakehouseId={lakehouse_id};"
            "AuthFlow=AZURE_CLI;"
            "ReuseSession=true;"
            "LogLevel=Info;"
            r"LogFile=C:\.temp\odbc_driver_debug.log;"
        )
        # Shared connection, created lazily under _conn_lock (see _get_conn).
        self._conn: pyodbc.Connection | None = None
        self._conn_lock = threading.Lock()

    # ── Factory ──────────────────────────────────────────────────────────

    @classmethod
    def from_yaml(
        cls,
        yaml_path: str | Path,
        scope: QueryScope,
        *,
        max_workers: int = 6,
        **kwargs: Any,
    ) -> FabricQueryRunner:
        """Load query definitions from a YAML file.

        The YAML must have a ``queries`` mapping (name → description/output/sql);
        an optional ``output_dir`` key is resolved relative to the YAML file.
        """
        yaml_path = Path(yaml_path)
        with open(yaml_path) as f:
            cfg = yaml.safe_load(f)
        queries = [
            QueryDefinition(
                name=name,
                description=qdef.get("description", name),
                output=qdef["output"],
                sql_template=qdef["sql"],
            )
            for name, qdef in cfg["queries"].items()
        ]
        out_rel = cfg.get("output_dir", ".temp")
        output_dir = yaml_path.parent / out_rel
        return cls(queries, scope, output_dir, max_workers=max_workers, **kwargs)

    # ── Connection management ──────────────────────────────────────────

    def _get_conn(self) -> pyodbc.Connection:
        """Return the shared connection, creating it on first call.

        Creation is guarded by the lock: without it, two worker threads can
        race the ``None`` check and create two connections — i.e. two Livy
        sessions, defeating the whole point of ``ReuseSession=true``.
        """
        with self._conn_lock:
            if self._conn is None:
                self._conn = pyodbc.connect(self._connection_string, timeout=120)
            return self._conn

    def close(self) -> None:
        """Close the shared connection (if open). Safe to call repeatedly."""
        with self._conn_lock:
            if self._conn is not None:
                self._conn.close()
                self._conn = None

    # ── Execution ──────────────────────────────────────────────────────────

    def _warmup(self) -> None:
        """Run SELECT 1 on the shared connection to wake Livy up."""
        log.info("Warming up Livy session (SELECT 1)...")
        t0 = time.time()
        cur = self._get_conn().cursor()
        try:
            cur.execute("SELECT 1")
            cur.fetchall()
        finally:
            # Always release the cursor, even if the warm-up query fails.
            cur.close()
        log.info("Livy warm — ready in %.1fs", time.time() - t0)

    def _execute_one(self, qdef: QueryDefinition) -> tuple[str, pd.DataFrame, float]:
        """Run a single query using a cursor from the shared connection.

        Returns ``(name, dataframe, elapsed_seconds)``. Only cursor creation
        is serialised under the lock (per the class contract); the execute/
        fetch itself runs concurrently with other workers.
        """
        sql = qdef.sql_template.format(common_where=self.scope.common_where)
        t0 = time.time()
        log.info("[%s] Executing: %s", qdef.name, qdef.description)
        conn = self._get_conn()  # locks internally; don't hold _conn_lock here
        with self._conn_lock:
            cur = conn.cursor()
        try:
            cur.execute(sql)
            cols = [d[0] for d in cur.description]
            rows = cur.fetchall()
            df = pd.DataFrame.from_records([tuple(r) for r in rows], columns=cols)
        finally:
            cur.close()
        elapsed = time.time() - t0
        log.info("[%s] ✅ %d rows in %.0fs", qdef.name, len(df), elapsed)
        return qdef.name, df, elapsed

    def run(self) -> dict[str, pd.DataFrame]:
        """Warm up Livy, then execute all queries in parallel. Returns name→DataFrame.

        Each successful result is also written as CSV to ``output_dir``.
        Raises ``RuntimeError`` at the end if any query failed; the shared
        connection is always closed, even on failure.
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Phase 1: warm up Livy so the session is alive
        self._warmup()
        # Phase 2: fire all queries in parallel (shared connection, one Livy session)
        total_t0 = time.time()
        results: dict[str, pd.DataFrame] = {}
        errors: dict[str, str] = {}
        # Resolve name → CSV filename once, instead of scanning per result.
        outputs = {q.name: q.output for q in self.queries}
        total_queries = len(self.queries)
        completed_count = 0
        log.info("Invoking %d queries in parallel (max_workers=%d)...", total_queries, self.max_workers)
        try:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                futures = {
                    pool.submit(self._execute_one, qdef): qdef.name
                    for qdef in self.queries
                }
                for future in as_completed(futures):
                    name = futures[future]
                    completed_count += 1
                    remaining = total_queries - completed_count
                    try:
                        _, df, elapsed = future.result()
                        # to_csv failures are treated like query failures below.
                        df.to_csv(self.output_dir / outputs[name], index=False)
                        results[name] = df
                        log.info(
                            "Progress: %d/%d completed, %d remaining [%s ✅]",
                            completed_count, total_queries, remaining, name,
                        )
                    except Exception as exc:
                        log.error("[%s] ❌ FAILED: %s", name, exc)
                        log.info(
                            "Progress: %d/%d completed, %d remaining [%s ❌]",
                            completed_count, total_queries, remaining, name,
                        )
                        errors[name] = str(exc)
        finally:
            # Release the connection (and its Livy session) no matter what.
            self.close()
        log.info(
            "All %d queries complete in %.0fs (%d succeeded, %d failed)",
            len(self.queries),
            time.time() - total_t0,
            len(results),
            len(errors),
        )
        if errors:
            raise RuntimeError(
                f"{len(errors)} queries failed: "
                + ", ".join(f"{n}: {e}" for n, e in errors.items())
            )
        return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment