Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68741.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ignore stale executor success when a deferred task is rescheduled into QUEUED before the scheduler processes the stale SUCCESS from the worker.
8 changes: 5 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,8 @@ def process_executor_events(
# or the TI is queued by another job. Either ways we should not fail it.
# 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success
# from the worker exit after defer() has not been processed yet - should not fail it.
# 4) the trigger already put the TI back to queued (resume after defer) but the executor success
# from the worker exit after defer() has not been processed yet - should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the scheduler detecting task instances without heartbeats.
Expand All @@ -1447,9 +1449,9 @@ def process_executor_events(
ti.queued_by_job_id != job_id # Another scheduler has queued this task again
or executor.has_task(ti) # This scheduler has this task already
or (
# Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the
# executor success from the defer exit for the same try_number.
ti.state == TaskInstanceState.SCHEDULED
# Resume-after-defer: trigger moved TI to scheduled or queued (next_method set)
# before we saw the executor success from the defer exit for the same try_number.
ti.state in (TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED)
and state == TaskInstanceState.SUCCESS
and ti.next_method is not None
)
Expand Down
60 changes: 60 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,66 @@ def test_process_executor_events_stale_success_when_scheduled_after_defer(
tags={"dag_id": dag_id, "task_id": ti1.task_id},
)

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_process_executor_events_stale_success_when_queued_after_defer(
self, mock_get_backend, mock_task_callback, dag_maker
):
"""
Trigger moved TI to queued (resume after defer) before executor success from defer exit arrived.

Regression for https://github.com/apache/airflow/issues/67287 — must not treat as state mismatch.
The fix for #66374 (#66431) covered the scheduled-state variant; this covers the queued-state variant.
"""
mock_stats = mock.MagicMock(spec=StatsLogger)
mock_get_backend.return_value = mock_stats
dag_id = "test_process_executor_events_stale_success_queued_after_defer"
task_id_1 = "dummy_task"

session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
task1 = EmptyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)

executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job()
session.add(scheduler_job)
session.flush()
self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor])

ti1.state = State.QUEUED
ti1.next_method = "execute_callback"
ti1.queued_by_job_id = scheduler_job.id
ti1.try_number = 1
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None
executor.has_task = mock.MagicMock(return_value=False)
mock_stats.incr.reset_mock()

self.job_runner._process_executor_events(executor=executor, session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats.incr.assert_not_called()

# Without next_method, queued + stale success is still a mismatch (e.g. external kill).
ti1.next_method = None
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None
mock_stats.incr.reset_mock()

self.job_runner._process_executor_events(executor=executor, session=session)
mock_stats.incr.assert_any_call(
"scheduler.tasks.killed_externally",
tags={"dag_id": dag_id, "task_id": ti1.task_id},
)

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_process_executor_events_multiple_try_numbers_warns(
Expand Down