Created
August 5, 2025 02:30
-
-
Save dstandish/a748988614f9f718f6b96b1330c04eec to your computer and use it in GitHub Desktop.
scheduler-query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Selects task_instance rows, together with their dag_run row, that still fit
-- under four limits applied one after another:
--   1. dag.max_active_tasks                     (counted per dag_id, run_id)
--   2. task_instance.max_active_tis_per_dag     (counted per dag_id, task_id)
--   3. task_instance.max_active_tis_per_dagrun  (counted per dag_id, run_id, task_id)
--   4. slot_pool.slots                          (pool_slots summed per pool)
--
-- Every stage ranks the rows that survived the previous stage with a window
-- function, adds the matching "now_running" count from a running_* CTE, and
-- keeps only rows whose total does not exceed the limit. A stage's window
-- function is evaluated after that stage's WHERE, so each rank only covers
-- rows that passed all earlier limits.
--
-- Bind parameters (names are unchanged from the generated statement):
--   __[postcompile_state_4..7]  state lists counted as "now_running"
--                               (presumably the active states; confirm against the caller)
--   :count_1 .. :count_4        argument handed to count() in the running_* CTEs
--   :state_2, :state_3          dag_run.state / task_instance.state a row must have to be considered
--   :coalesce_1 .. :coalesce_4  fallback limit used when the configured limit is NULL
--   :param_1                    maximum number of rows returned
--
-- NOTE(review): there is no ORDER BY before LIMIT, so which qualifying rows are
-- returned is left to the engine. Confirm that this is intended.
WITH
-- Task instances in the :postcompile_state_4 states, per DAG run.
running_per_dagrun AS (
    SELECT
        ti.dag_id,
        ti.run_id,
        count(:count_1) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_4])
    GROUP BY
        ti.dag_id,
        ti.run_id
),
-- Task instances in the :postcompile_state_5 states, per task across all runs of a DAG.
running_per_dag_task AS (
    SELECT
        ti.dag_id,
        ti.task_id,
        count(:count_2) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_5])
    GROUP BY
        ti.dag_id,
        ti.task_id
),
-- Task instances in the :postcompile_state_6 states, per task within one DAG run.
running_per_dagrun_task AS (
    SELECT
        ti.dag_id,
        ti.run_id,
        ti.task_id,
        count(:count_3) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_6])
    GROUP BY
        ti.dag_id,
        ti.run_id,
        ti.task_id
),
-- Task instances in the :postcompile_state_7 states, per pool.
-- NOTE(review): this counts rows, while the final filter compares against
-- slot_pool.slots and sums pool_slots for the candidates. Confirm that a row
-- count (rather than sum(pool_slots)) is what is wanted here.
running_per_pool AS (
    SELECT
        ti.pool,
        count(:count_4) AS now_running
    FROM task_instance AS ti
    WHERE ti.state IN (__[postcompile_state_7])
    GROUP BY ti.pool
),
-- Stage 0: base candidate set, ranked inside each (dag_id, run_id).
-- Only id and the rank are projected; later stages re-read task_instance by id.
ranked_in_dagrun AS (
    SELECT
        ti.id,
        row_number() OVER (
            PARTITION BY ti.dag_id, ti.run_id
            ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
        ) AS total_tis_per_dagrun_count
    FROM task_instance AS ti
    INNER JOIN dag_run AS dr
        ON dr.dag_id = ti.dag_id
        AND dr.run_id = ti.run_id
    INNER JOIN dag AS d
        ON d.dag_id = ti.dag_id
    WHERE dr.state = :state_2
        AND NOT d.is_paused
        AND ti.state = :state_3
        AND d.bundle_name IS NOT NULL
),
-- Stage 1: enforce dag.max_active_tasks, then rank inside each (dag_id, task_id).
ranked_in_dag_task AS (
    SELECT
        ti.id,
        row_number() OVER (
            PARTITION BY ti.dag_id, ti.task_id
            ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
        ) AS tis_per_dag_count
    FROM task_instance AS ti
    INNER JOIN ranked_in_dagrun AS prev
        ON prev.id = ti.id
    LEFT JOIN running_per_dagrun AS running
        ON running.dag_id = ti.dag_id
        AND running.run_id = ti.run_id
    -- dag_run is matched on dag_id as well as run_id, exactly like stage 0.
    -- Matching on run_id alone would pair a task instance with every DAG's
    -- run that happens to share the run_id, duplicating rows and inflating
    -- the window counts.
    INNER JOIN dag_run AS dr
        ON dr.dag_id = ti.dag_id
        AND dr.run_id = ti.run_id
    INNER JOIN dag AS d
        ON d.dag_id = ti.dag_id
    WHERE coalesce(prev.total_tis_per_dagrun_count, 0) + coalesce(running.now_running, 0)
        <= coalesce(d.max_active_tasks, :coalesce_1)
),
-- Stage 2: enforce max_active_tis_per_dag, then rank inside each (dag_id, run_id, task_id).
ranked_in_dagrun_task AS (
    SELECT
        ti.id,
        row_number() OVER (
            PARTITION BY ti.dag_id, ti.run_id, ti.task_id
            ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
        ) AS mapped_tis_per_dagrun_count
    FROM task_instance AS ti
    INNER JOIN ranked_in_dag_task AS prev
        ON prev.id = ti.id
    LEFT JOIN running_per_dag_task AS running
        ON running.dag_id = ti.dag_id
        AND running.task_id = ti.task_id
    INNER JOIN dag_run AS dr
        ON dr.dag_id = ti.dag_id
        AND dr.run_id = ti.run_id
    WHERE coalesce(prev.tis_per_dag_count, 0) + coalesce(running.now_running, 0)
        <= coalesce(ti.max_active_tis_per_dag, :coalesce_2)
),
-- Stage 3: enforce max_active_tis_per_dagrun, then build a running total of
-- pool_slots inside each pool.
pool_slots_accumulated AS (
    SELECT
        ti.id,
        -- Explicit ROWS frame: with the default frame, rows that tie on all
        -- ORDER BY keys would each receive the total of the whole tie group,
        -- so a tie group larger than the remaining capacity would be rejected
        -- as a whole instead of being admitted row by row.
        sum(ti.pool_slots) OVER (
            PARTITION BY ti.pool
            ORDER BY -ti.priority_weight, dr.logical_date, ti.map_index
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS pool_slots_taken
    FROM task_instance AS ti
    INNER JOIN ranked_in_dagrun_task AS prev
        ON prev.id = ti.id
    LEFT JOIN running_per_dagrun_task AS running
        ON running.dag_id = ti.dag_id
        AND running.run_id = ti.run_id
        AND running.task_id = ti.task_id
    INNER JOIN dag_run AS dr
        ON dr.dag_id = ti.dag_id
        AND dr.run_id = ti.run_id
    WHERE coalesce(prev.mapped_tis_per_dagrun_count, 0) + coalesce(running.now_running, 0)
        <= coalesce(ti.max_active_tis_per_dagrun, :coalesce_3)
)
-- Final stage: enforce slot_pool.slots and return the full task_instance row
-- plus its dag_run row. Output column names and aliases are unchanged.
SELECT
    ti.rendered_map_index,
    ti.task_display_name,
    ti.id,
    ti.task_id,
    ti.dag_id,
    ti.run_id,
    ti.map_index,
    ti.start_date,
    ti.end_date,
    ti.duration,
    ti.state,
    ti.try_number,
    ti.max_tries,
    ti.hostname,
    ti.unixname,
    ti.pool,
    ti.pool_slots,
    ti.queue,
    ti.priority_weight,
    ti.operator,
    ti.custom_operator_name,
    ti.queued_dttm,
    ti.scheduled_dttm,
    ti.queued_by_job_id,
    ti.last_heartbeat_at,
    ti.pid,
    ti.executor,
    ti.executor_config,
    ti.max_active_tis_per_dag,
    ti.max_active_tis_per_dagrun,
    ti.updated_at,
    ti.context_carrier,
    ti.span_status,
    ti.external_executor_id,
    ti.trigger_id,
    ti.trigger_timeout,
    ti.next_method,
    ti.next_kwargs,
    ti.dag_version_id,
    dag_run_1.state AS state_1,
    dag_run_1.id AS id_1,
    dag_run_1.dag_id AS dag_id_1,
    dag_run_1.queued_at,
    dag_run_1.logical_date,
    dag_run_1.start_date AS start_date_1,
    dag_run_1.end_date AS end_date_1,
    dag_run_1.run_id AS run_id_1,
    dag_run_1.creating_job_id,
    dag_run_1.run_type,
    dag_run_1.triggered_by,
    dag_run_1.triggering_user_name,
    dag_run_1.conf,
    dag_run_1.data_interval_start,
    dag_run_1.data_interval_end,
    dag_run_1.run_after,
    dag_run_1.last_scheduling_decision,
    dag_run_1.log_template_id,
    dag_run_1.updated_at AS updated_at_1,
    dag_run_1.clear_number,
    dag_run_1.backfill_id,
    dag_run_1.bundle_version,
    dag_run_1.scheduled_by_job_id,
    dag_run_1.context_carrier AS context_carrier_1,
    dag_run_1.span_status AS span_status_1,
    dag_run_1.created_dag_version_id
FROM task_instance AS ti
INNER JOIN pool_slots_accumulated AS prev
    ON prev.id = ti.id
LEFT JOIN running_per_pool AS running
    ON running.pool = ti.pool
INNER JOIN slot_pool
    ON slot_pool.pool = ti.pool
-- Single dag_run join: it supplies the dag_run_* output columns. The extra,
-- unreferenced run_id-only dag_run join of the generated statement is gone;
-- it could only multiply rows.
INNER JOIN dag_run AS dag_run_1
    ON dag_run_1.dag_id = ti.dag_id
    AND dag_run_1.run_id = ti.run_id
WHERE coalesce(prev.pool_slots_taken, 0) + coalesce(running.now_running, 0)
    <= coalesce(slot_pool.slots, :coalesce_4)
LIMIT :param_1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment