diff --git a/cmflib/cmf.py b/cmflib/cmf.py index cb653dd6..f4f7a68b 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -20,6 +20,7 @@ import re import os import sys +import yaml import pandas as pd import typing as t @@ -54,7 +55,18 @@ link_execution_to_input_artifact, ) from cmflib.utils.cmf_config import CmfConfig -from cmflib.utils.helper_functions import get_python_env, change_dir +from cmflib.utils.helper_functions import get_python_env, change_dir, get_md5_hash +from cmflib.cmf_server_methods import ( + merge_created_context, + merge_created_execution, + log_python_env_from_client, + log_dataset_with_version, + log_model_with_version, + log_execution_metrics_from_client, + log_step_metrics_from_client, + log_dataslice_from_client, +) + from cmflib.cmf_commands_wrapper import ( _metadata_push, _metadata_pull, @@ -103,16 +115,9 @@ class Cmf: """ # pylint: disable=too-many-instance-attributes - # Reading CONFIG_FILE variable - cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") ARTIFACTS_PATH = "cmf_artifacts" DATASLICE_PATH = "dataslice" METRICS_PATH = "metrics" - if os.path.exists(cmf_config): - attr_dict = CmfConfig.read_config(cmf_config) - __neo4j_uri = attr_dict.get("neo4j-uri", "") - __neo4j_password = attr_dict.get("neo4j-password", "") - __neo4j_user = attr_dict.get("neo4j-user", "") def __init__( self, @@ -170,6 +175,7 @@ def __init__( ) os.chdir(logging_dir) + # function used to load neo4j params for cmf client @staticmethod def __load_neo4j_params(): cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") @@ -179,7 +185,7 @@ def __load_neo4j_params(): Cmf.__neo4j_password = attr_dict.get("neo4j-password", "") Cmf.__neo4j_user = attr_dict.get("neo4j-user", "") - + # function used to load neo4j params for cmf-server @staticmethod def __get_neo4j_server_config(): Cmf.__neo4j_uri = os.getenv('NEO4J_URI', "") @@ -276,47 +282,7 @@ def create_context( ) return ctx - def merge_created_context( - self, pipeline_stage: str, custom_properties: t.Optional[t.Dict] = None - ) -> mlpb.Context: - """Merge created context. - Every call creates a unique pipeline stage. - Created for metadata push purpose. - Example: - - ```python - #Create context - # Import CMF - from cmflib.cmf import Cmf - from ml_metadata.proto import metadata_store_pb2 as mlpb - # Create CMF logger - cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline") - # Create context - context: mlmd.proto.Context = cmf.merge_created_context( - pipeline_stage="Test-env/prepare", - custom_properties ={"user-metadata1": "metadata_value"} - ``` - Args: - Pipeline_stage: Pipeline_Name/Stage_name. - custom_properties: Developers can provide key value pairs with additional properties of the execution that - need to be stored. - Returns: - Context object from ML Metadata library associated with the new context for this stage. 
- """ - - custom_props = {} if custom_properties is None else custom_properties - ctx = get_or_create_run_context( - self.store, pipeline_stage, custom_props) - self.child_context = ctx - associate_child_to_parent_context( - store=self.store, parent_context=self.parent_context, child_context=ctx - ) - if self.graph: - self.driver.create_stage_node( - pipeline_stage, self.parent_context, ctx.id, custom_props - ) - return ctx - + def update_context( self, type_name: str, @@ -419,7 +385,7 @@ def create_execution( git_repo = git_get_repo() git_start_commit = git_get_commit() cmd = str(sys.argv) if cmd is None else cmd - python_env=get_python_env() + self.execution = create_new_execution_in_existing_run_context( store=self.store, # Type field when re-using executions @@ -433,7 +399,6 @@ def create_execution( pipeline_type=self.parent_context.name, git_repo=git_repo, git_start_commit=git_start_commit, - python_env=python_env, custom_properties=custom_props, create_new_execution=create_new_execution, ) @@ -462,6 +427,37 @@ def create_execution( self.execution.id, custom_props, ) + + directory_path = self.ARTIFACTS_PATH + os.makedirs(directory_path, exist_ok=True) + packages = get_python_env() + if isinstance(packages, list): + output = f"{packages}\n" + md5_hash = get_md5_hash(output) + python_env_file_path = os.path.join(directory_path, f"python_env_{md5_hash}.txt") + # create file if it doesn't exists + if not os.path.exists(python_env_file_path): + #print(f"{python_env_file_path} doesn't exists!!") + with open(python_env_file_path, 'w') as file: + for package in packages: + file.write(f"{package}\n") + + else: + # in case output is dict + env_output = yaml.dump(packages, sort_keys=False) + md5_hash = get_md5_hash(env_output) + python_env_file_path = os.path.join(directory_path, f"python_env_{md5_hash}.yaml") + # create file if it doesn't exists + if not os.path.exists(python_env_file_path): + #print(f"{python_env_file_path} doesn't exists!!") + with open(python_env_file_path, 'w') as file: + file.write(env_output) + + # link the artifact to execution if it exists and creates artifact if it doesn't + self.log_python_env(python_env_file_path) + new_custom_props = {} + new_custom_props["Python_Env"] = python_env_file_path + self.update_execution(self.execution.id, new_custom_props) os.chdir(logging_dir) return self.execution @@ -539,126 +535,95 @@ def update_execution( ) return self.execution - def merge_created_execution( - self, - execution_type: str, - execution_cmd: str, - properties: t.Optional[t.Dict] = None, - custom_properties: t.Optional[t.Dict] = None, - orig_execution_name:str = "", - create_new_execution:bool = True - ) -> mlpb.Execution: - """Merge Created execution. - Every call creates a unique execution. Execution can only be created within a context, so - [create_context][cmflib.cmf.Cmf.create_context] must be called first. - Every call occurs when metadata push or pull is processed. Data from pre-existing executions is used - to create new executions with additional data(Required on cmf-server). 
- Example: - ```python - # Import CMF - from cmflib.cmf import Cmf - from ml_metadata.proto import metadata_store_pb2 as mlpb - # Create CMF logger - cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline") - # Create or reuse context for this stage - context: mlmd.proto.Context = cmf.merge_created_context( - pipeline_stage="prepare", - custom_properties ={"user-metadata1": "metadata_value"} - ) - # Create a new execution for this stage run - execution: mlmd.proto.Execution = cmf.merge_created_execution( - execution_type="Prepare", - properties={"Context_Type":""}, - custom_properties = {"split": split, "seed": seed}, - orig_execution_name=execution_name - ) - ``` - Args: - execution_type: Type of the execution.(when create_new_execution is False, this is the name of execution) - properties: Properties of Execution. - custom_properties: Developers can provide key value pairs with additional properties of the execution that - need to be stored. - - cmd: command used to run this execution. + def log_python_env( + self, + url: str, + ) -> mlpb.Artifact: + "Used to log the python packages involved in the current execution" - create_new_execution:bool = True, This can be used by advanced users to re-use executions - This is applicable, when working with framework code like mmdet, pytorch lightning etc, where the - custom call-backs are used to log metrics. - if create_new_execution is True(Default), execution_type parameter will be used as the name of the execution type. - if create_new_execution is False, if existing execution exist with the same name as execution_type. - it will be reused. - Only executions created with create_new_execution as False will have "name" as a property. + git_repo = git_get_repo() + name = re.split("/", url)[-1] + existing_artifact = [] + commit_output(url, self.execution.id) + c_hash = dvc_get_hash(url) - Returns: - Execution object from ML Metadata library associated with the execution for this stage. - """ - # Initializing the execution related fields - properties = {} if properties is None else properties - self.metrics = {} - self.input_artifacts = [] - self.execution_label_props = {} - custom_props = {} if custom_properties is None else custom_properties - # print(custom_props) - git_repo = properties.get("Git_Repo", "") - git_start_commit = properties.get("Git_Start_Commit", "") - python_env = properties.get("Python_Env", "") - #name = properties.get("Name", "") - create_new_execution = True - execution_name = execution_type - #exe.name property is passed as the orig_execution_name. 
- #if name is not an empty string then we are re-using executions - if orig_execution_name != "": - create_new_execution = False - execution_name = orig_execution_name + if c_hash == "": + print("Error in getting the dvc hash,return without logging") + return - self.execution = create_new_execution_in_existing_run_context( - store=self.store, - execution_type_name=execution_type, # Type field when re-using executions - execution_name=execution_name, #Name field if we are re-using executionsname - #Type field , if creating new executions always - context_id=self.child_context.id, - execution=execution_cmd, - pipeline_id=self.parent_context.id, - pipeline_type=self.parent_context.name, - git_repo=git_repo, - git_start_commit=git_start_commit, - python_env=python_env, - custom_properties=custom_props, - create_new_execution=create_new_execution - ) + commit = c_hash + dvc_url = dvc_get_url(url) + dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}" + url = url + ":" + c_hash + if c_hash and c_hash.strip: + existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) - uuids = "" + if existing_artifact and len(existing_artifact) != 0: + existing_artifact = existing_artifact[0] + uri = c_hash + artifact = link_execution_to_artifact( + store=self.store, + execution_id=self.execution.id, + uri=uri, + input_name=url, + event_type=mlpb.Event.Type.INPUT, + ) + else: + uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) + artifact = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=url, + type_name="Environment", + event_type=mlpb.Event.Type.INPUT, + properties={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(commit), + "url": str(dvc_url_with_pipeline), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + milliseconds_since_epoch=int(time.time() * 1000), + ) + custom_props = {} + custom_props["git_repo"] = git_repo + custom_props["Commit"] = commit + self.execution_label_props["git_repo"] = git_repo + self.execution_label_props["Commit"] = commit - uuids = self.execution.properties["Execution_uuid"].string_value - if uuids: - self.execution.properties["Execution_uuid"].string_value = uuids +\ - ","+properties["Execution_uuid"] - else: - self.execution.properties["Execution_uuid"].string_value =\ - properties["Execution_uuid"] + if self.graph: + self.driver.create_env_node( + name, + url, + uri, + "input", + self.execution.id, + self.parent_context, + custom_props, + ) + self.input_artifacts.append( + { + "Name": name, + "Path": url, + "URI": uri, + "Event": "input", + "Execution_Name": self.execution_name, + "Type": "Environment", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + ) + self.driver.create_execution_links(uri, name, "Environment") + return artifact - - self.store.put_executions([self.execution]) - self.execution_name = str(self.execution.id) + "," + execution_type - self.execution_command = execution_cmd - for k, v in custom_props.items(): - k = re.sub("-", "_", k) - self.execution_label_props[k] = v - self.execution_label_props["Execution_Name"] = ( - execution_type + ":" + str(self.execution.id) - ) - self.execution_label_props["execution_command"] = execution_cmd - if self.graph: - self.driver.create_execution_node( - self.execution_name, - self.child_context.id, - 
self.parent_context, - execution_cmd, - self.execution.id, - custom_props, - ) - return self.execution def log_dvc_lock(self, file_path: str): """Used to update the dvc lock file created with dvc run command.""" @@ -872,142 +837,6 @@ def update_model_url(self, dup_artifact: list, updated_url: str): put_artifact(self.store, dup_art) return dup_artifact - def log_dataset_with_version( - self, - url: str, - version: str, - event: str, - props: t.Optional[t.Dict] = None, - custom_properties: t.Optional[t.Dict] = None, - ) -> mlpb.Artifact: - """Logs a dataset when the version (hash) is known. - Example: - ```python - artifact: mlpb.Artifact = cmf.log_dataset_with_version( - url="path/to/dataset", - version="abcdef", - event="output", - props={ "git_repo": "https://github.com/example/repo", - "url": "/path/in/repo", }, - custom_properties={ "custom_key": "custom_value", }, - ) - ``` - Args: - url: Path to the dataset. - version: Hash or version identifier for the dataset. - event: Takes arguments `INPUT` or `OUTPUT`. - props: Optional properties for the dataset (e.g., git_repo, url). - custom_properties: Optional custom properties for the dataset. - Returns: - Artifact object from the ML Protocol Buffers library associated with the new dataset artifact. - """ - - props = {} if props is None else props - custom_props = {} if custom_properties is None else custom_properties - git_repo = props.get("git_repo", "") - name = url - event_type = mlpb.Event.Type.OUTPUT - existing_artifact = [] - c_hash = version - if event.lower() == "input": - event_type = mlpb.Event.Type.INPUT - - # dataset_commit = commit_output(url, self.execution.id) - - dataset_commit = version - url = url + ":" + c_hash - if c_hash and c_hash.strip: - existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) - - # To Do - What happens when uri is the same but names are different - if existing_artifact and len(existing_artifact) != 0: - existing_artifact = existing_artifact[0] - - # Quick fix- Updating only the name - if custom_properties is not None: - self.update_existing_artifact( - existing_artifact, custom_properties) - uri = c_hash - # update url for existing artifact - self.update_dataset_url(existing_artifact, props.get("url", "")) - artifact = link_execution_to_artifact( - store=self.store, - execution_id=self.execution.id, - uri=uri, - input_name=url, - event_type=event_type, - ) - else: - # if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""): - # url = url + ":" + str(self.execution.id) - uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) - artifact = create_new_artifact_event_and_attribution( - store=self.store, - execution_id=self.execution.id, - context_id=self.child_context.id, - uri=uri, - name=url, - type_name="Dataset", - event_type=event_type, - properties={ - "git_repo": str(git_repo), - "Commit": str(dataset_commit), - "url": props.get("url", " "), - }, - artifact_type_properties={ - "git_repo": mlpb.STRING, - "Commit": mlpb.STRING, - "url": mlpb.STRING, - }, - custom_properties=custom_props, - milliseconds_since_epoch=int(time.time() * 1000), - ) - custom_props["git_repo"] = git_repo - custom_props["Commit"] = dataset_commit - self.execution_label_props["git_repo"] = git_repo - self.execution_label_props["Commit"] = dataset_commit - - if self.graph: - self.driver.create_dataset_node( - name, - url, - uri, - event, - self.execution.id, - self.parent_context, - custom_props, - ) - if event.lower() == "input": - self.input_artifacts.append( - { - "Name": name, - "Path": 
url, - "URI": uri, - "Event": event.lower(), - "Execution_Name": self.execution_name, - "Type": "Dataset", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - "Pipeline_Name": self.parent_context.name, - } - ) - self.driver.create_execution_links(uri, name, "Dataset") - else: - child_artifact = { - "Name": name, - "Path": url, - "URI": uri, - "Event": event.lower(), - "Execution_Name": self.execution_name, - "Type": "Dataset", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - "Pipeline_Name": self.parent_context.name, - } - self.driver.create_artifact_relationships( - self.input_artifacts, child_artifact, self.execution_label_props - ) - return artifact # Add the model to dvc do a git commit and store the commit id in MLMD def log_model( @@ -1138,7 +967,7 @@ def log_model( custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) - # custom_properties["Commit"] = model_commit + custom_properties["Commit"] = model_commit self.execution_label_props["Commit"] = model_commit #To DO model nodes should be similar to dataset nodes when we create neo4j if self.graph: @@ -1182,226 +1011,6 @@ def log_model( os.chdir(logging_dir) return artifact - # Add the model to dvc do a git commit and store the commit id in MLMD - def log_model_with_version( - self, - path: str, - event: str, - props=None, - custom_properties: t.Optional[t.Dict] = None, - ) -> object: - """Logs a model when the version(hash) is known - The model is added to dvc and the metadata file (.dvc) gets committed to git. - Example: - ```python - artifact: mlmd.proto.Artifact= cmf.log_model_with_version( - path="path/to/model.pkl", - event="output", - props={ - "url": "/home/user/local-storage/bf/629ccd5cd008066b72c04f9a918737", - "model_type": "RandomForestClassifier", - "model_name": "RandomForestClassifier:default", - "Commit": "commit 1146dad8b74cae205db6a3132ea403db1e4032e5", - "model_framework": "SKlearn", - }, - custom_properties={ - "uri": "bf629ccd5cd008066b72c04f9a918737", - }, - - ) - ``` - Args: - path: Path to the model file. - event: Takes arguments `INPUT` OR `OUTPUT`. - props: Model artifact properties. - custom_properties: The model properties. - Returns: - Artifact object from ML Metadata library associated with the new model artifact. 
- """ - - if custom_properties is None: - custom_properties = {} - custom_props = {} if custom_properties is None else custom_properties - name = re.split("/", path)[-1] - event_type = mlpb.Event.Type.OUTPUT - existing_artifact = [] - if event.lower() == "input": - event_type = mlpb.Event.Type.INPUT - - # props["commit"] = "" # To do get from incoming data - c_hash = props.get("uri", " ") - # If connecting to an existing artifact - The name of the artifact is used as path/steps/key - model_uri = path + ":" + c_hash - # dvc_url = dvc_get_url(path, False) - url = props.get("url", "") - # uri = "" - if c_hash and c_hash.strip(): - uri = c_hash.strip() - existing_artifact.extend(self.store.get_artifacts_by_uri(uri)) - else: - raise RuntimeError("Model commit failed, Model uri empty") - - if ( - existing_artifact - and len(existing_artifact) != 0 - ): - # update url for existing artifact - existing_artifact = self.update_model_url(existing_artifact, url) - artifact = link_execution_to_artifact( - store=self.store, - execution_id=self.execution.id, - uri=c_hash, - input_name=model_uri, - event_type=event_type, - ) - model_uri = artifact.name - else: - uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) - model_uri = model_uri + ":" + str(self.execution.id) - artifact = create_new_artifact_event_and_attribution( - store=self.store, - execution_id=self.execution.id, - context_id=self.child_context.id, - uri=uri, - name=model_uri, - type_name="Model", - event_type=event_type, - properties={ - "model_framework": props.get("model_framework", ""), - "model_type": props.get("model_type", ""), - "model_name": props.get("model_name", ""), - "Commit": props.get("Commit", ""), - "url": str(url), - }, - artifact_type_properties={ - "model_framework": mlpb.STRING, - "model_type": mlpb.STRING, - "model_name": mlpb.STRING, - "Commit": mlpb.STRING, - "url": mlpb.STRING, - }, - custom_properties=custom_props, - milliseconds_since_epoch=int(time.time() * 1000), - ) - # custom_properties["Commit"] = model_commit - # custom_props["url"] = url - self.execution_label_props["Commit"] = props.get("Commit", "") - if self.graph: - self.driver.create_model_node( - model_uri, - uri, - event, - self.execution.id, - self.parent_context, - custom_props, - ) - if event.lower() == "input": - self.input_artifacts.append( - { - "Name": model_uri, - "URI": uri, - "Event": event.lower(), - "Execution_Name": self.execution_name, - "Type": "Model", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - "Pipeline_Name": self.parent_context.name, - } - ) - self.driver.create_execution_links(uri, model_uri, "Model") - else: - child_artifact = { - "Name": model_uri, - "URI": uri, - "Event": event.lower(), - "Execution_Name": self.execution_name, - "Type": "Model", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - "Pipeline_Name": self.parent_context.name, - } - self.driver.create_artifact_relationships( - self.input_artifacts, child_artifact, self.execution_label_props - ) - - return artifact - - def log_execution_metrics_from_client(self, metrics_name: str, - custom_properties: t.Optional[t.Dict] = None) -> mlpb.Artifact: - """ Logs execution metrics from a client. - Data from pre-existing metrics from client side is used to create identical metrics on server side. 
- Example: - ```python - artifact: mlpb.Artifact = cmf.log_execution_metrics_from_client( - metrics_name="example_metrics:uri:123", - custom_properties={"custom_key": "custom_value"}, - ) - ``` - Args: - metrics_name: Name of the metrics in the format "name:uri:execution_id". - custom_properties: Optional custom properties for the metrics. - Returns: - Artifact object from the ML Protocol Buffers library associated with the metrics artifact. - """ - - metrics = None - custom_props = {} if custom_properties is None else custom_properties - existing_artifact = [] - name_tokens = metrics_name.split(":") - if name_tokens and len(name_tokens) > 2: - name = name_tokens[0] - uri = name_tokens[1] - execution_id = name_tokens[2] - else: - print(f"Error : metrics name {metrics_name} is not in the correct format") - return - - #we need to add the execution id to the metrics name - new_metrics_name = f"{name}:{uri}:{str(self.execution.id)}" - existing_artifacts = self.store.get_artifacts_by_uri(uri) - - existing_artifact = existing_artifacts[0] if existing_artifacts else None - if not existing_artifact or \ - ((existing_artifact) and not - (existing_artifact.name == new_metrics_name)): #we need to add the artifact otherwise its already there - metrics = create_new_artifact_event_and_attribution( - store=self.store, - execution_id=self.execution.id, - context_id=self.child_context.id, - uri=uri, - name=new_metrics_name, - type_name="Metrics", - event_type=mlpb.Event.Type.OUTPUT, - properties={"metrics_name": metrics_name}, - artifact_type_properties={"metrics_name": mlpb.STRING}, - custom_properties=custom_props, - milliseconds_since_epoch=int(time.time() * 1000), - ) - if self.graph: - # To do create execution_links - self.driver.create_metrics_node( - metrics_name, - uri, - "output", - self.execution.id, - self.parent_context, - custom_props, - ) - child_artifact = { - "Name": metrics_name, - "URI": uri, - "Event": "output", - "Execution_Name": self.execution_name, - "Type": "Metrics", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - "Pipeline_Name": self.parent_context.name, - } - self.driver.create_artifact_relationships( - self.input_artifacts, child_artifact, self.execution_label_props - ) - return metrics - def log_execution_metrics( self, metrics_name: str, custom_properties: t.Optional[t.Dict] = None @@ -1587,6 +1196,10 @@ def commit_metrics(self, metrics_name: str): custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) + + custom_props["Commit"] = metrics_commit + self.execution_label_props["Commit"] = metrics_commit + if self.graph: self.driver.create_metrics_node( name, @@ -1601,7 +1214,7 @@ def commit_metrics(self, metrics_name: str): "URI": uri, "Event": "output", "Execution_Name": self.execution_name, - "Type": "Metrics", + "Type": "Step_Metrics", "Execution_Command": self.execution_command, "Pipeline_Id": self.parent_context.id, } @@ -1612,79 +1225,6 @@ def commit_metrics(self, metrics_name: str): os.chdir(logging_dir) return metrics - def commit_existing_metrics(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None): - """ - Commits existing metrics associated with the given URI to MLMD. - Example: - ```python - artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123", - {"custom_key": "custom_value"}) - ``` - Args: - metrics_name: Name of the metrics. - uri: Unique identifier associated with the metrics. 
- custom_properties: Optional custom properties for the metrics. - Returns: - Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact. - """ - - custom_props = {} if custom_properties is None else custom_properties - c_hash = uri.strip() - existing_artifact = [] - existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) - if (existing_artifact - and len(existing_artifact) != 0 ): - metrics = link_execution_to_artifact( - store=self.store, - execution_id=self.execution.id, - uri=c_hash, - input_name=metrics_name, - event_type=mlpb.Event.Type.OUTPUT, - ) - else: - metrics = create_new_artifact_event_and_attribution( - store=self.store, - execution_id=self.execution.id, - context_id=self.child_context.id, - uri=uri, - name=metrics_name, - type_name="Step_Metrics", - event_type=mlpb.Event.Type.OUTPUT, - properties={ - # passing uri value to commit - "Commit": props.get("Commit", ""), - "url": props.get("url", ""), - }, - artifact_type_properties={ - "Commit": mlpb.STRING, - "url": mlpb.STRING, - }, - custom_properties=custom_props, - milliseconds_since_epoch=int(time.time() * 1000), - ) - if self.graph: - self.driver.create_metrics_node( - metrics_name, - uri, - "output", - self.execution.id, - self.parent_context, - custom_props, - ) - child_artifact = { - "Name": metrics_name, - "URI": uri, - "Event": "output", - "Execution_Name": self.execution_name, - "Type": "Metrics", - "Execution_Command": self.execution_command, - "Pipeline_Id": self.parent_context.id, - } - self.driver.create_artifact_relationships( - self.input_artifacts, child_artifact, self.execution_label_props - ) - return metrics - def log_validation_output( self, version: str, custom_properties: t.Optional[t.Dict] = None @@ -1902,12 +1442,6 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: input_name=dataslice_path + ":" + c_hash, ) else: - props={ - "git_repo": str(git_repo), - # passing c_hash value to commit - "Commit": str(dataslice_commit), - "url": str(dvc_url_with_pipeline), - }, slice = create_new_artifact_event_and_attribution( store=self.writer.store, execution_id=self.writer.execution.id, @@ -1930,57 +1464,16 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) - if self.writer.graph: - self.writer.driver.create_dataslice_node( - self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, props - ) - os.chdir(logging_dir) - return slice - # commit existing dataslice to server - def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None: - custom_props = {} if custom_properties is None else custom_properties - c_hash = uri.strip() - dataslice_commit = c_hash - existing_artifact = [] - if c_hash and c_hash.strip(): - existing_artifact.extend( - self.writer.store.get_artifacts_by_uri(c_hash)) - if existing_artifact and len(existing_artifact) != 0: - print("Adding to existing data slice") - # Haven't added event type in this if cond, is it not needed?? 
- slice = link_execution_to_input_artifact( - store=self.writer.store, - execution_id=self.writer.execution.id, - uri=c_hash, - input_name=self.name, - ) - else: - slice = create_new_artifact_event_and_attribution( - store=self.writer.store, - execution_id=self.writer.execution.id, - context_id=self.writer.child_context.id, - uri=c_hash, - name=self.name, - type_name="Dataslice", - event_type=mlpb.Event.Type.OUTPUT, - properties={ - "git_repo": props.get("git_repo", ""), - "Commit": props.get("Commit", ""), - "url": props.get("url", " "), - }, - artifact_type_properties={ - "git_repo": mlpb.STRING, - "Commit": mlpb.STRING, - "url": mlpb.STRING, - }, - custom_properties=custom_properties, - milliseconds_since_epoch=int(time.time() * 1000), - ) + custom_props["git_repo"] = git_repo + custom_props["Commit"] = dataslice_commit + self.writer.execution_label_props["git_repo"] = git_repo + self.writer.execution_label_props["Commit"] = dataslice_commit if self.writer.graph: self.writer.driver.create_dataslice_node( - self.name, self.name, c_hash, self.data_parent, custom_properties + self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, custom_props ) + os.chdir(logging_dir) return slice @@ -1996,6 +1489,18 @@ def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_pro # print(last) # os.symlink(str(index), slicedir + "/ " + last) +# Binding cmf_server_methods to Cmf class +Cmf.merge_created_context = merge_created_context +Cmf.merge_created_execution = merge_created_execution +Cmf.log_python_env_from_client = log_python_env_from_client +Cmf.log_dataset_with_version = log_dataset_with_version +Cmf.log_model_with_version = log_model_with_version +Cmf.log_execution_metrics_from_client = log_execution_metrics_from_client +#Cmf.commit_existing_metrics = commit_existing_metrics +Cmf.log_step_metrics_from_client = log_step_metrics_from_client +#Cmf.DataSlice.commit_existing = commit_existing +Cmf.DataSlice.log_dataslice_from_client = log_dataslice_from_client + def metadata_push(pipeline_name: str, filepath = "./mlmd", tensorboard_path: str = "", execution_id: str = ""): """ Pushes MLMD file to CMF-server. 
Example: diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py index 82c6082d..449946fd 100644 --- a/cmflib/cmf_merger.py +++ b/cmflib/cmf_merger.py @@ -135,9 +135,12 @@ def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_id: Union[s cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props) elif artifact_type == "Dataslice": dataslice = cmf_class.create_dataslice(event["artifact"]["name"]) - dataslice.commit_existing(uri, custom_props) + dataslice.log_dataslice_from_client(uri, props, custom_props) elif artifact_type == "Step_Metrics": - cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props) + cmf_class.log_step_metrics_from_client(event["artifact"]["name"], uri, props, + custom_props) + elif artifact_type == "Environment": + cmf_class.log_python_env_from_client(artifact_name, uri, props) else: pass except AlreadyExistsError as e: @@ -194,7 +197,4 @@ def create_original_time_since_epoch(mlmd_data): artifact.append(k["artifact"]["create_time_since_epoch"]) # print(k['artifact']['custom_properties']['original_create_time_since_epoch']) - return mlmd_data - - - + return mlmd_data \ No newline at end of file diff --git a/cmflib/cmf_server_methods.py b/cmflib/cmf_server_methods.py new file mode 100644 index 00000000..5e955508 --- /dev/null +++ b/cmflib/cmf_server_methods.py @@ -0,0 +1,766 @@ +"""This module contains all the public API for CMF""" +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import time +import uuid +import re +import typing as t + +# This import is needed for jupyterlab environment +from ml_metadata.proto import metadata_store_pb2 as mlpb +from cmflib.metadata_helper import ( + get_or_create_run_context, + associate_child_to_parent_context, + create_new_execution_in_existing_run_context, + link_execution_to_artifact, + create_new_artifact_event_and_attribution, + link_execution_to_input_artifact, +) + +def merge_created_context( + self, pipeline_stage: str, custom_properties: t.Optional[t.Dict] = None +) -> mlpb.Context: + """Merge created context. + Every call creates a unique pipeline stage. + Created for metadata push purpose. + Example: + + ```python + #Create context + # Import CMF + from cmflib.cmf import Cmf + from ml_metadata.proto import metadata_store_pb2 as mlpb + # Create CMF logger + cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline") + # Create context + context: mlmd.proto.Context = cmf.merge_created_context( + pipeline_stage="Test-env/prepare", + custom_properties ={"user-metadata1": "metadata_value"} + ``` + Args: + Pipeline_stage: Pipeline_Name/Stage_name. + custom_properties: Developers can provide key value pairs with additional properties of the execution that + need to be stored. + Returns: + Context object from ML Metadata library associated with the new context for this stage. 
+ """ + + custom_props = {} if custom_properties is None else custom_properties + ctx = get_or_create_run_context( + self.store, pipeline_stage, custom_props) + self.child_context = ctx + associate_child_to_parent_context( + store=self.store, parent_context=self.parent_context, child_context=ctx + ) + if self.graph: + self.driver.create_stage_node( + pipeline_stage, self.parent_context, ctx.id, custom_props + ) + return ctx + + +def merge_created_execution( + self, + execution_type: str, + execution_cmd: str, + properties: t.Optional[t.Dict] = None, + custom_properties: t.Optional[t.Dict] = None, + orig_execution_name:str = "", + create_new_execution:bool = True +) -> mlpb.Execution: + """Merge Created execution. + Every call creates a unique execution. Execution can only be created within a context, so + [create_context][cmflib.cmf.Cmf.create_context] must be called first. + Every call occurs when metadata push or pull is processed. Data from pre-existing executions is used + to create new executions with additional data(Required on cmf-server). + Example: + ```python + # Import CMF + from cmflib.cmf import Cmf + from ml_metadata.proto import metadata_store_pb2 as mlpb + # Create CMF logger + cmf = Cmf(filepath="mlmd", pipeline_name="test_pipeline") + # Create or reuse context for this stage + context: mlmd.proto.Context = cmf.merge_created_context( + pipeline_stage="prepare", + custom_properties ={"user-metadata1": "metadata_value"} + ) + # Create a new execution for this stage run + execution: mlmd.proto.Execution = cmf.merge_created_execution( + execution_type="Prepare", + properties={"Context_Type":""}, + custom_properties = {"split": split, "seed": seed}, + orig_execution_name=execution_name + ) + ``` + Args: + execution_type: Type of the execution.(when create_new_execution is False, this is the name of execution) + properties: Properties of Execution. + custom_properties: Developers can provide key value pairs with additional properties of the execution that + need to be stored. + + cmd: command used to run this execution. + + create_new_execution:bool = True, This can be used by advanced users to re-use executions + This is applicable, when working with framework code like mmdet, pytorch lightning etc, where the + custom call-backs are used to log metrics. + if create_new_execution is True(Default), execution_type parameter will be used as the name of the execution type. + if create_new_execution is False, if existing execution exist with the same name as execution_type. + it will be reused. + Only executions created with create_new_execution as False will have "name" as a property. + + + Returns: + Execution object from ML Metadata library associated with the execution for this stage. + """ + # Initializing the execution related fields + properties = {} if properties is None else properties + self.metrics = {} + self.input_artifacts = [] + self.execution_label_props = {} + custom_props = {} if custom_properties is None else custom_properties + # print(custom_props) + git_repo = properties.get("Git_Repo", "") + git_start_commit = properties.get("Git_Start_Commit", "") + #name = properties.get("Name", "") + create_new_execution = True + execution_name = execution_type + #exe.name property is passed as the orig_execution_name. 
+ #if name is not an empty string then we are re-using executions + if orig_execution_name != "": + create_new_execution = False + execution_name = orig_execution_name + + self.execution = create_new_execution_in_existing_run_context( + store=self.store, + execution_type_name=execution_type, # Type field when re-using executions + execution_name=execution_name, #Name field if we are re-using executionsname + #Type field , if creating new executions always + context_id=self.child_context.id, + execution=execution_cmd, + pipeline_id=self.parent_context.id, + pipeline_type=self.parent_context.name, + git_repo=git_repo, + git_start_commit=git_start_commit, + custom_properties=custom_props, + create_new_execution=create_new_execution + ) + + uuids = "" + + uuids = self.execution.properties["Execution_uuid"].string_value + if uuids: + self.execution.properties["Execution_uuid"].string_value = uuids +\ + ","+properties["Execution_uuid"] + else: + self.execution.properties["Execution_uuid"].string_value =\ + properties["Execution_uuid"] + + + self.store.put_executions([self.execution]) + self.execution_name = str(self.execution.id) + "," + execution_type + self.execution_command = execution_cmd + for k, v in custom_props.items(): + k = re.sub("-", "_", k) + self.execution_label_props[k] = v + self.execution_label_props["Execution_Name"] = ( + execution_type + ":" + str(self.execution.id) + ) + self.execution_label_props["execution_command"] = execution_cmd + if self.graph: + self.driver.create_execution_node( + self.execution_name, + self.child_context.id, + self.parent_context, + execution_cmd, + self.execution.id, + custom_props, + ) + + # link the artifact to execution if it exists and creates artifact if it doesn't + return self.execution + + +def log_python_env_from_client( + self, + url: str, + uri: str, + props: t.Optional[t.Dict] = None, + ) -> mlpb.Artifact: + "Used to log the python packages involved in the current execution" + + git_repo = props.get("git_repo", "") + name = url + existing_artifact = [] + c_hash = uri + commit = props.get("Commit", "") + url = url + ":" + c_hash + if c_hash and c_hash.strip: + existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) + + if existing_artifact and len(existing_artifact) != 0: + existing_artifact = existing_artifact[0] + artifact = link_execution_to_artifact( + store=self.store, + execution_id=self.execution.id, + uri=uri, + input_name=url, + event_type=mlpb.Event.Type.INPUT, + ) + else: + uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) + artifact = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=url, + type_name="Environment", + event_type=mlpb.Event.Type.INPUT, + properties={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(commit), + "url": props.get("url", ""), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + milliseconds_since_epoch=int(time.time() * 1000), + ) + custom_props = {} + custom_props["git_repo"] = git_repo + custom_props["Commit"] = commit + self.execution_label_props["git_repo"] = git_repo + self.execution_label_props["Commit"] = commit + + if self.graph: + self.driver.create_env_node( + name, + url, + uri, + "input", + self.execution.id, + self.parent_context, + custom_props, + ) + self.input_artifacts.append( + { + "Name": name, + "Path": url, + "URI": uri, + "Event": "input", + "Execution_Name": 
self.execution_name, + "Type": "Environment", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + ) + self.driver.create_execution_links(uri, name, "Environment") + return artifact + + +def log_dataset_with_version( + self, + url: str, + version: str, + event: str, + props: t.Optional[t.Dict] = None, + custom_properties: t.Optional[t.Dict] = None, +) -> mlpb.Artifact: + """Logs a dataset when the version (hash) is known. + Example: + ```python + artifact: mlpb.Artifact = cmf.log_dataset_with_version( + url="path/to/dataset", + version="abcdef", + event="output", + props={ "git_repo": "https://github.com/example/repo", + "url": "/path/in/repo", }, + custom_properties={ "custom_key": "custom_value", }, + ) + ``` + Args: + url: Path to the dataset. + version: Hash or version identifier for the dataset. + event: Takes arguments `INPUT` or `OUTPUT`. + props: Optional properties for the dataset (e.g., git_repo, url). + custom_properties: Optional custom properties for the dataset. + Returns: + Artifact object from the ML Protocol Buffers library associated with the new dataset artifact. + """ + + props = {} if props is None else props + custom_props = {} if custom_properties is None else custom_properties + git_repo = props.get("git_repo", "") + name = url + event_type = mlpb.Event.Type.OUTPUT + existing_artifact = [] + c_hash = version + if event.lower() == "input": + event_type = mlpb.Event.Type.INPUT + + # dataset_commit = commit_output(url, self.execution.id) + + dataset_commit = version + url = url + ":" + c_hash + if c_hash and c_hash.strip: + existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) + + # To Do - What happens when uri is the same but names are different + if existing_artifact and len(existing_artifact) != 0: + existing_artifact = existing_artifact[0] + + # Quick fix- Updating only the name + if custom_properties is not None: + self.update_existing_artifact( + existing_artifact, custom_properties) + uri = c_hash + # update url for existing artifact + self.update_dataset_url(existing_artifact, props.get("url", "")) + artifact = link_execution_to_artifact( + store=self.store, + execution_id=self.execution.id, + uri=uri, + input_name=url, + event_type=event_type, + ) + else: + # if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""): + # url = url + ":" + str(self.execution.id) + uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) + artifact = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=url, + type_name="Dataset", + event_type=event_type, + properties={ + "git_repo": str(git_repo), + "Commit": str(dataset_commit), + "url": props.get("url", " "), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, + milliseconds_since_epoch=int(time.time() * 1000), + ) + custom_props["git_repo"] = git_repo + custom_props["Commit"] = dataset_commit + self.execution_label_props["git_repo"] = git_repo + self.execution_label_props["Commit"] = dataset_commit + + if self.graph: + self.driver.create_dataset_node( + name, + url, + uri, + event, + self.execution.id, + self.parent_context, + custom_props, + ) + if event.lower() == "input": + self.input_artifacts.append( + { + "Name": name, + "Path": url, + "URI": uri, + "Event": event.lower(), + "Execution_Name": 
self.execution_name, + "Type": "Dataset", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + ) + self.driver.create_execution_links(uri, name, "Dataset") + else: + child_artifact = { + "Name": name, + "Path": url, + "URI": uri, + "Event": event.lower(), + "Execution_Name": self.execution_name, + "Type": "Dataset", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + self.driver.create_artifact_relationships( + self.input_artifacts, child_artifact, self.execution_label_props + ) + return artifact + + + +# Add the model to dvc do a git commit and store the commit id in MLMD +def log_model_with_version( + self, + path: str, + event: str, + props=None, + custom_properties: t.Optional[t.Dict] = None, +) -> object: + """Logs a model when the version(hash) is known + The model is added to dvc and the metadata file (.dvc) gets committed to git. + Example: + ```python + artifact: mlmd.proto.Artifact= cmf.log_model_with_version( + path="path/to/model.pkl", + event="output", + props={ + "url": "/home/user/local-storage/bf/629ccd5cd008066b72c04f9a918737", + "model_type": "RandomForestClassifier", + "model_name": "RandomForestClassifier:default", + "Commit": "commit 1146dad8b74cae205db6a3132ea403db1e4032e5", + "model_framework": "SKlearn", + }, + custom_properties={ + "uri": "bf629ccd5cd008066b72c04f9a918737", + }, + + ) + ``` + Args: + path: Path to the model file. + event: Takes arguments `INPUT` OR `OUTPUT`. + props: Model artifact properties. + custom_properties: The model properties. + Returns: + Artifact object from ML Metadata library associated with the new model artifact. + """ + + if custom_properties is None: + custom_properties = {} + custom_props = {} if custom_properties is None else custom_properties + name = re.split("/", path)[-1] + event_type = mlpb.Event.Type.OUTPUT + existing_artifact = [] + if event.lower() == "input": + event_type = mlpb.Event.Type.INPUT + + # props["commit"] = "" # To do get from incoming data + c_hash = props.get("uri", " ") + # If connecting to an existing artifact - The name of the artifact is used as path/steps/key + model_uri = path + ":" + c_hash + # dvc_url = dvc_get_url(path, False) + url = props.get("url", "") + # uri = "" + if c_hash and c_hash.strip(): + uri = c_hash.strip() + existing_artifact.extend(self.store.get_artifacts_by_uri(uri)) + else: + raise RuntimeError("Model commit failed, Model uri empty") + + if ( + existing_artifact + and len(existing_artifact) != 0 + ): + # update url for existing artifact + existing_artifact = self.update_model_url(existing_artifact, url) + artifact = link_execution_to_artifact( + store=self.store, + execution_id=self.execution.id, + uri=c_hash, + input_name=model_uri, + event_type=event_type, + ) + model_uri = artifact.name + else: + uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1()) + model_uri = model_uri + ":" + str(self.execution.id) + artifact = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=model_uri, + type_name="Model", + event_type=event_type, + properties={ + "model_framework": props.get("model_framework", ""), + "model_type": props.get("model_type", ""), + "model_name": props.get("model_name", ""), + "Commit": props.get("Commit", ""), + "url": str(url), + }, + artifact_type_properties={ + "model_framework": 
mlpb.STRING, + "model_type": mlpb.STRING, + "model_name": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, + milliseconds_since_epoch=int(time.time() * 1000), + ) + custom_properties["Commit"] = props.get("Commit", "") + custom_props["url"] = url + self.execution_label_props["Commit"] = props.get("Commit", "") + if self.graph: + self.driver.create_model_node( + model_uri, + uri, + event, + self.execution.id, + self.parent_context, + custom_props, + ) + if event.lower() == "input": + self.input_artifacts.append( + { + "Name": model_uri, + "URI": uri, + "Event": event.lower(), + "Execution_Name": self.execution_name, + "Type": "Model", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + ) + self.driver.create_execution_links(uri, model_uri, "Model") + else: + child_artifact = { + "Name": model_uri, + "URI": uri, + "Event": event.lower(), + "Execution_Name": self.execution_name, + "Type": "Model", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + self.driver.create_artifact_relationships( + self.input_artifacts, child_artifact, self.execution_label_props + ) + + return artifact + +def log_execution_metrics_from_client(self, metrics_name: str, + custom_properties: t.Optional[t.Dict] = None) -> mlpb.Artifact: + """ Logs execution metrics from a client. + Data from pre-existing metrics from client side is used to create identical metrics on server side. + Example: + ```python + artifact: mlpb.Artifact = cmf.log_execution_metrics_from_client( + metrics_name="example_metrics:uri:123", + custom_properties={"custom_key": "custom_value"}, + ) + ``` + Args: + metrics_name: Name of the metrics in the format "name:uri:execution_id". + custom_properties: Optional custom properties for the metrics. + Returns: + Artifact object from the ML Protocol Buffers library associated with the metrics artifact. 
+ """ + + metrics = None + custom_props = {} if custom_properties is None else custom_properties + existing_artifact = [] + name_tokens = metrics_name.split(":") + if name_tokens and len(name_tokens) > 2: + name = name_tokens[0] + uri = name_tokens[1] + execution_id = name_tokens[2] + else: + print(f"Error : metrics name {metrics_name} is not in the correct format") + return + + #we need to add the execution id to the metrics name + new_metrics_name = f"{name}:{uri}:{str(self.execution.id)}" + existing_artifacts = self.store.get_artifacts_by_uri(uri) + + existing_artifact = existing_artifacts[0] if existing_artifacts else None + if not existing_artifact or \ + ((existing_artifact) and not + (existing_artifact.name == new_metrics_name)): #we need to add the artifact otherwise its already there + metrics = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=new_metrics_name, + type_name="Metrics", + event_type=mlpb.Event.Type.OUTPUT, + properties={"metrics_name": metrics_name}, + artifact_type_properties={"metrics_name": mlpb.STRING}, + custom_properties=custom_props, + milliseconds_since_epoch=int(time.time() * 1000), + ) + + if self.graph: + # To do create execution_links + self.driver.create_metrics_node( + metrics_name, + uri, + "output", + self.execution.id, + self.parent_context, + custom_props, + ) + child_artifact = { + "Name": metrics_name, + "URI": uri, + "Event": "output", + "Execution_Name": self.execution_name, + "Type": "Metrics", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + "Pipeline_Name": self.parent_context.name, + } + self.driver.create_artifact_relationships( + self.input_artifacts, child_artifact, self.execution_label_props + ) + return metrics + + + +def log_step_metrics_from_client(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None): + """ + Commits existing metrics associated with the given URI to MLMD. + Example: + ```python + artifact: mlpb.Artifact = cmf.log_metrics_from_client("existing_metrics", "abc123", + {"custom_key": "custom_value"}) + ``` + Args: + metrics_name: Name of the metrics. + uri: Unique identifier associated with the metrics. + custom_properties: Optional custom properties for the metrics. + Returns: + Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact. 
+ """ + + custom_props = {} if custom_properties is None else custom_properties + c_hash = uri.strip() + existing_artifact = [] + existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash)) + if (existing_artifact + and len(existing_artifact) != 0 ): + metrics = link_execution_to_artifact( + store=self.store, + execution_id=self.execution.id, + uri=c_hash, + input_name=metrics_name, + event_type=mlpb.Event.Type.OUTPUT, + ) + else: + metrics = create_new_artifact_event_and_attribution( + store=self.store, + execution_id=self.execution.id, + context_id=self.child_context.id, + uri=uri, + name=metrics_name, + type_name="Step_Metrics", + event_type=mlpb.Event.Type.OUTPUT, + properties={ + # passing uri value to commit + "Commit": props.get("Commit", ""), + "url": props.get("url", ""), + }, + artifact_type_properties={ + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, + milliseconds_since_epoch=int(time.time() * 1000), + ) + + metrics_commit = props.get("Commit", "") + custom_props["Commit"] = metrics_commit + self.execution_label_props["Commit"] = metrics_commit + + if self.graph: + self.driver.create_step_metrics_node( + metrics_name, + uri, + "output", + self.execution.id, + self.parent_context, + custom_props, + ) + child_artifact = { + "Name": metrics_name, + "URI": uri, + "Event": "output", + "Execution_Name": self.execution_name, + "Type": "Step_Metrics", + "Execution_Command": self.execution_command, + "Pipeline_Id": self.parent_context.id, + } + self.driver.create_artifact_relationships( + self.input_artifacts, child_artifact, self.execution_label_props + ) + return metrics + +# commit existing dataslice to server +def log_dataslice_from_client(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None: + custom_props = {} if custom_properties is None else custom_properties + c_hash = uri.strip() + dataslice_commit = c_hash + existing_artifact = [] + if c_hash and c_hash.strip(): + existing_artifact.extend( + self.writer.store.get_artifacts_by_uri(c_hash)) + if existing_artifact and len(existing_artifact) != 0: + print("Adding to existing data slice") + # Haven't added event type in this if cond, is it not needed?? 
+ slice = link_execution_to_input_artifact( + store=self.writer.store, + execution_id=self.writer.execution.id, + uri=c_hash, + input_name=self.name, + ) + else: + slice = create_new_artifact_event_and_attribution( + store=self.writer.store, + execution_id=self.writer.execution.id, + context_id=self.writer.child_context.id, + uri=c_hash, + name=self.name, + type_name="Dataslice", + event_type=mlpb.Event.Type.OUTPUT, + properties={ + "git_repo": props.get("git_repo", ""), + "Commit": props.get("Commit", ""), + "url": props.get("url", " "), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, + milliseconds_since_epoch=int(time.time() * 1000), + ) + custom_props["git_repo"] = props.get("git_repo", "") + custom_props["Commit"] = props.get("Commit", "") + if self.writer.graph: + self.writer.driver.create_dataslice_node( + self.name, self.name, c_hash, self.data_parent, custom_props + ) + return slice \ No newline at end of file diff --git a/cmflib/commands/metadata/push.py b/cmflib/commands/metadata/push.py index bd630397..71672821 100644 --- a/cmflib/commands/metadata/push.py +++ b/cmflib/commands/metadata/push.py @@ -26,18 +26,31 @@ # This class pushes mlmd file to cmf-server class CmdMetadataPush(CmdBase): + + # Create a function to search for files into multiple directories + def search_files(self, file_list, *directories): + found_files = {} + for directory in directories: + abs_dir = os.path.abspath(directory) + for file_name in file_list: + if isinstance(file_name, str): + file_path = os.path.join(abs_dir, file_name) + if os.path.isfile(file_path): + found_files[file_name] = file_path + return found_files + def run(self): - current_directory = os.getcwd() + current_directory = mlmd_directory = os.getcwd() mlmd_file_name = "./mlmd" # checks if mlmd filepath is given if self.args.file_name: mlmd_file_name = self.args.file_name - current_directory = os.path.dirname(self.args.file_name) + mlmd_directory = os.path.dirname(self.args.file_name) # checks if mlmd file is present in current directory or given directory if not os.path.exists(mlmd_file_name): - return f"ERROR: {mlmd_file_name} doesn't exists in the {current_directory}." + return f"ERROR: {mlmd_file_name} doesn't exists in the {mlmd_directory}." query = cmfquery.CmfQuery(mlmd_file_name) # print(json.dumps(json.loads(json_payload), indent=4, sort_keys=True)) @@ -63,74 +76,89 @@ def run(self): # Checks if pipeline name exists if self.args.pipeline_name in query.get_pipeline_names(): - # converts mlmd file to json format - json_payload = query.dumptojson(self.args.pipeline_name, None) - # checks if execution_id is given by user + execution = None + exec_id = None if self.args.execution: - exec_id = self.args.execution - mlmd_data = json.loads(json_payload)["Pipeline"] - # checks if given execution_id present in mlmd - for i in mlmd_data[0]["stages"]: - for j in i["executions"]: - if j["id"] == int(exec_id): - execution_flag = 1 - # calling mlmd_push api to push mlmd file to cmf-server - response = server_interface.call_mlmd_push( - json_payload, url, exec_id, self.args.pipeline_name - ) - break - if execution_flag == 0: + execution = cmfquery.get_all_executions_by_ids_list([self.args.execution]) + if execution.empty: return "Given execution is not found in mlmd." 
-            else:
-                exec_id = None
-                response = server_interface.call_mlmd_push(json_payload, url, exec_id, self.args.pipeline_name)
+                exec_id = self.args.execution
+            # converts mlmd file to json format
+            json_payload = query.dumptojson(self.args.pipeline_name, None)
+            response = server_interface.call_mlmd_push(json_payload, url, exec_id, self.args.pipeline_name)
             status_code = response.status_code
-            if status_code == 200 and response.json()['status']=="success":
-                print("mlmd is successfully pushed.")
-            elif status_code==200 and response.json()["status"]=="exists":
-                print("Executions already exists.")
-            elif status_code==422 and response.json()["status"]=="version_update":
+
+            # We need to push the python env files only after the mlmd push has succeeded;
+            # otherwise there is no use for those python env files on the cmf-server.
+
+            if status_code==422 and response.json()["status"]=="version_update":
                 return "ERROR: You need to update cmf to the latest version. Unable to push metadata file."
             elif status_code == 404:
                 return "ERROR: cmf-server is not available."
             elif status_code == 500:
                 return "ERROR: Internal server error."
-            else:
-                return "ERROR: Status Code = {status_code}. Unable to push mlmd."
-
-            if self.args.tensorboard:
-                # /tensorboard api call is done only if mlmd push is successfully completed
-                # tensorboard parameter is passed
-                print("......................................")
-                print("tensorboard logs upload started!!")
-                print("......................................")
-
-                # check if the path provided is for a file
-                if os.path.isfile(self.args.tensorboard):
-                    file_name = os.path.basename(self.args.tensorboard)
-                    tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, file_name, self.args.tensorboard)
-                    tstatus_code = tresponse.status_code
-                    if tstatus_code == 200:
-                        return "tensorboard logs: file {file_name} pushed successfully"
-                    else:
-                        return "ERROR: Failed to upload file {file_name}. Server response: {response.text}"
-                # If path provided is a directory
-                elif os.path.isdir(self.args.tensorboard):
-                    # Recursively push all files and subdirectories
-                    for root, dirs, files in os.walk(self.args.tensorboard):
-                        for file_name in files:
-                            file_path = os.path.join(root, file_name)
-                            relative_path = os.path.relpath(file_path, self.args.tensorboard)
-                            tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, relative_path, file_path)
-                            if tresponse.status_code == 200:
-                                print(f"tensorboard logs: File {file_name} uploaded successfully.")
-                            else:
-                                return f"ERROR: Failed to upload file {file_name}. Server response: {tresponse.text}"
-                    return f"tensorboard logs: {self.args.tensorboard} uploaded successfully!!"
+            elif status_code == 200:
+                # The only open question is how to treat a failure while uploading the python env files.
+                # For now it is considered inconsequential: its success or failure does not matter,
+                # but we do keep a record of the status code.
+
+                # Getting all executions df to read the custom property 'Python_Env'
+                executions = query.get_all_executions_in_pipeline(self.args.pipeline_name)
+                if not executions.empty:
+                    if 'custom_properties_Python_Env' in executions.columns:
+                        list_of_env_files = executions['custom_properties_Python_Env'].drop_duplicates().tolist()
+                        # This is a validation step to suppress errors in case the user is pushing the mlmd
+                        # from a directory in which 'cmf_artifacts/Python_Env_hash.txt' is not present.
+                        # Find the valid file paths.
+                        found_files = self.search_files(list_of_env_files, current_directory, mlmd_directory)
+
+                        # push the valid files to the cmf-server
+                        if found_files:
+                            for name, path in found_files.items():
+                                env_response = server_interface.call_python_env(url, name, path)
+                                # Keep a record of the status; it does not affect the mlmd push result.
+                                print(env_response.json())
+
+                output = response.json()['status']
+                if output == "success":
+                    output = "mlmd is successfully pushed."
+                elif output == "exists":
+                    output = "Executions already exist."
+                if not self.args.tensorboard:
+                    return output
                 else:
-                    return "ERROR: Invalid data path. Provide valid file/folder path for tensorboard logs!!"
+                    print(output)
+                    # the /tensorboard api call is done only if the mlmd push completed successfully
+                    # and the tensorboard parameter is passed
+                    print("......................................")
+                    print("tensorboard logs upload started!!")
+                    print("......................................")
+                    # check if the path provided is for a file
+                    if os.path.isfile(self.args.tensorboard):
+                        file_name = os.path.basename(self.args.tensorboard)
+                        tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, file_name, self.args.tensorboard)
+                        tstatus_code = tresponse.status_code
+                        if tstatus_code == 200:
+                            return f"tensorboard logs: file {file_name} pushed successfully"
+                        else:
+                            return f"ERROR: Failed to upload file {file_name}. Server response: {tresponse.text}"
+                    elif os.path.isdir(self.args.tensorboard):
+                        # Recursively push all files and subdirectories
+                        for root, dirs, files in os.walk(self.args.tensorboard):
+                            for file_name in files:
+                                file_path = os.path.join(root, file_name)
+                                relative_path = os.path.relpath(file_path, self.args.tensorboard)
+                                tresponse = server_interface.call_tensorboard(url, self.args.pipeline_name, relative_path, file_path)
+                                if tresponse.status_code == 200:
+                                    print(f"tensorboard logs: File {file_name} uploaded successfully.")
+                                else:
+                                    return f"ERROR: Failed to upload file {file_name}. Server response: {tresponse.text}"
+                        return f"tensorboard logs: {self.args.tensorboard} uploaded successfully!!"
+                    else:
+                        return "ERROR: Invalid data path. Provide valid file/folder path for tensorboard logs!!"
             else:
-                return "SUCCESS!!"
+                return f"ERROR: Status Code = {status_code}. Unable to push mlmd."
         else:
             return "Pipeline name " + self.args.pipeline_name + " doesn't exists."
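Note: the env-file upload added to `run()` above reduces to three steps: read the distinct `Python_Env` custom properties from the pipeline's executions, resolve those file names against the current directory and the mlmd file's directory, and post each hit to the cmf-server. The sketch below is illustrative only, not part of the patch; it reuses `CmfQuery.get_all_executions_in_pipeline` and `server_interface.call_python_env` from this diff and assumes the usual cmflib import paths, an existing mlmd file, and a reachable cmf-server URL.

```python
# Illustrative sketch of the python-env upload step performed by `cmf metadata push`.
import os
from cmflib import cmfquery
from cmflib.server_interface import server_interface

def push_python_env_files(mlmd_path: str, pipeline_name: str, url: str) -> None:
    query = cmfquery.CmfQuery(mlmd_path)
    executions = query.get_all_executions_in_pipeline(pipeline_name)
    if executions.empty or "custom_properties_Python_Env" not in executions.columns:
        return
    env_files = executions["custom_properties_Python_Env"].drop_duplicates().tolist()

    # Same lookup as CmdMetadataPush.search_files(): later directories overwrite earlier hits.
    found = {}
    for directory in (os.getcwd(), os.path.dirname(os.path.abspath(mlmd_path))):
        for name in env_files:
            if isinstance(name, str):
                path = os.path.join(os.path.abspath(directory), name)
                if os.path.isfile(path):
                    found[name] = path

    for name, path in found.items():
        # Upload failures are treated as non-fatal; only the server's response is recorded.
        env_response = server_interface.call_python_env(url, name, path)
        print(env_response.json())
```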
diff --git a/cmflib/graph_wrapper.py b/cmflib/graph_wrapper.py index 29b5ab41..8d73d29c 100644 --- a/cmflib/graph_wrapper.py +++ b/cmflib/graph_wrapper.py @@ -88,6 +88,22 @@ def create_dataset_node(self, name: str, path: str, uri: str, event: str, execut "Execution", "Dataset", self.execution_id, node_id, event) _ = session.write_transaction(self._run_transaction, pc_syntax) + def create_env_node(self, name: str, path: str, uri: str, event: str, execution_id: int, + pipeline_context: mlpb.Context, custom_properties=None): + if custom_properties is None: + custom_properties = {} + pipeline_id = pipeline_context.id + pipeline_name = pipeline_context.name + dataset_syntax = self._create_env_syntax( + name, path, uri, pipeline_id, pipeline_name, custom_properties) + with self.driver.session() as session: + node = session.write_transaction( + self._run_transaction, dataset_syntax) + node_id = node[0]["node_id"] + pc_syntax = self._create_execution_artifacts_link_syntax( + "Execution", "Environment", self.execution_id, node_id, event) + _ = session.write_transaction(self._run_transaction, pc_syntax) + def create_dataslice_node(self, name: str, path: str, uri: str, parent_name:str, custom_properties=None): if custom_properties is None: @@ -144,6 +160,22 @@ def create_metrics_node(self, name: str, uri: str, event: str, execution_id: int "Execution", "Metrics", self.execution_id, node_id, event) _ = session.write_transaction(self._run_transaction, pc_syntax) + def create_step_metrics_node(self, name: str, uri: str, event: str, execution_id: int, pipeline_context: mlpb.Context, + custom_properties=None): + if custom_properties is None: + custom_properties = {} + pipeline_id = pipeline_context.id + pipeline_name = pipeline_context.name + metrics_syntax = self._create_step_metrics_syntax( + name, uri, event, execution_id, pipeline_id, pipeline_name, custom_properties) + with self.driver.session() as session: + node = session.write_transaction( + self._run_transaction, metrics_syntax) + node_id = node[0]["node_id"] + pc_syntax = self._create_execution_artifacts_link_syntax( + "Execution", "Step_Metrics", self.execution_id, node_id, event) + _ = session.write_transaction(self._run_transaction, pc_syntax) + def create_artifact_relationships( self, parent_artifacts, @@ -185,10 +217,10 @@ def create_execution_links( parent_execution_query = "MATCH (n:{}".format( parent_artifact_type) + "{uri: '" + parent_artifact_uri + "'}) " \ - "<-[:output]-(f:Execution) Return ID(f)as id, f.uri as uri" + "<-[:output]-(f:Execution) Return ELEMENTID(f) as id, f.uri as uri" already_linked_execution_query = "MATCH (f)-[r:linked]->(e2:Execution) " \ - "WHERE r.uri = '{}' RETURN ID(f)as id, f.uri as uri".format(parent_artifact_uri) + "WHERE r.uri = '{}' RETURN ELEMENTID(f) as id, f.uri as uri".format(parent_artifact_uri) with self.driver.session() as session: execution_parent = session.read_transaction( @@ -219,7 +251,7 @@ def create_execution_links( def _get_node(self, node_label: str, node_name: str)->int: #Match(n:Metrics) where n.Name contains 'metrics_1' return n search_syntax = "MATCH (n:{}) where '{}' in n.Name \ - return ID(n) as node_id".format(node_label, node_name) + return ELEMENTID(n) as node_id".format(node_label, node_name) print(search_syntax) node_id = None with self.driver.session() as session: @@ -232,7 +264,7 @@ def _get_node(self, node_label: str, node_name: str)->int: def _get_node_with_path(self, node_label: str, node_path: str)->int: #Match(n:Metrics) where n.Path contains 'metrics_1' return n 
search_syntax = "MATCH (n:{}) where '{}' in n.Path \ - return ID(n) as node_id".format(node_label, node_path) + return ELEMENTID(n) as node_id".format(node_label, node_path) print(search_syntax) node_id = None with self.driver.session() as session: @@ -261,7 +293,7 @@ def _create_pipeline_syntax(name: str, props: t.Dict, uri: int) -> str: k = re.sub('\W+', '', k) syntax_str = syntax_str + k + ":" + "\"" + v + "\"" + "," syntax_str = syntax_str.rstrip(syntax_str[-1]) - syntax_str = syntax_str + "}) RETURN ID(a) as node_id" + syntax_str = syntax_str + "}) RETURN ELEMENTID(a) as node_id" return syntax_str # Todo - Verify what is considered as unique node . is it a combination of @@ -282,7 +314,25 @@ def _create_dataset_syntax(name: str, path: str, uri: str, pipeline_id: int, pip " = coalesce([x in a." + k + " where x <>\"" + str(v) + "\"], []) + \"" + str(v) + "\"," syntax_str = syntax_str + props_str syntax_str = syntax_str.rstrip(",") - syntax_str = syntax_str + " RETURN ID(a) as node_id" + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" + return syntax_str + + @staticmethod + def _create_env_syntax(name: str, path: str, uri: str, pipeline_id: int, pipeline_name: str, + custom_properties): + custom_properties["Name"] = name + custom_properties["Path"] = path + custom_properties["pipeline_id"] = str(pipeline_id) + custom_properties["pipeline_name"] = pipeline_name + syntax_str = "MERGE (a:Environment {uri:\"" + uri + "\"}) SET " + # props_str = "" + for k, v in custom_properties.items(): + k = re.sub('\W+', '', k) + props_str = "a." + k + \ + " = coalesce([x in a." + k + " where x <>\"" + str(v) + "\"], []) + \"" + str(v) + "\"," + syntax_str = syntax_str + props_str + syntax_str = syntax_str.rstrip(",") + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" return syntax_str @staticmethod @@ -298,7 +348,7 @@ def _create_dataslice_syntax(name: str, path: str, uri: str, " = coalesce([x in a." 
+ k + " where x <>\"" + str(v) + "\"], []) + \"" + str(v) + "\"," syntax_str = syntax_str + props_str syntax_str = syntax_str.rstrip(",") - syntax_str = syntax_str + " RETURN ID(a) as node_id" + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" return syntax_str @staticmethod @@ -314,7 +364,7 @@ def _create_model_syntax(name: str, uri: str, pipeline_id: int, pipeline_name: s #syntax_str = syntax_str + k + ":" + "\"" + str(v) + "\"" + "," syntax_str = syntax_str + props_str syntax_str = syntax_str.rstrip(",") - syntax_str = syntax_str + " RETURN ID(a) as node_id" + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" return syntax_str @staticmethod @@ -331,7 +381,24 @@ def _create_metrics_syntax(name: str, uri: str, event: str, execution_id: int, p syntax_str = syntax_str + k + ":" + "\"" + str(v) + "\"" + "," syntax_str = syntax_str.rstrip(syntax_str[-1]) syntax_str = syntax_str + "})" - syntax_str = syntax_str + " RETURN ID(a) as node_id" + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" + return syntax_str + + @staticmethod + def _create_step_metrics_syntax(name: str, uri: str, event: str, execution_id: int, pipeline_id: int, + pipeline_name: str, custom_properties): + custom_properties["Name"] = name + custom_properties["uri"] = uri + # custom_properties["execution_id"] = str(execution_id) + custom_properties["pipeline_id"] = str(pipeline_id) + custom_properties["pipeline_name"] = pipeline_name + syntax_str = "MERGE (a:Step_Metrics {" # + str(props) + ")" + for k, v in custom_properties.items(): + k = re.sub('\W+', '', k) + syntax_str = syntax_str + k + ":" + "\"" + str(v) + "\"" + "," + syntax_str = syntax_str.rstrip(syntax_str[-1]) + syntax_str = syntax_str + "})" + syntax_str = syntax_str + " RETURN ELEMENTID(a) as node_id" return syntax_str @staticmethod @@ -346,13 +413,13 @@ def _create_stage_syntax(name: str, props: t.Dict, uri: int, pipeline_id: int, p syntax_str = syntax_str + k + ":" + "\"" + str(v) + "\"" + "," syntax_str = syntax_str.rstrip(syntax_str[-1]) - syntax_str = syntax_str + "}) RETURN ID(a) as node_id" + syntax_str = syntax_str + "}) RETURN ELEMENTID(a) as node_id" return syntax_str @staticmethod def _create_parent_child_syntax(parent_label: str, child_label: str, parent_id: int, child_id: int, relation: str): - parent_child_syntax = "MATCH (a:{}), (b:{}) where ID(a) = {} AND ID(b) = {} MERGE (a)-[r:{}]->(b) \ + parent_child_syntax = "MATCH (a:{}), (b:{}) where ELEMENTID(a) = '{}' AND ELEMENTID(b) = '{}' MERGE (a)-[r:{}]->(b) \ return type(r)".format(parent_label, child_label, parent_id, child_id, relation) return parent_child_syntax @@ -360,10 +427,10 @@ def _create_parent_child_syntax(parent_label: str, child_label: str, parent_id: def _create_execution_artifacts_link_syntax(parent_label: str, child_label: str, parent_id: int, child_id: int, relation: str): if relation.lower() == "input": - parent_child_syntax = "MATCH (a:{}), (b:{}) where ID(a) = {} AND ID(b) = {} MERGE (a)<-[r:{}]-(b) \ + parent_child_syntax = "MATCH (a:{}), (b:{}) where ELEMENTID(a) = '{}' AND ELEMENTID(b) = '{}' MERGE (a)<-[r:{}]-(b) \ return type(r)".format(parent_label, child_label, parent_id, child_id, relation) else: - parent_child_syntax = "MATCH (a:{}), (b:{}) where ID(a) = {} AND ID(b) = {} MERGE (a)-[r:{}]->(b) \ + parent_child_syntax = "MATCH (a:{}), (b:{}) where ELEMENTID(a) = '{}' AND ELEMENTID(b) = '{}' MERGE (a)-[r:{}]->(b) \ return type(r)".format(parent_label, child_label, parent_id, child_id, relation) return parent_child_syntax @@ -380,7 +447,7 @@ 
def _create_execution_link_syntax(parent_label: str, child_label: str, parent_ur CREATE (a)-[r:RELTYPE]->(b) RETURN type(r) """ - parent_child_syntax_1 = "MATCH (a:{}), (b:{}) WHERE a.uri = '{}' AND ID(a) = {} AND ID(b) = {} ".format( + parent_child_syntax_1 = "MATCH (a:{}), (b:{}) WHERE a.uri = '{}' AND ELEMENTID(a) = '{}' AND ELEMENTID(b) = '{}' ".format( parent_label, child_label, parent_uri, parent_id, child_id) parent_child_syntax_2 = "MERGE (a)-[r:{}".format(relation) parent_child_syntax_3 = "{" @@ -434,5 +501,5 @@ def _create_execution_syntax(name: str, command: str, props: t.Dict, uri: int, p syntax_str = syntax_str + k + ":" + "\"" + v + "\"" + "," syntax_str = syntax_str.rstrip(syntax_str[-1]) - syntax_str = syntax_str + "}) RETURN ID(a) as node_id" + syntax_str = syntax_str + "}) RETURN ELEMENTID(a) as node_id" return syntax_str diff --git a/cmflib/metadata_helper.py b/cmflib/metadata_helper.py index 91f3b6d1..73feeedf 100644 --- a/cmflib/metadata_helper.py +++ b/cmflib/metadata_helper.py @@ -316,7 +316,6 @@ def create_new_execution_in_existing_context( EXECUTION_REPO = "Git_Repo" EXECUTION_START_COMMIT = "Git_Start_Commit" EXECUTION_END_COMMIT = "Git_End_Commit" -EXECUTION_PYTHON_ENV= "Python_Env" EXECUTION_PIPELINE_TYPE = "Pipeline_Type" EXECUTION_PIPELINE_ID = "Pipeline_id" @@ -393,7 +392,6 @@ def create_new_execution_in_existing_run_context( git_repo: str = None, git_start_commit: str = None, git_end_commit: str = "", - python_env: str = "", custom_properties: dict = None, create_new_execution:bool = True ) -> metadata_store_pb2.Execution: @@ -418,7 +416,6 @@ def create_new_execution_in_existing_run_context( EXECUTION_REPO: metadata_store_pb2.STRING, EXECUTION_START_COMMIT: metadata_store_pb2.STRING, EXECUTION_END_COMMIT: metadata_store_pb2.STRING, - EXECUTION_PYTHON_ENV: metadata_store_pb2.STRING, }, properties={ @@ -433,7 +430,6 @@ def create_new_execution_in_existing_run_context( EXECUTION_REPO: metadata_store_pb2.Value(string_value=git_repo), EXECUTION_START_COMMIT: metadata_store_pb2.Value(string_value=git_start_commit), EXECUTION_END_COMMIT: metadata_store_pb2.Value(string_value=git_end_commit), - EXECUTION_PYTHON_ENV: metadata_store_pb2.Value(string_value=python_env), # should set to task ID, not component ID }, custom_properties=mlmd_custom_properties, diff --git a/cmflib/server_interface/server_interface.py b/cmflib/server_interface/server_interface.py index 3b3730c6..521d53cb 100644 --- a/cmflib/server_interface/server_interface.py +++ b/cmflib/server_interface/server_interface.py @@ -17,7 +17,7 @@ import requests import json -# This function posts mlmd data to mlmd_push api on cmf-server +# This function posts mlmd data on cmf-server using mlmd_push rest api def call_mlmd_push(json_payload, url, exec_id, pipeline_name): url_to_pass = f"{url}/mlmd_push" json_data = {"id": exec_id, "json_payload": json_payload, "pipeline_name": pipeline_name} @@ -40,3 +40,10 @@ def call_tensorboard(url, pipeline_name, file_name, file_path): params = {'pipeline_name': pipeline_name} response = requests.post(url_to_pass, files=files, params=params) return response + +# This function posts env file to cmf-server +def call_python_env(url, file_name, file_path): + url_to_pass = f"{url}/python-env" + files = {'file': (file_name, open(file_path, 'rb'))} + response = requests.post(url_to_pass, files=files) + return response \ No newline at end of file diff --git a/cmflib/utils/helper_functions.py b/cmflib/utils/helper_functions.py index 9dc12ea3..3371bb15 100644 --- 
a/cmflib/utils/helper_functions.py +++ b/cmflib/utils/helper_functions.py @@ -18,6 +18,7 @@ import sys import subprocess import json +import yaml def is_url(url)-> bool: from urllib.parse import urlparse @@ -36,56 +37,96 @@ def is_git_repo(): else: return +def get_python_env(env_name='cmf'): + # what this is supposed to return + try: + # Check if the environment is conda + if is_conda_installed(): # If conda is installed and the command succeeds -def get_python_env()-> str: - installed_packages = "" - python_version = sys.version - packages = "" - # check if conda is installed - if is_conda_installed(): - import conda - # List all installed packages and their versions - data = list_conda_packages_json() - transformed_result = [f"{entry['name']}=={entry['version']}" for entry in data] - installed_packages = transformed_result - packages = f"Conda: Python {python_version}: {installed_packages}" - else: - # pip - try: - from pip._internal.operations import freeze - - # List all installed packages and their versions - installed_packages_generator = freeze.freeze() - installed_packages = list(installed_packages_generator) - packages = f"Python {python_version}: {installed_packages}" - except ImportError: - print("Pip is not installed.") - return packages + # Step 1: Get the list of conda packages + conda_packages = subprocess.check_output(['conda', 'list', '--export']).decode('utf-8').splitlines() + + # Step 2: Get the list of pip packages + pip_packages = subprocess.check_output(['pip', 'freeze']).decode('utf-8').splitlines() + + # Step 3: Get the list of channels from the current conda environment + channels_raw = subprocess.check_output(['conda', 'config', '--show', 'channels']).decode('utf-8').splitlines() + + # Filter out lines that start with 'channels:' and any empty or commented lines + channels = [line.strip().lstrip('- ').strip() for line in channels_raw if line and not line.startswith('channels:') and not line.startswith('#')] + + # Step 4: Create a YAML structure for the environment + env_data = { + 'name': env_name, # Name the environment -- don't provide the name + 'channels': channels, # Add the cleaned channels list + 'dependencies': [], + } + + # Add conda packages to dependencies + for package in conda_packages: + if not package.startswith('#') and len(package.strip()) > 0: + env_data['dependencies'].append(package) + + # Add pip packages under a pip section in dependencies + if pip_packages: + pip_section = {'pip': pip_packages} + env_data['dependencies'].append(pip_section) + + return env_data + + else: + # If not conda, assume virtualenv/pip + # Step 1: Get the list of pip packages + pip_packages = subprocess.check_output(['pip', 'freeze']).decode('utf-8').splitlines() + return pip_packages + + except Exception as e: + print(f"An error occurred: {e}") + + return + +def get_md5_hash(output): + import hashlib + + # Convert the string to bytes (utf-8 encoding) + byte_content = output.encode('utf-8') + + # Create an MD5 hash object + md5_hash = hashlib.md5() + + # Update the hash with the byte content + md5_hash.update(byte_content) + + # Return the hexadecimal digest + hash_for_op = md5_hash.hexdigest() + + return hash_for_op + def change_dir(cmf_init_path): logging_dir = os.getcwd() if not logging_dir == cmf_init_path: os.chdir(cmf_init_path) return logging_dir -def is_conda_installed(): + +def is_conda_installed() -> bool: + """Check if Conda is installed by running 'conda --version'.""" try: - import conda # Run the 'conda --version' command and capture the output 
subprocess.run(['conda', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True - except subprocess.CalledProcessError: - return False - except ImportError: + except (subprocess.CalledProcessError, FileNotFoundError): return False -def list_conda_packages_json(): +def list_conda_packages_json() -> list: + """Return a list of installed Conda packages and their versions.""" try: result = subprocess.run(['conda', 'list', '--json'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return json.loads(result.stdout) - except subprocess.CalledProcessError as e: - return f"Error: {e.stderr}" + except (subprocess.CalledProcessError, json.JSONDecodeError): + return [] # Generate SciToken dynamically diff --git a/docker-compose-server.yml b/docker-compose-server.yml index f5a267af..626b678a 100644 --- a/docker-compose-server.yml +++ b/docker-compose-server.yml @@ -29,6 +29,7 @@ services: # both the directory paths should be updated as per user's environment volumes: - /home/xxxx/cmf-server/data:/cmf-server/data + - /home/xxxx/cmf-server/data/env:/cmf-server/data/env - /home/xxxx/cmf-server/data/static:/cmf-server/data/static - /home/xxxx/cmf-server/data/tensorboard-logs:/cmf-server/data/tensorboard-logs container_name: cmf-server diff --git a/examples/example-get-started/src/query.py b/examples/example-get-started/src/query.py index 4b4b47ba..521ff518 100644 --- a/examples/example-get-started/src/query.py +++ b/examples/example-get-started/src/query.py @@ -12,7 +12,7 @@ def _print_executions_in_stage(cmf_query: cmfquery.CmfQuery, stage_name: str) -> print('\n') df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage_name) # dropping Python_Env value in query output as it is very big in size most of the time - df.drop(columns=['Git_Start_Commit', 'Git_End_Commit', 'Python_Env'], inplace=True, axis=1) + df.drop(columns=['Git_Start_Commit', 'Git_End_Commit'], inplace=True, axis=1) print(tabulate(df, headers='keys', tablefmt='psql')) diff --git a/examples/nano-cmf/src/query.py b/examples/nano-cmf/src/query.py index 8f6c41c6..97f31590 100644 --- a/examples/nano-cmf/src/query.py +++ b/examples/nano-cmf/src/query.py @@ -11,7 +11,7 @@ def _print_executions_in_stage(cmf_query: cmfquery.CmfQuery, stage_name: str) -> print('\n') print('\n') df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage_name) - df.drop(columns=['Git_Start_Commit', 'Git_End_Commit', 'Python_Env'], inplace=True, axis=1) + df.drop(columns=['Git_Start_Commit', 'Git_End_Commit'], inplace=True, axis=1) print(tabulate(df, headers='keys', tablefmt='psql')) diff --git a/server/app/main.py b/server/app/main.py index 30df9c5a..d417fb1a 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -1,7 +1,7 @@ # cmf-server api's from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import HTMLResponse +from fastapi.responses import HTMLResponse, PlainTextResponse from fastapi.staticfiles import StaticFiles from contextlib import asynccontextmanager import pandas as pd @@ -325,6 +325,8 @@ async def artifact_types(request: Request): # checks if mlmd file exists on server if os.path.exists(server_store_path): artifact_types = await async_api(get_artifact_types, server_store_path) + if "Environment" in artifact_types: + artifact_types.remove("Environment") return artifact_types else: artifact_types = "" @@ -395,6 +397,52 @@ async def artifact_execution_lineage(request: Request, pipeline_name: 
str): response = await query_visualization_artifact_execution(server_store_path, pipeline_name, dict_of_art_ids, dict_of_exe_ids) return response +# Rest api is for pushing python env to upload python env +@app.post("/python-env") +async def upload_python_env(request:Request, file: UploadFile = File(..., description="The file to upload")): + try: + file_path = os.path.join("/cmf-server/data/env/", os.path.basename(file.filename)) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "wb") as buffer: + buffer.write(await file.read()) + return {"message": f"File '{file.filename}' uploaded successfully"} + except Exception as e: + return {"error": f"Failed to up load file: {e}"} + +# Rest api to fetch the env data from the /cmf-server/data/env folder +@app.get("/python-env", response_class=PlainTextResponse) +async def get_python_env(file_name: str) -> str: + """ + API endpoint to fetch the content of a requirements file. + + Args: + file_name (str): The name of the file to be fetched. Must end with .txt or .yaml. + + Returns: + str: The content of the file as plain text. + + Raises: + HTTPException: If the file does not exist or the extension is unsupported. + """ + # Validate file extension + if not (file_name.endswith(".txt") or file_name.endswith(".yaml")): + raise HTTPException( + status_code=400, detail="Unsupported file extension. Use .txt or .yaml" + ) + + # Check if the file exists + file_path = os.path.join("/cmf-server/data/env/", os.path.basename(file_name)) + if not os.path.exists(file_path): + raise HTTPException(status_code=404, detail="File not found") + + # Read and return the file content as plain text + try: + with open(file_path, "r") as file: + content = file.read() + return content + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}") + async def update_global_art_dict(pipeline_name): global dict_of_art_ids diff --git a/ui/src/client.js b/ui/src/client.js index bf5d5b29..cba76743 100644 --- a/ui/src/client.js +++ b/ui/src/client.js @@ -152,6 +152,22 @@ class FastAPIClient { return data; }); } + + async getPythonEnv(file_name) { + return this.apiClient + .get(`/python-env`, { + params: { + file_name: file_name + }, + responseType: "text", // Explicitly specify response type as text + }) + .then(( response ) => { + return response.data; + }); + } + } + + export default FastAPIClient; diff --git a/ui/src/components/ArtifactTable/index.jsx b/ui/src/components/ArtifactTable/index.jsx index 51fd441d..9ff336ff 100644 --- a/ui/src/components/ArtifactTable/index.jsx +++ b/ui/src/components/ArtifactTable/index.jsx @@ -17,7 +17,7 @@ // ArtifactTable.jsx import React, { useState, useEffect } from "react"; import "./index.css"; -import Popup from "../../components/Popup"; +import ModelCardPopup from "../../components/ModelCardPopup"; import FastAPIClient from "../../client"; import config from "../../config"; @@ -181,7 +181,7 @@ const ArtifactTable = ({ artifacts, ArtifactType, onSort }) => { > Open Model Card - { // Default sorting order @@ -26,6 +31,9 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { const [filterValue, setFilterValue] = useState(""); const [expandedRow, setExpandedRow] = useState(null); + const [showPopup, setShowPopup] = useState(false); + const [popupData, setPopupData] = useState(""); + const consistentColumns = []; useEffect(() => { @@ -61,6 +69,20 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { } }; + + const handleLinkClick = (file_name) => { + 
setShowPopup(true); + client.getPythonEnv(file_name).then((data) => { + console.log(data); + setPopupData(data); + setShowPopup(true); + }); + }; + + const handleClosePopup = () => { + setShowPopup(false); + }; + const renderArrow = () => { if (sortOrder === "desc") { return ( @@ -168,6 +190,11 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { Execution + + + Python Env + + Git Repo @@ -192,6 +219,23 @@ const ExecutionTable = ({ executions, onSort, onFilter }) => { {data.Context_Type} {data.Execution} + + { + e.preventDefault(); + handleLinkClick(data.custom_properties_Python_Env); + + }} + > + Click for Env Details + + + {data.Git_Repo} {data.Git_Start_Commit} {data.Pipeline_Type} diff --git a/ui/src/components/Popup/index.css b/ui/src/components/ModelCardPopup/index.css similarity index 100% rename from ui/src/components/Popup/index.css rename to ui/src/components/ModelCardPopup/index.css diff --git a/ui/src/components/Popup/index.jsx b/ui/src/components/ModelCardPopup/index.jsx similarity index 98% rename from ui/src/components/Popup/index.jsx rename to ui/src/components/ModelCardPopup/index.jsx index 9f44df4a..ac387f8b 100644 --- a/ui/src/components/Popup/index.jsx +++ b/ui/src/components/ModelCardPopup/index.jsx @@ -1,7 +1,7 @@ import React from "react"; import "./index.css"; // Optional: For styling the popup -const Popup = ({ show, model_data, onClose }) => { +const ModelCardPopup = ({ show, model_data, onClose }) => { if (!show) { return null; } @@ -198,4 +198,4 @@ const Popup = ({ show, model_data, onClose }) => { ); }; -export default Popup; +export default ModelCardPopup; diff --git a/ui/src/components/PythonEnvPopup/index.jsx b/ui/src/components/PythonEnvPopup/index.jsx new file mode 100644 index 00000000..1c694194 --- /dev/null +++ b/ui/src/components/PythonEnvPopup/index.jsx @@ -0,0 +1,31 @@ +import React from "react"; +import "./index.module.css"; // Optional: For styling the popup + +const PythonEnvPopup = ({ show, python_env, onClose }) => { + if (!show) { + return null; + } + + console.log(python_env) + return ( + <> +
+      <div className="popup-overlay">
+        <div className="popup">
+          <button className="close-button" onClick={onClose}>
+            X
+          </button>
+          <h3 className="popup-heading">Environment Configuration</h3>
+          <div className="popup-content">
+            <pre>
+              {python_env}
+            </pre>
+          </div>
+        </div>
+      </div>
+    </>
+ + ); +}; + +export default PythonEnvPopup; diff --git a/ui/src/components/PythonEnvPopup/index.module.css b/ui/src/components/PythonEnvPopup/index.module.css new file mode 100644 index 00000000..2570ca31 --- /dev/null +++ b/ui/src/components/PythonEnvPopup/index.module.css @@ -0,0 +1,68 @@ +/* Overlay for popup */ +.popup-overlay { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + background-color: rgba(0, 0, 0, 0.5); + display: flex; + justify-content: center; + align-items: center; +} + +/* Popup container */ +.popup { + position: relative; + background-color: white; + padding: 20px; + border-radius: 8px; + width: 90%; + max-width: 800px; /* Adjusted for better responsiveness */ + max-height: 90vh; /* Prevent overflow beyond the viewport */ + box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); + overflow-y: auto; /* Scroll if content exceeds height */ +} + +/* Close button */ +.close-button { + position: absolute; + top: -18px; + right: -19px; + background: gray; + color: white; + border: 2px solid black; + border-radius: 50%; + padding: 5px 10px; + font-size: 12px; /* Unified font size */ + cursor: pointer; + z-index: 10; +} + +/* Popup heading */ +.popup-heading { + font-size: 20px; + font-weight: bold; + text-align: center; + color: #333; + margin: 0; + padding: 10px 0; + background-color: #f9f9f9; /* Subtle background for heading */ + border-bottom: 1px solid #ddd; +} + +/* Popup content */ +.popup-content { + margin-top: 20px; + padding: 15px; + font-family: 'Courier New', Courier, monospace; + font-size: 14px; /* Default font size */ + background-color: #f8f9fa; /* Light gray background */ + color: #333; + border: 1px solid #ddd; + border-radius: 4px; + max-height: 400px; + overflow-y: auto; /* Scrollable content */ + white-space: pre-wrap; /* Preserve whitespace and line breaks */ +} +
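End-to-end, the uploaded env files land in `/cmf-server/data/env` and are served back as plain text by `GET /python-env`, which the UI's `getPythonEnv` client call feeds into `PythonEnvPopup`. The same endpoint can be exercised without the UI; the snippet below is a hedged example, not part of the patch, and assumes a cmf-server reachable at the example URL plus a file name taken from an execution's `Python_Env` custom property.

```python
# Example only: fetch a stored environment file from the new GET /python-env endpoint.
import requests

def fetch_python_env(server_url: str, file_name: str) -> str:
    # The endpoint accepts only .txt or .yaml names and returns the file content as plain text.
    response = requests.get(f"{server_url}/python-env", params={"file_name": file_name})
    response.raise_for_status()  # 400 for unsupported extensions, 404 if the file is missing
    return response.text

# Hypothetical usage; host, port, and file name depend on the deployment:
# print(fetch_python_env("http://127.0.0.1:8080", "python_env_0123abcd.txt"))
```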