Freshness checks example code
from typing import Dict, Optional, Sequence
import json

from dagster import (
    AssetCheckKey,
    AssetChecksDefinition,
    AssetCheckSeverity,
    JobDefinition,
    RunRequest,
    SensorEvaluationContext,
    build_sensor_for_freshness_checks,
    build_time_partition_freshness_checks,
    sensor,
)

# Represents the latest evaluation timestamp for each asset check. This allows us
# to avoid reacting to the same failure multiple times.
LatestEvaluationTimestampMapping = Dict[str, float]

# Every day at 9am, the previous day's partition should be fresh.
# The return type here is a sequence of checks, which follows our build_x_checks model
# (all of which return sequences), but in practice this is a sequence containing a single
# multi-asset-check definition (even if you pass in multiple assets).
freshness_checks: Sequence[AssetChecksDefinition] = build_time_partition_freshness_checks(
    [my_daily_partitioned_assets_def],  # assumed to be defined elsewhere
    deadline_cron="0 9 * * *",
    timezone="...",
    severity=AssetCheckSeverity.WARN,
)

# Handles the scheduling of the freshness checks.
freshness_sensor = build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks, name="..."
)


# Perform some action based on freshness check results.
def react_to_failed_checks_factory(
    check_keys: Sequence[AssetCheckKey], reaction_job: JobDefinition
):
    @sensor(job=reaction_job)
    def react_to_freshness_checks(context: SensorEvaluationContext) -> Optional[RunRequest]:
        # The cursor is a mapping from freshness check key to the timestamp of the last
        # evaluation of that check that we've processed.
        last_evaluation_timestamps: LatestEvaluationTimestampMapping = (
            json.loads(context.cursor) if context.cursor else {}
        )
        # This API gives us a "summary record" for each freshness check. A summary record
        # contains the most recent execution record for the check.
        summary_records = context.instance.event_log_storage.get_asset_check_summary_records(
            check_keys
        )
        # Accumulate the check keys to react to.
        freshness_checks_to_react_to = []
        for check_key in check_keys:
            summary_record = summary_records[check_key]
            last_check_execution_record = summary_record.last_check_execution_record
            # If there is an execution of this check (possibly none if the check has never run),
            # and it evaluated to failure, check whether we've processed this failure already.
            if last_check_execution_record and last_check_execution_record.status.value == "FAILED":
                # JSON object keys must be strings, so key the cursor mapping by the
                # string form of the check key.
                cursor_key = str(check_key)
                last_eval_timestamp = last_evaluation_timestamps.get(cursor_key)
                # If we haven't processed this failure yet, we should react to it.
                if (
                    last_eval_timestamp is None
                    or last_check_execution_record.create_timestamp > last_eval_timestamp
                ):
                    freshness_checks_to_react_to.append(check_key)
                    last_evaluation_timestamps[cursor_key] = (
                        last_check_execution_record.create_timestamp
                    )
        # Do something to convert the freshness check keys into a run request, which will
        # kick off the reaction job. This probably means conversion into config.
        ...
        # Update the cursor.
        context.update_cursor(json.dumps(last_evaluation_timestamps))

    return react_to_freshness_checks


my_reaction_sensor = react_to_failed_checks_factory(
    # Each AssetChecksDefinition exposes the keys of the checks it defines.
    check_keys=[key for checks_def in freshness_checks for key in checks_def.check_keys],
    reaction_job=open_datadog_freshness_incident,  # assumed to be defined elsewhere
)
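
To see how these pieces fit together, here is a minimal sketch of registering the assets, checks, sensors, and reaction job in a Definitions object. The names my_daily_partitioned_assets_def and open_datadog_freshness_incident are the placeholders from the example above and are assumed to be defined elsewhere.

# A minimal sketch of wiring the pieces above into a Dagster Definitions object.
# Assumes my_daily_partitioned_assets_def and open_datadog_freshness_incident
# are defined elsewhere, as in the example above.
from dagster import Definitions

defs = Definitions(
    assets=[my_daily_partitioned_assets_def],
    asset_checks=freshness_checks,
    sensors=[freshness_sensor, my_reaction_sensor],
    jobs=[open_datadog_freshness_incident],
)

Registering the checks alongside both sensors lets the freshness sensor schedule evaluations of the checks, while the reaction sensor launches the incident job whenever one of them fails.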