Skip to content

Commit

Permalink
configure path and basic pipeline of basic_cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
panstav1 committed Nov 18, 2023
1 parent de5d468 commit 7c21542
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 8 deletions.
3 changes: 2 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ main:
experiment_name: development
steps: all
etl:
sample: "sample1.csv"
sample: "sample.csv"
min_price: 10 # dollars
max_price: 350 # dollars
output_artifact: "cleaned_dataset.parquet"
data_check:
kl_threshold: 0.2
modeling:
Expand Down
6 changes: 3 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def go(config: DictConfig):

if "basic_cleaning" in active_steps:
_ = mlflow.run (
os.path.join (root_path,"basic_cleaning"),
os.path.join (root_path, "src", "basic_cleaning"),
"main",
parameters={
"input_artifact": config["etl"]["sample"],
"output_artifact": "cleaned_dataset.parquet",
"input_artifact": config["etl"]["sample"]+ ':latest',
"output_artifact": config["etl"]["output_artifact"],
"output_type": "parquet",
"output_description": "Cleaned_data_without_outliers",
"min_price": config["etl"]["min_price"],
Expand Down
4 changes: 3 additions & 1 deletion src/basic_cleaning/conda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ channels:
- defaults
dependencies:
- pip=20.3.3
- pandas=1.2.3
- pip:
- wandb==0.12.17
- wandb==0.12.17
- mlflow==2.2.2
40 changes: 37 additions & 3 deletions src/basic_cleaning/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import argparse
import logging
import wandb
import pandas as pd
import os


logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
Expand All @@ -21,11 +23,43 @@ def go(args):
# particular version of the artifact
# artifact_local_path = run.use_artifact(args.input_artifact).file()

######################
# YOUR CODE HERE #
######################
# Basic cleaning step: fetch the input artifact, drop rows whose price falls
# outside the configured range, parse `last_review` as datetimes, and log the
# result as a new W&B artifact.
# Relies on `run`, `args` and `logger` set up earlier in the enclosing code.
logger.info(f"Downloading {args.input_artifact} ...")
artifact = run.use_artifact(args.input_artifact)
artifact_path = artifact.file()

dataframe = pd.read_csv(artifact_path)

logger.info('Applying min/max outlier detection on price column')
try:
    # Boolean mask of rows whose price lies between min_price and max_price
    # (Series.between includes both bounds by default).
    idx = dataframe['price'].between(args.min_price, args.max_price)
except TypeError as err:
    logger.error(err)
    logger.error('Min price and Max Price are not numbers')
    # Re-raise the original TypeError so its traceback is preserved instead
    # of wrapping it in a fresh exception.
    raise

# Copy so the column assignment below does not write into a view of
# `dataframe`.
cleaned_df = dataframe[idx].copy()

# Convert last_review to datetime
cleaned_df['last_review'] = pd.to_datetime(cleaned_df['last_review'])

logger.info(f'Storing file as {args.output_artifact}...')
cleaned_df.to_parquet(args.output_artifact)

try:
    # Storing the dataset
    artifact = wandb.Artifact(
        name=args.output_artifact,
        type=args.output_type,
        description=args.output_description,
    )
    artifact.add_file(args.output_artifact)

    logger.info("Logging artifact")
    # NOTE(review): log_artifact may upload in the background — confirm the
    # local file is no longer needed once this call returns.
    run.log_artifact(artifact)
finally:
    # Always delete the local parquet file, even when logging fails, so a
    # failed run does not leave it behind in the working directory.
    os.remove(args.output_artifact)

if __name__ == "__main__":

Expand Down

0 comments on commit 7c21542

Please sign in to comment.