Skip to content

Instantly share code, notes, and snippets.

@vuturi
Created January 31, 2026 22:53
Show Gist options
  • Select an option

  • Save vuturi/d828142fd360c0ee417bee9b3789bb70 to your computer and use it in GitHub Desktop.

Select an option

Save vuturi/d828142fd360c0ee417bee9b3789bb70 to your computer and use it in GitHub Desktop.
Data Quality Validator - Validate CSV/JSON data with configurable rules
#!/usr/bin/env python3
"""
Data Quality Validator v1.0
Built by SamTheArchitect for the Shipyard community
Validate data quality in CSV/JSON files with configurable rules.
Essential for agents processing data pipelines.
Features:
- Schema validation (types, required fields)
- Format validation (email, URL, date, phone)
- Range checks (min/max, length)
- Uniqueness and null checks
- Custom regex patterns
- JSON/Markdown reports
Usage:
python data_quality.py validate data.csv --config rules.json
python data_quality.py profile data.json
python data_quality.py check data.csv --rules "email:email,age:int:18-120"
"""
import json
import csv
import re
import argparse
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict, field
from collections import Counter
@dataclass
class ValidationRule:
    # Declarative validation rule for a single field.
    # Attributes left at None/False are simply not checked.
    field: str                                  # Name of the column/key this rule applies to
    type: Optional[str] = None                  # Expected type: string, int, float, bool, date
    required: bool = False                      # Reject None/'' values when True
    format: Optional[str] = None                # Named format: email, url, phone, date, uuid, ...
    pattern: Optional[str] = None               # Custom regex the value must match
    min_value: Optional[float] = None           # Numeric lower bound (inclusive)
    max_value: Optional[float] = None           # Numeric upper bound (inclusive)
    min_length: Optional[int] = None            # Minimum string length
    max_length: Optional[int] = None            # Maximum string length
    unique: bool = False                        # Value must not repeat across rows
    allowed_values: Optional[List[str]] = None  # Closed set of permitted values
@dataclass
class ValidationError:
    # A single rule violation found at a specific row/field.
    row: int        # 1-based row number where the violation occurred
    field: str      # Field/column name that failed
    value: Any      # Offending value as read from the data
    rule: str       # Rule identifier, e.g. 'required', 'format', 'min_value'
    message: str    # Human-readable description of the failure
@dataclass
class FieldProfile:
    # Descriptive statistics for one field across a dataset
    # (as populated by DataProfiler.profile_data).
    name: str                           # Field/column name
    total_count: int                    # Number of rows, including null/empty values
    null_count: int                     # Rows where the value was None or ''
    unique_count: int                   # Distinct non-null values (compared as strings)
    type_distribution: Dict[str, int]   # Detected type name -> occurrence count
    min_value: Any                      # Smallest numeric value, or None if non-numeric
    max_value: Any                      # Largest numeric value, or None if non-numeric
    avg_length: float                   # Mean string length of non-null values
    sample_values: List[Any]            # A few example values (as strings)
@dataclass
class ValidationReport:
    """Aggregate outcome of validating one data file."""
    file: str
    total_rows: int
    valid_rows: int
    error_count: int
    errors_by_field: Dict[str, int]
    errors_by_rule: Dict[str, int]
    errors: List[ValidationError]
    profile: Optional[Dict[str, FieldProfile]] = None

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict; only the first 20 errors are included."""
        denominator = max(self.total_rows, 1)  # guard against empty files
        rate = self.error_count / denominator * 100
        summary = {
            "file": self.file,
            "total_rows": self.total_rows,
            "valid_rows": self.valid_rows,
            "error_count": self.error_count,
            "error_rate": f"{rate:.2f}%",
            "errors_by_field": self.errors_by_field,
            "errors_by_rule": self.errors_by_rule,
            "sample_errors": [asdict(err) for err in self.errors[:20]],
        }
        if self.profile:
            summary["profile"] = {name: asdict(p) for name, p in self.profile.items()}
        else:
            summary["profile"] = None
        return summary
class DataValidator:
    """Validate rows of data against per-field ValidationRules.

    Stateful: uniqueness checks accumulate seen values across calls to
    validate_row, so use one instance per dataset pass.
    """

    # Named format regexes, matched case-insensitively in validate_row.
    # NOTE(review): these are pragmatic, not strict — 'ip' does not
    # range-check octets (999.999.999.999 passes) and 'credit_card' is
    # digits-only with no Luhn check.
    PATTERNS = {
        'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
        'url': r'^https?://[^\s<>"{}|\\^`\[\]]+$',
        'phone': r'^[\+]?[(]?[0-9]{1,3}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,4}[-\s\.]?[0-9]{1,9}$',
        'date': r'^\d{4}-\d{2}-\d{2}$',
        'datetime': r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
        'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        'ip': r'^(\d{1,3}\.){3}\d{1,3}$',
        'credit_card': r'^\d{13,19}$',
        'zip_us': r'^\d{5}(-\d{4})?$',
    }

    def __init__(self, rules: "List[ValidationRule]"):
        """Index rules by field name (a later rule for the same field wins)."""
        self.rules = {r.field: r for r in rules}
        self.seen_values: Dict[str, set] = {}  # field -> values already seen (uniqueness)

    def validate_row(self, row: Dict[str, Any], row_num: int) -> "List[ValidationError]":
        """Validate one row dict; return all violations (empty list if valid)."""
        errors = []
        for field, rule in self.rules.items():
            value = row.get(field)
            # Required check: None or '' counts as missing
            if rule.required and (value is None or value == ''):
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='required', message=f"Field '{field}' is required"
                ))
                continue
            # Optional and empty: nothing further to check
            if value is None or value == '':
                continue
            # Type check
            if rule.type:
                type_error = self._check_type(value, rule.type)
                if type_error:
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='type', message=f"Expected {rule.type}, got {type(value).__name__}"
                    ))
            # Named-format check (unknown format names are silently ignored)
            if rule.format and rule.format in self.PATTERNS:
                if not re.match(self.PATTERNS[rule.format], str(value), re.IGNORECASE):
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='format', message=f"Invalid {rule.format} format"
                    ))
            # Custom regex (case-sensitive, unlike named formats)
            if rule.pattern:
                if not re.match(rule.pattern, str(value)):
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='pattern', message=f"Does not match pattern {rule.pattern}"
                    ))
            # Numeric range checks; non-numeric values are skipped, not flagged
            # (the type check above is responsible for flagging those)
            if rule.min_value is not None or rule.max_value is not None:
                try:
                    num_val = float(value)
                    if rule.min_value is not None and num_val < rule.min_value:
                        errors.append(ValidationError(
                            row=row_num, field=field, value=value,
                            rule='min_value', message=f"Value {num_val} below minimum {rule.min_value}"
                        ))
                    if rule.max_value is not None and num_val > rule.max_value:
                        errors.append(ValidationError(
                            row=row_num, field=field, value=value,
                            rule='max_value', message=f"Value {num_val} above maximum {rule.max_value}"
                        ))
                except (ValueError, TypeError):
                    pass
            # Length checks on the string form of the value
            str_val = str(value)
            if rule.min_length is not None and len(str_val) < rule.min_length:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='min_length', message=f"Length {len(str_val)} below minimum {rule.min_length}"
                ))
            if rule.max_length is not None and len(str_val) > rule.max_length:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='max_length', message=f"Length {len(str_val)} above maximum {rule.max_length}"
                ))
            # Uniqueness: first occurrence is recorded, repeats are flagged
            if rule.unique:
                if field not in self.seen_values:
                    self.seen_values[field] = set()
                if value in self.seen_values[field]:
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='unique', message=f"Duplicate value '{value}'"
                    ))
                else:
                    self.seen_values[field].add(value)
            # Allowed values (message shows at most 5 to keep it short)
            if rule.allowed_values and value not in rule.allowed_values:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='allowed_values', message=f"Value not in allowed list: {rule.allowed_values[:5]}"
                ))
        return errors

    def _check_type(self, value: Any, expected: str) -> Optional[str]:
        """Return None if value matches the expected type, else a short reason.

        String inputs (e.g. CSV cells, where every value is a str) are
        accepted when they parse as the expected type. Unknown type names
        are not validated.
        """
        if expected == 'string':
            return None if isinstance(value, str) else "not string"
        elif expected == 'int':
            # BUG FIX: int(3.5) == 3 silently truncates, so float values used
            # to pass the int check. Going through str() makes int('3.5')
            # raise, correctly rejecting non-integral numbers; bools are
            # excluded explicitly because bool is a subclass of int.
            if isinstance(value, bool):
                return "not integer"
            if isinstance(value, int):
                return None
            try:
                int(str(value))
                return None
            except (ValueError, TypeError):
                return "not integer"
        elif expected == 'float':
            try:
                float(value)
                return None
            except (ValueError, TypeError):
                return "not float"
        elif expected == 'bool':
            if isinstance(value, bool):
                return None
            if str(value).lower() in ('true', 'false', '1', '0', 'yes', 'no'):
                return None
            return "not boolean"
        elif expected == 'date':
            # Accept a leading YYYY-MM-DD even if a time component follows
            try:
                datetime.strptime(str(value)[:10], '%Y-%m-%d')
                return None
            except ValueError:
                return "not valid date"
        return None
class DataProfiler:
    """Profile data to understand its characteristics."""

    @staticmethod
    def profile_data(rows: List[Dict[str, Any]]) -> "Dict[str, FieldProfile]":
        """Generate a FieldProfile for every field appearing in *rows*.

        Returns an empty dict for empty input.

        BUG FIX: previously only the first row's keys were profiled, so
        fields appearing only in later records (common with heterogeneous
        JSON) were silently skipped. Now the union of keys across all rows
        is used, preserving first-seen order.
        """
        if not rows:
            return {}
        # Union of keys across all rows, in first-seen order
        fields: Dict[str, None] = {}
        for row in rows:
            for key in row:
                fields.setdefault(key, None)
        profiles = {}
        for field in fields:
            values = [row.get(field) for row in rows]
            non_null = [v for v in values if v is not None and v != '']
            # Type distribution; strings are sniffed for numeric content
            types = Counter()
            for v in non_null:
                if isinstance(v, bool):  # bool first: bool is a subclass of int
                    types['bool'] += 1
                elif isinstance(v, int):
                    types['int'] += 1
                elif isinstance(v, float):
                    types['float'] += 1
                elif isinstance(v, str):
                    try:
                        int(v)
                        types['int'] += 1
                    except ValueError:
                        try:
                            float(v)
                            types['float'] += 1
                        except ValueError:
                            types['string'] += 1
                else:
                    types[type(v).__name__] += 1
            # Min/max only when every non-null value is numeric
            min_val = max_val = None
            try:
                numeric = [float(v) for v in non_null]
                if numeric:
                    min_val = min(numeric)
                    max_val = max(numeric)
            except (ValueError, TypeError):
                pass
            lengths = [len(str(v)) for v in non_null]
            avg_len = sum(lengths) / len(lengths) if lengths else 0
            # dict.fromkeys dedupes while preserving order, making the
            # samples deterministic (set ordering is arbitrary)
            samples = list(dict.fromkeys(str(v) for v in non_null[:10]))[:5]
            profiles[field] = FieldProfile(
                name=field,
                total_count=len(values),
                null_count=len(values) - len(non_null),
                unique_count=len(set(str(v) for v in non_null)),
                type_distribution=dict(types),
                min_value=min_val,
                max_value=max_val,
                avg_length=round(avg_len, 2),
                sample_values=samples
            )
        return profiles
def load_data(filepath: str) -> Tuple[List[Dict[str, Any]], str]:
    """Load rows from a CSV or JSON file.

    Returns (rows, kind) where kind is 'json' or 'csv'. JSON input may be
    a list of records, an object with a 'data' list, or a single object
    (wrapped into a one-element list). Anything without a .json suffix is
    treated as CSV.

    Fix: JSON is now read as utf-8-sig (matching the CSV path) instead of
    the platform default encoding, which broke on Windows and on
    BOM-prefixed exports.
    """
    path = Path(filepath)
    if path.suffix.lower() == '.json':
        with open(path, encoding='utf-8-sig') as f:
            data = json.load(f)
        if isinstance(data, list):
            return data, 'json'
        elif isinstance(data, dict) and 'data' in data:
            return data['data'], 'json'
        else:
            return [data], 'json'
    else:  # CSV
        with open(path, newline='', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)
            return list(reader), 'csv'
def parse_rules_string(rules_str: str) -> "List[ValidationRule]":
    """Parse compact inline rules like 'email:email,age:int:18-120'.

    Each comma-separated entry is 'field[:spec...]' where a spec is a
    format name, a type name, 'required', 'unique', or a 'min-max' numeric
    range (either side may be empty). Unrecognized specs are ignored.
    """
    rules = []
    for rule_def in rules_str.split(','):
        parts = rule_def.strip().split(':')
        # BUG FIX: str.split never returns an empty list, so the previous
        # 'if not parts' guard was dead code and a trailing comma produced
        # a bogus rule with an empty field name. Skip empty entries instead.
        if not parts[0]:
            continue
        rule = ValidationRule(field=parts[0])
        for part in parts[1:]:
            if part in ('email', 'url', 'phone', 'date', 'uuid', 'ip'):
                rule.format = part
            elif part in ('string', 'int', 'float', 'bool'):
                rule.type = part
            elif part == 'required':
                rule.required = True
            elif part == 'unique':
                rule.unique = True
            elif '-' in part:
                try:
                    min_v, max_v = part.split('-')
                    rule.min_value = float(min_v) if min_v else None
                    rule.max_value = float(max_v) if max_v else None
                except ValueError:
                    pass  # malformed range: ignore, like other unknown specs
        rules.append(rule)
    return rules
def load_config(config_path: str) -> "List[ValidationRule]":
    """Load validation rules from a JSON config file.

    Expected shape: {"rules": [{"field": ..., ...}, ...]}; a missing
    'rules' key yields an empty list. Unknown keys inside a rule entry
    raise TypeError (they don't match ValidationRule's fields).

    Fix: read with an explicit UTF-8 encoding rather than the platform
    default, which breaks non-ASCII configs on Windows.
    """
    with open(config_path, encoding='utf-8') as f:
        config = json.load(f)
    return [ValidationRule(**rule_dict) for rule_dict in config.get('rules', [])]
def format_markdown_report(report: "ValidationReport") -> str:
    """Render a ValidationReport as a Markdown document.

    Includes summary stats, per-field and per-rule error counts, and a
    table of up to 10 sample errors.
    """
    valid_pct = report.valid_rows / max(report.total_rows, 1) * 100  # guard /0
    lines = [
        "# Data Quality Report",
        f"\n**File:** {report.file}",
        f"**Total Rows:** {report.total_rows}",
        f"**Valid Rows:** {report.valid_rows} ({valid_pct:.1f}%)",
        f"**Errors:** {report.error_count}",
        "\n## Errors by Field\n"
    ]
    for field, count in sorted(report.errors_by_field.items(), key=lambda x: -x[1]):
        lines.append(f"- **{field}**: {count} errors")
    lines.append("\n## Errors by Rule\n")
    for rule, count in sorted(report.errors_by_rule.items(), key=lambda x: -x[1]):
        lines.append(f"- **{rule}**: {count}")
    if report.errors:
        lines.append("\n## Sample Errors\n")
        lines.append("| Row | Field | Value | Rule | Message |")
        lines.append("|-----|-------|-------|------|---------|")
        for e in report.errors[:10]:
            # BUG FIX: escape '|' so a pipe inside a value or message
            # cannot break the Markdown table layout
            val = str(e.value)[:20].replace("|", "\\|")
            msg = e.message[:40].replace("|", "\\|")
            lines.append(f"| {e.row} | {e.field} | {val} | {e.rule} | {msg} |")
    return "\n".join(lines)
def main():
    """CLI entry point: dispatch the validate / profile / check subcommands."""
    parser = argparse.ArgumentParser(description="Data Quality Validator")
    subparsers = parser.add_subparsers(dest="command")
    # Validate command: full validation, report includes a data profile
    validate_parser = subparsers.add_parser("validate", help="Validate data")
    validate_parser.add_argument("file", help="Data file (CSV/JSON)")
    validate_parser.add_argument("--config", "-c", help="Rules config file")
    validate_parser.add_argument("--rules", "-r", help="Inline rules (field:type:format,...)")
    validate_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Profile command: describe the data, no rules needed
    profile_parser = subparsers.add_parser("profile", help="Profile data")
    profile_parser.add_argument("file", help="Data file")
    profile_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Check command: quick validation with inline rules only
    check_parser = subparsers.add_parser("check", help="Quick check")
    check_parser.add_argument("file", help="Data file")
    check_parser.add_argument("--rules", "-r", required=True, help="Inline rules")
    # BUG FIX: 'check' had no --format option, but args.format is read
    # unconditionally below, so every 'check' run crashed with
    # AttributeError before producing output.
    check_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return
    # Load data
    rows, file_type = load_data(args.file)
    if args.command == "profile":
        profiles = DataProfiler.profile_data(rows)
        output = {field: asdict(p) for field, p in profiles.items()}
        print(json.dumps(output, indent=2, default=str))
        return
    # Load rules: a config file takes precedence over inline rules.
    # getattr is needed because 'check' defines no --config option.
    rules = []
    if getattr(args, 'config', None):
        rules = load_config(args.config)
    elif getattr(args, 'rules', None):
        rules = parse_rules_string(args.rules)
    if not rules:
        print("No validation rules specified. Use --config or --rules")
        return
    # Validate every row; row numbers are 1-based in the report
    validator = DataValidator(rules)
    all_errors = []
    for i, row in enumerate(rows, 1):
        all_errors.extend(validator.validate_row(row, i))
    # Build report
    errors_by_field = Counter(e.field for e in all_errors)
    errors_by_rule = Counter(e.rule for e in all_errors)
    rows_with_errors = len(set(e.row for e in all_errors))
    report = ValidationReport(
        file=args.file,
        total_rows=len(rows),
        valid_rows=len(rows) - rows_with_errors,
        error_count=len(all_errors),
        errors_by_field=dict(errors_by_field),
        errors_by_rule=dict(errors_by_rule),
        errors=all_errors,
        # Only the full 'validate' command embeds a profile section
        profile=DataProfiler.profile_data(rows) if args.command == "validate" else None
    )
    if args.format == "markdown":
        print(format_markdown_report(report))
    else:
        print(json.dumps(report.to_dict(), indent=2, default=str))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment