diff --git a/cmflib/cli/__init__.py b/cmflib/cli/__init__.py index 58e21183..eeafc798 100644 --- a/cmflib/cli/__init__.py +++ b/cmflib/cli/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ### - +from cmflib.cmf_exception_handling import CmfResponse class CmfParserError(Exception): """Base class for CLI parser errors.""" @@ -36,6 +36,7 @@ def parse_args(argv=None): parser = get_main_parser() args = parser.parse_args(argv) + args.parser = parser return args @@ -54,10 +55,14 @@ def main(argv=None): args = parse_args(argv) cmd = args.func(args) msg = cmd.do_run() - print(msg) + print(msg.handle()) + except CmfResponse as e: + print(e.handle()) except CmfParserError: - pass + pass except KeyboardInterrupt: print("Interrupted by the user") except Exception as e: print(e) + + diff --git a/cmflib/cmf_exception_handling.py b/cmflib/cmf_exception_handling.py new file mode 100644 index 00000000..5cc80678 --- /dev/null +++ b/cmflib/cmf_exception_handling.py @@ -0,0 +1,399 @@ +### +# Copyright (2024) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +#!/usr/bin/env python3 +from typing import Optional, List + + +class CmfResponse(Exception): + """ + Response and Exceptions raised by the CMF. + CmfResponse includes two child classes, + 1. CmfSuccess + 2. CmfFailure + On the basis of success and failure various child classes are created. 
+ + Base class for all the cmf responses and exceptions. + """ + + def __init__(self, return_code=None, status="failure", *args): + self.return_code = return_code + self.status = status + super().__init__(*args) + + +class CmfFailure(CmfResponse): + def __init__(self, return_code=None, *args): + super().__init__(return_code, status="failure", *args) + + +# Subclass for Success Cases +class CmfSuccess(CmfResponse): + def __init__(self, return_code=None, *args): + super().__init__(return_code, status="success", *args) + + +"""CMF Success Class""" + + +class ExecutionsAlreadyExists(CmfSuccess): + def __init__(self, return_code=201): + super().__init__(return_code) + + def handle(self): + return "INFO: Executions already exists." + + +class ObjectDownloadSuccess(CmfSuccess): + def __init__(self, object_name, download_loc, return_code=202): + self.object_name = object_name + self.download_loc = download_loc + super().__init__(return_code) + + def handle(self): + return f"SUCCESS: Object {self.object_name} downloaded at {self.download_loc}." + + +class BatchDownloadSuccess(CmfSuccess): + def __init__(self, files_downloaded, return_code=203): + self.files_downloaded = files_downloaded + super().__init__(return_code) + + def handle(self): + return f"SUCCESS: Number of files downloaded = {self.files_downloaded }." + + +class MlmdFilePullSuccess(CmfSuccess): + def __init__(self, full_path_to_dump, return_code=204): + self.full_path_to_dump = full_path_to_dump + super().__init__(return_code) + + def handle(self): + return f"SUCCESS: {self.full_path_to_dump} is successfully pulled." + + +class MlmdFilePushSuccess(CmfSuccess): + def __init__(self, file_name, return_code=205): + self.file_name = file_name + super().__init__(return_code) + + def handle(self): + return f"SUCCESS: {self.file_name} is successfully pushed." 
class TensorboardPushSuccess(CmfSuccess):
    """Success response for a tensorboard log push (default return code 206).

    ``tensorboard_file_name`` defaults to the sentinel ``"All"``, which selects
    the "all logs pushed" message; any other value is reported as one file.
    """

    def __init__(self, tensorboard_file_name: str = "All", return_code=206):
        self.tensorboard_file_name = tensorboard_file_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing message for this response."""
        if self.tensorboard_file_name == "All":
            # Plain literal: an f-string is pointless without placeholders.
            return "SUCCESS: All tensorboard logs pushed successfully."
        return (
            f"tensorboard logs: file {self.tensorboard_file_name} pushed successfully."
        )


class CmfInitComplete(CmfSuccess):
    """Success response for a completed ``cmf init`` (default return code 207)."""

    def __init__(self, return_code=207):
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing message for this response."""
        return "SUCCESS: cmf init complete."


class CmfInitShow(CmfSuccess):
    """Success response carrying two values to display (default return code 208).

    ``result`` and ``attr_str`` are stored as given and rendered on two lines.
    """

    def __init__(self, result, attr_str, return_code=208):
        self.result = result
        self.attr_str = attr_str
        super().__init__(return_code)

    def handle(self) -> str:
        """Return ``result`` and ``attr_str`` separated by a newline."""
        return f"{self.result}\n{self.attr_str}"


class ArtifactPushSuccess(CmfSuccess):
    """Success response for an artifact push (default return code 209).

    The caller supplies the complete message; it is returned unchanged.
    """

    def __init__(self, message, return_code=209):
        self.message = message
        super().__init__(return_code)

    def handle(self):
        """Return the caller-supplied message as-is."""
        return self.message


class MetadataExportToJson(CmfSuccess):
    """Success response for a metadata export (default return code 210)."""

    def __init__(self, full_path_to_dump, return_code=210):
        self.full_path_to_dump = full_path_to_dump
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing message naming the export path."""
        return f"SUCCESS: metadata successfully exported in {self.full_path_to_dump}."
# This class is created for messages like "Done", "Records not found"
class MsgSuccess(CmfSuccess):
    """Generic success response carrying a free-form message (default code 211).

    Either ``msg_str`` or ``msg_list`` may be supplied. When ``msg_list`` is
    given (even if empty) it takes precedence over ``msg_str``.
    """

    def __init__(
        self,
        msg_str: Optional[str] = None,
        msg_list: Optional[List[str]] = None,
        return_code=211,
    ):
        self.msg_str = msg_str
        self.msg_list = msg_list
        super().__init__(return_code)

    def handle(self):
        """Return ``msg_list`` if it was provided, otherwise ``msg_str``."""
        # Identity check: an empty list is still "provided" and must win.
        if self.msg_list is not None:
            return self.msg_list
        return self.msg_str


# CMF failure classes


class PipelineNotFound(CmfFailure):
    """Failure response for an unknown pipeline name (default return code 101)."""

    def __init__(self, pipeline_name, return_code=101):
        self.pipeline_name = pipeline_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return f"ERROR: Pipeline name {self.pipeline_name} doesn't exist."


class FileNotFound(CmfFailure):
    """Failure response for a file missing from a directory (default code 102)."""

    def __init__(self, file_name, directory, return_code=102):
        self.directory = directory
        self.file_name = file_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return f"ERROR: File {self.file_name} doesn't exists in {self.directory} directory."


class BucketNotFound(CmfFailure):
    """Failure response for an unknown bucket (default return code 103)."""

    def __init__(self, bucket_name, return_code=103):
        self.bucket_name = bucket_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return f"ERROR: Bucket {self.bucket_name} doesn't exist."


class ExecutionsNotFound(CmfFailure):
    """Failure response when no executions are found (default return code 104)."""

    def __init__(self, return_code=104):
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        # Plain literal: an f-string is pointless without placeholders.
        return "ERROR: Executions not found."


class ExecutionIDNotFound(CmfFailure):
    """Failure response for an unknown execution id (default return code 105)."""

    def __init__(self, exec_id, return_code=105):
        self.exec_id = exec_id
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return f"ERROR: Execution id {self.exec_id} is not present in mlmd."


class ArtifactNotFound(CmfFailure):
    """Failure response for an unknown artifact (default return code 106)."""

    def __init__(self, artifact_name, return_code=106):
        self.artifact_name = artifact_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return f"ERROR: Artifact {self.artifact_name} not found."
class ObjectDownloadFailure(CmfFailure):
    """Failure response for a single object download (default return code 107)."""

    def __init__(self, object_name, return_code=107):
        self.object_name = object_name
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing message naming the object."""
        return f"Object {self.object_name} is not downloaded."


class BatchDownloadFailure(CmfFailure):
    """Failure response for a multi-file download (default return code 108).

    Stores both the number of files downloaded and the number that failed.
    The capitalised ``Files_failed_to_download`` name is kept because callers
    pass it by keyword.
    """

    def __init__(self, files_downloaded, Files_failed_to_download, return_code=108):
        self.files_downloaded = files_downloaded
        self.Files_failed_to_download = Files_failed_to_download
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing summary of downloaded and failed counts."""
        return f"INFO: Number of files downloaded = {self.files_downloaded}. Files failed to download = {self.Files_failed_to_download}."


class Minios3ServerInactive(CmfFailure):
    """Failure response when the MinioS3 server is not running (default code 109)."""

    def __init__(self, return_code=109):
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        # Plain literal: an f-string is pointless without placeholders.
        return "ERROR: MinioS3 server is not running!!!"


class CmfNotConfigured(CmfFailure):
    """Failure response carrying a caller-supplied message (default code 110)."""

    def __init__(self, message, return_code=110):
        self.message = message
        super().__init__(return_code)

    def handle(self):
        """Return the caller-supplied message as-is."""
        return self.message


class MlmdNotFoundOnServer(CmfFailure):
    """Failure response when the metadata file is absent on cmf-server (code 111)."""

    def __init__(self, return_code=111):
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return "ERROR: Metadata file not available on cmf-server."


class UpdateCmfVersion(CmfFailure):
    """Failure response asking the user to update cmf (default return code 112)."""

    def __init__(self, return_code=112):
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message."""
        return "ERROR: You need to update cmf to the latest version. Unable to push metadata file."


class TensorboardPushFailure(CmfFailure):
    """Failure response for a tensorboard file upload (default return code 113).

    Stores the file name and the server's response text for the message.
    """

    def __init__(self, tensorboard_file_name, response_text, return_code=113):
        self.tensorboard_file_name = tensorboard_file_name
        self.response_text = response_text
        super().__init__(return_code)

    def handle(self) -> str:
        """Return the user-facing error message including the server response."""
        return f"ERROR: Failed to upload file {self.tensorboard_file_name}. Server response: {self.response_text}."
class Neo4jArgumentNotProvided(CmfFailure):
    """Failure (default return code 114): neo4j user, password or uri not given."""

    _MSG = "ERROR: Provide user, password and uri for neo4j initialization."

    def __init__(self, return_code=114):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class CmfInitFailed(CmfFailure):
    """Failure (default return code 115): ``cmf init`` did not succeed."""

    _MSG = "ERROR: cmf init failed."

    def __init__(self, return_code=115):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class CmfServerNotAvailable(CmfFailure):
    """Failure (default return code 116): cmf-server is not available."""

    _MSG = "ERROR: cmf-server is not available."

    def __init__(self, return_code=116):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class InternalServerError(CmfFailure):
    """Failure (default return code 117): unexpected error reported for cmf-server."""

    _MSG = "cmf-server error: The server encountered an unexpected error."

    def __init__(self, return_code=117):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class MlmdFilePullFailure(CmfFailure):
    """Failure (default return code 118): the metadata file could not be pulled."""

    _MSG = "ERROR: Unable to pull metadata file."

    def __init__(self, return_code=118):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class DirectoryNotfound(CmfFailure):
    """Failure (default return code 119): the given directory does not exist."""

    def __init__(self, dir, return_code=119):
        # ``dir`` shadows the builtin, but the name is part of the signature.
        super().__init__(return_code)
        self.dir = dir

    def handle(self):
        """Return the error message naming the stored directory."""
        return "ERROR: {} doesn't exists.".format(self.dir)


class FileNameNotfound(CmfFailure):
    """Failure (default return code 120): a path without a file name was given."""

    _MSG = "ERROR: Provide path with file name."

    def __init__(self, return_code=120):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class NoDataFoundOsdf(CmfFailure):
    """Failure (default return code 121): no data was received from the server."""

    _MSG = "ERROR: No data received from the server."

    def __init__(self, return_code=121):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG


class InvalidTensorboardFilePath(CmfFailure):
    """Failure (default return code 122): the tensorboard logs path is invalid."""

    _MSG = "ERROR: Invalid tensorboard logs path. Provide valid file/folder path for tensorboard logs!!"

    def __init__(self, return_code=122):
        super().__init__(return_code)

    def handle(self):
        """Return the fixed user-facing error message."""
        return self._MSG
+ + +class DuplicateArgumentNotAllowed(CmfFailure): + def __init__(self, argument_name, argument_flag, return_code=123): + self.argument_flag = argument_flag + self.argument_name = argument_name + super().__init__(return_code) + + def handle(self): + return f"Error: You can only provide one {self.argument_name} using the {self.argument_flag} flag." + + +class MissingArgument(CmfFailure): + def __init__(self, argument_name, return_code=124): + self.argument_name = argument_name + super().__init__(return_code) + + def handle(self): + return f"Error: Missing {self.argument_name}" + + +class NoChangesMadeInfo(CmfFailure): + def __init__(self, return_code=125): + super().__init__(return_code) + + def handle(self): + return "INFO: No changes made to the file. Operation aborted." + + +class MsgFailure(CmfFailure): + def __init__( + self, + msg_str: Optional[str] = None, + msg_list: Optional[List[str]] = None, + return_code=126, + ): + self.msg_str = msg_str + self.msg_list = msg_list + super().__init__(return_code) + + def handle(self): + if self.msg_list != None: + return self.msg_list + else: + return self.msg_str diff --git a/cmflib/commands/artifact/list.py b/cmflib/commands/artifact/list.py index cedf3164..9670d040 100644 --- a/cmflib/commands/artifact/list.py +++ b/cmflib/commands/artifact/list.py @@ -20,10 +20,17 @@ import textwrap from tabulate import tabulate +from typing import Union, List from cmflib.cli.command import CmdBase from cmflib import cmfquery -from cmflib.dvc_wrapper import dvc_get_config -from typing import Union, List +from cmflib.cmf_exception_handling import ( + PipelineNotFound, + FileNotFound, + ArtifactNotFound, + DuplicateArgumentNotAllowed, + MissingArgument, + MsgSuccess +) class CmdArtifactsList(CmdBase): def convert_to_datetime(self, df: pd.DataFrame, col_name: str) -> pd.DataFrame: @@ -128,50 +135,45 @@ def search_artifact(self, df: pd.DataFrame) -> Union[int, List[int]]: return -1 def run(self): - # Check if 'cmf' is configured. 
- msg = "'cmf' is not configured.\nExecute 'cmf init' command." - result = dvc_get_config() - if len(result) == 0: - return msg + # default path for mlmd file name + mlmd_file_name = "./mlmd" current_directory = os.getcwd() if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. elif len(self.args.file_name) > 1: # If the user provided more than one file name. - return "Error: You can only provide one file name using the -f flag." + raise DuplicateArgumentNotAllowed("file_name", "-f") elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - return "Error: Missing File name" + raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": mlmd_file_name = "./mlmd" - current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): - return f"Error: {mlmd_file_name} doesn't exists in {current_directory} directory." - + raise FileNotFound(mlmd_file_name, current_directory) # Creating cmfquery object. query = cmfquery.CmfQuery(mlmd_file_name) # Check if pipeline exists in mlmd. if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - return "Error: You can only provide one pipeline name using the -p flag." + raise DuplicateArgumentNotAllowed("pipeline_name", "-p") elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - return "Error: Missing pipeline name" + raise MissingArgument("pipeline name") else: pipeline_name = self.args.pipeline_name[0] df = query.get_all_artifacts_by_context(pipeline_name) if df.empty: - return "Pipeline name doesn't exists..." + raise PipelineNotFound(pipeline_name) else: if not self.args.artifact_name: # If self.args.artifact_name is None or an empty list ([]). pass elif len(self.args.artifact_name) > 1: # If the user provided more than one artifact_name. 
- return "Error: You can only provide one artifact name using the -a flag." + raise DuplicateArgumentNotAllowed("artifact_name", "-a") elif not self.args.artifact_name[0]: # self.args.artifact_name[0] is an empty string (""). - return "Error: Missing artifact name" + raise MissingArgument("artifact name") else: artifact_ids = self.search_artifact(df) if(artifact_ids != -1): @@ -219,14 +221,14 @@ def run(self): user_input = input("Press Enter to see more records if exists or 'q' to quit: ").strip().lower() if user_input == 'q': break - return "End of records.." + return MsgSuccess(msg_str = "End of records..") else: - return "Artifact name does not exist.." + raise ArtifactNotFound(self.args.artifact_name) df = self.convert_to_datetime(df, "create_time_since_epoch") self.display_table(df) - return "Done." + return MsgSuccess(msg_str = "Done.") def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/artifact/pull.py b/cmflib/commands/artifact/pull.py index 090809e7..f4e228e4 100644 --- a/cmflib/commands/artifact/pull.py +++ b/cmflib/commands/artifact/pull.py @@ -28,7 +28,21 @@ ) from cmflib.cli.command import CmdBase from cmflib.utils.dvc_config import DvcConfig - +from cmflib.cmf_exception_handling import ( + PipelineNotFound, + FileNotFound, + MissingArgument, + ExecutionsNotFound, + ArtifactNotFound, + BatchDownloadFailure, + BatchDownloadSuccess, + ObjectDownloadFailure, + ObjectDownloadSuccess, + DuplicateArgumentNotAllowed, + MsgSuccess, + MsgFailure +) +from cmflib.cli.utils import check_minio_server class CmdArtifactPull(CmdBase): @@ -170,15 +184,15 @@ def run(self): if mlmd_file_name == "mlmd": mlmd_file_name = "./mlmd" current_directory = os.path.dirname(mlmd_file_name) - if not os.path.exists(mlmd_file_name): - return f"ERROR: {mlmd_file_name} doesn't exists in {current_directory} directory." 
+ if not os.path.exists(mlmd_file_name): #checking if MLMD files exists + raise FileNotFound(mlmd_file_name, current_directory) query = cmfquery.CmfQuery(mlmd_file_name) - + if not query.get_pipeline_id(self.args.pipeline_name) > 0: #checking if pipeline name exists in mlmd + raise PipelineNotFound(self.args.pipeline_name) # getting all pipeline stages[i.e Prepare, Featurize, Train and Evaluate] stages = query.get_pipeline_stages(self.args.pipeline_name) executions = [] identifiers = [] - for stage in stages: # getting all executions for stages executions = query.get_all_executions_in_stage(stage) @@ -191,120 +205,308 @@ def run(self): identifiers.append(id) else: print("No Executions found for " + stage + " stage.") - # created dictionary name_url_dict = {} if len(identifiers) == 0: # check if there are no executions - return "No executions found." + raise ExecutionsNotFound() for identifier in identifiers: get_artifacts = query.get_all_artifacts_for_execution( identifier ) # getting all artifacts with id temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url'])) # getting dictionary of name and url pair name_url_dict.update(temp_dict) # updating name_url_dict with temp_dict - #print(name_url_dict) # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81' # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81') - output = DvcConfig.get_dvc_config() # pulling dvc config if type(output) is not dict: return output + """ + There are multiple scenarios for cmf artifact pull + Code checks if self.args.artifact_name is provided by user or not + under these conditions there are two more conditions + 1. if file is not .dir (single file) + Download single file + 2. 
else file is .dir (directory) + download all files from directory + + """ dvc_config_op = output - if dvc_config_op["core.remote"] == "minio": - minio_class_obj = minio_artifacts.MinioArtifacts() - if self.args.artifact_name: + minio_class_obj = minio_artifacts.MinioArtifacts(dvc_config_op) + # Check if a specific artifact name is provided as input. + if self.args.artifact_name: + # Search for the artifact in the metadata store. output = self.search_artifact(name_url_dict) - # output[0] = name + # output[0] = artifact_name # output[1] = url # output[2] = hash if output is None: - print(f"{self.args.artifact_name} doesn't exist.") + raise ArtifactNotFound(self.args.artifact_name) else: + # Extract repository arguments specific to MinIO. minio_args = self.extract_repo_args("minio", output[0], output[1], current_directory) - stmt = minio_class_obj.download_artifacts( - dvc_config_op, - current_directory, - minio_args[0], # bucket_name - minio_args[1], # object_name - minio_args[2], # path_name - ) - print(stmt) + + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not minio_args[1].endswith(".dir"): + # Download a single file from MinIO. + object_name, download_loc, download_flag = minio_class_obj.download_file( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + if download_flag: + # Return success if the file is downloaded successfully. 
+ return ObjectDownloadSuccess(object_name, download_loc) + else: + return ObjectDownloadFailure(object_name) + else: + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = minio_class_obj.download_directory( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + + if download_flag: + # Return success if all files in the directory are downloaded. + return BatchDownloadSuccess(dir_files_downloaded) + else: + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + else: + # Handle the case where no specific artifact name is provided. + files_downloaded = 0 + files_failed_to_download = 0 + + # Iterate through the dictionary of artifact names and URLs. for name, url in name_url_dict.items(): - if not isinstance(url, str): + if not isinstance(url, str): ## Skip invalid URLs. continue minio_args = self.extract_repo_args("minio", name, url, current_directory) - stmt = minio_class_obj.download_artifacts( - dvc_config_op, - current_directory, - minio_args[0], # bucket_name - minio_args[1], # object_name - minio_args[2], # path_name - ) - print(stmt) - return "Done" + + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not minio_args[1].endswith(".dir"): + # Download a single file from MinIO. 
+ object_name, download_loc, download_flag = minio_class_obj.download_file( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + + # print output here because we are in a loop and can't return the control + if download_flag: + print(f"object {object_name} downloaded at {download_loc}.") + files_downloaded += 1 + else: + print(f"object {object_name} is not downloaded.") + files_failed_to_download += 1 + else: + # If object name ends with `.dir`, download multiple files from a directory. + total_files_in_directory, dir_files_downloaded, download_flag = minio_class_obj.download_directory( + current_directory, + minio_args[0], # bucket_name + minio_args[1], # object_name + minio_args[2], # path_name + ) + # Return success if all files in the directory are downloaded. + if download_flag: + files_downloaded += dir_files_downloaded + else: + files_downloaded += dir_files_downloaded + files_failed_to_download += (total_files_in_directory - dir_files_downloaded) + + # we are assuming, if files_failed_to_download > 0, it means our download of artifacts is not success + if not files_failed_to_download: + return BatchDownloadSuccess(files_downloaded) + else: + return BatchDownloadFailure(files_downloaded, files_failed_to_download) + elif dvc_config_op["core.remote"] == "local-storage": - local_class_obj = local_artifacts.LocalArtifacts() + local_class_obj = local_artifacts.LocalArtifacts(dvc_config_op) + # There are two main conditions + # Condition 1 - user can use -a paramter for cmf artifact pull command + # -a can be a dir or a file + # Condition 2 - user can chose to download all the artifacts in one go. + # we can have both dir and files in our list of artifacts + # Check if a specific artifact name is provided as input. if self.args.artifact_name: + # Search for the artifact in the metadata store. 
output = self.search_artifact(name_url_dict) # output[0] = name # output[1] = url + if output is None: - print(f"{self.args.artifact_name} doesn't exist.") + raise ArtifactNotFound(self.args.artifact_name) else: + # Extract repository arguments specific to Local repo. local_args = self.extract_repo_args("local", output[0], output[1], current_directory) - stmt = local_class_obj.download_artifacts( - dvc_config_op, current_directory, local_args[0], local_args[1] - ) - print(stmt) + # local_args [0] = current_dvc_loc + # local_args [1] = download_loc + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not local_args[0].endswith(".dir"): + # Download a single file from Local. + object_name, download_loc, download_flag = local_class_obj.download_file(current_directory, local_args[0], local_args[1]) + if download_flag: + # Return success if the file is downloaded successfully. + return ObjectDownloadSuccess(object_name, download_loc) + else: + return ObjectDownloadFailure(object_name) + + else: + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = local_class_obj.download_directory(current_directory, local_args[0], local_args[1]) + + if download_flag: + # Return success if all files in the directory are downloaded. + return BatchDownloadSuccess(dir_files_downloaded) + else: + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) else: + # Handle the case where no specific artifact name is provided. + files_downloaded = 0 + files_failed_to_download = 0 + # Iterate through the dictionary of artifact names and URLs. 
for name, url in name_url_dict.items(): - #print(name, url) if not isinstance(url, str): continue + # name1 - file + # name2 - failed file + # name3 - dir (5 files) + # name4 - dir (4 files) - failed dir - 2 files passed, 2 files failed + # name5 - file + # name6 - dir - and can't open it (but it has 2 files) .. user don't know local_args = self.extract_repo_args("local", name, url, current_directory) - # local_args[0] = current dvc location - # local_args[1] = current download location - stmt = local_class_obj.download_artifacts( - dvc_config_op, current_directory, local_args[0], local_args[1] - ) - print(stmt) - return "Done" + # local_args [0] = current_dvc_loc + # local_args [1] = download_loc + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not local_args[0].endswith(".dir"): + # Download a single file from Local repo. + object_name, download_loc, download_flag = local_class_obj.download_file( + current_directory, local_args[0], local_args[1]) + + # print output here because we are in a loop and can't return the control + if download_flag: + print(f"object {object_name} downloaded at {download_loc}.") + files_downloaded += 1 + else: + print(f"object {object_name} is not downloaded.") + files_failed_to_download += 1 + else: + # If object name ends with `.dir`, download multiple files from a directory. + total_files_in_directory, dir_files_downloaded, download_flag = local_class_obj.download_directory( + current_directory, local_args[0], local_args[1]) + # download_flag is true only when all the files from the directory are successfully downlaoded. 
+ if download_flag: + files_downloaded += dir_files_downloaded + else: + files_downloaded += dir_files_downloaded + files_failed_to_download += (total_files_in_directory - dir_files_downloaded) + + # we are assuming, if files_failed_to_download > 0, it means our download of artifacts is not success + if not files_failed_to_download: + return BatchDownloadSuccess(files_downloaded) + else: + return BatchDownloadFailure( + files_downloaded, files_failed_to_download) + elif dvc_config_op["core.remote"] == "ssh-storage": - sshremote_class_obj = sshremote_artifacts.SSHremoteArtifacts() + sshremote_class_obj = sshremote_artifacts.SSHremoteArtifacts(dvc_config_op) + # Check if a specific artifact name is provided as input. if self.args.artifact_name: + # Search for the artifact in the metadata store. output = self.search_artifact(name_url_dict) # output[0] = name # output[1] = url if output is None: - print(f"{self.args.artifact_name} doesn't exist.") + raise ArtifactNotFound(self.args.artifact_name) else: + # Extract repository arguments specific to ssh-remote. args = self.extract_repo_args("ssh", output[0], output[1], current_directory) - stmt = sshremote_class_obj.download_artifacts( - dvc_config_op, - args[0], # host, - current_directory, - args[1], # remote_loc of the artifact - args[2] # name - ) - print(stmt) + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not args[1].endswith(".dir"): + # Download a single file from ssh-remote. + object_name, download_loc, download_flag = sshremote_class_obj.download_file( + args[0], # host, + current_directory, + args[1], # remote_loc of the artifact + args[2] # name + ) + if download_flag: + # Return success if the file is downloaded successfully. 
+ return ObjectDownloadSuccess(object_name, download_loc) + else: + return ObjectDownloadFailure(object_name) + + else: + # If object name ends with `.dir`, download multiple files from a directory + # return total_files_in_directory, files_downloaded + total_files_in_directory, dir_files_downloaded, download_flag = sshremote_class_obj.download_directory( + args[0], # host, + current_directory, + args[1], # remote_loc of the artifact + args[2] # name + ) + if download_flag: + # Return success if all files in the directory are downloaded. + return BatchDownloadSuccess(dir_files_downloaded) + else: + # Calculate the number of files that failed to download. + file_failed_to_download = total_files_in_directory - dir_files_downloaded + return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) else: + # Handle the case where no specific artifact name is provided. + files_downloaded = 0 + files_failed_to_download = 0 + # Iterate through the dictionary of artifact names and URLs. for name, url in name_url_dict.items(): - #print(name, url) if not isinstance(url, str): continue args = self.extract_repo_args("ssh", name, url, current_directory) - stmt = sshremote_class_obj.download_artifacts( - dvc_config_op, + # Check if the object name doesn't end with `.dir` (indicating it's a file). + if not args[1].endswith(".dir"): + # Download a single file from ssh-remote. + object_name, download_loc, download_flag = sshremote_class_obj.download_file( args[0], # host, current_directory, args[1], # remote_loc of the artifact args[2] # name ) - print(stmt) - return "Done" + # print output here because we are in a loop and can't return the control + if download_flag: + print(f"object {object_name} downloaded at {download_loc}.") + files_downloaded += 1 + else: + print(f"object {object_name} is not downloaded.") + files_failed_to_download += 1 + else: + # If object name ends with `.dir`, download multiple files from a directory. 
+ total_files_in_directory, dir_files_downloaded, download_flag = sshremote_class_obj.download_directory( + args[0], # host, + current_directory, + args[1], # remote_loc of the artifact + args[2] # name + ) + if download_flag: + files_downloaded += dir_files_downloaded + else: + files_downloaded += dir_files_downloaded + files_failed_to_download += (total_files_in_directory - dir_files_downloaded) + + # we are assuming, if files_failed_to_download > 0, it means our download of artifacts is not success + if not files_failed_to_download: + return BatchDownloadSuccess(files_downloaded) + else: + return BatchDownloadFailure(files_downloaded, files_failed_to_download) elif dvc_config_op["core.remote"] == "osdf": #Regenerate Token for OSDF from cmflib.utils.helper_functions import generate_osdf_token @@ -334,11 +536,10 @@ def run(self): # output[1] = url # output[3]=artifact_hash if output is None: - print(f"{self.args.artifact_name} doesn't exist.") + raise ArtifactNotFound(self.args.artifact_name) else: args = self.extract_repo_args("osdf", output[0], output[1], current_directory) - #print(f"Hash for the artifact {self.args.artifact_name} is {output[3]}") - stmt = osdfremote_class_obj.download_artifacts( + download_flag, message = osdfremote_class_obj.download_artifacts( dvc_config_op, args[0], # s_url of the artifact cache_path, @@ -347,16 +548,23 @@ def run(self): args[2], # name of the artifact output[3] #Artifact Hash ) - print(stmt) + + if download_flag : + status = MsgSuccess(msg_str = message) + else: + status = MsgFailure(msg_str = message) + return status else: for name, url in name_url_dict.items(): + total_files_count += 1 #print(name, url) if not isinstance(url, str): continue artifact_hash = name.split(':')[1] #Extract Hash of the artifact from name #print(f"Hash for the artifact {name} is {artifact_hash}") args = self.extract_repo_args("osdf", name, url, current_directory) - stmt = osdfremote_class_obj.download_artifacts( + + download_flag, message = 
osdfremote_class_obj.download_artifacts( dvc_config_op, args[0], # host, cache_path, @@ -365,43 +573,93 @@ def run(self): args[2], # name artifact_hash #Artifact Hash ) - print(stmt) - return "Done" + if download_flag: + print(message) #### success message + file_downloaded +=1 + else: + print(message) ### failure message + Files_failed_to_download = total_files_count - files_downloaded + if Files_failed_to_download == 0: + status = BatchDownloadSuccess(files_downloaded=files_downloaded) + else: + status = BatchDownloadFailure(files_downloaded=files_downloaded, Files_failed_to_download= Files_failed_to_download) + return status + elif dvc_config_op["core.remote"] == "amazons3": - amazonS3_class_obj = amazonS3_artifacts.AmazonS3Artifacts() - #print(self.args.artifact_name,"artifact name") + amazonS3_class_obj = amazonS3_artifacts.AmazonS3Artifacts(dvc_config_op) if self.args.artifact_name: output = self.search_artifact(name_url_dict) # output[0] = name # output[1] = url if output is None: - print(f"{self.args.artifact_name} doesn't exist.") + raise ArtifactNotFound(self.args.artifact_name) else: args = self.extract_repo_args("amazons3", output[0], output[1], current_directory) if args[0] and args[1] and args[2]: - stmt = amazonS3_class_obj.download_artifacts( - dvc_config_op, - current_directory, - args[0], # bucket_name - args[1], # object_name - args[2], # download_loc - ) - print(stmt) + if not args[1].endswith(".dir"): + object_name, download_loc, download_flag = amazonS3_class_obj.download_file( + current_directory, + args[0], # bucket_name + args[1], # object_name + args[2], # download_loc + ) + if download_flag: + return ObjectDownloadSuccess(object_name, download_loc) + else: + return ObjectDownloadFailure(object_name) + else: + total_files_in_directory, dir_files_downloaded, download_flag = amazonS3_class_obj.download_directory(current_directory, + args[0], # bucket_name + args[1], # object_name + args[2], # download_loc + ) + if download_flag: + return 
BatchDownloadSuccess(dir_files_downloaded) + else: + file_failed_to_download = total_files_in_directory - dir_files_downloaded + return BatchDownloadFailure(dir_files_downloaded, file_failed_to_download) + + else: + files_downloaded = 0 + files_failed_to_download = 0 for name, url in name_url_dict.items(): if not isinstance(url, str): continue args = self.extract_repo_args("amazons3", name, url, current_directory) if args[0] and args[1] and args[2]: - stmt = amazonS3_class_obj.download_artifacts( - dvc_config_op, + if not args[1].endswith(".dir"): + object_name, download_loc, download_flag = amazonS3_class_obj.download_file( + current_directory, + args[0], # bucket_name + args[1], # object_name + args[2], # download_loc + ) + if download_flag: + print(f"object {object_name} downloaded at {download_loc}.") + files_downloaded += 1 + else: + print(f"object {object_name} is not downloaded.") + files_failed_to_download += 1 + else: + total_files_in_directory, dir_files_downloaded, download_flag = amazonS3_class_obj.download_directory( current_directory, args[0], # bucket_name args[1], # object_name - args[2], # download_loc + args[2], # path_name ) - print(stmt) - return "Done" + # download_flag is true only when all the files from the directory are successfully downlaoded. + if download_flag: + files_downloaded += dir_files_downloaded + else: + files_downloaded += dir_files_downloaded + files_failed_to_download += (total_files_in_directory - dir_files_downloaded) + + # we are assuming, if files_failed_to_download > 0, it means our download of artifacts is not success + if not files_failed_to_download: + return BatchDownloadSuccess(files_downloaded) + else: + return BatchDownloadFailure(files_downloaded, files_failed_to_download) else: remote = dvc_config_op["core.remote"] msg = f"{remote} is not valid artifact repository for CMF.\n Reinitialize CMF." 
diff --git a/cmflib/commands/artifact/push.py b/cmflib/commands/artifact/push.py index 7179c78c..9360bd77 100644 --- a/cmflib/commands/artifact/push.py +++ b/cmflib/commands/artifact/push.py @@ -23,23 +23,34 @@ from cmflib.cli.command import CmdBase from cmflib.cli.utils import check_minio_server from cmflib.utils.helper_functions import generate_osdf_token -from cmflib.utils.helper_functions import is_url from cmflib.utils.dvc_config import DvcConfig from cmflib.dvc_wrapper import dvc_push from cmflib.dvc_wrapper import dvc_add_attribute +from cmflib.cli.utils import find_root from cmflib.utils.cmf_config import CmfConfig +from cmflib.cmf_exception_handling import PipelineNotFound, Minios3ServerInactive, FileNotFound, ExecutionsNotFound, CmfNotConfigured, ArtifactPushSuccess, MissingArgument, DuplicateArgumentNotAllowed class CmdArtifactPush(CmdBase): def run(self): result = "" dvc_config_op = DvcConfig.get_dvc_config() cmf_config_file = os.environ.get("CONFIG_FILE", ".cmfconfig") - cmf_config={} - cmf_config=CmfConfig.read_config(cmf_config_file) + + # find root_dir of .cmfconfig + output = find_root(cmf_config_file) + + # in case, there is no .cmfconfig file + if output.find("'cmf' is not configured.") != -1: + raise CmfNotConfigured(output) + + out_msg = check_minio_server(dvc_config_op) if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS": - return "MinioS3 server failed to start!!!" 
+ raise Minios3ServerInactive() if dvc_config_op["core.remote"] == "osdf": + config_file_path = os.path.join(output, cmf_config_file) + cmf_config={} + cmf_config=CmfConfig.read_config(config_file_path) #print("key_id="+cmf_config["osdf-key_id"]) dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"]) #print("Dynamic Password"+dynamic_password) @@ -49,26 +60,25 @@ def run(self): #print(result) return result - current_directory = os.getcwd() # Default path of mlmd file mlmd_file_name = "./mlmd" + current_directory = os.getcwd() if self.args.file_name: mlmd_file_name = self.args.file_name if mlmd_file_name == "mlmd": mlmd_file_name = "./mlmd" current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): - return f"ERROR: {mlmd_file_name} doesn't exists in {current_directory} directory." - + raise FileNotFound(mlmd_file_name, current_directory) # creating cmfquery object query = cmfquery.CmfQuery(mlmd_file_name) - + # Put a check to see whether pipline exists or not pipeline_name = self.args.pipeline_name if not query.get_pipeline_id(pipeline_name) > 0: - return f"ERROR: Pipeline {pipeline_name} doesn't exist!!" + raise PipelineNotFound(pipeline_name) - stages = query.get_pipeline_stages(self.args.pipeline_name) + stages = query.get_pipeline_stages(pipeline_name) executions = [] identifiers = [] @@ -86,7 +96,7 @@ def run(self): names = [] if len(identifiers) == 0: # check if there are no executions - return "No executions found." + raise ExecutionsNotFound() for identifier in identifiers: artifacts = query.get_all_artifacts_for_execution( identifier @@ -113,7 +123,7 @@ def run(self): pass #print("file_set = ", final_list) result = dvc_push(list(final_list)) - return result + return ArtifactPushSuccess(result) def add_parser(subparsers, parent_parser): HELP = "Push artifacts to the user configured artifact repo." 
diff --git a/cmflib/commands/execution/list.py b/cmflib/commands/execution/list.py index 5816d108..4771c777 100644 --- a/cmflib/commands/execution/list.py +++ b/cmflib/commands/execution/list.py @@ -22,7 +22,14 @@ from cmflib.cli.command import CmdBase from cmflib import cmfquery from tabulate import tabulate -from cmflib.dvc_wrapper import dvc_get_config +from cmflib.cmf_exception_handling import ( + PipelineNotFound, + FileNotFound, + DuplicateArgumentNotAllowed, + MissingArgument, + MsgSuccess, + ExecutionsNotFound +) class CmdExecutionList(CmdBase): @@ -74,19 +81,13 @@ def display_table(self, df: pd.DataFrame) -> None: start_index = end_index def run(self): - # Check if 'cmf' is configured - msg = "'cmf' is not configured.\nExecute 'cmf init' command." - result = dvc_get_config() - if len(result) == 0: - return msg - current_directory = os.getcwd() if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. elif len(self.args.file_name) > 1: # If the user provided more than one file name. - return "Error: You can only provide one file name using the -f flag." + raise DuplicateArgumentNotAllowed("file_name", "-f") elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - return "Error: Missing File name" + raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": @@ -94,16 +95,16 @@ def run(self): current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): - return f"Error: {mlmd_file_name} doesn't exists in {current_directory} directory." + raise FileNotFound(mlmd_file_name, current_directory) # Creating cmfquery object. query = cmfquery.CmfQuery(mlmd_file_name) # Check if pipeline exists in mlmd. if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - return "Error: You can only provide one pipeline name using the -p flag." 
+ raise DuplicateArgumentNotAllowed("pipeline_name", "-p") elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - return "Error: Missing pipeline name" + raise MissingArgument("pipeline name") else: pipeline_name = self.args.pipeline_name[0] @@ -111,7 +112,7 @@ def run(self): # Check if the DataFrame is empty, indicating the pipeline name does not exist. if df.empty: - return "Pipeline does not exist.." + raise PipelineNotFound(pipeline_name) else: # Drop the 'Python_Env' column if it exists in the DataFrame. if "Python_Env" in df.columns: @@ -121,9 +122,9 @@ def run(self): if not self.args.execution_id: # If self.args.execution_id is None or an empty list ([]). pass elif len(self.args.execution_id) > 1: # If the user provided more than one execution_id. - return "Error: You can only provide one execution id using the -e flag." + raise DuplicateArgumentNotAllowed("execution_id", "-e") elif not self.args.execution_id[0]: # self.args.execution_id[0] is an empty string (""). - return "Error: Missing execution id" + raise MissingArgument("execution id") else: if self.args.execution_id[0].isdigit(): if int(self.args.execution_id[0]) in list(df['id']): # Converting series to list. @@ -157,11 +158,11 @@ def run(self): ) print(table) print() - return "Done" - return "Execution id does not exist.." 
+ return MsgSuccess(msg_str = "Done.") + raise ExecutionsNotFound(self.args.execution_id[0]) self.display_table(df) - return "Done" + return MsgSuccess(msg_str = "Done.") def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/amazonS3.py b/cmflib/commands/init/amazonS3.py index cdfc3826..bc5f96c4 100644 --- a/cmflib/commands/init/amazonS3.py +++ b/cmflib/commands/init/amazonS3.py @@ -30,6 +30,7 @@ ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed class CmdInitAmazonS3(CmdBase): def run(self): @@ -62,7 +63,7 @@ def run(self): ): pass else: - return "ERROR: Provide user, password and uri for neo4j initialization." + raise Neo4jArgumentNotProvided output = is_git_repo() if not output: @@ -79,12 +80,13 @@ def run(self): repo_type = "amazons3" output = dvc_add_remote_repo(repo_type, self.args.url) if not output: - return "cmf init failed." + raise CmfInitFailed print(output) dvc_add_attribute(repo_type, "access_key_id", self.args.access_key_id) dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key) dvc_add_attribute(repo_type, "session_token", self.args.session_token) - return "cmf init complete." + status = CmfInitComplete() + return status def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/local.py b/cmflib/commands/init/local.py index 741ccc19..6eeb64a7 100644 --- a/cmflib/commands/init/local.py +++ b/cmflib/commands/init/local.py @@ -17,7 +17,7 @@ #!/usr/bin/env python3 import argparse import os - +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed from cmflib.cli.command import CmdBase from cmflib.dvc_wrapper import ( git_quiet_init, @@ -61,7 +61,7 @@ def run(self): ): pass else: - return "ERROR: Provide user, password and uri for neo4j initialization." 
+ raise Neo4jArgumentNotProvided output = is_git_repo() @@ -79,9 +79,10 @@ def run(self): repo_type = "local-storage" output = dvc_add_remote_repo(repo_type, self.args.path) if not output: - return "cmf init failed." + raise CmfInitFailed print(output) - return "cmf init complete." + status = CmfInitComplete() + return status def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/minioS3.py b/cmflib/commands/init/minioS3.py index 345484a0..38f0bfbf 100644 --- a/cmflib/commands/init/minioS3.py +++ b/cmflib/commands/init/minioS3.py @@ -30,7 +30,7 @@ ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo - +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed class CmdInitMinioS3(CmdBase): def run(self): @@ -63,7 +63,7 @@ def run(self): ): pass else: - return "ERROR: Provide user, password and uri for neo4j initialization." + raise Neo4jArgumentNotProvided output = is_git_repo() if not output: branch_name = "master" @@ -79,12 +79,14 @@ def run(self): repo_type = "minio" output = dvc_add_remote_repo(repo_type, self.args.url) if not output: - return "cmf init failed." + raise CmfInitFailed print(output) dvc_add_attribute(repo_type, "endpointurl", self.args.endpoint_url) dvc_add_attribute(repo_type, "access_key_id", self.args.access_key_id) dvc_add_attribute(repo_type, "secret_access_key", self.args.secret_key) - return "cmf init complete." 
+ status = CmfInitComplete() + return status + def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/osdfremote.py b/cmflib/commands/init/osdfremote.py index 39325ac7..0b2a0192 100644 --- a/cmflib/commands/init/osdfremote.py +++ b/cmflib/commands/init/osdfremote.py @@ -18,7 +18,7 @@ #!/usr/bin/env python3 import argparse import os - +from cmflib.cmf_exception_handling import CmfInitComplete, CmfInitFailed, Neo4jArgumentNotProvided from cmflib.cli.command import CmdBase from cmflib.dvc_wrapper import ( git_quiet_init, @@ -64,7 +64,7 @@ def run(self): ): pass else: - return "ERROR: Provide user, password and uri for neo4j initialization." + raise Neo4jArgumentNotProvided output = is_git_repo() if not output: branch_name = "master" @@ -80,7 +80,7 @@ def run(self): dvc_quiet_init() output = dvc_add_remote_repo(repo_type, self.args.path) if not output: - return "cmf init failed." + raise CmfInitFailed print(output) #dvc_add_attribute(repo_type, "key_id", self.args.key_id) #dvc_add_attribute(repo_type, "key_path", self.args.key_path) @@ -105,7 +105,7 @@ def run(self): attr_dict["key_issuer"] = self.args.key_issuer CmfConfig.write_config(cmf_config, "osdf", attr_dict, True) - return "cmf init complete." + return CmfInitComplete def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/show.py b/cmflib/commands/init/show.py index fa6e7d84..07e01081 100644 --- a/cmflib/commands/init/show.py +++ b/cmflib/commands/init/show.py @@ -22,6 +22,7 @@ from cmflib.cli.utils import find_root from cmflib.dvc_wrapper import dvc_get_config from cmflib.utils.cmf_config import CmfConfig +from cmflib.cmf_exception_handling import CmfNotConfigured, CmfInitShow class CmdInitShow(CmdBase): def run(self): @@ -29,11 +30,11 @@ def run(self): msg = "'cmf' is not configured.\nExecute 'cmf init' command." 
result = dvc_get_config() if len(result) == 0: - return msg + return CmfNotConfigured(msg) else: cmf_config_root = find_root(cmfconfig) if cmf_config_root.find("'cmf' is not configured") != -1: - return msg + return CmfNotConfigured(msg) config_file_path = os.path.join(cmf_config_root, cmfconfig) attr_dict = CmfConfig.read_config(config_file_path) attr_list = [] @@ -41,7 +42,7 @@ def run(self): temp_str = f"{key} = {value}" attr_list.append(temp_str) attr_str = "\n".join(attr_list) - return f"{result}\n{attr_str}" + return CmfInitShow(result,attr_str) def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/init/sshremote.py b/cmflib/commands/init/sshremote.py index 245fe2f2..ca1636fb 100644 --- a/cmflib/commands/init/sshremote.py +++ b/cmflib/commands/init/sshremote.py @@ -31,6 +31,7 @@ ) from cmflib.utils.cmf_config import CmfConfig from cmflib.utils.helper_functions import is_git_repo +from cmflib.cmf_exception_handling import Neo4jArgumentNotProvided, CmfInitComplete, CmfInitFailed class CmdInitSSHRemote(CmdBase): def run(self): @@ -63,7 +64,7 @@ def run(self): ): pass else: - return "ERROR: Provide user, password and uri for neo4j initialization." + raise Neo4jArgumentNotProvided output = is_git_repo() if not output: branch_name = "master" @@ -79,12 +80,14 @@ def run(self): dvc_quiet_init() output = dvc_add_remote_repo(repo_type, self.args.path) if not output: - return "cmf init failed." + raise CmfInitFailed print(output) dvc_add_attribute(repo_type, "user", self.args.user) dvc_add_attribute(repo_type, "password", self.args.password) dvc_add_attribute(repo_type, "port", self.args.port) - return "cmf init complete." 
+ status = CmfInitComplete() + return status + def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/metadata/export.py b/cmflib/commands/metadata/export.py index 332febc6..3fdbaf06 100644 --- a/cmflib/commands/metadata/export.py +++ b/cmflib/commands/metadata/export.py @@ -21,7 +21,14 @@ from cmflib import cmfquery from cmflib.cli.command import CmdBase -from cmflib.dvc_wrapper import dvc_get_config +from cmflib.cmf_exception_handling import ( + PipelineNotFound, + FileNotFound, + DuplicateArgumentNotAllowed, + MissingArgument, + NoChangesMadeInfo, + MetadataExportToJson +) # This class export local mlmd data to a json file class CmdMetadataExport(CmdBase): @@ -40,21 +47,15 @@ def create_full_path(self, current_directory: str, json_file_name: str) -> str: return "Provide path with file name." def run(self): - # Check if 'cmf' is configured. - msg = "'cmf' is not configured.\nExecute 'cmf init' command." - result = dvc_get_config() - if len(result) == 0: - return msg - current_directory = os.getcwd() full_path_to_dump = "" if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. elif len(self.args.file_name) > 1: # If the user provided more than one file name. - return "Error: You can only provide one file name using the -f flag." + raise DuplicateArgumentNotAllowed("file_name", "-f") elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). - return "Error: Missing File name" + raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() # Removing starting and ending whitespaces. if mlmd_file_name == "mlmd": @@ -62,16 +63,16 @@ def run(self): current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): - return f"Error: {mlmd_file_name} doesn't exists in {current_directory} directory." + raise FileNotFound(mlmd_file_name, current_directory) # Initialising cmfquery class. 
query = cmfquery.CmfQuery(mlmd_file_name) # Check if pipeline exists in mlmd . if self.args.pipeline_name is not None and len(self.args.pipeline_name) > 1: - return "Error: You can only provide one pipeline name using the -p flag." + raise DuplicateArgumentNotAllowed("pipeline_name", "-p") elif not self.args.pipeline_name[0]: # self.args.pipeline_name[0] is an empty string (""). - return "Error: Missing pipeline name" + raise MissingArgument("pipeline name") else: pipeline_name = self.args.pipeline_name[0] @@ -81,9 +82,9 @@ def run(self): if not self.args.json_file_name: # If self.args.json_file_name is None or an empty list ([]). json_file_name = self.args.json_file_name elif len(self.args.json_file_name) > 1: # If the user provided more than one json file name. - return "Error: You can provide only one json file name using the -j flag." + raise DuplicateArgumentNotAllowed("json file", "-j") elif not self.args.json_file_name[0]: # self.args.json_file_name[0] is an empty string (""). - return "Error: Missing Json file name" + raise MissingArgument("json file") else: json_file_name = self.args.json_file_name[0].strip() @@ -96,7 +97,7 @@ def run(self): if userRespone.lower() == "yes": # Overwrite file. full_path_to_dump = self.create_full_path(current_directory, json_file_name) else: - return "No changes made to the file. Operation aborted." + raise NoChangesMadeInfo() else: full_path_to_dump = self.create_full_path(current_directory, json_file_name) else: @@ -106,7 +107,7 @@ def run(self): if userRespone.lower() == "yes": full_path_to_dump = os.getcwd() + f"/{pipeline_name}.json" else: - return "No changes made to the file. Operation aborted." + raise NoChangesMadeInfo() else: full_path_to_dump = os.getcwd() + f"/{pipeline_name}.json" @@ -116,9 +117,9 @@ def run(self): # Write metadata into json file. 
with open(full_path_to_dump, 'w') as f: f.write(json.dumps(json.loads(json_payload),indent=2)) - return f"SUCCESS: metadata successfully exported in {full_path_to_dump}." + return MetadataExportToJson(full_path_to_dump) else: - return f"{pipeline_name} doesn't exists in {mlmd_file_name}!!" + raise PipelineNotFound(pipeline_name) diff --git a/cmflib/commands/metadata/pull.py b/cmflib/commands/metadata/pull.py index 7c50cbf7..2ad40736 100644 --- a/cmflib/commands/metadata/pull.py +++ b/cmflib/commands/metadata/pull.py @@ -22,24 +22,30 @@ from cmflib.cli.utils import find_root from cmflib.server_interface import server_interface from cmflib.utils.cmf_config import CmfConfig - +from cmflib.cmf_exception_handling import ( + PipelineNotFound, + CmfNotConfigured, ExecutionIDNotFound, + MlmdNotFoundOnServer, + MlmdFilePullSuccess, + CmfServerNotAvailable, + InternalServerError, + MlmdFilePullFailure, + DirectoryNotfound, + FileNameNotfound +) # This class pulls mlmd file from cmf-server class CmdMetadataPull(CmdBase): def run(self): cmfconfig = os.environ.get("CONFIG_FILE", ".cmfconfig") - # find root_dir of .cmfconfig output = find_root(cmfconfig) - # in case, there is no .cmfconfig file - if output.find("'cmf' is not configured") != -1: - return output - + if output.find("'cmf' is not configured") != -1: + raise CmfNotConfigured(output) config_file_path = os.path.join(output, cmfconfig) attr_dict = CmfConfig.read_config(config_file_path) url = attr_dict.get("cmf-server-ip", "http://127.0.0.1:80") - current_directory = os.getcwd() full_path_to_dump = "" cmd = "pull" @@ -53,9 +59,9 @@ def run(self): if os.path.exists(current_directory): full_path_to_dump = self.args.file_name else: - return f"{current_directory} doesn't exists." + raise DirectoryNotfound(current_dir= current_directory) else: - return "Provide path with file name." 
+ raise FileNameNotfound else: full_path_to_dump = os.getcwd() + "/mlmd" if self.args.execution: @@ -66,29 +72,31 @@ def run(self): status = output.status_code # checks If given pipeline does not exists/ elif pull mlmd file/ else mlmd file is not available if output.content.decode() == None: - return "Pipeline name " + self.args.pipeline_name + " doesn't exist." + raise PipelineNotFound(self.args.pipeline_name) elif output.content.decode() == "no_exec_id": - return f"Error: Execution id {exec_id} is not present in mlmd." + raise ExecutionIDNotFound(exec_id) + elif output.content: - try: - cmf_merger.parse_json_to_mlmd( - output.content, full_path_to_dump, cmd, None - ) # converts mlmd json data to mlmd file - except Exception as e: - return e - # verifying status codes if status == 200: - return f"SUCCESS: {full_path_to_dump} is successfully pulled." + try: + cmf_merger.parse_json_to_mlmd( + output.content, full_path_to_dump, cmd, None + ) # converts mlmd json data to mlmd file + pull_status = MlmdFilePullSuccess(full_path_to_dump) + return pull_status + except Exception as e: + return e + elif status == 413: + raise MlmdNotFoundOnServer + elif status == 406: + raise PipelineNotFound(self.args.pipeline_name) elif status == 404: - return "ERROR: cmf-server is not available." + raise CmfServerNotAvailable elif status == 500: - return "ERROR: Internal server error." + raise InternalServerError else: - return "ERROR: Unable to pull mlmd." - else: - return "mlmd file not available on cmf-server." - - + raise MlmdFilePullFailure + def add_parser(subparsers, parent_parser): PULL_HELP = "Pulls mlmd from cmf-server to users's machine." 
diff --git a/cmflib/commands/metadata/push.py b/cmflib/commands/metadata/push.py index bd630397..f78088bd 100644 --- a/cmflib/commands/metadata/push.py +++ b/cmflib/commands/metadata/push.py @@ -23,13 +23,39 @@ from cmflib.cli.utils import find_root from cmflib.server_interface import server_interface from cmflib.utils.cmf_config import CmfConfig - +from cmflib.cmf_exception_handling import ( + TensorboardPushSuccess, + TensorboardPushFailure, + MlmdFilePushSuccess, + ExecutionsAlreadyExists, + FileNotFound, + ExecutionIDNotFound, + PipelineNotFound, + UpdateCmfVersion, + CmfServerNotAvailable, + InternalServerError, + CmfNotConfigured, + InvalidTensorboardFilePath +) # This class pushes mlmd file to cmf-server class CmdMetadataPush(CmdBase): def run(self): - current_directory = os.getcwd() - mlmd_file_name = "./mlmd" + # Get url from config + cmfconfig = os.environ.get("CONFIG_FILE",".cmfconfig") + + # find root_dir of .cmfconfig + output = find_root(cmfconfig) + + # in case, there is no .cmfconfig file + if output.find("'cmf' is not configured.") != -1: + raise CmfNotConfigured(output) + config_file_path = os.path.join(output, cmfconfig) + attr_dict = CmfConfig.read_config(config_file_path) + url = attr_dict.get("cmf-server-ip", "http://127.0.0.1:80") + + mlmd_file_name = "./mlmd" + current_directory = os.getcwd() # checks if mlmd filepath is given if self.args.file_name: mlmd_file_name = self.args.file_name @@ -37,32 +63,17 @@ def run(self): # checks if mlmd file is present in current directory or given directory if not os.path.exists(mlmd_file_name): - return f"ERROR: {mlmd_file_name} doesn't exists in the {current_directory}." 
+ raise FileNotFound(mlmd_file_name, current_directory) query = cmfquery.CmfQuery(mlmd_file_name) # print(json.dumps(json.loads(json_payload), indent=4, sort_keys=True)) execution_flag = 0 status_code = 0 - # Get url from config - cmfconfig = os.environ.get("CONFIG_FILE",".cmfconfig") - - # find root_dir of .cmfconfig - output = find_root(cmfconfig) - - # in case, there is no .cmfconfig file - if output.find("'cmf' is not configured") != -1: - return output - - config_file_path = os.path.join(output, cmfconfig) - attr_dict = CmfConfig.read_config(config_file_path) - url = attr_dict.get("cmf-server-ip", "http://127.0.0.1:80") - - print("metadata push started") - print("........................................") - # Checks if pipeline name exists if self.args.pipeline_name in query.get_pipeline_names(): + print("metadata push started") + print("........................................") # converts mlmd file to json format json_payload = query.dumptojson(self.args.pipeline_name, None) # checks if execution_id is given by user @@ -80,25 +91,24 @@ def run(self): ) break if execution_flag == 0: - return "Given execution is not found in mlmd." + raise ExecutionIDNotFound(exec_id) else: exec_id = None response = server_interface.call_mlmd_push(json_payload, url, exec_id, self.args.pipeline_name) status_code = response.status_code - if status_code == 200 and response.json()['status']=="success": - print("mlmd is successfully pushed.") - elif status_code==200 and response.json()["status"]=="exists": - print("Executions already exists.") - elif status_code==422 and response.json()["status"]=="version_update": - return "ERROR: You need to update cmf to the latest version. Unable to push metadata file." - elif status_code == 404: - return "ERROR: cmf-server is not available." - elif status_code == 500: - return "ERROR: Internal server error." - else: - return "ERROR: Status Code = {status_code}. Unable to push mlmd." 
- - if self.args.tensorboard: + if status_code == 200: + output = "" + display_output = "" + if response.json()['status']=="success": + display_output = "mlmd is successfully pushed." + output = MlmdFilePushSuccess(mlmd_file_name) + if response.json()["status"]=="exists": + display_output = "Executions already exists." + output = ExecutionsAlreadyExists + + if not self.args.tensorboard: + return output + print(display_output) # /tensorboard api call is done only if mlmd push is successfully completed # tensorboard parameter is passed print("......................................") @@ -111,9 +121,11 @@ def run(self): tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, file_name, self.args.tensorboard) tstatus_code = tresponse.status_code if tstatus_code == 200: - return "tensorboard logs: file {file_name} pushed successfully" + # give status code as success + return TensorboardPushSuccess(file_name) else: - return "ERROR: Failed to upload file {file_name}. Server response: {response.text}" + # give status code as failure + return TensorboardPushFailure(file_name,tresponse.text) # If path provided is a directory elif os.path.isdir(self.args.tensorboard): # Recursively push all files and subdirectories @@ -125,14 +137,21 @@ def run(self): if tresponse.status_code == 200: print(f"tensorboard logs: File {file_name} uploaded successfully.") else: - return f"ERROR: Failed to upload file {file_name}. Server response: {tresponse.text}" - return f"tensorboard logs: {self.args.tensorboard} uploaded successfully!!" + # give status as failure + return TensorboardPushFailure(file_name,tresponse.text) + return TensorboardPushSuccess() else: - return "ERROR: Invalid data path. Provide valid file/folder path for tensorboard logs!!" 
+ return InvalidTensorboardFilePath() + elif status_code==422 and response.json()["status"]=="version_update": + raise UpdateCmfVersion + elif status_code == 404: + raise CmfServerNotAvailable + elif status_code == 500: + raise InternalServerError else: - return "SUCCESS!!" + return "ERROR: Status Code = {status_code}. Unable to push mlmd." else: - return "Pipeline name " + self.args.pipeline_name + " doesn't exists." + raise PipelineNotFound(self.args.pipeline_name) def add_parser(subparsers, parent_parser): diff --git a/cmflib/commands/pipeline/list.py b/cmflib/commands/pipeline/list.py index fe597870..511a66c7 100644 --- a/cmflib/commands/pipeline/list.py +++ b/cmflib/commands/pipeline/list.py @@ -19,23 +19,22 @@ from cmflib.cli.command import CmdBase from cmflib import cmfquery -from cmflib.dvc_wrapper import dvc_get_config +from cmflib.cmf_exception_handling import ( + FileNotFound, + DuplicateArgumentNotAllowed, + MissingArgument, + MsgSuccess +) class CmdPipelineList(CmdBase): def run(self): - # Check if 'cmf' is configured. - msg = "'cmf' is not configured.\nExecute 'cmf init' command." - result = dvc_get_config() - if len(result) == 0: - return msg - current_directory = os.getcwd() if not self.args.file_name: # If self.args.file_name is None or an empty list ([]). mlmd_file_name = "./mlmd" # Default path for mlmd file name. elif len(self.args.file_name) > 1: # If the user provided more than one file name. - return "Error: You can only provide one file name using the -f flag." + raise DuplicateArgumentNotAllowed("file_name", "-f") elif not self.args.file_name[0]: # self.args.file_name[0] is an empty string (""). 
- return "Error: Missing File name" + raise MissingArgument("file name") else: mlmd_file_name = self.args.file_name[0].strip() if mlmd_file_name == "mlmd": @@ -43,12 +42,12 @@ def run(self): current_directory = os.path.dirname(mlmd_file_name) if not os.path.exists(mlmd_file_name): - return f"Error: {mlmd_file_name} doesn't exists in {current_directory} directory." + raise FileNotFound(mlmd_file_name, current_directory) # Creating cmfquery object. query = cmfquery.CmfQuery(mlmd_file_name) - - return [pipeline.name for pipeline in query._get_pipelines()] + + return MsgSuccess(msg_list = [pipeline.name for pipeline in query._get_pipelines()]) def add_parser(subparsers, parent_parser): diff --git a/cmflib/storage_backends/amazonS3_artifacts.py b/cmflib/storage_backends/amazonS3_artifacts.py index 194641ca..de9cafcb 100644 --- a/cmflib/storage_backends/amazonS3_artifacts.py +++ b/cmflib/storage_backends/amazonS3_artifacts.py @@ -18,88 +18,175 @@ import boto3 class AmazonS3Artifacts: - def download_artifacts( + + def __init__(self, dvc_config_op): + """ + Initialize the AmazonS3Artifacts class with AWS credentials. + + Args: + dvc_config_op (dict): Dictionary containing AWS credentials (access key, secret key, and session token). + """ + self.access_key = dvc_config_op["remote.amazons3.access_key_id"] + self.secret_key = dvc_config_op["remote.amazons3.secret_access_key"] + self.session_token = dvc_config_op["remote.amazons3.session_token"] + + # Create an S3 client with the provided credentials. 
+ self.s3 = boto3.client( + 's3', + aws_access_key_id = self.access_key, + aws_secret_access_key = self.secret_key, + aws_session_token = self.session_token + ) + + def download_file( self, - dvc_config_op, current_directory: str, bucket_name: str, object_name: str, download_loc: str, ): - access_key = dvc_config_op["remote.amazons3.access_key_id"] - secret_key = dvc_config_op["remote.amazons3.secret_access_key"] - session_token = dvc_config_op["remote.amazons3.session_token"] + """ + Download a single file from an S3 bucket. + + Args: + current_directory (str): The current working directory. + bucket_name (str): Name of the s3 bucket. + object_name (str): Key (path) of the file in the s3 bucket. + download_loc (str): Local path where the file should be downloaded. + + Returns: + tuple: (object_name, download_loc, status) where status indicates success (True) or failure (False). + """ try: - s3 = boto3.client( - 's3', - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - aws_session_token=session_token - ) - s3.head_bucket(Bucket=bucket_name) + response = "" + # Check if the bucket exists. + self.s3.head_bucket(Bucket=bucket_name) + + # Create necessary directories for the download location. dir_path = "" if "/" in download_loc: dir_path, _ = download_loc.rsplit("/", 1) if dir_path != "": os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + + # Download the file + response = self.s3.download_file(bucket_name, object_name, download_loc) + + # Check if the response indicates success. 
+ if response == None: + return object_name, download_loc, True + else: + return object_name, download_loc, False + except self.s3.exceptions.ClientError as e: + # If a specific error code is returned, the bucket does not exist + if e.response['Error']['Code'] == '404': + print(f"{bucket_name} doesn't exists!!") + return object_name, download_loc, False + else: + print(e) + return object_name, download_loc, False + except Exception as e: + return object_name, download_loc, False - response = "" + def download_directory(self, + current_directory: str, + bucket_name: str, + object_name: str, + download_loc: str, + ): + """ + Download a directory from an S3 bucket using its .dir metadata object. + + Args: + current_directory (str): The current working directory . + bucket_name (str): Name of the S3 bucket. + object_name (str): Key (path) of the .dir object in the S3 bucket. + download_loc (str): Local directory path where the directory should be downloaded. + + Returns: + tuple: (total_files_in_directory, files_downloaded, status) where status indicates success (True) or failure (False). + """ + + self.s3.head_bucket(Bucket=bucket_name) + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. + """ + # in case of .dir, download_loc is a absolute path for a folder + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + os.makedirs(download_loc, mode=0o777, exist_ok=True) + total_files_in_directory = 0 + files_downloaded = 0 - """" - if object_name ends with .dir - it is a directory. - we download .dir object with 'temp_dir' and remove - this after all the files from this .dir object is downloaded. + # Temporary file to download the .dir metadata object. 
+ temp_dir = f"{download_loc}/temp_dir" + try: + # Download the .dir file containing metadata about tracked files. + response = self.s3.download_file(bucket_name, object_name, temp_dir) + + # Read the .dir metadata to get file information. + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) """ - if object_name.endswith('.dir'): - # in case of .dir, download_loc is a absolute path for a folder - os.makedirs(download_loc, mode=0o777, exist_ok=True) - - # download .dir object - temp_dir = f"{download_loc}/temp_dir" - response = s3.download_file(bucket_name, object_name, temp_dir) - - with open(temp_dir, 'r') as file: - tracked_files = eval(file.read()) - - # removing temp_dir - if os.path.exists(temp_dir): - os.remove(temp_dir) - - """ - object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir - contains the path of the .dir on the artifact repo - we need to remove the hash of the .dir from the object_name - which will leave us with the artifact repo path - """ - repo_path = "/".join(object_name.split("/")[:-2]) - for file_info in tracked_files: - relpath = file_info['relpath'] - md5_val = file_info['md5'] - # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data - # md5_val = a237457aa730c396e5acdbc5a64c8453 - # we need a2/37457aa730c396e5acdbc5a64c8453 - formatted_md5 = md5_val[:2] + '/' + md5_val[2:] - temp_download_loc = f"{download_loc}/{relpath}" - temp_object_name = f"{repo_path}/{formatted_md5}" - obj = s3.download_file(bucket_name, temp_object_name, temp_download_loc) - if obj == None: - print(f"object {temp_object_name} downloaded at {temp_download_loc}.") - else: - # download objects which are file - response = s3.download_file(bucket_name, object_name, download_loc) - if response == None: - return f"{object_name} downloaded at {download_loc}" - return response + object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir + 
contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = "/".join(object_name.split("/")[:-2]) + obj=True + for file_info in tracked_files: + total_files_in_directory += 1 + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + + obj = self.s3.download_file(bucket_name, temp_object_name, temp_download_loc) + if obj == None: + files_downloaded += 1 + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + print(f"object {temp_object_name} is not downloaded.") - except s3.exceptions.ClientError as e: + # Check if all files were successfully downloaded. + if (total_files_in_directory - files_downloaded) == 0: + return total_files_in_directory, files_downloaded, True + else: + return total_files_in_directory, files_downloaded, False + except self.s3.exceptions.ClientError as e: # If a specific error code is returned, the bucket does not exist if e.response['Error']['Code'] == '404': - return f"{bucket_name} doesn't exists!!" + print(f"{bucket_name} doesn't exists!!") + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False else: - # Handle other errors - raise - except TypeError as exception: - return exception + print(e) + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False except Exception as e: - return e + print(f"object {object_name} is not downloaded.") + # Handle failure to download the .dir metadata. + # need to improve this + # We usually don't count .dir as a file while counting total_files_in_directory. 
+ # However, here we failed to download the .dir folder itself. So we need to make + # total_files_in_directory = 1, because .............. + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False + + + \ No newline at end of file diff --git a/cmflib/storage_backends/local_artifacts.py b/cmflib/storage_backends/local_artifacts.py index 8626c4cb..e3725a94 100644 --- a/cmflib/storage_backends/local_artifacts.py +++ b/cmflib/storage_backends/local_artifacts.py @@ -17,74 +17,153 @@ import os from dvc.api import DVCFileSystem -class LocalArtifacts: - def download_artifacts( +class LocalArtifacts(): + """ + Initialize the LocalArtifacts class with local repo url. + This class downloads one local artifact at a time and if the passed artifact is a directory + then, it downloads all the files from the directory + + Args: + dvc_config_op (dict): Dictionary containing local url (remote.local.url). + """ + + def __init__(self, dvc_config_op): + self.fs = DVCFileSystem( + dvc_config_op["remote.local-storage.url"] + ) # dvc_config_op[1] is file system path - "/path/to/local/repository" + + def download_file( self, - dvc_config_op, current_directory: str, object_name: str, download_loc: str, ): - obj = True + """ + Download a single file from an S3 bucket. + + Args: + current_directory (str): The current working directory. + bucket_name (str): Name of the local bucket. + object_name (str): Key (path) of the file in the local repo. + download_loc (str): Local path where the file should be downloaded. + + Returns: + tuple: (object_name, download_loc, status) where status indicates success (True) or failure (False). + """ + # get_file() only creates file, to put artifacts in proper directory, subfolders are required. + # download_loc = contains absolute path of the file with file name and extension + + # Create necessary directories for the download location. 
+ dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + try: - fs = DVCFileSystem( - dvc_config_op["remote.local-storage.url"] - ) # dvc_config_op[1] is file system path - "/path/to/local/repository" + # get_file() returns none when file gets downloaded. + response = self.fs.get_file(object_name, download_loc) + + # Check if the response indicates success. + if response == None: + return object_name, download_loc, True + else: + return object_name, download_loc, False + except Exception as e: + return object_name, download_loc, False + + + def download_directory( + self, + current_directory: str, + object_name: str, + download_loc: str, + ): + """ + Download a directory from an local repo using its .dir metadata object. + + Args: + current_directory (str): The current working directory . + bucket_name (str): Name of the local bucket. + object_name (str): Key (path) of the .dir object in the local bucket. + download_loc (str): Local directory path where the directory should be downloaded. + + Returns: + tuple: (total_files_in_directory, files_downloaded, status) where status indicates success (True) or failure (False). + """ + + # get_file() only creates file, to put artifacts in proper directory, subfolders are required. + # download_loc = contains absolute path of the file with file name and extension + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + total_files_in_directory = 0 + files_downloaded = 0 + + # Temporary file to download the .dir metadata object. + temp_dir = f"{download_loc}/dir" + try: + # Download the .dir file containing metadata about tracked files. + response = self.fs.get_file(object_name, temp_dir) - # get_file() only creates file, to put artifacts in proper directory, subfolders are required. - # download_loc = contains absolute path of the file with file name and extension - dir_path = "" - if "/" in download_loc: - dir_path, _ = download_loc.rsplit("/", 1) - if dir_path != "": - os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed - - response = "" - - """" - if object_name ends with .dir - it is a directory. - we download .dir object with 'temp_dir' and remove - this after all the files from this .dir object is downloaded. + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + """ - if object_name.endswith('.dir'): - # in case of .dir, download_loc is a absolute path for a folder - os.makedirs(download_loc, mode=0o777, exist_ok=True) - - # download the .dir object - temp_dir = f"{download_loc}/dir" - response = fs.get_file(object_name, temp_dir) - - with open(temp_dir, 'r') as file: - tracked_files = eval(file.read()) - - # removing temp_dir - if os.path.exists(temp_dir): - os.remove(temp_dir) - - """ - object_name = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir" - contains the path of the .dir on the artifact repo - we need to remove the hash of the .dir from the object_name - which will leave us with the artifact repo path - """ - repo_path = "/".join(object_name.split("/")[:-2]) - for file_info in tracked_files: - relpath = file_info['relpath'] - md5_val = file_info['md5'] - # md5_val = a237457aa730c396e5acdbc5a64c8453 - # we need 
a2/37457aa730c396e5acdbc5a64c8453 - formatted_md5 = md5_val[:2] + '/' + md5_val[2:] - temp_object_name = f"{repo_path}/{formatted_md5}" - temp_download_loc = f"{download_loc}/{relpath}" - obj = fs.get_file(temp_object_name, temp_download_loc) + object_name = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir" + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = "/".join(object_name.split("/")[:-2]) + + obj = True + for file_info in tracked_files: + total_files_in_directory += 1 + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_object_name = f"{repo_path}/{formatted_md5}" + temp_download_loc = f"{download_loc}/{relpath}" + try: + obj = self.fs.get_file(temp_object_name, temp_download_loc) if obj == None: + files_downloaded += 1 print(f"object {temp_object_name} downloaded at {temp_download_loc}.") - else: - response = fs.get_file(object_name, download_loc) - if response == None: # get_file() returns none when file gets downloaded. - stmt = f"object {object_name} downloaded at {download_loc}." 
- return stmt - except TypeError as exception: - return exception - except Exception as exception: - return exception + else: + print(f"object {temp_object_name} is not downloaded.") + # this exception is for get_file() function for temp_object_name + except Exception as e: + print(f"object {temp_object_name} is not downloaded.") + + # total_files - files_downloaded gives us the number of files which are failed to download + if (total_files_in_directory - files_downloaded) == 0: + return total_files_in_directory, files_downloaded, True + else: + return total_files_in_directory, files_downloaded, False + # this exception is for get_file() function for object_name + except Exception as e: + print(f"object {object_name} is not downloaded.") + # We usually don't count .dir as a file while counting total_files_in_directory. + # However, here we failed to download the .dir folder itself. + # So we need to make, total_files_in_directory = 1 + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False + + # sometimes we get TypeError as an execption, however investiagtion for the exact scenarios is pending diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py index 2597d0f6..5531d4fd 100644 --- a/cmflib/storage_backends/minio_artifacts.py +++ b/cmflib/storage_backends/minio_artifacts.py @@ -17,83 +17,165 @@ import os from minio import Minio from minio.error import S3Error - +from cmflib.cmf_exception_handling import BucketNotFound class MinioArtifacts: - def download_artifacts( + + def __init__(self, dvc_config_op): + """ + Initialize the MinioArtifacts class with minios3 repo credentials. + dvc_config_op["remote.minio.endpointurl"] = http://XX.XX.XX.XX:9000 + Args: + dvc_config_op (dict): Dictionary containing local url (remote.local.url). 
+ """ + + self.endpoint = dvc_config_op["remote.minio.endpointurl"].split("http://")[1] + self.access_key = dvc_config_op["remote.minio.access_key_id"] + self.secret_key = dvc_config_op["remote.minio.secret_access_key"] + self.client = Minio( + self.endpoint, access_key=self.access_key, secret_key=self.secret_key, secure=False + ) + + + def download_file( + self, + current_directory: str, + bucket_name: str, + object_name: str, + download_loc: str, + ): + """ + Download a single file from an S3 bucket. + + Args: + current_directory (str): The current working directory. + bucket_name (str): Name of the minioS3 bucket. + object_name (str): Key (path) of the file in the minios3 repo. + download_loc (str): Local path where the file should be downloaded. + + Returns: + tuple: (object_name, download_loc, status) where status indicates success (True) or failure (False). + """ + try: + found = self.client.bucket_exists(bucket_name) + + #check if minio bucket exists + if not found: + raise BucketNotFound(bucket_name) + + #Download file + response = self.client.fget_object(bucket_name, object_name, download_loc) + + # Check if the response indicates success. + if response: + return object_name, download_loc, True + else: + return object_name, download_loc, False + except S3Error as exception: + print(exception) + return object_name, download_loc, False + except Exception as e: + return object_name, download_loc, False + + + def download_directory( self, - dvc_config_op, current_directory: str, bucket_name: str, object_name: str, download_loc: str, ): - # dvc_config_op["remote.minio.endpointurl"] = http://XX.XX.XX.XX:9000 - endpoint = dvc_config_op["remote.minio.endpointurl"].split("http://")[1] - access_key = dvc_config_op["remote.minio.access_key_id"] - secret_key = dvc_config_op["remote.minio.secret_access_key"] + """ + Download a directory from an minios3 repo using its .dir metadata object. + + Args: + current_directory (str): The current working directory . 
+ bucket_name (str): Name of the minios3 bucket. + object_name (str): Key (path) of the .dir object in the minios3 bucket. + download_loc (str): Local directory path where the directory should be downloaded. + + Returns: + tuple: (total_files_in_directory, files_downloaded, status) where status indicates success (True) or failure (False). + """ + + found = self.client.bucket_exists(bucket_name) + if not found: #check if minio bucket exists + raise BucketNotFound(bucket_name) + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. + """ + + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + total_files_in_directory = 0 + files_downloaded = 0 + + # Temporary file to download the .dir metadata object. + temp_dir = f"{download_loc}/temp_dir" try: - client = Minio( - endpoint, access_key=access_key, secret_key=secret_key, secure=False - ) - found = client.bucket_exists(bucket_name) - if not found: - return "Bucket doesn't exists" + # Download the .dir file containing metadata about tracked files. - response = "" + response = self.client.fget_object(bucket_name, object_name, temp_dir) + + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) - """" - if object_name ends with .dir - it is a directory. - we download .dir object with 'temp_dir' and remove - this after all the files from this .dir object is downloaded. 
+ # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + + """ + object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path """ - if object_name.endswith('.dir'): - # in case of .dir, download_loc is a absolute path for a folder - os.makedirs(download_loc, mode=0o777, exist_ok=True) - - # download .dir object - temp_dir = f"{download_loc}/temp_dir" - response = client.fget_object(bucket_name, object_name, temp_dir) - - with open(temp_dir, 'r') as file: - tracked_files = eval(file.read()) - - # removing temp_dir - if os.path.exists(temp_dir): - os.remove(temp_dir) - - """ - object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir - contains the path of the .dir on the artifact repo - we need to remove the hash of the .dir from the object_name - which will leave us with the artifact repo path - """ - repo_path = object_name.split("/") - repo_path = repo_path[:len(repo_path)-2] - repo_path = "/".join(repo_path) - for file_info in tracked_files: - relpath = file_info['relpath'] - md5_val = file_info['md5'] - # download_loc = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data - # md5_val = a237457aa730c396e5acdbc5a64c8453 - # we need a2/37457aa730c396e5acdbc5a64c8453 - formatted_md5 = md5_val[:2] + '/' + md5_val[2:] - temp_download_loc = f"{download_loc}/{relpath}" - temp_object_name = f"{repo_path}/{formatted_md5}" - obj = client.fget_object(bucket_name, temp_object_name, temp_download_loc) + repo_path = object_name.split("/") + repo_path = repo_path[:len(repo_path)-2] + repo_path = "/".join(repo_path) + + + obj=True + for file_info in tracked_files: + total_files_in_directory += 1 + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # download_loc = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = 
a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + try: + obj = self.client.fget_object(bucket_name, temp_object_name, temp_download_loc) if obj: + files_downloaded +=1 print(f"object {temp_object_name} downloaded at {temp_download_loc}.") else: print(f"object {temp_object_name} is not downloaded.") - else: - response = client.fget_object(bucket_name, object_name, download_loc) - if response: - stmt = f"object {object_name} downloaded at {download_loc}." - return stmt - else: - return f"object {object_name} is not downloaded." - - except TypeError as exception: - return exception + except Exception as e: + print(f"object {temp_object_name} is not downloaded.") + + # total_files - files_downloaded gives us the number of files which are failed to download + if (total_files_in_directory - files_downloaded) == 0: + return total_files_in_directory, files_downloaded, True + else: + return total_files_in_directory, files_downloaded, False except S3Error as exception: - return exception + print(exception) + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False + except Exception as e: + print(f"object {object_name} is not downloaded.") + # need to improve this + # We usually don't count .dir as a file while counting total_files_in_directory. + # However, here we failed to download the .dir folder itself. So we need to make + # total_files_in_directory = 1, because .............. 
+ total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False + + # sometimes we get TypeError as an execption, however investiagtion for the exact scenarios is pending diff --git a/cmflib/storage_backends/osdf_artifacts.py b/cmflib/storage_backends/osdf_artifacts.py index 43a725fc..5bfb0b98 100644 --- a/cmflib/storage_backends/osdf_artifacts.py +++ b/cmflib/storage_backends/osdf_artifacts.py @@ -129,10 +129,10 @@ def download_artifacts( success, result = download_and_verify_file(host, headers, remote_file_path, local_file_path, artifact_hash, timeout=10) if success: #print(result) - return result + return success, result else: #print(f"Failed to download and verify file: {result}") - return f"Failed to download and verify file" + return success, f"Failed to download and verify file: {result}" else: #Generate Cached path for artifact cached_s_url=generate_cached_url(host,cache) @@ -140,7 +140,7 @@ def download_artifacts( success, cached_result = download_and_verify_file(cached_s_url, headers, remote_file_path, local_path, artifact_hash,timeout=5) if success: #print(cached_result) - return cached_result + return success, cached_result else: print(f"Failed to download and verify file from cache: {cached_result}") print(f"Trying Origin at {host}") @@ -148,10 +148,10 @@ def download_artifacts( success, origin_result = download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout=10) if success: #print(origin_result) - return origin_result + return success, origin_result else: #print(f"Failed to download and verify file: {result}") - return f"Failed to download and verify file" + return success, f"Failed to download and verify file: {origin_result}" diff --git a/cmflib/storage_backends/sshremote_artifacts.py b/cmflib/storage_backends/sshremote_artifacts.py index 40fea410..c500aa36 100644 --- a/cmflib/storage_backends/sshremote_artifacts.py +++ b/cmflib/storage_backends/sshremote_artifacts.py @@ -18,89 +18,149 @@ 
import paramiko # this is temporary - need to remove after TripleDES warning goes away from paramiko -import warnings -warnings.filterwarnings(action='ignore', module='.*paramiko.*') +# import warnings +# warnings.filterwarnings(action='ignore', module='.*paramiko.*') class SSHremoteArtifacts: - def download_artifacts( + + def __init__(self, dvc_config_op): + self.user = dvc_config_op["remote.ssh-storage.user"] + self.password = dvc_config_op["remote.ssh-storage.password"] + + + def download_file( self, - dvc_config_op, host: str, current_directory: str, object_name: str, download_loc: str, ): - user = dvc_config_op["remote.ssh-storage.user"] - password = dvc_config_op["remote.ssh-storage.password"] + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy( + paramiko.AutoAddPolicy() + ) # this can lead to man in the middle attack, need to find another solution + ssh.connect(host, username=self.user, password=self.password) + sftp = ssh.open_sftp() + dir_path = "" + # in case download_loc is absolute path like /home/user/test/data.xml.gz + # we need to make this absolute path a relative one by removing first '/' + if os.path.isabs(download_loc): + download_loc = download_loc[1:] + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + # creates subfolders needed as per artifacts' folder structure + os.makedirs(dir_path, mode=0o777, exist_ok=True) + + response = "" + abs_download_loc = os.path.abspath(os.path.join(current_directory, download_loc)) try: - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy( - paramiko.AutoAddPolicy() - ) # this can lead to man in the middle attack, need to find another solution - ssh.connect(host, username=user, password=password) - sftp = ssh.open_sftp() - dir_path = "" - # in case download_loc is absolute path like home/user/test/data.xml.gz - # we need to make this absolute path a relative one by removing first '/' - if os.path.isabs(download_loc): - download_loc = download_loc[1:] 
- if "/" in download_loc: - dir_path, _ = download_loc.rsplit("/", 1) - if dir_path != "": - # creates subfolders needed as per artifacts' folder structure - os.makedirs(dir_path, mode=0o777, exist_ok=True) + response = sftp.put(object_name, abs_download_loc) + # we can close sftp connection as we have already downloaded the file + sftp.close() + ssh.close() + if response: + return object_name, abs_download_loc, True + else: + return object_name, abs_download_loc, False + except Exception as e: + # this exception is for function sftp.put() + sftp.close() + ssh.close() + return object_name, abs_download_loc, False - response = "" - abs_download_loc = os.path.abspath(os.path.join(current_directory, download_loc)) - """" - if object_name ends with .dir - it is a directory. - we download .dir object with 'temp_dir' and remove - this after all the files from this .dir object is downloaded. - """ - if object_name.endswith('.dir'): - # in case of .dir, abs_download_loc is a absolute path for a folder - os.makedirs(abs_download_loc, mode=0o777, exist_ok=True) - # download .dir object - temp_dir = f"{abs_download_loc}/temp_dir" - response = sftp.put(object_name, temp_dir) + def download_directory( + self, + host: str, + current_directory: str, + object_name: str, + download_loc: str, + ): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy( + paramiko.AutoAddPolicy() + ) # this can lead to man in the middle attack, need to find another solution + ssh.connect(host, username=self.user, password=self.password) + sftp = ssh.open_sftp() + dir_path = "" + # in case download_loc is absolute path like home/user/test/data.xml.gz + # we need to make this absolute path a relative one by removing first '/' + if os.path.isabs(download_loc): + download_loc = download_loc[1:] + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + # creates subfolders needed as per artifacts' folder structure + os.makedirs(dir_path, mode=0o777, 
exist_ok=True) + + response = "" + abs_download_loc = os.path.abspath(os.path.join(current_directory, download_loc)) + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. + """ + # in case of .dir, abs_download_loc is a absolute path for a folder + os.makedirs(abs_download_loc, mode=0o777, exist_ok=True) - with open(temp_dir, 'r') as file: - tracked_files = eval(file.read()) + # download .dir object + temp_dir = f"{abs_download_loc}/temp_dir" + try: + response = sftp.put(object_name, temp_dir) + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) - # removing temp_dir - if os.path.exists(temp_dir): - os.remove(temp_dir) + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + """ + object_name = /home/user/ssh-storage/files/md5/dd/2d792b7cf6efb02231f85c6147e403.dir + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + + repo_path = "/".join(object_name.split("/")[:-2]) - """ - object_name = /home/user/ssh-storage/files/md5/dd/2d792b7cf6efb02231f85c6147e403.dir - contains the path of the .dir on the artifact repo - we need to remove the hash of the .dir from the object_name - which will leave us with the artifact repo path - """ - repo_path = "/".join(object_name.split("/")[:-2]) - for file_info in tracked_files: - relpath = file_info['relpath'] - md5_val = file_info['md5'] - # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data - # md5_val = a237457aa730c396e5acdbc5a64c8453 - # we need a2/37457aa730c396e5acdbc5a64c8453 - formatted_md5 = md5_val[:2] + '/' + md5_val[2:] - temp_download_loc = f"{abs_download_loc}/{relpath}" - temp_object_name = f"{repo_path}/{formatted_md5}" + total_files_in_directory = 0 + files_downloaded = 0 + for file_info in tracked_files: + 
total_files_in_directory += 1 + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{abs_download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + try: obj = sftp.put(object_name, temp_download_loc) + sftp.close() + ssh.close() if obj: + files_downloaded += 1 print(f"object {temp_object_name} downloaded at {temp_download_loc}.") - else: - response = sftp.put(object_name, abs_download_loc) - if response: - stmt = f"object {object_name} downloaded at {abs_download_loc}." - return stmt + else: + print(f"object {temp_object_name} is not downloaded.") + except Exception as e: + sftp.close() + ssh.close() + print(f"object {temp_object_name} is not downloaded.") + # total_files - files_downloaded gives us the number of files which are failed to download + if (total_files_in_directory - files_downloaded) == 0: + return total_files_in_directory, files_downloaded, True + else: + return total_files_in_directory, files_downloaded, False + except Exception as e: sftp.close() ssh.close() - - except TypeError as exception: - return exception - except Exception as exception: - return exception + print(f"object {object_name} is not downloaded.") + # We usually don't count .dir as a file while counting total_files_in_directory. + # However, here we failed to download the .dir folder itself. 
+ # So we need to make, total_files_in_directory = 1 + total_files_in_directory = 1 + return total_files_in_directory, files_downloaded, False diff --git a/server/app/get_data.py b/server/app/get_data.py index 0a0b6188..20e7d649 100644 --- a/server/app/get_data.py +++ b/server/app/get_data.py @@ -258,10 +258,11 @@ def create_unique_executions(server_store_path, req_info) -> str: for uuid in uuids: if uuid in list_executions_exists: stage['executions'].remove(cmf_exec) - + for i in mlmd_data["Pipeline"]: i['stages']=[stage for stage in i['stages'] if stage['executions']!=[]] for i in mlmd_data["Pipeline"]: + if len(i['stages']) == 0 : status="exists" else: diff --git a/server/app/main.py b/server/app/main.py index 30df9c5a..9fe72a03 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -28,6 +28,7 @@ from server.app.query_execution_lineage_d3tree import query_execution_lineage_d3tree from server.app.query_artifact_lineage_d3tree import query_artifact_lineage_d3tree from server.app.query_visualization_artifact_execution import query_visualization_artifact_execution +from cmflib.cmf_exception_handling import MlmdNotFoundOnServer from pathlib import Path import os import json @@ -124,8 +125,9 @@ async def mlmd_pull(info: Request, pipeline_name: str): #json_payload values can be json data, NULL or no_exec_id. json_payload= await async_api(get_mlmd_from_server, server_store_path, pipeline_name, req_info['exec_id']) else: - print("No mlmd file submitted.") - json_payload = "" + raise HTTPException(status_code=413, detail=f"mlmd file not available on cmf-server.") + if json_payload == None: + raise HTTPException(status_code=406, detail=f"Pipeline {pipeline_name} not found.") return json_payload # api to display executions available in mlmd