Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
Last active December 3, 2025 00:46
Show Gist options
  • Select an option

  • Save zzstoatzz/138c82fcc6e7b7e411254f3ef8bdcbd7 to your computer and use it in GitHub Desktop.

Select an option

Save zzstoatzz/138c82fcc6e7b7e411254f3ef8bdcbd7 to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "prefect>=3.0",
# "prefect-dbt>=0.7.9",
# "dbt-duckdb>=1.9",
# ]
# ///
"""
dbt + Prefect: Resume-from-Failure Demo
Q: If 4 tests run and 1 fails, do we retry all tests?
A: NO - `dbt retry` only re-runs the failed test(s).
Run:
uv run https://gist.githubusercontent.com/.../dbt_retry_demo.py
"""
import tempfile
from pathlib import Path
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings
def create_project(project_dir: Path) -> None:
    """Create a minimal dbt project with 1 model and 4 tests.

    Writes ``dbt_project.yml``, a DuckDB profile under ``profiles/``, a
    two-row ``users`` model with three column tests declared in
    ``schema.yml``, and one singular test (``bad_test``) whose query returns
    a row for the two-row model.

    Args:
        project_dir: Directory to hold the project. It and any missing
            parents are created; files that already exist are overwritten.
    """
    project_dir.mkdir(parents=True, exist_ok=True)
    (project_dir / "dbt_project.yml").write_text(
        "name: demo\nversion: '1.0'\nprofile: demo\n"
        "model-paths: ['models']\ntest-paths: ['tests']"
    )

    profiles = project_dir / "profiles"
    profiles.mkdir(exist_ok=True)
    # YAML nesting is significant: `dev` must be a child of `outputs`, and the
    # connection keys (`type`, `path`) must be children of `dev`.
    (profiles / "profiles.yml").write_text(
        "demo:\n"
        "  target: dev\n"
        "  outputs:\n"
        "    dev:\n"
        "      type: duckdb\n"
        f"      path: {project_dir}/demo.duckdb"
    )

    models = project_dir / "models"
    models.mkdir(exist_ok=True)
    (models / "users.sql").write_text(
        "select 1 as id, 'alice' as name union all select 2, 'bob'"
    )
    # `columns` belongs to the `users` list item, and each `tests` key belongs
    # to its own column entry, so the indentation below is load-bearing.
    (models / "schema.yml").write_text(
        "version: 2\n"
        "models:\n"
        "  - name: users\n"
        "    columns:\n"
        "      - name: id\n"
        "        tests: [unique, not_null]\n"
        "      - name: name\n"
        "        tests: [not_null]"
    )

    tests = project_dir / "tests"
    tests.mkdir(exist_ok=True)
    # This test fails: bad assertion (will fix by correcting the test)
    (tests / "bad_test.sql").write_text(
        "-- This test incorrectly expects 10 users\n"
        "select 1 where (select count(*) from {{ ref('users') }}) < 10"
    )
@flow(log_prints=True)
def main():
    """Walk through build -> fix -> retry on a throwaway dbt project.

    Creates the demo project in a temporary directory, runs ``dbt build``
    (treating a ``ValueError`` from the runner as the expected test failure),
    rewrites ``bad_test.sql`` with a passing query, runs ``dbt retry``, and
    prints a short summary.
    """
    rule = "=" * 60

    with tempfile.TemporaryDirectory() as workspace:
        project_dir = Path(workspace) / "demo"
        create_project(project_dir)

        runner = PrefectDbtRunner(
            settings=PrefectDbtSettings(
                project_dir=project_dir,
                profiles_dir=project_dir / "profiles",
            )
        )

        # --- Step 1: the initial build, in which bad_test is meant to fail.
        print(rule)
        print("STEP 1: dbt build (1 model + 4 tests, 1 test will fail)")
        print(rule)
        try:
            runner.invoke(["build"])
        except ValueError:
            print("\n>>> 1 test failed (bad_test) - expected!")

        # --- Step 2: overwrite the failing test with a corrected query.
        print(f"\n{rule}")
        print("STEP 2: Fix the bad test")
        print(rule)
        fixed_test = project_dir / "tests" / "bad_test.sql"
        fixed_test.write_text(
            "-- Fixed: now correctly expects at least 1 user\n"
            "select 1 where (select count(*) from {{ ref('users') }}) < 1"
        )
        print("bad_test.sql fixed!")

        # --- Step 3: retry, which should pick up only what failed before.
        print(f"\n{rule}")
        print("STEP 3: dbt retry")
        print(rule)
        print("Watch the output: only 1 of 4 tests re-runs!\n")
        runner.invoke(["retry"])

        print(f"\n{rule}")
        print("SUCCESS!")
        print(rule)
        # Leading and trailing empty entries reproduce the blank line before
        # and the newline after the summary text.
        summary = "\n".join(
            [
                "",
                "Q: If 4 tests run and 1 fails, do we retry all 4?",
                "A: NO - dbt retry only re-runs failed nodes.",
                "- dbt writes results to target/run_results.json",
                "- `dbt retry` reads that file to find failures",
                '- Only failed nodes re-run (see "1 of 1" above)',
                "This is dbt's native state tracking - works great with prefect-dbt!",
                "",
            ]
        )
        print(summary)
# Run the demo flow only when executed as a script (e.g. via `uv run`),
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment