From d9878b61095271a899af0ad7798fefbe3178ed23 Mon Sep 17 00:00:00 2001 From: "H. Furkan Vural" <33652917+hfurkanvural@users.noreply.github.com> Date: Tue, 20 Dec 2022 18:41:38 +0100 Subject: [PATCH] Add overwrite_cache option to the calls of remote and local executions (#1375) Signed-off-by: H. Furkan Vural The cache overwrite feature is added to flytekit as well, for completeness. In order to support the cache eviction RFC, an overwrite parameter was added, indicating that the data store should replace an existing artifact instead of creating a new one on local calls. --- flytekit/models/execution.py | 4 ++++ flytekit/remote/remote.py | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 08fb3c938d..9c2d5ba2ec 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -175,6 +175,7 @@ def __init__( raw_output_data_config=None, max_parallelism=None, security_context: typing.Optional[security.SecurityContext] = None, + overwrite_cache: bool = None, ): """ :param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute @@ -200,6 +201,7 @@ def __init__( self._raw_output_data_config = raw_output_data_config self._max_parallelism = max_parallelism self._security_context = security_context + self.overwrite_cache = overwrite_cache @property def launch_plan(self): @@ -283,6 +285,7 @@ def to_flyte_idl(self): else None, max_parallelism=self.max_parallelism, security_context=self.security_context.to_flyte_idl() if self.security_context else None, + overwrite_cache=self.overwrite_cache, ) @classmethod @@ -306,6 +309,7 @@ def from_flyte_idl(cls, p): security_context=security.SecurityContext.from_flyte_idl(p.security_context) if p.security_context else None, + overwrite_cache=p.overwrite_cache, ) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 00227f88f0..14cd7e11bb 100644 --- 
a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -744,6 +744,7 @@ def _execute( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Common method for execution across all entities. @@ -755,6 +756,9 @@ def _execute( :param wait: if True, waits for execution to complete :param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values into Flyte Literals. + :param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten + for a single execution. If enabled, all calculations are performed even if cached results would + be available, overwriting the stored data once execution finishes successfully. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ execution_name = execution_name or "f" + uuid.uuid4().hex[:19] @@ -810,6 +814,7 @@ def _execute( "placeholder", # Admin replaces this from oidc token if auth is enabled. 0, ), + overwrite_cache=overwrite_cache, notifications=notifications, disable_all=options.disable_notifications, labels=options.labels, @@ -873,6 +878,7 @@ def execute( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity. @@ -906,6 +912,9 @@ def execute( using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte provided classes (like a StructuredDataset that's annotated with columns). 
+ :param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten + for a single execution. If enabled, all calculations are performed even if cached results would + be available, overwriting the stored data once execution finishes successfully. .. note: @@ -924,6 +933,7 @@ def execute( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) if isinstance(entity, FlyteWorkflow): return self.execute_remote_wf( @@ -935,6 +945,7 @@ def execute( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) if isinstance(entity, PythonTask): return self.execute_local_task( @@ -947,6 +958,7 @@ def execute( execution_name=execution_name, image_config=image_config, wait=wait, + overwrite_cache=overwrite_cache, ) if isinstance(entity, WorkflowBase): return self.execute_local_workflow( @@ -960,6 +972,7 @@ def execute( image_config=image_config, options=options, wait=wait, + overwrite_cache=overwrite_cache, ) if isinstance(entity, LaunchPlan): return self.execute_local_launch_plan( @@ -971,6 +984,7 @@ def execute( execution_name=execution_name, options=options, wait=wait, + overwrite_cache=overwrite_cache, ) raise NotImplementedError(f"entity type {type(entity)} not recognized for execution") @@ -987,6 +1001,7 @@ def execute_remote_task_lp( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Execute a FlyteTask, or FlyteLaunchplan. 
@@ -1001,6 +1016,7 @@ def execute_remote_task_lp( wait=wait, options=options, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) def execute_remote_wf( @@ -1013,6 +1029,7 @@ def execute_remote_wf( options: typing.Optional[Options] = None, wait: bool = False, type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """Execute a FlyteWorkflow. @@ -1028,6 +1045,7 @@ def execute_remote_wf( options=options, wait=wait, type_hints=type_hints, + overwrite_cache=overwrite_cache, ) # Flytekit Entities @@ -1044,6 +1062,7 @@ def execute_local_task( execution_name: str = None, image_config: typing.Optional[ImageConfig] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute an @task-decorated function or TaskTemplate task. @@ -1058,6 +1077,7 @@ def execute_local_task( :param execution_name: :param image_config: :param wait: + :param overwrite_cache: :return: """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1084,6 +1104,7 @@ def execute_local_task( execution_name=execution_name, wait=wait, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) def execute_local_workflow( @@ -1098,6 +1119,7 @@ def execute_local_workflow( image_config: typing.Optional[ImageConfig] = None, options: typing.Optional[Options] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ Execute an @workflow decorated function. 
@@ -1111,6 +1133,7 @@ def execute_local_workflow( :param image_config: :param options: :param wait: + :param overwrite_cache: :return: """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1155,6 +1178,7 @@ def execute_local_workflow( wait=wait, options=options, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) def execute_local_launch_plan( @@ -1167,6 +1191,7 @@ def execute_local_launch_plan( execution_name: typing.Optional[str] = None, options: typing.Optional[Options] = None, wait: bool = False, + overwrite_cache: bool = None, ) -> FlyteWorkflowExecution: """ @@ -1178,6 +1203,7 @@ def execute_local_launch_plan( :param execution_name: If specified, will be used as the execution name instead of randomly generating. :param options: :param wait: + :param overwrite_cache: :return: """ try: @@ -1203,6 +1229,7 @@ def execute_local_launch_plan( options=options, wait=wait, type_hints=entity.python_interface.inputs, + overwrite_cache=overwrite_cache, ) ###################################