Sigma Data Models Implementation Plan

Executive Summary

This document outlines a detailed plan to add support for Sigma's new Data Models API to the dagster-sigma library. Data Models are replacing the deprecated datasets (end-of-life: June 2, 2026) and provide enhanced semantic layer capabilities including metrics, relationships, column/row-level security, and more.

Timeline: ~5-8 days development + testing
Priority: Medium-High (18-month deprecation timeline for datasets)
Complexity: Moderate


Table of Contents

  1. Phase 0: API Discovery & Validation
  2. Phase 1: Core Data Structure Implementation
  3. Phase 2: API Client Extensions
  4. Phase 3: Translation Layer
  5. Phase 4: Asset Loading Integration
  6. Phase 5: Materialization Support
  7. Phase 6: Component Framework Integration
  8. Phase 7: Migration Support & Deprecation
  9. Testing Strategy
  10. Documentation Requirements
  11. Rollout Plan
  12. Risk Mitigation

Phase 0: API Discovery & Validation

Goal: Understand the exact API response structure before implementation
Duration: 0.5-1 day
Priority: Critical - blocks all other work

Tasks

0.1: Set Up Test Environment

  • Obtain Sigma API credentials for a test organization
  • Create a test data model in Sigma UI with:
    • Multiple data sources (tables, datasets)
    • At least one relationship
    • At least one metric
    • A materialization schedule
  • Document the test data model structure

0.2: API Exploration Script

Create a standalone Python script to explore all data model endpoints:

# scripts/explore_data_models_api.py
import asyncio
import json
import os

import aiohttp

async def explore_data_models_api():
    """Explore Sigma Data Models API endpoints and save responses."""

    base_url = "https://aws-api.sigmacomputing.com"
    client_id = "YOUR_CLIENT_ID"
    client_secret = "YOUR_CLIENT_SECRET"

    # Get auth token (a get_auth_token sketch follows this script)
    token = await get_auth_token(base_url, client_id, client_secret)

    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "X-Sigma-Partner-Id": "dagster"
    }

    async with aiohttp.ClientSession(headers=headers) as session:
        # 1. List all data models
        data_models = await fetch_and_save(
            session,
            f"{base_url}/v2/data-models?limit=1000",
            "data_models_list.json"
        )

        if data_models and data_models.get("entries"):
            data_model_id = data_models["entries"][0]["dataModelId"]

            # 2. Get specific data model details
            await fetch_and_save(
                session,
                f"{base_url}/v2/data-models/{data_model_id}",
                "data_model_detail.json"
            )

            # 3. List data model sources
            await fetch_and_save(
                session,
                f"{base_url}/v2/data-models/{data_model_id}/sources",
                "data_model_sources.json"
            )

            # 4. List materialization schedules
            await fetch_and_save(
                session,
                f"{base_url}/v2/data-models/{data_model_id}/materialization-schedules",
                "data_model_schedules.json"
            )

            # 5. Check if there's a pages/elements equivalent
            try:
                await fetch_and_save(
                    session,
                    f"{base_url}/v2/data-models/{data_model_id}/pages",
                    "data_model_pages.json"
                )
            except aiohttp.ClientResponseError:
                print("No pages endpoint for data models")

            # 6. Check for lineage endpoint
            try:
                await fetch_and_save(
                    session,
                    f"{base_url}/v2/data-models/{data_model_id}/lineage",
                    "data_model_lineage.json"
                )
            except aiohttp.ClientResponseError:
                print("No lineage endpoint for data models")

async def fetch_and_save(session, url, filename):
    """Fetch JSON from an endpoint and save it for later schema analysis."""
    async with session.get(url) as resp:
        resp.raise_for_status()
        data = await resp.json()
        os.makedirs("api_exploration", exist_ok=True)
        with open(f"api_exploration/{filename}", "w") as f:
            json.dump(data, f, indent=2)
        return data

if __name__ == "__main__":
    asyncio.run(explore_data_models_api())
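
The script calls get_auth_token, which is not defined above. A minimal sketch is shown below; it assumes Sigma issues bearer tokens via an OAuth client-credentials grant at /v2/auth/token, so the endpoint and response field names should be confirmed against Sigma's API documentation during this phase.

# Hypothetical helper for the exploration script above; the token endpoint and
# response shape are assumptions to verify against Sigma's API docs.
import aiohttp  # already imported by the script above

async def get_auth_token(base_url: str, client_id: str, client_secret: str) -> str:
    """Exchange client credentials for a bearer token."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{base_url}/v2/auth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
        ) as resp:
            resp.raise_for_status()
            payload = await resp.json()
            return payload["access_token"]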

0.3: Schema Analysis

  • Document all fields returned by each endpoint
  • Identify differences from dataset API responses
  • Map data model concepts to existing dataset concepts
  • Identify new concepts (metrics, relationships) and how they're represented
  • Create a comparison table: Datasets vs Data Models

0.4: Validation Questions

Answer these questions through API exploration:

  1. Identity: How are data models uniquely identified? (dataModelId?)
  2. Sources: How are upstream dependencies represented in /sources endpoint?
  3. Relationships: Are relationships exposed via API or embedded in data model?
  4. Metrics: Are metrics separate entities or part of data model metadata?
  5. Materialization: Same pattern as workbooks/datasets or different?
  6. Lineage: Is there a lineage endpoint or is dependency info in sources?
  7. Columns: How are column definitions exposed?
  8. Ownership: Same ownerId pattern as workbooks?
  9. Paths: Do data models have folder paths like workbooks?
  10. Versioning: How is version tagging represented?
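
A small follow-up script can make these checks repeatable by scanning the JSON captured in step 0.2 and reporting which candidate fields are present. The candidate field names below are drawn from the questions above and are guesses to confirm, not a documented schema.

# scripts/check_data_model_schema.py
# Reports which candidate fields appear in the captured API responses.
# Field names are assumptions taken from the validation questions above.
import json
from pathlib import Path

CANDIDATE_FIELDS = {
    "data_models_list.json": ["dataModelId", "name", "url", "path", "ownerId", "createdAt"],
    "data_model_detail.json": ["relationships", "metrics", "columns", "version"],
    "data_model_sources.json": ["inode", "nodeId", "type", "name"],
}

def check_schema(directory: str = "api_exploration") -> None:
    for filename, fields in CANDIDATE_FIELDS.items():
        path = Path(directory) / filename
        if not path.exists():
            print(f"{filename}: not captured")
            continue
        data = json.loads(path.read_text())
        # Paginated endpoints wrap results in an "entries" list.
        sample = data["entries"][0] if isinstance(data, dict) and data.get("entries") else data
        for field in fields:
            present = isinstance(sample, dict) and field in sample
            print(f"{filename}: {field}: {'present' if present else 'missing'}")

if __name__ == "__main__":
    check_schema()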

Deliverable: api_exploration/DATA_MODELS_API_SCHEMA.md document


Phase 1: Core Data Structure Implementation

Goal: Define the data model classes using the @record decorator
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/translator.py

Tasks

1.1: Define SigmaDataModel Record Class

@record
class SigmaDataModel:
    """Represents a Sigma Data Model.

    Data Models are Sigma's next-generation semantic layer that replaces datasets
    (deprecated June 2026). They provide enhanced governance, metrics, relationships,
    and security features.

    See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling
    """

    properties: dict[str, Any]
    """Raw data model properties from API response."""

    sources: set[str]
    """Set of upstream dependency inodes (tables, datasets, or other data models)."""

    relationships: list[dict[str, Any]]
    """Relationships defined between data sources."""

    metrics: list[dict[str, Any]]
    """Metrics defined in this data model."""

    @property
    def data_model_id(self) -> str:
        """The unique identifier for this data model."""
        return self.properties["dataModelId"]

    @property
    def name(self) -> str:
        """The name of this data model."""
        return self.properties["name"]

    @property
    def url(self) -> str:
        """The URL to view this data model in Sigma."""
        return self.properties["url"]

    @property
    def path(self) -> str:
        """The folder path where this data model is located."""
        return self.properties.get("path", "")

    @property
    def description(self) -> str:
        """The description of this data model."""
        return self.properties.get("description", "")

    @property
    def created_at(self) -> str:
        """ISO 8601 timestamp when data model was created."""
        return self.properties["createdAt"]

    @property
    def owner_id(self) -> str:
        """The ID of the user who owns this data model."""
        return self.properties.get("ownerId", "")

    @cached_method
    def get_upstream_table_deps(self, organization_data: "SigmaOrganizationData") -> set[str]:
        """Get all upstream table dependencies, including transitive dependencies."""
        deps = set()

        for source_inode in self.sources:
            # Direct table dependency
            if source_inode in organization_data.tables:
                deps.add(source_inode)
            # Dataset dependency (resolve its tables)
            elif source_inode in organization_data.get_datasets_by_inode():
                dataset = organization_data.get_datasets_by_inode()[source_inode]
                deps.update(dataset.inputs)
            # Data model dependency (resolve recursively)
            elif source_inode in organization_data.get_data_models_by_inode():
                data_model = organization_data.get_data_models_by_inode()[source_inode]
                deps.update(data_model.get_upstream_table_deps(organization_data))

        return deps

1.2: Update SigmaOrganizationData

@record
class SigmaOrganizationData:
    """Root data structure containing all Sigma organization assets."""

    workbooks: dict[str, SigmaWorkbook]
    datasets: dict[str, SigmaDataset]
    data_models: dict[str, SigmaDataModel]  # NEW
    tables: dict[str, SigmaTable]

    @cached_method
    def get_data_models_by_inode(self) -> dict[str, SigmaDataModel]:
        """Get data models indexed by their inode identifier."""
        # Implementation depends on how data models expose inodes
        # May need to extract from urlId or other field
        return {
            self._extract_inode(dm): dm
            for dm in self.data_models.values()
        }

    def _extract_inode(self, data_model: SigmaDataModel) -> str:
        """Extract inode identifier from data model."""
        # TODO: Determine from API exploration
        # Likely similar to: f"inode-{data_model.properties['urlId']}"
        pass

1.3: Create Translator Data Wrapper

@record
class SigmaDataModelTranslatorData:
    """Wrapper providing data model with organization context for translation."""

    data_model: SigmaDataModel
    organization_data: SigmaOrganizationData

1.4: Define Metadata Set

class SigmaDataModelMetadataSet(NamespacedMetadataSet):
    """Metadata for Sigma Data Model assets."""

    web_url: Optional[str] = None
    """The URL to the data model in Sigma."""

    created_at: Optional[str] = None
    """ISO 8601 timestamp when data model was created."""

    description: Optional[str] = None
    """The data model description."""

    properties: Optional[dict[str, Any]] = None
    """Raw properties from Sigma API."""

    sources: Optional[list[dict[str, Any]]] = None
    """List of data sources feeding this data model."""

    relationships: Optional[list[dict[str, Any]]] = None
    """Relationships defined in this data model."""

    metrics: Optional[list[dict[str, Any]]] = None
    """Metrics defined in this data model."""

    data_model_id: Optional[str] = None
    """The unique identifier for this data model."""

    @classmethod
    def namespace(cls) -> str:
        return "dagster_sigma"

Deliverable: Updated translator.py with new classes


Phase 2: API Client Extensions

Goal: Add API methods to fetch data model information
Duration: 1-1.5 days
File: python_modules/libraries/dagster-sigma/dagster_sigma/resource.py

Tasks

2.1: Add Data Model Fetching Methods

class SigmaOrganization(ConfigurableResource):
    """Sigma API client resource."""

    # ... existing fields ...

    async def _fetch_data_models(self) -> list[dict[str, Any]]:
        """Fetch all data models in the organization.

        Returns:
            List of data model objects from Sigma API.
        """
        return await self._fetch_json_async_paginated_entries(
            "/v2/data-models",
            params={"limit": 1000}
        )

    async def _fetch_data_model_sources(
        self,
        data_model_id: str
    ) -> list[dict[str, Any]]:
        """Fetch data sources for a specific data model.

        Args:
            data_model_id: The unique identifier for the data model.

        Returns:
            List of source objects (tables, datasets, or data models).
        """
        return await self._fetch_json_async_paginated_entries(
            f"/v2/data-models/{data_model_id}/sources",
            params={"limit": 1000}
        )

    async def _fetch_materialization_schedules_for_data_model(
        self,
        data_model_id: str
    ) -> list[dict[str, Any]]:
        """Fetch materialization schedules for a data model.

        Args:
            data_model_id: The unique identifier for the data model.

        Returns:
            List of materialization schedule objects.
        """
        return await self._fetch_json_async_paginated_entries(
            f"/v2/data-models/{data_model_id}/materialization-schedules",
            params={"limit": 1000}
        )

2.2: Update SigmaFilter

@record
class SigmaFilter:
    """Filters for selecting which Sigma objects to load."""

    workbook_folders: Sequence[str] = []
    """List of workbook folder paths to include."""

    workbooks: Sequence[str] = []
    """List of specific workbook paths to include."""

    include_unused_datasets: bool = False
    """Whether to include datasets not referenced in any workbook."""

    data_model_folders: Sequence[str] = []
    """List of data model folder paths to include. NEW"""

    data_models: Sequence[str] = []
    """List of specific data model paths to include. NEW"""

    include_data_models: bool = True
    """Whether to load data models at all. Set to False to only use legacy datasets. NEW"""

    include_datasets: bool = True
    """Whether to load datasets. Set to False to only use new data models. NEW"""

2.3: Update build_organization_data Method

async def build_organization_data(
    self,
    sigma_filter: Optional[SigmaFilter] = None,
    fetch_column_data: bool = False,
    fetch_lineage_data: bool = True,
) -> SigmaOrganizationData:
    """Fetch and organize all Sigma assets.

    Args:
        sigma_filter: Optional filter for selecting which assets to load.
        fetch_column_data: Whether to fetch column metadata.
        fetch_lineage_data: Whether to fetch lineage information.

    Returns:
        SigmaOrganizationData with all fetched assets.
    """
    sigma_filter = sigma_filter or SigmaFilter()

    # Fetch all asset types in parallel
    (
        raw_workbooks,
        raw_datasets,
        raw_data_models,  # NEW
        raw_tables,
        members,
    ) = await asyncio.gather(
        self._fetch_workbooks(),
        self._fetch_datasets() if sigma_filter.include_datasets else self._return_empty_list(),
        self._fetch_data_models() if sigma_filter.include_data_models else self._return_empty_list(),  # NEW
        self._fetch_tables(),
        self._fetch_members(),
    )

    # Filter data models by path
    filtered_data_models = self._filter_data_models_by_path(
        raw_data_models,
        sigma_filter
    )

    # Build data model objects with sources
    data_models_by_id = await self._build_data_model_objects(
        filtered_data_models,
        fetch_lineage_data
    )

    # ... rest of existing logic ...

    return SigmaOrganizationData(
        workbooks=workbooks_by_id,
        datasets=datasets_by_id,
        data_models=data_models_by_id,  # NEW
        tables=tables_by_inode,
    )

async def _return_empty_list(self) -> list:
    """Helper to return empty list in async gather."""
    return []

def _filter_data_models_by_path(
    self,
    data_models: list[dict[str, Any]],
    sigma_filter: SigmaFilter,
) -> list[dict[str, Any]]:
    """Filter data models based on folder and path filters."""
    if not sigma_filter.data_model_folders and not sigma_filter.data_models:
        return data_models

    filtered = []
    for dm in data_models:
        path = dm.get("path", "")
        name = dm.get("name", "")
        full_path = f"{path}/{name}" if path else name

        # Check folder filters
        if sigma_filter.data_model_folders:
            if any(path.startswith(folder) for folder in sigma_filter.data_model_folders):
                filtered.append(dm)
                continue

        # Check explicit data model filters
        if sigma_filter.data_models:
            if full_path in sigma_filter.data_models:
                filtered.append(dm)
                continue

    return filtered

async def _build_data_model_objects(
    self,
    data_models: list[dict[str, Any]],
    fetch_lineage_data: bool,
) -> dict[str, SigmaDataModel]:
    """Build SigmaDataModel objects with enriched data.

    Args:
        data_models: Raw data model dictionaries from API.
        fetch_lineage_data: Whether to fetch source dependencies.

    Returns:
        Dictionary mapping data model ID to SigmaDataModel object.
    """
    if not fetch_lineage_data:
        # Return basic data models without sources
        return {
            dm["dataModelId"]: SigmaDataModel(
                properties=dm,
                sources=set(),
                relationships=[],
                metrics=[],
            )
            for dm in data_models
        }

    # Fetch sources for all data models in parallel
    sources_by_id = await asyncio.gather(*[
        self._fetch_data_model_sources(dm["dataModelId"])
        for dm in data_models
    ])

    data_models_by_id = {}
    for dm, sources in zip(data_models, sources_by_id):
        # Extract source inodes from response
        source_inodes = {
            self._extract_source_inode(source)
            for source in sources
        }

        # TODO: Extract relationships and metrics from dm or sources
        # This depends on API response structure
        relationships = dm.get("relationships", [])
        metrics = dm.get("metrics", [])

        data_models_by_id[dm["dataModelId"]] = SigmaDataModel(
            properties=dm,
            sources=source_inodes,
            relationships=relationships,
            metrics=metrics,
        )

    return data_models_by_id

def _extract_source_inode(self, source: dict[str, Any]) -> str:
    """Extract inode identifier from a data model source object.

    Args:
        source: Source object from /sources API endpoint.

    Returns:
        Inode identifier string.
    """
    # TODO: Implement based on API exploration
    # Likely patterns:
    # - source["inode"]
    # - source["nodeId"]
    # - f"inode-{source['urlId']}"
    pass

Deliverable: Updated resource.py with data model fetching


Phase 3: Translation Layer

Goal: Translate data models to Dagster AssetSpecs
Duration: 1-1.5 days
File: python_modules/libraries/dagster-sigma/dagster_sigma/translator.py

Tasks

3.1: Extend DagsterSigmaTranslator

class DagsterSigmaTranslator:
    """Translator for converting Sigma objects to Dagster AssetSpecs."""

    def get_asset_spec(
        self,
        data: Union[
            SigmaWorkbookTranslatorData,
            SigmaDatasetTranslatorData,
            SigmaDataModelTranslatorData  # NEW
        ]
    ) -> AssetSpec:
        """Convert Sigma object to AssetSpec.

        Args:
            data: Translator data wrapper for a Sigma object.

        Returns:
            AssetSpec for the Sigma object.
        """
        if isinstance(data, SigmaWorkbookTranslatorData):
            return self._get_workbook_spec(data)
        elif isinstance(data, SigmaDatasetTranslatorData):
            return self._get_dataset_spec(data)
        elif isinstance(data, SigmaDataModelTranslatorData):  # NEW
            return self._get_data_model_spec(data)
        else:
            raise ValueError(f"Unsupported translator data type: {type(data)}")

    def _get_data_model_spec(
        self,
        data: SigmaDataModelTranslatorData
    ) -> AssetSpec:
        """Convert a Sigma Data Model to an AssetSpec.

        Args:
            data: Data model translator data.

        Returns:
            AssetSpec for the data model.
        """
        data_model = data.data_model
        org_data = data.organization_data

        # Get upstream dependencies
        deps = self._get_data_model_deps(data_model, org_data)

        # Build metadata
        metadata = SigmaDataModelMetadataSet(
            web_url=data_model.url,
            created_at=data_model.created_at,
            description=data_model.description,
            properties=data_model.properties,
            sources=[
                org_data.get_object_by_inode(inode)
                for inode in data_model.sources
            ],
            relationships=data_model.relationships,
            metrics=data_model.metrics,
            data_model_id=data_model.data_model_id,
        )

        # Determine owner
        owner = None
        if data_model.owner_id:
            member = org_data.members.get(data_model.owner_id)
            if member:
                owner = member.get("email")

        return AssetSpec(
            key=self.get_data_model_asset_key(data_model),
            deps=deps,
            description=data_model.description or None,
            metadata=metadata,
            kinds={"sigma", "data_model"},
            owners=[owner] if owner else None,
            tags=self.get_data_model_tags(data_model),
            group_name=self.get_data_model_group_name(data_model),
        )

    def _get_data_model_deps(
        self,
        data_model: SigmaDataModel,
        org_data: SigmaOrganizationData,
    ) -> set[AssetKey]:
        """Get upstream asset dependencies for a data model.

        Data models can depend on:
        - Tables (direct warehouse tables)
        - Datasets (legacy)
        - Other data models (recursive)

        Args:
            data_model: The data model to get dependencies for.
            org_data: Organization data for resolving references.

        Returns:
            Set of AssetKeys for upstream dependencies.
        """
        deps = set()

        for source_inode in data_model.sources:
            # Table dependency
            if source_inode in org_data.tables:
                table = org_data.tables[source_inode]
                deps.add(self.get_table_asset_key(table))

            # Dataset dependency
            elif source_inode in org_data.get_datasets_by_inode():
                dataset = org_data.get_datasets_by_inode()[source_inode]
                deps.add(self.get_dataset_asset_key(dataset))

            # Data model dependency (recursive)
            elif source_inode in org_data.get_data_models_by_inode():
                upstream_dm = org_data.get_data_models_by_inode()[source_inode]
                deps.add(self.get_data_model_asset_key(upstream_dm))

        return deps

    # Customization hooks for data models

    def get_data_model_asset_key(self, data_model: SigmaDataModel) -> AssetKey:
        """Get the asset key for a data model.

        Default: AssetKey(["sigma", "data_model", <sanitized_name>])

        Args:
            data_model: The data model.

        Returns:
            AssetKey for the data model.
        """
        return AssetKey(["sigma", "data_model", self._sanitize_name(data_model.name)])

    def get_data_model_tags(self, data_model: SigmaDataModel) -> dict[str, str]:
        """Get tags for a data model asset.

        Args:
            data_model: The data model.

        Returns:
            Dictionary of tags.
        """
        tags = {}
        if data_model.path:
            tags["sigma/path"] = data_model.path
        return tags

    def get_data_model_group_name(self, data_model: SigmaDataModel) -> Optional[str]:
        """Get the group name for a data model asset.

        Args:
            data_model: The data model.

        Returns:
            Group name or None.
        """
        return None

Deliverable: Updated translator.py with data model translation


Phase 4: Asset Loading Integration

Goal: Integrate data models into the asset loading flow
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/assets.py

Tasks

4.1: Update load_sigma_asset_specs

@experimental
def load_sigma_asset_specs(
    organization: SigmaOrganization,
    dagster_sigma_translator: DagsterSigmaTranslator = DagsterSigmaTranslator(),
    sigma_filter: Optional[SigmaFilter] = None,
    fetch_column_data: bool = False,
    fetch_lineage_data: bool = True,
    snapshot_path: Optional[str] = None,
) -> list[AssetSpec]:
    """Load Sigma assets as Dagster AssetSpecs.

    Loads workbooks, datasets, data models, and their dependencies.

    Args:
        organization: The SigmaOrganization resource.
        dagster_sigma_translator: Translator for customizing asset specs.
        sigma_filter: Optional filter for selecting which assets to load.
        fetch_column_data: Whether to fetch column metadata (slower).
        fetch_lineage_data: Whether to fetch lineage/dependency information.
        snapshot_path: Optional path to cached snapshot file.

    Returns:
        List of AssetSpecs for all Sigma assets.

    Examples:
        Basic usage with both datasets and data models:

        .. code-block:: python

            from dagster_sigma import SigmaOrganization, load_sigma_asset_specs

            sigma = SigmaOrganization(
                base_url="https://aws-api.sigmacomputing.com",
                client_id="my_client_id",
                client_secret="my_client_secret",
            )

            # Load all Sigma assets
            sigma_specs = load_sigma_asset_specs(sigma)

        Using only data models (recommended for new integrations):

        .. code-block:: python

            from dagster_sigma import SigmaOrganization, SigmaFilter, load_sigma_asset_specs

            # Only load data models, not legacy datasets
            sigma_filter = SigmaFilter(
                include_datasets=False,
                include_data_models=True,
            )

            sigma_specs = load_sigma_asset_specs(
                sigma,
                sigma_filter=sigma_filter,
            )
    """
    if snapshot_path:
        org_data = organization._load_snapshot(snapshot_path)
    else:
        org_data = organization.build_organization_data(
            sigma_filter=sigma_filter,
            fetch_column_data=fetch_column_data,
            fetch_lineage_data=fetch_lineage_data,
        )

    specs = []

    # Translate workbooks
    for workbook in org_data.workbooks.values():
        spec = dagster_sigma_translator.get_asset_spec(
            SigmaWorkbookTranslatorData(
                workbook=workbook,
                organization_data=org_data,
            )
        )
        specs.append(spec)

    # Translate datasets (legacy)
    for dataset in org_data.datasets.values():
        spec = dagster_sigma_translator.get_asset_spec(
            SigmaDatasetTranslatorData(
                dataset=dataset,
                organization_data=org_data,
            )
        )
        specs.append(spec)

    # Translate data models (NEW)
    for data_model in org_data.data_models.values():
        spec = dagster_sigma_translator.get_asset_spec(
            SigmaDataModelTranslatorData(
                data_model=data_model,
                organization_data=org_data,
            )
        )
        specs.append(spec)

    return specs

Deliverable: Updated assets.py with data model support


Phase 5: Materialization Support

Goal: Support triggering data model materializations
Duration: 1-1.5 days
Files: resource.py, assets.py

Tasks

5.1: Add Materialization Methods to Resource

class SigmaOrganization(ConfigurableResource):
    """Sigma API client resource."""

    def run_materializations_for_data_model(
        self,
        spec: AssetSpec,
        element_id: Optional[str] = None,
    ) -> None:
        """Trigger materialization for a data model.

        Args:
            spec: AssetSpec for the data model.
            element_id: Optional specific element to materialize.
                       If not provided, materializes all scheduled elements.
        """
        metadata = SigmaDataModelMetadataSet.extract(spec.metadata)
        data_model_id = metadata.data_model_id

        if not data_model_id:
            raise ValueError(f"No data_model_id in metadata for {spec.key}")

        # Get materialization schedules
        schedules = self._fetch_materialization_schedules_for_data_model_sync(
            data_model_id
        )

        if not schedules:
            raise ValueError(f"No materialization schedules found for data model {data_model_id}")

        # Trigger materializations
        materialization_ids = []
        for schedule in schedules:
            if element_id and schedule.get("elementId") != element_id:
                continue

            mat_id = self._trigger_data_model_materialization_sync(
                data_model_id,
                schedule["elementId"],
            )
            materialization_ids.append((schedule["elementId"], mat_id))

        # Poll until all complete
        for elem_id, mat_id in materialization_ids:
            self._poll_data_model_materialization_sync(
                data_model_id,
                mat_id,
                elem_id,
            )

    def _fetch_materialization_schedules_for_data_model_sync(
        self,
        data_model_id: str,
    ) -> list[dict[str, Any]]:
        """Synchronous version of _fetch_materialization_schedules_for_data_model."""
        return asyncio.run(
            self._fetch_materialization_schedules_for_data_model(data_model_id)
        )

    def _trigger_data_model_materialization_sync(
        self,
        data_model_id: str,
        element_id: str,
    ) -> str:
        """Trigger a data model element materialization.

        Args:
            data_model_id: The data model ID.
            element_id: The element ID to materialize.

        Returns:
            Materialization ID for polling.
        """
        response = requests.post(
            f"{self.base_url.value}/v2/data-models/{data_model_id}/elements/{element_id}/materialize",
            headers=self._get_headers(),
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        return data["materializationId"]

    def _poll_data_model_materialization_sync(
        self,
        data_model_id: str,
        materialization_id: str,
        element_id: str,
    ) -> None:
        """Poll materialization status until complete.

        Args:
            data_model_id: The data model ID.
            materialization_id: The materialization job ID.
            element_id: The element ID being materialized.
        """
        import time

        max_attempts = 60
        attempt = 0

        while attempt < max_attempts:
            response = requests.get(
                f"{self.base_url.value}/v2/data-models/{data_model_id}/materializations/{materialization_id}",
                headers=self._get_headers(),
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()

            status = data["status"]

            if status == "ready":
                return
            elif status == "failed":
                raise RuntimeError(
                    f"Materialization failed for data model {data_model_id} "
                    f"element {element_id}: {data.get('error', 'Unknown error')}"
                )

            time.sleep(2)
            attempt += 1

        raise TimeoutError(
            f"Materialization timed out for data model {data_model_id} "
            f"element {element_id} after {max_attempts * 2} seconds"
        )

5.2: Add Asset Definition Builder

def build_materialize_data_model_assets_definition(
    resource_key: str,
    spec: AssetSpec,
) -> AssetsDefinition:
    """Build an AssetsDefinition for materializing a Sigma data model.

    This creates a materialization asset that triggers scheduled materializations
    for the data model in Sigma.

    Args:
        resource_key: The resource key for the SigmaOrganization resource.
        spec: The AssetSpec for the data model.

    Returns:
        AssetsDefinition that materializes the data model.

    Examples:
        .. code-block:: python

            from dagster import Definitions
            from dagster_sigma import (
                SigmaOrganization,
                load_sigma_asset_specs,
                build_materialize_data_model_assets_definition,
            )

            sigma = SigmaOrganization(...)
            sigma_specs = load_sigma_asset_specs(sigma)

            # Find data model specs
            data_model_specs = [
                spec for spec in sigma_specs
                if "data_model" in spec.kinds
            ]

            # Build materialization assets
            materialize_defs = [
                build_materialize_data_model_assets_definition("sigma", spec)
                for spec in data_model_specs
            ]

            defs = Definitions(
                assets=[*sigma_specs, *materialize_defs],
                resources={"sigma": sigma},
            )
    """
    @multi_asset(
        name=f"materialize_{spec.key.to_python_identifier()}",
        specs=[spec],
        required_resource_keys={resource_key},
    )
    def _materialize_data_model(context):
        sigma: SigmaOrganization = getattr(context.resources, resource_key)
        sigma.run_materializations_for_data_model(spec)
        return MaterializeResult(asset_key=spec.key)

    return _materialize_data_model

Deliverable: Updated resource.py and assets.py with materialization support


Phase 6: Component Framework Integration

Goal: Add data model support to SigmaComponent
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/components/sigma_component.py

Tasks

6.1: Update SigmaComponent

The component should automatically work with data models since it uses load_sigma_asset_specs under the hood. However, we should:

  1. Update type hints to include SigmaDataModelTranslatorData
  2. Update examples to show data model usage
  3. Test that component loading works correctly
# Update type hints
def get_asset_spec(
    self,
    data: Union[
        SigmaWorkbookTranslatorData,
        SigmaDatasetTranslatorData,
        SigmaDataModelTranslatorData,  # NEW
    ]
) -> AssetSpec:
    """Override to customize asset specs."""
    pass
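
As a sketch of the expected end-user experience (mirroring the constructor used in the component tests later in this plan), loading only data models through the component might look like:

# Hypothetical usage; constructor arguments mirror the component tests below.
component = SigmaComponent(
    organization=SigmaOrganization(
        base_url="https://aws-api.sigmacomputing.com",
        client_id="my_client_id",
        client_secret="my_client_secret",
    ),
    sigma_filter=SigmaFilter(
        include_datasets=False,
        include_data_models=True,
    ),
)
defs = component.build_defs()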

Deliverable: Updated sigma_component.py with data model support


Phase 7: Migration Support & Deprecation

Goal: Help users migrate from datasets to data models
Duration: 0.5-1 day
Files: Multiple

Tasks

7.1: Add Deprecation Warnings

# In translator.py
def _get_dataset_spec(self, data: SigmaDatasetTranslatorData) -> AssetSpec:
    """Convert a Sigma Dataset to an AssetSpec.

    .. deprecated:: 1.9.0
        Sigma datasets are deprecated and will be removed on June 2, 2026.
        Use Sigma Data Models instead by setting `SigmaFilter(include_datasets=False, include_data_models=True)`.
        See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling
    """
    warnings.warn(
        "Sigma datasets are deprecated and will be removed on June 2, 2026. "
        "Migrate to Sigma Data Models. "
        "See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling",
        DeprecationWarning,
        stacklevel=2,
    )
    # ... existing implementation ...

7.2: Add Migration Helper

# In assets.py
def detect_dataset_to_data_model_migrations(
    organization: SigmaOrganization,
) -> dict[str, str]:
    """Detect datasets that have been migrated to data models.

    This helper identifies datasets that have corresponding data models
    with the same name, indicating a migration has occurred.

    Args:
        organization: The SigmaOrganization resource.

    Returns:
        Dictionary mapping dataset ID to corresponding data model ID.

    Examples:
        .. code-block:: python

            from dagster_sigma import SigmaOrganization, detect_dataset_to_data_model_migrations

            sigma = SigmaOrganization(...)
            migrations = detect_dataset_to_data_model_migrations(sigma)

            for dataset_id, data_model_id in migrations.items():
                print(f"Dataset {dataset_id} has been migrated to data model {data_model_id}")
    """
    org_data = organization.build_organization_data()

    migrations = {}
    for dataset_id, dataset in org_data.datasets.items():
        # Look for data model with matching name
        for dm_id, data_model in org_data.data_models.items():
            if dataset.name == data_model.name:
                migrations[dataset_id] = dm_id
                break

    return migrations

7.3: Add CLI Command for Migration Analysis

# In cli.py
@sigma_group.command(name="analyze-migration")
@click.option("--base-url", required=True, help="Sigma base URL")
@click.option("--client-id", required=True, help="Sigma client ID")
@click.option("--client-secret", required=True, help="Sigma client secret")
def analyze_migration_command(base_url: str, client_id: str, client_secret: str):
    """Analyze dataset to data model migration status.

    This command helps identify which datasets have been migrated to data models
    and which still need migration.
    """
    sigma = SigmaOrganization(
        base_url=base_url,
        client_id=client_id,
        client_secret=client_secret,
    )

    org_data = sigma.build_organization_data()
    migrations = detect_dataset_to_data_model_migrations(sigma)

    click.echo("\n=== Dataset Migration Analysis ===\n")
    click.echo(f"Total datasets: {len(org_data.datasets)}")
    click.echo(f"Total data models: {len(org_data.data_models)}")
    click.echo(f"Migrated datasets: {len(migrations)}")
    click.echo(f"Datasets pending migration: {len(org_data.datasets) - len(migrations)}")

    if migrations:
        click.echo("\n--- Migrated Datasets ---")
        for dataset_id, dm_id in migrations.items():
            dataset = org_data.datasets[dataset_id]
            data_model = org_data.data_models[dm_id]
            click.echo(f"  ✓ {dataset.name}{data_model.name}")

    pending = set(org_data.datasets.keys()) - set(migrations.keys())
    if pending:
        click.echo("\n--- Datasets Pending Migration ---")
        for dataset_id in pending:
            dataset = org_data.datasets[dataset_id]
            click.echo(f"  ⚠ {dataset.name}")

Deliverable: Migration support utilities and documentation


Testing Strategy

Overview

Comprehensive testing is critical for this feature. We'll follow the existing test patterns in dagster-sigma and expand them for data models.

Test Categories

1. Unit Tests

File: dagster_sigma_tests/test_data_models.py

import pytest
from dagster_sigma import (
    SigmaDataModel,
    SigmaDataModelTranslatorData,
    DagsterSigmaTranslator,
)

class TestSigmaDataModelDataStructures:
    """Test data model record classes."""

    def test_data_model_properties(self):
        """Test SigmaDataModel property accessors."""
        dm = SigmaDataModel(
            properties={
                "dataModelId": "dm-123",
                "name": "Sales Data Model",
                "url": "https://app.sigmacomputing.com/...",
                "path": "Analytics/Sales",
                "description": "Sales analytics data model",
                "createdAt": "2024-01-15T10:00:00Z",
                "ownerId": "user-456",
            },
            sources={"inode-abc", "inode-def"},
            relationships=[],
            metrics=[],
        )

        assert dm.data_model_id == "dm-123"
        assert dm.name == "Sales Data Model"
        assert dm.url == "https://app.sigmacomputing.com/..."
        assert dm.path == "Analytics/Sales"
        assert dm.description == "Sales analytics data model"
        assert dm.created_at == "2024-01-15T10:00:00Z"
        assert dm.owner_id == "user-456"
        assert len(dm.sources) == 2

    def test_data_model_upstream_table_deps_direct(self, sample_org_data):
        """Test resolving direct table dependencies."""
        dm = sample_org_data.data_models["dm-123"]
        deps = dm.get_upstream_table_deps(sample_org_data)

        assert "inode-table-1" in deps
        assert len(deps) == 1

    def test_data_model_upstream_table_deps_via_dataset(self, sample_org_data):
        """Test resolving table dependencies through datasets."""
        dm = sample_org_data.data_models["dm-456"]
        deps = dm.get_upstream_table_deps(sample_org_data)

        # Should resolve dataset's table dependencies
        assert "inode-table-2" in deps

    def test_data_model_upstream_table_deps_recursive(self, sample_org_data):
        """Test resolving table dependencies through other data models."""
        dm = sample_org_data.data_models["dm-789"]
        deps = dm.get_upstream_table_deps(sample_org_data)

        # Should recursively resolve through dm-123
        assert "inode-table-1" in deps


class TestDataModelTranslation:
    """Test data model to AssetSpec translation."""

    def test_translate_data_model_basic(self, sample_data_model, sample_org_data):
        """Test basic data model translation."""
        translator = DagsterSigmaTranslator()
        data = SigmaDataModelTranslatorData(
            data_model=sample_data_model,
            organization_data=sample_org_data,
        )

        spec = translator.get_asset_spec(data)

        assert spec.key.path == ["sigma", "data_model", "sales_data_model"]
        assert "sigma" in spec.kinds
        assert "data_model" in spec.kinds
        assert spec.description == "Sales analytics data model"

    def test_translate_data_model_with_dependencies(self, sample_data_model, sample_org_data):
        """Test data model translation with upstream dependencies."""
        translator = DagsterSigmaTranslator()
        data = SigmaDataModelTranslatorData(
            data_model=sample_data_model,
            organization_data=sample_org_data,
        )

        spec = translator.get_asset_spec(data)

        # Should have table and dataset dependencies
        assert len(spec.deps) == 2

    def test_translate_data_model_metadata(self, sample_data_model, sample_org_data):
        """Test data model metadata extraction."""
        translator = DagsterSigmaTranslator()
        data = SigmaDataModelTranslatorData(
            data_model=sample_data_model,
            organization_data=sample_org_data,
        )

        spec = translator.get_asset_spec(data)
        metadata = spec.metadata

        assert "dagster_sigma/web_url" in metadata
        assert "dagster_sigma/created_at" in metadata
        assert "dagster_sigma/data_model_id" in metadata
        assert "dagster_sigma/sources" in metadata

    def test_custom_translator_data_model(self):
        """Test custom translator for data models."""
        class CustomTranslator(DagsterSigmaTranslator):
            def get_data_model_asset_key(self, data_model):
                return AssetKey(["custom", data_model.name])

        translator = CustomTranslator()
        # ... test custom behavior ...

File: dagster_sigma_tests/test_resource.py (additions)

class TestDataModelAPIClient:
    """Test data model API client methods."""

    @pytest.mark.asyncio
    async def test_fetch_data_models(self, sigma_org, sigma_data_model_data):
        """Test fetching data models list."""
        data_models = await sigma_org._fetch_data_models()

        assert len(data_models) > 0
        assert "dataModelId" in data_models[0]
        assert "name" in data_models[0]

    @pytest.mark.asyncio
    async def test_fetch_data_model_sources(self, sigma_org, sigma_data_model_data):
        """Test fetching data model sources."""
        sources = await sigma_org._fetch_data_model_sources("dm-123")

        assert len(sources) > 0
        # Validate source structure

    @pytest.mark.asyncio
    async def test_build_organization_data_with_data_models(
        self, sigma_org, sigma_sample_data
    ):
        """Test building organization data including data models."""
        org_data = await sigma_org.build_organization_data()

        assert len(org_data.data_models) > 0
        assert len(org_data.datasets) > 0
        assert len(org_data.workbooks) > 0

    @pytest.mark.asyncio
    async def test_filter_data_models_by_path(self, sigma_org, sigma_sample_data):
        """Test filtering data models by folder path."""
        filter = SigmaFilter(data_model_folders=["Analytics"])
        org_data = await sigma_org.build_organization_data(sigma_filter=filter)

        # Only data models in Analytics folder should be loaded
        for dm in org_data.data_models.values():
            assert dm.path.startswith("Analytics")

2. Integration Tests

File: dagster_sigma_tests/test_asset_specs.py (additions)

class TestDataModelAssetSpecs:
    """Test end-to-end asset spec generation with data models."""

    def test_load_asset_specs_with_data_models(
        self, sigma_org, sigma_sample_data
    ):
        """Test loading asset specs including data models."""
        specs = load_sigma_asset_specs(sigma_org)

        data_model_specs = [s for s in specs if "data_model" in s.kinds]

        assert len(data_model_specs) > 0

        # Verify structure
        for spec in data_model_specs:
            assert spec.key is not None
            assert "sigma" in spec.kinds
            assert "data_model" in spec.kinds

    def test_load_asset_specs_data_models_only(
        self, sigma_org, sigma_sample_data
    ):
        """Test loading only data models, excluding datasets."""
        filter = SigmaFilter(
            include_datasets=False,
            include_data_models=True,
        )
        specs = load_sigma_asset_specs(sigma_org, sigma_filter=filter)

        dataset_specs = [s for s in specs if "dataset" in s.kinds]
        data_model_specs = [s for s in specs if "data_model" in s.kinds]

        assert len(dataset_specs) == 0
        assert len(data_model_specs) > 0

    def test_data_model_dependencies_resolved(
        self, sigma_org, sigma_sample_data
    ):
        """Test that data model dependencies are correctly resolved."""
        specs = load_sigma_asset_specs(sigma_org)

        # Find a data model spec
        dm_spec = next(s for s in specs if "data_model" in s.kinds)

        # Should have upstream dependencies
        assert len(dm_spec.deps) > 0

        # All deps should be valid asset keys
        all_keys = {s.key for s in specs}
        for dep in dm_spec.deps:
            assert dep.asset_key in all_keys or dep.asset_key.path[0] != "sigma"  # External dep

3. Materialization Tests

File: dagster_sigma_tests/test_materialization.py (new file)

import pytest
from dagster import materialize
from dagster_sigma import (
    SigmaOrganization,
    load_sigma_asset_specs,
    build_materialize_data_model_assets_definition,
)


class TestDataModelMaterialization:
    """Test data model materialization functionality."""

    def test_trigger_data_model_materialization(
        self, sigma_org, sigma_data_model_materialization
    ):
        """Test triggering a data model materialization."""
        specs = load_sigma_asset_specs(sigma_org)
        dm_spec = next(s for s in specs if "data_model" in s.kinds)

        # Should not raise
        sigma_org.run_materializations_for_data_model(dm_spec)

    def test_build_materialize_data_model_assets_definition(
        self, sigma_org, sigma_sample_data
    ):
        """Test building materialization asset definition."""
        specs = load_sigma_asset_specs(sigma_org)
        dm_spec = next(s for s in specs if "data_model" in s.kinds)

        assets_def = build_materialize_data_model_assets_definition(
            "sigma", dm_spec
        )

        assert assets_def is not None
        assert len(assets_def.specs) == 1

    def test_materialize_data_model_asset(
        self, sigma_org, sigma_data_model_materialization
    ):
        """Test executing materialization through Dagster."""
        specs = load_sigma_asset_specs(sigma_org)
        dm_spec = next(s for s in specs if "data_model" in s.kinds)

        assets_def = build_materialize_data_model_assets_definition(
            "sigma", dm_spec
        )

        result = materialize(
            [assets_def],
            resources={"sigma": sigma_org},
        )

        assert result.success

4. Component Tests

File: dagster_sigma_tests/test_components.py (additions)

class TestSigmaComponentWithDataModels:
    """Test component framework with data models."""

    def test_component_loads_data_models(self, sigma_sample_data):
        """Test that component loads data models."""
        component = SigmaComponent(
            organization=SigmaOrganization(...),
        )

        defs = component.build_defs()

        data_model_assets = [
            asset for asset in defs.get_asset_graph().all_asset_keys
            if "data_model" in str(asset)
        ]

        assert len(data_model_assets) > 0

    def test_component_filter_data_models_only(self, sigma_sample_data):
        """Test component with data models only filter."""
        component = SigmaComponent(
            organization=SigmaOrganization(...),
            sigma_filter=SigmaFilter(
                include_datasets=False,
                include_data_models=True,
            ),
        )

        defs = component.build_defs()
        all_keys = defs.get_asset_graph().all_asset_keys

        dataset_assets = [k for k in all_keys if "dataset" in str(k)]
        data_model_assets = [k for k in all_keys if "data_model" in str(k)]

        assert len(dataset_assets) == 0
        assert len(data_model_assets) > 0

5. Test Fixtures

File: dagster_sigma_tests/conftest.py (additions)

# Sample data model data
SAMPLE_DATA_MODEL_DATA = {
    "dataModelId": "dm-123",
    "name": "Sales Data Model",
    "url": "https://app.sigmacomputing.com/dagster-labs/data-model/abc123",
    "path": "Analytics/Sales",
    "description": "Sales analytics data model",
    "createdAt": "2024-01-15T10:00:00Z",
    "updatedAt": "2024-01-15T12:00:00Z",
    "ownerId": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
}

SAMPLE_DATA_MODEL_SOURCES = [
    {
        "sourceId": "src-1",
        "type": "table",
        "nodeId": "inode-C5ZiVq1GkaANMaD334AnA",
        "name": "Payments",
    },
    {
        "sourceId": "src-2",
        "type": "dataset",
        "nodeId": "inode-Iq557kfHN8KRu76HdGSWi",
        "name": "Orders Dataset",
    },
]
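
The fixtures below use a _build_paginated_response helper. If it does not already exist in conftest.py, a minimal sketch could look like the following; "entries" matches the field read elsewhere in this plan, while "hasMore", "nextPage", and "total" are assumptions to verify against real Sigma responses.

def _build_paginated_response(entries: list) -> dict:
    """Wrap entries in the paginated envelope the mocked endpoints return."""
    return {
        "entries": entries,
        "hasMore": False,
        "nextPage": None,
        "total": len(entries),
    }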

@pytest.fixture(name="sigma_data_model_data")
def sigma_data_model_fixture(responses: aioresponses) -> None:
    """Add mock data model API responses."""

    # List data models
    responses.add(
        method=hdrs.METH_GET,
        url="https://aws-api.sigmacomputing.com/v2/data-models?limit=1000",
        body=json.dumps(_build_paginated_response([SAMPLE_DATA_MODEL_DATA])),
        status=200,
    )

    # Get data model sources
    responses.add(
        method=hdrs.METH_GET,
        url="https://aws-api.sigmacomputing.com/v2/data-models/dm-123/sources?limit=1000",
        body=json.dumps(_build_paginated_response(SAMPLE_DATA_MODEL_SOURCES)),
        status=200,
    )

    # Materialization schedules
    responses.add(
        method=hdrs.METH_GET,
        url="https://aws-api.sigmacomputing.com/v2/data-models/dm-123/materialization-schedules?limit=1000",
        body=json.dumps(_build_paginated_response([{"elementId": "elem-1"}])),
        status=200,
    )

@pytest.fixture(name="sigma_data_model_materialization")
def sigma_data_model_materialization_fixture(request_responses) -> None:
    """Add mock data model materialization API responses.

    The materialization trigger/poll methods use the synchronous ``requests``
    client, so they are mocked with the ``responses`` package (assumed to be
    exposed here as a ``request_responses`` fixture) rather than aioresponses.
    """

    # Trigger materialization
    request_responses.add(
        method=request_responses.POST,
        url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/elements/elem-1/materialize",
        status=200,
        body=json.dumps({"materializationId": "mat-123"}),
    )

    # Check status - pending
    request_responses.add(
        method=request_responses.GET,
        url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/materializations/mat-123",
        status=200,
        body=json.dumps({"materializationId": "mat-123", "status": "pending"}),
    )

    # Check status - ready
    request_responses.add(
        method=request_responses.GET,
        url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/materializations/mat-123",
        status=200,
        body=json.dumps({"materializationId": "mat-123", "status": "ready"}),
    )

Test Execution Plan

  1. Run existing tests to ensure no regressions:

    pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/
  2. Run new data model tests:

    pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_data_models.py -v
    pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_materialization.py -v
  3. Run full test suite:

    cd python_modules/libraries/dagster-sigma
    pytest
  4. Check code coverage:

    pytest --cov=dagster_sigma --cov-report=html
  5. Type checking:

    cd /Users/christian/code/dagster
    make quick_pyright
  6. Linting (MANDATORY):

    cd /Users/christian/code/dagster
    make ruff

Manual Testing Checklist

  • Create real data model in Sigma test organization
  • Run load_sigma_asset_specs against real API
  • Verify asset specs are created correctly
  • Verify dependencies are resolved correctly
  • Trigger materialization and verify it completes
  • Test with SigmaComponent
  • Test snapshot functionality
  • Test filtering options
  • Test with custom translator
  • Verify deprecation warnings appear for datasets

Documentation Requirements

1. API Documentation

File: python_modules/libraries/dagster-sigma/README.md

Add section:

## Working with Data Models

Sigma Data Models are the next-generation replacement for datasets, providing enhanced semantic layer capabilities including metrics, relationships, and advanced security features.

### Basic Usage

```python
from dagster import Definitions
from dagster_sigma import SigmaOrganization, load_sigma_asset_specs

sigma = SigmaOrganization(
    base_url="https://aws-api.sigmacomputing.com",
    client_id="my_client_id",
    client_secret="my_client_secret",
)

# Load all Sigma assets (workbooks, datasets, data models)
sigma_specs = load_sigma_asset_specs(sigma)

defs = Definitions(
    assets=sigma_specs,
    resources={"sigma": sigma},
)
```

### Using Data Models Only (Recommended)

For new integrations, we recommend using data models exclusively:

```python
from dagster_sigma import SigmaFilter

sigma_filter = SigmaFilter(
    include_datasets=False,  # Exclude legacy datasets
    include_data_models=True,  # Include new data models
)

sigma_specs = load_sigma_asset_specs(
    sigma,
    sigma_filter=sigma_filter,
)
```

### Migrating from Datasets to Data Models

Sigma datasets are deprecated and will be removed on June 2, 2026. To prepare:

  1. Create data models in Sigma UI to replace your datasets
  2. Update your Dagster code to load data models
  3. Verify asset dependencies are correct

Use the migration analysis tool:

```bash
dagster-sigma analyze-migration \
    --base-url "https://aws-api.sigmacomputing.com" \
    --client-id "my_client_id" \
    --client-secret "my_client_secret"
```

### Materializing Data Models

Data models with materialization schedules can be triggered:

```python
from dagster_sigma import build_materialize_data_model_assets_definition

# Get data model specs
data_model_specs = [
    spec for spec in sigma_specs
    if "data_model" in spec.kinds
]

# Build materialization assets
materialize_defs = [
    build_materialize_data_model_assets_definition("sigma", spec)
    for spec in data_model_specs
]

defs = Definitions(
    assets=[*sigma_specs, *materialize_defs],
    resources={"sigma": sigma},
)
```

### Customizing Data Model Assets

Customize how data models are translated to assets:

```python
from dagster import AssetKey
from dagster_sigma import DagsterSigmaTranslator

class CustomSigmaTranslator(DagsterSigmaTranslator):
    def get_data_model_asset_key(self, data_model):
        # Custom asset key logic
        return AssetKey(["analytics", "data_models", data_model.name])

    def get_data_model_group_name(self, data_model):
        # Group by folder
        return data_model.path.split("/")[0] if data_model.path else None

sigma_specs = load_sigma_asset_specs(
    sigma,
    dagster_sigma_translator=CustomSigmaTranslator(),
)
```

2. Docstring Examples

Ensure all new public APIs have comprehensive docstrings with examples (already included in code above).

3. Changelog Entry

File: python_modules/libraries/dagster-sigma/CHANGELOG.md

```markdown
## [Unreleased]

### Added

- Added support for Sigma Data Models, the next-generation replacement for datasets. Data Models provide enhanced semantic layer capabilities including metrics, relationships, column/row-level security, and more.
- New `SigmaDataModel` class for representing data models
- New `SigmaDataModelTranslatorData` for translation context
- New `SigmaFilter` options: `data_model_folders`, `data_models`, `include_data_models`, `include_datasets`
- New `build_materialize_data_model_assets_definition` function for materializing data models
- New CLI command `dagster-sigma analyze-migration` for analyzing dataset migration status
- New `detect_dataset_to_data_model_migrations` helper function

### Changed

- `load_sigma_asset_specs` now loads both datasets and data models by default
- `SigmaOrganizationData` now includes `data_models` field
- `DagsterSigmaTranslator` now supports translating data models

### Deprecated

- Sigma datasets are deprecated and will be removed on June 2, 2026. Use Data Models instead.
```

### 4. Migration Guide

**File**: `python_modules/libraries/dagster-sigma/MIGRATION_GUIDE.md` (new)

Create a comprehensive migration guide (content outlined in Phase 7).


Rollout Plan

Phase 1: Alpha Release (Internal Testing)

Duration: 1 week
Audience: Internal Dagster Labs team

  • Merge feature branch
  • Deploy to internal test environment
  • Test against Dagster Labs' Sigma organization
  • Gather feedback from internal users
  • Fix critical bugs

Phase 2: Beta Release (Early Adopters)

Duration: 2-4 weeks
Audience: Select customers using dagster-sigma

  • Release as beta feature (experimental decorator)
  • Announce in dagster-sigma changelog
  • Create discussion thread in Dagster Slack
  • Gather feedback from early adopters
  • Iterate on API design based on feedback
  • Update documentation based on questions

Phase 3: Stable Release

Duration: Ongoing
Audience: All dagster-sigma users

  • Remove experimental decorators
  • Publish blog post announcing feature
  • Update main Dagster docs
  • Add to integration examples
  • Monitor for issues
  • Provide support for migration questions

Phase 4: Dataset Deprecation (2026)

Timeline: June 2, 2026

  • 6 months before: Increase visibility of deprecation warnings (see the warning sketch after this list)
  • 3 months before: Send communication to all dagster-sigma users
  • 1 month before: Final reminder
  • Deprecation date: Remove dataset support from new installations
  • 3 months after: Fully remove dataset code
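A minimal sketch of how the "increase visibility of deprecation warnings" step above could be surfaced. The helper name and the point at which it is called are assumptions, not part of the current codebase:

```python
import warnings

def warn_on_legacy_datasets(dataset_names: list) -> None:
    # Hypothetical helper: called once after organization data is fetched,
    # whenever legacy datasets are present in the loaded assets.
    if not dataset_names:
        return
    warnings.warn(
        f"Loaded {len(dataset_names)} legacy Sigma dataset(s). Sigma datasets are "
        "deprecated and reach end-of-life on June 2, 2026; migrate them to Data "
        "Models and set include_datasets=False in SigmaFilter.",
        DeprecationWarning,
        stacklevel=2,
    )
```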

Risk Mitigation

Risk 1: Unknown API Response Structure

Likelihood: High
Impact: High
Mitigation:

  • Allocate Phase 0 for thorough API exploration
  • Start with minimal implementation to validate approach
  • Maintain close communication with Sigma team
  • Build flexible data structures that can accommodate changes
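As a sketch of the last mitigation above, a tolerant record type could promote only the fields the integration relies on and carry the rest of the API payload verbatim. The class and field names below are illustrative rather than the planned `SigmaDataModel` API, and the response keys are assumptions to confirm in Phase 0:

```python
from dataclasses import dataclass, field
from typing import Any, Mapping

@dataclass(frozen=True)
class DataModelEntry:
    """Illustrative forward-compatible wrapper around one data model API entry."""

    data_model_id: str
    name: str
    # Full raw payload preserved so fields added by Sigma later are carried
    # along instead of breaking parsing.
    raw: Mapping[str, Any] = field(default_factory=dict)

    @classmethod
    def from_api(cls, entry: Mapping[str, Any]) -> "DataModelEntry":
        # "dataModelId" and "name" are assumed response keys.
        return cls(
            data_model_id=entry["dataModelId"],
            name=entry.get("name", ""),
            raw=entry,
        )
```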

Risk 2: Breaking Changes for Existing Users

Likelihood: Medium
Impact: High
Mitigation:

  • Make all changes backward compatible
  • Default to loading both datasets AND data models
  • Extensive testing before release
  • Clear migration documentation
  • Beta period for feedback

Risk 3: Performance Impact

Likelihood: Medium
Impact: Medium
Mitigation:

  • Fetch data models in parallel with datasets (see the sketch after this list)
  • Respect existing snapshot caching mechanism
  • Add option to disable data models if needed
  • Monitor API rate limits carefully
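A sketch of the parallel fetch referenced in the first bullet above; the `fetch_*` coroutines are hypothetical stand-ins for the resource's dataset, data model, and workbook fetch calls:

```python
import asyncio

async def fetch_organization_data(client) -> dict:
    # Fetch legacy datasets, data models, and workbooks concurrently so that
    # adding data models does not lengthen the overall load time.
    datasets, data_models, workbooks = await asyncio.gather(
        client.fetch_datasets(),     # hypothetical
        client.fetch_data_models(),  # hypothetical
        client.fetch_workbooks(),    # hypothetical
    )
    return {
        "datasets": datasets,
        "data_models": data_models,
        "workbooks": workbooks,
    }
```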

Risk 4: Incomplete Feature Parity

Likelihood: Medium
Impact: Medium
Mitigation:

  • Focus on core use cases first (loading and lineage)
  • Clearly document what's supported vs not supported
  • Design extensible architecture for future features
  • Gather user feedback on priority features

Risk 5: Timeline Slippage

Likelihood: Medium
Impact: Low
Mitigation:

  • Phase 0 API discovery prevents surprises
  • Break work into small, testable increments
  • Have fallback scope (core features vs nice-to-have)
  • Regular progress check-ins

Success Criteria

Technical Success

  • All existing tests pass
  • New tests achieve >90% coverage for new code
  • Type checking passes with no errors
  • Linting passes (make ruff)
  • Manual testing against real Sigma API successful
  • Performance benchmarks meet or exceed dataset loading

User Success

  • Users can load data models alongside datasets
  • Users can filter to use only data models
  • Data model dependencies correctly resolved
  • Materialization works for data models
  • Documentation clear and comprehensive
  • Migration path well-defined

Project Success

  • Feature shipped within estimated timeline
  • No critical bugs in first 2 weeks
  • Positive feedback from early adopters
  • Clear path forward for dataset deprecation
  • Foundation for future data model features

Estimated Timeline

| Phase | Duration | Dependencies |
| --- | --- | --- |
| Phase 0: API Discovery | 0.5-1 day | Sigma API access |
| Phase 1: Data Structures | 0.5-1 day | Phase 0 complete |
| Phase 2: API Client | 1-1.5 days | Phase 1 complete |
| Phase 3: Translation | 1-1.5 days | Phase 2 complete |
| Phase 4: Asset Loading | 0.5-1 day | Phase 3 complete |
| Phase 5: Materialization | 1-1.5 days | Phase 4 complete |
| Phase 6: Components | 0.5-1 day | Phase 5 complete |
| Phase 7: Migration | 0.5-1 day | All phases complete |
| Testing | 1-2 days | Development complete |
| Documentation | 1 day | Testing complete |
| **Total** | **7-10 days** | |

Next Steps

  1. Review this plan with the team
  2. Obtain Sigma API access for test environment
  3. Execute Phase 0 (API discovery)
  4. Refine estimates based on Phase 0 findings
  5. Begin implementation following the phases

Questions for Discussion

  1. Should we support data models from day one or release in stages?
  2. What's the priority: feature completeness vs fast release?
  3. Should we make data models opt-in or opt-out?
  4. How aggressively should we deprecate datasets?
  5. Do we need Sigma partnership/support for this work?
  6. Should metrics get their own asset specs or be metadata?
  7. How should we handle data model relationships in the lineage graph?

Appendix A: API Endpoint Summary

| Endpoint | Method | Purpose | Priority |
| --- | --- | --- | --- |
| `/v2/data-models` | GET | List all data models | Critical |
| `/v2/data-models/{id}` | GET | Get data model details | High |
| `/v2/data-models/{id}/sources` | GET | Get data sources | Critical |
| `/v2/data-models/{id}/materialization-schedules` | GET | Get schedules | High |
| `/v2/data-models/{id}/elements/{elem}/materialize` | POST | Trigger materialization | High |
| `/v2/data-models/{id}/materializations/{mat}` | GET | Check materialization status | High |
| `/v2/data-models/{id}/source-swap` | POST | Swap sources | Low |
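As a sketch of how the two materialization endpoints above could be combined (trigger, then poll for status) using aiohttp. This is illustrative rather than the planned resource implementation, and the response fields `materializationId` and `status` (with values such as `ready`/`failed`) are assumptions to validate during Phase 0:

```python
import asyncio
import aiohttp

async def materialize_and_wait(
    session: aiohttp.ClientSession,
    base_url: str,
    data_model_id: str,
    element_id: str,
    poll_interval: float = 5.0,
) -> dict:
    # Trigger a materialization for one data model element, then poll its
    # status endpoint until it reaches a terminal state.
    async with session.post(
        f"{base_url}/v2/data-models/{data_model_id}/elements/{element_id}/materialize"
    ) as resp:
        resp.raise_for_status()
        materialization_id = (await resp.json())["materializationId"]  # assumed key

    while True:
        async with session.get(
            f"{base_url}/v2/data-models/{data_model_id}/materializations/{materialization_id}"
        ) as resp:
            resp.raise_for_status()
            status_payload = await resp.json()
        if status_payload.get("status") in ("ready", "failed"):  # assumed values
            return status_payload
        await asyncio.sleep(poll_interval)
```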

Appendix B: Code Structure Summary

dagster-sigma/
├── dagster_sigma/
│   ├── __init__.py           [Export new classes]
│   ├── resource.py           [+6 new methods, ~150 lines]
│   ├── translator.py         [+4 new classes, ~300 lines]
│   ├── assets.py             [+2 new functions, ~100 lines]
│   ├── cli.py                [+1 new command, ~50 lines]
│   └── components/
│       └── sigma_component.py [Minor updates, ~20 lines]
├── dagster_sigma_tests/
│   ├── conftest.py           [+100 lines fixtures]
│   ├── test_data_models.py   [NEW FILE, ~300 lines]
│   ├── test_materialization.py [NEW FILE, ~150 lines]
│   ├── test_resource.py      [+50 lines]
│   ├── test_translator.py    [+50 lines]
│   ├── test_asset_specs.py   [+50 lines]
│   └── test_components.py    [+30 lines]
├── README.md                 [+200 lines]
├── MIGRATION_GUIDE.md        [NEW FILE, ~500 lines]
└── CHANGELOG.md              [+20 lines]

Total new/modified lines: ~2,000-2,500

End of Implementation Plan
