Skip to content

fix xcom push not working in SparkKubernetesOperator#52051

Closed
kesem0811 wants to merge 17 commits into
apache:mainfrom
kesem0811:feature/fix_xcom_push_in_SparkKubernetesOperator
Closed

fix xcom push not working in SparkKubernetesOperator#52051
kesem0811 wants to merge 17 commits into
apache:mainfrom
kesem0811:feature/fix_xcom_push_in_SparkKubernetesOperator

Conversation

@kesem0811

@kesem0811 kesem0811 commented Jun 23, 2025

Copy link
Copy Markdown
Contributor

Fixes: #39184 where SparkKubernetesOperator tasks hang indefinitely because no sidecar is injected to read /airflow/xcom/return.json.

@eladkal eladkal requested review from Lee-W and romsharon98 June 24, 2025 04:36
)
self.template_body["spark"]["spec"]= driver_with_xcom_template
except KeyError as e:
raise AirflowException("Spec missing in SparkApplication template") from e

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not use AirflowException, we can create a customized exception or just use KeyError

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?
In all the file, when something doesn't work as expected, an AirflowException is raised

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirflowException is broad and not infomative

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets switch to KeyError


def add_xcom_sidecar(
pod: k8s.V1Pod,
pod: Union[k8s.V1Pod, dict],

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pod: Union[k8s.V1Pod, dict],
pod: k8s.V1Pod | dict,

)
self.template_body["spark"]["spec"]= driver_with_xcom_template
except KeyError as e:
raise AirflowException("Spec missing in SparkApplication template") from e

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirflowException is broad and not infomative

@eladkal eladkal requested a review from Lee-W July 30, 2025 12:01
@eladkal eladkal changed the title Feature/fix xcom push in spark kubernetes operator fix xcom push not working in SparkKubernetesOperator Jul 30, 2025
@eladkal

eladkal commented Aug 6, 2025

Copy link
Copy Markdown
Contributor

@kesem0811 can you fix the static checks?

@Lee-W Lee-W left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll fix static checks

…ator

- Resolved merge conflict in spark_kubernetes.py
- Preserved xcom sidecar functionality from local changes
- Integrated upstream improvements including deferrable support and better reattach logic
- Combined both approaches: upstream refactoring + local xcom features
@kesem0811 kesem0811 requested a review from Lee-W September 25, 2025 11:20
- Add proper type checking for dict vs V1Pod inputs in add_xcom_sidecar()
- Convert dict inputs to V1Pod objects to ensure .spec attribute access
- Maintains backward compatibility while fixing type safety
- Resolves: Item 'dict[Any, Any]' has no attribute 'spec' [union-attr]
@kesem0811

kesem0811 commented Sep 25, 2025

Copy link
Copy Markdown
Contributor Author

the static checks failed with the error:
The image /mnt/ci-image-save-v3-linux_amd64-3.10.tar does not exist.
I love to make the tests run again?
@romsharon98 @eladkal @Lee-W

@Lee-W Lee-W left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general looks good, but we'll need unit tests for this

Comment on lines +33 to +36
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())

let's group similar concepts together


def add_xcom_sidecar(
pod: k8s.V1Pod,
pod: k8s.V1Pod | dict,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pod: k8s.V1Pod | dict,
pod: k8s.V1Pod | dict[str, Any],

Comment on lines +78 to +80
def add_sidecar_to_spark_operator_pod_spec(
spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None
):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def add_sidecar_to_spark_operator_pod_spec(
spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None
):
def add_sidecar_to_spark_operator_pod_spec(
spec: dict[str, Any], sidecar_container_image: str | None = None, sidecar_container_resources: dict[str, Any] | None = None
) -> dict[str, Any]:

please confirm whether these type are correct. Thanks!

@kunaljubce

Copy link
Copy Markdown
Contributor

@kesem0811 Can you also fix the static checks CI error? Should be autofixed by your prek pre-commit hooks, assuming you have them installed.

@Lee-W

Lee-W commented Jan 4, 2026

Copy link
Copy Markdown
Member

will also need your help resolving the conflict. Thanks!

@potiuk potiuk marked this pull request as draft April 2, 2026 13:07
@potiuk

potiuk commented Apr 2, 2026

Copy link
Copy Markdown
Member

This pull request has had no activity from the author for over 4 weeks. We are converting it to draft to keep the review queue manageable.

@kesem0811, please mark this PR as ready for review when you are ready to continue working on it. Thank you for your contribution!

@potiuk

potiuk commented Apr 26, 2026

Copy link
Copy Markdown
Member

@kesem0811 This draft PR has had no activity for 3 weeks. Closing to keep the queue clean.

You are welcome to reopen and continue when you're ready. If you'd like to pick it back up, please rebase onto the current main branch first.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk potiuk closed this Apr 26, 2026
@kesem0811

Copy link
Copy Markdown
Contributor Author

Hi everyone - sorry for not responding earlier. That’s on me, and I understand why the PR was closed.

I’ve opened a new PR that addresses the review feedback and completes the fix: #68788

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Problem with pushing Xcom when using SparkKubernetesOperator

6 participants