# weld.py
"""
Perform the data pipeline welding procedure.

This links together a versioned processing pipeline (recipe) with versioned data.

Execution:

    To perform the weld, edit the configuration files in the config/ folder for the
    specific experiment, and then execute:

        python weld.py --experiment_config_file $EXPCONFIG --options_config_file $OPCONFIG

    EXPCONFIG and OPCONFIG are environment variables holding the paths to the
    customized YAML config files.
"""
import pathlib
import multiprocessing as mp
from weld_pipeline import weld_pipeline
from config.utils import get_batches, parse_command_args
# Fallback values handed to the command-line parser.
EXPCONFIG_DEFAULT = pathlib.Path("config/experiment.yaml")
OPCONFIG_DEFAULT = pathlib.Path("config/options.yaml")
RECIPE_DEFAULT = "recipe"


def worker_count(cpu_total):
    """Return the number of pool workers to start for ``cpu_total`` cores.

    One core is left free for the parent process, but the result is never
    lower than one: ``multiprocessing.Pool`` raises ``ValueError`` when asked
    for zero workers, which is what ``cpu_total - 1`` yields on a
    single-core host.
    """
    return max(1, cpu_total - 1)


def main():
    """Parse the command line and run ``weld_pipeline`` once per batch.

    Batches are run in a worker pool when ``args.parallel`` is set, and one
    after another in this process otherwise. An exception raised by any
    batch propagates to the caller in both modes.
    """
    args = parse_command_args(
        experiment_config_file=EXPCONFIG_DEFAULT,
        options_config_file=OPCONFIG_DEFAULT,
        recipe_dir=RECIPE_DEFAULT,
    )
    recipe_dir = args.recipe_dir
    experiment_config_file = args.experiment_config_file
    options_config_file = args.options_config_file
    parallel = args.parallel
    force = args.force

    batches = get_batches(config=experiment_config_file)

    if parallel:
        # One positional-argument tuple per batch, in weld_pipeline's
        # parameter order.
        jobs = [
            (
                batch_id,
                recipe_dir,
                experiment_config_file,
                options_config_file,
                force,
            )
            for batch_id in batches
        ]
        # Pool.apply blocks until each call returns, which would serialize
        # the batches; starmap hands all of them to the workers at once and
        # waits for every one to finish. chunksize=1 gives each worker one
        # batch at a time. The with-block releases the workers even when a
        # batch raises.
        with mp.Pool(worker_count(mp.cpu_count())) as pool:
            pool.starmap(weld_pipeline, jobs, chunksize=1)
    else:
        # If parallel is not requested, process each batch sequentially.
        for batch_id in batches:
            weld_pipeline(
                batch_id=batch_id,
                recipe_folder=recipe_dir,
                experiment_config_file=experiment_config_file,
                options_config_file=options_config_file,
                force=force,
            )


# The guard keeps worker processes that re-import this module (the "spawn"
# start method) from re-running the whole script.
if __name__ == "__main__":
    main()