From 2bce943ccad029f6038135835bee8fe83d36b4a0 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Mon, 23 Mar 2026 21:48:58 +0200 Subject: [PATCH 01/15] added new_dagruns_to_examine configuration --- .../src/airflow/config_templates/config.yml | 9 ++++ airflow-core/src/airflow/models/dagrun.py | 53 ++++++++++++++----- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 0c002f5276cfe..a53a751068144 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2519,6 +2519,15 @@ scheduler: type: integer default: "20" see_also: ":ref:`scheduler:ha:tunables`" + max_new_dagruns_per_loop_to_schedule: + description: | + How many NEW dagruns should be scheduled and examined (and locked) when scheduling + and queuing tasks. + Total max_dagruns_per_loop_to_schedule does not change. + example: ~ + version_added: 3.2.1 + type: integer + default: "0" use_job_schedule: description: | Turn off scheduler use of cron intervals by setting this to ``False``. diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 73a5a3a875ac7..7772a07c7bd2b 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -605,24 +605,49 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu from airflow.models.backfill import BackfillDagRun from airflow.models.dag import DagModel - query = ( - select(cls) - .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") - .where(cls.state == DagRunState.RUNNING) - .join(DagModel, DagModel.dag_id == cls.dag_id) - .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) - .where( - DagModel.is_paused == false(), - DagModel.is_stale == false(), + def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): + return ( + select(cls) + .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") + .where(cls.state == DagRunState.RUNNING) + .join(DagModel, DagModel.dag_id == cls.dag_id) + .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) + .where(*filters) + .order_by(*order_by) + .limit(limit) ) - .order_by( - nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session), - nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session), - cls.run_after, + + filters = [ + DagModel.is_paused == false(), + DagModel.is_stale == false(), + ] + order = [ + nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session), + nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session), + cls.run_after, + ] + + new_dagruns_to_examine = airflow_conf.getint("scheduler", "max_new_dagruns_per_loop_to_schedule") + dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE - new_dagruns_to_examine + + if new_dagruns_to_examine < 0 or new_dagruns_to_examine > cls.DEFAULT_DAGRUNS_TO_EXAMINE: + log.warning( + "'max_new_dagruns_per_loop_to_schedule' is greater or equal to max_dagruns_per_loop_to_schedule or smaller than 0, ignoring configuration" ) - .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE) + dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE + new_dagruns_to_examine = 0 + + query = _get_dagrun_query( + filters, + order, + dagruns_to_examine, ) + if new_dagruns_to_examine > 0: + query = query.union( + _get_dagrun_query([*filters, DagRun.last_scheduling_decision.is_not(None)], order, new_dagruns_to_examine) + ) + query = query.where(DagRun.run_after <= func.now()) result = session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique() From 03bacfb2cb4aee285af00997006bc30cb9d4a3f3 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Fri, 27 Mar 2026 15:20:26 +0300 Subject: [PATCH 02/15] added an option to choose new dagruns to schedule amount --- .../src/airflow/config_templates/config.yml | 4 +- airflow-core/src/airflow/models/dagrun.py | 71 +++++++---- airflow-core/tests/unit/models/test_dagrun.py | 115 ++++++++++++++++++ 3 files changed, 166 insertions(+), 24 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index a53a751068144..e999224860205 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2523,7 +2523,9 @@ scheduler: description: | How many NEW dagruns should be scheduled and examined (and locked) when scheduling and queuing tasks. - Total max_dagruns_per_loop_to_schedule does not change. + If set, select `max_dagruns_per_loop_to_schedule` old dagruns (that have been + examined before) + And `max_new_dagruns_per_loop_to_schedule` new dagruns (that have not yet been examined). example: ~ version_added: 3.2.1 type: integer diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 7772a07c7bd2b..91f92a9948759 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -49,6 +49,7 @@ not_, or_, text, + union_all, update, ) from sqlalchemy.dialects import postgresql @@ -56,7 +57,16 @@ from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.mutable import MutableDict -from sqlalchemy.orm import Mapped, declared_attr, joinedload, mapped_column, relationship, synonym, validates +from sqlalchemy.orm import ( + Mapped, + aliased, + declared_attr, + joinedload, + mapped_column, + relationship, + synonym, + validates, +) from sqlalchemy.sql.expression import false, select from sqlalchemy.sql.functions import coalesce @@ -312,6 +322,11 @@ class DagRun(Base, LoggingMixin): "max_dagruns_per_loop_to_schedule", fallback=20, ) + DEFAULT_NEW_DAGRUNS_TO_EXAMINE = airflow_conf.getint( + "scheduler", + "max_new_dagruns_per_loop_to_schedule", + fallback=0, + ) _ti_dag_versions = association_proxy("task_instances", "dag_version") _tih_dag_versions = association_proxy("task_instances_histories", "dag_version") @@ -606,51 +621,61 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu from airflow.models.dag import DagModel def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): - return ( - select(cls) - .with_hint(cls, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") - .where(cls.state == DagRunState.RUNNING) + return select( + select(DagRun) + .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") + .where(DagRun.state == DagRunState.RUNNING) .join(DagModel, DagModel.dag_id == cls.dag_id) .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) .where(*filters) .order_by(*order_by) .limit(limit) + .subquery() ) filters = [ + DagRun.run_after <= func.now(), DagModel.is_paused == false(), DagModel.is_stale == false(), ] + order = [ nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session), - nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session), - cls.run_after, + nulls_first(cast("ColumnElement[Any]", DagRun.last_scheduling_decision), session=session), + DagRun.run_after, ] - new_dagruns_to_examine = airflow_conf.getint("scheduler", "max_new_dagruns_per_loop_to_schedule") - dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE - new_dagruns_to_examine + new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE + dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE - if new_dagruns_to_examine < 0 or new_dagruns_to_examine > cls.DEFAULT_DAGRUNS_TO_EXAMINE: - log.warning( - "'max_new_dagruns_per_loop_to_schedule' is greater or equal to max_dagruns_per_loop_to_schedule or smaller than 0, ignoring configuration" - ) - dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE + if new_dagruns_to_examine < 0: + log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration") new_dagruns_to_examine = 0 query = _get_dagrun_query( - filters, - order, - dagruns_to_examine, + filters=filters + if new_dagruns_to_examine == 0 + else [*filters, DagRun.last_scheduling_decision.is_not(None)], + order_by=order, + limit=dagruns_to_examine, ) - if new_dagruns_to_examine > 0: - query = query.union( - _get_dagrun_query([*filters, DagRun.last_scheduling_decision.is_not(None)], order, new_dagruns_to_examine) - ) + query = aliased( + DagRun, + union_all( + query, + _get_dagrun_query( + filters=[*filters, DagRun.last_scheduling_decision.is_(None)], + order_by=order, + limit=new_dagruns_to_examine, + ), + ).alias("combined"), + ) - query = query.where(DagRun.run_after <= func.now()) + result = session.scalars( + with_row_locks(select(query), of=cls, session=session, skip_locked=True) + ).unique() - result = session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique() return result @classmethod diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 7ad78292a1edd..3842cb4e7cea9 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -987,6 +987,121 @@ def test_wait_for_downstream(self, dag_maker, session, prev_ti_state, is_ti_sche schedulable_tis = [ti.task_id for ti in decision.schedulable_tis] assert (upstream.task_id in schedulable_tis) == is_ti_schedulable + def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0( + self, session, dag_maker + ): + + DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 + + def create_dagruns( + last_scheduling_decision: datetime.datetime | None = None, + count: int = 20, + ): + dagrun = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + for _ in range(count - 1): + dagrun = dag_maker.create_dagrun_after( + dagrun, + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + + with dag_maker( + dag_id="dummy_dag", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2024, 1, 1), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + create_dagruns(None, 10) + + with dag_maker( + dag_id="dummy_dag2", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2024, 1, 1), + session=session, + ): + EmptyOperator(task_id="dummy_task2") + + create_dagruns(func.now(), 20) + + session.flush() + + dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session)) + + assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) == 10 + + assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 + + def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker): + + DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10 + + def create_dagruns( + last_scheduling_decision: datetime.datetime | None = None, + count: int = 20, + ): + dagrun = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + for _ in range(count - 1): + dagrun = dag_maker.create_dagrun_after( + dagrun, + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + + with dag_maker( + dag_id="dummy_dag", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2024, 1, 1), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + create_dagruns(None) + + with dag_maker( + dag_id="dummy_dag2", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2024, 1, 1), + session=session, + ): + EmptyOperator(task_id="dummy_task2") + + create_dagruns(func.now()) + + session.flush() + + dagruns = list(DagRun.get_running_dag_runs_to_examine(session=session)) + + assert ( + len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is None]) + == DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE + ) + assert ( + len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) + == DagRun.DEFAULT_DAGRUNS_TO_EXAMINE + ) + @pytest.mark.parametrize("state", [DagRunState.QUEUED, DagRunState.RUNNING]) def test_next_dagruns_to_examine_only_unpaused(self, session, state, testing_dag_bundle): """ From c70108c2cd9855ae241365d6a0dae537b019cb8c Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Fri, 27 Mar 2026 16:21:11 +0300 Subject: [PATCH 03/15] separated to 2 queries --- airflow-core/src/airflow/models/dagrun.py | 49 ++++++++++--------- airflow-core/tests/unit/models/test_dagrun.py | 9 +++- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index b2ce93342fd88..5549c25efe3eb 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -49,7 +49,6 @@ not_, or_, text, - union_all, update, ) from sqlalchemy.dialects import postgresql @@ -624,15 +623,18 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): return select( - select(DagRun) - .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") - .where(DagRun.state == DagRunState.RUNNING) - .join(DagModel, DagModel.dag_id == cls.dag_id) - .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) - .where(*filters) - .order_by(*order_by) - .limit(limit) - .subquery() + aliased( + DagRun, + select(DagRun) + .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") + .where(DagRun.state == DagRunState.RUNNING) + .join(DagModel, DagModel.dag_id == cls.dag_id) + .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) + .where(*filters) + .order_by(*order_by) + .limit(limit) + .subquery(), + ), ) filters = [ @@ -662,21 +664,22 @@ def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): limit=dagruns_to_examine, ) - query = aliased( - DagRun, - union_all( - query, - _get_dagrun_query( - filters=[*filters, DagRun.last_scheduling_decision.is_(None)], - order_by=order, - limit=new_dagruns_to_examine, - ), - ).alias("combined"), + result = ( + session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique().all() ) - result = session.scalars( - with_row_locks(select(query), of=cls, session=session, skip_locked=True) - ).unique() + if new_dagruns_to_examine > 0: + new_dagruns_query = _get_dagrun_query( + filters=[*filters, DagRun.last_scheduling_decision.is_(None)], + order_by=order, + limit=new_dagruns_to_examine, + ) + + result = result + ( + session.scalars(with_row_locks(new_dagruns_query, of=cls, session=session, skip_locked=True)) + .unique() + .all() + ) return result diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 8e1623e6606ec..9c5fa1e812005 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -1146,9 +1146,10 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state, testing_dag if state == DagRunState.RUNNING: func = DagRun.get_running_dag_runs_to_examine + runs = func(session) else: func = DagRun.get_queued_dag_runs_to_set_running - runs = func(session).all() + runs = func(session).all() assert runs == [dr] @@ -1156,7 +1157,11 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state, testing_dag session.merge(orm_dag) session.commit() - runs = func(session).all() + if state == DagRunState.RUNNING: + runs = func(session) + else: + runs = func(session).all() + assert runs == [] @mock.patch.object(Stats, "timing") From c5431b90ef43d5196e705c2c2033923f0f157b17 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Fri, 27 Mar 2026 16:33:54 +0300 Subject: [PATCH 04/15] removed subquery --- airflow-core/src/airflow/models/dagrun.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 5549c25efe3eb..400b692fe0663 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -58,7 +58,6 @@ from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import ( Mapped, - aliased, declared_attr, joinedload, mapped_column, @@ -622,19 +621,15 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu from airflow.models.dag import DagModel def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): - return select( - aliased( - DagRun, - select(DagRun) - .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") - .where(DagRun.state == DagRunState.RUNNING) - .join(DagModel, DagModel.dag_id == cls.dag_id) - .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) - .where(*filters) - .order_by(*order_by) - .limit(limit) - .subquery(), - ), + return ( + select(DagRun) + .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") + .where(DagRun.state == DagRunState.RUNNING) + .join(DagModel, DagModel.dag_id == cls.dag_id) + .join(BackfillDagRun, BackfillDagRun.dag_run_id == DagRun.id, isouter=True) + .where(*filters) + .order_by(*order_by) + .limit(limit) ) filters = [ From 36e05143d2a67fa56779f28a56164797231986f2 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Fri, 27 Mar 2026 19:02:46 +0300 Subject: [PATCH 05/15] fixed mypy --- airflow-core/src/airflow/models/dagrun.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 400b692fe0663..a642259bb6af4 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -110,7 +110,7 @@ from pydantic import NonNegativeInt from sqlalchemy.engine import ScalarResult from sqlalchemy.orm import Session - from sqlalchemy.sql.elements import Case, ColumnElement + from sqlalchemy.sql.elements import BinaryExpression, Case, ColumnElement from airflow.models.dag_version import DagVersion from airflow.models.taskinstancekey import TaskInstanceKey @@ -607,7 +607,7 @@ def active_runs_of_dags( @classmethod @retry_db_transaction - def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRun]: + def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]: """ Return the next DagRuns that the scheduler should attempt to schedule. @@ -620,7 +620,7 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu from airflow.models.backfill import BackfillDagRun from airflow.models.dag import DagModel - def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): + def _get_dagrun_query(filters: list[BinaryExpression], order_by: list[BinaryExpression], limit: int): return ( select(DagRun) .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") @@ -659,7 +659,7 @@ def _get_dagrun_query(filters: list[any], order_by: list[any], limit: int): limit=dagruns_to_examine, ) - result = ( + result: Sequence[DagRun] = ( session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique().all() ) From 25ced1ce5aa3c42078ef5996cdf77a9079f8f228 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Fri, 27 Mar 2026 19:15:48 +0300 Subject: [PATCH 06/15] fixed mypy --- airflow-core/src/airflow/models/dagrun.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index a642259bb6af4..bc88b7894b204 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -39,6 +39,7 @@ Index, Integer, PrimaryKeyConstraint, + SQLColumnExpression, String, Text, UniqueConstraint, @@ -110,7 +111,7 @@ from pydantic import NonNegativeInt from sqlalchemy.engine import ScalarResult from sqlalchemy.orm import Session - from sqlalchemy.sql.elements import BinaryExpression, Case, ColumnElement + from sqlalchemy.sql.elements import Case, ColumnElement from airflow.models.dag_version import DagVersion from airflow.models.taskinstancekey import TaskInstanceKey @@ -620,7 +621,9 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]: from airflow.models.backfill import BackfillDagRun from airflow.models.dag import DagModel - def _get_dagrun_query(filters: list[BinaryExpression], order_by: list[BinaryExpression], limit: int): + def _get_dagrun_query( + filters: list[ColumnElement[bool]], order_by: list[SQLColumnExpression[Any]], limit: int + ): return ( select(DagRun) .with_hint(DagRun, "USE INDEX (idx_dag_run_running_dags)", dialect_name="mysql") @@ -669,13 +672,14 @@ def _get_dagrun_query(filters: list[BinaryExpression], order_by: list[BinaryExpr order_by=order, limit=new_dagruns_to_examine, ) - - result = result + ( + new_dagruns: Sequence[DagRun] = ( session.scalars(with_row_locks(new_dagruns_query, of=cls, session=session, skip_locked=True)) .unique() .all() ) + result = [*result, *new_dagruns] + return result @classmethod From e727f7e7b62e54f5bf3903e50a2720728654f244 Mon Sep 17 00:00:00 2001 From: Nataneljpwd <104094207+Nataneljpwd@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:07:54 +0300 Subject: [PATCH 07/15] Update airflow-core/tests/unit/models/test_dagrun.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- airflow-core/tests/unit/models/test_dagrun.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 9c5fa1e812005..8479e82f25229 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -1049,9 +1049,9 @@ def create_dagruns( assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 - def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker): + def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker, monkeypatch): - DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 10 + monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10) def create_dagruns( last_scheduling_decision: datetime.datetime | None = None, From 39b126bdea34c7ca88c940d9fbc43429f88d25cc Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Wed, 15 Apr 2026 22:14:40 +0300 Subject: [PATCH 08/15] fixed cr comments --- airflow-core/src/airflow/models/dagrun.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index ea4d24f5c31b7..b9eb1afa4e404 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -668,13 +668,9 @@ def _get_dagrun_query( DagRun.run_after, ] - new_dagruns_to_examine = cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE + new_dagruns_to_examine = max(cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE, 0) dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE - if new_dagruns_to_examine < 0: - log.warning("'max_new_dagruns_per_loop_to_schedule' is smaller than 0, ignoring configuration") - new_dagruns_to_examine = 0 - query = _get_dagrun_query( filters=filters if new_dagruns_to_examine == 0 From 4fe0a64b315af2698f3e5fe67280060e38c88247 Mon Sep 17 00:00:00 2001 From: Nataneljpwd <104094207+Nataneljpwd@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:16:26 +0300 Subject: [PATCH 09/15] Update airflow-core/tests/unit/models/test_dagrun.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- airflow-core/tests/unit/models/test_dagrun.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 8479e82f25229..a2bcf0473bf31 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -994,10 +994,10 @@ def test_wait_for_downstream(self, dag_maker, session, prev_ti_state, is_ti_sche assert (upstream.task_id in schedulable_tis) == is_ti_schedulable def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0( - self, session, dag_maker + self, session, dag_maker, monkeypatch ): - DagRun.DEFAULT_NEW_DAGRUNS_TO_EXAMINE = 0 + monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0) def create_dagruns( last_scheduling_decision: datetime.datetime | None = None, From fa5383575ecb9b95b4558b7802d28dd87eb6647e Mon Sep 17 00:00:00 2001 From: Nataneljpwd <104094207+Nataneljpwd@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:21:43 +0300 Subject: [PATCH 10/15] Update airflow-core/src/airflow/config_templates/config.yml Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- airflow-core/src/airflow/config_templates/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 64470db355f74..bd7efe3086294 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2576,7 +2576,7 @@ scheduler: examined before) And `max_new_dagruns_per_loop_to_schedule` new dagruns (that have not yet been examined). example: ~ - version_added: 3.2.1 + version_added: 3.2.2 type: integer default: "0" use_job_schedule: From 14918442d63e8f47725dad97b6181fd90734c249 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Thu, 21 May 2026 19:44:44 +0300 Subject: [PATCH 11/15] resolve CR comments --- airflow-core/newsfragments/64294.feature.rst | 1 + .../newsfragments/64294.significant.rst | 1 + .../src/airflow/config_templates/config.yml | 14 +++++++----- airflow-core/src/airflow/models/dagrun.py | 22 +++++++++---------- airflow-core/tests/unit/models/test_dagrun.py | 21 ++++++++++++++---- 5 files changed, 38 insertions(+), 21 deletions(-) create mode 100644 airflow-core/newsfragments/64294.feature.rst create mode 100644 airflow-core/newsfragments/64294.significant.rst diff --git a/airflow-core/newsfragments/64294.feature.rst b/airflow-core/newsfragments/64294.feature.rst new file mode 100644 index 0000000000000..7a6f857dacb82 --- /dev/null +++ b/airflow-core/newsfragments/64294.feature.rst @@ -0,0 +1 @@ +Add a new ``max_new_dagruns_per_loop_to_schedule`` configuration to control how many new dagruns are scheduled each scheduler iteration diff --git a/airflow-core/newsfragments/64294.significant.rst b/airflow-core/newsfragments/64294.significant.rst new file mode 100644 index 0000000000000..8a304373eb82d --- /dev/null +++ b/airflow-core/newsfragments/64294.significant.rst @@ -0,0 +1 @@ +``get_running_dag_runs_to_examine`` now returns a ``Sequence[DagRun]`` type instead of ``ScalarResult[Dagrun]`` diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index a48c96de78e2c..33fb0a4caa350 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2570,13 +2570,15 @@ scheduler: see_also: ":ref:`scheduler:ha:tunables`" max_new_dagruns_per_loop_to_schedule: description: | - How many NEW dagruns should be scheduled and examined (and locked) when scheduling - and queuing tasks. - If set, select `max_dagruns_per_loop_to_schedule` old dagruns (that have been - examined before) - And `max_new_dagruns_per_loop_to_schedule` new dagruns (that have not yet been examined). + When set > 0, the scheduler runs a second query per loop that fetches up to this + many dagruns that have never been examined (``last_scheduling_decision IS NULL``), + in addition to the ``max_dagruns_per_loop_to_schedule`` already-examined ones. + + This prevents starvation of older dagruns when large batches of new dagruns are + created at once (for example, via ``TriggerDagRunOperator``). Note that the total + number of dagruns locked and scheduled per loop becomes the sum of both limits. example: ~ - version_added: 3.2.2 + version_added: 3.3.0 type: integer default: "0" use_job_schedule: diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 800bc91da762b..177025badd6ee 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -631,9 +631,9 @@ def active_runs_of_dags( @retry_db_transaction def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]: """ - Return the next DagRuns that the scheduler should attempt to schedule. + Return the next DagRuns (as a list) that the scheduler should attempt to schedule. - This will return zero or more DagRun rows that are row-level-locked with a "SELECT ... FOR UPDATE" + This will return zero or more DagRuns that are row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. @@ -671,16 +671,16 @@ def _get_dagrun_query( new_dagruns_to_examine = max(cls.DEFAULT_NEW_DAGRUNS_TO_EXAMINE, 0) dagruns_to_examine = cls.DEFAULT_DAGRUNS_TO_EXAMINE - query = _get_dagrun_query( - filters=filters - if new_dagruns_to_examine == 0 - else [*filters, DagRun.last_scheduling_decision.is_not(None)], - order_by=order, - limit=dagruns_to_examine, + old_filters = ( + [*filters, DagRun.last_scheduling_decision.is_not(None)] + if new_dagruns_to_examine > 0 + else filters ) + query = _get_dagrun_query(filters=old_filters, order_by=order, limit=dagruns_to_examine) - result: Sequence[DagRun] = ( - session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique().all() + result: list[DagRun] = cast( + "list[DagRun]", + session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)).unique().all(), ) if new_dagruns_to_examine > 0: @@ -695,7 +695,7 @@ def _get_dagrun_query( .all() ) - result = [*result, *new_dagruns] + result.extend(new_dagruns) return result diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index a2bcf0473bf31..080ea9fe7f989 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -993,11 +993,25 @@ def test_wait_for_downstream(self, dag_maker, session, prev_ti_state, is_ti_sche schedulable_tis = [ti.task_id for ti in decision.schedulable_tis] assert (upstream.task_id in schedulable_tis) == is_ti_schedulable + @pytest.mark.parametrize( + "new_dagruns_to_examine", + [ + 0, + -1, + ], + ) def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0( - self, session, dag_maker, monkeypatch + self, + session, + dag_maker, + monkeypatch, + new_dagruns_to_examine, ): - - monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 0) + monkeypatch.setattr( + DagRun, + "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", + new_dagruns_to_examine, + ) def create_dagruns( last_scheduling_decision: datetime.datetime | None = None, @@ -1050,7 +1064,6 @@ def create_dagruns( assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker, monkeypatch): - monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10) def create_dagruns( From fa873ff236aa5a4d912cc40247e9322db07833f0 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Mon, 1 Jun 2026 22:02:16 +0300 Subject: [PATCH 12/15] address CR comments --- airflow-core/src/airflow/models/dagrun.py | 3 + airflow-core/tests/unit/models/test_dagrun.py | 128 ++---------------- 2 files changed, 17 insertions(+), 114 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index c8f73e822b11c..db583f2b12571 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -643,6 +643,9 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> Sequence[DagRun]: This will return zero or more DagRuns that are row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. + With max_new_dagruns_per_loop_to_schedule > 0, this runs 2 queries, one for new dagruns (where + last_scheduling_decision is None) and for old (already examined) dagruns, cleared / requeued dagruns + will appear in the new dagruns query. :meta private: """ diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index d8d85f88c7291..4250a93bdb583 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -61,9 +61,7 @@ from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator from airflow.sdk import DAG, BaseOperator, get_current_context, setup, task, task_group, teardown from airflow.sdk.definitions.callback import AsyncCallback -from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, VariableInterval -from airflow.sdk.definitions.variable import Variable -from airflow.sdk.exceptions import AirflowRuntimeError +from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.serialization.definitions.deadline import SerializedReferenceModels from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.settings import get_policy_plugin_manager @@ -183,7 +181,7 @@ def create_dag_run( ti = dag_run.get_task_instance(task_id) if TYPE_CHECKING: assert ti - ti.set_state(task_state, session=session) + ti.set_state(task_state, session) session.flush() return dag_run @@ -787,7 +785,7 @@ def test_get_latest_runs(self, dag_maker, session): ... self.create_dag_run(dag, logical_date=timezone.datetime(2015, 1, 1), session=session) self.create_dag_run(dag, logical_date=timezone.datetime(2015, 1, 2), session=session) - dagruns = DagRun.get_latest_runs(session=session) + dagruns = DagRun.get_latest_runs(session) session.close() for dagrun in dagruns: if dagrun.dag_id == "test_latest_runs_1": @@ -1056,7 +1054,7 @@ def create_dagruns( ): EmptyOperator(task_id="dummy_task2") - create_dagruns(func.now(), 20) + create_dagruns(timezone.utcnow(), 20) session.flush() @@ -1254,12 +1252,12 @@ def test_emit_scheduling_delay(self, session, schedule, expected, testing_dag_bu triggered_by=DagRunTriggeredByType.TEST, session=session, ) - ti = dag_run.get_task_instance(dag_task.task_id, session=session) - ti.set_state(TaskInstanceState.SUCCESS, session=session) + ti = dag_run.get_task_instance(dag_task.task_id, session) + ti.set_state(TaskInstanceState.SUCCESS, session) session.flush() with mock.patch("airflow._shared.observability.metrics.stats.timing") as stats_mock: - dag_run.update_state(session=session) + dag_run.update_state(session) metric_name = f"dagrun.{dag.dag_id}.first_task_scheduling_delay" @@ -1342,13 +1340,13 @@ def test_emit_first_task_start_delay(self, session, queued_at_offset, expected, session=session, ) dag_run.queued_at = queued_at - ti = dag_run.get_task_instance(dag_task.task_id, session=session) - ti.set_state(TaskInstanceState.SUCCESS, session=session) + ti = dag_run.get_task_instance(dag_task.task_id, session) + ti.set_state(TaskInstanceState.SUCCESS, session) ti.start_date = ti_start_date session.flush() with mock.patch("airflow._shared.observability.metrics.stats.timing") as stats_mock: - dag_run.update_state(session=session) + dag_run.update_state(session) start_delay_call = call("dagrun.first_task_start_delay", mock.ANY, tags=expected_stat_tags) if expected: @@ -1461,28 +1459,17 @@ def test_dag_run_dag_versions_with_null_created_dag_version(self, dag_maker, ses assert isinstance(dag_run.dag_versions, list) assert len(dag_run.dag_versions) == 0 - @pytest.mark.parametrize( - "interval", - [ - datetime.timedelta(hours=1), - VariableInterval("my_key"), - ], - ) - @mock.patch.object(Variable, "get") @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_success_deadline(self, _, mock_get, interval, session, deadline_test_dag): + def test_dagrun_success_deadline(self, _, session, deadline_test_dag): def on_success_callable(context): assert context["dag_run"].dag_id == "test_dag" future_date = datetime.datetime.now() + datetime.timedelta(days=365) - # First value used during resolution - mock_get.return_value = "5" - scheduler_dag = deadline_test_dag( deadline=DeadlineAlert( reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=interval, + interval=datetime.timedelta(hours=1), callback=AsyncCallback(empty_callback_for_deadline), ), on_success_callback=on_success_callable, @@ -1587,73 +1574,6 @@ def test_dagrun_success_handles_empty_deadline_list(self, mock_prune, dag_maker, mock_prune.assert_not_called() assert dag_run.state == DagRunState.SUCCESS - @mock.patch.object(Variable, "get") - @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_stable(self, _, mock_get, session, deadline_test_dag): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - # First value used during resolution. - mock_get.return_value = "60" - - scheduler_dag = deadline_test_dag( - deadline=DeadlineAlert( - reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("my_key"), - callback=AsyncCallback(empty_callback_for_deadline), - ), - ) - - dag_run = self.create_dag_run( - dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS, "task_2": TaskInstanceState.SUCCESS}, - session=session, - ) - dag_run.dag = scheduler_dag - - # First update resolve interval to "5". - dag_run.update_state(session=session) - - deadline = session.execute(select(Deadline)).scalars().one_or_none() - first_deadline_time = deadline.deadline_time - - # Change Variable value after resolution. - mock_get.return_value = "120" - - # Run again (This should not change existing deadline). - dag_run.update_state(session=session) - - deadline = session.execute(select(Deadline)).scalars().one_or_none() - assert deadline.deadline_time == first_deadline_time - - @mock.patch.object(Deadline, "prune_deadlines") - def test_dagrun_deadline_variable_interval_missing_variable_fails(self, _, session, deadline_test_dag): - - mock_err = mock.Mock() - mock_err.error.value = "MISSING_DEADLINE" - mock_err.detail = "missing deadline" - - with mock.patch.object( - Variable, - "get", - side_effect=AirflowRuntimeError(mock_err), - ): - future_date = datetime.datetime.now() + datetime.timedelta(days=365) - - scheduler_dag = deadline_test_dag( - deadline=DeadlineAlert( - reference=DeadlineReference.FIXED_DATETIME(future_date), - interval=VariableInterval("missing_key"), - callback=AsyncCallback(empty_callback_for_deadline), - ), - ) - - with pytest.raises(ValueError, match="not found"): - self.create_dag_run( - dag=scheduler_dag, - task_states={"task_1": TaskInstanceState.SUCCESS}, - session=session, - ) - @pytest.mark.parametrize( ("run_type", "expected_tis"), @@ -3311,7 +3231,7 @@ def printx(x): session.commit() for table in [TaskInstanceNote, TaskReschedule, XComModel]: assert session.scalar(select(func.count()).select_from(table)) == 1 - dr1.task_instance_scheduling_decisions(session=session) + dr1.task_instance_scheduling_decisions(session) for table in [TaskInstanceNote, TaskReschedule, XComModel]: assert session.scalar(select(func.count()).select_from(table)) == 0 @@ -3541,7 +3461,7 @@ def make_task(task_id, dag): reduce(lambda x, y: x >> y, tasks) dr = dag_maker.create_dagrun() - tis = dr.task_instance_scheduling_decisions(session=session).tis + tis = dr.task_instance_scheduling_decisions(session).tis tis_for_state = {x.task_id for x in dr._tis_for_dagrun_state(dag=dag, tis=tis)} assert tis_for_state == expected @@ -3964,23 +3884,3 @@ def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, c assert spans[0].name == f"dag_run.{dr.dag_id}" else: assert len(spans) == 0 - - @pytest.mark.db_test - def test_context_carrier_includes_detail_level_from_conf(self, dag_maker): - """DagRun created with TASK_SPAN_DETAIL_LEVEL_KEY in conf should encode the level in trace state.""" - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - - from airflow._shared.observability.traces import ( - TASK_SPAN_DETAIL_LEVEL_KEY, - get_task_span_detail_level, - ) - - with dag_maker("test_tracing_detail_level"): - EmptyOperator(task_id="t1") - dr = dag_maker.create_dagrun(conf={TASK_SPAN_DETAIL_LEVEL_KEY: 2}) - - ctx = TraceContextTextMapPropagator().extract(dr.context_carrier) - from opentelemetry import trace - - span = trace.get_current_span(ctx) - assert get_task_span_detail_level(span) == 2 From e647963701c2b70ac8cfe9b2fd25ccf9732e5761 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Mon, 1 Jun 2026 22:08:52 +0300 Subject: [PATCH 13/15] move the create_dagruns to a fixture --- airflow-core/tests/unit/models/test_dagrun.py | 86 ++++++++----------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 4250a93bdb583..a309d37bb7611 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -101,6 +101,35 @@ def dagbag(): return DagBag(include_examples=True) +@pytest.fixture +def create_dagruns(): + def _create_dagruns( + dag_maker, + session, + last_scheduling_decision: datetime.datetime | None = None, + count: int = 20, + ): + dagrun = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + for _ in range(count - 1): + dagrun = dag_maker.create_dagrun_after( + dagrun, + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + run_after=datetime.datetime(2024, 1, 1), + ) + + dagrun.last_scheduling_decision = last_scheduling_decision + session.merge(dagrun) + + return _create_dagruns + + @pytest.fixture def deadline_test_dag(session): """Fixture that creates and syncs a basic DAG with two tasks.""" @@ -1005,6 +1034,7 @@ def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0 self, session, dag_maker, + create_dagruns, monkeypatch, new_dagruns_to_examine, ): @@ -1014,28 +1044,6 @@ def test_get_running_dag_runs_ignores_new_dagruns_to_examine_when_smaller_than_0 new_dagruns_to_examine, ) - def create_dagruns( - last_scheduling_decision: datetime.datetime | None = None, - count: int = 20, - ): - dagrun = dag_maker.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - run_after=datetime.datetime(2024, 1, 1), - ) - dagrun.last_scheduling_decision = last_scheduling_decision - session.merge(dagrun) - for _ in range(count - 1): - dagrun = dag_maker.create_dagrun_after( - dagrun, - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - run_after=datetime.datetime(2024, 1, 1), - ) - - dagrun.last_scheduling_decision = last_scheduling_decision - session.merge(dagrun) - with dag_maker( dag_id="dummy_dag", schedule=datetime.timedelta(days=1), @@ -1044,7 +1052,7 @@ def create_dagruns( ): EmptyOperator(task_id="dummy_task") - create_dagruns(None, 10) + create_dagruns(dag_maker, session, None, 10) with dag_maker( dag_id="dummy_dag2", @@ -1054,7 +1062,7 @@ def create_dagruns( ): EmptyOperator(task_id="dummy_task2") - create_dagruns(timezone.utcnow(), 20) + create_dagruns(dag_maker, session, timezone.utcnow(), 20) session.flush() @@ -1064,31 +1072,11 @@ def create_dagruns( assert len([dagrun for dagrun in dagruns if dagrun.last_scheduling_decision is not None]) == 10 - def test_get_running_dag_runs_with_max_new_dagruns_to_examine(self, session, dag_maker, monkeypatch): + def test_get_running_dag_runs_with_max_new_dagruns_to_examine( + self, session, dag_maker, create_dagruns, monkeypatch + ): monkeypatch.setattr(DagRun, "DEFAULT_NEW_DAGRUNS_TO_EXAMINE", 10) - def create_dagruns( - last_scheduling_decision: datetime.datetime | None = None, - count: int = 20, - ): - dagrun = dag_maker.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - run_after=datetime.datetime(2024, 1, 1), - ) - dagrun.last_scheduling_decision = last_scheduling_decision - session.merge(dagrun) - for _ in range(count - 1): - dagrun = dag_maker.create_dagrun_after( - dagrun, - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - run_after=datetime.datetime(2024, 1, 1), - ) - - dagrun.last_scheduling_decision = last_scheduling_decision - session.merge(dagrun) - with dag_maker( dag_id="dummy_dag", schedule=datetime.timedelta(days=1), @@ -1097,7 +1085,7 @@ def create_dagruns( ): EmptyOperator(task_id="dummy_task") - create_dagruns(None) + create_dagruns(dag_maker, session, None) with dag_maker( dag_id="dummy_dag2", @@ -1107,7 +1095,7 @@ def create_dagruns( ): EmptyOperator(task_id="dummy_task2") - create_dagruns(func.now()) + create_dagruns(dag_maker, session, timezone.utcnow()) session.flush() From f5772670bf993941690f55535fcac0551acd4f7a Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Sun, 7 Jun 2026 20:57:15 +0300 Subject: [PATCH 14/15] fixed failing tests --- airflow-core/tests/unit/models/test_dagrun.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 9c8e1a3f36a98..c2470adec3f08 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -210,7 +210,7 @@ def create_dag_run( ti = dag_run.get_task_instance(task_id) if TYPE_CHECKING: assert ti - ti.set_state(task_state, session) + ti.set_state(task_state, session=session) session.flush() return dag_run @@ -814,7 +814,7 @@ def test_get_latest_runs(self, dag_maker, session): ... self.create_dag_run(dag, logical_date=timezone.datetime(2015, 1, 1), session=session) self.create_dag_run(dag, logical_date=timezone.datetime(2015, 1, 2), session=session) - dagruns = DagRun.get_latest_runs(session) + dagruns = DagRun.get_latest_runs(session=session) session.close() for dagrun in dagruns: if dagrun.dag_id == "test_latest_runs_1": @@ -1241,7 +1241,7 @@ def test_emit_scheduling_delay(self, session, schedule, expected, testing_dag_bu session=session, ) ti = dag_run.get_task_instance(dag_task.task_id, session) - ti.set_state(TaskInstanceState.SUCCESS, session) + ti.set_state(TaskInstanceState.SUCCESS, session=session) session.flush() with mock.patch("airflow._shared.observability.metrics.stats.timing") as stats_mock: @@ -1329,7 +1329,7 @@ def test_emit_first_task_start_delay(self, session, queued_at_offset, expected, ) dag_run.queued_at = queued_at ti = dag_run.get_task_instance(dag_task.task_id, session) - ti.set_state(TaskInstanceState.SUCCESS, session) + ti.set_state(TaskInstanceState.SUCCESS, session=session) ti.start_date = ti_start_date session.flush() From 177ee0b91a5966c9db73bdbdeed11aed3a040b10 Mon Sep 17 00:00:00 2001 From: Nataneljpwd Date: Sun, 7 Jun 2026 22:03:12 +0300 Subject: [PATCH 15/15] fixed failing tests --- airflow-core/tests/unit/models/test_dagrun.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index c2470adec3f08..ebe866d5a53f0 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -1240,7 +1240,7 @@ def test_emit_scheduling_delay(self, session, schedule, expected, testing_dag_bu triggered_by=DagRunTriggeredByType.TEST, session=session, ) - ti = dag_run.get_task_instance(dag_task.task_id, session) + ti = dag_run.get_task_instance(dag_task.task_id, session=session) ti.set_state(TaskInstanceState.SUCCESS, session=session) session.flush() @@ -1328,7 +1328,7 @@ def test_emit_first_task_start_delay(self, session, queued_at_offset, expected, session=session, ) dag_run.queued_at = queued_at - ti = dag_run.get_task_instance(dag_task.task_id, session) + ti = dag_run.get_task_instance(dag_task.task_id, session=session) ti.set_state(TaskInstanceState.SUCCESS, session=session) ti.start_date = ti_start_date session.flush() @@ -3219,7 +3219,7 @@ def printx(x): session.commit() for table in [TaskInstanceNote, TaskReschedule, XComModel]: assert session.scalar(select(func.count()).select_from(table)) == 1 - dr1.task_instance_scheduling_decisions(session) + dr1.task_instance_scheduling_decisions(session=session) for table in [TaskInstanceNote, TaskReschedule, XComModel]: assert session.scalar(select(func.count()).select_from(table)) == 0 @@ -3449,7 +3449,7 @@ def make_task(task_id, dag): reduce(lambda x, y: x >> y, tasks) dr = dag_maker.create_dagrun() - tis = dr.task_instance_scheduling_decisions(session).tis + tis = dr.task_instance_scheduling_decisions(session=session).tis tis_for_state = {x.task_id for x in dr._tis_for_dagrun_state(dag=dag, tis=tis)} assert tis_for_state == expected