Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start execution from triggerer support to dynamic task mapping #39912

Merged

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented May 29, 2024

This is a follow-up PR of #39585, which enables the mapped operator to start execution from triggerer

Why

Dynamic task mapping is not yet supported in #38674 and #39585.

What

Allow DAG authors to provide argument trigger_kwargs and start_from_trigger in __init__ which can be used in expand.

from __future__ import annotations

from datetime import timedelta
from typing import Any

from airflow.models.baseoperator import BaseOperator, StartTriggerArgs
from airflow.triggers.temporal import TimeDeltaTrigger


class AsyncOperator(BaseOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs={},
        next_method="execute_complete",
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args, start_from_trigger: bool, trigger_kwargs=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_from_trigger = start_from_trigger
        self.start_trigger_args.trigger_kwargs = trigger_kwargs

    def execute(self, context):
        self.log.info("execute")

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")
from __future__ import annotations

import datetime

import pendulum

from airflow import DAG
from airflow.operators.async_op import AsyncOperator

with DAG(
    dag_id="example_async_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=10),
) as dag:
    add_one_task = AsyncOperator.partial(task_id="add_one").expand(
        trigger_kwargs=[{}, {}], start_from_trigger=[True, False]
    )

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@Lee-W
Copy link
Member Author

Lee-W commented May 29, 2024

The code here is not yet well structured but has been tested locally and seems to work fine. It would be great if I could get some reviews and suggestions. Thanks!

@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch 7 times, most recently from 245c685 to 4ff77db Compare May 31, 2024 10:25
@Lee-W
Copy link
Member Author

Lee-W commented May 31, 2024

This PR is ready to be reviewed but blocked by #39585 so I'll keep it as draft for now.

@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch 5 times, most recently from 590b172 to e1f7aee Compare June 6, 2024 01:17
@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch 2 times, most recently from 18f4ddc to 403ced5 Compare June 11, 2024 11:15
@Lee-W Lee-W changed the title Add start_with_trigger support to mapped op Add start execution from triggerer support to dynamic task mapping Jun 11, 2024
@Lee-W Lee-W marked this pull request as ready for review June 11, 2024 11:15
airflow/models/dagrun.py Outdated Show resolved Hide resolved
@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch 2 times, most recently from f0343d2 to e30bb4c Compare June 13, 2024 14:36
@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch from 0c1f617 to 50a001c Compare July 17, 2024 09:16
Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking how we could get rid of the very out-of-place isinstance calls. The main problem is we don’t need context for the non-mapped case. Looking into the implementation though, the resolution process really only needs ti and run_id, and getting both are basically free at those parts of the code. So I would probably do

  1. This this PR, create an interface in AbstractOperator (say expand_start_from_trigger) that takes context and session. The BaseOperator implementation will simply return self.start_from_trigger.
  2. If this is fine performance-wise, just call it a day.
  3. Assuming we want to optimise, create a new PR to only pass in {"run_id": self.run_id, "ti": ti} instead with a comment explaining why (performance consideration, and reasons this should be enough).

airflow/models/dagrun.py Outdated Show resolved Hide resolved
airflow/models/mappedoperator.py Outdated Show resolved Hide resolved
airflow/models/taskinstance.py Outdated Show resolved Hide resolved
@uranusjr
Copy link
Member

Probably shouldn’t be an approval (the exception class likely needs discussion, and I would prefer the function in AbstractOperator to be a part of this PR), but close enough.

@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch 3 times, most recently from 1671692 to 06c45f4 Compare July 22, 2024 01:32
@Lee-W
Copy link
Member Author

Lee-W commented Jul 22, 2024

The latest comments from @uranusjr have been addressed. I'll go ahead and merge this one.

Lee-W added 18 commits July 22, 2024 15:49
…art_from_trigger and _expand_start_trigger_args
…ct operator to unify how base op and mapped op retrieve start_from_trigger
…ct operator to unify how base op and mapped op retrieve start_trigger_args
…d validate start_trigger_args passed to _defer_task" to TaskDeferralError
@Lee-W Lee-W force-pushed the add-start-with-trigger-support-to-mapped-op branch from 06c45f4 to 636265e Compare July 22, 2024 07:49
@Lee-W Lee-W merged commit 85b2666 into apache:main Jul 22, 2024
48 checks passed
@Lee-W Lee-W deleted the add-start-with-trigger-support-to-mapped-op branch July 22, 2024 08:29
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Jul 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants