From c9579132a009b5c2647a5f4b0b419168af74e439 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 22 Mar 2026 18:02:52 +0100 Subject: [PATCH 1/4] Add XCom Push Support for Triggerer --- airflow-core/src/airflow/models/trigger.py | 8 +++ airflow-core/src/airflow/triggers/base.py | 15 ++++- .../tests/unit/models/test_trigger.py | 65 +++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index da78eede343dd..49d5f39188716 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -460,6 +460,9 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses as well as its state to scheduled. It also adds the event's payload into the kwargs for the task. + If the event includes XCom values, they are pushed to the task instance + before the task is rescheduled. + :param task_instance: The task instance to handle the submit event for. :param session: The session to be used for the database callback sink. """ @@ -486,6 +489,11 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses # re-serialize the entire dict using serde to ensure consistent structure task_instance.next_kwargs = serialize(next_kwargs) + # Push XCom values if provided by the trigger + if event.xcoms: + for key, value in event.xcoms.items(): + task_instance.xcom_push(key=key, value=value, session=session) + # Remove ourselves as its trigger task_instance.trigger_id = None diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 416558242b8a0..2238c2bde030a 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -168,8 +168,19 @@ class TriggerEvent(BaseModel): Must be natively JSON-serializable, or registered with the airflow serialization code. """ - def __init__(self, payload, **kwargs): - super().__init__(payload=payload, **kwargs) + xcoms: dict[str, JsonValue] | None = None + """ + Optional XCom values to persist when the event is processed. + + If provided, these XCom key-value pairs will be pushed to the task instance + before the task is rescheduled. This allows triggers to write XCom values + directly without requiring the task to resume on a worker first. + + Keys are XCom keys and values must be JSON-serializable. + """ + + def __init__(self, payload, *, xcoms: dict[str, JsonValue] | None = None, **kwargs): + super().__init__(payload=payload, xcoms=xcoms, **kwargs) def __repr__(self) -> str: return f"TriggerEvent<{self.payload!r}>" diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index dfd0f2e99cd0a..68cfc90418e9a 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -919,3 +919,68 @@ def test_kwargs_not_encrypted(): assert trigger.kwargs["param1"] == "value1" assert trigger.kwargs["param2"] == "value2" + + +def test_submit_event_with_xcoms(session, create_task_instance): + """ + Tests that events with xcoms submitted to a trigger push XCom values + to the task instance before rescheduling it. + """ + trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + session.add(trigger) + task_instance = create_task_instance( + session=session, logical_date=timezone.utcnow(), state=State.DEFERRED + ) + task_instance.trigger_id = trigger.id + task_instance.next_kwargs = {} + session.commit() + + event = TriggerEvent("payload", xcoms={"return_value": {"key": "value"}, "extra_key": 42}) + Trigger.submit_event(trigger.id, event, session=session) + session.flush() + + session.refresh(task_instance) + assert task_instance.state == State.SCHEDULED + + xcoms = session.scalars( + XComModel.get_many( + dag_ids=[task_instance.dag_id], + task_ids=[task_instance.task_id], + run_id=task_instance.run_id, + ) + ).all() + actual_xcoms = {x.key: x.value for x in xcoms} + assert actual_xcoms == { + "return_value": json.dumps({"key": "value"}), + "extra_key": json.dumps(42), + } + + +def test_submit_event_without_xcoms_does_not_push(session, create_task_instance): + """ + Tests that events without xcoms don't push any XCom values. + """ + trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) + session.add(trigger) + task_instance = create_task_instance( + session=session, logical_date=timezone.utcnow(), state=State.DEFERRED + ) + task_instance.trigger_id = trigger.id + task_instance.next_kwargs = {} + session.commit() + + event = TriggerEvent("payload") + Trigger.submit_event(trigger.id, event, session=session) + session.flush() + + session.refresh(task_instance) + assert task_instance.state == State.SCHEDULED + + xcoms = session.scalars( + XComModel.get_many( + dag_ids=[task_instance.dag_id], + task_ids=[task_instance.task_id], + run_id=task_instance.run_id, + ) + ).all() + assert len(xcoms) == 0 From a4495a0ef500b0cc048e790f7fd162c68d9afbf0 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 22 Mar 2026 22:16:16 +0100 Subject: [PATCH 2/4] Add docs --- airflow-core/docs/authoring-and-scheduling/deferring.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index 7d2bc648e229c..6099ec9dd6159 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -445,12 +445,14 @@ Triggers can have two options: they can either send execution back to the worker async def run(self) -> AsyncIterator[TriggerEvent]: await asyncio.sleep(self.duration.total_seconds()) if self.end_from_trigger: - yield TaskSuccessEvent() + yield TaskSuccessEvent(xcoms={"wait_duration_s": self.duration.total_seconds()}) else: yield TriggerEvent({"duration": self.duration}) In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator. +Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure. + .. note:: Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the Dag parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. From e9085ea05e0155a80d0228fad45b3f203dc32b18 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 22 Mar 2026 22:30:00 +0100 Subject: [PATCH 3/4] Log a warning if XCom is sent to a non TI --- airflow-core/src/airflow/models/trigger.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 49d5f39188716..1f9a0eeba765a 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -273,6 +273,12 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE handle_event_submit(event, task_instance=task_instance, session=session) # Send an event to assets + if event.xcoms: + log.warning( + "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s", + trigger_id, + event.xcoms, + ) trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none() if trigger is None: # Already deleted for some reason From b5acf20d5731bae88e0582906f158a1b0b2dd716 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 22 Mar 2026 22:33:21 +0100 Subject: [PATCH 4/4] Add warning to pydoc as well --- airflow-core/src/airflow/triggers/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 2238c2bde030a..474d55f96131c 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -177,6 +177,10 @@ class TriggerEvent(BaseModel): directly without requiring the task to resume on a worker first. Keys are XCom keys and values must be JSON-serializable. + + Note: XCom only applies to task instances, not to assets or callbacks. If a trigger event + with XCom values is sent to an asset or callback, the XCom values will be ignored and a + warning will be logged. """ def __init__(self, payload, *, xcoms: dict[str, JsonValue] | None = None, **kwargs):