From a8ee8b13587ab592cc64bc6d58419f9636c3977b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 14 Jul 2024 22:34:15 -0700 Subject: [PATCH 1/8] Add noop task Signed-off-by: Kevin Su --- flytekit/core/condition.py | 10 ++++------ flytekit/core/task.py | 24 ++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/flytekit/core/condition.py b/flytekit/core/condition.py index 50403574c1..5608b569f3 100644 --- a/flytekit/core/condition.py +++ b/flytekit/core/condition.py @@ -111,8 +111,8 @@ def end_branch(self) -> Optional[Union[Condition, Promise, Tuple[Promise], VoidP return self._compute_outputs(n) return self._condition - def if_(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: - return self._condition._if(expr) + def if_(self, expr: Union[ComparisonExpression, ConjunctionExpression], last_case=False) -> Case: + return self._condition._if(expr, last_case) def compute_output_vars(self) -> typing.Optional[typing.List[str]]: """ @@ -329,10 +329,10 @@ class Condition(object): def __init__(self, cs: ConditionalSection): self._cs = cs - def _if(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: + def _if(self, expr: Union[ComparisonExpression, ConjunctionExpression], last_case=False) -> Case: if expr is None: raise AssertionError(f"Required an expression received None for condition:{self._cs.name}.if_(...)") - return self._cs.start_branch(Case(cs=self._cs, expr=expr, stmt="if_")) + return self._cs.start_branch(Case(cs=self._cs, expr=expr, stmt="if_"), last_case) def elif_(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: if expr is None: @@ -447,8 +447,6 @@ def to_case_block(c: Case) -> Tuple[Union[_core_wf.IfBlock], typing.List[Promise def to_ifelse_block(node_id: str, cs: ConditionalSection) -> Tuple[_core_wf.IfElseBlock, typing.List[Binding]]: if len(cs.cases) == 0: raise AssertionError("Illegal Condition block, with no if-else cases") - if len(cs.cases) < 2: - raise AssertionError("At least an if/else is required. Dangling If is not allowed") all_promises: typing.List[Promise] = [] first_case, promises = to_case_block(cs.cases[0]) all_promises.extend(promises) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 7e420269d3..ce9f2fc56d 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -6,8 +6,8 @@ from flytekit.core import launch_plan as _annotated_launchplan from flytekit.core import workflow as _annotated_workflow -from flytekit.core.base_task import TaskMetadata, TaskResolverMixin -from flytekit.core.interface import transform_function_to_interface +from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin +from flytekit.core.interface import Interface, transform_function_to_interface from flytekit.core.pod_template import PodTemplate from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.reference_entity import ReferenceEntity, TaskReference @@ -410,3 +410,23 @@ def wrapper(fn) -> ReferenceTask: return ReferenceTask(project, domain, name, version, interface.inputs, interface.outputs) return wrapper + + +class Noop(PythonTask): + """ + This is the simplest form of a ChatGPT Task, you can define the model and the input you want. + """ + + _TASK_TYPE = "noop" + + def __init__( + self, name: str, inputs: Optional[Dict[str, Type]] = None, outputs: Optional[Dict[str, Type]] = None, **kwargs + ): + if outputs is None: + outputs = {"o0": None} + super().__init__( + task_type=self._TASK_TYPE, + name=name, + interface=Interface(inputs=inputs, outputs=outputs), + **kwargs, + ) From 5d6e9a0d2882d7484db682e065dfa269d8a02964 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 16 Jul 2024 11:42:13 -0700 Subject: [PATCH 2/8] test Signed-off-by: Kevin Su --- flytekit/core/task.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index ce9f2fc56d..96e9472aa2 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -412,12 +412,12 @@ def wrapper(fn) -> ReferenceTask: return wrapper -class Noop(PythonTask): +class Echo(PythonTask): """ - This is the simplest form of a ChatGPT Task, you can define the model and the input you want. + A task that simply echoes the inputs back to the user. """ - _TASK_TYPE = "noop" + _TASK_TYPE = "echo" def __init__( self, name: str, inputs: Optional[Dict[str, Type]] = None, outputs: Optional[Dict[str, Type]] = None, **kwargs @@ -430,3 +430,8 @@ def __init__( interface=Interface(inputs=inputs, outputs=outputs), **kwargs, ) + + def execute(self, **kwargs) -> Any: + for k, v in kwargs.items(): + print(f"{k} = {v}") + return kwargs["a"] From 378addeefe0e8ae778c79d463942ad250499e2a1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 10 Aug 2024 22:25:19 -0700 Subject: [PATCH 3/8] wip Signed-off-by: Kevin Su --- flytekit/core/condition.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flytekit/core/condition.py b/flytekit/core/condition.py index 5608b569f3..50403574c1 100644 --- a/flytekit/core/condition.py +++ b/flytekit/core/condition.py @@ -111,8 +111,8 @@ def end_branch(self) -> Optional[Union[Condition, Promise, Tuple[Promise], VoidP return self._compute_outputs(n) return self._condition - def if_(self, expr: Union[ComparisonExpression, ConjunctionExpression], last_case=False) -> Case: - return self._condition._if(expr, last_case) + def if_(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: + return self._condition._if(expr) def compute_output_vars(self) -> typing.Optional[typing.List[str]]: """ @@ -329,10 +329,10 @@ class Condition(object): def __init__(self, cs: ConditionalSection): self._cs = cs - def _if(self, expr: Union[ComparisonExpression, ConjunctionExpression], last_case=False) -> Case: + def _if(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: if expr is None: raise AssertionError(f"Required an expression received None for condition:{self._cs.name}.if_(...)") - return self._cs.start_branch(Case(cs=self._cs, expr=expr, stmt="if_"), last_case) + return self._cs.start_branch(Case(cs=self._cs, expr=expr, stmt="if_")) def elif_(self, expr: Union[ComparisonExpression, ConjunctionExpression]) -> Case: if expr is None: @@ -447,6 +447,8 @@ def to_case_block(c: Case) -> Tuple[Union[_core_wf.IfBlock], typing.List[Promise def to_ifelse_block(node_id: str, cs: ConditionalSection) -> Tuple[_core_wf.IfElseBlock, typing.List[Binding]]: if len(cs.cases) == 0: raise AssertionError("Illegal Condition block, with no if-else cases") + if len(cs.cases) < 2: + raise AssertionError("At least an if/else is required. Dangling If is not allowed") all_promises: typing.List[Promise] = [] first_case, promises = to_case_block(cs.cases[0]) all_promises.extend(promises) From 5e6cc6d245f0da9926f9de7981ca1a243b40ef7e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 10 Aug 2024 23:47:00 -0700 Subject: [PATCH 4/8] Add a test Signed-off-by: Kevin Su --- flytekit/core/task.py | 19 +++++----- tests/flytekit/unit/core/test_conditions.py | 39 +++++++++++++++++++++ 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 9cecc0203b..bc64c4c037 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -12,7 +12,7 @@ from flytekit.core import launch_plan as _annotated_launchplan from flytekit.core import workflow as _annotated_workflow from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin -from flytekit.core.interface import Interface, transform_function_to_interface +from flytekit.core.interface import Interface, output_name_generator, transform_function_to_interface from flytekit.core.pod_template import PodTemplate from flytekit.core.python_function_task import PythonFunctionTask from flytekit.core.reference_entity import ReferenceEntity, TaskReference @@ -421,15 +421,14 @@ def wrapper(fn) -> ReferenceTask: class Echo(PythonTask): """ A task that simply echoes the inputs back to the user. + The task's inputs and outputs interface are the same. + FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs. """ _TASK_TYPE = "echo" - def __init__( - self, name: str, inputs: Optional[Dict[str, Type]] = None, outputs: Optional[Dict[str, Type]] = None, **kwargs - ): - if outputs is None: - outputs = {"o0": None} + def __init__(self, name: str, inputs: Optional[Dict[str, Type]] = None, **kwargs): + outputs = dict(zip(list(output_name_generator(len(inputs.values()))), inputs.values())) if inputs else None super().__init__( task_type=self._TASK_TYPE, name=name, @@ -438,6 +437,8 @@ def __init__( ) def execute(self, **kwargs) -> Any: - for k, v in kwargs.items(): - print(f"{k} = {v}") - return kwargs["a"] + values = list(kwargs.values()) + if len(values) == 1: + return values[0] + else: + return tuple(values) diff --git a/tests/flytekit/unit/core/test_conditions.py b/tests/flytekit/unit/core/test_conditions.py index b3bf0c5eab..5f365a6ecc 100644 --- a/tests/flytekit/unit/core/test_conditions.py +++ b/tests/flytekit/unit/core/test_conditions.py @@ -9,6 +9,7 @@ from flytekit import task, workflow from flytekit.configuration import Image, ImageConfig, SerializationSettings from flytekit.core.condition import conditional +from flytekit.core.task import Echo from flytekit.models.core.workflow import Node from flytekit.tools.translator import get_serializable @@ -495,3 +496,41 @@ def multiplier_2(my_input: float) -> float: res = multiplier_2(my_input=10.0) assert res == 20 + + +def test_echo_in_condition(): + echo1 = Echo(name="echo", inputs={"a": float}) + + @task() + def t1(radius: float) -> typing.Optional[float]: + return 2 * 3.14 * radius + + @workflow + def wf1(radius: float) -> typing.Optional[float]: + return ( + conditional("shape_properties_with_multiple_branches") + .if_((radius >= 0.1) & (radius < 1.0)) + .then(t1(radius=radius)) + .else_() + .then(echo1(a=radius)) + ) + + assert wf1(radius=1.8) == 1.8 + + echo2 = Echo(name="echo", inputs={"a": float, "b": float}) + + @task() + def t2(radius: float) -> (float, float): + return 2 * 3.14 * radius, 2 * 3.14 * radius + + @workflow + def wf2(radius1: float, radius2: float) -> (float, float): + return ( + conditional("shape_properties_with_multiple_branches") + .if_((radius1 >= 0.1) & (radius1 < 1.0)) + .then(t2(radius=radius2)) + .else_() + .then(echo2(a=radius1, b=radius2)) + ) + + assert wf2(radius1=1.8, radius2=1.8) == (1.8, 1.8) From 68c6cf91891923c4cc898b2a493d42927035ce32 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 10 Aug 2024 23:56:54 -0700 Subject: [PATCH 5/8] nit Signed-off-by: Kevin Su --- flytekit/core/task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index bc64c4c037..296b912883 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -423,6 +423,7 @@ class Echo(PythonTask): A task that simply echoes the inputs back to the user. The task's inputs and outputs interface are the same. FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs. + https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go """ _TASK_TYPE = "echo" From 0fd8c2a1003dd1d91450af011dceeebefd3b20b0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 12 Aug 2024 13:51:02 -0700 Subject: [PATCH 6/8] address comments Signed-off-by: Kevin Su --- flytekit/core/task.py | 21 +++++++++++++-------- tests/flytekit/unit/core/test_conditions.py | 4 ++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 296b912883..68947b0883 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -419,17 +419,22 @@ def wrapper(fn) -> ReferenceTask: class Echo(PythonTask): - """ - A task that simply echoes the inputs back to the user. - The task's inputs and outputs interface are the same. - FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs. - https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go - """ - _TASK_TYPE = "echo" def __init__(self, name: str, inputs: Optional[Dict[str, Type]] = None, **kwargs): - outputs = dict(zip(list(output_name_generator(len(inputs.values()))), inputs.values())) if inputs else None + """ + A task that simply echoes the inputs back to the user. + The task's inputs and outputs interface are the same. + FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs. + https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go + + :param name: The name of the task. + :param inputs: Name and type of inputs specified as a dictionary. + e.g. {"a": int, "b": str}. + :param kwargs: All other args required by the parent type - PythonTask. + + """ + outputs = dict(zip(output_name_generator(len(inputs)), inputs.values())) if inputs else None super().__init__( task_type=self._TASK_TYPE, name=name, diff --git a/tests/flytekit/unit/core/test_conditions.py b/tests/flytekit/unit/core/test_conditions.py index 5f365a6ecc..8035428719 100644 --- a/tests/flytekit/unit/core/test_conditions.py +++ b/tests/flytekit/unit/core/test_conditions.py @@ -520,11 +520,11 @@ def wf1(radius: float) -> typing.Optional[float]: echo2 = Echo(name="echo", inputs={"a": float, "b": float}) @task() - def t2(radius: float) -> (float, float): + def t2(radius: float) -> typing.Tuple[float, float]: return 2 * 3.14 * radius, 2 * 3.14 * radius @workflow - def wf2(radius1: float, radius2: float) -> (float, float): + def wf2(radius1: float, radius2: float) -> typing.Tuple[float, float]: return ( conditional("shape_properties_with_multiple_branches") .if_((radius1 >= 0.1) & (radius1 < 1.0)) From 2349ceeaf74fe6449b66eee54be5667aec0b45ab Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 12 Aug 2024 13:53:00 -0700 Subject: [PATCH 7/8] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/core/test_conditions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/core/test_conditions.py b/tests/flytekit/unit/core/test_conditions.py index 8035428719..53a924d697 100644 --- a/tests/flytekit/unit/core/test_conditions.py +++ b/tests/flytekit/unit/core/test_conditions.py @@ -499,7 +499,7 @@ def multiplier_2(my_input: float) -> float: def test_echo_in_condition(): - echo1 = Echo(name="echo", inputs={"a": float}) + echo1 = Echo(name="echo", inputs={"a": typing.Optional[float]}) @task() def t1(radius: float) -> typing.Optional[float]: From 0176727f4f30bf540ec6d98e47c30e2967976806 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 20 Aug 2024 17:03:25 -0700 Subject: [PATCH 8/8] updated docstring Signed-off-by: Kevin Su --- flytekit/core/task.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flytekit/core/task.py b/flytekit/core/task.py index 68947b0883..2588248488 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -425,9 +425,18 @@ def __init__(self, name: str, inputs: Optional[Dict[str, Type]] = None, **kwargs """ A task that simply echoes the inputs back to the user. The task's inputs and outputs interface are the same. - FlytePropeller won't create a pod for this task, it will simply pass the inputs to the outputs. + + FlytePropeller uses echo plugin to handle this task, and it won't create a pod for this task. + It will simply pass the inputs to the outputs. https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/testing/echo.go + Note: Make sure to enable the echo plugin in the propeller config to use this task. + ``` + task-plugins: + enabled-plugins: + - echo + ``` + :param name: The name of the task. :param inputs: Name and type of inputs specified as a dictionary. e.g. {"a": int, "b": str}.