Created
March 8, 2026 14:13
-
-
Save rsignell/e0b5c2fdf8cfff6a083266e7bf73ec85 to your computer and use it in GitHub Desktop.
taranto-icechunk-append.ipynb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "id": "header-md", | |
| "metadata": {}, | |
| "source": [ | |
| "# Create or append Virtual Icechunk Store from SHYFEM forecast NetCDF files\n", | |
| "\n", | |
| "* This notebook appends Taranto SHYFEM forecast data to Icechunk store using date-based set logic.\n", | |
| "* If no repo exists, a new one will be created with references to all the existing NetCDF files. \n", | |
| "\n", | |
| "**Append Methodology:**\n", | |
| "1. **`set_repo`**: Extract all dates currently present in the Icechunk store's `time` coordinate.\n", | |
| "2. **`set_cloud`**: Scan the S3 bucket for all available NOS files and extract their dates.\n", | |
| "3. **`new_dates`**: Calculate the set difference (`set_cloud - set_repo`) to determine exactly which days still need to be written, whether the store is being created or appended to.\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "imports", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import warnings\n", | |
| "import os\n", | |
| "import pandas as pd\n", | |
| "import fsspec\n", | |
| "import icechunk\n", | |
| "import xarray as xr\n", | |
| "from obstore.store import from_url\n", | |
| "from virtualizarr import open_virtual_dataset\n", | |
| "from virtualizarr.parsers import HDFParser\n", | |
| "from obspec_utils.registry import ObjectStoreRegistry \n", | |
| "from dotenv import load_dotenv\n", | |
| "\n", | |
| "warnings.filterwarnings(\"ignore\", category=UserWarning)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "config-env", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Load credentials\n", | |
| "# NOTE(review): user-specific dotenv path -- adjust for your environment.\n", | |
| "_ = load_dotenv(f'{os.environ[\"HOME\"]}/dotenv/rsignell4.env')\n", | |
| "\n", | |
| "# Configuration: S3-compatible (MinIO) endpoint, bucket, and Icechunk store name\n", | |
| "storage_endpoint = 'https://pangeo-eosc-minioapi.vm.fedcloud.eu'\n", | |
| "storage_bucket = 'rsignell4-protocoast'\n", | |
| "storage_name = 'taranto-icechunk3'\n", | |
| "bucket_url = f\"s3://{storage_bucket}\"\n", | |
| "\n", | |
| "# Setup Filesystem\n", | |
| "# Listing caches are disabled so newly uploaded forecast files are always seen.\n", | |
| "fs = fsspec.filesystem('s3', anon=False, endpoint_url=storage_endpoint, \n", | |
| " skip_instance_cache=True, use_listings_cache=False)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "6fccf159-656f-4515-a582-adc8fe296937", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Sanity check: list any existing Icechunk stores under the bucket's icechunk/ prefix\n", | |
| "fs.ls(f'{storage_bucket}/icechunk/')" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "8608b3c7-bb40-4905-9339-7cfd7aa1c8f8", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "fs.ls('rsignell4-protocoast/full_dataset/shyfem/taranto/forecast/20260129/taranto_nos_20260129_nc4.nc')" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "setup-icechunk", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Define Icechunk Storage & Config\n", | |
| "# Storage: where the Icechunk repo metadata and chunk refs live (S3-compatible MinIO;\n", | |
| "# region is unused by MinIO, path-style addressing is required).\n", | |
| "storage = icechunk.s3_storage(\n", | |
| " bucket=storage_bucket,\n", | |
| " prefix=f\"icechunk/{storage_name}\",\n", | |
| " from_env=True,\n", | |
| " endpoint_url=storage_endpoint,\n", | |
| " region='not-used',\n", | |
| " force_path_style=True\n", | |
| ")\n", | |
| "\n", | |
| "# Virtual chunk container: tells Icechunk how to resolve virtual chunk references\n", | |
| "# whose URLs start with bucket_url (i.e. the original NetCDF files).\n", | |
| "config = icechunk.RepositoryConfig.default()\n", | |
| "config.set_virtual_chunk_container(\n", | |
| " icechunk.VirtualChunkContainer(\n", | |
| " url_prefix=f\"{bucket_url}/\",\n", | |
| " store=icechunk.s3_store(region=\"not-used\", anonymous=False, s3_compatible=True, \n", | |
| " force_path_style=True, endpoint_url=storage_endpoint),\n", | |
| " ),\n", | |
| ")\n", | |
| "\n", | |
| "# Credentials for reading the virtual chunks (picked up from the environment)\n", | |
| "credentials = icechunk.containers_credentials({f\"{bucket_url}/\": icechunk.s3_credentials(anonymous=False)})\n", | |
| "\n", | |
| "# Setup VirtualiZarr Registry\n", | |
| "store_obj = from_url(bucket_url, region=\"not-used\", endpoint=storage_endpoint)\n", | |
| "registry = ObjectStoreRegistry({bucket_url: store_obj})\n", | |
| "parser = HDFParser()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "id": "md-sets", | |
| "metadata": {}, | |
| "source": [ | |
| "### Step 1: Create Date Sets (`set_repo` vs `set_cloud`)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "define-sets", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# --- 1. Get Dates from Icechunk Repo (set_repo) ---\n", | |
| "# Opening fails on the very first run (no repo yet); the except below handles that.\n", | |
| "try:\n", | |
| " repo = icechunk.Repository.open(storage, config, authorize_virtual_chunk_access=credentials)\n", | |
| " session = repo.readonly_session(\"main\")\n", | |
| " ds = xr.open_zarr(session.store, consolidated=False, chunks={})\n", | |
| " \n", | |
| " if 'time' in ds.coords:\n", | |
| " # Extract dates as YYYYMMDD strings\n", | |
| " # NOTE(review): reference times are shifted forward one day before formatting --\n", | |
| " # presumably to recover the YYYYMMDD folder date; confirm against file naming.\n", | |
| " dates = pd.to_datetime(ds.time.values) + pd.Timedelta(days=1)\n", | |
| " set_repo = set(dates.strftime('%Y%m%d'))\n", | |
| " else:\n", | |
| " set_repo = set()\n", | |
| " \n", | |
| "except Exception as e:\n", | |
| " # Broad catch is intentional: any failure is treated as \"store is empty\",\n", | |
| " # so the notebook falls through to creating the repo from scratch.\n", | |
| " print(f\"Repo access failed or empty ({e}). Assuming set_repo is empty.\")\n", | |
| " repo = None\n", | |
| " set_repo = set()\n", | |
| "\n", | |
| "print(f\"set_repo: {len(set_repo)} dates found.\")\n", | |
| "\n", | |
| "# --- 2. Get Dates from Cloud Bucket (set_cloud) ---\n", | |
| "print(\"Scanning S3 for NOS files...\")\n", | |
| "nos_files = fs.glob(f'{bucket_url}/full_dataset/shyfem/taranto/forecast/*/*nos*.nc')\n", | |
| "\n", | |
| "set_cloud = set()\n", | |
| "date_to_files_map = {} # Helper to quickly find files for a date later\n", | |
| "\n", | |
| "for f in nos_files:\n", | |
| " # Path structure: .../forecast/YYYYMMDD/taranto_nos_YYYYMMDD_nc4.nc\n", | |
| " try:\n", | |
| " date_str = f.split('/')[-2] # Parent folder is date\n", | |
| " set_cloud.add(date_str)\n", | |
| " \n", | |
| " # Store both NOS and assume OUS follows same pattern\n", | |
| " base_dir = os.path.dirname(f)\n", | |
| " # glob results lack the URL scheme, so prepend s3:// here\n", | |
| " nos_path = f's3://{f}'\n", | |
| " ous_path = f's3://{base_dir}/taranto_ous_{date_str}_nc4.nc'\n", | |
| " \n", | |
| " date_to_files_map[date_str] = {'nos': nos_path, 'ous': ous_path}\n", | |
| " \n", | |
| " except IndexError:\n", | |
| " pass\n", | |
| "\n", | |
| "print(f\"set_cloud: {len(set_cloud)} dates found.\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "calc-diff", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# --- 3. Determine New Dates ---\n", | |
| "new_dates = sorted(list(set_cloud - set_repo))\n", | |
| "\n", | |
| "print(f\"Dates to process: {len(new_dates)}\")\n", | |
| "if new_dates:\n", | |
| " print(f\"Range: {new_dates[0]} to {new_dates[-1]}\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "id": "md-virtualize", | |
| "metadata": {}, | |
| "source": [ | |
| "### Step 2: Virtualize and Merge New Data" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "helper-fix", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def fix_ds(ds):\n", | |
| " \"\"\"Standardize one forecast file for concatenation along 'time'.\n", | |
| "\n", | |
| " Renames the per-file 'time' axis to 'step' (forecast lead time) and adds a\n", | |
| " scalar 'time' coordinate (forecast reference time), using CF standard names.\n", | |
| " \"\"\"\n", | |
| " # The per-file 'time' variable becomes 'valid_time'; its dimension becomes 'step'\n", | |
| " ds = ds.rename_vars(time='valid_time')\n", | |
| " ds = ds.rename_dims(time='step')\n", | |
| " # step = offset from the first timestamp; time = the first timestamp itself\n", | |
| " step = (ds.valid_time - ds.valid_time[0]).assign_attrs({\"standard_name\": \"forecast_period\"})\n", | |
| " time = ds.valid_time[0].assign_attrs({\"standard_name\": \"forecast_reference_time\"})\n", | |
| " ds = ds.assign_coords(step=step, time=time)\n", | |
| " # Drop the now-redundant valid_time index first, then the variable itself\n", | |
| " ds = ds.drop_indexes(\"valid_time\")\n", | |
| " ds = ds.drop_vars('valid_time')\n", | |
| " # Mark static grid/topology variables as coordinates rather than data variables\n", | |
| " ds = ds.set_coords(['latitude', 'longitude', 'element_index', 'topology', 'total_depth'])\n", | |
| " return ds" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "virtualize-logic", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "ds_final = None\n", | |
| "\n", | |
| "# new_dates = new_dates[:3] # only if testing\n", | |
| "\n", | |
| "if new_dates:\n", | |
| " # Reconstruct file lists based on the identified dates\n", | |
| " nos_urls = [date_to_files_map[d]['nos'] for d in new_dates]\n", | |
| " ous_urls = [date_to_files_map[d]['ous'] for d in new_dates]\n", | |
| "\n", | |
| " # --- Process NOS ---\n", | |
| " print(f\"Virtualizing {len(nos_urls)} NOS files...\")\n", | |
| " nos_list = [\n", | |
| " open_virtual_dataset(url, parser=parser, registry=registry, loadable_variables=[\"time\"])\n", | |
| " for url in nos_urls\n", | |
| " ]\n", | |
| " nos_list = [fix_ds(ds) for ds in nos_list]\n", | |
| " combined_nos = xr.concat(\n", | |
| " nos_list, dim=\"time\", coords=\"minimal\", compat=\"override\", combine_attrs=\"override\"\n", | |
| " )\n", | |
| "\n", | |
| " # --- Process OUS ---\n", | |
| " print(f\"Virtualizing {len(ous_urls)} OUS files...\")\n", | |
| " ous_list = [\n", | |
| " open_virtual_dataset(url, parser=parser, registry=registry, loadable_variables=[\"time\"])\n", | |
| " for url in ous_urls\n", | |
| " ]\n", | |
| " ous_list = [fix_ds(ds) for ds in ous_list]\n", | |
| " combined_ous = xr.concat(\n", | |
| " ous_list, dim=\"time\", coords=\"minimal\", compat=\"override\", combine_attrs=\"override\"\n", | |
| " )\n", | |
| "\n", | |
| " # --- Merge ---\n", | |
| " ds_final = xr.merge([combined_nos, combined_ous], compat='override')\n", | |
| " print(\"Datasets merged and ready for writing to Icechunk.\")\n", | |
| " \n", | |
| "else:\n", | |
| " print(\"No new dates found. Skipping processing.\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "id": "md-append", | |
| "metadata": {}, | |
| "source": [ | |
| "### Step 3: Append to Icechunk" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "append-logic", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "if ds_final is not None:\n", | |
| " # Ensure we have a valid repo object\n", | |
| " if repo is None:\n", | |
| " repo = icechunk.Repository.create(storage, config, authorize_virtual_chunk_access=credentials)\n", | |
| " initial_session = repo.writable_session(\"main\")\n", | |
| "\n", | |
| " # Append\n", | |
| " print(f\"Writing {len(ds_final.time)} time steps to Icechunk...\")\n", | |
| " ds_final.virtualize.to_icechunk(initial_session.store)\n", | |
| " \n", | |
| " # Commit\n", | |
| " msg = f\"Initialized with forecast data: {new_dates[0]} to {new_dates[-1]}\"\n", | |
| " initial_session.commit(msg)\n", | |
| " print(f\"Commit successful: '{msg}'\")\n", | |
| " # Create Writable Session\n", | |
| " else:\n", | |
| " append_session = repo.writable_session(\"main\")\n", | |
| "\n", | |
| " # Append\n", | |
| " print(f\"Appending {len(ds_final.time)} time steps to Icechunk...\")\n", | |
| " ds_final.virtualize.to_icechunk(append_session.store, append_dim=\"time\")\n", | |
| " \n", | |
| " # Commit\n", | |
| " msg = f\"Appended forecast data: {new_dates[0]} to {new_dates[-1]}\"\n", | |
| " append_session.commit(msg)\n", | |
| " print(f\"Commit successful: '{msg}'\")\n", | |
| "\n", | |
| " # Verify History\n", | |
| " history = repo.ancestry(branch=\"main\")\n", | |
| " latest = next(history)\n", | |
| " print(f\"Latest Commit [{latest.written_at}]: {latest.message}\")\n", | |
| " \n", | |
| "else:\n", | |
| " print(\"Nothing to append.\")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "2f626059-66a0-42f9-949c-d6961e6d5e2d", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "915f9b5b-da89-4f3f-b3f3-e045eebdaa14", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3 (ipykernel)", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.13.11" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 5 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment