diff --git a/airflow-core/newsfragments/68741.bugfix.rst b/airflow-core/newsfragments/68741.bugfix.rst new file mode 100644 index 0000000000000..897eee4180a09 --- /dev/null +++ b/airflow-core/newsfragments/68741.bugfix.rst @@ -0,0 +1 @@ +Ignore stale executor success when a deferred task is rescheduled into QUEUED before the scheduler processes the stale SUCCESS from the worker. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 3c5d71e9a3d02..72641af12b6f6 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1433,6 +1433,8 @@ def process_executor_events( # or the TI is queued by another job. Either ways we should not fail it. # 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success # from the worker exit after defer() has not been processed yet - should not fail it. + # 4) the trigger already put the TI back to queued (resume after defer) but the executor success + # from the worker exit after defer() has not been processed yet - should not fail it. # All of this could also happen if the state is "running", # but that is handled by the scheduler detecting task instances without heartbeats. @@ -1447,9 +1449,9 @@ def process_executor_events( ti.queued_by_job_id != job_id # Another scheduler has queued this task again or executor.has_task(ti) # This scheduler has this task already or ( - # Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the - # executor success from the defer exit for the same try_number. - ti.state == TaskInstanceState.SCHEDULED + # Resume-after-defer: trigger moved TI to scheduled or queued (next_method set) + # before we saw the executor success from the defer exit for the same try_number. + ti.state in (TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED) and state == TaskInstanceState.SUCCESS and ti.next_method is not None ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f905fccb7341e..eceb7be630551 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -923,6 +923,66 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer( tags={"dag_id": dag_id, "task_id": ti1.task_id}, ) + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") + @mock.patch("airflow._shared.observability.metrics.stats._get_backend") + def test_process_executor_events_stale_success_when_queued_after_defer( + self, mock_get_backend, mock_task_callback, dag_maker + ): + """ + Trigger moved TI to queued (resume after defer) before executor success from defer exit arrived. + + Regression for https://github.com/apache/airflow/issues/67287 — must not treat as state mismatch. + The fix for #66374 (#66431) covered the scheduled-state variant; this covers the queued-state variant. + """ + mock_stats = mock.MagicMock(spec=StatsLogger) + mock_get_backend.return_value = mock_stats + dag_id = "test_process_executor_events_stale_success_queued_after_defer" + task_id_1 = "dummy_task" + + session = settings.Session() + with dag_maker(dag_id=dag_id, fileloc="/test_path1/"): + task1 = EmptyOperator(task_id=task_id_1) + ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id) + + executor = MockExecutor(do_update=False) + task_callback = mock.MagicMock() + mock_task_callback.return_value = task_callback + scheduler_job = Job() + session.add(scheduler_job) + session.flush() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + ti1.state = State.QUEUED + ti1.next_method = "execute_callback" + ti1.queued_by_job_id = scheduler_job.id + ti1.try_number = 1 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + executor.has_task = mock.MagicMock(return_value=False) + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.QUEUED + self.job_runner.executor.callback_sink.send.assert_not_called() + mock_stats.incr.assert_not_called() + + # Without next_method, queued + stale success is still a mismatch (e.g. external kill). + ti1.next_method = None + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + mock_stats.incr.assert_any_call( + "scheduler.tasks.killed_externally", + tags={"dag_id": dag_id, "task_id": ti1.task_id}, + ) + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") def test_process_executor_events_multiple_try_numbers_warns(