|
import csv |
|
import os |
|
import sys |
|
import logging |
|
import argparse |
|
import pandas as pd |
|
import fileinput |
|
|
|
# The root log level is taken from the LOGLEVEL environment variable
# (case-insensitive, surrounding whitespace ignored); it defaults to WARNING.
LOGLEVEL = os.environ.get('LOGLEVEL', 'WARNING').strip().upper()

# logging.getLevelName() maps a registered level *name* to its integer value
# and returns a "Level ..." string for anything else, so a non-int result
# means basicConfig() would raise ValueError for this value at import time.
if isinstance(logging.getLevelName(LOGLEVEL), int):
    logging.basicConfig(level=LOGLEVEL)
else:
    logging.basicConfig(level='WARNING')
    logging.warning("Ignoring unrecognised LOGLEVEL %r; falling back to WARNING.", LOGLEVEL)
    LOGLEVEL = 'WARNING'
|
|
|
class RecordParser(object):
    """Flatten a fixed-width header/detail text file into CSV rows.

    A line whose slice ``[22:25]`` equals ``"000"`` is treated as a header
    line; every other line is treated as a detail line. Each detail line
    produces one output row made of the most recent header's fields merged
    with the detail line's fields (detail values win on a name clash). A
    header that has no detail lines under it produces a row holding only the
    header fields.

    Field layouts come from two spec CSV files, one for header lines and one
    for detail lines. Their first three columns are read positionally as
    field name, slice start and slice end.
    """

    # True while the most recent header has not produced any output row yet.
    # Kept as a class-level default so it exists even before the first header.
    _header_pending = False

    def __init__(self, header_csv, record_csv, **kwargs):
        """Load both field layouts.

        Args:
            header_csv: Path (or anything ``pandas.read_csv`` accepts) of the
                spec CSV describing header lines.
            record_csv: Same, for detail lines.
            **kwargs: Accepted and ignored.
        """
        self.primary_list = []
        self.header_data = {}
        self.header_data_records = self._set_colspecs(header_csv)
        self.detail_data = {}
        self.detail_data_records = self._set_colspecs(record_csv)
        self.last_header_row = 0
        self.current_file = None
        self._header_pending = False

    def _get_fixed_data(self, line, header):
        """Slice ``line`` into a ``{field name: text}`` dict.

        Args:
            line: The raw text line.
            header: Iterable of ``(name, (start, end))`` pairs; each field is
                ``line[start:end]``.
        """
        return {col_name: line[start_col:end_col]
                for col_name, (start_col, end_col) in header}

    def _set_colspecs(self, file_name):
        """Read a spec CSV and return ``[(name, (start, end)), ...]``.

        The first three columns are taken by position, whatever their column
        labels are. Start and end are converted to plain ``int`` so they are
        always usable as slice bounds.

        Raises:
            ValueError: If the file has fewer than three columns or an offset
                cannot be converted to ``int``.
        """
        df = pd.read_csv(file_name)

        # ``iloc`` is explicitly positional. Indexing an ``iterrows()`` Series
        # with ``row[0]`` relies on a label-to-position fallback that pandas
        # has deprecated.
        return [
            (name, (int(start), int(end)))
            for name, start, end in df.iloc[:, :3].itertuples(index=False, name=None)
        ]

    def _process_row(self, line):
        """Consume one input line, updating the header state or adding a row.

        Returns:
            The new header dict for a header line, otherwise the accumulated
            row list.
        """
        if line[22:25] == "000":
            line_no = self.current_file.filelineno()

            # A new header while the previous one is still pending means the
            # previous header had no detail lines: emit it on its own. A flag
            # is used rather than comparing line numbers because
            # ``filelineno()`` restarts for every file, which made the
            # comparison unreliable across several input files.
            if self._header_pending:
                logging.info("Found empty header at %s", line_no)
                self.primary_list.append(self.header_data.copy())

            self.last_header_row = line_no
            self.header_data = self._get_fixed_data(line, self.header_data_records)
            self._header_pending = True
            logging.info("Header data set to: %s", self.header_data)
            return self.header_data

        line_object = {**self.header_data,
                       **self._get_fixed_data(line, self.detail_data_records)}
        self.primary_list.append(line_object)
        self._header_pending = False
        logging.info("Line data set to: %s", line_object)
        return self.primary_list

    def parse_file(self, file_name):
        """Parse one file (or a list of files) and extend ``primary_list``.

        Args:
            file_name: Passed straight to ``fileinput.input``.
        """
        with fileinput.input(file_name) as file:
            self.current_file = file
            for line in file:
                self._process_row(line)
                logging.debug("Parsed line #%s : %s", file.filelineno(), line)

        # A header on the very last line has no following header to trigger
        # its output, so emit it here like any other header without details.
        if self._header_pending:
            self.primary_list.append(self.header_data.copy())
            self._header_pending = False

    def write_csv(self, outfile="output.csv", newline=""):
        """Write every collected row to ``outfile`` as CSV.

        Columns are the union of header and detail field names in sorted
        order; a field missing from a row is written as an empty string.

        Args:
            outfile: Destination path; overwritten if it exists.
            newline: Forwarded to ``open``.
        """
        cols = sorted({key for key, _ in
                       (self.detail_data_records + self.header_data_records)})

        with open(outfile, "w", newline=newline) as csv_file:
            writer = csv.DictWriter(csv_file,
                                    fieldnames=cols,
                                    restval='',
                                    extrasaction='ignore')

            logging.info("Writing output file.")
            writer.writeheader()
            writer.writerows(self.primary_list)
|
|
|
if __name__ == "__main__":
    # Command-line entry point: read the two layout CSVs, parse the input
    # text file and write the flattened rows to the default output CSV.
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", help="Print detailed parsing information.",
                        action="store_true")
    # All three paths are needed to do anything useful; requiring them makes
    # argparse report a missing one instead of failing later on a None path.
    parser.add_argument("-l", "--headercsv", required=True,
                        help="The path to the CSV file containing the header line record format.")
    parser.add_argument("-d", "--detailcsv", required=True,
                        help="The path to the CSV file containing the detail line record format.")
    parser.add_argument("-i", "--input", required=True,
                        help="The path to the TXT file containing the UCR data file to process.")
    args = parser.parse_args()

    if args.verbose:
        LOGLEVEL = "DEBUG"
        # logging was already configured at import time, so rebinding
        # LOGLEVEL alone changes nothing; the root logger's level has to be
        # updated for --verbose to take effect.
        logging.getLogger().setLevel(LOGLEVEL)

    record = RecordParser(args.headercsv, args.detailcsv)
    record.parse_file(args.input)
    record.write_csv()
|
|