Created
January 31, 2026 22:53
-
-
Save vuturi/d828142fd360c0ee417bee9b3789bb70 to your computer and use it in GitHub Desktop.
Data Quality Validator - Validate CSV/JSON data with configurable rules
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Data Quality Validator v1.0 | |
| Built by SamTheArchitect for the Shipyard community | |
| Validate data quality in CSV/JSON files with configurable rules. | |
| Essential for agents processing data pipelines. | |
| Features: | |
| - Schema validation (types, required fields) | |
| - Format validation (email, URL, date, phone) | |
| - Range checks (min/max, length) | |
| - Uniqueness and null checks | |
| - Custom regex patterns | |
| - JSON/Markdown reports | |
| Usage: | |
| python data_quality.py validate data.csv --config rules.json | |
| python data_quality.py profile data.json | |
| python data_quality.py check data.csv --rules "email:email,age:int:18-120" | |
| """ | |
| import json | |
| import csv | |
| import re | |
| import argparse | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from dataclasses import dataclass, asdict, field | |
| from collections import Counter | |
@dataclass
class ValidationRule:
    """Declarative validation rule for one field.

    Each attribute enables a check only when set (non-None / True);
    evaluation happens in DataValidator.validate_row.
    """
    field: str  # name of the column/key this rule applies to
    type: Optional[str] = None  # string, int, float, bool, date
    required: bool = False  # reject missing or empty values
    format: Optional[str] = None  # email, url, phone, date, uuid
    pattern: Optional[str] = None  # Custom regex
    min_value: Optional[float] = None  # numeric lower bound (inclusive)
    max_value: Optional[float] = None  # numeric upper bound (inclusive)
    min_length: Optional[int] = None  # minimum length of str(value)
    max_length: Optional[int] = None  # maximum length of str(value)
    unique: bool = False  # value must not repeat across rows
    allowed_values: Optional[List[str]] = None  # closed set of permitted values
@dataclass
class ValidationError:
    """One rule violation found at a specific row and field."""
    row: int  # 1-based row number in the input data
    field: str  # field that failed the check
    value: Any  # offending value as read from the data
    rule: str  # rule identifier, e.g. 'required', 'type', 'format'
    message: str  # human-readable explanation
@dataclass
class FieldProfile:
    """Summary statistics for a single field, produced by DataProfiler."""
    name: str  # field name
    total_count: int  # number of rows (including empty/missing values)
    null_count: int  # values that were None or ''
    unique_count: int  # distinct values, compared as strings
    type_distribution: Dict[str, int]  # detected type name -> occurrence count
    min_value: Any  # numeric minimum, or None if any value is non-numeric
    max_value: Any  # numeric maximum, or None if any value is non-numeric
    avg_length: float  # mean length of str(value) over non-null values
    sample_values: List[Any]  # up to 5 distinct example values (as strings)
@dataclass
class ValidationReport:
    """Aggregated outcome of validating one data file."""
    file: str  # path of the validated file
    total_rows: int  # rows examined
    valid_rows: int  # rows with zero errors
    error_count: int  # total number of individual errors
    errors_by_field: Dict[str, int]  # field name -> error count
    errors_by_rule: Dict[str, int]  # rule name -> error count
    errors: List[ValidationError]  # every recorded error
    profile: Optional[Dict[str, FieldProfile]] = None  # optional per-field stats

    def to_dict(self) -> dict:
        """Serialize the report into plain, JSON-friendly types.

        Only the first 20 errors are included as samples; the profile is
        emitted as nested dicts when present, otherwise None.
        """
        rate = self.error_count / max(self.total_rows, 1) * 100
        out = {
            "file": self.file,
            "total_rows": self.total_rows,
            "valid_rows": self.valid_rows,
            "error_count": self.error_count,
            "error_rate": f"{rate:.2f}%",
            "errors_by_field": self.errors_by_field,
            "errors_by_rule": self.errors_by_rule,
        }
        out["sample_errors"] = [asdict(err) for err in self.errors[:20]]
        if self.profile:
            out["profile"] = {name: asdict(prof) for name, prof in self.profile.items()}
        else:
            out["profile"] = None
        return out
class DataValidator:
    """Validate rows of data against a list of ValidationRule objects.

    Use one instance per file: uniqueness tracking is stateful, so reusing
    a validator across files would report cross-file duplicates.
    """
    # Named format patterns used by ValidationRule.format. All are applied
    # with re.match + re.IGNORECASE against str(value).
    PATTERNS = {
        'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
        'url': r'^https?://[^\s<>"{}|\\^`\[\]]+$',
        # NOTE(review): intentionally permissive; accepts many separator styles.
        'phone': r'^[\+]?[(]?[0-9]{1,3}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,4}[-\s\.]?[0-9]{1,9}$',
        'date': r'^\d{4}-\d{2}-\d{2}$',
        # No trailing $: accepts fractional seconds / timezone suffixes.
        'datetime': r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
        'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        # BUG FIX: previous pattern (\d{1,3} per octet) accepted invalid
        # addresses such as 999.999.999.999; octets are now limited to 0-255.
        'ip': r'^((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)$',
        # NOTE(review): length-only check, no Luhn/issuer validation.
        'credit_card': r'^\d{13,19}$',
        'zip_us': r'^\d{5}(-\d{4})?$',
    }

    def __init__(self, rules: List[ValidationRule]):
        """Index rules by field name and reset uniqueness state."""
        self.rules = {r.field: r for r in rules}
        self.seen_values: Dict[str, set] = {}  # For uniqueness checks

    def validate_row(self, row: Dict[str, Any], row_num: int) -> List[ValidationError]:
        """Validate a single row; return all errors found (empty if valid).

        Empty/missing values fail only the 'required' check; all other
        checks are skipped for them so optional fields may be blank.
        """
        errors = []
        for field, rule in self.rules.items():
            value = row.get(field)
            # Required check
            if rule.required and (value is None or value == ''):
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='required', message=f"Field '{field}' is required"
                ))
                continue
            # Skip further validation if empty and not required
            if value is None or value == '':
                continue
            # Type check
            if rule.type:
                type_error = self._check_type(value, rule.type)
                if type_error:
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='type', message=f"Expected {rule.type}, got {type(value).__name__}"
                    ))
            # Format check (unknown format names are silently ignored)
            if rule.format and rule.format in self.PATTERNS:
                if not re.match(self.PATTERNS[rule.format], str(value), re.IGNORECASE):
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='format', message=f"Invalid {rule.format} format"
                    ))
            # Custom pattern
            if rule.pattern:
                if not re.match(rule.pattern, str(value)):
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='pattern', message=f"Does not match pattern {rule.pattern}"
                    ))
            # Range checks: non-numeric values are skipped here because the
            # type check above already reports them.
            if rule.min_value is not None or rule.max_value is not None:
                try:
                    num_val = float(value)
                    if rule.min_value is not None and num_val < rule.min_value:
                        errors.append(ValidationError(
                            row=row_num, field=field, value=value,
                            rule='min_value', message=f"Value {num_val} below minimum {rule.min_value}"
                        ))
                    if rule.max_value is not None and num_val > rule.max_value:
                        errors.append(ValidationError(
                            row=row_num, field=field, value=value,
                            rule='max_value', message=f"Value {num_val} above maximum {rule.max_value}"
                        ))
                except (ValueError, TypeError):
                    pass
            # Length checks operate on the string representation
            str_val = str(value)
            if rule.min_length is not None and len(str_val) < rule.min_length:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='min_length', message=f"Length {len(str_val)} below minimum {rule.min_length}"
                ))
            if rule.max_length is not None and len(str_val) > rule.max_length:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='max_length', message=f"Length {len(str_val)} above maximum {rule.max_length}"
                ))
            # Uniqueness check: only the second and later occurrences are
            # flagged; the first occurrence is recorded as seen.
            if rule.unique:
                if field not in self.seen_values:
                    self.seen_values[field] = set()
                if value in self.seen_values[field]:
                    errors.append(ValidationError(
                        row=row_num, field=field, value=value,
                        rule='unique', message=f"Duplicate value '{value}'"
                    ))
                else:
                    self.seen_values[field].add(value)
            # Allowed values (membership test; skipped if the list is empty)
            if rule.allowed_values and value not in rule.allowed_values:
                errors.append(ValidationError(
                    row=row_num, field=field, value=value,
                    rule='allowed_values', message=f"Value not in allowed list: {rule.allowed_values[:5]}"
                ))
        return errors

    def _check_type(self, value: Any, expected: str) -> Optional[str]:
        """Return None if value coerces to the expected type, else a reason.

        Checks are lenient: numeric strings pass 'int'/'float', and common
        boolean spellings ('true', 'yes', '1', ...) pass 'bool'.
        """
        if expected == 'string':
            return None if isinstance(value, str) else "not string"
        elif expected == 'int':
            try:
                int(value)
                return None
            except (ValueError, TypeError):
                return "not integer"
        elif expected == 'float':
            try:
                float(value)
                return None
            except (ValueError, TypeError):
                return "not float"
        elif expected == 'bool':
            if isinstance(value, bool):
                return None
            if str(value).lower() in ('true', 'false', '1', '0', 'yes', 'no'):
                return None
            return "not boolean"
        elif expected == 'date':
            try:
                # Only the first 10 chars are parsed, so datetime strings
                # with a valid date prefix also pass.
                datetime.strptime(str(value)[:10], '%Y-%m-%d')
                return None
            except ValueError:
                return "not valid date"
        # Unknown type names are not validated
        return None
class DataProfiler:
    """Profile data to understand its characteristics"""

    @staticmethod
    def profile_data(rows: List[Dict[str, Any]]) -> Dict[str, FieldProfile]:
        """Generate a FieldProfile for every field present in the data.

        Returns {} for empty input. Fields are collected from ALL rows
        (union, first-seen order) so ragged JSON records are fully
        profiled; previously only the first row's keys were considered
        (bug fix). Sample values are deduplicated in input order so the
        output is deterministic (previously set() gave arbitrary order).
        """
        if not rows:
            return {}
        # Union of keys across all rows, preserving first-seen order.
        fields = list(dict.fromkeys(key for row in rows for key in row))
        profiles = {}
        for field in fields:
            values = [row.get(field) for row in rows]
            non_null = [v for v in values if v is not None and v != '']
            # Type distribution: native types first, then best-effort
            # numeric detection for strings (common with CSV input).
            types = Counter()
            for v in non_null:
                if isinstance(v, bool):
                    types['bool'] += 1
                elif isinstance(v, int):
                    types['int'] += 1
                elif isinstance(v, float):
                    types['float'] += 1
                elif isinstance(v, str):
                    # Try to detect actual type
                    try:
                        int(v)
                        types['int'] += 1
                    except ValueError:
                        try:
                            float(v)
                            types['float'] += 1
                        except ValueError:
                            types['string'] += 1
                else:
                    types[type(v).__name__] += 1
            # Min/max reported only when every non-null value is numeric
            min_val = max_val = None
            try:
                numeric = [float(v) for v in non_null]
                min_val = min(numeric) if numeric else None
                max_val = max(numeric) if numeric else None
            except (ValueError, TypeError):
                pass
            # Average length of the string representation
            lengths = [len(str(v)) for v in non_null]
            avg_len = sum(lengths) / len(lengths) if lengths else 0
            profiles[field] = FieldProfile(
                name=field,
                total_count=len(values),
                null_count=len(values) - len(non_null),
                unique_count=len(set(str(v) for v in non_null)),
                type_distribution=dict(types),
                min_value=min_val,
                max_value=max_val,
                avg_length=round(avg_len, 2),
                # dict.fromkeys dedupes while keeping input order -> stable
                sample_values=list(dict.fromkeys(str(v) for v in non_null[:10]))[:5]
            )
        return profiles
def load_data(filepath: str) -> Tuple[List[Dict[str, Any]], str]:
    """Load rows from a CSV or JSON file.

    Returns (rows, file_type) where file_type is 'json' or 'csv'; any
    extension other than .json is parsed as CSV. JSON may be a list of
    records, an object with a 'data' list, or a single record (which is
    wrapped in a one-element list).
    """
    path = Path(filepath)
    if path.suffix.lower() == '.json':
        # BUG FIX: previously opened with the platform default encoding;
        # use utf-8-sig (like the CSV branch) to be explicit and tolerate
        # a UTF-8 BOM.
        with open(path, encoding='utf-8-sig') as f:
            data = json.load(f)
        if isinstance(data, list):
            return data, 'json'
        elif isinstance(data, dict) and 'data' in data:
            return data['data'], 'json'
        else:
            return [data], 'json'
    else:  # CSV
        with open(path, newline='', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)
            # Materialize inside the with-block so the file can close
            return list(reader), 'csv'
def parse_rules_string(rules_str: str) -> List[ValidationRule]:
    """Parse an inline rule string like 'email:email,age:int:18-120'.

    Each comma-separated entry is field:spec[:spec...] where a spec is a
    format name, a type name, 'required', 'unique', or a numeric 'lo-hi'
    range (either side may be omitted). Unrecognized specs are ignored.
    """
    rules = []
    for rule_def in rules_str.split(','):
        parts = rule_def.strip().split(':')
        # BUG FIX: str.split always yields at least [''], so the old
        # `if not parts` guard was dead and blank segments (empty string,
        # trailing commas) produced a bogus rule with field=''; skip them.
        if not parts or not parts[0]:
            continue
        field = parts[0]
        rule = ValidationRule(field=field)
        for part in parts[1:]:
            if part in ('email', 'url', 'phone', 'date', 'uuid', 'ip'):
                rule.format = part
            elif part in ('string', 'int', 'float', 'bool'):
                rule.type = part
            elif part == 'required':
                rule.required = True
            elif part == 'unique':
                rule.unique = True
            elif '-' in part:
                # Numeric range 'lo-hi'; silently ignored if unparsable
                try:
                    min_v, max_v = part.split('-')
                    rule.min_value = float(min_v) if min_v else None
                    rule.max_value = float(max_v) if max_v else None
                except ValueError:
                    pass
        rules.append(rule)
    return rules
def load_config(config_path: str) -> List[ValidationRule]:
    """Load validation rules from a JSON config file.

    Expected shape: {"rules": [{...}, ...]} where each entry maps directly
    onto ValidationRule keyword arguments; a missing 'rules' key yields [].
    Raises TypeError if an entry contains unknown keys.
    """
    # BUG FIX: previously opened with the platform default encoding;
    # be explicit and tolerate a UTF-8 BOM.
    with open(config_path, encoding='utf-8-sig') as f:
        config = json.load(f)
    return [ValidationRule(**rule_dict) for rule_dict in config.get('rules', [])]
def format_markdown_report(report: ValidationReport) -> str:
    """Render a ValidationReport as a human-readable Markdown document.

    Includes summary stats, per-field and per-rule error counts, and a
    table of up to 10 sample errors (values/messages truncated).
    """
    valid_pct = report.valid_rows / max(report.total_rows, 1) * 100
    out = ["# Data Quality Report"]
    out.append(f"\n**File:** {report.file}")
    out.append(f"**Total Rows:** {report.total_rows}")
    out.append(f"**Valid Rows:** {report.valid_rows} ({valid_pct:.1f}%)")
    out.append(f"**Errors:** {report.error_count}")
    out.append("\n## Errors by Field\n")

    def by_count_desc(item):
        # Sort key: highest counts first
        return -item[1]

    for name, count in sorted(report.errors_by_field.items(), key=by_count_desc):
        out.append(f"- **{name}**: {count} errors")
    out.append("\n## Errors by Rule\n")
    for name, count in sorted(report.errors_by_rule.items(), key=by_count_desc):
        out.append(f"- **{name}**: {count}")
    if report.errors:
        out.append("\n## Sample Errors\n")
        out.append("| Row | Field | Value | Rule | Message |")
        out.append("|-----|-------|-------|------|---------|")
        for err in report.errors[:10]:
            # Truncate long values/messages to keep the table readable
            out.append(
                f"| {err.row} | {err.field} | {str(err.value)[:20]} "
                f"| {err.rule} | {err.message[:40]} |"
            )
    return "\n".join(out)
def main():
    """CLI entry point: validate, profile, or quick-check a data file."""
    parser = argparse.ArgumentParser(description="Data Quality Validator")
    subparsers = parser.add_subparsers(dest="command")
    # Validate command: full validation plus a data profile in the report
    validate_parser = subparsers.add_parser("validate", help="Validate data")
    validate_parser.add_argument("file", help="Data file (CSV/JSON)")
    validate_parser.add_argument("--config", "-c", help="Rules config file")
    validate_parser.add_argument("--rules", "-r", help="Inline rules (field:type:format,...)")
    validate_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Profile command: no rules needed, just characterize the data
    profile_parser = subparsers.add_parser("profile", help="Profile data")
    profile_parser.add_argument("file", help="Data file")
    profile_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Check command (quick validation with inline rules only)
    check_parser = subparsers.add_parser("check", help="Quick check")
    check_parser.add_argument("file", help="Data file")
    check_parser.add_argument("--rules", "-r", required=True, help="Inline rules")
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return
    # Load data
    rows, file_type = load_data(args.file)
    if args.command == "profile":
        profiles = DataProfiler.profile_data(rows)
        output = {field: asdict(p) for field, p in profiles.items()}
        print(json.dumps(output, indent=2, default=str))
        return
    # Load rules: config file takes precedence over inline rules
    rules = []
    if getattr(args, 'config', None):
        rules = load_config(args.config)
    elif getattr(args, 'rules', None):
        rules = parse_rules_string(args.rules)
    if not rules:
        print("No validation rules specified. Use --config or --rules")
        return
    # Validate every row, collecting all errors
    validator = DataValidator(rules)
    all_errors = []
    for i, row in enumerate(rows, 1):
        errors = validator.validate_row(row, i)
        all_errors.extend(errors)
    # Build report
    errors_by_field = Counter(e.field for e in all_errors)
    errors_by_rule = Counter(e.rule for e in all_errors)
    rows_with_errors = len(set(e.row for e in all_errors))
    report = ValidationReport(
        file=args.file,
        total_rows=len(rows),
        valid_rows=len(rows) - rows_with_errors,
        error_count=len(all_errors),
        errors_by_field=dict(errors_by_field),
        errors_by_rule=dict(errors_by_rule),
        errors=all_errors,
        profile=DataProfiler.profile_data(rows) if args.command == "validate" else None
    )
    # BUG FIX: the 'check' subcommand defines no --format option, so the
    # old unconditional args.format access raised AttributeError whenever
    # 'check' reached report output; default to JSON when the flag is absent.
    if getattr(args, "format", "json") == "markdown":
        print(format_markdown_report(report))
    else:
        print(json.dumps(report.to_dict(), indent=2, default=str))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment