Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Echo task #2654

Merged
merged 11 commits into from
Aug 26, 2024
Merged

Add Echo task #2654

merged 11 commits into from
Aug 26, 2024

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Aug 5, 2024

Tracking issue

flyteorg/flyte#3533

Why are the changes needed?

Currently, the conditional in Flyte does not support skipping one of the branch without any task execution. This means that we will need to use a noop task to skip the branch. This will lead to a waste of time during execution.

What changes were proposed in this pull request?

Add an echo task inside flytekit, and propeller will use echo plugin to run it without launching a pod.

echo task just simply copy the inputs to outputs and return to the users

How was this patch tested?

import typing
from click.testing import CliRunner

from flytekit import task, workflow, ImageSpec, conditional
from flytekit.clis.sdk_in_container import pyflyte
from flytekit.core.task import Echo

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@5d6e9a0d2882d7484db682e065dfa269d8a02964"

custom_image = ImageSpec(
    registry="ghcr.io/flyteorg",
    packages=[new_flytekit],
    apt_packages=["git"],
)


echo = Echo(name="echo", inputs={"a": float})


@task(container_image=custom_image)
def calculate_circle_circumference(radius: float) -> typing.Optional[float]:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle


@task(container_image=custom_image)
def calculate_circle_area(radius: float) -> int:
    return 3.14 * radius * radius  # Task to calculate the area of a circle


@workflow
def wf(radius: float) -> typing.Optional[float]:
    return (
        conditional("shape_properties_with_multiple_branches")
        .if_((radius >= 0.1) & (radius < 1.0), last_case=False)
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(echo(a=radius))
    )


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(
        pyflyte.main,
        [
            "register",
            "/Users/kevin/git/flytekit/flyte-example/condition.py",
        ]
    )
    print(result.stdout)

Setup process

Enable the echo plugin in the flytepropeller.

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
      - echo                                                                                                                                                                                                                       
      # - ray
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY

Screenshots

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

Signed-off-by: Kevin Su <[email protected]>
Copy link

codecov bot commented Aug 6, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 88.66%. Comparing base (72da0d0) to head (2349cee).
Report is 25 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2654      +/-   ##
==========================================
+ Coverage   78.77%   88.66%   +9.88%     
==========================================
  Files         187       34     -153     
  Lines       19149     1747   -17402     
  Branches     3993        0    -3993     
==========================================
- Hits        15085     1549   -13536     
+ Misses       3374      198    -3176     
+ Partials      690        0     -690     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pingsutw pingsutw marked this pull request as draft August 8, 2024 07:39
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw marked this pull request as ready for review August 11, 2024 06:57
@pingsutw pingsutw changed the title [WIP] Add Echo task Add Echo task Aug 11, 2024
@pingsutw
Copy link
Member Author

cc @yubofredwang

Copy link
Member

@thomasjpfan thomasjpfan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I run pyflyte run --remote:

import typing

from flytekit import ImageSpec, conditional, task, workflow
from flytekit.core.task import Echo

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@68c6cf91891923c4cc898b2a493d42927035ce32"

custom_image = ImageSpec(
    registry="localhost:30000",
    packages=[new_flytekit],
    apt_packages=["git"],
)


echo1 = Echo(name="echo", inputs={"a": float})


@task(container_image=custom_image)
def t1(radius: float) -> typing.Optional[float]:
    return 2 * 3.14 * radius


@workflow
def wf1(radius: float) -> typing.Optional[float]:
    return (
        conditional("shape_properties_with_multiple_branches")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(t1(radius=radius))
        .else_()
        .then(echo1(a=radius))
    )

I am getting this error:

Request rejected by the API, due to Invalid input.
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
        details: failed to compile workflow for [resource_type:WORKFLOW  project:"flytesnacks"  domain:"development"  name:"tjpf_noop.wf1"  version:"bonkFHVywVrbD1ngx7SsDQ"] with err failed to compile workflow with err Collected Errors: 5
        Error 0: Code: MismatchingTypes, Node Id: n0-n1, Description: Variable [o0] (type [union_type:{variants:{simple:FLOAT  structure:{tag:"float"}}  variants:{simple:NONE  structure:{tag:"none"}}}]) doesn't match expected type [simple:FLOAT].
        Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
        Error 2: Code: UnreachableNodes, Node Id: start-node, Description: The Workflow contain unreachable nodes [n0].
        Error 3: Code: ValueRequired, Node Id: n0, Description: Value required [.radius].
        Error 4: Code: VariableNameNotFound, Node Id: n0, Description: Variable [o0] not found on node [n0].

flytekit/core/task.py Outdated Show resolved Hide resolved
flytekit/core/task.py Outdated Show resolved Hide resolved
tests/flytekit/unit/core/test_conditions.py Outdated Show resolved Hide resolved
@pingsutw
Copy link
Member Author

sorry, my bad.

echo1 = Echo(name="echo", inputs={"a": float})

it should be

echo1 = Echo(name="echo", inputs={"a": typing.Optional[float]})

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@thomasjpfan
Copy link
Member

When I run:

import typing

from flytekit import ImageSpec, conditional, task, workflow
from flytekit.core.task import Echo

new_flytekit = "git+https://github.com/flyteorg/flytekit.git@2349ceeaf74fe6449b66eee54be5667aec0b45ab"

custom_image = ImageSpec(
    registry="localhost:30000",
    packages=[new_flytekit],
    apt_packages=["git"],
    builder="default",
)


echo1 = Echo(name="echo", inputs={"a": typing.Optional[float]})


@task(container_image=custom_image)
def t1(radius: float) -> typing.Optional[float]:
    return 2 * 3.14 * radius


@workflow
def wf1(radius: float) -> typing.Optional[float]:
    return (
        conditional("shape_properties_with_multiple_branches")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(t1(radius=radius))
        .else_()
        .then(echo1(a=radius))
    )

I get this error:

Workflow[flytesnacks:development:main.wf1] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n0-n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration

Screenshot 2024-08-16 at 11 36 39 AM

@pingsutw
Copy link
Member Author

pingsutw commented Aug 17, 2024

It's not enabled by default in the backend. Need to update the config. just updated the PR description, sorry.

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
      - echo                                                                                                                                                                                                                       
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY

Copy link
Member

@thomasjpfan thomasjpfan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this with the plugin and it works!

Two questions:

  1. Where can we document that echo needs to be a part of the task-plugin? From the user side, it's hard to tell that this does not work out of the box and they need to ask their platform engineer to add a new plugin.
  2. Should we add echo to the list of default plugins in the flyte repo?

A task that simply echoes the inputs back to the user.
The task's inputs and outputs interface are the same.
FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs.
https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow up, I think this should link out to a docs around "How to enable echo tasks". This way a user can look at the docstring and know to contact their platform engineer to enable the feature.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! good call.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create another follow-up PR to address this bad error.

image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just updated the docstring in this PR too

Signed-off-by: Kevin Su <[email protected]>
Copy link
Contributor

@wild-endeavor wild-endeavor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we can improve on this a tiny bit (though hard to justify I think), but we can create a hardcoded no-op marker object, and whenever the conditional sees a no-op marker, it just automatically creates this echo task for the user.

@pingsutw pingsutw enabled auto-merge (squash) August 26, 2024 21:11
@pingsutw pingsutw merged commit 74d847a into master Aug 26, 2024
100 of 101 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants