This document outlines a detailed plan to add support for Sigma's new Data Models API to the dagster-sigma library. Data Models are replacing the deprecated datasets (end-of-life: June 2, 2026) and provide enhanced semantic layer capabilities including metrics, relationships, column/row-level security, and more.
Timeline: ~5-8 days development + testing
Priority: Medium-High (18-month deprecation timeline for datasets)
Complexity: Moderate
- Phase 0: API Discovery & Validation
- Phase 1: Core Data Structure Implementation
- Phase 2: API Client Extensions
- Phase 3: Translation Layer
- Phase 4: Asset Loading Integration
- Phase 5: Materialization Support
- Phase 6: Component Framework Integration
- Phase 7: Migration Support & Deprecation
- Testing Strategy
- Documentation Requirements
- Rollout Plan
- Risk Mitigation
Goal: Understand the exact API response structure before implementation
Duration: 0.5-1 day
Priority: Critical - blocks all other work
- Obtain Sigma API credentials for a test organization
- Create a test data model in Sigma UI with:
- Multiple data sources (tables, datasets)
- At least one relationship
- At least one metric
- A materialization schedule
- Document the test data model structure
Create a standalone Python script to explore all data model endpoints:
# scripts/explore_data_models_api.py
import asyncio
import os
import json
import aiohttp
from typing import Any
async def explore_data_models_api():
"""Explore Sigma Data Models API endpoints and save responses."""
base_url = "https://aws-api.sigmacomputing.com"
client_id = "YOUR_CLIENT_ID"
client_secret = "YOUR_CLIENT_SECRET"
# Get auth token
token = await get_auth_token(base_url, client_id, client_secret)
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"X-Sigma-Partner-Id": "dagster"
}
async with aiohttp.ClientSession(headers=headers) as session:
# 1. List all data models
data_models = await fetch_and_save(
session,
f"{base_url}/v2/data-models?limit=1000",
"data_models_list.json"
)
if data_models and data_models.get("entries"):
data_model_id = data_models["entries"][0]["dataModelId"]
# 2. Get specific data model details
await fetch_and_save(
session,
f"{base_url}/v2/data-models/{data_model_id}",
"data_model_detail.json"
)
# 3. List data model sources
await fetch_and_save(
session,
f"{base_url}/v2/data-models/{data_model_id}/sources",
"data_model_sources.json"
)
# 4. List materialization schedules
await fetch_and_save(
session,
f"{base_url}/v2/data-models/{data_model_id}/materialization-schedules",
"data_model_schedules.json"
)
# 5. Check if there's a pages/elements equivalent
try:
await fetch_and_save(
session,
f"{base_url}/v2/data-models/{data_model_id}/pages",
"data_model_pages.json"
)
            except Exception:
                print("No pages endpoint for data models")
# 6. Check for lineage endpoint
try:
await fetch_and_save(
session,
f"{base_url}/v2/data-models/{data_model_id}/lineage",
"data_model_lineage.json"
)
            except Exception:
                print("No lineage endpoint for data models")
async def fetch_and_save(session, url, filename):
    """Fetch JSON from an endpoint and save the raw response to a file."""
    async with session.get(url) as resp:
        resp.raise_for_status()
        data = await resp.json()
    os.makedirs("api_exploration", exist_ok=True)
    with open(f"api_exploration/{filename}", "w") as f:
        json.dump(data, f, indent=2)
    return data
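# NOTE: minimal sketch of the auth helper referenced above. Sigma issues bearer
# tokens via a client-credentials grant; the exact endpoint and payload shape
# below are assumptions to be confirmed against the Sigma API docs in Phase 0.
async def get_auth_token(base_url: str, client_id: str, client_secret: str) -> str:
    """Exchange client credentials for a bearer token."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{base_url}/v2/auth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
        ) as resp:
            resp.raise_for_status()
            payload = await resp.json()
            return payload["access_token"]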
if __name__ == "__main__":
    asyncio.run(explore_data_models_api())

- Document all fields returned by each endpoint
- Identify differences from dataset API responses
- Map data model concepts to existing dataset concepts
- Identify new concepts (metrics, relationships) and how they're represented
- Create a comparison table: Datasets vs Data Models (a starting point is sketched below)
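A possible starting point for that comparison, using only the concepts this plan already assumes (every row should be confirmed or corrected during Phase 0):

| Concept | Datasets (legacy) | Data Models |
|---|---|---|
| Identity | dataset inode | `dataModelId` (inode mapping TBD) |
| Upstream lineage | `inputs` (table inodes) | `/sources` endpoint (tables, datasets, other data models) |
| Metrics | not supported | defined on the data model (API representation TBD) |
| Relationships | not supported | defined on the data model (API representation TBD) |
| Materialization | dataset materialization schedules | `/materialization-schedules` plus per-element materialize endpoint |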
Answer these questions through API exploration:
- Identity: How are data models uniquely identified? (dataModelId?)
- Sources: How are upstream dependencies represented in the `/sources` endpoint?
- Relationships: Are relationships exposed via the API or embedded in the data model?
- Metrics: Are metrics separate entities or part of data model metadata?
- Materialization: Same pattern as workbooks/datasets or different?
- Lineage: Is there a lineage endpoint or is dependency info in sources?
- Columns: How are column definitions exposed?
- Ownership: Same `ownerId` pattern as workbooks?
- Paths: Do data models have folder paths like workbooks?
- Versioning: How is version tagging represented?
Deliverable: api_exploration/DATA_MODELS_API_SCHEMA.md document
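A possible skeleton for that deliverable, mirroring the questions above (contents to be filled in from the captured responses):

```markdown
# Sigma Data Models API Schema (Phase 0 findings)

## GET /v2/data-models
- Identity, pagination, and ownership fields observed
- Field-by-field comparison with the datasets list endpoint

## GET /v2/data-models/{dataModelId}/sources
- How sources reference tables, datasets, and other data models
- Candidate inode field for lineage resolution

## GET /v2/data-models/{dataModelId}/materialization-schedules
- Schedule/element shape and how it maps to the workbook pattern

## Open questions and follow-ups
```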
Goal: Define the data model record classes using the `@record` decorator
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/translator.py
@record
class SigmaDataModel:
"""Represents a Sigma Data Model.
Data Models are Sigma's next-generation semantic layer that replaces datasets
(deprecated June 2026). They provide enhanced governance, metrics, relationships,
and security features.
See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling
"""
properties: dict[str, Any]
"""Raw data model properties from API response."""
sources: set[str]
"""Set of upstream dependency inodes (tables, datasets, or other data models)."""
relationships: list[dict[str, Any]]
"""Relationships defined between data sources."""
metrics: list[dict[str, Any]]
"""Metrics defined in this data model."""
@property
def data_model_id(self) -> str:
"""The unique identifier for this data model."""
return self.properties["dataModelId"]
@property
def name(self) -> str:
"""The name of this data model."""
return self.properties["name"]
@property
def url(self) -> str:
"""The URL to view this data model in Sigma."""
return self.properties["url"]
@property
def path(self) -> str:
"""The folder path where this data model is located."""
return self.properties.get("path", "")
@property
def description(self) -> str:
"""The description of this data model."""
return self.properties.get("description", "")
@property
def created_at(self) -> str:
"""ISO 8601 timestamp when data model was created."""
return self.properties["createdAt"]
@property
def owner_id(self) -> str:
"""The ID of the user who owns this data model."""
return self.properties.get("ownerId", "")
@cached_method
def get_upstream_table_deps(self, organization_data: "SigmaOrganizationData") -> set[str]:
"""Get all upstream table dependencies, including transitive dependencies."""
deps = set()
for source_inode in self.sources:
# Direct table dependency
if source_inode in organization_data.tables:
deps.add(source_inode)
# Dataset dependency (resolve its tables)
            elif source_inode in organization_data.get_datasets_by_inode():
dataset = organization_data.get_datasets_by_inode()[source_inode]
deps.update(dataset.inputs)
# Data model dependency (resolve recursively)
            elif source_inode in organization_data.get_data_models_by_inode():
data_model = organization_data.get_data_models_by_inode()[source_inode]
deps.update(data_model.get_upstream_table_deps(organization_data))
        return deps

@record
class SigmaOrganizationData:
"""Root data structure containing all Sigma organization assets."""
workbooks: dict[str, SigmaWorkbook]
datasets: dict[str, SigmaDataset]
data_models: dict[str, SigmaDataModel] # NEW
tables: dict[str, SigmaTable]
@cached_method
def get_data_models_by_inode(self) -> dict[str, SigmaDataModel]:
"""Get data models indexed by their inode identifier."""
# Implementation depends on how data models expose inodes
# May need to extract from urlId or other field
return {
self._extract_inode(dm): dm
for dm in self.data_models.values()
}
def _extract_inode(self, data_model: SigmaDataModel) -> str:
"""Extract inode identifier from data model."""
# TODO: Determine from API exploration
# Likely similar to: f"inode-{data_model.properties['urlId']}"
        pass

@record
class SigmaDataModelTranslatorData:
"""Wrapper providing data model with organization context for translation."""
data_model: SigmaDataModel
    organization_data: SigmaOrganizationData

class SigmaDataModelMetadataSet(NamespacedMetadataSet):
"""Metadata for Sigma Data Model assets."""
web_url: Optional[str] = None
"""The URL to the data model in Sigma."""
created_at: Optional[str] = None
"""ISO 8601 timestamp when data model was created."""
description: Optional[str] = None
"""The data model description."""
properties: Optional[dict[str, Any]] = None
"""Raw properties from Sigma API."""
sources: Optional[list[dict[str, Any]]] = None
"""List of data sources feeding this data model."""
relationships: Optional[list[dict[str, Any]]] = None
"""Relationships defined in this data model."""
metrics: Optional[list[dict[str, Any]]] = None
"""Metrics defined in this data model."""
data_model_id: Optional[str] = None
"""The unique identifier for this data model."""
@classmethod
def namespace(cls) -> str:
return "dagster_sigma"Deliverable: Updated translator.py with new classes
Goal: Add API methods to fetch data model information
Duration: 1-1.5 days
File: python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
class SigmaOrganization(ConfigurableResource):
"""Sigma API client resource."""
# ... existing fields ...
async def _fetch_data_models(self) -> list[dict[str, Any]]:
"""Fetch all data models in the organization.
Returns:
List of data model objects from Sigma API.
"""
return await self._fetch_json_async_paginated_entries(
"/v2/data-models",
params={"limit": 1000}
)
async def _fetch_data_model_sources(
self,
data_model_id: str
) -> list[dict[str, Any]]:
"""Fetch data sources for a specific data model.
Args:
data_model_id: The unique identifier for the data model.
Returns:
List of source objects (tables, datasets, or data models).
"""
return await self._fetch_json_async_paginated_entries(
f"/v2/data-models/{data_model_id}/sources",
params={"limit": 1000}
)
async def _fetch_materialization_schedules_for_data_model(
self,
data_model_id: str
) -> list[dict[str, Any]]:
"""Fetch materialization schedules for a data model.
Args:
data_model_id: The unique identifier for the data model.
Returns:
List of materialization schedule objects.
"""
return await self._fetch_json_async_paginated_entries(
f"/v2/data-models/{data_model_id}/materialization-schedules",
params={"limit": 1000}
        )

@record
class SigmaFilter:
"""Filters for selecting which Sigma objects to load."""
workbook_folders: Sequence[str] = []
"""List of workbook folder paths to include."""
workbooks: Sequence[str] = []
"""List of specific workbook paths to include."""
include_unused_datasets: bool = False
"""Whether to include datasets not referenced in any workbook."""
data_model_folders: Sequence[str] = []
"""List of data model folder paths to include. NEW"""
data_models: Sequence[str] = []
"""List of specific data model paths to include. NEW"""
include_data_models: bool = True
"""Whether to load data models at all. Set to False to only use legacy datasets. NEW"""
include_datasets: bool = True
"""Whether to load datasets. Set to False to only use new data models. NEW"""async def build_organization_data(
self,
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = False,
fetch_lineage_data: bool = True,
) -> SigmaOrganizationData:
"""Fetch and organize all Sigma assets.
Args:
sigma_filter: Optional filter for selecting which assets to load.
fetch_column_data: Whether to fetch column metadata.
fetch_lineage_data: Whether to fetch lineage information.
Returns:
SigmaOrganizationData with all fetched assets.
"""
sigma_filter = sigma_filter or SigmaFilter()
# Fetch all asset types in parallel
(
raw_workbooks,
raw_datasets,
raw_data_models, # NEW
raw_tables,
members,
) = await asyncio.gather(
self._fetch_workbooks(),
self._fetch_datasets() if sigma_filter.include_datasets else self._return_empty_list(),
self._fetch_data_models() if sigma_filter.include_data_models else self._return_empty_list(), # NEW
self._fetch_tables(),
self._fetch_members(),
)
# Filter data models by path
filtered_data_models = self._filter_data_models_by_path(
raw_data_models,
sigma_filter
)
# Build data model objects with sources
data_models = await self._build_data_model_objects(
filtered_data_models,
fetch_lineage_data
)
# ... rest of existing logic ...
return SigmaOrganizationData(
workbooks=workbooks_by_id,
datasets=datasets_by_id,
data_models=data_models_by_id, # NEW
tables=tables_by_inode,
)
async def _return_empty_list(self) -> list:
"""Helper to return empty list in async gather."""
return []
def _filter_data_models_by_path(
self,
data_models: list[dict[str, Any]],
sigma_filter: SigmaFilter,
) -> list[dict[str, Any]]:
"""Filter data models based on folder and path filters."""
if not sigma_filter.data_model_folders and not sigma_filter.data_models:
return data_models
filtered = []
for dm in data_models:
path = dm.get("path", "")
name = dm.get("name", "")
full_path = f"{path}/{name}" if path else name
# Check folder filters
if sigma_filter.data_model_folders:
if any(path.startswith(folder) for folder in sigma_filter.data_model_folders):
filtered.append(dm)
continue
# Check explicit data model filters
if sigma_filter.data_models:
if full_path in sigma_filter.data_models:
filtered.append(dm)
continue
return filtered if (sigma_filter.data_model_folders or sigma_filter.data_models) else data_models
async def _build_data_model_objects(
self,
data_models: list[dict[str, Any]],
fetch_lineage_data: bool,
) -> dict[str, SigmaDataModel]:
"""Build SigmaDataModel objects with enriched data.
Args:
data_models: Raw data model dictionaries from API.
fetch_lineage_data: Whether to fetch source dependencies.
Returns:
Dictionary mapping data model ID to SigmaDataModel object.
"""
if not fetch_lineage_data:
# Return basic data models without sources
return {
dm["dataModelId"]: SigmaDataModel(
properties=dm,
sources=set(),
relationships=[],
metrics=[],
)
for dm in data_models
}
# Fetch sources for all data models in parallel
sources_by_id = await asyncio.gather(*[
self._fetch_data_model_sources(dm["dataModelId"])
for dm in data_models
])
data_models_by_id = {}
for dm, sources in zip(data_models, sources_by_id):
# Extract source inodes from response
source_inodes = {
self._extract_source_inode(source)
for source in sources
}
# TODO: Extract relationships and metrics from dm or sources
# This depends on API response structure
relationships = dm.get("relationships", [])
metrics = dm.get("metrics", [])
data_models_by_id[dm["dataModelId"]] = SigmaDataModel(
properties=dm,
sources=source_inodes,
relationships=relationships,
metrics=metrics,
)
return data_models_by_id
def _extract_source_inode(self, source: dict[str, Any]) -> str:
"""Extract inode identifier from a data model source object.
Args:
source: Source object from /sources API endpoint.
Returns:
Inode identifier string.
"""
# TODO: Implement based on API exploration
# Likely patterns:
# - source["inode"]
# - source["nodeId"]
# - f"inode-{source['urlId']}"
        pass

Deliverable: Updated resource.py with data model fetching
Goal: Translate data models to Dagster AssetSpecs
Duration: 1-1.5 days
File: python_modules/libraries/dagster-sigma/dagster_sigma/translator.py
class DagsterSigmaTranslator:
"""Translator for converting Sigma objects to Dagster AssetSpecs."""
def get_asset_spec(
self,
data: Union[
SigmaWorkbookTranslatorData,
SigmaDatasetTranslatorData,
SigmaDataModelTranslatorData # NEW
]
) -> AssetSpec:
"""Convert Sigma object to AssetSpec.
Args:
data: Translator data wrapper for a Sigma object.
Returns:
AssetSpec for the Sigma object.
"""
if isinstance(data, SigmaWorkbookTranslatorData):
return self._get_workbook_spec(data)
elif isinstance(data, SigmaDatasetTranslatorData):
return self._get_dataset_spec(data)
elif isinstance(data, SigmaDataModelTranslatorData): # NEW
return self._get_data_model_spec(data)
else:
raise ValueError(f"Unsupported translator data type: {type(data)}")
def _get_data_model_spec(
self,
data: SigmaDataModelTranslatorData
) -> AssetSpec:
"""Convert a Sigma Data Model to an AssetSpec.
Args:
data: Data model translator data.
Returns:
AssetSpec for the data model.
"""
data_model = data.data_model
org_data = data.organization_data
# Get upstream dependencies
deps = self._get_data_model_deps(data_model, org_data)
# Build metadata
metadata = SigmaDataModelMetadataSet(
web_url=data_model.url,
created_at=data_model.created_at,
description=data_model.description,
properties=data_model.properties,
sources=[
org_data.get_object_by_inode(inode)
for inode in data_model.sources
],
relationships=data_model.relationships,
metrics=data_model.metrics,
data_model_id=data_model.data_model_id,
)
# Determine owner
owner = None
if data_model.owner_id:
member = org_data.members.get(data_model.owner_id)
if member:
owner = member.get("email")
return AssetSpec(
key=self.get_data_model_asset_key(data_model),
deps=deps,
description=data_model.description or None,
metadata=metadata,
kinds={"sigma", "data_model"},
owners=[owner] if owner else None,
tags=self.get_data_model_tags(data_model),
group_name=self.get_data_model_group_name(data_model),
)
def _get_data_model_deps(
self,
data_model: SigmaDataModel,
org_data: SigmaOrganizationData,
) -> set[AssetKey]:
"""Get upstream asset dependencies for a data model.
Data models can depend on:
- Tables (direct warehouse tables)
- Datasets (legacy)
- Other data models (recursive)
Args:
data_model: The data model to get dependencies for.
org_data: Organization data for resolving references.
Returns:
Set of AssetKeys for upstream dependencies.
"""
deps = set()
for source_inode in data_model.sources:
# Table dependency
if source_inode in org_data.tables:
table = org_data.tables[source_inode]
deps.add(self.get_table_asset_key(table))
# Dataset dependency
elif source_inode in org_data.get_datasets_by_inode():
dataset = org_data.get_datasets_by_inode()[source_inode]
deps.add(self.get_dataset_asset_key(dataset))
# Data model dependency (recursive)
elif source_inode in org_data.get_data_models_by_inode():
upstream_dm = org_data.get_data_models_by_inode()[source_inode]
deps.add(self.get_data_model_asset_key(upstream_dm))
return deps
# Customization hooks for data models
def get_data_model_asset_key(self, data_model: SigmaDataModel) -> AssetKey:
"""Get the asset key for a data model.
Default: AssetKey(["sigma", "data_model", <sanitized_name>])
Args:
data_model: The data model.
Returns:
AssetKey for the data model.
"""
return AssetKey(["sigma", "data_model", self._sanitize_name(data_model.name)])
def get_data_model_tags(self, data_model: SigmaDataModel) -> dict[str, str]:
"""Get tags for a data model asset.
Args:
data_model: The data model.
Returns:
Dictionary of tags.
"""
tags = {}
if data_model.path:
tags["sigma/path"] = data_model.path
return tags
def get_data_model_group_name(self, data_model: SigmaDataModel) -> Optional[str]:
"""Get the group name for a data model asset.
Args:
data_model: The data model.
Returns:
Group name or None.
"""
        return None

Deliverable: Updated translator.py with data model translation
Goal: Integrate data models into asset loading flow
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/assets.py
@experimental
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: DagsterSigmaTranslator = DagsterSigmaTranslator(),
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = False,
fetch_lineage_data: bool = True,
snapshot_path: Optional[str] = None,
) -> list[AssetSpec]:
"""Load Sigma assets as Dagster AssetSpecs.
Loads workbooks, datasets, data models, and their dependencies.
Args:
organization: The SigmaOrganization resource.
dagster_sigma_translator: Translator for customizing asset specs.
sigma_filter: Optional filter for selecting which assets to load.
fetch_column_data: Whether to fetch column metadata (slower).
fetch_lineage_data: Whether to fetch lineage/dependency information.
snapshot_path: Optional path to cached snapshot file.
Returns:
List of AssetSpecs for all Sigma assets.
Examples:
Basic usage with both datasets and data models:
.. code-block:: python
from dagster_sigma import SigmaOrganization, load_sigma_asset_specs
sigma = SigmaOrganization(
base_url="https://aws-api.sigmacomputing.com",
client_id="my_client_id",
client_secret="my_client_secret",
)
# Load all Sigma assets
sigma_specs = load_sigma_asset_specs(sigma)
Using only data models (recommended for new integrations):
.. code-block:: python
from dagster_sigma import SigmaOrganization, SigmaFilter, load_sigma_asset_specs
# Only load data models, not legacy datasets
sigma_filter = SigmaFilter(
include_datasets=False,
include_data_models=True,
)
sigma_specs = load_sigma_asset_specs(
sigma,
sigma_filter=sigma_filter,
)
"""
if snapshot_path:
org_data = organization._load_snapshot(snapshot_path)
else:
org_data = organization.build_organization_data(
sigma_filter=sigma_filter,
fetch_column_data=fetch_column_data,
fetch_lineage_data=fetch_lineage_data,
)
specs = []
# Translate workbooks
for workbook in org_data.workbooks.values():
spec = dagster_sigma_translator.get_asset_spec(
SigmaWorkbookTranslatorData(
workbook=workbook,
organization_data=org_data,
)
)
specs.append(spec)
# Translate datasets (legacy)
for dataset in org_data.datasets.values():
spec = dagster_sigma_translator.get_asset_spec(
SigmaDatasetTranslatorData(
dataset=dataset,
organization_data=org_data,
)
)
specs.append(spec)
# Translate data models (NEW)
for data_model in org_data.data_models.values():
spec = dagster_sigma_translator.get_asset_spec(
SigmaDataModelTranslatorData(
data_model=data_model,
organization_data=org_data,
)
)
specs.append(spec)
    return specs

Deliverable: Updated assets.py with data model support
Goal: Support triggering data model materializations
Duration: 1-1.5 days
Files: resource.py, assets.py
class SigmaOrganization(ConfigurableResource):
"""Sigma API client resource."""
def run_materializations_for_data_model(
self,
spec: AssetSpec,
element_id: Optional[str] = None,
) -> None:
"""Trigger materialization for a data model.
Args:
spec: AssetSpec for the data model.
element_id: Optional specific element to materialize.
If not provided, materializes all scheduled elements.
"""
metadata = SigmaDataModelMetadataSet.extract(spec.metadata)
data_model_id = metadata.data_model_id
if not data_model_id:
raise ValueError(f"No data_model_id in metadata for {spec.key}")
# Get materialization schedules
schedules = self._fetch_materialization_schedules_for_data_model_sync(
data_model_id
)
if not schedules:
raise ValueError(f"No materialization schedules found for data model {data_model_id}")
# Trigger materializations
materialization_ids = []
for schedule in schedules:
if element_id and schedule.get("elementId") != element_id:
continue
mat_id = self._trigger_data_model_materialization_sync(
data_model_id,
schedule["elementId"],
)
materialization_ids.append((schedule["elementId"], mat_id))
# Poll until all complete
for elem_id, mat_id in materialization_ids:
self._poll_data_model_materialization_sync(
data_model_id,
mat_id,
elem_id,
)
def _fetch_materialization_schedules_for_data_model_sync(
self,
data_model_id: str,
) -> list[dict[str, Any]]:
"""Synchronous version of _fetch_materialization_schedules_for_data_model."""
return asyncio.run(
self._fetch_materialization_schedules_for_data_model(data_model_id)
)
def _trigger_data_model_materialization_sync(
self,
data_model_id: str,
element_id: str,
) -> str:
"""Trigger a data model element materialization.
Args:
data_model_id: The data model ID.
element_id: The element ID to materialize.
Returns:
Materialization ID for polling.
"""
response = requests.post(
f"{self.base_url.value}/v2/data-models/{data_model_id}/elements/{element_id}/materialize",
headers=self._get_headers(),
timeout=30,
)
response.raise_for_status()
data = response.json()
return data["materializationId"]
def _poll_data_model_materialization_sync(
self,
data_model_id: str,
materialization_id: str,
element_id: str,
) -> None:
"""Poll materialization status until complete.
Args:
data_model_id: The data model ID.
materialization_id: The materialization job ID.
element_id: The element ID being materialized.
"""
import time
max_attempts = 60
attempt = 0
while attempt < max_attempts:
response = requests.get(
f"{self.base_url.value}/v2/data-models/{data_model_id}/materializations/{materialization_id}",
headers=self._get_headers(),
timeout=30,
)
response.raise_for_status()
data = response.json()
status = data["status"]
if status == "ready":
return
elif status == "failed":
raise RuntimeError(
f"Materialization failed for data model {data_model_id} "
f"element {element_id}: {data.get('error', 'Unknown error')}"
)
time.sleep(2)
attempt += 1
raise TimeoutError(
f"Materialization timed out for data model {data_model_id} "
f"element {element_id} after {max_attempts * 2} seconds"
    )

def build_materialize_data_model_assets_definition(
resource_key: str,
spec: AssetSpec,
) -> AssetsDefinition:
"""Build an AssetsDefinition for materializing a Sigma data model.
This creates a materialization asset that triggers scheduled materializations
for the data model in Sigma.
Args:
resource_key: The resource key for the SigmaOrganization resource.
spec: The AssetSpec for the data model.
Returns:
AssetsDefinition that materializes the data model.
Examples:
.. code-block:: python
from dagster import Definitions
from dagster_sigma import (
SigmaOrganization,
load_sigma_asset_specs,
build_materialize_data_model_assets_definition,
)
sigma = SigmaOrganization(...)
sigma_specs = load_sigma_asset_specs(sigma)
# Find data model specs
data_model_specs = [
spec for spec in sigma_specs
if "data_model" in spec.kinds
]
# Build materialization assets
materialize_defs = [
build_materialize_data_model_assets_definition("sigma", spec)
for spec in data_model_specs
]
defs = Definitions(
assets=[*sigma_specs, *materialize_defs],
resources={"sigma": sigma},
)
"""
@multi_asset(
name=f"materialize_{spec.key.to_python_identifier()}",
specs=[spec],
required_resource_keys={resource_key},
)
def _materialize_data_model(context):
sigma: SigmaOrganization = getattr(context.resources, resource_key)
sigma.run_materializations_for_data_model(spec)
        return MaterializeResult(asset_key=spec.key)
    return _materialize_data_model

Deliverable: Updated resource.py and assets.py with materialization support
Goal: Add data model support to SigmaComponent
Duration: 0.5-1 day
File: python_modules/libraries/dagster-sigma/dagster_sigma/components/sigma_component.py
The component should automatically work with data models since it uses load_sigma_asset_specs under the hood. However, we should:
- Update type hints to include `SigmaDataModelTranslatorData`
- Update examples to show data model usage
- Test that component loading works correctly
# Update type hints
def get_asset_spec(
self,
data: Union[
SigmaWorkbookTranslatorData,
SigmaDatasetTranslatorData,
SigmaDataModelTranslatorData, # NEW
]
) -> AssetSpec:
"""Override to customize asset specs."""
        pass

Deliverable: Updated sigma_component.py with data model support
Goal: Help users migrate from datasets to data models
Duration: 0.5-1 day
Files: Multiple
# In translator.py
def _get_dataset_spec(self, data: SigmaDatasetTranslatorData) -> AssetSpec:
"""Convert a Sigma Dataset to an AssetSpec.
.. deprecated:: 1.9.0
Sigma datasets are deprecated and will be removed on June 2, 2026.
Use Sigma Data Models instead by setting `SigmaFilter(include_datasets=False, include_data_models=True)`.
See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling
"""
warnings.warn(
"Sigma datasets are deprecated and will be removed on June 2, 2026. "
"Migrate to Sigma Data Models. "
"See: https://help.sigmacomputing.com/docs/get-started-with-data-modeling",
DeprecationWarning,
stacklevel=2,
)
    # ... existing implementation ...

# In assets.py
def detect_dataset_to_data_model_migrations(
organization: SigmaOrganization,
) -> dict[str, str]:
"""Detect datasets that have been migrated to data models.
This helper identifies datasets that have corresponding data models
with the same name, indicating a migration has occurred.
Args:
organization: The SigmaOrganization resource.
Returns:
Dictionary mapping dataset ID to corresponding data model ID.
Examples:
.. code-block:: python
from dagster_sigma import SigmaOrganization, detect_dataset_to_data_model_migrations
sigma = SigmaOrganization(...)
migrations = detect_dataset_to_data_model_migrations(sigma)
for dataset_id, data_model_id in migrations.items():
print(f"Dataset {dataset_id} has been migrated to data model {data_model_id}")
"""
org_data = organization.build_organization_data()
migrations = {}
for dataset_id, dataset in org_data.datasets.items():
# Look for data model with matching name
for dm_id, data_model in org_data.data_models.items():
if dataset.name == data_model.name:
migrations[dataset_id] = dm_id
break
    return migrations

# In cli.py
@sigma_group.command(name="analyze-migration")
@click.option("--base-url", required=True, help="Sigma base URL")
@click.option("--client-id", required=True, help="Sigma client ID")
@click.option("--client-secret", required=True, help="Sigma client secret")
def analyze_migration_command(base_url: str, client_id: str, client_secret: str):
"""Analyze dataset to data model migration status.
This command helps identify which datasets have been migrated to data models
and which still need migration.
"""
sigma = SigmaOrganization(
base_url=base_url,
client_id=client_id,
client_secret=client_secret,
)
org_data = sigma.build_organization_data()
migrations = detect_dataset_to_data_model_migrations(sigma)
click.echo("\n=== Dataset Migration Analysis ===\n")
click.echo(f"Total datasets: {len(org_data.datasets)}")
click.echo(f"Total data models: {len(org_data.data_models)}")
click.echo(f"Migrated datasets: {len(migrations)}")
click.echo(f"Datasets pending migration: {len(org_data.datasets) - len(migrations)}")
if migrations:
click.echo("\n--- Migrated Datasets ---")
for dataset_id, dm_id in migrations.items():
dataset = org_data.datasets[dataset_id]
data_model = org_data.data_models[dm_id]
click.echo(f" ✓ {dataset.name} → {data_model.name}")
pending = set(org_data.datasets.keys()) - set(migrations.keys())
if pending:
click.echo("\n--- Datasets Pending Migration ---")
for dataset_id in pending:
dataset = org_data.datasets[dataset_id]
click.echo(f" ⚠ {dataset.name}")Deliverable: Migration support utilities and documentation
Comprehensive testing is critical for this feature. We'll follow the existing test patterns in dagster-sigma and expand them for data models.
File: dagster_sigma_tests/test_data_models.py
import pytest

from dagster import AssetKey
from dagster_sigma import (
SigmaDataModel,
SigmaDataModelTranslatorData,
DagsterSigmaTranslator,
)
class TestSigmaDataModelDataStructures:
"""Test data model record classes."""
def test_data_model_properties(self):
"""Test SigmaDataModel property accessors."""
dm = SigmaDataModel(
properties={
"dataModelId": "dm-123",
"name": "Sales Data Model",
"url": "https://app.sigmacomputing.com/...",
"path": "Analytics/Sales",
"description": "Sales analytics data model",
"createdAt": "2024-01-15T10:00:00Z",
"ownerId": "user-456",
},
sources={"inode-abc", "inode-def"},
relationships=[],
metrics=[],
)
assert dm.data_model_id == "dm-123"
assert dm.name == "Sales Data Model"
assert dm.url == "https://app.sigmacomputing.com/..."
assert dm.path == "Analytics/Sales"
assert dm.description == "Sales analytics data model"
assert dm.created_at == "2024-01-15T10:00:00Z"
assert dm.owner_id == "user-456"
assert len(dm.sources) == 2
def test_data_model_upstream_table_deps_direct(self, sample_org_data):
"""Test resolving direct table dependencies."""
dm = sample_org_data.data_models["dm-123"]
deps = dm.get_upstream_table_deps(sample_org_data)
assert "inode-table-1" in deps
assert len(deps) == 1
def test_data_model_upstream_table_deps_via_dataset(self, sample_org_data):
"""Test resolving table dependencies through datasets."""
dm = sample_org_data.data_models["dm-456"]
deps = dm.get_upstream_table_deps(sample_org_data)
# Should resolve dataset's table dependencies
assert "inode-table-2" in deps
def test_data_model_upstream_table_deps_recursive(self, sample_org_data):
"""Test resolving table dependencies through other data models."""
dm = sample_org_data.data_models["dm-789"]
deps = dm.get_upstream_table_deps(sample_org_data)
# Should recursively resolve through dm-123
assert "inode-table-1" in deps
class TestDataModelTranslation:
"""Test data model to AssetSpec translation."""
def test_translate_data_model_basic(self, sample_data_model, sample_org_data):
"""Test basic data model translation."""
translator = DagsterSigmaTranslator()
data = SigmaDataModelTranslatorData(
data_model=sample_data_model,
organization_data=sample_org_data,
)
spec = translator.get_asset_spec(data)
assert spec.key.path == ["sigma", "data_model", "sales_data_model"]
assert "sigma" in spec.kinds
assert "data_model" in spec.kinds
assert spec.description == "Sales analytics data model"
def test_translate_data_model_with_dependencies(self, sample_data_model, sample_org_data):
"""Test data model translation with upstream dependencies."""
translator = DagsterSigmaTranslator()
data = SigmaDataModelTranslatorData(
data_model=sample_data_model,
organization_data=sample_org_data,
)
spec = translator.get_asset_spec(data)
# Should have table and dataset dependencies
assert len(spec.deps) == 2
def test_translate_data_model_metadata(self, sample_data_model, sample_org_data):
"""Test data model metadata extraction."""
translator = DagsterSigmaTranslator()
data = SigmaDataModelTranslatorData(
data_model=sample_data_model,
organization_data=sample_org_data,
)
spec = translator.get_asset_spec(data)
metadata = spec.metadata
assert "dagster_sigma/web_url" in metadata
assert "dagster_sigma/created_at" in metadata
assert "dagster_sigma/data_model_id" in metadata
assert "dagster_sigma/sources" in metadata
def test_custom_translator_data_model(self):
"""Test custom translator for data models."""
class CustomTranslator(DagsterSigmaTranslator):
def get_data_model_asset_key(self, data_model):
return AssetKey(["custom", data_model.name])
translator = CustomTranslator()
        # ... test custom behavior ...

File: dagster_sigma_tests/test_resource.py (additions)
class TestDataModelAPIClient:
"""Test data model API client methods."""
@pytest.mark.asyncio
async def test_fetch_data_models(self, sigma_org, sigma_data_model_data):
"""Test fetching data models list."""
data_models = await sigma_org._fetch_data_models()
assert len(data_models) > 0
assert "dataModelId" in data_models[0]
assert "name" in data_models[0]
@pytest.mark.asyncio
async def test_fetch_data_model_sources(self, sigma_org, sigma_data_model_data):
"""Test fetching data model sources."""
sources = await sigma_org._fetch_data_model_sources("dm-123")
assert len(sources) > 0
# Validate source structure
@pytest.mark.asyncio
async def test_build_organization_data_with_data_models(
self, sigma_org, sigma_sample_data
):
"""Test building organization data including data models."""
org_data = await sigma_org.build_organization_data()
assert len(org_data.data_models) > 0
assert len(org_data.datasets) > 0
assert len(org_data.workbooks) > 0
@pytest.mark.asyncio
async def test_filter_data_models_by_path(self, sigma_org, sigma_sample_data):
"""Test filtering data models by folder path."""
        sigma_filter = SigmaFilter(data_model_folders=["Analytics"])
        org_data = await sigma_org.build_organization_data(sigma_filter=sigma_filter)
# Only data models in Analytics folder should be loaded
for dm in org_data.data_models.values():
            assert dm.path.startswith("Analytics")

File: dagster_sigma_tests/test_asset_specs.py (additions)
class TestDataModelAssetSpecs:
"""Test end-to-end asset spec generation with data models."""
def test_load_asset_specs_with_data_models(
self, sigma_org, sigma_sample_data
):
"""Test loading asset specs including data models."""
specs = load_sigma_asset_specs(sigma_org)
data_model_specs = [s for s in specs if "data_model" in s.kinds]
assert len(data_model_specs) > 0
# Verify structure
for spec in data_model_specs:
assert spec.key is not None
assert "sigma" in spec.kinds
assert "data_model" in spec.kinds
def test_load_asset_specs_data_models_only(
self, sigma_org, sigma_sample_data
):
"""Test loading only data models, excluding datasets."""
        sigma_filter = SigmaFilter(
            include_datasets=False,
            include_data_models=True,
        )
        specs = load_sigma_asset_specs(sigma_org, sigma_filter=sigma_filter)
dataset_specs = [s for s in specs if "dataset" in s.kinds]
data_model_specs = [s for s in specs if "data_model" in s.kinds]
assert len(dataset_specs) == 0
assert len(data_model_specs) > 0
def test_data_model_dependencies_resolved(
self, sigma_org, sigma_sample_data
):
"""Test that data model dependencies are correctly resolved."""
specs = load_sigma_asset_specs(sigma_org)
# Find a data model spec
dm_spec = next(s for s in specs if "data_model" in s.kinds)
# Should have upstream dependencies
assert len(dm_spec.deps) > 0
# All deps should be valid asset keys
all_keys = {s.key for s in specs}
for dep in dm_spec.deps:
            assert dep in all_keys or dep.path[0] != "sigma"  # External dep

File: dagster_sigma_tests/test_materialization.py (new file)
import pytest
from dagster import materialize
from dagster_sigma import (
SigmaOrganization,
load_sigma_asset_specs,
build_materialize_data_model_assets_definition,
)
class TestDataModelMaterialization:
"""Test data model materialization functionality."""
def test_trigger_data_model_materialization(
self, sigma_org, sigma_data_model_materialization
):
"""Test triggering a data model materialization."""
specs = load_sigma_asset_specs(sigma_org)
dm_spec = next(s for s in specs if "data_model" in s.kinds)
# Should not raise
sigma_org.run_materializations_for_data_model(dm_spec)
def test_build_materialize_data_model_assets_definition(
self, sigma_org, sigma_sample_data
):
"""Test building materialization asset definition."""
specs = load_sigma_asset_specs(sigma_org)
dm_spec = next(s for s in specs if "data_model" in s.kinds)
assets_def = build_materialize_data_model_assets_definition(
"sigma", dm_spec
)
assert assets_def is not None
assert len(assets_def.specs) == 1
def test_materialize_data_model_asset(
self, sigma_org, sigma_data_model_materialization
):
"""Test executing materialization through Dagster."""
specs = load_sigma_asset_specs(sigma_org)
dm_spec = next(s for s in specs if "data_model" in s.kinds)
assets_def = build_materialize_data_model_assets_definition(
"sigma", dm_spec
)
result = materialize(
[assets_def],
resources={"sigma": sigma_org},
)
        assert result.success

File: dagster_sigma_tests/test_components.py (additions)
class TestSigmaComponentWithDataModels:
"""Test component framework with data models."""
def test_component_loads_data_models(self, sigma_sample_data):
"""Test that component loads data models."""
component = SigmaComponent(
organization=SigmaOrganization(...),
)
defs = component.build_defs()
data_model_assets = [
asset for asset in defs.get_asset_graph().all_asset_keys
if "data_model" in str(asset)
]
assert len(data_model_assets) > 0
def test_component_filter_data_models_only(self, sigma_sample_data):
"""Test component with data models only filter."""
component = SigmaComponent(
organization=SigmaOrganization(...),
sigma_filter=SigmaFilter(
include_datasets=False,
include_data_models=True,
),
)
defs = component.build_defs()
all_keys = defs.get_asset_graph().all_asset_keys
dataset_assets = [k for k in all_keys if "dataset" in str(k)]
data_model_assets = [k for k in all_keys if "data_model" in str(k)]
assert len(dataset_assets) == 0
        assert len(data_model_assets) > 0

File: dagster_sigma_tests/conftest.py (additions)
# Sample data model data
SAMPLE_DATA_MODEL_DATA = {
"dataModelId": "dm-123",
"name": "Sales Data Model",
"url": "https://app.sigmacomputing.com/dagster-labs/data-model/abc123",
"path": "Analytics/Sales",
"description": "Sales analytics data model",
"createdAt": "2024-01-15T10:00:00Z",
"updatedAt": "2024-01-15T12:00:00Z",
"ownerId": "8TUQL5YbOwebkUGS0SAdqxlU5R0gD",
}
SAMPLE_DATA_MODEL_SOURCES = [
{
"sourceId": "src-1",
"type": "table",
"nodeId": "inode-C5ZiVq1GkaANMaD334AnA",
"name": "Payments",
},
{
"sourceId": "src-2",
"type": "dataset",
"nodeId": "inode-Iq557kfHN8KRu76HdGSWi",
"name": "Orders Dataset",
},
]
@pytest.fixture(name="sigma_data_model_data")
def sigma_data_model_fixture(responses: aioresponses) -> None:
"""Add mock data model API responses."""
# List data models
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/data-models?limit=1000",
body=json.dumps(_build_paginated_response([SAMPLE_DATA_MODEL_DATA])),
status=200,
)
# Get data model sources
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/data-models/dm-123/sources?limit=1000",
body=json.dumps(_build_paginated_response(SAMPLE_DATA_MODEL_SOURCES)),
status=200,
)
# Materialization schedules
responses.add(
method=hdrs.METH_GET,
url="https://aws-api.sigmacomputing.com/v2/data-models/dm-123/materialization-schedules?limit=1000",
body=json.dumps(_build_paginated_response([{"elementId": "elem-1"}])),
status=200,
)
@pytest.fixture(name="sigma_data_model_materialization")
def sigma_data_model_materialization_fixture(request_responses) -> None:
"""Add mock data model materialization API responses."""
# Trigger materialization
request_responses.add(
method=request_responses.POST,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/elements/elem-1/materialize",
status=200,
body=json.dumps({"materializationId": "mat-123"}),
)
# Check status - pending
request_responses.add(
method=request_responses.GET,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/materializations/mat-123",
status=200,
body=json.dumps({"materializationId": "mat-123", "status": "pending"}),
)
# Check status - ready
request_responses.add(
method=request_responses.GET,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/data-models/dm-123/materializations/mat-123",
status=200,
body=json.dumps({"materializationId": "mat-123", "status": "ready"}),
    )

- Run existing tests to ensure no regressions: `pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/`
- Run the new data model tests: `pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_data_models.py -v` and `pytest python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_materialization.py -v`
- Run the full test suite: `cd python_modules/libraries/dagster-sigma && pytest`
- Check code coverage: `pytest --cov=dagster_sigma --cov-report=html`
- Type checking: `cd /Users/christian/code/dagster && make quick_pyright`
- Linting (MANDATORY): `cd /Users/christian/code/dagster && make ruff`
- Create real data model in Sigma test organization
- Run `load_sigma_asset_specs` against the real API
- Verify asset specs are created correctly
- Verify dependencies are resolved correctly
- Trigger materialization and verify it completes
- Test with SigmaComponent
- Test snapshot functionality
- Test filtering options
- Test with custom translator
- Verify deprecation warnings appear for datasets
### 1. README Updates

File: python_modules/libraries/dagster-sigma/README.md
Add section:
## Working with Data Models
Sigma Data Models are the next-generation replacement for datasets, providing enhanced semantic layer capabilities including metrics, relationships, and advanced security features.
### Basic Usage
```python
from dagster import Definitions
from dagster_sigma import SigmaOrganization, load_sigma_asset_specs
sigma = SigmaOrganization(
base_url="https://aws-api.sigmacomputing.com",
client_id="my_client_id",
client_secret="my_client_secret",
)
# Load all Sigma assets (workbooks, datasets, data models)
sigma_specs = load_sigma_asset_specs(sigma)
defs = Definitions(
assets=sigma_specs,
resources={"sigma": sigma},
)
```

For new integrations, we recommend using data models exclusively:

```python
from dagster_sigma import SigmaFilter
sigma_filter = SigmaFilter(
include_datasets=False, # Exclude legacy datasets
include_data_models=True, # Include new data models
)
sigma_specs = load_sigma_asset_specs(
sigma,
sigma_filter=sigma_filter,
)
```

Sigma datasets are deprecated and will be removed on June 2, 2026. To prepare:
- Create data models in Sigma UI to replace your datasets
- Update your Dagster code to load data models
- Verify asset dependencies are correct
Use the migration analysis tool:
```bash
dagster-sigma analyze-migration \
--base-url "https://aws-api.sigmacomputing.com" \
--client-id "my_client_id" \
  --client-secret "my_client_secret"
```

Data models with materialization schedules can be triggered:

```python
from dagster_sigma import build_materialize_data_model_assets_definition
# Get data model specs
data_model_specs = [
spec for spec in sigma_specs
if "data_model" in spec.kinds
]
# Build materialization assets
materialize_defs = [
build_materialize_data_model_assets_definition("sigma", spec)
for spec in data_model_specs
]
defs = Definitions(
assets=[*sigma_specs, *materialize_defs],
resources={"sigma": sigma},
)
```

Customize how data models are translated to assets:

```python
from dagster_sigma import DagsterSigmaTranslator
class CustomSigmaTranslator(DagsterSigmaTranslator):
def get_data_model_asset_key(self, data_model):
# Custom asset key logic
return AssetKey(["analytics", "data_models", data_model.name])
def get_data_model_group_name(self, data_model):
# Group by folder
return data_model.path.split("/")[0] if data_model.path else None
sigma_specs = load_sigma_asset_specs(
sigma,
dagster_sigma_translator=CustomSigmaTranslator(),
)
```
### 2. Docstring Examples
Ensure all new public APIs have comprehensive docstrings with examples (already included in code above).
### 3. Changelog Entry
**File**: `python_modules/libraries/dagster-sigma/CHANGELOG.md`
```markdown
## [Unreleased]
### Added
- Added support for Sigma Data Models, the next-generation replacement for datasets. Data Models provide enhanced semantic layer capabilities including metrics, relationships, column/row-level security, and more.
- New `SigmaDataModel` class for representing data models
- New `SigmaDataModelTranslatorData` for translation context
- New `SigmaFilter` options: `data_model_folders`, `data_models`, `include_data_models`, `include_datasets`
- New `build_materialize_data_model_assets_definition` function for materializing data models
- New CLI command `dagster-sigma analyze-migration` for analyzing dataset migration status
- New `detect_dataset_to_data_model_migrations` helper function
### Changed
- `load_sigma_asset_specs` now loads both datasets and data models by default
- `SigmaOrganizationData` now includes `data_models` field
- `DagsterSigmaTranslator` now supports translating data models
### Deprecated
- Sigma datasets are deprecated and will be removed on June 2, 2026. Use Data Models instead.
```
### 4. Migration Guide

File: python_modules/libraries/dagster-sigma/MIGRATION_GUIDE.md (new)
Create a comprehensive migration guide (content outlined in Phase 7).
Duration: 1 week
Audience: Internal Dagster Labs team
- Merge feature branch
- Deploy to internal test environment
- Test against Dagster Labs' Sigma organization
- Gather feedback from internal users
- Fix critical bugs
Duration: 2-4 weeks
Audience: Select customers using dagster-sigma
- Release as beta feature (experimental decorator)
- Announce in dagster-sigma changelog
- Create discussion thread in Dagster Slack
- Gather feedback from early adopters
- Iterate on API design based on feedback
- Update documentation based on questions
Duration: Ongoing
Audience: All dagster-sigma users
- Remove experimental decorators
- Publish blog post announcing feature
- Update main Dagster docs
- Add to integration examples
- Monitor for issues
- Provide support for migration questions
Timeline: June 2, 2026
- 6 months before: Increase visibility of deprecation warnings
- 3 months before: Send communication to all dagster-sigma users
- 1 month before: Final reminder
- Deprecation date: Remove dataset support from new installations
- 3 months after: Fully remove dataset code
Likelihood: High
Impact: High
Mitigation:
- Allocate Phase 0 for thorough API exploration
- Start with minimal implementation to validate approach
- Maintain close communication with Sigma team
- Build flexible data structures that can accommodate changes
Likelihood: Medium
Impact: High
Mitigation:
- Make all changes backward compatible
- Default to loading both datasets AND data models
- Extensive testing before release
- Clear migration documentation
- Beta period for feedback
Likelihood: Medium
Impact: Medium
Mitigation:
- Fetch data models in parallel with datasets
- Respect existing snapshot caching mechanism
- Add option to disable data models if needed
- Monitor API rate limits carefully
Likelihood: Medium
Impact: Medium
Mitigation:
- Focus on core use cases first (loading and lineage)
- Clearly document what's supported vs not supported
- Design extensible architecture for future features
- Gather user feedback on priority features
Likelihood: Medium
Impact: Low
Mitigation:
- Phase 0 API discovery prevents surprises
- Break work into small, testable increments
- Have fallback scope (core features vs nice-to-have)
- Regular progress check-ins
- All existing tests pass
- New tests achieve >90% coverage for new code
- Type checking passes with no errors
- Linting passes (make ruff)
- Manual testing against real Sigma API successful
- Performance benchmarks meet or exceed dataset loading
- Users can load data models alongside datasets
- Users can filter to use only data models
- Data model dependencies correctly resolved
- Materialization works for data models
- Documentation clear and comprehensive
- Migration path well-defined
- Feature shipped within estimated timeline
- No critical bugs in first 2 weeks
- Positive feedback from early adopters
- Clear path forward for dataset deprecation
- Foundation for future data model features
| Phase | Duration | Dependencies |
|---|---|---|
| Phase 0: API Discovery | 0.5-1 day | Sigma API access |
| Phase 1: Data Structures | 0.5-1 day | Phase 0 complete |
| Phase 2: API Client | 1-1.5 days | Phase 1 complete |
| Phase 3: Translation | 1-1.5 days | Phase 2 complete |
| Phase 4: Asset Loading | 0.5-1 day | Phase 3 complete |
| Phase 5: Materialization | 1-1.5 days | Phase 4 complete |
| Phase 6: Components | 0.5-1 day | Phase 5 complete |
| Phase 7: Migration | 0.5-1 day | All phases complete |
| Testing | 1-2 days | Development complete |
| Documentation | 1 day | Testing complete |
| Total | 7-10 days | |
- Review this plan with the team
- Obtain Sigma API access for test environment
- Execute Phase 0 (API discovery)
- Refine estimates based on Phase 0 findings
- Begin implementation following the phases
- Should we support data models from day one or release in stages?
- What's the priority: feature completeness vs fast release?
- Should we make data models opt-in or opt-out?
- How aggressively should we deprecate datasets?
- Do we need Sigma partnership/support for this work?
- Should metrics get their own asset specs or be metadata? (A rough sketch of the asset-spec option follows this list.)
- How should we handle data model relationships in the lineage graph?
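If metrics do end up as their own asset specs, a rough sketch of what a translator hook could look like; the `get_metric_asset_key`/`get_metric_specs` names and the metric dict fields are hypothetical and not part of the current plan:

```python
from typing import Any

from dagster import AssetKey, AssetSpec
from dagster_sigma import DagsterSigmaTranslator  # Phase 1/3 classes assumed to exist

class MetricAwareTranslator(DagsterSigmaTranslator):
    """Illustrative only: emit one extra spec per metric, downstream of its data model."""

    def get_metric_asset_key(self, data_model, metric: dict[str, Any]) -> AssetKey:
        # Assumes each metric dict carries a "name" field; confirm in Phase 0.
        parent = self.get_data_model_asset_key(data_model)
        return AssetKey([*parent.path, metric.get("name", "metric")])

    def get_metric_specs(self, data) -> list[AssetSpec]:
        # `data` is the SigmaDataModelTranslatorData wrapper from Phase 1.
        parent_key = self.get_data_model_asset_key(data.data_model)
        return [
            AssetSpec(
                key=self.get_metric_asset_key(data.data_model, metric),
                deps=[parent_key],
                kinds={"sigma", "metric"},
            )
            for metric in data.data_model.metrics
        ]
```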
| Endpoint | Method | Purpose | Priority |
|---|---|---|---|
| `/v2/data-models` | GET | List all data models | Critical |
| `/v2/data-models/{id}` | GET | Get data model details | High |
| `/v2/data-models/{id}/sources` | GET | Get data sources | Critical |
| `/v2/data-models/{id}/materialization-schedules` | GET | Get schedules | High |
| `/v2/data-models/{id}/elements/{elem}/materialize` | POST | Trigger materialization | High |
| `/v2/data-models/{id}/materializations/{mat}` | GET | Check materialization status | High |
| `/v2/data-models/{id}/source-swap` | POST | Swap sources | Low |
dagster-sigma/
├── dagster_sigma/
│ ├── __init__.py [Export new classes]
│ ├── resource.py [+6 new methods, ~150 lines]
│ ├── translator.py [+4 new classes, ~300 lines]
│ ├── assets.py [+2 new functions, ~100 lines]
│ ├── cli.py [+1 new command, ~50 lines]
│ └── components/
│ └── sigma_component.py [Minor updates, ~20 lines]
├── dagster_sigma_tests/
│ ├── conftest.py [+100 lines fixtures]
│ ├── test_data_models.py [NEW FILE, ~300 lines]
│ ├── test_materialization.py [NEW FILE, ~150 lines]
│ ├── test_resource.py [+50 lines]
│ ├── test_translator.py [+50 lines]
│ ├── test_asset_specs.py [+50 lines]
│ └── test_components.py [+30 lines]
├── README.md [+200 lines]
├── MIGRATION_GUIDE.md [NEW FILE, ~500 lines]
└── CHANGELOG.md [+20 lines]
Total new/modified lines: ~2,000-2,500
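For reference, the `__init__.py` export note in the tree above would roughly amount to the following additions (a sketch; the final export list should match whatever names actually ship):

```python
# dagster_sigma/__init__.py (additions, sketch)
from dagster_sigma.translator import (
    SigmaDataModel as SigmaDataModel,
    SigmaDataModelMetadataSet as SigmaDataModelMetadataSet,
    SigmaDataModelTranslatorData as SigmaDataModelTranslatorData,
)
from dagster_sigma.assets import (
    build_materialize_data_model_assets_definition as build_materialize_data_model_assets_definition,
    detect_dataset_to_data_model_migrations as detect_dataset_to_data_model_migrations,
)
```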
End of Implementation Plan