Created
December 2, 2025 15:51
-
-
Save inntran/439e15e7a8b4934fc90621b8336d331d to your computer and use it in GitHub Desktop.
Converts nested JSON data into a two-dimensional CSV or XLSX file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| JSON Flattener Tool | |
| Flattens JSON data by specified levels and exports to CSV or XLSX format. | |
| Each record becomes one row with flattened column names. | |
| Usage: | |
| python3 json_flattener.py input.json --output output.csv --levels 2 | |
| python3 json_flattener.py input.json --output output.xlsx --levels 3 --format xlsx | |
| """ | |
| import argparse | |
| import json | |
| import csv | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Union, Optional | |
def is_name_value_array(data: List[Dict]) -> bool:
    """
    Check whether *data* is a non-empty list of name/value pair objects.

    Args:
        data: Candidate value (any type is tolerated; only lists can match).

    Returns:
        True if every element is a dict containing both 'Name' and 'Value'
        keys; False for non-lists, empty lists, or lists of any other shape.
    """
    if not isinstance(data, list) or not data:
        return False
    # all() short-circuits on the first non-conforming element.
    return all(
        isinstance(item, dict) and 'Name' in item and 'Value' in item
        for item in data
    )
def flatten_name_value_array(data: List[Dict], parent_key: str = '') -> Dict[str, Any]:
    """
    Flatten a name/value array into a dict keyed by each item's 'Name'.

    Args:
        data: List of dicts shaped like {'Name': ..., 'Value': ...}.
        parent_key: Optional prefix joined to each cleaned name with '_'.

    Returns:
        Dict mapping cleaned names (spaces, '-' and '.' become '_') to the
        corresponding 'Value'. Items with empty/whitespace names are
        skipped; if two items clean to the same name, the later one wins.
    """
    # Single-pass character cleanup via translate instead of chained replace().
    _cleanup = str.maketrans({' ': '_', '-': '_', '.': '_'})
    items: Dict[str, Any] = {}
    for item in data:
        name = str(item.get('Name', '')).strip()
        if not name:  # only add entries with a non-empty name
            continue
        clean_name = name.translate(_cleanup)
        key = f"{parent_key}_{clean_name}" if parent_key else clean_name
        items[key] = item.get('Value', '')
    return items
def flatten_json(data: Any, separator: str = '_', max_levels: Optional[int] = None,
                 current_level: int = 0, parent_key: str = '') -> Dict[str, Any]:
    """
    Flatten a nested JSON object to a specified depth.

    Args:
        data: The data to flatten (dict, list, or primitive)
        separator: String to use for separating nested keys
        max_levels: Maximum levels to flatten (None for unlimited); any
            container nested deeper is stored as a JSON string
        current_level: Current nesting level (for recursion)
        parent_key: Parent key for building nested key names

    Returns:
        Flattened dictionary
    """
    items: Dict[str, Any] = {}

    def _emit(key: str, value: Any) -> None:
        # Shared leaf/recursion logic for the dict and list branches:
        # containers at the depth limit become JSON strings, containers
        # above it recurse, and primitives are stored as-is.
        if isinstance(value, (dict, list)):
            if max_levels is not None and current_level >= max_levels:
                items[key] = json.dumps(value, default=str)
            else:
                items.update(flatten_json(value, separator, max_levels,
                                          current_level + 1, key))
        else:
            items[key] = value

    if isinstance(data, dict):
        for key, value in data.items():
            _emit(f"{parent_key}{separator}{key}" if parent_key else key, value)
    elif isinstance(data, list):
        if is_name_value_array(data):
            # Name/Value pair arrays become named columns instead of indexes.
            items.update(flatten_name_value_array(data, parent_key))
        else:
            # Regular arrays use the element index as the key component.
            for i, value in enumerate(data):
                _emit(f"{parent_key}{separator}{i}" if parent_key else str(i), value)
    else:
        # Bare primitive: keep it under its parent key, or 'value' at top level.
        items[parent_key if parent_key else 'value'] = data
    return items
def process_json_data(data: Any, max_levels: int = None, separator: str = '_') -> List[Dict[str, Any]]:
    """
    Turn input JSON into a list of flattened records.

    A top-level list yields one record per element; any other top-level
    value yields a single record.

    Args:
        data: Input JSON data
        max_levels: Maximum levels to flatten
        separator: Separator for nested keys

    Returns:
        List of flattened dictionaries, one per record
    """
    # Normalize to a list of records, then flatten each one.
    source = data if isinstance(data, list) else [data]
    return [flatten_json(item, separator, max_levels) for item in source]
def write_to_csv(records: List[Dict[str, Any]], output_file: str, encoding: str = 'utf-8') -> None:
    """Write flattened records to a CSV file.

    Columns are the sorted union of all keys across records. Missing or
    None values become empty cells; containers are JSON-encoded; all
    other values are stringified.
    """
    if not records:
        print("No records to write", file=sys.stderr)
        return

    # Sorted union of every key seen across all records.
    columns = sorted({key for record in records for key in record})

    def as_cell(value: Any) -> str:
        # None -> empty cell; containers -> JSON text; everything else -> str.
        if value is None:
            return ''
        if isinstance(value, (dict, list)):
            return json.dumps(value, default=str)
        return str(value)

    with open(output_file, 'w', newline='', encoding=encoding) as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for record in records:
            writer.writerow({column: as_cell(record.get(column)) for column in columns})
def write_to_xlsx(records: List[Dict[str, Any]], output_file: str) -> None:
    """Write flattened records to an XLSX workbook.

    Columns are the sorted union of all keys across records; None becomes
    an empty cell and containers are JSON-encoded, matching the CSV writer.

    Raises:
        ImportError: if openpyxl is not installed.
    """
    try:
        import openpyxl
    except ImportError:
        raise ImportError("openpyxl is required for XLSX output. Install with: pip install openpyxl")

    if not records:
        print("No records to write", file=sys.stderr)
        return

    # Sorted union of every key seen across all records.
    fieldnames = sorted({key for record in records for key in record})

    # Create workbook and worksheet
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Flattened Data"

    # Header row.
    for col, fieldname in enumerate(fieldnames, 1):
        ws.cell(row=1, column=col, value=fieldname)

    # Data rows, starting at row 2 (below the header).
    for row_idx, record in enumerate(records, 2):
        for col_idx, fieldname in enumerate(fieldnames, 1):
            value = record.get(fieldname)
            if value is None:
                cell_value = ''
            elif isinstance(value, (dict, list)):
                cell_value = json.dumps(value, default=str)
            else:
                cell_value = str(value)
            ws.cell(row=row_idx, column=col_idx, value=cell_value)

    # Auto-size each column to its longest cell, capped at 50 characters.
    # max(..., default=0) over non-None values needs no exception handling,
    # unlike a bare `except:` which would also swallow KeyboardInterrupt.
    for column_cells in ws.columns:
        letter = column_cells[0].column_letter
        longest = max(
            (len(str(cell.value)) for cell in column_cells if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[letter].width = min(longest + 2, 50)

    wb.save(output_file)
def main():
    """CLI entry point: parse args, flatten the input JSON, write CSV/XLSX.

    Returns:
        0 on success, 1 on any error (missing input file, invalid JSON,
        unknown output format, missing openpyxl, or write failure).
    """
    parser = argparse.ArgumentParser(
        description='Flatten JSON data by specified levels and export to CSV or XLSX',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s data.json --output flattened.csv --levels 2
  %(prog)s data.json --output flattened.xlsx --levels 3 --format xlsx
  %(prog)s data.json --output result.csv --separator "."
  %(prog)s data.json --output result.csv --levels 1 --encoding utf-8
Notes:
  - If levels is not specified, all levels will be flattened
  - For XLSX output, openpyxl package is required: pip install openpyxl
  - Large nested structures beyond max levels are stored as JSON strings
  - Arrays with Name/Value pairs (like CustomFields) are automatically converted to named columns
  - Name-Value arrays use the 'Name' field as column names and 'Value' field as cell values
"""
    )
    parser.add_argument('input', help='Input JSON file path')
    parser.add_argument('--output', '-o', required=True, help='Output file path (.csv or .xlsx)')
    parser.add_argument('--levels', '-l', type=int, help='Maximum levels to flatten (default: unlimited)')
    parser.add_argument('--format', '-f', choices=['csv', 'xlsx'], help='Output format (auto-detected from extension if not specified)')
    parser.add_argument('--separator', '-s', default='_', help='Separator for nested keys (default: _)')
    parser.add_argument('--encoding', '-e', default='utf-8', help='File encoding for CSV output (default: utf-8)')
    parser.add_argument('--pretty-print', '-p', action='store_true', help='Pretty print input JSON for verification')
    args = parser.parse_args()

    # Validate input file
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
        return 1

    # Determine output format, falling back to the output file extension.
    output_format = args.format
    if not output_format:
        output_ext = Path(args.output).suffix.lower()
        if output_ext == '.csv':
            output_format = 'csv'
        elif output_ext in ['.xlsx', '.xls']:
            output_format = 'xlsx'
        else:
            print(f"Error: Cannot determine format from extension '{output_ext}'. Use --format to specify.", file=sys.stderr)
            return 1

    try:
        # Load JSON data
        print(f"Loading JSON from {args.input}...")
        with open(args.input, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if args.pretty_print:
            print("Input JSON structure:")
            # Dump once and truncate the same string we measure. (The old
            # code measured the compact dump but sliced the indented one,
            # so the "..." marker could appear inconsistently, and it
            # serialized the data up to three times.)
            pretty = json.dumps(data, indent=2, default=str)
            print(pretty[:1000] + "..." if len(pretty) > 1000 else pretty)
            print()

        # Compare against None so `--levels 0` is reported as 0, not as
        # 'unlimited' (0 is falsy but is a valid depth limit).
        level_desc = args.levels if args.levels is not None else 'unlimited'
        print(f"Flattening JSON data (max levels: {level_desc})...")
        records = process_json_data(data, args.levels, args.separator)

        print(f"Generated {len(records)} record(s)")
        if records:
            print(f"Each record has {len(records[0])} column(s)")
            print("Sample columns:", ", ".join(sorted(records[0].keys())[:10]) + ("..." if len(records[0]) > 10 else ""))

        # Write output
        print(f"Writing to {args.output} as {output_format.upper()}...")
        if output_format == 'csv':
            write_to_csv(records, args.output, args.encoding)
        elif output_format == 'xlsx':
            write_to_xlsx(records, args.output)

        print(f"Successfully wrote {len(records)} record(s) to {args.output}")
    except FileNotFoundError:
        print(f"Error: Could not read input file '{args.input}'", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1
    except ImportError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        # Last-resort boundary for a CLI: report and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0
# Run the CLI and propagate its integer status code to the shell.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment