Skip to content

Instantly share code, notes, and snippets.

@tpjfern03
Forked from evz/load_data.py
Last active November 29, 2018 18:17
Show Gist options
  • Select an option

  • Save tpjfern03/e9d57f32a7b7ebb0b901cb7b5f1d0084 to your computer and use it in GitHub Desktop.

Select an option

Save tpjfern03/e9d57f32a7b7ebb0b901cb7b5f1d0084 to your computer and use it in GitHub Desktop.
Faster bulk loader for pushing data into pgsql #pgsql #sqlalchemy
import csv
import sqlalchemy as sa
import os
import pytz
from datetime import datetime
import itertools
# Timezone used to stamp the `added`/`updated` columns on inserted rows.
tz = pytz.timezone('America/Chicago')

# DB_CONN = os.environ['DATABASE_URL']
DB_CONN = 'postgres://eric:@localhost:5432/oeem_test'
engine = sa.create_engine(DB_CONN)

# Reflect the existing datastore_project table from the live database so
# bulk inserts can be built against it without redeclaring its columns.
project_table = sa.Table(
    'datastore_project',
    sa.MetaData(),
    autoload=True,
    autoload_with=engine,
)

# Seed rows for datastore_projectattributekey. loadProjects() hard-codes
# these ids when inserting attributes (1 = cost, 2 = predicted savings).
attribute_keys = [
    {
        'id': 1,
        'name': 'project_cost',
        'data_type': 'float_value',
        'display_name': 'Project Cost',
    },
    {
        'id': 2,
        'name': 'predicted_electricity_savings',
        'data_type': 'float_value',
        'display_name': 'Estimated Electricity Savings',
    },
]
def setupTables():
    """Seed the auth_user, projectowner, and projectattributekey tables.

    Each insert runs in its own short transaction. An IntegrityError
    (row already present from a previous run) is rolled back and ignored,
    which keeps the whole setup idempotent.

    Fixes vs. original: the connection is now closed in a ``finally``
    block (it previously leaked), and the unused ``as e`` binding on the
    last except clause was dropped.
    """
    conn = engine.connect()
    try:
        # Superuser account the datastore rows hang off of.
        trans = conn.begin()
        try:
            conn.execute('''
                INSERT INTO auth_user (
                    id,
                    password,
                    is_superuser,
                    username,
                    first_name,
                    last_name,
                    email,
                    is_staff,
                    is_active,
                    date_joined
                ) VALUES (
                    1,
                    'test',
                    TRUE,
                    'test',
                    'tester',
                    'mctesterson',
                    'test@test.com',
                    TRUE,
                    TRUE,
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            # Already seeded on a previous run.
            trans.rollback()

        # Project owner tied to the user above; referenced by loadProjects().
        trans = conn.begin()
        try:
            conn.execute('''
                INSERT INTO datastore_projectowner (
                    id,
                    user_id,
                    added,
                    updated
                ) VALUES (
                    1,
                    1,
                    NOW(),
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()

        # Attribute keys: truncate then re-insert so ids stay stable.
        # A rollback here also undoes the TRUNCATE (same transaction).
        trans = conn.begin()
        try:
            conn.execute('TRUNCATE datastore_projectattributekey CASCADE')
            conn.execute(sa.text('''
                INSERT INTO datastore_projectattributekey (
                    id,
                    name,
                    data_type,
                    display_name
                ) VALUES (
                    :id,
                    :name,
                    :data_type,
                    :display_name
                )
            '''), *attribute_keys)
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()
    finally:
        conn.close()
def loadProjects():
    """Bulk-load datastore_project rows and their attributes from
    project-processed.csv.

    Pass 1 inserts the project rows (first 7 CSV columns plus owner and
    timestamps) in batches of 50,000. Pass 2 rewinds the file and inserts
    the savings/cost attributes (columns 7+), resolving each project's
    surrogate key with a subselect on its external project_id.

    Fixes vs. original: the attribute batch-flush condition tested
    ``len(projects)`` — an empty list at that point, so ``0 % 50000 == 0``
    was always true and the insert ran on every single row, defeating the
    batching. It now tests ``len(attributes)``.
    """
    with open("project-processed.csv", 'r') as project_f:
        reader = csv.reader(project_f)
        header = next(reader)
        # First 7 CSV columns map directly onto project columns; the
        # remaining columns are attributes handled in the second pass.
        project_header = header[:7]
        project_header = project_header + ['project_owner_id', 'added', 'updated']
        projects = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_project CASCADE')
            for row in reader:
                this_row = row[:7]
                now = tz.localize(datetime.now())
                this_row = this_row + [1, now, now]
                projects.append(dict(zip(project_header, this_row)))
                # Flush in 50k batches to bound memory use.
                if len(projects) % 50000 == 0:
                    conn.execute(project_table.insert(), *projects)
                    projects = []
            if projects:
                conn.execute(project_table.insert(), *projects)

        # Rewind for the attribute pass; skip the header row again.
        project_f.seek(0)
        header = next(reader)

        # float_value, key_id, project_id
        attr_insert = '''
            INSERT INTO datastore_projectattribute (
                float_value,
                key_id,
                project_id
            ) VALUES (
                :float_value,
                :key_id,
                (SELECT id FROM datastore_project WHERE project_id = :project_id)
            )
        '''
        attributes = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_projectattribute CASCADE')
            for row in reader:
                savings, cost = row[7:]
                attributes.append({
                    'float_value': savings,
                    'key_id': 2,  # predicted_electricity_savings
                    'project_id': row[0]
                })
                attributes.append({
                    'float_value': cost,
                    'key_id': 1,  # project_cost
                    'project_id': row[0]
                })
                # BUG FIX: was `len(projects)` (always 0 here), which made
                # this condition true on every row. Two appends per row keeps
                # the count even, so it still lands exactly on 50000.
                if len(attributes) % 50000 == 0:
                    conn.execute(sa.text(attr_insert), *attributes)
                    attributes = []
            if attributes:
                conn.execute(sa.text(attr_insert), *attributes)
def loadConsumption():
    """Load consumption metadata and records from consumption-processed.csv.

    Rows are grouped by project id (CSV column 0). For each group one
    datastore_consumptionmetadata row is created (skipping projects that
    don't exist), then its records are bulk-inserted in 50k batches.

    NOTE(review): itertools.groupby only groups *adjacent* rows, so this
    assumes the CSV is already sorted by project id — confirm upstream.
    """
    def by_project(row):
        return row[0]

    metadata_insert = '''
        INSERT INTO datastore_consumptionmetadata (
            project_id,
            energy_unit,
            fuel_type,
            added,
            updated
        )
        SELECT
            id,
            'KWH' AS energy_unit,
            'E' AS fuel_type,
            NOW() AS added,
            NOW() AS updated
        FROM datastore_project
        WHERE project_id = :project_id
        RETURNING id
    '''
    record_insert = '''
        INSERT INTO datastore_consumptionrecord (
            start,
            value,
            estimated,
            metadata_id
        ) VALUES (
            :start,
            :value,
            :estimated,
            :metadata_id
        )
    '''
    with open('consumption-processed.csv', 'r') as f:
        reader = csv.reader(f)
        next(reader)  # discard header row
        record_header = ['start', 'value', 'estimated']
        total = 0

        # Clear out any previous load before inserting.
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_consumptionmetadata CASCADE')
            conn.execute('TRUNCATE datastore_consumptionrecord CASCADE')

        with engine.begin() as conn:
            pending = []
            for project_id, group in itertools.groupby(reader, key=by_project):
                # INSERT ... RETURNING yields the new metadata id, or no
                # rows when the project id isn't in datastore_project.
                created = list(conn.execute(sa.text(metadata_insert),
                                            project_id=project_id))
                if not created:
                    continue
                for record in group:
                    # Record fields live in CSV columns 4 onward.
                    row = dict(zip(record_header, record[4:]))
                    row['metadata_id'] = created[0].id
                    pending.append(row)
                    # Flush in 50k batches to bound memory use.
                    if len(pending) % 50000 == 0:
                        total += 50000
                        print('inserted', total)
                        conn.execute(sa.text(record_insert), *pending)
                        pending = []
            if pending:
                conn.execute(sa.text(record_insert), *pending)
            print('inserted', (total + len(pending)))
if __name__ == "__main__":
    # Seed reference rows first, then load projects (and their attributes),
    # then the consumption data that references those projects.
    setupTables()
    loadProjects()
    loadConsumption()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment