Created
January 22, 2026 11:44
-
-
Save d-v-b/2ff0c7850c1a106350f012eba958dac1 to your computer and use it in GitHub Desktop.
Generate OME-Zarr data with (currently) invalid array dimensionality and axis type / order
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "numpy", | |
| # "zarr>=3", | |
| # "ome-zarr-models", | |
| # "pydantic", | |
| # "pydantic-zarr" | |
| # ] | |
| # /// | |
| """ | |
| Script to generate OME-Zarr v0.5 hierarchies that violate spec requirements. | |
| This script creates Zarr hierarchies with invalid metadata that bypass the | |
| pydantic validation in ome-zarr-models, allowing us to test against non-compliant | |
| data. | |
| Violations generated: | |
| 1. Dimensionality violations: Arrays with 1D, 6D, 7D axes (valid is 2-5D) | |
| 2. Axis ordering violations: | |
| - Time axis not first | |
| - Space axes not last | |
| - Channel before time | |
| - Space axes interleaved with non-space axes | |
| Usage: | |
| uv run generate_invalid_omezarr.py --output ./test_data --verify | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| import numpy as np | |
| import zarr | |
| from pydantic_zarr.v3 import ArraySpec | |
| from ome_zarr_models.v05.axes import Axis | |
| from ome_zarr_models.v05.base import BaseZarrAttrs | |
| from ome_zarr_models.v05.image import Image, ImageAttrs | |
| from ome_zarr_models.v05.multiscales import Dataset, Multiscale | |
def create_invalid_image(
    axes: tuple[Axis, ...],
    array_shape: tuple[int, ...],
    name: str | None = None,
) -> Image:
    """
    Assemble an ``Image`` model without running the model validators.

    ``Multiscale``, ``ImageAttrs`` and ``Image`` are all created through
    ``model_construct``, so the axis configuration is stored exactly as given.

    Parameters
    ----------
    axes
        Axis metadata to record in the multiscale entry; one transform
        component is generated per axis.
    array_shape
        Shape of the single ``uint8`` array registered under ``"s0"``.
    name
        Optional name stored on the multiscale entry.

    Returns
    -------
    Image
        Model with one multiscale, one dataset (``"s0"``) and version ``"0.5"``.
    """
    rank = len(axes)
    axis_names = tuple(axis.name for axis in axes)

    # The array spec is derived from an all-zero template of the requested shape.
    template = np.zeros(array_shape, dtype="uint8")
    level0_spec = ArraySpec.from_array(template, dimension_names=axis_names)

    # Dataset.build is a regular (validating) constructor, so it receives
    # well-formed transforms: unit scale and zero translation for every axis.
    level0 = Dataset.build(
        path="s0",
        scale=tuple(1.0 for _ in range(rank)),
        translation=tuple(0.0 for _ in range(rank)),
    )

    # From here on everything is built with model_construct (no validators).
    unchecked_multiscale = Multiscale.model_construct(
        axes=axes,
        datasets=(level0,),
        coordinateTransformations=None,
        metadata=None,
        name=name,
        type=None,
    )
    unchecked_attrs = ImageAttrs.model_construct(
        multiscales=[unchecked_multiscale],
        version="0.5",
    )
    zarr_attrs = BaseZarrAttrs(ome=unchecked_attrs)
    return Image.model_construct(
        attributes=zarr_attrs,
        members={"s0": level0_spec},
    )
def create_invalid_zarr_hierarchy(
    output_path: Path,
    axes: tuple[Axis, ...],
    array_shape: tuple[int, ...],
    name: str,
) -> None:
    """
    Write one invalid OME-Zarr hierarchy to ``output_path`` and report it.

    The model comes from ``create_invalid_image`` (built via
    ``model_construct``) and is serialized with ``to_zarr`` into a local
    store. The ``"s0"`` array is then filled with zeros and a short summary
    is printed.

    Parameters
    ----------
    output_path
        Directory backing the local store.
    axes
        Axis metadata for the hierarchy.
    array_shape
        Shape of the ``"s0"`` array.
    name
        Description of the violation; stored as the multiscale name and
        echoed in the printed summary.
    """
    image_model = create_invalid_image(axes, array_shape, name=name)

    # Serialize the whole model (group metadata plus array) in one call.
    local_store = zarr.storage.LocalStore(output_path)
    root = image_model.to_zarr(local_store, path="/")

    # Fill the array with zeros.
    root["s0"][...] = np.zeros(array_shape, dtype="uint8")

    axis_names = [axis.name for axis in axes]
    axis_types = [axis.type for axis in axes]
    summary = (
        f"Created invalid hierarchy at: {output_path}",
        f" Violation: {name}",
        f" Axes: {axis_names}",
        f" Types: {axis_types}",
        f" Shape: {array_shape}",
        "",
    )
    for line in summary:
        print(line)
def generate_dimensionality_violations(
    base_path: Path, suffix: str = ".zarr"
) -> list[Path]:
    """
    Generate hierarchies that violate the 2-5D dimensionality requirement.

    Three hierarchies are written below ``base_path``, in this order: a 1D,
    a 6D and a 7D image.

    Parameters
    ----------
    base_path
        Directory that receives the generated hierarchies.
    suffix
        Appended to each hierarchy directory name.

    Returns
    -------
    list[Path]
        Paths of the created hierarchies, in creation order.
    """

    def space(axis_name: str) -> Axis:
        return Axis(name=axis_name, type="space", unit="micrometer")

    def custom(axis_name: str) -> Axis:
        return Axis(name=axis_name, type="custom")

    def time() -> Axis:
        return Axis(name="t", type="time", unit="second")

    def channel() -> Axis:
        return Axis(name="c", type="channel")

    # (directory stem, axes, array shape, violation description)
    cases = (
        # Below the 2D minimum.
        (
            "invalid_1d",
            (space("x"),),
            (100,),
            "1D array (invalid: minimum is 2D)",
        ),
        # Above the 5D maximum.
        (
            "invalid_6d",
            (
                time(),
                channel(),
                custom("extra"),
                space("z"),
                space("y"),
                space("x"),
            ),
            (2, 3, 4, 10, 10, 10),
            "6D array (invalid: maximum is 5D)",
        ),
        (
            "invalid_7d",
            (
                time(),
                channel(),
                custom("extra1"),
                custom("extra2"),
                space("z"),
                space("y"),
                space("x"),
            ),
            (2, 3, 2, 2, 10, 10, 10),
            "7D array (invalid: maximum is 5D)",
        ),
    )

    written: list[Path] = []
    for stem, case_axes, shape, description in cases:
        target = base_path / f"{stem}{suffix}"
        create_invalid_zarr_hierarchy(
            target,
            case_axes,
            array_shape=shape,
            name=description,
        )
        written.append(target)
    return written
def generate_axis_ordering_violations(
    base_path: Path, suffix: str = ".zarr"
) -> list[Path]:
    """
    Generate hierarchies that violate axis ordering requirements.

    Five hierarchies are written below ``base_path``, in this order: channel
    before time, space axes not last, space axes interleaved with a channel
    axis, time at the end, and time in the middle.

    Parameters
    ----------
    base_path
        Directory that receives the generated hierarchies.
    suffix
        Appended to each hierarchy directory name.

    Returns
    -------
    list[Path]
        Paths of the created hierarchies, in creation order.
    """

    def axis_for(letter: str) -> Axis:
        # "t" -> time axis, "c" -> channel axis, any other letter -> space axis.
        if letter == "t":
            return Axis(name="t", type="time", unit="second")
        if letter == "c":
            return Axis(name="c", type="channel")
        return Axis(name=letter, type="space", unit="micrometer")

    # (directory stem, axis layout as one letter per axis, shape, description)
    cases = (
        (
            "invalid_channel_before_time",
            "ctyx",
            (3, 10, 100, 100),
            "Channel before time (invalid: time must be first)",
        ),
        (
            "invalid_space_not_last",
            "yxc",
            (100, 100, 3),
            "Space axes not last (invalid: space must be at end)",
        ),
        (
            "invalid_interleaved_space",
            "tzcyx",
            (5, 20, 3, 100, 100),
            "Space axes interleaved (invalid: space must be contiguous at end)",
        ),
        (
            "invalid_time_at_end",
            "czyxt",
            (3, 20, 100, 100, 10),
            "Time at end (invalid: time must be first if present)",
        ),
        (
            "invalid_time_in_middle",
            "ctzyx",
            (3, 10, 20, 100, 100),
            "Time in middle (invalid: time must be first if present)",
        ),
    )

    written: list[Path] = []
    for stem, layout, shape, description in cases:
        target = base_path / f"{stem}{suffix}"
        create_invalid_zarr_hierarchy(
            target,
            tuple(axis_for(letter) for letter in layout),
            array_shape=shape,
            name=description,
        )
        written.append(target)
    return written
def generate_all_violations(
    base_path: Path, suffix: str = ".zarr"
) -> dict[str, list[Path]]:
    """
    Generate every category of invalid OME-Zarr hierarchy and print a summary.

    Dimensionality violations go to ``base_path / "dimensionality_violations"``
    and axis ordering violations to ``base_path / "axis_ordering_violations"``.

    Parameters
    ----------
    base_path
        Root output directory; coerced to ``Path``.
    suffix
        Forwarded to both generators as the hierarchy directory suffix.

    Returns
    -------
    dict[str, list[Path]]
        Created paths keyed by ``"dimensionality"`` and ``"axis_ordering"``.
    """
    width = 60

    def banner(rule_char: str, title: str) -> None:
        # A title framed by two horizontal rules.
        rule = rule_char * width
        print(rule)
        print(title)
        print(rule)

    base_path = Path(base_path)

    banner("=", "Generating invalid OME-Zarr v0.5 hierarchies")
    print()

    results: dict[str, list[Path]] = {}

    banner("-", "DIMENSIONALITY VIOLATIONS (valid range is 2-5D)")
    results["dimensionality"] = generate_dimensionality_violations(
        base_path / "dimensionality_violations", suffix=suffix
    )

    banner("-", "AXIS ORDERING VIOLATIONS")
    results["axis_ordering"] = generate_axis_ordering_violations(
        base_path / "axis_ordering_violations", suffix=suffix
    )

    banner("=", "Summary")
    counts = {category: len(paths) for category, paths in results.items()}
    total = sum(counts.values())
    print(f"Total invalid hierarchies created: {total}")
    for category, count in counts.items():
        print(f" {category}: {count}")
    print(f"\nAll outputs in: {base_path}")

    return results
| def verify_violations(base_path: Path) -> None: | |
| """ | |
| Verify that the generated hierarchies are indeed invalid by attempting | |
| to load them with ome-zarr-models. | |
| """ | |
| from pydantic import ValidationError | |
| print() | |
| print("=" * 60) | |
| print("Verification: Attempting to load invalid hierarchies") | |
| print("=" * 60) | |
| print() | |
| for zarr_path in base_path.rglob("*.zarr"): | |
| print(f"Testing: {zarr_path.relative_to(base_path)}") | |
| try: | |
| store = zarr.storage.LocalStore(zarr_path) | |
| group = zarr.open_group(store, mode="r") | |
| Image.from_zarr(group) | |
| print(" UNEXPECTED: Loaded successfully (should have failed!)") | |
| except ValidationError as e: | |
| # Extract first error message | |
| error_msg = str(e.errors()[0]["msg"])[:60] | |
| print(f" EXPECTED ValidationError: {error_msg}...") | |
| except Exception as e: | |
| print(f" ERROR: {type(e).__name__}: {e}") | |
| print() | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description="Generate invalid OME-Zarr v0.5 hierarchies for testing" | |
| ) | |
| parser.add_argument( | |
| "--output", | |
| "-o", | |
| type=Path, | |
| default=Path("invalid_omezarr_test_data"), | |
| help="Output directory for generated hierarchies", | |
| ) | |
| parser.add_argument( | |
| "--verify", | |
| "-v", | |
| action="store_true", | |
| help="Verify that generated hierarchies fail validation", | |
| ) | |
| parser.add_argument( | |
| "--suffix", | |
| "-s", | |
| type=str, | |
| default=".zarr", | |
| help="Suffix for output directories (default: .zarr)", | |
| ) | |
| args = parser.parse_args() | |
| results = generate_all_violations(args.output, suffix=args.suffix) | |
| if args.verify: | |
| verify_violations(args.output) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment