From 9cda51ac03c73425199123e417dd57b19dc90ad4 Mon Sep 17 00:00:00 2001 From: "H. Furkan Vural" Date: Mon, 19 Dec 2022 12:53:29 +0100 Subject: [PATCH 1/2] Add overwrite_cache option on execute Signed-off-by: H. Furkan Vural --- flytekit/models/execution.py | 4 ++++ flytekit/remote/remote.py | 24 +++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 08fb3c938d..9c2d5ba2ec 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -175,6 +175,7 @@ def __init__( raw_output_data_config=None, max_parallelism=None, security_context: typing.Optional[security.SecurityContext] = None, + overwrite_cache: bool = None, ): """ :param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute @@ -200,6 +201,7 @@ def __init__( self._raw_output_data_config = raw_output_data_config self._max_parallelism = max_parallelism self._security_context = security_context + self.overwrite_cache = overwrite_cache @property def launch_plan(self): @@ -283,6 +285,7 @@ def to_flyte_idl(self): else None, max_parallelism=self.max_parallelism, security_context=self.security_context.to_flyte_idl() if self.security_context else None, + overwrite_cache=self.overwrite_cache, ) @classmethod @@ -306,6 +309,7 @@ def from_flyte_idl(cls, p): security_context=security.SecurityContext.from_flyte_idl(p.security_context) if p.security_context else None, + overwrite_cache=p.overwrite_cache, ) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 00227f88f0..271cc0d38e 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -744,6 +744,7 @@ def _execute( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Common method for execution across all entities. @@ -755,6 +756,7 @@ def _execute( :param wait: if True, waits for execution to complete :param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values into Flyte Literals. + :param overwrite_cache: execute entity by overwriting the existing cache (if there is any) :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ execution_name = execution_name or "f" + uuid.uuid4().hex[:19] @@ -810,6 +812,7 @@ def _execute( "placeholder", # Admin replaces this from oidc token if auth is enabled. 0, ), + overwrite_cache=overwrite_cache, notifications=notifications, disable_all=options.disable_notifications, labels=options.labels, @@ -873,6 +876,7 @@ def execute( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity. @@ -906,7 +910,7 @@ def execute( using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte provided classes (like a StructuredDataset that's annotated with columns). - + :param overwrite_cache: execute entity by overwriting the existing cache (if there is any) .. note: The ``name`` and ``version`` arguments do not apply to ``FlyteTask``, ``FlyteLaunchPlan``, and @@ -924,6 +928,7 @@ def execute( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) if isinstance(entity, FlyteWorkflow): return self.execute_remote_wf( @@ -935,6 +940,7 @@ def execute( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) if isinstance(entity, PythonTask): return self.execute_local_task( @@ -947,6 +953,7 @@ def execute( execution_name=execution_name, image_config=image_config, wait=wait, + overwrite_cache=overwrite_cache, ) if isinstance(entity, WorkflowBase): return self.execute_local_workflow( @@ -960,6 +967,7 @@ def execute( image_config=image_config, options=options, wait=wait, + overwrite_cache=overwrite_cache, ) if isinstance(entity, LaunchPlan): return self.execute_local_launch_plan( @@ -971,6 +979,7 @@ def execute( execution_name=execution_name, options=options, wait=wait, + overwrite_cache=overwrite_cache, ) raise NotImplementedError(f"entity type {type(entity)} not recognized for execution") @@ -987,6 +996,7 @@ def execute_remote_task_lp( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Execute a FlyteTask, or FlyteLaunchplan. @@ -1001,6 +1011,7 @@ def execute_remote_task_lp( wait=wait, options=options, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) def execute_remote_wf( @@ -1013,6 +1024,7 @@ def execute_remote_wf( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Execute a FlyteWorkflow. @@ -1028,6 +1040,7 @@ def execute_remote_wf( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) # Flytekit Entities @@ -1044,6 +1057,7 @@ def execute_local_task( execution_name: str = None, image_config: typing.Optional[ImageConfig] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute an @task-decorated function or TaskTemplate task. @@ -1058,6 +1072,7 @@ def execute_local_task( :param execution_name: :param image_config: :param wait: + :param overwrite_cache: :return: """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1084,6 +1099,7 @@ def execute_local_task( execution_name=execution_name, wait=wait, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) def execute_local_workflow( @@ -1098,6 +1114,7 @@ def execute_local_workflow( image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute an @workflow decorated function. @@ -1111,6 +1128,7 @@ def execute_local_workflow( :param image_config: :param options: :param wait: + :param overwrite_cache: :return: """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1155,6 +1173,7 @@ def execute_local_workflow( wait=wait, options=options, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) def execute_local_launch_plan( @@ -1167,6 +1186,7 @@ def execute_local_launch_plan( execution_name: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ @@ -1178,6 +1198,7 @@ def execute_local_launch_plan( :param execution_name: If specified, will be used as the execution name instead of randomly generating. :param options: :param wait: + :param overwrite_cache: :return: """ try: @@ -1203,6 +1224,7 @@ def execute_local_launch_plan( options=options, wait=wait, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) ################################### From a9ea6f09b47c10677004972e1dde10900b346a39 Mon Sep 17 00:00:00 2001 From: "H. Furkan Vural" Date: Tue, 20 Dec 2022 08:53:30 +0100 Subject: [PATCH 2/2] Update docstrings Signed-off-by: H. Furkan Vural --- flytekit/remote/remote.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 271cc0d38e..14cd7e11bb 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -756,7 +756,9 @@ def _execute( :param wait: if True, waits for execution to complete :param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values into Flyte Literals. - :param overwrite_cache: execute entity by overwriting the existing cache (if there is any) + :param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten + for a single execution. If enabled, all calculations are performed even if cached results would + be available, overwriting the stored data once execution finishes successfully. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ execution_name = execution_name or "f" + uuid.uuid4().hex[:19] @@ -910,7 +912,10 @@ def execute( using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte provided classes (like a StructuredDataset that's annotated with columns). - :param overwrite_cache: execute entity by overwriting the existing cache (if there is any) + :param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten + for a single execution. If enabled, all calculations are performed even if cached results would + be available, overwriting the stored data once execution finishes successfully. + .. note: The ``name`` and ``version`` arguments do not apply to ``FlyteTask``, ``FlyteLaunchPlan``, and