Skip to content

Instantly share code, notes, and snippets.

@dstandish
Created August 5, 2025 02:30
Show Gist options
  • Select an option

  • Save dstandish/a748988614f9f718f6b96b1330c04eec to your computer and use it in GitHub Desktop.

Select an option

Save dstandish/a748988614f9f718f6b96b1330c04eec to your computer and use it in GitHub Desktop.
scheduler-query
WITH
-- anon_5: per (dag_id, run_id), how many task_instance rows are currently in one
-- of the states bound to __[postcompile_state_4]. The main SELECT left-joins this
-- on (dag_id, run_id) and adds now_running to a per-run rank.
-- NOTE(review): count(:count_1) counts rows where the bound value is non-NULL;
-- presumably a constant is bound so this is a plain row count - confirm.
anon_5 AS (
    SELECT
        ti.dag_id,
        ti.run_id,
        count(:count_1) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_4])
    GROUP BY
        ti.dag_id,
        ti.run_id
),
-- anon_6: per (dag_id, task_id), how many task_instance rows are currently in one
-- of the states bound to __[postcompile_state_5]. The main SELECT left-joins this
-- on (dag_id, task_id) and adds now_running to a per-task rank.
-- NOTE(review): count(:count_2) counts rows where the bound value is non-NULL;
-- presumably a constant is bound so this is a plain row count - confirm.
anon_6 AS (
    SELECT
        ti.dag_id,
        ti.task_id,
        count(:count_2) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_5])
    GROUP BY
        ti.dag_id,
        ti.task_id
),
-- anon_7: per (dag_id, run_id, task_id), how many task_instance rows are currently
-- in one of the states bound to __[postcompile_state_6]. The main SELECT left-joins
-- this on all three columns and adds now_running to a per-run-per-task rank.
-- NOTE(review): count(:count_3) counts rows where the bound value is non-NULL;
-- presumably a constant is bound so this is a plain row count - confirm.
anon_7 AS (
    SELECT
        ti.dag_id,
        ti.run_id,
        ti.task_id,
        count(:count_3) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_6])
    GROUP BY
        ti.dag_id,
        ti.run_id,
        ti.task_id
),
-- anon_8: per pool, how many task_instance rows are currently in one of the states
-- bound to __[postcompile_state_7]. The main SELECT left-joins this on pool and adds
-- now_running to a running total of pool_slots.
-- NOTE(review): this counts rows, not pool_slots, while the value it is added to
-- is a sum of pool_slots - confirm that mix is intended.
anon_8 AS (
    SELECT
        ti.pool,
        count(:count_4) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_7])
    GROUP BY ti.pool
)
-- Main SELECT: returns task_instance rows (plus their dag_run row, aliased
-- dag_run_1) that pass four stacked limit checks.
--
-- The checks are nested derived tables, innermost first. Each level ranks its rows
-- with a window function; the enclosing level keeps a row only while
-- "rank + now_running" (now_running comes from the anon_5..anon_8 CTEs) is within
-- the limit, then computes the rank used by the next level. Window functions are
-- evaluated after that level's WHERE, so each rank only counts surviving rows.
--
--   level          rank partition               limit checked by the enclosing level
--   dagrun_rank    (dag_id, run_id)             dag.max_active_tasks
--   dag_task_rank  (dag_id, task_id)            task_instance.max_active_tis_per_dag
--   run_task_rank  (dag_id, run_id, task_id)    task_instance.max_active_tis_per_dagrun
--   pool_usage     running sum(pool_slots)/pool slot_pool.slots
--
-- Every rank is ordered by -priority_weight, dag_run.logical_date, map_index.
--
-- Bind parameters:
--   :state_2        dag_run.state that a candidate's run must have
--   :state_3        task_instance.state that a candidate must have
--   :coalesce_1..4  fallback limits used when the corresponding limit column is NULL
--   :param_1        maximum number of rows returned
--
-- Every dag_run join uses the full (dag_id, run_id) pair. run_id on its own is
-- not treated as a key anywhere else in this statement, and joining on it alone
-- would duplicate task instances and inflate every rank above.
SELECT
    ti.rendered_map_index,
    ti.task_display_name,
    ti.id,
    ti.task_id,
    ti.dag_id,
    ti.run_id,
    ti.map_index,
    ti.start_date,
    ti.end_date,
    ti.duration,
    ti.state,
    ti.try_number,
    ti.max_tries,
    ti.hostname,
    ti.unixname,
    ti.pool,
    ti.pool_slots,
    ti.queue,
    ti.priority_weight,
    ti.operator,
    ti.custom_operator_name,
    ti.queued_dttm,
    ti.scheduled_dttm,
    ti.queued_by_job_id,
    ti.last_heartbeat_at,
    ti.pid,
    ti.executor,
    ti.executor_config,
    ti.max_active_tis_per_dag,
    ti.max_active_tis_per_dagrun,
    ti.updated_at,
    ti.context_carrier,
    ti.span_status,
    ti.external_executor_id,
    ti.trigger_id,
    ti.trigger_timeout,
    ti.next_method,
    ti.next_kwargs,
    ti.dag_version_id,
    dag_run_1.state AS state_1,
    dag_run_1.id AS id_1,
    dag_run_1.dag_id AS dag_id_1,
    dag_run_1.queued_at,
    dag_run_1.logical_date,
    dag_run_1.start_date AS start_date_1,
    dag_run_1.end_date AS end_date_1,
    dag_run_1.run_id AS run_id_1,
    dag_run_1.creating_job_id,
    dag_run_1.run_type,
    dag_run_1.triggered_by,
    dag_run_1.triggering_user_name,
    dag_run_1.conf,
    dag_run_1.data_interval_start,
    dag_run_1.data_interval_end,
    dag_run_1.run_after,
    dag_run_1.last_scheduling_decision,
    dag_run_1.log_template_id,
    dag_run_1.updated_at AS updated_at_1,
    dag_run_1.clear_number,
    dag_run_1.backfill_id,
    dag_run_1.bundle_version,
    dag_run_1.scheduled_by_job_id,
    dag_run_1.context_carrier AS context_carrier_1,
    dag_run_1.span_status AS span_status_1,
    dag_run_1.created_dag_version_id
FROM task_instance AS ti
INNER JOIN (
    -- pool_usage: keep rows within max_active_tis_per_dagrun, then compute the
    -- running total of pool_slots per pool. Only id and the window value are
    -- read by the enclosing level, so nothing else is selected.
    SELECT
        ti.id,
        sum(ti.pool_slots) OVER (
            PARTITION BY ti.pool
            ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
            -- Explicit ROWS frame: the default RANGE frame would give every row
            -- tied on the ORDER BY key the total of the whole tie group, so a
            -- group that jointly exceeds the pool would be rejected entirely.
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS pool_slots_taken
    FROM task_instance AS ti
    INNER JOIN (
        -- run_task_rank: keep rows within max_active_tis_per_dag, then rank
        -- survivors per (dag_id, run_id, task_id).
        SELECT
            ti.id,
            row_number() OVER (
                PARTITION BY ti.dag_id, ti.run_id, ti.task_id
                ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
            ) AS mapped_tis_per_dagrun_count
        FROM task_instance AS ti
        INNER JOIN (
            -- dag_task_rank: keep rows within dag.max_active_tasks, then rank
            -- survivors per (dag_id, task_id).
            SELECT
                ti.id,
                row_number() OVER (
                    PARTITION BY ti.dag_id, ti.task_id
                    ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
                ) AS tis_per_dag_count
            FROM task_instance AS ti
            INNER JOIN (
                -- dagrun_rank: base candidate set (run in :state_2, task instance
                -- in :state_3, DAG not paused and with a bundle_name), ranked per
                -- (dag_id, run_id).
                SELECT
                    ti.id,
                    row_number() OVER (
                        PARTITION BY ti.dag_id, ti.run_id
                        ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
                    ) AS total_tis_per_dagrun_count
                FROM task_instance AS ti
                INNER JOIN dag_run AS dr
                    ON dr.dag_id = ti.dag_id
                    AND dr.run_id = ti.run_id
                INNER JOIN dag
                    ON dag.dag_id = ti.dag_id
                WHERE
                    dr.state = :state_2
                    AND NOT dag.is_paused
                    AND ti.state = :state_3
                    AND dag.bundle_name IS NOT NULL
            ) AS dagrun_rank
                ON dagrun_rank.id = ti.id
            LEFT JOIN anon_5
                ON anon_5.dag_id = ti.dag_id
                AND anon_5.run_id = ti.run_id
            -- dag_run is needed here only for logical_date in the window ORDER BY.
            INNER JOIN dag_run AS dr
                ON dr.dag_id = ti.dag_id
                AND dr.run_id = ti.run_id
            INNER JOIN dag
                ON dag.dag_id = ti.dag_id
            WHERE
                coalesce(dagrun_rank.total_tis_per_dagrun_count, 0) + coalesce(anon_5.now_running, 0)
                <= coalesce(dag.max_active_tasks, :coalesce_1)
        ) AS dag_task_rank
            ON dag_task_rank.id = ti.id
        LEFT JOIN anon_6
            ON anon_6.dag_id = ti.dag_id
            AND anon_6.task_id = ti.task_id
        INNER JOIN dag_run AS dr
            ON dr.dag_id = ti.dag_id
            AND dr.run_id = ti.run_id
        WHERE
            coalesce(dag_task_rank.tis_per_dag_count, 0) + coalesce(anon_6.now_running, 0)
            <= coalesce(ti.max_active_tis_per_dag, :coalesce_2)
    ) AS run_task_rank
        ON run_task_rank.id = ti.id
    LEFT JOIN anon_7
        ON anon_7.dag_id = ti.dag_id
        AND anon_7.run_id = ti.run_id
        AND anon_7.task_id = ti.task_id
    INNER JOIN dag_run AS dr
        ON dr.dag_id = ti.dag_id
        AND dr.run_id = ti.run_id
    WHERE
        coalesce(run_task_rank.mapped_tis_per_dagrun_count, 0) + coalesce(anon_7.now_running, 0)
        <= coalesce(ti.max_active_tis_per_dagrun, :coalesce_3)
) AS pool_usage
    ON pool_usage.id = ti.id
LEFT JOIN anon_8
    ON anon_8.pool = ti.pool
INNER JOIN slot_pool
    ON slot_pool.pool = ti.pool
-- The only dag_run join at this level: it supplies the dag_run_1.* output columns.
INNER JOIN dag_run AS dag_run_1
    ON dag_run_1.dag_id = ti.dag_id
    AND dag_run_1.run_id = ti.run_id
WHERE
    coalesce(pool_usage.pool_slots_taken, 0) + coalesce(anon_8.now_running, 0)
    <= coalesce(slot_pool.slots, :coalesce_4)
-- NOTE(review): there is no ORDER BY, so which qualifying rows survive the LIMIT
-- is up to the database - confirm the caller does not rely on priority order.
LIMIT :param_1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment