Skip to content

Instantly share code, notes, and snippets.

@rsignell
Created March 8, 2026 14:13
Show Gist options
  • Select an option

  • Save rsignell/e0b5c2fdf8cfff6a083266e7bf73ec85 to your computer and use it in GitHub Desktop.

Select an option

Save rsignell/e0b5c2fdf8cfff6a083266e7bf73ec85 to your computer and use it in GitHub Desktop.
taranto-icechunk-append.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "header-md",
"metadata": {},
"source": [
"# Create or append Virtual Icechunk Store from SHYFEM forecast NetCDF files\n",
"\n",
"* This notebook appends Taranto SHYFEM forecast data to Icechunk store using date-based set logic.\n",
"* If no repo exists, a new one will be created with references to all the existing NetCDF files. \n",
"\n",
"**Append Methodology:**\n",
"1. **`set_repo`**: extract all dates currently present in the Icechunk store's `time` coordinate\n",
"2. **`set_cloud`**: Scan the S3 bucket for all available NOS files and extract their dates.\n",
"3. **`new_dates`**: Calculate the difference (`set_cloud - set_repo`) to determine exactly which days need to be written for creation or appended.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "imports",
"metadata": {},
"outputs": [],
"source": [
"import warnings\n",
"import os\n",
"import pandas as pd\n",
"import fsspec\n",
"import icechunk\n",
"import xarray as xr\n",
"from obstore.store import from_url\n",
"from virtualizarr import open_virtual_dataset\n",
"from virtualizarr.parsers import HDFParser\n",
"from obspec_utils.registry import ObjectStoreRegistry \n",
"from dotenv import load_dotenv\n",
"\n",
"warnings.filterwarnings(\"ignore\", category=UserWarning)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "config-env",
"metadata": {},
"outputs": [],
"source": [
"# Load credentials\n",
"_ = load_dotenv(f'{os.environ[\"HOME\"]}/dotenv/rsignell4.env')\n",
"\n",
"# Configuration\n",
"storage_endpoint = 'https://pangeo-eosc-minioapi.vm.fedcloud.eu'\n",
"storage_bucket = 'rsignell4-protocoast'\n",
"storage_name = 'taranto-icechunk3'\n",
"bucket_url = f\"s3://{storage_bucket}\"\n",
"\n",
"# Setup Filesystem\n",
"fs = fsspec.filesystem('s3', anon=False, endpoint_url=storage_endpoint, \n",
" skip_instance_cache=True, use_listings_cache=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6fccf159-656f-4515-a582-adc8fe296937",
"metadata": {},
"outputs": [],
"source": [
"# Sanity check: list the existing Icechunk stores under the bucket\n",
"fs.ls(f'{storage_bucket}/icechunk/')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8608b3c7-bb40-4905-9339-7cfd7aa1c8f8",
"metadata": {},
"outputs": [],
"source": [
"# Sanity check: confirm one sample NOS forecast NetCDF file exists at the expected path\n",
"fs.ls('rsignell4-protocoast/full_dataset/shyfem/taranto/forecast/20260129/taranto_nos_20260129_nc4.nc')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "setup-icechunk",
"metadata": {},
"outputs": [],
"source": [
"# Define Icechunk Storage & Config\n",
"storage = icechunk.s3_storage(\n",
" bucket=storage_bucket,\n",
" prefix=f\"icechunk/{storage_name}\",\n",
" from_env=True,\n",
" endpoint_url=storage_endpoint,\n",
" region='not-used',\n",
" force_path_style=True\n",
")\n",
"\n",
"config = icechunk.RepositoryConfig.default()\n",
"config.set_virtual_chunk_container(\n",
" icechunk.VirtualChunkContainer(\n",
" url_prefix=f\"{bucket_url}/\",\n",
" store=icechunk.s3_store(region=\"not-used\", anonymous=False, s3_compatible=True, \n",
" force_path_style=True, endpoint_url=storage_endpoint),\n",
" ),\n",
")\n",
"\n",
"credentials = icechunk.containers_credentials({f\"{bucket_url}/\": icechunk.s3_credentials(anonymous=False)})\n",
"\n",
"# Setup VirtualiZarr Registry\n",
"store_obj = from_url(bucket_url, region=\"not-used\", endpoint=storage_endpoint)\n",
"registry = ObjectStoreRegistry({bucket_url: store_obj})\n",
"parser = HDFParser()"
]
},
{
"cell_type": "markdown",
"id": "md-sets",
"metadata": {},
"source": [
"### Step 1: Create Date Sets (`set_repo` vs `set_cloud`)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "define-sets",
"metadata": {},
"outputs": [],
"source": [
"# --- 1. Get Dates from Icechunk Repo (set_repo) ---\n",
"try:\n",
" repo = icechunk.Repository.open(storage, config, authorize_virtual_chunk_access=credentials)\n",
" session = repo.readonly_session(\"main\")\n",
" ds = xr.open_zarr(session.store, consolidated=False, chunks={})\n",
" \n",
" if 'time' in ds.coords:\n",
" # Extract dates as YYYYMMDD strings\n",
" dates = pd.to_datetime(ds.time.values) + pd.Timedelta(days=1)\n",
" set_repo = set(dates.strftime('%Y%m%d'))\n",
" else:\n",
" set_repo = set()\n",
" \n",
"except Exception as e:\n",
" print(f\"Repo access failed or empty ({e}). Assuming set_repo is empty.\")\n",
" repo = None\n",
" set_repo = set()\n",
"\n",
"print(f\"set_repo: {len(set_repo)} dates found.\")\n",
"\n",
"# --- 2. Get Dates from Cloud Bucket (set_cloud) ---\n",
"print(\"Scanning S3 for NOS files...\")\n",
"nos_files = fs.glob(f'{bucket_url}/full_dataset/shyfem/taranto/forecast/*/*nos*.nc')\n",
"\n",
"set_cloud = set()\n",
"date_to_files_map = {} # Helper to quickly find files for a date later\n",
"\n",
"for f in nos_files:\n",
" # Path structure: .../forecast/YYYYMMDD/taranto_nos_YYYYMMDD_nc4.nc\n",
" try:\n",
" date_str = f.split('/')[-2] # Parent folder is date\n",
" set_cloud.add(date_str)\n",
" \n",
" # Store both NOS and assume OUS follows same pattern\n",
" base_dir = os.path.dirname(f)\n",
" nos_path = f's3://{f}'\n",
" ous_path = f's3://{base_dir}/taranto_ous_{date_str}_nc4.nc'\n",
" \n",
" date_to_files_map[date_str] = {'nos': nos_path, 'ous': ous_path}\n",
" \n",
" except IndexError:\n",
" pass\n",
"\n",
"print(f\"set_cloud: {len(set_cloud)} dates found.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "calc-diff",
"metadata": {},
"outputs": [],
"source": [
"# --- 3. Determine New Dates ---\n",
"new_dates = sorted(list(set_cloud - set_repo))\n",
"\n",
"print(f\"Dates to process: {len(new_dates)}\")\n",
"if new_dates:\n",
" print(f\"Range: {new_dates[0]} to {new_dates[-1]}\")"
]
},
{
"cell_type": "markdown",
"id": "md-virtualize",
"metadata": {},
"source": [
"### Step 2: Virtualize and Merge New Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "helper-fix",
"metadata": {},
"outputs": [],
"source": [
"def fix_ds(ds):\n",
" \"\"\"Standardizes dimensions and coordinates for Taranto dataset.\"\"\"\n",
" ds = ds.rename_vars(time='valid_time')\n",
" ds = ds.rename_dims(time='step')\n",
" step = (ds.valid_time - ds.valid_time[0]).assign_attrs({\"standard_name\": \"forecast_period\"})\n",
" time = ds.valid_time[0].assign_attrs({\"standard_name\": \"forecast_reference_time\"})\n",
" ds = ds.assign_coords(step=step, time=time)\n",
" ds = ds.drop_indexes(\"valid_time\")\n",
" ds = ds.drop_vars('valid_time')\n",
" ds = ds.set_coords(['latitude', 'longitude', 'element_index', 'topology', 'total_depth'])\n",
" return ds"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "virtualize-logic",
"metadata": {},
"outputs": [],
"source": [
"ds_final = None\n",
"\n",
"# new_dates = new_dates[:3] # only if testing\n",
"\n",
"if new_dates:\n",
" # Reconstruct file lists based on the identified dates\n",
" nos_urls = [date_to_files_map[d]['nos'] for d in new_dates]\n",
" ous_urls = [date_to_files_map[d]['ous'] for d in new_dates]\n",
"\n",
" # --- Process NOS ---\n",
" print(f\"Virtualizing {len(nos_urls)} NOS files...\")\n",
" nos_list = [\n",
" open_virtual_dataset(url, parser=parser, registry=registry, loadable_variables=[\"time\"])\n",
" for url in nos_urls\n",
" ]\n",
" nos_list = [fix_ds(ds) for ds in nos_list]\n",
" combined_nos = xr.concat(\n",
" nos_list, dim=\"time\", coords=\"minimal\", compat=\"override\", combine_attrs=\"override\"\n",
" )\n",
"\n",
" # --- Process OUS ---\n",
" print(f\"Virtualizing {len(ous_urls)} OUS files...\")\n",
" ous_list = [\n",
" open_virtual_dataset(url, parser=parser, registry=registry, loadable_variables=[\"time\"])\n",
" for url in ous_urls\n",
" ]\n",
" ous_list = [fix_ds(ds) for ds in ous_list]\n",
" combined_ous = xr.concat(\n",
" ous_list, dim=\"time\", coords=\"minimal\", compat=\"override\", combine_attrs=\"override\"\n",
" )\n",
"\n",
" # --- Merge ---\n",
" ds_final = xr.merge([combined_nos, combined_ous], compat='override')\n",
" print(\"Datasets merged and ready for writing to Icechunk.\")\n",
" \n",
"else:\n",
" print(\"No new dates found. Skipping processing.\")"
]
},
{
"cell_type": "markdown",
"id": "md-append",
"metadata": {},
"source": [
"### Step 3: Append to Icechunk"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "append-logic",
"metadata": {},
"outputs": [],
"source": [
"if ds_final is not None:\n",
" # Ensure we have a valid repo object\n",
" if repo is None:\n",
" repo = icechunk.Repository.create(storage, config, authorize_virtual_chunk_access=credentials)\n",
" initial_session = repo.writable_session(\"main\")\n",
"\n",
" # Append\n",
" print(f\"Writing {len(ds_final.time)} time steps to Icechunk...\")\n",
" ds_final.virtualize.to_icechunk(initial_session.store)\n",
" \n",
" # Commit\n",
" msg = f\"Initialized with forecast data: {new_dates[0]} to {new_dates[-1]}\"\n",
" initial_session.commit(msg)\n",
" print(f\"Commit successful: '{msg}'\")\n",
" # Create Writable Session\n",
" else:\n",
" append_session = repo.writable_session(\"main\")\n",
"\n",
" # Append\n",
" print(f\"Appending {len(ds_final.time)} time steps to Icechunk...\")\n",
" ds_final.virtualize.to_icechunk(append_session.store, append_dim=\"time\")\n",
" \n",
" # Commit\n",
" msg = f\"Appended forecast data: {new_dates[0]} to {new_dates[-1]}\"\n",
" append_session.commit(msg)\n",
" print(f\"Commit successful: '{msg}'\")\n",
"\n",
" # Verify History\n",
" history = repo.ancestry(branch=\"main\")\n",
" latest = next(history)\n",
" print(f\"Latest Commit [{latest.written_at}]: {latest.message}\")\n",
" \n",
"else:\n",
" print(\"Nothing to append.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2f626059-66a0-42f9-949c-d6961e6d5e2d",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "915f9b5b-da89-4f3f-b3f3-e045eebdaa14",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment