Skip to content

Instantly share code, notes, and snippets.

@inntran
Created December 2, 2025 15:51
Show Gist options
  • Select an option

  • Save inntran/439e15e7a8b4934fc90621b8336d331d to your computer and use it in GitHub Desktop.

Select an option

Save inntran/439e15e7a8b4934fc90621b8336d331d to your computer and use it in GitHub Desktop.
Converts nested JSON data into a two-dimensional CSV or XLSX file.
#!/usr/bin/env python3
"""
JSON Flattener Tool
Flattens JSON data by specified levels and exports to CSV or XLSX format.
Each record becomes one row with flattened column names.
Usage:
python3 json_flattener.py input.json --output output.csv --levels 2
python3 json_flattener.py input.json --output output.xlsx --levels 3 --format xlsx
"""
import argparse
import json
import csv
import sys
from pathlib import Path
from typing import Any, Dict, List, Union, Optional
def is_name_value_array(data: List[Dict]) -> bool:
    """
    Report whether *data* is a name-value pair array.

    Returns True only for a non-empty list whose every element is a dict
    carrying both a 'Name' and a 'Value' key.
    """
    if not isinstance(data, list) or not data:
        return False
    return all(
        isinstance(entry, dict) and 'Name' in entry and 'Value' in entry
        for entry in data
    )
def flatten_name_value_array(data: List[Dict], parent_key: str = '') -> Dict[str, Any]:
    """
    Turn a name-value pair array into a flat dict.

    Each element's 'Name' (sanitized into a column-safe identifier and
    optionally prefixed with *parent_key*) becomes a key mapped to the
    element's 'Value'. Entries whose stripped name is empty are skipped.
    """
    flattened: Dict[str, Any] = {}
    prefix = f"{parent_key}_" if parent_key else ""
    for entry in data:
        raw_name = str(entry.get('Name', '')).strip()
        if not raw_name:
            # A blank name would produce a meaningless column — skip it.
            continue
        # Make the name safe to use as a column header.
        column = raw_name.replace(' ', '_').replace('-', '_').replace('.', '_')
        flattened[prefix + column] = entry.get('Value', '')
    return flattened
def flatten_json(data: Any, separator: str = '_', max_levels: Optional[int] = None, current_level: int = 0, parent_key: str = '') -> Dict[str, Any]:
    """
    Flatten a nested JSON object to a specified depth.

    Args:
        data: The data to flatten (dict, list, or primitive)
        separator: String to use for separating nested keys
        max_levels: Maximum levels to flatten (None for unlimited)
        current_level: Current nesting level (for recursion)
        parent_key: Parent key for building nested key names

    Returns:
        Flattened dictionary
    """
    items: Dict[str, Any] = {}

    def _store(key: str, value: Any) -> None:
        # Shared logic for dict values and list elements (the original had
        # this duplicated in both branches): recurse into containers until
        # max_levels is reached, then serialize the remainder as a JSON
        # string; primitives are stored as-is.
        if isinstance(value, (dict, list)):
            if max_levels is not None and current_level >= max_levels:
                items[key] = json.dumps(value, default=str)
            else:
                items.update(flatten_json(value, separator, max_levels, current_level + 1, key))
        else:
            items[key] = value

    if isinstance(data, dict):
        for key, value in data.items():
            _store(f"{parent_key}{separator}{key}" if parent_key else key, value)
    elif isinstance(data, list):
        if is_name_value_array(data):
            # Name/Value pair arrays become named columns instead of
            # index-numbered ones.
            items.update(flatten_name_value_array(data, parent_key))
        else:
            # Regular arrays use the element index as the key segment.
            for i, value in enumerate(data):
                _store(f"{parent_key}{separator}{i}" if parent_key else str(i), value)
    else:
        # Primitive at the top of a recursion: keep it under its parent key,
        # or a generic 'value' key when called on a bare scalar.
        items[parent_key if parent_key else 'value'] = data
    return items
def process_json_data(data: Any, max_levels: Optional[int] = None, separator: str = '_') -> List[Dict[str, Any]]:
    """
    Process JSON data and return a list of flattened records.

    Args:
        data: Input JSON data
        max_levels: Maximum levels to flatten (None for unlimited)
        separator: Separator for nested keys

    Returns:
        List of flattened dictionaries, one per record
    """
    if isinstance(data, list):
        # A top-level list yields one record per element.
        return [flatten_json(item, separator, max_levels) for item in data]
    # Anything else is treated as a single record.
    return [flatten_json(data, separator, max_levels)]
def write_to_csv(records: List[Dict[str, Any]], output_file: str, encoding: str = 'utf-8') -> None:
    """Write flattened records to a CSV file.

    The header is the sorted union of keys seen across all records.
    Missing/None values become empty strings; dict/list values are
    JSON-encoded; everything else is stringified.
    """
    if not records:
        print("No records to write", file=sys.stderr)
        return

    # Sorted union of every key appearing in any record.
    columns = sorted({key for record in records for key in record})

    def _as_text(value: Any) -> str:
        # Normalize a cell value into the string written to the file.
        if value is None:
            return ''
        if isinstance(value, (dict, list)):
            return json.dumps(value, default=str)
        return str(value)

    with open(output_file, 'w', newline='', encoding=encoding) as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for record in records:
            writer.writerow({col: _as_text(record.get(col)) for col in columns})
def write_to_xlsx(records: List[Dict[str, Any]], output_file: str) -> None:
    """Write flattened records to an XLSX file using openpyxl.

    Columns are the sorted union of keys across all records; None becomes
    an empty string and dict/list values are JSON-encoded.

    Raises:
        ImportError: if openpyxl is not installed.
    """
    try:
        # Imported lazily so CSV-only users don't need openpyxl.
        # (Removed the unused `get_column_letter` import — cell.column_letter
        # is used below instead.)
        import openpyxl
    except ImportError:
        raise ImportError("openpyxl is required for XLSX output. Install with: pip install openpyxl")
    if not records:
        print("No records to write", file=sys.stderr)
        return

    # Sorted union of all keys across records.
    fieldnames = sorted({key for record in records for key in record})

    # Create workbook and worksheet
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Flattened Data"

    # Write headers
    for col, fieldname in enumerate(fieldnames, 1):
        ws.cell(row=1, column=col, value=fieldname)

    # Write data rows, normalizing every cell to a string.
    for row_idx, record in enumerate(records, 2):
        for col_idx, fieldname in enumerate(fieldnames, 1):
            value = record.get(fieldname)
            if value is None:
                cell_value = ''
            elif isinstance(value, (dict, list)):
                cell_value = json.dumps(value, default=str)
            else:
                cell_value = str(value)
            ws.cell(row=row_idx, column=col_idx, value=cell_value)

    # Auto-adjust column widths, capped at 50 characters. Every cell was
    # written as a string above, so len(str(...)) cannot fail — the
    # original's bare `except: pass` is gone.
    for col in ws.columns:
        max_length = max(
            (len(str(cell.value)) for cell in col if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[col[0].column_letter].width = min(max_length + 2, 50)

    wb.save(output_file)
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse. Defaults to ``sys.argv[1:]``; exposed
            as a parameter (backward-compatible) so the CLI can be driven
            programmatically or from tests.

    Returns:
        Process exit code: 0 on success, 1 on any error.
    """
    parser = argparse.ArgumentParser(
        description='Flatten JSON data by specified levels and export to CSV or XLSX',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s data.json --output flattened.csv --levels 2
%(prog)s data.json --output flattened.xlsx --levels 3 --format xlsx
%(prog)s data.json --output result.csv --separator "."
%(prog)s data.json --output result.csv --levels 1 --encoding utf-8
Notes:
- If levels is not specified, all levels will be flattened
- For XLSX output, openpyxl package is required: pip install openpyxl
- Large nested structures beyond max levels are stored as JSON strings
- Arrays with Name/Value pairs (like CustomFields) are automatically converted to named columns
- Name-Value arrays use the 'Name' field as column names and 'Value' field as cell values
"""
    )
    parser.add_argument('input', help='Input JSON file path')
    parser.add_argument('--output', '-o', required=True, help='Output file path (.csv or .xlsx)')
    parser.add_argument('--levels', '-l', type=int, help='Maximum levels to flatten (default: unlimited)')
    parser.add_argument('--format', '-f', choices=['csv', 'xlsx'], help='Output format (auto-detected from extension if not specified)')
    parser.add_argument('--separator', '-s', default='_', help='Separator for nested keys (default: _)')
    parser.add_argument('--encoding', '-e', default='utf-8', help='File encoding for CSV output (default: utf-8)')
    parser.add_argument('--pretty-print', '-p', action='store_true', help='Pretty print input JSON for verification')
    args = parser.parse_args(argv)

    # Validate input file
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
        return 1

    # Determine output format: explicit flag wins, else infer from extension.
    output_format = args.format
    if not output_format:
        output_ext = Path(args.output).suffix.lower()
        if output_ext == '.csv':
            output_format = 'csv'
        elif output_ext in ['.xlsx', '.xls']:
            output_format = 'xlsx'
        else:
            print(f"Error: Cannot determine format from extension '{output_ext}'. Use --format to specify.", file=sys.stderr)
            return 1

    try:
        # Load JSON data
        print(f"Loading JSON from {args.input}...")
        with open(args.input, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if args.pretty_print:
            print("Input JSON structure:")
            # Serialize once each (the original re-serialized up to three
            # times); truncate large structures to keep the preview readable.
            compact = json.dumps(data, default=str)
            pretty = json.dumps(data, indent=2, default=str)
            print(pretty[:1000] + "..." if len(compact) > 1000 else pretty)
            print()

        # Process data. BUG FIX: use `is not None` so `--levels 0` is no
        # longer misreported as 'unlimited' (0 is falsy but a valid depth).
        print(f"Flattening JSON data (max levels: {args.levels if args.levels is not None else 'unlimited'})...")
        records = process_json_data(data, args.levels, args.separator)
        print(f"Generated {len(records)} record(s)")
        if records:
            print(f"Each record has {len(records[0])} column(s)")
            print("Sample columns:", ", ".join(sorted(records[0].keys())[:10]) + ("..." if len(records[0]) > 10 else ""))

        # Write output
        print(f"Writing to {args.output} as {output_format.upper()}...")
        if output_format == 'csv':
            write_to_csv(records, args.output, args.encoding)
        elif output_format == 'xlsx':
            write_to_xlsx(records, args.output)
        print(f"Successfully wrote {len(records)} record(s) to {args.output}")
    except FileNotFoundError:
        print(f"Error: Could not read input file '{args.input}'", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1
    except ImportError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        # Last-resort boundary handler: report and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0
# Script entry point: run the CLI and propagate its exit code to the shell.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment