diff --git a/cmflib/cmf.py b/cmflib/cmf.py index fa3e8cb6..4253ce92 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -154,9 +154,9 @@ def __load_neo4j_params(): cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") if os.path.exists(cmf_config): attr_dict = CmfConfig.read_config(cmf_config) - __neo4j_uri = attr_dict.get("neo4j-uri", "") - __neo4j_password = attr_dict.get("neo4j-password", "") - __neo4j_user = attr_dict.get("neo4j-user", "") + Cmf.__neo4j_uri = attr_dict.get("neo4j-uri", "") + Cmf.__neo4j_password = attr_dict.get("neo4j-password", "") + Cmf.__neo4j_user = attr_dict.get("neo4j-user", "") @staticmethod @@ -218,6 +218,7 @@ def create_context( ) -> mlpb.Context: """Create's a context(stage). Every call creates a unique pipeline stage. + Updates the Pipeline_stage name. Example: ```python #Create context @@ -257,8 +258,9 @@ def create_context( def merge_created_context( self, pipeline_stage: str, custom_properties: t.Optional[t.Dict] = None ) -> mlpb.Context: - """Create context. + """Merge created context. Every call creates a unique pipeline stage. + Created for metadata push purposes. Example: ```python @@ -269,12 +271,12 @@ def merge_created_context( # Create CMF logger cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline") # Create context - context: mlmd.proto.Context = cmf.create_context( - pipeline_stage="prepare", + context: mlmd.proto.Context = cmf.merge_created_context( + pipeline_stage="Test-env/prepare", custom_properties ={"user-metadata1": "metadata_value"} ``` Args: - Pipeline_stage: Name of the Stage. + Pipeline_stage: Pipeline_Name/Stage_name. custom_properties: Developers can provide key value pairs with additional properties of the execution that need to be stored. Returns: @@ -397,6 +399,25 @@ def update_execution( """Updates an existing execution. The custom properties can be updated after creation of the execution. The new custom properties is merged with earlier custom properties. + Example: + ```python + # Import CMF + from cmflib.cmf import Cmf + from ml_metadata.proto import metadata_store_pb2 as mlpb + # Create CMF logger + cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline") + # Update an execution + execution: mlmd.proto.Execution = cmf.update_execution( + execution_id=8, + custom_properties = {"split": split, "seed": seed} + ) + ``` + Args: + execution_id: ID of the execution. + custom_properties: Developers can provide key value pairs with additional properties of the execution that + need to be updated. + Returns: + Execution object from ML Metadata library associated with the updated execution for this stage. """ self.execution = self.store.get_executions_by_id([execution_id])[0] if self.execution is None: @@ -455,6 +476,51 @@ def merge_created_execution( orig_execution_name:str = "", create_new_execution:bool = True ) -> mlpb.Execution: + """Merge created execution. + Every call creates a unique execution. Execution can only be created within a context, so + [create_context][cmflib.cmf.Cmf.create_context] must be called first. + Every call occurs when metadata push or pull is processed. Data from pre-existing executions is used + to create new executions with additional data (required on cmf-server).
+ Example: + ```python + # Import CMF + from cmflib.cmf import Cmf + from ml_metadata.proto import metadata_store_pb2 as mlpb + # Create CMF logger + cmf = Cmf(filename="mlmd", pipeline_name="test_pipeline") + # Create or reuse context for this stage + context: mlmd.proto.Context = cmf.merge_created_context( + pipeline_stage="prepare", + custom_properties ={"user-metadata1": "metadata_value"} + ) + # Create a new execution for this stage run + execution: mlmd.proto.Execution = cmf.merge_created_execution( + execution_type="Prepare", + properties={"Context_Type":""}, + custom_properties = {"split": split, "seed": seed}, + orig_execution_name=execution_name + ) + ``` + Args: + execution_type: Type of the execution. (When create_new_execution is False, this is the name of the execution.) + properties: Properties of the execution. + custom_properties: Developers can provide key value pairs with additional properties of the execution that + need to be stored. + + cmd: Command used to run this execution. + + create_new_execution: bool = True. This can be used by advanced users to reuse executions. + This is applicable when working with framework code like mmdet, pytorch lightning, etc., where + custom callbacks are used to log metrics. + If create_new_execution is True (default), the execution_type parameter is used as the name of the execution type. + If create_new_execution is False and an existing execution with the same name as execution_type exists, + it is reused. + Only executions created with create_new_execution as False will have "name" as a property. + + + Returns: + Execution object from ML Metadata library associated with the execution for this stage. + """ # Initializing the execution related fields properties = {} if properties is None else properties self.metrics = {} @@ -664,6 +730,21 @@ def log_dataset( return artifact def update_dataset_url(self, artifact: mlpb.Artifact, updated_url: str): + """Update dataset URL. + Updates the URL of the given artifact. + Example: + ```python + artifact: mlmd.proto.Artifact = cmf.update_dataset_url( + artifact="data.xml.gz", + updated_url="/repo/data.xml", + ) + ``` + Args: + artifact: Artifact for which the URL is to be updated. + updated_url: The updated URL path of the dataset. + Returns: + Updates the artifact in MLMD; does not return anything. + """ for key, value in artifact.properties.items(): if key == "url": old_url = value.string_value @@ -673,6 +754,19 @@ def update_dataset_url(self, artifact: mlpb.Artifact, updated_url: str): put_artifact(self.store, artifact) def update_model_url(self, dup_artifact: list, updated_url: str): + """Updates the URL property of model artifacts. + Example: + ```python + dup_artifact = [...] # List of artifacts + updated_url = "/new/url" + updated_artifacts = cmf.update_model_url(dup_artifact, updated_url) + ``` + Args: + dup_artifact: List of artifacts to update. + updated_url: New URL to add to the existing URLs. + Returns: + List of updated artifacts. + """ for art in dup_artifact: dup_art = art for key, value in dup_art.properties.items(): @@ -692,7 +786,28 @@ def log_dataset_with_version( props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None, ) -> mlpb.Artifact: - """Logs a dataset when the version(hash) is known""" + """Logs a dataset when the version (hash) is known.
+ Example: + ```python + artifact: mlpb.Artifact = cmf.log_dataset_with_version( + url="path/to/dataset", + version="abcdef", + event="output", + props={ "git_repo": "https://github.com/example/repo", + "url": "/path/in/repo", }, + custom_properties={ "custom_key": "custom_value", }, + ) + ``` + Args: + url: Path to the dataset. + version: Hash or version identifier for the dataset. + event: Takes arguments `INPUT` or `OUTPUT`. + props: Optional properties for the dataset (e.g., git_repo, url). + custom_properties: Optional custom properties for the dataset. + Returns: + Artifact object from the ML Protocol Buffers library associated with the new dataset artifact. + """ + props = {} if props is None else props custom_props = {} if custom_properties is None else custom_properties git_repo = props.get("git_repo", "") @@ -983,9 +1098,7 @@ def log_model_with_version( Args: path: Path to the model file. event: Takes arguments `INPUT` OR `OUTPUT`. - model_framework: Framework used to create the model. - model_type: Type of model algorithm used. - model_name: Name of the algorithm used. + props: Model artifact properties. custom_properties: The model properties. Returns: Artifact object from ML Metadata library associated with the new model artifact. @@ -1100,6 +1213,21 @@ def log_model_with_version( def log_execution_metrics_from_client(self, metrics_name: str, custom_properties: t.Optional[t.Dict] = None) -> mlpb.Artifact: + """ Logs execution metrics from a client. + Data from pre-existing metrics from client side is used to create identical metrics on server side. + Example: + ```python + artifact: mlpb.Artifact = cmf.log_execution_metrics_from_client( + metrics_name="example_metrics:uri:123", + custom_properties={"custom_key": "custom_value"}, + ) + ``` + Args: + metrics_name: Name of the metrics in the format "name:uri:execution_id". + custom_properties: Optional custom properties for the metrics. + Returns: + Artifact object from the ML Protocol Buffers library associated with the metrics artifact. + """ metrics = None custom_props = {} if custom_properties is None else custom_properties existing_artifact = [] @@ -1246,9 +1374,19 @@ def log_metric( self.metrics[metrics_name][1] = custom_properties def commit_metrics(self, metrics_name: str): - """Writes the inmemory metrics to parquet file - Commit the metrics file associated with the metrics id to dvc and git and - store the artifact in mlmd + """ Writes the in-memory metrics to a Parquet file, commits the metrics file associated with the metrics id to DVC and Git, + and stores the artifact in MLMD. + + Example: + ```python + artifact: mlpb.Artifact = cmf.commit_metrics("example_metrics") + ``` + + Args: + metrics_name: Name of the metrics. + + Returns: + Artifact object from the ML Protocol Buffers library associated with the new metrics artifact. """ metrics_df = pd.DataFrame.from_dict( self.metrics[metrics_name], orient="index") @@ -1303,6 +1441,21 @@ def commit_metrics(self, metrics_name: str): return metrics def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None): + """ + Commits existing metrics associated with the given URI to MLMD. + Example: + ```python + artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123", + {"custom_key": "custom_value"}) + ``` + Args: + metrics_name: Name of the metrics. + uri: Unique identifier associated with the metrics. + custom_properties: Optional custom properties for the metrics. 
+ Returns: + Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact. + """ + custom_props = {} if custom_properties is None else custom_properties c_hash = uri.strip() existing_artifact = [] @@ -1373,7 +1526,18 @@ def log_validation_output( def update_existing_artifact( self, artifact: mlpb.Artifact, custom_properties: t.Dict ): - """Updates an existing artifact and stores it back to mlmd""" + """ Updates an existing artifact with the provided custom properties and stores it back to MLMD. + Example: + ```python + update_artifact=cmf.update_existing_artifact(existing_artifact, {"key1": "updated_value"}) + ``` + Args: + artifact: Existing artifact to be updated. + custom_properties: Dictionary containing custom properties to update. + Returns: + None + """ + for key, value in custom_properties.items(): if isinstance(value, int): artifact.custom_properties[key].int_value = value @@ -1424,6 +1588,20 @@ def read_dataslice(self, name: str) -> pd.DataFrame: # To do - Once update the hash and the new version should be updated in # the mlmd def update_dataslice(self, name: str, record: str, custom_properties: t.Dict): + """Updates a dataslice record in a Parquet file with the provided custom properties. + Example: + ```python + dataslice=cmf.update_dataslice("dataslice_file.parquet", "record_id", + {"key1": "updated_value"}) + ``` + Args: + name: Name of the Parquet file. + record: Identifier of the dataslice record to be updated. + custom_properties: Dictionary containing custom properties to update. + + Returns: + None + """ df = pd.read_parquet(name) temp_dict = df.to_dict("index") temp_dict[record].update(custom_properties) @@ -1583,39 +1761,106 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None # os.symlink(str(index), slicedir + "/ " + last) def metadata_push(pipeline_name,filename,execution_id: str = ""): - """Pushes mlmd file to cmf-server """ + """ Pushes MLMD file to CMF-server. + Example: + ```python + result = metadata_push("example_pipeline", "mlmd_file", "3") + ``` + Args: + pipeline_name: Name of the pipeline. + filename: Path to the MLMD file. + execution_id: Optional execution ID. + + Returns: + Response output from the _metadata_push function. + """ # Required arguments: pipeline_name, filename (mlmd file path) #Optional arguments: Execution_ID output = _metadata_push(pipeline_name,filename, execution_id) return output def metadata_pull(pipeline_name,filename ="./mlmd", execution_id: str = ""): - """Pulls mlmd file from cmf-server""" + """ Pulls MLMD file from CMF-server. + Example: + ```python + result = metadata_pull("example_pipeline", "./mlmd_directory", "execution_123") + ``` + Args: + pipeline_name: Name of the pipeline. + filename: File path to store the MLMD file. + execution_id: Optional execution ID. + Returns: + Message from the _metadata_pull function. + """ # Required arguments: pipeline_name, filename(file path to store mlmd file) #Optional arguments: Execution_ID output = _metadata_pull(pipeline_name,filename, execution_id) return output def artifact_pull(pipeline_name,filename="./mlmd"): - """Pulls artifacts from initialized repository """ + """ Pulls artifacts from the initialized repository. + + Example: + ```python + result = artifact_pull("example_pipeline", "./mlmd_directory") + ``` + + Args: + pipeline_name: Name of the pipeline. + filename: Path to store artifacts. + + Returns: + Output from the _artifact_pull function. 
+ """ + # Required arguments: Pipeline_name # Optional arguments: filename( path to store artifacts) output = _artifact_pull(pipeline_name,filename) return output def artifact_pull_single(pipeline_name,filename,artifact_name): - """Pulls artifacts from initialized repository """ + """ Pulls a single artifact from the initialized repository. + Example: + ```python + result = artifact_pull_single("example_pipeline", "./mlmd_directory", "example_artifact") + ``` + Args: + pipeline_name: Name of the pipeline. + filename: Path to store the artifact. + artifact_name: Name of the artifact. + Returns: + Output from the _artifact_pull_single function. + """ + # Required arguments: Pipeline_name # Optional arguments: filename( path to store artifacts) output = _artifact_pull_single(pipeline_name,filename,artifact_name) return output def artifact_push(): - """Push artifacts to initialized repository""" + """ Pushes artifacts to the initialized repository. + + Example: + ```python + result = artifact_push() + ``` + + Returns: + Output from the _artifact_push function. + """ output = _artifact_push() return output def cmf_init_show(): + """ Initializes and shows details of the CMF command. + Example: + ```python + result = cmf_init_show() + ``` + Returns: + Output from the _cmf_cmd_init function. + """ + output=_cmf_cmd_init() return output @@ -1634,6 +1879,38 @@ def cmf_init(type: str="", password: str="", port: int=0 ): + + """ Initializes the CMF configuration based on the provided parameters. + Example: + ```python + cmf_init( type="local", + path="/path/to/re", + git_remote_url="git@github.com:user/repo.git", + cmf_server_url="http://cmf-server" + neo4j_user", + neo4j_password="password", + neo4j_uri="bolt://localhost:76" + ) + ``` + Args: + type: Type of repository ("local", "minioS3", "amazonS3", "sshremote") + path: Path for the local repository. + git_remote_url: Git remote URL for version control. + cmf_server_url: CMF server URL. + neo4j_user: Neo4j database username. + neo4j_password: Neo4j database password. + neo4j_uri: Neo4j database URI. + url: URL for MinioS3 or AmazonS3. + endpoint_url: Endpoint URL for MinioS3. + access_key_id: Access key ID for MinioS3 or AmazonS3. + secret_key: Secret key for MinioS3 or AmazonS3. + user: SSH remote username. + password: SSH remote password. + port: SSH remote port + Returns: + Output based on the initialized repository type. + """ + if type=="": return print("Error: Type is not provided") if type not in ["local","minioS3","amazonS3","sshremote"]: diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py index aa901fce..030de6bc 100644 --- a/cmflib/cmf_merger.py +++ b/cmflib/cmf_merger.py @@ -105,7 +105,6 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id): ) elif artifact_type == "Metrics": # print(props,'parse') - # cmf_class.log_execution_metrics_with_uuid(props, custom_props) cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props) elif artifact_type == "Dataslice": dataslice = cmf_class.create_dataslice(event["artifact"]["name"]) diff --git a/cmflib/cmfquery.py b/cmflib/cmfquery.py index 335d2326..42302df3 100644 --- a/cmflib/cmfquery.py +++ b/cmflib/cmfquery.py @@ -109,8 +109,6 @@ class CmfQuery(object): by users via CMF API. When methods in this class accept `name` parameters, it is expected that values of these parameters are fully-qualified names of respective entities. 
- TODO: (sergey) need to provide concrete examples and detailed description on how to actually use methods of this - class correctly, e.g., how to determine these fully-qualified names. Args: filepath: Path to the MLMD database file. @@ -648,7 +646,12 @@ def get_all_child_artifacts(self, artifact_name: str) -> pd.DataFrame: return df def get_one_hop_parent_artifacts(self, artifact_name: str) -> pd.DataFrame: - """Return input artifacts for the execution that produced the given artifact.""" + """Return input artifacts for the execution that produced the given artifact. + Args: + artifact_name: Artifact name. + Returns: + Data frame containing the immediate parent artifacts of the given artifact. + """ artifact: t.Optional = self._get_artifact(artifact_name) if not artifact: return pd.DataFrame() @@ -660,7 +663,12 @@ def get_one_hop_parent_artifacts(self, artifact_name: str) -> pd.DataFrame: ) def get_all_parent_artifacts(self, artifact_name: str) -> pd.DataFrame: - """Return all upstream artifacts.""" + """Return all upstream artifacts. + Args: + artifact_name: Artifact name. + Returns: + Data frame containing all parent artifacts. + """ df = pd.DataFrame() d1 = self.get_one_hop_parent_artifacts(artifact_name) # df = df.append(d1, sort=True, ignore_index=True) @@ -673,7 +681,12 @@ def get_all_parent_artifacts(self, artifact_name: str) -> pd.DataFrame: return df def get_all_parent_executions(self, artifact_name: str) -> pd.DataFrame: - """Return all executions that produced upstream artifacts for the given artifact.""" + """Return all executions that produced upstream artifacts for the given artifact. + Args: + artifact_name: Artifact name. + Returns: + Data frame containing all parent executions. + """ parent_artifacts: pd.DataFrame = self.get_all_parent_artifacts(artifact_name) if parent_artifacts.shape[0] == 0: # If it's empty, there's no `id` column and the code below raises an exception. @@ -728,7 +741,12 @@ def find_producer_execution(self, artifact_name: str) -> t.Optional[mlpb.Executi get_producer_execution = find_producer_execution def get_metrics(self, metrics_name: str) -> t.Optional[pd.DataFrame]: - """Return metric data frame.""" + """Return metric data frame. + Args: + metrics_name: Metrics name. + Returns: + Data frame containing the requested metrics. + """ for metric in self.store.get_artifacts_by_type("Step_Metrics"): if metric.name == metrics_name: name: t.Optional[str] = metric.custom_properties.get("Name", None) @@ -749,6 +767,8 @@ def dumptojson(self, pipeline_name: str, exec_id: t.Optional[int] = None) -> t.O Args: pipeline_name: Name of an AI pipelines. exec_id: Optional stage execution ID - filter stages by this execution ID. + Returns: + Pipeline in JSON format.
""" if exec_id is not None: exec_id = int(exec_id) diff --git a/cmflib/commands/artifact/pull.py b/cmflib/commands/artifact/pull.py index fbfd1fc4..f6025a29 100644 --- a/cmflib/commands/artifact/pull.py +++ b/cmflib/commands/artifact/pull.py @@ -81,7 +81,8 @@ def extract_repo_args(self, type: str, name: str, url: str, current_directory: s # url_with_bucket = varkha-test/23/6d9502e0283d91f689d7038b8508a2 # Splitting the string using '/' as the delimiter bucket_name, object_name = url_with_bucket.split('/', 1) - download_loc = current_directory + "/" + name + download_loc = current_directory + "/" + name if current_directory != "" else name + print(download_loc) return bucket_name, object_name, download_loc else: # returning bucket_name, object_name and download_loc returning as empty diff --git a/cmflib/commands/init/amazonS3.py b/cmflib/commands/init/amazonS3.py index a1ac4b56..d12e6487 100644 --- a/cmflib/commands/init/amazonS3.py +++ b/cmflib/commands/init/amazonS3.py @@ -127,12 +127,12 @@ def add_parser(subparsers, parent_parser): default=argparse.SUPPRESS, ) - required_arguments.add_argument( + parser.add_argument( "--session-token", required=True, help="Specify Session Token.", metavar="", - default=argparse.SUPPRESS, + default="", ) required_arguments.add_argument( diff --git a/cmflib/dvc_wrapper.py b/cmflib/dvc_wrapper.py index a3ab8c7c..c0aa8a38 100644 --- a/cmflib/dvc_wrapper.py +++ b/cmflib/dvc_wrapper.py @@ -435,7 +435,7 @@ def dvc_push() -> str: process = subprocess.Popen(['dvc', 'push'], stdout=subprocess.PIPE, universal_newlines=True) - output, errs = process.communicate(timeout=60) + output, errs = process.communicate() commit = output.strip() except Exception as err: diff --git a/cmflib/storage_backends/amazonS3_artifacts.py b/cmflib/storage_backends/amazonS3_artifacts.py index f817d070..e252fb04 100644 --- a/cmflib/storage_backends/amazonS3_artifacts.py +++ b/cmflib/storage_backends/amazonS3_artifacts.py @@ -37,7 +37,9 @@ def download_artifacts( aws_session_token=session_token ) s3.head_bucket(Bucket=bucket_name) - dir_path, _ = download_loc.rsplit("/", 1) + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) if dir_path != "": os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed response = s3.download_file(bucket_name, object_name, download_loc) diff --git a/docs/api/public/cmf.md b/docs/api/public/cmf.md index cc60c906..f1275daa 100644 --- a/docs/api/public/cmf.md +++ b/docs/api/public/cmf.md @@ -1,4 +1,18 @@ -# cmflib.cmf.Cmf +# cmflib.cmf + +::: cmflib.cmf + options: + show_root_toc_entry: false + merge_init_into_class: true + docstring_style: google + members: + - metadata_push + - metadata_pull + - artifact_pull + - artifact_pull_single + - artifact_push + - cmf_init_show + - cmf_init ::: cmflib.cmf.Cmf options: @@ -8,9 +22,17 @@ members: - __init__ - create_context + - merge_created_context - create_execution + - update_execution + - merge_created__execution - log_dataset + - log_dataset_with_version - log_model + - log_model_with_version + - log_execution_metrics_from_client - log_execution_metrics - log_metric + - commit_existing_metrics - create_dataslice + - update_dataslice diff --git a/docs/api/public/cmfquery.md b/docs/api/public/cmfquery.md new file mode 100644 index 00000000..0c8ae361 --- /dev/null +++ b/docs/api/public/cmfquery.md @@ -0,0 +1,29 @@ +# cmflib.cmfquery.CmfQuery + +::: cmflib.cmfquery.CmfQuery + options: + show_root_toc_entry: false + merge_init_into_class: true + docstring_style: 
google + members: + - get_pipeline_names + - get_pipeline_id + - get_pipeline_stages + - get_all_exe_in_stage + - get_all_executions_by_ids_list + - get_all_artifacts_by_context + - get_all_artifacts_by_ids_list + - get_all_executions_in_stage + - get_artifact_df + - get_all_artifacts + - get_artifact + - get_all_artifacts_for_execution + - get_all_artifact_types + - get_all_executions_for_artifact + - get_one_hop_child_artifacts + - get_all_child_artifacts + - get_one_hop_parent_artifacts + - get_all_parent_artifacts + - get_all_parent_executions + - get_metrics + - dumptojson diff --git a/mkdocs.yaml b/mkdocs.yaml index 8b8c7a0a..ee4e6f92 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -93,6 +93,7 @@ nav: - Public API: - CMF: api/public/cmf.md - DataSlice: api/public/dataslice.md + - CmfQuery: api/public/cmfquery.md - Ontology: - Ontology: common-metadata-ontology/readme.md diff --git a/pyproject.toml b/pyproject.toml index c37d05c0..0244af35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cmflib" -version = "0.0.7" +version = "0.0.8" dependencies = [ "ml-metadata==1.11.0", "dvc[ssh,s3]==2.27.0", diff --git a/setup.py b/setup.py index 67570cfc..209921d0 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -VERSION = '0.0.7' +VERSION = '0.0.8' DESCRIPTION = 'Metadata Python Package' LONG_DESCRIPTION = 'Metadata framework storing AI metadata into MLMD'
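A minimal sketch of how the newly documented `CmfQuery` lineage helpers fit together (illustrative only, not part of the diff above; the MLMD file path, artifact name, and pipeline name are placeholder values, and artifact names are expected to be the fully-qualified names stored by CMF):

```python
from cmflib.cmfquery import CmfQuery

# Open the MLMD store produced by a CMF run (path is a placeholder).
query = CmfQuery(filepath="mlmd")
print(query.get_pipeline_names())  # pipelines recorded in this store

# Fully-qualified artifact name as stored by CMF (placeholder value).
artifact_name = "artifacts/data.xml.gz:236d9502e0283d91f689d7038b8508a2"

parents = query.get_one_hop_parent_artifacts(artifact_name)    # immediate input artifacts
lineage = query.get_all_parent_artifacts(artifact_name)        # full upstream lineage
producers = query.get_all_parent_executions(artifact_name)     # executions that produced them

# Export one pipeline's metadata as JSON.
pipeline_json = query.dumptojson(pipeline_name="Test-env", exec_id=None)
```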