Skip to content

Instantly share code, notes, and snippets.

@philerooski
Created February 24, 2026 19:32
Show Gist options
  • Select an option

  • Save philerooski/7d78829e10a71a9b06b2ab268c758b37 to your computer and use it in GitHub Desktop.

Select an option

Save philerooski/7d78829e10a71a9b06b2ab268c758b37 to your computer and use it in GitHub Desktop.
A helper script to verify that data loaded by `load_snapshot_data.py` looks as expected
"""
Verify snapshot data load integrity.
This script checks:
1. LOAD_LOG table for any errors or anomalies
2. Compares tables between PROD_576 and PROD_568 schemas
3. Validates record counts (PROD_576 should have more records)
Usage:
python verify_snapshot_load.py --database SYNAPSE_RDS_SNAPSHOT \\
--new-schema PROD_576 --old-schema PROD_568
"""
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Tuple

import snowflake.connector
@dataclass
class TableStats:
    """Row-count statistics for a single table.

    NOTE(review): defined but not referenced elsewhere in this script —
    confirm whether it is still needed.
    """

    # Table name as reported by INFORMATION_SCHEMA.
    name: str
    # Number of rows counted in the table.
    row_count: int
    # False when the table could not be found or queried.
    exists: bool = True
@dataclass
class LoadLogEntry:
    """One row of the LOAD_LOG audit table.

    NOTE(review): defined but not referenced elsewhere in this script —
    confirm whether it is still needed.
    """

    # Stage-path prefix the load operated on.
    prefix: str
    # Logical table / data type being loaded.
    data_type: str
    # Full stage path of the source files.
    stage_path: str
    # Load phase, e.g. COPY_DATA.
    phase: str
    # Outcome status, e.g. OK / FAIL / FAILED.
    status: str
    # SQL statement that was executed.
    sql_text: str
    # Error detail when the operation failed.
    error_message: str
    # Timestamp the log row was written.
    log_ts: datetime
class SnapshotVerifier:
    """Verify snapshot data load integrity.

    Runs three checks against a Snowflake database:
      1. Inspect the LOAD_LOG table for failed or incomplete operations.
      2. Compare the table sets of the new and old schemas.
      3. Compare per-table row counts (the new schema is expected to have
         more rows than the old one).
    """

    def __init__(self, database: str, new_schema: str, old_schema: str):
        """
        Args:
            database: Snowflake database containing both schemas.
            new_schema: Newly loaded schema to verify.
            old_schema: Previous schema to compare against.
        """
        self.database = database
        self.new_schema = new_schema
        self.old_schema = old_schema
        # Populated by connect(); closed in verify()'s finally block.
        self.conn = None
        self.cursor = None

    def connect(self):
        """Connect to Snowflake and select the target database."""
        print("Connecting to Snowflake...")
        # No arguments: relies on the connector's default configuration
        # (connections.toml / environment) — TODO confirm this is intended.
        self.conn = snowflake.connector.connect()
        self.cursor = self.conn.cursor()
        self.cursor.execute(f"USE DATABASE {self.database}")
        print(f" ✓ Connected to database {self.database}\n")

    def check_load_log(self) -> Dict[str, Any]:
        """
        Check LOAD_LOG table for anomalies.

        Returns:
            Dictionary with load log statistics and any errors found.
            Contains only ``{"exists": False}`` when LOAD_LOG is absent.
        """
        print("=" * 70)
        print("CHECKING LOAD_LOG FOR ANOMALIES")
        print("=" * 70)
        self.cursor.execute(f"USE SCHEMA {self.new_schema}")
        # Check if LOAD_LOG exists by attempting to describe it.
        try:
            self.cursor.execute("DESC TABLE LOAD_LOG")
            # Table exists if no exception was raised.
        except Exception:
            # Broad catch is deliberate: any failure to describe the table
            # is reported as "log not available" rather than a fatal error.
            print(" ⚠ LOAD_LOG table not found in schema")
            return {"exists": False}
        # Get summary statistics.
        self.cursor.execute(
            """
            SELECT
                COUNT(*) as total_entries,
                COUNT(DISTINCT DATA_TYPE) as unique_tables,
                COUNT(DISTINCT PHASE) as unique_phases,
                COUNT(DISTINCT STATUS) as unique_statuses
            FROM LOAD_LOG
            """
        )
        stats = self.cursor.fetchone()
        total_entries, unique_tables, unique_phases, unique_statuses = stats
        print("\nLoad Log Summary:")
        print(f" Total log entries: {total_entries}")
        print(f" Unique tables processed: {unique_tables}")
        print(f" Unique phases: {unique_phases}")
        print(f" Unique statuses: {unique_statuses}")
        # Check for failures.
        self.cursor.execute(
            """
            SELECT
                DATA_TYPE,
                PHASE,
                STATUS,
                ERROR_MESSAGE,
                LOG_TS
            FROM LOAD_LOG
            WHERE STATUS IN ('FAIL', 'FAILED')
            ORDER BY LOG_TS
            """
        )
        failures = self.cursor.fetchall()
        if failures:
            print(f"\n ✗ Found {len(failures)} FAILED operations:")
            for data_type, phase, status, error_msg, log_ts in failures:
                print(f"\n Table: {data_type}")
                print(f" Phase: {phase}")
                print(f" Status: {status}")
                print(f" Time: {log_ts}")
                print(f" Error: {error_msg}")
        else:
            print("\n ✓ No failed operations found")
        # Check completion status for each table: a table is complete only
        # if it has at least one successful COPY_DATA log entry.
        self.cursor.execute(
            """
            SELECT
                DATA_TYPE,
                COUNT(*) as log_entries,
                SUM(CASE WHEN STATUS = 'OK' THEN 1 ELSE 0 END) as successful_ops,
                SUM(CASE WHEN STATUS IN ('FAIL', 'FAILED') THEN 1 ELSE 0 END) as failed_ops,
                MAX(CASE WHEN PHASE = 'COPY_DATA' AND STATUS = 'OK' THEN 1 ELSE 0 END) as copy_completed
            FROM LOAD_LOG
            GROUP BY DATA_TYPE
            ORDER BY DATA_TYPE
            """
        )
        table_completion = self.cursor.fetchall()
        incomplete_tables = []
        for (
            data_type,
            _log_entries,
            _successful_ops,
            _failed_ops,
            copy_completed,
        ) in table_completion:
            if copy_completed == 0:
                incomplete_tables.append(data_type)
        if incomplete_tables:
            print(
                f"\n ⚠ Tables without completed COPY_DATA operations ({len(incomplete_tables)}):"
            )
            for table in incomplete_tables:
                print(f" - {table}")
        else:
            print(
                f"\n ✓ All {len(table_completion)} tables completed COPY_DATA successfully"
            )
        return {
            "exists": True,
            "total_entries": total_entries,
            "unique_tables": unique_tables,
            "failures": len(failures),
            "incomplete_tables": incomplete_tables,
        }

    def get_table_list(self, schema: str) -> List[str]:
        """
        Get list of all tables in a schema (excluding LOAD_LOG).

        Args:
            schema: Schema name to query

        Returns:
            List of table names
        """
        # NOTE(review): schema is interpolated directly into the SQL text.
        # It comes from trusted CLI arguments here, but consider the
        # connector's parameter binding if the source ever changes.
        self.cursor.execute(
            f"""
            SELECT TABLE_NAME
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA = '{schema}'
            AND TABLE_TYPE = 'BASE TABLE'
            AND TABLE_NAME != 'LOAD_LOG'
            ORDER BY TABLE_NAME
            """
        )
        return [row[0] for row in self.cursor.fetchall()]

    def get_table_row_count(self, schema: str, table: str) -> int:
        """
        Get row count for a specific table.

        Args:
            schema: Schema name
            table: Table name

        Returns:
            Number of rows in the table
        """
        self.cursor.execute(f"SELECT COUNT(*) FROM {schema}.{table}")
        return self.cursor.fetchone()[0]

    def compare_schemas(self) -> Dict[str, Any]:
        """
        Compare tables between new and old schemas.

        Returns:
            Dictionary with comparison results, including
            ``tables_to_compare`` (sorted common table names).
        """
        print("\n" + "=" * 70)
        print("COMPARING SCHEMAS")
        print("=" * 70)
        print("\nGetting table lists...")
        new_tables = set(self.get_table_list(self.new_schema))
        old_tables = set(self.get_table_list(self.old_schema))
        print(f" {self.new_schema}: {len(new_tables)} tables")
        print(f" {self.old_schema}: {len(old_tables)} tables")
        # Find differences via set algebra.
        only_in_new = new_tables - old_tables
        only_in_old = old_tables - new_tables
        common_tables = new_tables & old_tables
        print(f"\n Common tables: {len(common_tables)}")
        if only_in_new:
            print(f"\n ⚠ Tables only in {self.new_schema} ({len(only_in_new)}):")
            for table in sorted(only_in_new):
                print(f" + {table}")
        if only_in_old:
            print(f"\n ⚠ Tables only in {self.old_schema} ({len(only_in_old)}):")
            for table in sorted(only_in_old):
                print(f" - {table}")
        if not only_in_new and not only_in_old:
            print(" ✓ Both schemas have identical table sets")
        return {
            "new_schema_tables": len(new_tables),
            "old_schema_tables": len(old_tables),
            "common_tables": len(common_tables),
            "only_in_new": list(only_in_new),
            "only_in_old": list(only_in_old),
            "tables_to_compare": sorted(common_tables),
        }

    def compare_row_counts(self, tables: List[str]) -> Dict[str, Any]:
        """
        Compare row counts for common tables.

        Args:
            tables: List of table names to compare

        Returns:
            Dictionary with row count comparison results
        """
        print("\n" + "=" * 70)
        print("COMPARING ROW COUNTS")
        print("=" * 70)
        results = []
        tables_with_fewer_rows = []
        tables_with_same_rows = []
        tables_with_more_rows = []
        for i, table in enumerate(tables, 1):
            print(f"\nChecking {i}/{len(tables)}: {table}")
            try:
                new_count = self.get_table_row_count(self.new_schema, table)
                old_count = self.get_table_row_count(self.old_schema, table)
                diff = new_count - old_count
                # Guard against division by zero for empty old tables.
                diff_pct = (diff / old_count * 100) if old_count > 0 else 0
                print(f" {self.new_schema}: {new_count:,} rows")
                print(f" {self.old_schema}: {old_count:,} rows")
                print(f" Difference: {diff:+,} ({diff_pct:+.2f}%)")
                result = {
                    "table": table,
                    "new_count": new_count,
                    "old_count": old_count,
                    "difference": diff,
                    "difference_pct": diff_pct,
                }
                results.append(result)
                if diff < 0:
                    tables_with_fewer_rows.append(table)
                    print(f" ✗ WARNING: {self.new_schema} has FEWER rows!")
                elif diff == 0:
                    tables_with_same_rows.append(table)
                    print(" ⚠ Same row count")
                else:
                    tables_with_more_rows.append(table)
                    print(f" ✓ {self.new_schema} has more rows")
            except Exception as e:
                # Record the error and keep going so one bad table does not
                # abort the whole comparison.
                print(f" ✗ Error querying table: {e}")
                results.append({"table": table, "error": str(e)})
        return {
            "results": results,
            "tables_with_fewer_rows": tables_with_fewer_rows,
            "tables_with_same_rows": tables_with_same_rows,
            "tables_with_more_rows": tables_with_more_rows,
        }

    def print_summary(self, log_check: Dict, schema_comp: Dict, row_comp: Dict):
        """
        Print final summary report.

        Args:
            log_check: Results from check_load_log()
            schema_comp: Results from compare_schemas()
            row_comp: Results from compare_row_counts()
        """
        print("\n" + "=" * 70)
        print("VERIFICATION SUMMARY")
        print("=" * 70)
        issues_found = []
        # Load log summary.
        print("\n📋 Load Log:")
        if log_check.get("exists"):
            if log_check.get("failures", 0) > 0:
                print(f" ✗ {log_check['failures']} failed operations")
                issues_found.append(f"{log_check['failures']} failed load operations")
            else:
                print(" ✓ No failures")
            if log_check.get("incomplete_tables"):
                print(f" ✗ {len(log_check['incomplete_tables'])} incomplete tables")
                issues_found.append(
                    f"{len(log_check['incomplete_tables'])} incomplete tables"
                )
            else:
                print(" ✓ All tables completed successfully")
        else:
            print(" ⚠ LOAD_LOG not found")
            issues_found.append("LOAD_LOG table not found")
        # Schema comparison summary.
        print("\n📊 Schema Comparison:")
        if schema_comp.get("only_in_new"):
            print(
                f" ⚠ {len(schema_comp['only_in_new'])} tables only in {self.new_schema}"
            )
            issues_found.append(
                f"{len(schema_comp['only_in_new'])} tables only in new schema"
            )
        if schema_comp.get("only_in_old"):
            print(
                f" ✗ {len(schema_comp['only_in_old'])} tables missing from {self.new_schema}"
            )
            issues_found.append(
                f"{len(schema_comp['only_in_old'])} tables missing from new schema"
            )
        if not schema_comp.get("only_in_new") and not schema_comp.get("only_in_old"):
            print(
                f" ✓ Schemas have identical table sets ({schema_comp['common_tables']} tables)"
            )
        # Row count summary.
        print("\n📈 Row Count Comparison:")
        if row_comp.get("tables_with_fewer_rows"):
            print(
                f" ✗ {len(row_comp['tables_with_fewer_rows'])} tables with FEWER rows in {self.new_schema}"
            )
            issues_found.append(
                f"{len(row_comp['tables_with_fewer_rows'])} tables with fewer rows"
            )
            for table in row_comp["tables_with_fewer_rows"]:
                print(f" - {table}")
        else:
            print(" ✓ No tables with fewer rows")
        if row_comp.get("tables_with_same_rows"):
            print(
                f" ⚠ {len(row_comp['tables_with_same_rows'])} tables with same row count"
            )
        if row_comp.get("tables_with_more_rows"):
            print(
                f" ✓ {len(row_comp['tables_with_more_rows'])} tables with more rows in {self.new_schema}"
            )
        # Overall status.
        print("\n" + "=" * 70)
        if issues_found:
            print("❌ VERIFICATION FAILED")
            print("\nIssues found:")
            for i, issue in enumerate(issues_found, 1):
                print(f" {i}. {issue}")
        else:
            print("✅ VERIFICATION PASSED")
            print(f"\nAll {schema_comp['common_tables']} tables loaded successfully")
            print(
                f"All tables in {self.new_schema} have more rows than {self.old_schema}"
            )
        print("=" * 70)

    def verify(self):
        """Run complete verification process, always closing the connection."""
        try:
            self.connect()
            # Step 1: Check load log.
            log_results = self.check_load_log()
            # Step 2: Compare schemas.
            schema_results = self.compare_schemas()
            # Step 3: Compare row counts for common tables.
            row_count_results = self.compare_row_counts(
                schema_results["tables_to_compare"]
            )
            # Step 4: Print summary.
            self.print_summary(log_results, schema_results, row_count_results)
        except Exception as e:
            print(f"\n✗ Fatal error during verification: {e}", file=sys.stderr)
            raise
        finally:
            # Release resources even on failure; guards handle a connect()
            # that never completed.
            if self.cursor:
                self.cursor.close()
            if self.conn:
                self.conn.close()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Verify snapshot data load integrity")
parser.add_argument(
"--database",
default="SYNAPSE_RDS_SNAPSHOT",
help="Database name (default: SYNAPSE_RDS_SNAPSHOT)",
)
parser.add_argument(
"--new-schema",
dest="new_schema",
default="PROD_576",
help="New schema to verify (default: PROD_576)",
)
parser.add_argument(
"--old-schema",
dest="old_schema",
default="PROD_568",
help="Old schema to compare against (default: PROD_568)",
)
args = parser.parse_args()
verifier = SnapshotVerifier(
database=args.database, new_schema=args.new_schema, old_schema=args.old_schema
)
verifier.verify()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment