@tatiana
Created February 2, 2026 15:42
DAG illustrating how to track the memory consumed by Cosmos and Airflow tasks
"""
Airflow 2.x / 3.x compatible DAG that dynamically creates Cosmos DbtBuildLocalOperator
instances for multiple dbt project paths and tracks MAX memory usage per task
(including all child processes / threads).
Requirements:
- apache-airflow >= 2.5
- astronomer-cosmos
- psutil
This file is self-contained and safe to drop into your DAGs folder.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Dict
import psutil
from airflow import DAG
from airflow.models.taskinstance import TaskInstance
try:
    from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
    from airflow.operators.empty import EmptyOperator
from cosmos.config import ProfileConfig
from cosmos.operators.local import DbtBuildLocalOperator
from cosmos.operators.watcher import DbtConsumerWatcherSensor
logger = logging.getLogger("memory_tracking_dag")
# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------
dbt_projects = [
    {
        "project_path": "/Users/tatiana.alchueyr/Code/astronomer-cosmos/dev/dags/dbt/jaffle_shop",
        "profile_config": ProfileConfig(
            profile_name="postgres_profile",
            target_name="dev",
            profiles_yml_filepath="/Users/tatiana.alchueyr/Code/astronomer-cosmos/dev/dags/dbt/jaffle_shop/profiles.yml",
        ),
    },
    {
        "project_path": "/Users/tatiana.alchueyr/Code/cosmos-benchmark/dbt/fhir-dbt-analytics",
        "profile_config": ProfileConfig(
            profile_name="fhir_dbt_analytics",
            target_name="dev",
            profiles_yml_filepath="/Users/tatiana.alchueyr/Code/cosmos-benchmark/dbt/fhir-dbt-analytics/profiles.yml",
        ),
    },
]
MEMORY_POLL_INTERVAL_SECONDS = 0.5
# ---------------------------------------------------------------------------
# MEMORY TRACKING UTILITIES
# ---------------------------------------------------------------------------
_memory_trackers: Dict[str, "MemoryTracker"] = {}
class MemoryTracker:
    """
    Tracks maximum RSS memory (bytes) for a process and all of its children.
    Sampling-based to work across Airflow 2 & 3 without executor internals.
    """

    def __init__(self, pid: int, poll_interval: float = 0.5):
        self.pid = pid
        self.poll_interval = poll_interval
        self.max_rss_bytes = 0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        self._thread.join(timeout=5)

    def _run(self):
        try:
            parent = psutil.Process(self.pid)
        except psutil.NoSuchProcess:
            return
        while not self._stop_event.is_set():
            rss = 0
            try:
                processes = [parent] + parent.children(recursive=True)
                for p in processes:
                    try:
                        rss += p.memory_info().rss
                    except psutil.NoSuchProcess:
                        continue
                self.max_rss_bytes = max(self.max_rss_bytes, rss)
            except psutil.NoSuchProcess:
                break
            time.sleep(self.poll_interval)
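# Illustrative standalone usage of MemoryTracker outside Airflow; the workload
# placeholder below is hypothetical:
#
#     tracker = MemoryTracker(pid=os.getpid(), poll_interval=0.1)
#     tracker.start()
#     run_workload()  # any memory-hungry function
#     tracker.stop()
#     print(f"Peak RSS: {tracker.max_rss_bytes / 1024 / 1024:.2f} MB")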
# ---------------------------------------------------------------------------
# AIRFLOW CALLBACKS
# ---------------------------------------------------------------------------
def start_memory_tracking(context):
    ti: TaskInstance = context["ti"]
    pid = os.getpid()
    tracker = MemoryTracker(pid=pid, poll_interval=MEMORY_POLL_INTERVAL_SECONDS)
    _memory_trackers[ti.task_id] = tracker
    tracker.start()


def stop_memory_tracking(context):
    ti: TaskInstance = context["ti"]
    tracker = _memory_trackers.pop(ti.task_id, None)
    if tracker:
        tracker.stop()
        max_mb = tracker.max_rss_bytes / 1024 / 1024
        logger.info("Max memory usage (RSS, incl. children): %.2f MB", max_mb)
        # Persist to XCom for observability
        ti.xcom_push(key="max_memory_mb", value=round(max_mb, 2))
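# Downstream tasks (or the UI's XCom tab) can read the recorded peak, e.g.:
#
#     peak_mb = ti.xcom_pull(task_ids="dbt_build_jaffle_shop", key="max_memory_mb")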
# ---------------------------------------------------------------------------
# DAG DEFINITION
# ---------------------------------------------------------------------------
with DAG(
    dag_id="memory_tracking_dag",
    start_date=datetime(2026, 2, 1),
    schedule=None,
    catchup=False,
    tags=["dbt", "cosmos", "memory"],
) as dag:
    for dbt_project in dbt_projects:
        project_path = dbt_project["project_path"]
        profile_config = dbt_project["profile_config"]
        project_name = os.path.basename(project_path.rstrip("/"))
        DbtBuildLocalOperator(
            task_id=f"dbt_build_{project_name}",
            project_dir=project_path,
            profile_config=profile_config,
            dbt_cmd_flags=["--threads", "16"],  # flag values must be strings
            install_deps=True,
            on_execute_callback=start_memory_tracking,
            on_success_callback=stop_memory_tracking,
            on_failure_callback=stop_memory_tracking,
        )

    # Defined once at DAG level (the fixed task_ids would clash if created per
    # project); these reuse the last project's path and profile from the loop.
    consumer = DbtConsumerWatcherSensor(
        task_id="dbt_consumer_sensor",
        project_dir=project_path,
        profile_config=profile_config,
        install_deps=True,
        execution_timeout=timedelta(seconds=5),
        on_execute_callback=start_memory_tracking,
        on_success_callback=stop_memory_tracking,
        on_failure_callback=stop_memory_tracking,
    )

    empty_operator = EmptyOperator(
        task_id="empty_operator",
        on_execute_callback=start_memory_tracking,
        on_success_callback=stop_memory_tracking,
        on_failure_callback=stop_memory_tracking,
    )
tatiana commented Feb 4, 2026

These were some memory consumption numbers I observed when running Airflow 3.1 standalone:

  • dbt build task with a small dbt project (Jaffle Shop): 292-378 MB
  • dbt build task with a medium dbt project (Google FHIR analytics): 608-708 MB
  • Watcher consumer sensor: 232-243 MB
  • Empty operator: 203-204 MB
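
For anyone wanting to collect these numbers programmatically instead of reading them from the task logs: each task pushes its peak as a `max_memory_mb` XCom, so something like the sketch below can pull them after a run. This is a minimal sketch against the Airflow 2.x stable REST API (`/api/v1`); the base URL, run id, and basic-auth credentials are placeholders, and Airflow 3 exposes the same data under a different API version and auth scheme.

```python
import requests

BASE = "http://localhost:8080/api/v1"  # placeholder webserver URL
DAG_ID = "memory_tracking_dag"
RUN_ID = "manual__2026-02-04T00:00:00+00:00"  # placeholder: use a real run id
TASK_IDS = [
    "dbt_build_jaffle_shop",
    "dbt_build_fhir-dbt-analytics",
    "dbt_consumer_sensor",
    "empty_operator",
]

for task_id in TASK_IDS:
    # GET the single XCom entry each task pushed in stop_memory_tracking()
    url = (
        f"{BASE}/dags/{DAG_ID}/dagRuns/{RUN_ID}"
        f"/taskInstances/{task_id}/xcomEntries/max_memory_mb"
    )
    resp = requests.get(url, auth=("admin", "admin"))  # placeholder credentials
    if resp.ok:
        print(f"{task_id}: {resp.json()['value']} MB")
    else:
        print(f"{task_id}: no XCom found ({resp.status_code})")
```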
