Skip to content

Add a standard toggle for resumability to ResumableJobMixin#68623

Open
amoghrajesh wants to merge 8 commits into
apache:mainfrom
astronomer:resumable-mixin-reconnect-flag
Open

Add a standard toggle for resumability to ResumableJobMixin#68623
amoghrajesh wants to merge 8 commits into
apache:mainfrom
astronomer:resumable-mixin-reconnect-flag

Conversation

@amoghrajesh

@amoghrajesh amoghrajesh commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Was generative AI tooling used to co-author this PR?
  • Yes: claude sonnet 4.6

Why

ResumableJobMixin had no standard on/off switch for crash recovery. When SparkSubmitOperator was
ported to resumability, I added a reconnect_on_retry parameter, but any future resumable operator
would need to invent their own name — making the flag inconsistent across operators and impossible to
measure uniformly.

What changed

  • ResumableJobMixin now owns durable: bool = True in its own __init__, decorated with
    @BaseOperatorMeta._apply_defaults. Operators that inherit (ResumableJobMixin, BaseOperator) get
    the flag for free — no redeclaration needed, default_args injection and .partial() work
    automatically.
  • When False, execute_resumable() skips all task_state_store interaction and runs a plain
    submit/poll/result cycle.
  • SparkSubmitOperator.reconnect_on_retry is renamed to durable. The per-mode if/else branching in
    execute() is removed and all three tracking paths (standalone, K8s, YARN) now call
    execute_resumable(context) directly.
  • reconnect_on_retry is kept as a deprecated alias for backcompat with 6.1.0 — passing it raises an
    AirflowProviderDeprecationWarning and maps to durable.

Impact on operators using resumability

No behavior change. durable defaults to True, so crash recovery is on by default exactly as before.

How to opt out

Set durable=False on the task:

SparkSubmitOperator(
    task_id="my_spark_job",
    application="my_app.jar",
    durable=False,
)

Or via default_args to disable for all tasks in a DAG:

with DAG("my_dag", default_args={"durable": False}):

Implementation note

ResumableJobMixin.__init__ is decorated directly with @BaseOperatorMeta._apply_defaults. When
super().__init__(**kwargs) reaches the mixin, apply_defaults fires, sees durable in the
signature, and injects it from default_args if not explicitly set. Operators just need
(ResumableJobMixin, BaseOperator) ordering and super().__init__(**kwargs).


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@amoghrajesh

Copy link
Copy Markdown
Contributor Author

Converting to draft as I am unsure if this is the best solution.

@amoghrajesh amoghrajesh marked this pull request as ready for review June 17, 2026 10:00

@ashb ashb left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably worth mentioning :param durable: etc in the doc string of SparkSubmitOperator explicitly?

LGTM I think, but lets get @vikramkoka and @kaxil's view on this.

* The Airflow worker must be able to reach the Kubernetes API server and have permission to
read and delete pods in the driver's namespace; otherwise pod tracking and cleanup will fail.
* Set ``reconnect_on_retry=True`` (the default) to enable crash recovery: the driver pod name is
* Set ``resume_on_retry=True`` (the default) to enable crash recovery: the driver pod name is

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs updating to reflect new name

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done.

@amoghrajesh amoghrajesh Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import is from airflow.sdk import ResumableJobMixin but the mixin itself lives in task-sdk/src/airflow/sdk/bases/resumablejobmixin.py -- is that import right (as in using the canonical location)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from airflow.sdk import ResumableJobMixin is the right one to use, its exported and documented

@amoghrajesh

amoghrajesh commented Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

It's probably worth mentioning :param durable: etc in the doc string of SparkSubmitOperator explicitly?

Yes it is, mentioned it in handling comments from ash

(cherry picked from commit 546469d70ec3efc373bfa1d73c2f8d8d79b5cd03)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

full tests needed We need to run full set of tests for this PR to merge

Projects

Status: In progress

Development

Successfully merging this pull request may close these issues.

2 participants