Sync API to async API (#202)
* adding async POC changes

* adding changes with run_in_threadpool (see the sketch after the file summary below)

* added async wrappers for the rest of the APIs

* adding update_context and exec functions

* adding changes

* update

* adding lock to create_unique_exec

* added comments, removed unused libraries and imports

* removing lock from create_unique_exec and adding it to mlmd_push

* update parse_json_to_mlmd, adding comments and defining variable types

* adding changes

* Adding changes for the case when pipeline_name doesn't exist in dict_of_exe_ids; this situation is rare but has already been reproduced multiple times, so adding it as a precaution

* Fixed a typo

* resolving GUI bugs: double-clicking an artifact type no longer triggers multiple API calls, and fixing a click issue between artifact types of different pipelines

* adding uuid to dataset artifacts

* handling exception in artifact push if artifacts are empty

* Update query_artifact_lineage_d3tree.py comments

---------

Co-authored-by: Abhinav Chobey <[email protected]>
Co-authored-by: Varkha Sharma <[email protected]>
Co-authored-by: First Second <[email protected]>
4 people authored Oct 8, 2024
1 parent e394461 commit 3b08813
Showing 15 changed files with 305 additions and 198 deletions.
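Before the diffs, a minimal sketch of the sync-to-async pattern the commit messages describe, assuming FastAPI/Starlette's run_in_threadpool; the route and function names are illustrative, not the repository's exact code:

# Illustrative only: a blocking metadata merge wrapped for an async server,
# with a lock serializing concurrent pushes (per "adding lock to mlmd_push").
import asyncio
from starlette.concurrency import run_in_threadpool  # FastAPI re-exports this

lock = asyncio.Lock()

def parse_json_to_mlmd(mlmd_json: str, path_to_store: str, cmd: str, exec_id):
    ...  # blocking mlmd work (see cmflib/cmf_merger.py below)

async def mlmd_push(mlmd_json: str, path_to_store: str, exec_id=None):
    # Hold the lock so two simultaneous pushes cannot race on the store, then
    # run the blocking parser in a worker thread so the event loop stays free.
    async with lock:
        return await run_in_threadpool(
            parse_json_to_mlmd, mlmd_json, path_to_store, "push", exec_id
        )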
45 changes: 41 additions & 4 deletions cmflib/cmf.py
@@ -24,7 +24,6 @@
import typing as t

# This import is needed for jupyterlab environment
import dvc
from ml_metadata.proto import metadata_store_pb2 as mlpb
from ml_metadata.metadata_store import metadata_store
from cmflib.dvc_wrapper import (
@@ -44,6 +43,8 @@
from cmflib.metadata_helper import (
    get_or_create_parent_context,
    get_or_create_run_context,
    get_or_create_context_with_type,
    update_context_custom_properties,
    associate_child_to_parent_context,
    create_new_execution_in_existing_run_context,
    link_execution_to_artifact,
@@ -313,6 +314,42 @@ def merge_created_context(
        )
        return ctx

    def update_context(
        self,
        type_name: str,
        context_name: str,
        context_id: int,
        properties: t.Optional[t.Dict] = None,
        custom_properties: t.Optional[t.Dict] = None
    ) -> mlpb.Context:
        """Fetches the context by name and type, then updates its custom
        properties. Used on the server when a context already exists
        (e.g. after concurrent pushes of the same pipeline).
        """
        self.context = get_or_create_context_with_type(
            self.store,
            context_name,
            type_name,
            properties,
            type_properties=None,
            custom_properties=custom_properties
        )
        if self.context is None:
            print("Error - no context id")
            return

        if custom_properties:
            for key, value in custom_properties.items():
                # mlmd custom properties are typed: ints go to int_value,
                # everything else is stored as a string.
                if isinstance(value, int):
                    self.context.custom_properties[key].int_value = value
                else:
                    self.context.custom_properties[key].string_value = str(value)
        updated_context = update_context_custom_properties(
            self.store,
            context_id,
            context_name,
            self.context.properties,
            self.context.custom_properties,
        )
        return updated_context
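A hypothetical usage sketch of the new method; the instance name, context id, and property values below are invented for illustration:

# Assumed: `metawriter` is an initialized cmf.Cmf instance and context id 7
# already exists (e.g. a concurrent push raised AlreadyExistsError).
ctx = metawriter.update_context(
    type_name="Prepare",
    context_name="demo_pipeline/prepare",
    context_id=7,
    properties={},
    custom_properties={"retries": 2, "owner": "ml-team"},
)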

    def create_execution(
        self,
        execution_type: str,
@@ -1678,14 +1715,14 @@ def update_existing_artifact(
            custom_properties: Dictionary containing custom properties to update.
        Returns:
            None
        """
        for key, value in custom_properties.items():
            if isinstance(value, int):
                artifact.custom_properties[key].int_value = value
            else:
                artifact.custom_properties[key].string_value = str(value)
        put_artifact(self.store, artifact)


    def get_artifact(self, artifact_id: int) -> mlpb.Artifact:
        """Gets the artifact object from mlmd"""
@@ -1779,7 +1816,7 @@ def add_data(
            should already be versioned.
        Example:
            ```python
            # dataslice.add_data(f"data/raw_data/{j}.xml")
            ```
        Args:
            path: Name to identify the file to be added to the dataslice.
218 changes: 128 additions & 90 deletions cmflib/cmf_merger.py
@@ -1,4 +1,4 @@
#
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,103 +17,141 @@
import json
import os
from cmflib import cmf
import traceback
from ml_metadata.errors import AlreadyExistsError
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2 as mlpb
from typing import Union


# mlmd is created from metadata passed in JSON format
def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_id: Union[str, int]) -> Union[str, None]:
    try:
        mlmd_data = json.loads(mlmd_json)
        pipelines = mlmd_data["Pipeline"]
        pipeline = pipelines[0]
        pipeline_name = pipeline["name"]
        stage = {}

        # When the command is "push", add original_time_since_epoch to the
        # custom_properties in the metadata; while pulling mlmd there is no need.
        if cmd == "push":
            data = create_original_time_since_epoch(mlmd_data)
        else:
            data = mlmd_data

        graph = False
        # if cmf is configured with 'neo4j', make graph True
        if os.getenv('NEO4J_URI', "") != "":
            graph = True

        # Initialize the connection configuration and metadata store
        config = mlpb.ConnectionConfig()
        config.sqlite.filename_uri = path_to_store
        store = metadata_store.MetadataStore(config)

        # Initialize the cmf class with pipeline_name and graph status
        cmf_class = cmf.Cmf(filepath=path_to_store, pipeline_name=pipeline_name,
                            graph=graph, is_server=True)

        for stage in data["Pipeline"][0]["stages"]:  # Iterates over all the stages
            if exec_id is None:  # if exec_id is None, we pass all the executions
                list_executions = [execution for execution in stage["executions"]]
            elif exec_id is not None:  # else we pass only the executions for that specific id
                list_executions = [
                    execution
                    for execution in stage["executions"]
                    if execution["id"] == int(exec_id)
                ]
            else:
                return "Invalid execution id given."

            for execution in list_executions:  # Iterates over all the executions
                try:
                    _ = cmf_class.merge_created_context(
                        pipeline_stage=stage['name'],
                        custom_properties=stage["custom_properties"],
                    )
                except AlreadyExistsError as e:
                    # The context already exists, possibly due to concurrent pushes.
                    # As both pipelines would be unable to fetch data from the server,
                    # update the custom properties of the existing context instead.
                    _ = cmf_class.update_context(
                        str(stage["type"]),
                        str(stage["name"]),
                        stage["id"],
                        stage["properties"],
                        custom_properties=stage["custom_properties"]
                    )
                except Exception as e:
                    print(f"Error in merge_created_context: {e}")
                try:
                    _ = cmf_class.merge_created_execution(
                        execution["properties"]["Context_Type"],
                        execution["properties"]["Execution"],
                        execution["properties"],
                        execution["custom_properties"],
                        execution["name"]
                    )
                except AlreadyExistsError as e:
                    _ = cmf_class.update_execution(
                        execution["id"],
                        execution["custom_properties"]
                    )
                except Exception as e:
                    print(f"Error in merge_created_execution: {e}")

                for event in execution["events"]:  # Iterates over all the events
                    artifact_type = event["artifact"]["type"]
                    event_type = event["type"]
                    artifact_name = (event["artifact"]["name"].split(":"))[0]
                    custom_props = event["artifact"]["custom_properties"]
                    props = event["artifact"]["properties"]
                    uri = event["artifact"]["uri"]

                    try:
                        if artifact_type == "Dataset":
                            event_io = "input" if event_type == 3 else "output"
                            cmf_class.log_dataset_with_version(
                                artifact_name,
                                uri,
                                event_io,
                                props,
                                custom_properties=custom_props,
                            )
                        elif artifact_type == "Model":
                            event_io = "input" if event_type == 3 else "output"
                            props["uri"] = uri
                            cmf_class.log_model_with_version(
                                path=artifact_name,
                                event=event_io,
                                props=props,
                                custom_properties=custom_props,
                            )
                        elif artifact_type == "Metrics":
                            cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props)
                        elif artifact_type == "Dataslice":
                            dataslice = cmf_class.create_dataslice(event["artifact"]["name"])
                            dataslice.commit_existing(uri, custom_props)
                        elif artifact_type == "Step_Metrics":
                            cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props)
                        else:
                            pass
                    except AlreadyExistsError as e:
                        # If the same pipeline is pushed twice at the same time,
                        # update custom_properties using the second pipeline.
                        artifact = store.get_artifacts_by_uri(uri)
                        cmf_class.update_existing_artifact(
                            artifact[0],
                            custom_properties=custom_props,
                        )
                    except Exception as e:
                        print(f"Error in log_{artifact_type}_with_version: {e}")
    except Exception as e:
        print(f"An error occurred in parse_json_to_mlmd: {e}")
        traceback.print_exc()
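For reference on the event_type values above: they match ml-metadata's Event.Type enum, where INPUT = 3 and OUTPUT = 4, which is why 3 maps to "input" and other values map to "output".

# Sanity check of the event-type mapping used in parse_json_to_mlmd.
from ml_metadata.proto import metadata_store_pb2 as mlpb

assert mlpb.Event.INPUT == 3   # event_type 3 -> "input"
assert mlpb.Event.OUTPUT == 4  # output events carry type 4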

# create_time_since_epoch is appended to mlmd pushed to cmf-server as original_create_time_since_epoch
def create_original_time_since_epoch(mlmd_data):
2 changes: 0 additions & 2 deletions cmflib/cmfquery.py
@@ -19,11 +19,9 @@
import typing as t
from enum import Enum
from google.protobuf.json_format import MessageToDict
import pandas as pd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2 as mlpb
from cmflib.mlmd_objects import CONTEXT_LIST

__all__ = ["CmfQuery"]
9 changes: 5 additions & 4 deletions cmflib/commands/artifact/push.py
@@ -93,10 +93,11 @@ def run(self):
                identifier
            )  # getting all artifacts with id
            # dropping artifact with type 'Metrics' as metrics don't have a physical file
            if not artifacts.empty:
                artifacts = artifacts[artifacts['type'] != 'Metrics']
                # adding .dvc at the end of every file as it is needed for pull
                artifacts['name'] = artifacts['name'].apply(lambda name: f"{name.split(':')[0]}.dvc")
                names.extend(artifacts['name'].tolist())
        file_set = set(names)
        result = dvc_push(list(file_set))
        return result
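A quick illustration of the name-to-.dvc mapping above; the sample artifact name is made up:

# Artifact names look like "<path>:<hash>"; dvc pull expects "<path>.dvc".
name = "data/raw_data/0.xml:6f597d341ceb7d8fbbe88859a892ef81"
assert f"{name.split(':')[0]}.dvc" == "data/raw_data/0.xml.dvc"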
1 change: 0 additions & 1 deletion cmflib/commands/metadata/push.py
@@ -24,7 +24,6 @@
from cmflib.server_interface import server_interface
from cmflib.utils.cmf_config import CmfConfig

# This class pushes mlmd file to cmf-server
class CmdMetadataPush(CmdBase):
    def run(self):