Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Added example of _push_xcoms_if_necessary to deferring.rst #45158

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
38 changes: 38 additions & 0 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,44 @@ In the above example, the trigger will end the task instance directly if ``end_f
.. note::
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.

Handling XComs for Deferred Tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When working with deferred tasks that exit directly from triggers, you may need to push XCom values for subsequent tasks in the pipeline. The method ``_push_xcoms_if_necessary`` is responsible for pushing these values. Below is an example of how this can be implemented:
avyuktsoni0731 marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python

def _push_xcoms_if_necessary(self, *, task_instance: TaskInstance) -> None:
"""
Push XComs if required based on the task's state and the provided events.
"""
if task_instance.state == TaskInstanceState.SUCCESS:
task_instance.xcom_push(
key="result", value={"status": "success", "message": "Task completed successfully"}
)
elif task_instance.state == TaskInstanceState.FAILED:
task_instance.xcom_push(key="result", value={"status": "failure", "message": "Task failed"})

You can call this method within your trigger to manage XComs when the task instance ends directly from the trigger. Here's an example:

.. code-block:: python

class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger

async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
task_instance = ... # Get the relevant TaskInstance
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I am not sure how to get the task instance here . I am not sure if you can do it automatically ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, let me look into this again, and try to make the necessary changes that @tirkarthi suggested.

self._push_xcoms_if_necessary(task_instance=task_instance)
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})

In this example, XCom values are pushed to store additional information about the task's result, which downstream tasks can retrieve.

High Availability
-----------------
Expand Down