Skip to content

Instantly share code, notes, and snippets.

@tallcoleman
Last active February 23, 2026 10:53
Show Gist options
  • Select an option

  • Save tallcoleman/6771559900a856ee0938940eca1bd1e9 to your computer and use it in GitHub Desktop.

Select an option

Save tallcoleman/6771559900a856ee0938940eca1bd1e9 to your computer and use it in GitHub Desktop.
Data formatting script for migrating from Umami cloud to self-hosted
# /// script
# requires-python = ">=3.14"
# dependencies = [
# "pandas>=3.0.0",
# ]
# ///
import uuid
from pathlib import Path
import pandas as pd
# HOW TO USE:
# - configure the three variables below
# - run `uv run umami_import.py`

# Enter the website ID from your new self-hosted instance
# (found in the website settings of the self-hosted Umami UI)
NEW_WEBSITE_ID = ""
# Folder where the export files from your cloud instance are saved
# (expects website_event.csv and, optionally, event_data.csv inside it)
EXPORT_FOLDER = ""
# Folder where you want to save your generated import files
# (created automatically if it does not exist)
IMPORT_FOLDER = ""
# Based on scripts from https://github.com/RoversX/umami-csv-import-script with several improvements
def main(
    website_id_new: str,
    website_event_old_file: Path,
    session_new_file: Path,
    website_event_new_file: Path,
    event_data_old_file: Path,
    event_data_new_file: Path,
) -> None:
    """Run every table-generation step of the migration.

    Produces import CSVs for the website_event, session, and event_data
    tables of the self-hosted instance, all derived from the cloud export.
    """
    # Each generator is independent; the session and event_data tables are
    # both reconstructed from the exported website_event file.
    generate_website_event(website_id_new, website_event_old_file, website_event_new_file)
    generate_session(website_id_new, website_event_old_file, session_new_file)
    generate_event_data(
        website_id_new, website_event_old_file, event_data_old_file, event_data_new_file
    )
def generate_website_event(
    website_id_new: str,
    website_event_old_file: Path,
    website_event_new_file: Path,
):
    """Build the import file for the ``website_event`` table.

    Reads the exported website_event CSV, stamps every row with the new
    self-hosted website ID, keeps only the columns the target schema
    expects, and collapses duplicate event IDs before writing the result.
    """
    # Target columns of the website_event table.
    # Check in psql with `\d website_event` — order must match the schema.
    schema_columns = [
        "event_id",
        "website_id",
        "session_id",
        "created_at",
        "url_path",
        "url_query",
        "referrer_path",
        "referrer_query",
        "referrer_domain",
        "page_title",
        "event_type",
        "event_name",
        "visit_id",
        "tag",
        "fbclid",
        "gclid",
        "li_fat_id",
        "msclkid",
        "ttclid",
        "twclid",
        "utm_campaign",
        "utm_content",
        "utm_medium",
        "utm_source",
        "utm_term",
        "hostname",
    ]
    exported_events = pd.read_csv(website_event_old_file)
    # Point every event at the new self-hosted website.
    exported_events["website_id"] = website_id_new
    # Restrict to schema columns and keep one row per event_id
    # (the first occurrence wins).
    deduped_events = (
        exported_events[schema_columns]
        .groupby("event_id", as_index=False)
        .agg("first")
    )
    website_event_new_file.parent.mkdir(parents=True, exist_ok=True)
    deduped_events.to_csv(website_event_new_file, index=False)
    print(f"Successfully generated {website_event_new_file}")
def generate_session(
    website_id_new: str,
    website_event_old_file: Path,
    session_new_file: Path,
):
    """Build the import file for the ``session`` table.

    Sessions are reconstructed from the exported website_event rows: one
    output row per session_id, taking the first seen value for most columns
    and the earliest created_at timestamp for the session start.
    """
    events = pd.read_csv(website_event_old_file)
    # Point every session at the new self-hosted website.
    events["website_id"] = website_id_new
    # Aggregation spec in the column order of the session table schema
    # (check in psql with `\d session`); session_id is the groupby key.
    first_value_columns = [
        "website_id",
        "browser",
        "os",
        "device",
        "screen",
        "language",
        "country",
        "region",
        "city",
    ]
    aggregation_spec = {column: "first" for column in first_value_columns}
    aggregation_spec["created_at"] = "min"  # earliest event marks the session start
    aggregation_spec["distinct_id"] = "first"
    unique_sessions = events.groupby("session_id", as_index=False).agg(aggregation_spec)
    session_new_file.parent.mkdir(parents=True, exist_ok=True)
    unique_sessions.to_csv(session_new_file, index=False)
    print(f"Successfully generated {session_new_file}")
def generate_event_data(
    website_id_new: str,
    website_event_old_file: Path,
    event_data_old_file: Path,
    event_data_new_file: Path,
) -> None:
    """
    Generate a new event_data table using the website_event and event_data
    files from the Umami data export. Replaces the old website_id with
    website_id_new.

    The event_id column from the event_data export file may not always match
    event_id entries from the website_event export file. If this is the case,
    the function will try to find a match based on unique timestamps if
    possible. If there are still event_data rows that are not successfully
    matched to website_event rows, the function will print a warning about it
    and output a file of the un-matched event_data rows.
    """
    # Skip if there is no event_data export file (the site may have no
    # custom event data at all).
    if not event_data_old_file.exists():
        print(
            f"No event_data export file found at {event_data_old_file}, skipping event_data import file generation."
        )
        return
    event_data_old = pd.read_csv(event_data_old_file)
    # Skip if the export file exists but holds no rows.
    # (Fixed message: previously read "skipping event event_data".)
    if len(event_data_old) == 0:
        print(
            f"Event data export file found at {event_data_old_file} is empty, skipping event_data import file generation."
        )
        return
    website_event_old = pd.read_csv(website_event_old_file)
    # Columns required for the event_data table.
    # Check in psql with `\d event_data`
    # Must be in exactly the same order as the table schema
    event_data_columns = [
        "event_data_id",
        "website_id",
        "website_event_id",
        "data_key",
        "string_value",
        "number_value",
        "date_value",
        "data_type",
        "created_at",
    ]
    # website_event_id is further checked below to make sure it matches the
    # website_event table, so the column name is prefixed with '_' here.
    # Each imported row gets a freshly generated event_data_id primary key.
    event_data_new = pd.DataFrame().assign(
        event_data_id=[uuid.uuid4() for _ in range(len(event_data_old.index))],
        website_id=website_id_new,
        _website_event_id=event_data_old["event_id"],
        data_key=event_data_old["data_key"],
        string_value=event_data_old["string_value"],
        number_value=event_data_old["number_value"],
        date_value=event_data_old["date_value"],
        data_type=event_data_old["data_type"],
        created_at=event_data_old["created_at"],
    )
    # Pull matches between _website_event_id and website_event.event_id:
    # first direct ID matches, then cases where a 1:1 match is possible via
    # a unique created_at timestamp (drop_duplicates(keep=False) keeps only
    # timestamps that occur exactly once on the website_event side).
    event_data_new = event_data_new.merge(
        pd.DataFrame().assign(
            _website_event_id=website_event_old["event_id"],
            _weid_match=website_event_old["event_id"],
        ),
        how="left",
        on="_website_event_id",
    ).merge(
        pd.DataFrame()
        .assign(
            created_at=website_event_old["created_at"],
            _weid_created_at_match=website_event_old["event_id"],
        )
        .drop_duplicates(subset=["created_at"], keep=False),
        how="left",
        on="created_at",
    )
    # Assign website_event_id from direct matches, falling back to the
    # unique created_at matches where no direct match was found.
    event_data_new = event_data_new.assign(
        website_event_id=event_data_new["_weid_match"].fillna(
            event_data_new["_weid_created_at_match"]
        )
    )
    # Replace the \N string used by the export for NULL with None.
    event_data_new = event_data_new.replace(r"\N", None)
    # Set column order to match the db schema (this also drops the
    # temporary underscore-prefixed helper columns).
    event_data_new = event_data_new[event_data_columns]
    # Warn and save a list of unmatched events, if there are any.
    unmatched_event_data = event_data_new[event_data_new["website_event_id"].isna()]
    if len(unmatched_event_data) > 0:
        unmatched_path = event_data_new_file.with_name(
            event_data_new_file.name + "_unmatched"
        )
        print(
            f"{len(unmatched_event_data)} event_data rows were unable to be matched to a website event. Saving list of unmatched entries to {unmatched_path}"
        )
        unmatched_path.parent.mkdir(exist_ok=True, parents=True)
        unmatched_event_data.to_csv(unmatched_path, index=False)
    # Save validated (matched) event data to csv. The success message is
    # printed after the write so it is only shown once the file exists
    # (previously it was printed before to_csv could fail).
    matched_event_data = event_data_new[~event_data_new["website_event_id"].isna()]
    event_data_new_file.parent.mkdir(exist_ok=True, parents=True)
    matched_event_data.to_csv(event_data_new_file, index=False)
    print(f"Successfully generated {event_data_new_file}")
if __name__ == "__main__":
    # Build the input/output paths from the configured folders and run the
    # full import-file generation against fixed export/import file names.
    main(
        website_id_new=NEW_WEBSITE_ID,
        website_event_old_file=Path(EXPORT_FOLDER) / "website_event.csv",
        session_new_file=Path(IMPORT_FOLDER) / "session_new.csv",
        website_event_new_file=Path(IMPORT_FOLDER) / "website_event_new.csv",
        event_data_old_file=Path(EXPORT_FOLDER) / "event_data.csv",
        event_data_new_file=Path(IMPORT_FOLDER) / "event_data_new.csv",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment