Last active
February 23, 2026 10:53
-
-
Save tallcoleman/6771559900a856ee0938940eca1bd1e9 to your computer and use it in GitHub Desktop.
Data formatting script for migrating from Umami cloud to self-hosted
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script
# requires-python = ">=3.14"
# dependencies = [
# "pandas>=3.0.0",
# ]
# ///
import uuid
from pathlib import Path
import pandas as pd
# HOW TO USE:
# - configure the three variables below
# - run `uv run umami_import.py`
# Enter the website ID from your new self-hosted instance.
# Every generated row is stamped with this ID in its website_id column.
NEW_WEBSITE_ID = ""
# Folder where the export files from your cloud instance are saved.
# The script reads website_event.csv (required) and event_data.csv
# (optional) from this folder.
EXPORT_FOLDER = ""
# Folder where you want to save your generated import files.
# Created automatically (including parents) if it does not exist.
IMPORT_FOLDER = ""
# Based on scripts from https://github.com/RoversX/umami-csv-import-script with several improvements
def main(
    website_id_new: str,
    website_event_old_file: Path,
    session_new_file: Path,
    website_event_new_file: Path,
    event_data_old_file: Path,
    event_data_new_file: Path,
) -> None:
    """Drive all three import-file generators in sequence.

    Each generator reads from the export files, stamps rows with
    ``website_id_new``, and writes its own CSV (printing a progress
    message as it goes): website_event first, then session, then
    event_data.
    """
    generate_website_event(
        website_id_new,
        website_event_old_file,
        website_event_new_file,
    )
    generate_session(
        website_id_new,
        website_event_old_file,
        session_new_file,
    )
    generate_event_data(
        website_id_new,
        website_event_old_file,
        event_data_old_file,
        event_data_new_file,
    )
def generate_website_event(
    website_id_new: str,
    website_event_old_file: Path,
    website_event_new_file: Path,
):
    """Build the import file for the ``website_event`` table.

    Reads the exported website_event CSV, stamps every row with the new
    website ID, restricts the frame to the target table's columns in
    schema order, collapses rows that share an event_id, and writes the
    result to ``website_event_new_file``.
    """
    exported = pd.read_csv(website_event_old_file)

    # Point every row at the self-hosted website instead of the cloud one.
    exported["website_id"] = website_id_new

    # Target table layout — verify in psql with `\d website_event`.
    # Order must mirror the table schema exactly.
    schema_columns = [
        "event_id",
        "website_id",
        "session_id",
        "created_at",
        "url_path",
        "url_query",
        "referrer_path",
        "referrer_query",
        "referrer_domain",
        "page_title",
        "event_type",
        "event_name",
        "visit_id",
        "tag",
        "fbclid",
        "gclid",
        "li_fat_id",
        "msclkid",
        "ttclid",
        "twclid",
        "utm_campaign",
        "utm_content",
        "utm_medium",
        "utm_source",
        "utm_term",
        "hostname",
    ]

    # Keep only the schema columns, then collapse duplicate event_ids.
    # NOTE: aggregate("first") takes the first non-null value per column
    # (not simply the first row), and groupby sorts output by event_id.
    deduplicated = (
        exported[schema_columns]
        .groupby("event_id", as_index=False)
        .aggregate("first")
    )

    # Write the import file, creating the output folder if needed.
    website_event_new_file.parent.mkdir(exist_ok=True, parents=True)
    deduplicated.to_csv(website_event_new_file, index=False)
    print(f"Successfully generated {website_event_new_file}")
def generate_session(
    website_id_new: str,
    website_event_old_file: Path,
    session_new_file: Path,
):
    """Build the import file for the ``session`` table.

    Reconstructs one session row per distinct session_id found in the
    exported website_event CSV, stamping each with the new website ID,
    and writes the result to ``session_new_file``.
    """
    exported = pd.read_csv(website_event_old_file)

    # Point every row at the self-hosted website instead of the cloud one.
    exported["website_id"] = website_id_new

    # Per-column aggregation rules for collapsing events into sessions.
    # Column order must mirror the table schema — verify in psql with
    # `\d session`. session_id itself is the groupby key.
    # "first" takes the first non-null value; "min" picks the earliest
    # created_at so the session keeps its starting timestamp.
    per_column_rule = {
        "website_id": "first",
        "browser": "first",
        "os": "first",
        "device": "first",
        "screen": "first",
        "language": "first",
        "country": "first",
        "region": "first",
        "city": "first",
        "created_at": "min",
        "distinct_id": "first",
    }
    sessions = exported.groupby("session_id", as_index=False).aggregate(
        per_column_rule
    )

    # Write the import file, creating the output folder if needed.
    session_new_file.parent.mkdir(exist_ok=True, parents=True)
    sessions.to_csv(session_new_file, index=False)
    print(f"Successfully generated {session_new_file}")
def generate_event_data(
    website_id_new: str,
    website_event_old_file: Path,
    event_data_old_file: Path,
    event_data_new_file: Path,
):
    """
    Generate a new event_data table using the website_event and event_data files from the Umami data export. Replaces the old website_id with website_id_new.

    The event_id column from the event_data export file may not always match event_id entries from the website_event export file. If this is the case, the function will try to find a match based on unique timestamps if possible. If there are still event_data rows that are not successfully matched to website_event rows, the function will print a warning about it and output a file of the un-matched event_data rows.
    """
    # skip if there is no event_data_old file (event data is optional)
    if not event_data_old_file.exists():
        print(
            f"No event_data export file found at {event_data_old_file}, skipping event_data import file generation."
        )
        return
    event_data_old = pd.read_csv(event_data_old_file)
    # skip if there is no event data
    if len(event_data_old) == 0:
        print(
            # BUG FIX: message previously read "event event_data" (duplicated word)
            f"Event data export file found at {event_data_old_file} is empty, skipping event_data import file generation."
        )
        return
    website_event_old = pd.read_csv(website_event_old_file)
    # Check in psql with `\d event_data`
    # Must be in exactly the same order as the table schema
    event_data_columns = [
        "event_data_id",
        "website_id",
        "website_event_id",
        "data_key",
        "string_value",
        "number_value",
        "date_value",
        "data_type",
        "created_at",
    ]
    # Build the new table: a fresh UUID primary key per row, the new
    # website_id, and the exported payload columns carried over as-is.
    # website_event_id is further checked to make sure it matches the
    # website_event table, so the column name is prefixed with '_' here
    # until it is validated below.
    event_data_new = pd.DataFrame().assign(
        event_data_id=[uuid.uuid4() for _ in range(len(event_data_old.index))],
        website_id=website_id_new,
        _website_event_id=event_data_old["event_id"],
        data_key=event_data_old["data_key"],
        string_value=event_data_old["string_value"],
        number_value=event_data_old["number_value"],
        date_value=event_data_old["date_value"],
        data_type=event_data_old["data_type"],
        created_at=event_data_old["created_at"],
    )
    # pull matches between _website_event_id and website_event.event_id:
    # both direct event_id matches and cases where there is a 1:1 match
    # using created_at. drop_duplicates(keep=False) removes ALL rows with
    # a non-unique created_at, so the timestamp fallback only fires when
    # the timestamp identifies exactly one website event (and the left
    # merge can never multiply rows).
    event_data_new = event_data_new.merge(
        pd.DataFrame().assign(
            _website_event_id=website_event_old["event_id"],
            _weid_match=website_event_old["event_id"],
        ),
        how="left",
        on="_website_event_id",
    ).merge(
        pd.DataFrame()
        .assign(
            created_at=website_event_old["created_at"],
            _weid_created_at_match=website_event_old["event_id"],
        )
        .drop_duplicates(subset=["created_at"], keep=False),
        how="left",
        on="created_at",
    )
    # assign website_event_id based on direct matches with a fallback to
    # matches based on 1:1 created_at matches
    event_data_new = event_data_new.assign(
        website_event_id=event_data_new["_weid_match"].fillna(
            event_data_new["_weid_created_at_match"]
        )
    )
    # replace the \N string used by the export for SQL NULL with None
    event_data_new = event_data_new.replace(r"\N", None)
    # set column order to match db schema (also drops the temporary
    # underscore-prefixed matching columns)
    event_data_new = event_data_new[event_data_columns]
    # warn and save a list of unmatched events, if there are any
    unmatched_event_data = event_data_new[event_data_new["website_event_id"].isna()]
    if len(unmatched_event_data) > 0:
        unmatched_path = event_data_new_file.with_name(
            event_data_new_file.name + "_unmatched"
        )
        print(
            f"{len(unmatched_event_data)} event_data rows were unable to be matched to a website event. Saving list of unmatched entries to {unmatched_path}"
        )
        unmatched_path.parent.mkdir(exist_ok=True, parents=True)
        unmatched_event_data.to_csv(unmatched_path, index=False)
    # save validated (matched) event data to csv
    matched_event_data = event_data_new[~event_data_new["website_event_id"].isna()]
    event_data_new_file.parent.mkdir(exist_ok=True, parents=True)
    matched_event_data.to_csv(event_data_new_file, index=False)
    # BUG FIX: print success only after the file has actually been written
    print(f"Successfully generated {event_data_new_file}")
if __name__ == "__main__":
    # Resolve the configured folders once, then hand concrete file
    # paths to the pipeline.
    export_dir = Path(EXPORT_FOLDER)
    import_dir = Path(IMPORT_FOLDER)
    main(
        website_id_new=NEW_WEBSITE_ID,
        website_event_old_file=export_dir / "website_event.csv",
        session_new_file=import_dir / "session_new.csv",
        website_event_new_file=import_dir / "website_event_new.csv",
        event_data_old_file=export_dir / "event_data.csv",
        event_data_new_file=import_dir / "event_data_new.csv",
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment