@segyges · Created November 13, 2025 19:15
import pandas as pd
import sys
from pathlib import Path
from datetime import datetime


def log_warning(message, log_file="conversion_warnings.txt"):
    """Log a warning message to the warnings file with timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")

def parse_dat_file(file_path):
    """
    Parse a .dat file with various eDiscovery delimiters.

    Args:
        file_path: Path to the .dat file

    Returns:
        pandas DataFrame with parsed data
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    if file_path.suffix.lower() != '.dat':
        log_warning(f"File extension is '{file_path.suffix}', expected '.dat'")
    print(f"Reading file: {file_path}")
    try:
        # Read the file with UTF-8 encoding
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except UnicodeDecodeError:
        # Try with latin-1 if UTF-8 fails
        log_warning("UTF-8 decoding failed, trying latin-1 encoding")
        with open(file_path, 'r', encoding='latin-1') as f:
            content = f.read()
    # Remove BOM if present
    if content.startswith('\ufeff'):
        content = content[1:]
        print("Removed BOM from file")
    # Split into lines and drop empty ones
    lines = content.split('\n')
    lines = [line.strip() for line in lines if line.strip()]
    if not lines:
        raise ValueError("File is empty or contains no valid data")
    print(f"Found {len(lines)} lines in file")
    # Auto-detect delimiter by examining first line
    first_line = lines[0]
    if first_line.startswith('þ'):
        first_line = first_line[1:]
    if 'þ\x14þ' in first_line:
        delimiter = 'þ\x14þ'
        delim_name = 'þ<DC4>þ (thorn-DC4-thorn)'
    elif 'þ¶þ' in first_line:
        delimiter = 'þ¶þ'
        delim_name = 'þ¶þ (thorn-pilcrow-thorn)'
    elif 'þ' in first_line:
        delimiter = 'þ'
        delim_name = 'þ (thorn)'
    else:
        log_warning("No recognized delimiter found, using comma")
        delimiter = ','
        delim_name = ', (comma)'
    print(f"Detected delimiter: {delim_name}")
    parsed_rows = []
    for line_num, line in enumerate(lines, start=1):
        try:
            cleaned = line
            # Remove leading thorn if present
            if cleaned.startswith('þ'):
                cleaned = cleaned[1:]
            # Remove trailing delimiters
            for suffix in ['þ¶', '¶', 'þ']:
                if cleaned.endswith(suffix):
                    cleaned = cleaned[:-len(suffix)]
                    break
            # Split by detected delimiter
            fields = cleaned.split(delimiter)
            # Drop stray \x14 control characters; these are left behind when the
            # split delimiter is a bare þ and the DC4 separator survives as a
            # field of its own
            fields = [f for f in fields if f != '\x14']
            parsed_rows.append(fields)
        except Exception as e:
            log_warning(f"Line {line_num}: Error parsing line - {str(e)}")
            continue
    if not parsed_rows:
        raise ValueError("No valid data rows could be parsed")
    # Check if all rows have the same number of fields
    field_counts = [len(row) for row in parsed_rows]
    if len(set(field_counts)) > 1:
        log_warning(f"Inconsistent field counts detected: {set(field_counts)}")
        log_warning(f"Header has {field_counts[0]} fields")
        for i, count in enumerate(field_counts[1:], start=2):
            if count != field_counts[0]:
                log_warning(f"Line {i}: Has {count} fields instead of {field_counts[0]}")
    # First row is headers
    headers = parsed_rows[0]
    data_rows = parsed_rows[1:]
    print(f"Headers: {len(headers)} columns")
    print(f"Data rows: {len(data_rows)} rows")
    # Check for empty or duplicate headers
    for i, header in enumerate(headers):
        if not header or header.strip() == '':
            log_warning(f"Column {i+1} has empty header, using 'Column_{i+1}' instead")
            headers[i] = f'Column_{i+1}'
    if len(headers) != len(set(headers)):
        log_warning(f"Duplicate headers detected: {[h for h in headers if headers.count(h) > 1]}")
    # Create DataFrame
    # Pad rows that are too short, truncate rows that are too long
    max_cols = len(headers)
    normalized_rows = []
    for i, row in enumerate(data_rows, start=2):
        if len(row) < max_cols:
            log_warning(f"Line {i}: Row has {len(row)} fields, padding to {max_cols}")
            row = row + [''] * (max_cols - len(row))
        elif len(row) > max_cols:
            log_warning(f"Line {i}: Row has {len(row)} fields, truncating to {max_cols}")
            row = row[:max_cols]
        normalized_rows.append(row)
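    # Every parsed value is a plain Python string, so the resulting DataFrame
    # columns all have object dtype; coercing dates or numbers to richer types
    # is left to whatever consumes the CSV.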
    df = pd.DataFrame(normalized_rows, columns=headers)
    return df

def convert_dat_to_csv(input_file, output_file=None):
    """
    Convert a .dat file to CSV format.

    Args:
        input_file: Path to input .dat file
        output_file: Path to output .csv file (optional, defaults to input_file with .csv extension)
    """
    input_path = Path(input_file)
    if output_file is None:
        output_file = input_path.with_suffix('.csv')
    else:
        output_file = Path(output_file)
    print(f"\n{'='*60}")
    print("eDiscovery DAT to CSV Converter")
    print(f"{'='*60}\n")
    # Clear previous warnings file for this conversion
    log_file = "conversion_warnings.txt"
    if Path(log_file).exists():
        Path(log_file).unlink()
    try:
        # Parse the DAT file
        df = parse_dat_file(input_path)
        # Export to CSV using pandas (handles all escaping properly)
        df.to_csv(output_file, index=False, encoding='utf-8')
        print("\n✓ Successfully converted to CSV")
        print(f"  Input: {input_path}")
        print(f"  Output: {output_file}")
        print(f"  Rows: {len(df)}")
        print(f"  Columns: {len(df.columns)}")
        if Path(log_file).exists():
            print(f"\n⚠ Warnings were logged to: {log_file}")
        else:
            print("\n✓ No warnings generated")
        print(f"\n{'='*60}\n")
    except Exception as e:
        error_msg = f"FATAL ERROR: {str(e)}"
        log_warning(error_msg)
        print(f"\n✗ Conversion failed: {str(e)}")
        print(f"  Check {log_file} for details\n")
        sys.exit(1)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python convert_dat.py <input_file.dat> [output_file.csv]")
        print("\nExample:")
        print("  python convert_dat.py documents.dat")
        print("  python convert_dat.py documents.dat output.csv")
        sys.exit(1)
    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else None
    convert_dat_to_csv(input_file, output_file)
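# Minimal sketch of programmatic use (hypothetical file names; assumes a header
# and row delimited in the þ<DC4>þ style handled above):
#
#     from pathlib import Path
#     sample = "þDOCIDþ\x14þCUSTODIANþ\nþDOC000001þ\x14þSmith, Janeþ\n"
#     Path("sample.dat").write_text(sample, encoding="utf-8")
#     convert_dat_to_csv("sample.dat", "sample.csv")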