Created
February 2, 2026 15:42
-
-
Save tatiana/17c30b77f8996201dba61d9a370a07e1 to your computer and use it in GitHub Desktop.
DAG illustrating how to track memory consumption used by Cosmos and Airflow tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Airflow 2.x / 3.x compatible DAG that dynamically creates Cosmos DbtBuildLocalOperator | |
| instances for multiple dbt project paths and tracks MAX memory usage per task | |
| (including all child processes / threads). | |
| Requirements: | |
| - apache-airflow >= 2.5 | |
| - astronomer-cosmos | |
| - psutil | |
| This file is self-contained; before dropping it into your DAGs folder, update the hard-coded dbt project paths in the CONFIGURATION section below. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Dict | |
| import psutil | |
| from airflow import DAG | |
| from airflow.models.taskinstance import TaskInstance | |
| try: | |
| from airflow.providers.standard.operators.empty import EmptyOperator | |
| except ImportError: | |
| from airflow.operators.empty import EmptyOperator | |
| from cosmos.config import ProfileConfig | |
| from cosmos.operators.local import DbtBuildLocalOperator | |
| from cosmos.operators.watcher import DbtConsumerWatcherSensor | |
| logger = logging.getLogger("memory_tracking_dag") | |
| # --------------------------------------------------------------------------- | |
| # CONFIGURATION | |
| # --------------------------------------------------------------------------- | |
# Root directory of each dbt project. Each project's profiles.yml path below is
# this directory followed by "/profiles.yml".
_JAFFLE_SHOP_DIR = "/Users/tatiana.alchueyr/Code/astronomer-cosmos/dev/dags/dbt/jaffle_shop"
_FHIR_DBT_ANALYTICS_DIR = "/Users/tatiana.alchueyr/Code/cosmos-benchmark/dbt/fhir-dbt-analytics"

# One entry per dbt project; the DAG definition reads the "project_path" and
# "profile_config" keys of every entry.
dbt_projects = [
    dict(
        project_path=_JAFFLE_SHOP_DIR,
        profile_config=ProfileConfig(
            profile_name="postgres_profile",
            target_name="dev",
            profiles_yml_filepath=f"{_JAFFLE_SHOP_DIR}/profiles.yml",
        ),
    ),
    dict(
        project_path=_FHIR_DBT_ANALYTICS_DIR,
        profile_config=ProfileConfig(
            profile_name="fhir_dbt_analytics",
            target_name="dev",
            profiles_yml_filepath=f"{_FHIR_DBT_ANALYTICS_DIR}/profiles.yml",
        ),
    ),
]

# Seconds between two consecutive memory samples taken by a MemoryTracker.
MEMORY_POLL_INTERVAL_SECONDS = 0.5
| # --------------------------------------------------------------------------- | |
| # MEMORY TRACKING UTILITIES | |
| # --------------------------------------------------------------------------- | |
# Registry of running trackers, keyed by Airflow task_id. Filled by
# start_memory_tracking and drained by stop_memory_tracking.
_memory_trackers: Dict[str, "MemoryTracker"] = {}


class MemoryTracker:
    """
    Tracks maximum RSS memory (bytes) for a process and all of its children.

    Sampling-based to work across Airflow 2 & 3 without executor internals.
    A daemon thread periodically sums the RSS of the target process and of
    every descendant process, and keeps the largest total seen so far in
    ``max_rss_bytes``. Because the value is sampled, peaks shorter than
    ``poll_interval`` can be missed.
    """

    def __init__(self, pid: int, poll_interval: float = 0.5):
        """
        Prepare (but do not start) a tracker.

        :param pid: PID of the root process to observe.
        :param poll_interval: Seconds to wait between two samples.
        """
        self.pid = pid
        self.poll_interval = poll_interval
        # Largest summed RSS observed so far, in bytes.
        self.max_rss_bytes = 0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        """Start the background sampling thread."""
        self._thread.start()

    def stop(self) -> None:
        """
        Ask the sampling thread to finish and wait up to 5 seconds for it.

        Safe to call on a tracker that was never started: joining a thread
        that has not been started raises ``RuntimeError``, so the join is
        skipped unless the thread is actually running.
        """
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=5)

    def _run(self) -> None:
        """Sampling loop executed in the background thread."""
        try:
            parent = psutil.Process(self.pid)
        except psutil.Error:
            # Target process is already gone (or cannot be inspected).
            return

        # Sample first and check the stop flag afterwards, so that even a task
        # which finishes before this thread is scheduled gets one measurement.
        while True:
            try:
                processes = [parent, *parent.children(recursive=True)]
            except psutil.Error:
                # Root process disappeared; nothing left to measure.
                break

            rss = 0
            for proc in processes:
                try:
                    rss += proc.memory_info().rss
                except psutil.Error:
                    # A child exited (or is not readable) between listing it
                    # and reading it; skip it and keep the rest of the sample.
                    continue
            self.max_rss_bytes = max(self.max_rss_bytes, rss)

            # Event.wait returns True as soon as stop() is called, so stop()
            # does not have to sit out the remainder of a poll interval.
            if self._stop_event.wait(self.poll_interval):
                break
| # --------------------------------------------------------------------------- | |
| # AIRFLOW CALLBACKS | |
| # --------------------------------------------------------------------------- | |
def start_memory_tracking(context):
    """
    Airflow ``on_execute_callback``: begin sampling memory for the current task.

    Creates a MemoryTracker for the PID of the process running this callback,
    registers it in ``_memory_trackers`` under the task's ``task_id`` and
    starts it.

    :param context: Airflow task context; only ``context["ti"]`` is read.
    """
    task_instance: TaskInstance = context["ti"]
    sampler = MemoryTracker(
        pid=os.getpid(),
        poll_interval=MEMORY_POLL_INTERVAL_SECONDS,
    )
    # Register before starting so the matching stop callback can find it.
    _memory_trackers[task_instance.task_id] = sampler
    sampler.start()
def stop_memory_tracking(context):
    """
    Airflow success/failure callback: stop sampling and report the peak.

    Removes the tracker registered under the task's ``task_id``, stops it,
    logs the peak RSS in MB and pushes the rounded value to XCom under the key
    ``max_memory_mb``. Does nothing when no tracker is registered for the task.

    :param context: Airflow task context; only ``context["ti"]`` is read.
    """
    task_instance: TaskInstance = context["ti"]
    sampler = _memory_trackers.pop(task_instance.task_id, None)
    if not sampler:
        return

    sampler.stop()
    peak_mb = sampler.max_rss_bytes / 1024 / 1024
    logger.info("Max memory usage (RSS, incl. children): %.2f MB", peak_mb)
    # Persist to XCom for observability
    task_instance.xcom_push(key="max_memory_mb", value=round(peak_mb, 2))
| # --------------------------------------------------------------------------- | |
| # DAG DEFINITION | |
| # --------------------------------------------------------------------------- | |
with DAG(
    dag_id="memory_tracking_dag",
    start_date=datetime(2026, 2, 1),
    schedule=None,
    catchup=False,
    tags=["dbt", "cosmos", "memory"],
) as dag:
    # Every task gets the same three callbacks: start sampling when the task
    # begins executing, stop and report when it succeeds or fails.
    memory_callbacks = {
        "on_execute_callback": start_memory_tracking,
        "on_success_callback": stop_memory_tracking,
        "on_failure_callback": stop_memory_tracking,
    }

    # One dbt build task per configured project; the task_id is derived from
    # the last component of the project path.
    for dbt_project in dbt_projects:
        project_path = dbt_project["project_path"]
        profile_config = dbt_project["profile_config"]
        project_name = os.path.basename(project_path.rstrip("/"))

        DbtBuildLocalOperator(
            task_id=f"dbt_build_{project_name}",
            project_dir=project_path,
            profile_config=profile_config,
            # Command-line flags are passed as strings; the thread count was
            # previously the int 16.
            dbt_cmd_flags=["--threads", "16"],
            install_deps=True,
            **memory_callbacks,
        )

    # NOTE(review): the source formatting lost its indentation. The two tasks
    # below are placed outside the loop because their fixed task_ids would
    # collide if they were created once per project - confirm.
    # They reuse project_path / profile_config from the last loop iteration.
    consumer = DbtConsumerWatcherSensor(
        task_id="dbt_consumer_sensor",
        project_dir=project_path,
        profile_config=profile_config,
        install_deps=True,
        execution_timeout=timedelta(seconds=5),
        **memory_callbacks,
    )

    empty_operator = EmptyOperator(
        task_id="empty_operator",
        **memory_callbacks,
    )
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These were some memory consumption numbers I observed when running Airflow 3.1 standalone: