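"""Convert eDiscovery .dat load files (thorn/DC4-delimited, Concordance-style) to CSV.

Usage:
    python convert_dat.py <input_file.dat> [output_file.csv]

Warnings raised during conversion are appended to conversion_warnings.txt.
"""
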
import pandas as pd
import sys
from pathlib import Path
from datetime import datetime


def log_warning(message, log_file="conversion_warnings.txt"):
    """Log a warning message to the warnings file with timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")


def parse_dat_file(file_path):
    """
    Parse a .dat file with various eDiscovery delimiters.

    Args:
        file_path: Path to the .dat file

    Returns:
        pandas DataFrame with parsed data
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    if file_path.suffix.lower() != '.dat':
        log_warning(f"File extension is '{file_path.suffix}', expected '.dat'")

    print(f"Reading file: {file_path}")
    try:
        # Read the file with UTF-8 encoding
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except UnicodeDecodeError:
        # Try with latin-1 if UTF-8 fails
        log_warning("UTF-8 decoding failed, trying latin-1 encoding")
        with open(file_path, 'r', encoding='latin-1') as f:
            content = f.read()

    # Remove BOM if present
    if content.startswith('\ufeff'):
        content = content[1:]
        print("Removed BOM from file")

    # Split into lines
    lines = content.split('\n')
    lines = [line.strip() for line in lines if line.strip()]
    if not lines:
        raise ValueError("File is empty or contains no valid data")

    print(f"Found {len(lines)} lines in file")

    # Auto-detect delimiter by examining first line
    first_line = lines[0]
    if first_line.startswith('þ'):
        first_line = first_line[1:]
    if 'þ\x14þ' in first_line:
        delimiter = 'þ\x14þ'
        delim_name = 'þ<DC4>þ (thorn-DC4-thorn)'
    elif 'þ¶þ' in first_line:
        delimiter = 'þ¶þ'
        delim_name = 'þ¶þ (thorn-pilcrow-thorn)'
    elif 'þ' in first_line:
        delimiter = 'þ'
        delim_name = 'þ (thorn)'
    else:
        log_warning("No recognized delimiter found, using comma")
        delimiter = ','
        delim_name = "',' (comma)"

    print(f"Detected delimiter: {delim_name}")

    parsed_rows = []
    for line_num, line in enumerate(lines, start=1):
        try:
            cleaned = line
            # Remove leading thorn if present
            if cleaned.startswith('þ'):
                cleaned = cleaned[1:]
            # Remove trailing delimiters
            for suffix in ['þ¶', '¶', 'þ']:
                if cleaned.endswith(suffix):
                    cleaned = cleaned[:-len(suffix)]
                    break
            # Split by detected delimiter
            fields = cleaned.split(delimiter)
            # Drop stray \x14 control characters: when a line contains þ\x14þ
            # but we split on bare þ, the \x14 separators survive as fields
            # of their own
            fields = [f for f in fields if f != '\x14']
            parsed_rows.append(fields)
        except Exception as e:
            log_warning(f"Line {line_num}: Error parsing line - {str(e)}")
            continue

    if not parsed_rows:
        raise ValueError("No valid data rows could be parsed")

    # Check if all rows have the same number of fields
    field_counts = [len(row) for row in parsed_rows]
    if len(set(field_counts)) > 1:
        log_warning(f"Inconsistent field counts detected: {set(field_counts)}")
        log_warning(f"Header has {field_counts[0]} fields")
        for i, count in enumerate(field_counts[1:], start=2):
            if count != field_counts[0]:
                log_warning(f"Line {i}: Has {count} fields instead of {field_counts[0]}")

    # First row is headers
    headers = parsed_rows[0]
    data_rows = parsed_rows[1:]
    print(f"Headers: {len(headers)} columns")
    print(f"Data rows: {len(data_rows)} rows")

    # Check for empty or duplicate headers
    for i, header in enumerate(headers):
        if not header or header.strip() == '':
            log_warning(f"Column {i+1} has empty header, using 'Column_{i+1}' instead")
            headers[i] = f'Column_{i+1}'
    if len(headers) != len(set(headers)):
        log_warning(f"Duplicate headers detected: {[h for h in headers if headers.count(h) > 1]}")

    # Create DataFrame
    # Pad rows that are too short, truncate rows that are too long
    max_cols = len(headers)
    normalized_rows = []
    for i, row in enumerate(data_rows, start=2):
        if len(row) < max_cols:
            log_warning(f"Line {i}: Row has {len(row)} fields, padding to {max_cols}")
            row = row + [''] * (max_cols - len(row))
        elif len(row) > max_cols:
            log_warning(f"Line {i}: Row has {len(row)} fields, truncating to {max_cols}")
            row = row[:max_cols]
        normalized_rows.append(row)

    df = pd.DataFrame(normalized_rows, columns=headers)
    return df
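
# A minimal usage sketch (the file name "documents.dat" is hypothetical):
#
#     df = parse_dat_file("documents.dat")
#     print(df.head())
#
# parse_dat_file raises FileNotFoundError or ValueError on bad input, so wrap
# it in try/except when calling it outside convert_dat_to_csv below.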


def convert_dat_to_csv(input_file, output_file=None):
    """
    Convert a .dat file to CSV format.

    Args:
        input_file: Path to input .dat file
        output_file: Path to output .csv file (optional, defaults to the
            input path with a .csv extension)
    """
    input_path = Path(input_file)
    if output_file is None:
        output_file = input_path.with_suffix('.csv')
    else:
        output_file = Path(output_file)

    print(f"\n{'='*60}")
    print("eDiscovery DAT to CSV Converter")
    print(f"{'='*60}\n")

    # Clear previous warnings file for this conversion
    log_file = "conversion_warnings.txt"
    if Path(log_file).exists():
        Path(log_file).unlink()

    try:
        # Parse the DAT file
        df = parse_dat_file(input_path)

        # Export to CSV using pandas (handles all escaping properly)
        df.to_csv(output_file, index=False, encoding='utf-8')

        print("\n✓ Successfully converted to CSV")
        print(f"  Input: {input_path}")
        print(f"  Output: {output_file}")
        print(f"  Rows: {len(df)}")
        print(f"  Columns: {len(df.columns)}")

        if Path(log_file).exists():
            print(f"\n⚠ Warnings were logged to: {log_file}")
        else:
            print("\n✓ No warnings generated")

        print(f"\n{'='*60}\n")
    except Exception as e:
        error_msg = f"FATAL ERROR: {str(e)}"
        log_warning(error_msg)
        print(f"\n✗ Conversion failed: {str(e)}")
        print(f"  Check {log_file} for details\n")
        sys.exit(1)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python convert_dat.py <input_file.dat> [output_file.csv]")
        print("\nExample:")
        print("  python convert_dat.py documents.dat")
        print("  python convert_dat.py documents.dat output.csv")
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else None
    convert_dat_to_csv(input_file, output_file)