Created
February 24, 2026 19:32
-
-
Save philerooski/7d78829e10a71a9b06b2ab268c758b37 to your computer and use it in GitHub Desktop.
A helper script to verify that data loaded by `load_snapshot_data.py` looks as expected
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Verify snapshot data load integrity. | |
| This script checks: | |
| 1. LOAD_LOG table for any errors or anomalies | |
| 2. Compares tables between PROD_576 and PROD_568 schemas | |
| 3. Validates record counts (PROD_576 should have more records) | |
| Usage: | |
| python verify_snapshot_load.py --database SYNAPSE_RDS_SNAPSHOT \\ | |
| --new-schema PROD_576 --old-schema PROD_568 | |
| """ | |
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Tuple

import snowflake.connector
@dataclass
class TableStats:
    """Row-count statistics collected for a single table."""

    # Table name as reported by INFORMATION_SCHEMA.
    name: str
    # Number of rows counted in the table.
    row_count: int
    # False when the table is absent from the schema being checked.
    exists: bool = True
@dataclass
class LoadLogEntry:
    """A single record read from the LOAD_LOG bookkeeping table.

    Field names mirror the LOAD_LOG columns referenced by the queries in
    this script (DATA_TYPE, PHASE, STATUS, ERROR_MESSAGE, LOG_TS, ...).
    """

    # NOTE(review): presumably the stage/key prefix of the load — confirm.
    prefix: str
    # Logical table / data type the entry refers to.
    data_type: str
    # NOTE(review): presumably the stage path the data was copied from — confirm.
    stage_path: str
    # Load phase, e.g. 'COPY_DATA' as used by check_load_log().
    phase: str
    # Outcome, e.g. 'OK', 'FAIL', or 'FAILED'.
    status: str
    # SQL statement that was executed for this step.
    sql_text: str
    # Error details when the step failed.
    error_message: str
    # Timestamp of the log entry.
    log_ts: datetime
class SnapshotVerifier:
    """Verify snapshot data load integrity.

    Runs three checks against a Snowflake database:
      1. Inspects the LOAD_LOG table for failed or incomplete operations.
      2. Compares the table sets of the new and old schemas.
      3. Compares per-table row counts between the two schemas (the new
         schema is expected to have more rows).
    """

    def __init__(self, database: str, new_schema: str, old_schema: str):
        """
        Args:
            database: Snowflake database containing both schemas.
            new_schema: Schema holding the freshly loaded snapshot.
            old_schema: Baseline schema to compare against.

        Raises:
            ValueError: If any name is not a safe unquoted SQL identifier.
        """
        # These names are interpolated into SQL text (USE DATABASE/SCHEMA,
        # COUNT(*) queries), so reject anything that is not a plain
        # unquoted identifier to avoid SQL injection via CLI arguments.
        self.database = self._safe_identifier(database)
        self.new_schema = self._safe_identifier(new_schema)
        self.old_schema = self._safe_identifier(old_schema)
        self.conn = None
        self.cursor = None

    @staticmethod
    def _safe_identifier(name: str) -> str:
        """Return ``name`` unchanged if it is a plain SQL identifier.

        Snowflake unquoted identifiers consist of letters, digits,
        underscores, and dollar signs.

        Raises:
            ValueError: If ``name`` is empty or has other characters.
        """
        if not name or not all(ch.isalnum() or ch in "_$" for ch in name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name

    def connect(self):
        """Connect to Snowflake and select the target database.

        Credentials come from the connector's default configuration
        (e.g. connections.toml or SNOWFLAKE_* environment variables);
        no parameters are passed here.
        """
        print("Connecting to Snowflake...")
        self.conn = snowflake.connector.connect()
        self.cursor = self.conn.cursor()
        # self.database was validated in __init__, safe to interpolate.
        self.cursor.execute(f"USE DATABASE {self.database}")
        print(f" ✓ Connected to database {self.database}\n")

    def check_load_log(self) -> Dict[str, Any]:
        """
        Check LOAD_LOG table for anomalies.

        Returns:
            Dictionary with load log statistics and any errors found.
            ``{"exists": False}`` when LOAD_LOG is missing.
        """
        print("=" * 70)
        print("CHECKING LOAD_LOG FOR ANOMALIES")
        print("=" * 70)
        self.cursor.execute(f"USE SCHEMA {self.new_schema}")
        # Check if LOAD_LOG exists by attempting to describe it; the
        # connector raises when the table is absent.
        try:
            self.cursor.execute("DESC TABLE LOAD_LOG")
        except Exception:
            print(" ⚠ LOAD_LOG table not found in schema")
            return {"exists": False}
        # Get summary statistics.
        self.cursor.execute(
            """
            SELECT
                COUNT(*) as total_entries,
                COUNT(DISTINCT DATA_TYPE) as unique_tables,
                COUNT(DISTINCT PHASE) as unique_phases,
                COUNT(DISTINCT STATUS) as unique_statuses
            FROM LOAD_LOG
            """
        )
        total_entries, unique_tables, unique_phases, unique_statuses = (
            self.cursor.fetchone()
        )
        print("\nLoad Log Summary:")
        print(f" Total log entries: {total_entries}")
        print(f" Unique tables processed: {unique_tables}")
        print(f" Unique phases: {unique_phases}")
        print(f" Unique statuses: {unique_statuses}")
        # Check for failures recorded by the loader.
        self.cursor.execute(
            """
            SELECT
                DATA_TYPE,
                PHASE,
                STATUS,
                ERROR_MESSAGE,
                LOG_TS
            FROM LOAD_LOG
            WHERE STATUS IN ('FAIL', 'FAILED')
            ORDER BY LOG_TS
            """
        )
        failures = self.cursor.fetchall()
        if failures:
            print(f"\n ✗ Found {len(failures)} FAILED operations:")
            for data_type, phase, status, error_msg, log_ts in failures:
                print(f"\n Table: {data_type}")
                print(f" Phase: {phase}")
                print(f" Status: {status}")
                print(f" Time: {log_ts}")
                print(f" Error: {error_msg}")
        else:
            print("\n ✓ No failed operations found")
        # Check completion status for each table.
        self.cursor.execute(
            """
            SELECT
                DATA_TYPE,
                COUNT(*) as log_entries,
                SUM(CASE WHEN STATUS = 'OK' THEN 1 ELSE 0 END) as successful_ops,
                SUM(CASE WHEN STATUS IN ('FAIL', 'FAILED') THEN 1 ELSE 0 END) as failed_ops,
                MAX(CASE WHEN PHASE = 'COPY_DATA' AND STATUS = 'OK' THEN 1 ELSE 0 END) as copy_completed
            FROM LOAD_LOG
            GROUP BY DATA_TYPE
            ORDER BY DATA_TYPE
            """
        )
        table_completion = self.cursor.fetchall()
        # A table counts as incomplete when it never logged a successful
        # COPY_DATA phase (row[4] is the copy_completed flag above).
        incomplete_tables = [row[0] for row in table_completion if row[4] == 0]
        if incomplete_tables:
            print(
                f"\n ⚠ Tables without completed COPY_DATA operations ({len(incomplete_tables)}):"
            )
            for table in incomplete_tables:
                print(f" - {table}")
        else:
            print(
                f"\n ✓ All {len(table_completion)} tables completed COPY_DATA successfully"
            )
        return {
            "exists": True,
            "total_entries": total_entries,
            "unique_tables": unique_tables,
            "failures": len(failures),
            "incomplete_tables": incomplete_tables,
        }

    def get_table_list(self, schema: str) -> List[str]:
        """
        Get list of all tables in a schema (excluding LOAD_LOG).

        Args:
            schema: Schema name to query

        Returns:
            List of table names
        """
        # Bind the schema name instead of interpolating it into the SQL
        # text; the connector handles quoting/escaping.
        self.cursor.execute(
            """
            SELECT TABLE_NAME
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA = %s
              AND TABLE_TYPE = 'BASE TABLE'
              AND TABLE_NAME != 'LOAD_LOG'
            ORDER BY TABLE_NAME
            """,
            (schema,),
        )
        return [row[0] for row in self.cursor.fetchall()]

    def get_table_row_count(self, schema: str, table: str) -> int:
        """
        Get row count for a specific table.

        Args:
            schema: Schema name (validated in __init__)
            table: Table name (comes from INFORMATION_SCHEMA, not user input)

        Returns:
            Number of rows in the table
        """
        # Identifiers cannot be bound as parameters; schema was validated
        # and table names originate from get_table_list().
        self.cursor.execute(f"SELECT COUNT(*) FROM {schema}.{table}")
        return self.cursor.fetchone()[0]

    def compare_schemas(self) -> Dict[str, Any]:
        """
        Compare tables between new and old schemas.

        Returns:
            Dictionary with comparison results, including the sorted list
            of common tables under ``"tables_to_compare"``.
        """
        print("\n" + "=" * 70)
        print("COMPARING SCHEMAS")
        print("=" * 70)
        print("\nGetting table lists...")
        new_tables = set(self.get_table_list(self.new_schema))
        old_tables = set(self.get_table_list(self.old_schema))
        print(f" {self.new_schema}: {len(new_tables)} tables")
        print(f" {self.old_schema}: {len(old_tables)} tables")
        # Find set differences between the two schemas.
        only_in_new = new_tables - old_tables
        only_in_old = old_tables - new_tables
        common_tables = new_tables & old_tables
        print(f"\n Common tables: {len(common_tables)}")
        if only_in_new:
            print(f"\n ⚠ Tables only in {self.new_schema} ({len(only_in_new)}):")
            for table in sorted(only_in_new):
                print(f" + {table}")
        if only_in_old:
            print(f"\n ⚠ Tables only in {self.old_schema} ({len(only_in_old)}):")
            for table in sorted(only_in_old):
                print(f" - {table}")
        if not only_in_new and not only_in_old:
            print(f" ✓ Both schemas have identical table sets")
        return {
            "new_schema_tables": len(new_tables),
            "old_schema_tables": len(old_tables),
            "common_tables": len(common_tables),
            "only_in_new": list(only_in_new),
            "only_in_old": list(only_in_old),
            "tables_to_compare": sorted(common_tables),
        }

    def compare_row_counts(self, tables: List[str]) -> Dict[str, Any]:
        """
        Compare row counts for common tables.

        Args:
            tables: List of table names to compare

        Returns:
            Dictionary with per-table results and tables bucketed by
            whether the new schema has fewer, the same, or more rows.
        """
        print("\n" + "=" * 70)
        print("COMPARING ROW COUNTS")
        print("=" * 70)
        results = []
        tables_with_fewer_rows = []
        tables_with_same_rows = []
        tables_with_more_rows = []
        for i, table in enumerate(tables, 1):
            print(f"\nChecking {i}/{len(tables)}: {table}")
            try:
                new_count = self.get_table_row_count(self.new_schema, table)
                old_count = self.get_table_row_count(self.old_schema, table)
                diff = new_count - old_count
                # Percentage is reported as 0 when the old table is empty
                # (division by zero would be meaningless anyway).
                diff_pct = (diff / old_count * 100) if old_count > 0 else 0
                print(f" {self.new_schema}: {new_count:,} rows")
                print(f" {self.old_schema}: {old_count:,} rows")
                print(f" Difference: {diff:+,} ({diff_pct:+.2f}%)")
                results.append(
                    {
                        "table": table,
                        "new_count": new_count,
                        "old_count": old_count,
                        "difference": diff,
                        "difference_pct": diff_pct,
                    }
                )
                if diff < 0:
                    tables_with_fewer_rows.append(table)
                    print(f" ✗ WARNING: {self.new_schema} has FEWER rows!")
                elif diff == 0:
                    tables_with_same_rows.append(table)
                    print(f" ⚠ Same row count")
                else:
                    tables_with_more_rows.append(table)
                    print(f" ✓ {self.new_schema} has more rows")
            except Exception as e:
                # Record the error but keep checking remaining tables.
                print(f" ✗ Error querying table: {e}")
                results.append({"table": table, "error": str(e)})
        return {
            "results": results,
            "tables_with_fewer_rows": tables_with_fewer_rows,
            "tables_with_same_rows": tables_with_same_rows,
            "tables_with_more_rows": tables_with_more_rows,
        }

    def print_summary(self, log_check: Dict, schema_comp: Dict, row_comp: Dict):
        """
        Print final summary report.

        Args:
            log_check: Results from check_load_log()
            schema_comp: Results from compare_schemas()
            row_comp: Results from compare_row_counts()
        """
        print("\n" + "=" * 70)
        print("VERIFICATION SUMMARY")
        print("=" * 70)
        issues_found = []
        # Load log summary.
        print("\n📋 Load Log:")
        if log_check.get("exists"):
            if log_check.get("failures", 0) > 0:
                print(f" ✗ {log_check['failures']} failed operations")
                issues_found.append(f"{log_check['failures']} failed load operations")
            else:
                print(f" ✓ No failures")
            if log_check.get("incomplete_tables"):
                print(f" ✗ {len(log_check['incomplete_tables'])} incomplete tables")
                issues_found.append(
                    f"{len(log_check['incomplete_tables'])} incomplete tables"
                )
            else:
                print(f" ✓ All tables completed successfully")
        else:
            print(f" ⚠ LOAD_LOG not found")
            issues_found.append("LOAD_LOG table not found")
        # Schema comparison summary.
        print("\n📊 Schema Comparison:")
        if schema_comp.get("only_in_new"):
            print(
                f" ⚠ {len(schema_comp['only_in_new'])} tables only in {self.new_schema}"
            )
            issues_found.append(
                f"{len(schema_comp['only_in_new'])} tables only in new schema"
            )
        if schema_comp.get("only_in_old"):
            print(
                f" ✗ {len(schema_comp['only_in_old'])} tables missing from {self.new_schema}"
            )
            issues_found.append(
                f"{len(schema_comp['only_in_old'])} tables missing from new schema"
            )
        if not schema_comp.get("only_in_new") and not schema_comp.get("only_in_old"):
            print(
                f" ✓ Schemas have identical table sets ({schema_comp['common_tables']} tables)"
            )
        # Row count summary.
        print("\n📈 Row Count Comparison:")
        if row_comp.get("tables_with_fewer_rows"):
            print(
                f" ✗ {len(row_comp['tables_with_fewer_rows'])} tables with FEWER rows in {self.new_schema}"
            )
            issues_found.append(
                f"{len(row_comp['tables_with_fewer_rows'])} tables with fewer rows"
            )
            for table in row_comp["tables_with_fewer_rows"]:
                print(f" - {table}")
        else:
            print(f" ✓ No tables with fewer rows")
        if row_comp.get("tables_with_same_rows"):
            print(
                f" ⚠ {len(row_comp['tables_with_same_rows'])} tables with same row count"
            )
        if row_comp.get("tables_with_more_rows"):
            print(
                f" ✓ {len(row_comp['tables_with_more_rows'])} tables with more rows in {self.new_schema}"
            )
        # Overall status: pass only when no issue was collected above.
        print("\n" + "=" * 70)
        if issues_found:
            print("❌ VERIFICATION FAILED")
            print("\nIssues found:")
            for i, issue in enumerate(issues_found, 1):
                print(f" {i}. {issue}")
        else:
            print("✅ VERIFICATION PASSED")
            print(f"\nAll {schema_comp['common_tables']} tables loaded successfully")
            print(
                f"All tables in {self.new_schema} have more rows than {self.old_schema}"
            )
        print("=" * 70)

    def verify(self):
        """Run complete verification process.

        Connects, runs all three checks, prints the summary, and always
        closes the cursor and connection, even when a step raises.
        """
        try:
            self.connect()
            # Step 1: Check load log
            log_results = self.check_load_log()
            # Step 2: Compare schemas
            schema_results = self.compare_schemas()
            # Step 3: Compare row counts for common tables
            row_count_results = self.compare_row_counts(
                schema_results["tables_to_compare"]
            )
            # Step 4: Print summary
            self.print_summary(log_results, schema_results, row_count_results)
        except Exception as e:
            print(f"\n✗ Fatal error during verification: {e}", file=sys.stderr)
            raise
        finally:
            # Release resources regardless of success or failure.
            if self.cursor:
                self.cursor.close()
            if self.conn:
                self.conn.close()
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Verify snapshot data load integrity") | |
| parser.add_argument( | |
| "--database", | |
| default="SYNAPSE_RDS_SNAPSHOT", | |
| help="Database name (default: SYNAPSE_RDS_SNAPSHOT)", | |
| ) | |
| parser.add_argument( | |
| "--new-schema", | |
| dest="new_schema", | |
| default="PROD_576", | |
| help="New schema to verify (default: PROD_576)", | |
| ) | |
| parser.add_argument( | |
| "--old-schema", | |
| dest="old_schema", | |
| default="PROD_568", | |
| help="Old schema to compare against (default: PROD_568)", | |
| ) | |
| args = parser.parse_args() | |
| verifier = SnapshotVerifier( | |
| database=args.database, new_schema=args.new_schema, old_schema=args.old_schema | |
| ) | |
| verifier.verify() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment