Skip to content

Commit

Permalink
Path branch (#200)
Browse files Browse the repository at this point in the history
* Added path variables as class properties and put artifacts inside the respective executions

* segregate by execution id

* replace / with comma

* removed duplicate word
  • Loading branch information
annmary-roy authored Aug 29, 2024
1 parent a5c56fe commit 0269f34
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions cmflib/cmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class Cmf:
# pylint: disable=too-many-instance-attributes
# Reading CONFIG_FILE variable
cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig")
ARTIFACTS_PATH = "cmf_artifacts"
DATASLICE_PATH = "dataslice"
METRICS_PATH = "metrics"
if os.path.exists(cmf_config):
attr_dict = CmfConfig.read_config(cmf_config)
__neo4j_uri = attr_dict.get("neo4j-uri", "")
Expand Down Expand Up @@ -1495,7 +1498,7 @@ def commit_metrics(self, metrics_name: str):
assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"


directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value)
directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], METRICS_PATH)
os.makedirs(directory_path, exist_ok=True)
metrics_df = pd.DataFrame.from_dict(
self.metrics[metrics_name], orient="index")
Expand Down Expand Up @@ -1721,7 +1724,7 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice":
def read_dataslice(self, name: str) -> pd.DataFrame:
"""Reads the dataslice"""
# To do checkout if not there
directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value)
directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], DATASLICE_PATH)
name = os.path.join(directory_path, name)
df = pd.read_parquet(name)
return df
Expand All @@ -1743,7 +1746,7 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict):
Returns:
None
"""
directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value)
directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], DATASLICE_PATH)
name = os.path.join(directory_path, name)
df = pd.read_parquet(name)
temp_dict = df.to_dict("index")
Expand Down Expand Up @@ -1827,7 +1830,7 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
self.writer.create_execution(execution_type=name_without_extension)
assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value)
directory_path = os.path.join(self.writer.ARTIFACTS_PATH, self.writer.execution.properties["Execution_uuid"].string_value.split(',')[0], self.writer.DATASLICE_PATH)
os.makedirs(directory_path, exist_ok=True)
custom_props = {} if custom_properties is None else custom_properties
git_repo = git_get_repo()
Expand Down

0 comments on commit 0269f34

Please sign in to comment.