From 4d6a83a694d52851f0d2ffa3e41a33cc0a632c99 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 15:42:03 -0400 Subject: [PATCH 01/12] Move AQ GEOS to new build/Run stack AQ GEOS benchmark --- experiments/experiments.yaml | 2 +- geosongpu_ci/actions/slurm.py | 6 +- geosongpu_ci/pipeline/aquaplanet.py | 226 ++++++++++++++++++++++--- geosongpu_ci/pipeline/geos.py | 52 ++++-- geosongpu_ci/pipeline/held_suarez.py | 45 ++--- geosongpu_ci/tools/benchmark/report.py | 14 +- 6 files changed, 262 insertions(+), 83 deletions(-) diff --git a/experiments/experiments.yaml b/experiments/experiments.yaml index b17d180..8d3579a 100644 --- a/experiments/experiments.yaml +++ b/experiments/experiments.yaml @@ -34,7 +34,7 @@ geos_aq: tag_or_hash: gcm-v11.0.4.2 note: Aquaplanet v11.0.4.2 input: - directory: /discover/nobackup/projects/geosongpu/geos_data/aquaplanet/gcm-11.0.4.2/C180-L72/ + C180-L72: /discover/nobackup/projects/geosongpu/geos_data/aquaplanet/gcm-11.0.4.2/C180-L72/ tasks: - GEOS - Aquaplanet diff --git a/geosongpu_ci/actions/slurm.py b/geosongpu_ci/actions/slurm.py index a50e7d2..1b10453 100644 --- a/geosongpu_ci/actions/slurm.py +++ b/geosongpu_ci/actions/slurm.py @@ -68,14 +68,14 @@ def one_half_Nodes_CPU(cls) -> "SlurmConfiguration": def wait_for_sbatch(job_id: str): - sleep(5) # wait 5 seconds for SLURM to enter prolog + sleep(60) # wait 60 seconds for SLURM to enter prolog running = True while running: sacct_result = subprocess.run( ["sacct", "-j", job_id, "-o", "state"], stdout=subprocess.PIPE ).stdout.decode("utf-8") running = False - for state in sacct_result.split("\n")[2:]: - if state.strip() in ["RUNNING", "PENDING"]: + for state in sacct_result.split("\n"): + if "RUNNING" in state.strip or "PENDING" in state: running = True break diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index d2dc369..1c89dd3 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -2,9 +2,15 @@ from geosongpu_ci.utils.environment import Environment from geosongpu_ci.utils.registry import Registry from geosongpu_ci.actions.slurm import wait_for_sbatch -from geosongpu_ci.pipeline.geos import copy_input_from_project +from geosongpu_ci.pipeline.geos import copy_input_to_experiment_directory +from geosongpu_ci.actions.pipeline import PipelineAction from geosongpu_ci.utils.shell import ShellScript +from geosongpu_ci.tools.benchmark.geos_log_parser import parse_geos_log +from geosongpu_ci.tools.benchmark.report import report from typing import Dict, Any +import glob +import os +import shutil def _replace_in_file(url: str, text_to_replace: str, new_text: str): @@ -15,6 +21,49 @@ def _replace_in_file(url: str, text_to_replace: str, new_text: str): f.write(data) +def _simulate( + experiment_directory: str, + setup_sh: str, + cap_rc: str, + log_pattern: str, + fv3_dacemode: str, +): + # Execute caching step on 6 GPUs + ShellScript("run_script_gpu").write( + shell_commands=[ + f"cd {experiment_directory}", + f"./{setup_sh}", + f"cp -f {cap_rc} CAP.rc", + ] + ).execute(remove_after_execution=True) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="#SBATCH --output=slurm-%j-%%x.out", + new_text=log_pattern, + ) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="setenv FV3_DACEMODE BuildAndRun", + new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", + ) + sbatch_result = ( + ShellScript("run_script_gpu") + .write( + shell_commands=[ + f"cd {experiment_directory}", + f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", + "sbatch gcm_run.j", + ] + ) + .execute() + ) + job_id = sbatch_result.split(" ")[-1].strip().replace("\n", "") + wait_for_sbatch(job_id) + + +VALIDATION_RESOLUTION = "C180-L72" + + @Registry.register class Aquaplanet(TaskBase): def run_action( @@ -24,39 +73,164 @@ def run_action( metadata: Dict[str, Any], ): geos = env.get("GEOS_BASE_DIRECTORY") - layout = "1x1" - experiment_dir = copy_input_from_project(config, geos, layout) - _replace_in_file( - url=f"{experiment_dir}/gcm_run.j", - text_to_replace="setenv GEOSBASE TO_BE_REPLACED", - new_text=f"setenv GEOSBASE {geos}", - ) - _replace_in_file( - url=f"{experiment_dir}/gcm_run.j", - text_to_replace="setenv EXPDIR TO_BE_REPLACED", - new_text=f"setenv EXPDIR {experiment_dir}", - ) + if ( + env.experiment_action == PipelineAction.All + or env.experiment_action == PipelineAction.Validation + ): + # Prepare experiment directory + resolution = VALIDATION_RESOLUTION + experiment_dir = copy_input_to_experiment_directory( + input_directory=config["input"][VALIDATION_RESOLUTION], + geos_directory=geos, + resolution=VALIDATION_RESOLUTION, + trigger_reset=True, + ) + + # Modify all gcm_run.j.X with directory information + gcm_runs = glob.glob(f"{experiment_dir}/{resolution}/gcm_run.j.*") + for gcm_run in gcm_runs: + _replace_in_file( + url=gcm_run, + text_to_replace="setenv GEOSBASE TO_BE_REPLACED", + new_text=f"setenv GEOSBASE {geos}", + ) + _replace_in_file( + url=gcm_run, + text_to_replace="setenv EXPDIR TO_BE_REPLACED", + new_text=f"setenv EXPDIR {experiment_dir}", + ) - sbatch_result = ( - ShellScript("run_script_gpu") - .write( - shell_commands=[ - f"cd {experiment_dir}", - f"export CUPY_CACHE_DIR={experiment_dir}/.cupy", - "sbatch gcm_run.j", - ] + # Execute caching step on 6 GPUs + _simulate( + experiment_directory=experiment_dir, + setup_sh="setup_1.5nodes_gpu.sh", + cap_rc="CAP.rc.1ts", + log_pattern="validation.cache.dacegpu.%t.out", + fv3_dacemode="BuildAndRun", + ) + + # Run for 12h on 6 GPUs + _simulate( + experiment_directory=experiment_dir, + setup_sh="setup_1.5nodes_gpu.sh", + cap_rc="CAP.rc.12hours", + log_pattern="validation.12hours.dacegpu.%t.out", + fv3_dacemode="Run", ) - .execute() - ) - job_id = sbatch_result.split(" ")[-1].strip().replace("\n", "") - wait_for_sbatch(job_id) + if ( + env.experiment_action == PipelineAction.All + or env.experiment_action == PipelineAction.Benchmark + ): + if ( + resolution == VALIDATION_RESOLUTION + and env.experiment_action == PipelineAction.All + ): + # Experiment directory is already present and backend cached + experiment_dir = f"{geos}/experiment/{resolution}" + else: + # Build experiment directory + experiment_dir = copy_input_to_experiment_directory( + input_directory=config["input"][VALIDATION_RESOLUTION], + geos_directory=geos, + resolution=VALIDATION_RESOLUTION, + trigger_reset=True, + ) + + # Modify all gcm_run.j.X with directory information + gcm_runs = glob.glob(f"{experiment_dir}/{resolution}/gcm_run.j.*") + for gcm_run in gcm_runs: + _replace_in_file( + url=gcm_run, + text_to_replace="setenv GEOSBASE TO_BE_REPLACED", + new_text=f"setenv GEOSBASE {geos}", + ) + _replace_in_file( + url=gcm_run, + text_to_replace="setenv EXPDIR TO_BE_REPLACED", + new_text=f"setenv EXPDIR {experiment_dir}", + ) + + _simulate( + experiment_directory=experiment_dir, + setup_sh="setup_1.5nodes_gpu.sh", + cap_rc="CAP.rc.1ts", + log_pattern="benchmark.cache.dacegpu.%t.out", + fv3_dacemode="BuildAndRun", + ) + + # Execute 1 day run on 6 GPUs + _simulate( + experiment_directory=experiment_dir, + setup_sh="setup_1.5nodes_gpu.sh", + cap_rc="CAP.rc.1day", + log_pattern="benchmark.1day.dacegpu.%t.out", + fv3_dacemode="Run", + ) + + # Execute 1 day run on 72 CPUs (fortran) + _simulate( + experiment_directory=experiment_dir, + setup_sh="setup_1.5nodes_cpu.sh", + cap_rc="CAP.rc.1day", + log_pattern="benchmark.1day.fortran.%t.out", + fv3_dacemode="Run", + ) def check( self, config: Dict[str, Any], env: Environment, ) -> bool: - # TODO + # Setup + geos_path = env.get("GEOS_BASE_DIRECTORY") + geos_experiment_path = f"{geos_path}/experiment" + artifact_directory = f"{env.artifact_directory}/held_suarez/" + os.makedirs(artifact_directory, exist_ok=True) + + # Metadata + ci_metadata_rpath = f"{geos_path}/../ci_metadata" + file_exists = os.path.isfile(ci_metadata_rpath) + if not file_exists: + raise RuntimeError( + "Held-Suarez run didn't write ci_metadata at " + f"{ci_metadata_rpath}. " + "Coding or Permission error." + ) + shutil.copy(ci_metadata_rpath, artifact_directory) + + # Validation artefact save + if ( + env.experiment_action == PipelineAction.Validation + or env.experiment_action == PipelineAction.All + ): + logs = glob.glob( + f"{geos_experiment_path}/{VALIDATION_RESOLUTION}/validation.*" + ) + validation_artifact = f"{artifact_directory}/Validation/" + os.makedirs(validation_artifact, exist_ok=True) + for log in logs: + shutil.copy(log, validation_artifact) + + # Benchmark artifact save & analysis + if ( + env.experiment_action == PipelineAction.Benchmark + or env.experiment_action == PipelineAction.All + ): + for resolution in ["C180-L72"]: + logs = glob.glob(f"{geos_experiment_path}/{resolution}/benchmark.*") + benchmark_artifact = f"{artifact_directory}/Benchmark/{resolution}" + os.makedirs(benchmark_artifact, exist_ok=True) + bench_raw_data = [] + for log in logs: + shutil.copy(log, benchmark_artifact) + # Grab all rank 0 that are not caching runs + if ".0.out" in log and "cache" not in log: + bench_raw_data.append(parse_geos_log(log)) + benchmark_report = report(bench_raw_data) + print(benchmark_report) + with open(f"{benchmark_artifact}/report_benchmark.out", "w") as f: + f.write(str(benchmark_report)) + return True diff --git a/geosongpu_ci/pipeline/geos.py b/geosongpu_ci/pipeline/geos.py index 0672d5b..c66cc86 100644 --- a/geosongpu_ci/pipeline/geos.py +++ b/geosongpu_ci/pipeline/geos.py @@ -1,5 +1,5 @@ from geosongpu_ci.utils.environment import Environment -from typing import Dict, Any +from typing import Dict, Any, Optional from geosongpu_ci.pipeline.task import TaskBase from geosongpu_ci.utils.shell import ShellScript from geosongpu_ci.utils.registry import Registry @@ -33,6 +33,7 @@ def set_python_environment( geos_install_dir: str, working_directory: str = ".", ) -> ShellScript: + """Set a python environment using FVDycore GridComp auto-env script""" geos_fvdycore_comp = ( f"{geos_directory}/src/Components/@GEOSgcm_GridComp/" "GEOSagcm_GridComp/GEOSsuperdyn_GridComp/@FVdycoreCubed_GridComp" @@ -52,6 +53,40 @@ def set_python_environment( return set_env +def copy_input_to_experiment_directory( + input_directory: str, + geos_directory: str, + resolution: str, + experiment_name: Optional[str] = None, + trigger_reset: bool = False, +) -> str: + """Copy the input directory into the experiment directory. + + Optionally, trigger the "reset.sh" to get data clean and ready to execute. + """ + if experiment_name: + experiment_dir = f"{geos_directory}/experiment/{experiment_name}" + else: + experiment_dir = f"{geos_directory}/experiment/{resolution}" + + if trigger_reset: + reset_cmd = "./reset.sh" + else: + reset_cmd = "" + + ShellScript(f"copy_input_{resolution}").write( + modules=[], + shell_commands=[ + f"cd {geos_directory}", + f"mkdir -p {experiment_dir}", + f"cd {experiment_dir}", + f"cp -r {input_directory}/* .", + reset_cmd, + ], + ).execute() + return experiment_dir + + @Registry.register class GEOS(TaskBase): def run_action( @@ -119,18 +154,3 @@ def check( env: Environment, ) -> bool: return _check(env) - - -def copy_input_from_project(config: Dict[str, Any], geos_dir: str, layout: str) -> str: - # Copy input - input_config = config["input"] - experiment_dir = f"{geos_dir}/experiment/l{layout}" - ShellScript("copy_input").write( - shell_commands=[ - f"cd {geos_dir}", - f"mkdir -p {geos_dir}/experiment/l{layout}", - f"cd {experiment_dir}", - f"cp -r {input_config['directory']}/l{layout}/* .", - ], - ).execute() - return experiment_dir diff --git a/geosongpu_ci/pipeline/held_suarez.py b/geosongpu_ci/pipeline/held_suarez.py index 0064794..5985eec 100644 --- a/geosongpu_ci/pipeline/held_suarez.py +++ b/geosongpu_ci/pipeline/held_suarez.py @@ -5,12 +5,15 @@ from geosongpu_ci.actions.pipeline import PipelineAction from geosongpu_ci.actions.slurm import SlurmConfiguration from geosongpu_ci.utils.shell import ShellScript -from geosongpu_ci.pipeline.geos import set_python_environment +from geosongpu_ci.pipeline.geos import ( + set_python_environment, + copy_input_to_experiment_directory, +) from geosongpu_ci.pipeline.gtfv3_config import GTFV3Config from geosongpu_ci.utils.progress import Progress from geosongpu_ci.tools.benchmark.geos_log_parser import parse_geos_log from geosongpu_ci.tools.benchmark.report import report -from typing import Dict, Any, Optional +from typing import Dict, Any import shutil import os import dataclasses @@ -75,32 +78,6 @@ def _copy_executable_script( ) -def _copy_input_to_experiment_directory( - input_directory: str, - geos_directory: str, - resolution: str, - experiment_name: Optional[str] = None, -) -> str: - """Copy the input directory into the experiment direcotry - and trigger the "reset.sh" to get data clean and ready to execute. - """ - if experiment_name: - experiment_dir = f"{geos_directory}/experiment/{experiment_name}" - else: - experiment_dir = f"{geos_directory}/experiment/{resolution}" - ShellScript(f"copy_input_{resolution}").write( - modules=[], - shell_commands=[ - f"cd {geos_directory}", - f"mkdir -p {experiment_dir}", - f"cd {experiment_dir}", - f"cp -r {input_directory}/* .", - "./reset.sh", - ], - ).execute() - return experiment_dir - - def _make_srun_script( executable_name: str, experiment_directory: str, @@ -162,10 +139,11 @@ def run_action( or env.experiment_action == PipelineAction.All ): # Get experiment directory ready - experiment_dir = _copy_input_to_experiment_directory( + experiment_dir = copy_input_to_experiment_directory( input_directory=config["input"][VALIDATION_RESOLUTION], geos_directory=geos, resolution=VALIDATION_RESOLUTION, + trigger_reset=True, ) prolog_scripts = PrologScripts( experiment_directory=experiment_dir, @@ -208,9 +186,9 @@ def run_action( ): # We run a range of resolution. C180-L72 might already be ran for resolution in ["C180-L72", "C180-L91", "C180-L137"]: - if resolution == VALIDATION_RESOLUTION and ( - env.experiment_action == PipelineAction.Validation - or env.experiment_action == PipelineAction.All + if ( + resolution == VALIDATION_RESOLUTION + and env.experiment_action == PipelineAction.All ): # In case validation ran already, we have the experiment dir # and the cache ready to run @@ -223,10 +201,11 @@ def run_action( ) else: # Get experiment directory ready - experiment_dir = _copy_input_to_experiment_directory( + experiment_dir = copy_input_to_experiment_directory( input_directory=config["input"][resolution], geos_directory=geos, resolution=resolution, + trigger_reset=True, ) prolog_scripts = PrologScripts( experiment_directory=experiment_dir, diff --git a/geosongpu_ci/tools/benchmark/report.py b/geosongpu_ci/tools/benchmark/report.py index 0b02b7e..1461a12 100644 --- a/geosongpu_ci/tools/benchmark/report.py +++ b/geosongpu_ci/tools/benchmark/report.py @@ -70,9 +70,15 @@ def sankey_plot_of_gridcomp(raw_data: BenchmarkRawData, filename: str, title: st def _comparison_in_X(value_A: float, value_B: float, label: str) -> str: if value_A > value_B: - return f"{label}: 1.00x - {(value_A/value_B):.2f}x\n" + return ( + f"{label}: 1.00x ({value_A:.2f}s) - " + f"{(value_A/value_B):.2f}x ({value_B:.2f}s)\n" + ) else: - return f"{label}: {(value_B/value_A):.2f}x - 1.00x\n" + return ( + f"{label}: {(value_B/value_A):.2f}x ({value_A:.2f}s) - " + f"1.00x ({value_B:.2f}s)\n" + ) def report(raw_data: List[BenchmarkRawData]) -> BenchmarkReport: @@ -93,12 +99,12 @@ def report(raw_data: List[BenchmarkRawData]) -> BenchmarkReport: report.setup = ( "Experiment: \n" f" Resolution: C{grid_resolution[0]}-L{grid_resolution[2]}\n" - " Layouts:\n" + " Layouts:\n" ) for bench_data in raw_data: report.setup += ( - f" - {bench_data.backend}:" + f" - {bench_data.backend}:" f"{bench_data.node_setup[0]}x{bench_data.node_setup[1]}" f", {bench_data.node_setup[2]} ranks\n" ) From c1a9ed88e5039d931a0c341f38ce556490e8e96f Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 15:49:52 -0400 Subject: [PATCH 02/12] Add module-level cli to AQ --- geosongpu_ci/pipeline/aquaplanet.py | 54 +++++++++++++++++++++++++++- geosongpu_ci/pipeline/held_suarez.py | 7 ++-- geosongpu_ci/pipeline/task.py | 6 +++- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 1c89dd3..337fbfa 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -1,4 +1,5 @@ -from geosongpu_ci.pipeline.task import TaskBase +import click +from geosongpu_ci.pipeline.task import TaskBase, get_config from geosongpu_ci.utils.environment import Environment from geosongpu_ci.utils.registry import Registry from geosongpu_ci.actions.slurm import wait_for_sbatch @@ -234,3 +235,54 @@ def check( f.write(str(benchmark_report)) return True + + +@click.command() +@click.argument("step") +@click.argument("geos_base_directory") +@click.option("--action", default="Validation") +@click.option("--artifact", default=".", help="Artifact directory for results storage") +@click.option( + "--setup_only", + is_flag=True, + help="Setup the experiment but skip any long running jobs (build, run...)", +) +def cli( + step: str, geos_base_directory: str, action: str, artifact: str, setup_only: bool +): + # Validation step + if step not in TaskBase.step_options(): + raise click.BadArgumentUsage( + f"step needs to be from {TaskBase.step_options()} (given: {step})" + ) + + print( + "Running Aquaplanet:\n" + f" step: {step}\n" + f" action: {action}\n" + f" artifact: {artifact}\n" + f" setup only: {setup_only}" + ) + + # Rebuild the basics + experience_name = "geos_aq" + task = Registry.registry["Aquaplanet"]() + config = get_config(experience_name) + env = Environment( + experience_name=experience_name, + experiment_action=PipelineAction[action], + artifact_directory=artifact, + setup_only=setup_only, + ) + env.set("GEOS_BASE_DIRECTORY", geos_base_directory) + + if step == "all" or step == "run": + task.run(config, env) + elif step == "all" or step == "check": + task.check(config, env) + else: + RuntimeError(f"Coding error. Step {step} unknown on AQ cli") + + +if __name__ == "__main__": + cli() diff --git a/geosongpu_ci/pipeline/held_suarez.py b/geosongpu_ci/pipeline/held_suarez.py index 5985eec..f067fe7 100644 --- a/geosongpu_ci/pipeline/held_suarez.py +++ b/geosongpu_ci/pipeline/held_suarez.py @@ -352,9 +352,6 @@ def check( return True -CLI_STEP_OPTIONS = ["all", "run", "check"] - - @click.command() @click.argument("step") @click.argument("geos_base_directory") @@ -369,9 +366,9 @@ def cli( step: str, geos_base_directory: str, action: str, artifact: str, setup_only: bool ): # Validation step - if step not in ["all", "run", "check"]: + if step not in TaskBase.step_options(): raise click.BadArgumentUsage( - f"step needs to be from {CLI_STEP_OPTIONS} (given: {step})" + f"step needs to be from {TaskBase.step_options()} (given: {step})" ) print( diff --git a/geosongpu_ci/pipeline/task.py b/geosongpu_ci/pipeline/task.py index d57e6f4..a174c26 100644 --- a/geosongpu_ci/pipeline/task.py +++ b/geosongpu_ci/pipeline/task.py @@ -1,4 +1,4 @@ -from typing import Dict, Any +from typing import Dict, Any, Iterable from geosongpu_ci.actions.pipeline import PipelineAction import sys import site @@ -69,6 +69,10 @@ def check( ) -> bool: ... + @staticmethod + def step_options() -> Iterable[str]: + return ["all", "run", "check"] + def _find_experiments() -> str: # pip install geosongpu-ci From dc623f9a35096fc1ab17f3f044b99d2c2576b62b Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 16:20:00 -0400 Subject: [PATCH 03/12] reset.sh does not exist for AQ --- geosongpu_ci/pipeline/aquaplanet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 337fbfa..4521687 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -85,7 +85,6 @@ def run_action( input_directory=config["input"][VALIDATION_RESOLUTION], geos_directory=geos, resolution=VALIDATION_RESOLUTION, - trigger_reset=True, ) # Modify all gcm_run.j.X with directory information From 8f67e3f821dd5714d1dc894214d6af1bab7fc5b6 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 16:28:44 -0400 Subject: [PATCH 04/12] Missed --- geosongpu_ci/pipeline/aquaplanet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 4521687..07c23d4 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -135,7 +135,6 @@ def run_action( input_directory=config["input"][VALIDATION_RESOLUTION], geos_directory=geos, resolution=VALIDATION_RESOLUTION, - trigger_reset=True, ) # Modify all gcm_run.j.X with directory information From ba6f1cb53a74f1ccebf3e7b2e319d02feee87ea8 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 16:49:44 -0400 Subject: [PATCH 05/12] Move sbatch wait to be an option on .execute() --- geosongpu_ci/actions/slurm.py | 14 -------------- geosongpu_ci/pipeline/aquaplanet.py | 22 ++++++++-------------- geosongpu_ci/utils/shell.py | 24 +++++++++++++++++++++++- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/geosongpu_ci/actions/slurm.py b/geosongpu_ci/actions/slurm.py index 1b10453..cbba7ff 100644 --- a/geosongpu_ci/actions/slurm.py +++ b/geosongpu_ci/actions/slurm.py @@ -65,17 +65,3 @@ def one_half_Nodes_CPU(cls) -> "SlurmConfiguration": ntasks_per_node=48, sockets_per_node=2, ) - - -def wait_for_sbatch(job_id: str): - sleep(60) # wait 60 seconds for SLURM to enter prolog - running = True - while running: - sacct_result = subprocess.run( - ["sacct", "-j", job_id, "-o", "state"], stdout=subprocess.PIPE - ).stdout.decode("utf-8") - running = False - for state in sacct_result.split("\n"): - if "RUNNING" in state.strip or "PENDING" in state: - running = True - break diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 07c23d4..9a64c42 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -47,19 +47,13 @@ def _simulate( text_to_replace="setenv FV3_DACEMODE BuildAndRun", new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", ) - sbatch_result = ( - ShellScript("run_script_gpu") - .write( - shell_commands=[ - f"cd {experiment_directory}", - f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", - "sbatch gcm_run.j", - ] - ) - .execute() - ) - job_id = sbatch_result.split(" ")[-1].strip().replace("\n", "") - wait_for_sbatch(job_id) + ShellScript("run_sbatch_gpu").write( + shell_commands=[ + f"cd {experiment_directory}", + f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", + "sbatch gcm_run.j", + ] + ).execute(sbatch=True) VALIDATION_RESOLUTION = "C180-L72" @@ -88,7 +82,7 @@ def run_action( ) # Modify all gcm_run.j.X with directory information - gcm_runs = glob.glob(f"{experiment_dir}/{resolution}/gcm_run.j.*") + gcm_runs = glob.glob(f"{experiment_dir}/gcm_run.j.*") for gcm_run in gcm_runs: _replace_in_file( url=gcm_run, diff --git a/geosongpu_ci/utils/shell.py b/geosongpu_ci/utils/shell.py index c914e1d..757d455 100644 --- a/geosongpu_ci/utils/shell.py +++ b/geosongpu_ci/utils/shell.py @@ -3,6 +3,7 @@ import os import stat from geosongpu_ci.utils.progress import Progress +from time import sleep class ShellScript: @@ -52,13 +53,34 @@ def write( self._make_executable() return self - def execute(self, remove_after_execution: bool = False) -> Any: + def execute( + self, + remove_after_execution: bool = False, + sbatch: bool = False, + ) -> Any: # Execute result = self._execute_shell_script() + if sbatch: + self._sbatch_wait(result) if remove_after_execution: os.remove(self.path) return result + def _sbatch_wait(self, result_of_sbatch_call: str) -> Any: + # TODO: check that result_of_sbatch_call is actually correct + job_id = result_of_sbatch_call.split(" ")[-1].strip().replace("\n", "") + sleep(30) # wait 30 seconds for SLURM to enter prolog + running = True + while running: + sacct_result = subprocess.run( + ["sacct", "-j", job_id, "-o", "state"], stdout=subprocess.PIPE + ).stdout.decode("utf-8") + running = False + for state in sacct_result.split("\n"): + if "RUNNING" in state.strip() or "PENDING" in state.strip(): + running = True + break + def _make_executable(self): st = os.stat(self.path) os.chmod(self.path, st.st_mode | stat.S_IEXEC) From 482df85219d068bbdbb27dbb0870abb97e731f84 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Tue, 1 Aug 2023 16:54:12 -0400 Subject: [PATCH 06/12] Missing post-refactor cleanup --- geosongpu_ci/actions/slurm.py | 2 -- geosongpu_ci/pipeline/aquaplanet.py | 1 - 2 files changed, 3 deletions(-) diff --git a/geosongpu_ci/actions/slurm.py b/geosongpu_ci/actions/slurm.py index cbba7ff..02a6438 100644 --- a/geosongpu_ci/actions/slurm.py +++ b/geosongpu_ci/actions/slurm.py @@ -1,5 +1,3 @@ -import subprocess -from time import sleep from dataclasses import dataclass diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 9a64c42..f202ca9 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -2,7 +2,6 @@ from geosongpu_ci.pipeline.task import TaskBase, get_config from geosongpu_ci.utils.environment import Environment from geosongpu_ci.utils.registry import Registry -from geosongpu_ci.actions.slurm import wait_for_sbatch from geosongpu_ci.pipeline.geos import copy_input_to_experiment_directory from geosongpu_ci.actions.pipeline import PipelineAction from geosongpu_ci.utils.shell import ShellScript From 00b0cb9670172419f771d658b41bb5d7c6c8d1b0 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Wed, 2 Aug 2023 14:26:12 -0400 Subject: [PATCH 07/12] Refactor to allow easier duplication of simulate --- geosongpu_ci/pipeline/aquaplanet.py | 159 +++++++++++++++------------- geosongpu_ci/utils/shell.py | 17 +-- 2 files changed, 94 insertions(+), 82 deletions(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index f202ca9..e874d96 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -14,6 +14,7 @@ def _replace_in_file(url: str, text_to_replace: str, new_text: str): + data = None with open(url, "r") as f: data = f.read() data = data.replace(text_to_replace, new_text) @@ -21,45 +22,84 @@ def _replace_in_file(url: str, text_to_replace: str, new_text: str): f.write(data) -def _simulate( - experiment_directory: str, - setup_sh: str, - cap_rc: str, - log_pattern: str, - fv3_dacemode: str, -): - # Execute caching step on 6 GPUs - ShellScript("run_script_gpu").write( - shell_commands=[ - f"cd {experiment_directory}", - f"./{setup_sh}", - f"cp -f {cap_rc} CAP.rc", - ] - ).execute(remove_after_execution=True) - _replace_in_file( - url=f"{experiment_directory}/gcm_run.j", - text_to_replace="#SBATCH --output=slurm-%j-%%x.out", - new_text=log_pattern, - ) - _replace_in_file( - url=f"{experiment_directory}/gcm_run.j", - text_to_replace="setenv FV3_DACEMODE BuildAndRun", - new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", - ) - ShellScript("run_sbatch_gpu").write( - shell_commands=[ - f"cd {experiment_directory}", - f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", - "sbatch gcm_run.j", - ] - ).execute(sbatch=True) - - VALIDATION_RESOLUTION = "C180-L72" @Registry.register class Aquaplanet(TaskBase): + def __init__(self, skip_metadata=False) -> None: + super().__init__(skip_metadata) + self._gcm_run_experiment = None + + def prepare_experiment( + self, + input_directory: str, + geos_directory: str, + ) -> str: + experiment_directory = copy_input_to_experiment_directory( + input_directory=input_directory, + geos_directory=geos_directory, + resolution=VALIDATION_RESOLUTION, + ) + + # Modify all gcm_run.j.X with directory information + gcm_runs = glob.glob(f"{experiment_directory}/gcm_run.j.*") + for gcm_run in gcm_runs: + _replace_in_file( + url=gcm_run, + text_to_replace="setenv GEOSBASE TO_BE_REPLACED", + new_text=f"setenv GEOSBASE {geos_directory}", + ) + _replace_in_file( + url=gcm_run, + text_to_replace="setenv EXPDIR TO_BE_REPLACED", + new_text=f"setenv EXPDIR {experiment_directory}", + ) + + self._gcm_run_experiment = experiment_directory + return experiment_directory + + def simulate( + self, + experiment_directory: str, + setup_sh: str, + cap_rc: str, + log_pattern: str, + fv3_dacemode: str, + ): + # Check we have gcm_run prepared correctly + if not self._gcm_run_experiment != experiment_directory: + raise RuntimeError( + f"Aquaplanet setup for experiment {self._gcm_run_experiment} " + f" instead of {experiment_directory}. Abort simulation." + ) + + # Execute caching step on 6 GPUs + ShellScript("temporary_setup").write( + shell_commands=[ + f"cd {experiment_directory}", + f"./{setup_sh}", + f"cp -f {cap_rc} CAP.rc", + ] + ).execute(remove_after_execution=True) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="#SBATCH --output=slurm-%j-%x.out", + new_text=f"#SBATCH --output={log_pattern}", + ) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="setenv FV3_DACEMODE BuildAndRun", + new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", + ) + ShellScript("run_sbatch_gpu").write( + shell_commands=[ + f"cd {experiment_directory}", + f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", + "sbatch gcm_run.j", + ] + ).execute(sbatch=True) + def run_action( self, config: Dict[str, Any], @@ -67,6 +107,7 @@ def run_action( metadata: Dict[str, Any], ): geos = env.get("GEOS_BASE_DIRECTORY") + validation_experiment_dir = None if ( env.experiment_action == PipelineAction.All @@ -74,28 +115,14 @@ def run_action( ): # Prepare experiment directory resolution = VALIDATION_RESOLUTION - experiment_dir = copy_input_to_experiment_directory( + experiment_dir = self.prepare_experiment( input_directory=config["input"][VALIDATION_RESOLUTION], geos_directory=geos, - resolution=VALIDATION_RESOLUTION, ) - - # Modify all gcm_run.j.X with directory information - gcm_runs = glob.glob(f"{experiment_dir}/gcm_run.j.*") - for gcm_run in gcm_runs: - _replace_in_file( - url=gcm_run, - text_to_replace="setenv GEOSBASE TO_BE_REPLACED", - new_text=f"setenv GEOSBASE {geos}", - ) - _replace_in_file( - url=gcm_run, - text_to_replace="setenv EXPDIR TO_BE_REPLACED", - new_text=f"setenv EXPDIR {experiment_dir}", - ) + validation_experiment_dir = experiment_dir # Execute caching step on 6 GPUs - _simulate( + self.simulate( experiment_directory=experiment_dir, setup_sh="setup_1.5nodes_gpu.sh", cap_rc="CAP.rc.1ts", @@ -104,7 +131,7 @@ def run_action( ) # Run for 12h on 6 GPUs - _simulate( + self.simulate( experiment_directory=experiment_dir, setup_sh="setup_1.5nodes_gpu.sh", cap_rc="CAP.rc.12hours", @@ -121,30 +148,14 @@ def run_action( and env.experiment_action == PipelineAction.All ): # Experiment directory is already present and backend cached - experiment_dir = f"{geos}/experiment/{resolution}" + experiment_dir = validation_experiment_dir else: - # Build experiment directory - experiment_dir = copy_input_to_experiment_directory( + # Build experiment directory & run cache + experiment_dir = self.prepare_experiment( input_directory=config["input"][VALIDATION_RESOLUTION], geos_directory=geos, - resolution=VALIDATION_RESOLUTION, ) - - # Modify all gcm_run.j.X with directory information - gcm_runs = glob.glob(f"{experiment_dir}/{resolution}/gcm_run.j.*") - for gcm_run in gcm_runs: - _replace_in_file( - url=gcm_run, - text_to_replace="setenv GEOSBASE TO_BE_REPLACED", - new_text=f"setenv GEOSBASE {geos}", - ) - _replace_in_file( - url=gcm_run, - text_to_replace="setenv EXPDIR TO_BE_REPLACED", - new_text=f"setenv EXPDIR {experiment_dir}", - ) - - _simulate( + self.simulate( experiment_directory=experiment_dir, setup_sh="setup_1.5nodes_gpu.sh", cap_rc="CAP.rc.1ts", @@ -153,7 +164,7 @@ def run_action( ) # Execute 1 day run on 6 GPUs - _simulate( + self.simulate( experiment_directory=experiment_dir, setup_sh="setup_1.5nodes_gpu.sh", cap_rc="CAP.rc.1day", @@ -162,7 +173,7 @@ def run_action( ) # Execute 1 day run on 72 CPUs (fortran) - _simulate( + self.simulate( experiment_directory=experiment_dir, setup_sh="setup_1.5nodes_cpu.sh", cap_rc="CAP.rc.1day", diff --git a/geosongpu_ci/utils/shell.py b/geosongpu_ci/utils/shell.py index 757d455..b139148 100644 --- a/geosongpu_ci/utils/shell.py +++ b/geosongpu_ci/utils/shell.py @@ -59,9 +59,11 @@ def execute( sbatch: bool = False, ) -> Any: # Execute - result = self._execute_shell_script() - if sbatch: - self._sbatch_wait(result) + with Progress(self.name): + result = self._execute_shell_script() + if sbatch: + self._sbatch_wait(result) + # Remove if this is not important to keep if remove_after_execution: os.remove(self.path) return result @@ -86,11 +88,10 @@ def _make_executable(self): os.chmod(self.path, st.st_mode | stat.S_IEXEC) def _execute_shell_script(self) -> str: - with Progress(self.name): - _make_executable(self.path) - result = run_subprocess( - self.path, - ) + _make_executable(self.path) + result = run_subprocess( + self.path, + ) return result From e986965f68b73c007717224531f278e9357ef896 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Wed, 2 Aug 2023 14:26:53 -0400 Subject: [PATCH 08/12] Refactor Held-Suarez to allow simulate from outside Add cli "bemchmark" option --- geosongpu_ci/actions/slurm.py | 9 +- geosongpu_ci/pipeline/gtfv3_config.py | 5 +- geosongpu_ci/pipeline/held_suarez.py | 368 +++++++++++++++++--------- 3 files changed, 250 insertions(+), 132 deletions(-) diff --git a/geosongpu_ci/actions/slurm.py b/geosongpu_ci/actions/slurm.py index 02a6438..44cb62c 100644 --- a/geosongpu_ci/actions/slurm.py +++ b/geosongpu_ci/actions/slurm.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional @dataclass @@ -43,23 +44,25 @@ def srun_bash(self, wrapper: str, executable_name: str) -> str: ) @classmethod - def one_half_nodes_GPU(cls) -> "SlurmConfiguration": + def one_half_nodes_GPU(cls, output: Optional[str] = None) -> "SlurmConfiguration": """1/2 node configuration on Discover with A100 & Rome Epyc""" - return SlurmConfiguration( + return cls( nodes=2, ntasks=6, ntasks_per_node=3, sockets_per_node=2, gpus_per_node=3, mem_per_gpu="40G", + output=output or cls.output, ) @classmethod - def one_half_Nodes_CPU(cls) -> "SlurmConfiguration": + def one_half_Nodes_CPU(cls, output: Optional[str] = None) -> "SlurmConfiguration": """1/2 node configuration on Discover with Rome Epyc""" return SlurmConfiguration( nodes=2, ntasks=72, ntasks_per_node=48, sockets_per_node=2, + output=output or cls.output, ) diff --git a/geosongpu_ci/pipeline/gtfv3_config.py b/geosongpu_ci/pipeline/gtfv3_config.py index 9d40102..a56b8c4 100644 --- a/geosongpu_ci/pipeline/gtfv3_config.py +++ b/geosongpu_ci/pipeline/gtfv3_config.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional @dataclass @@ -21,5 +22,5 @@ def sh(self) -> str: ) @classmethod - def dace_gpu_32_bit_BAR(cls) -> "GTFV3Config": - return GTFV3Config() + def dace_gpu_32_bit_BAR(cls, dacemode: Optional[str] = None) -> "GTFV3Config": + return cls(FV3_DACEMODE=dacemode or cls.FV3_DACEMODE) diff --git a/geosongpu_ci/pipeline/held_suarez.py b/geosongpu_ci/pipeline/held_suarez.py index f067fe7..109a149 100644 --- a/geosongpu_ci/pipeline/held_suarez.py +++ b/geosongpu_ci/pipeline/held_suarez.py @@ -122,6 +122,93 @@ class HeldSuarez(TaskBase): Proposes C180-LXX benchmarking and basic build worthiness validation. """ + executable_name = "./GEOShs.x" + + def _setup_1ts_1node_gtfv3(self, experiment_directory: str) -> ShellScript: + return ShellScript( + name="setup_config_1ts_1node_gtfv3", + working_directory=experiment_directory, + ).write( + shell_commands=[ + f"cd {experiment_directory}", + "cp -f AgcmSimple.rc.1x6.gtfv3 AgcmSimple.rc", + "cp -f input.nml.1x1 input.nml", + "cp -f CAP.rc.1ts CAP.rc", + ], + ) + + def _setup_1day_1node_gtfv3(self, experiment_directory: str) -> ShellScript: + return ShellScript( + name="_setup_config_1day_1node_gtfv3", + working_directory=experiment_directory, + ).write( + shell_commands=[ + f"cd {experiment_directory}", + "cp -f AgcmSimple.rc.1x6.gtfv3 AgcmSimple.rc", + "cp -f input.nml.1x1 input.nml", + "cp -f CAP.rc.1day CAP.rc", + ], + ) + + def _setup_1day_1node_fortran(self, experiment_directory: str) -> ShellScript: + return ShellScript( + name="setup_config_1day_1node_fortran", + working_directory=experiment_directory, + ).write( + shell_commands=[ + f"cd {experiment_directory}", + "cp -f AgcmSimple.rc.3x24.fortran AgcmSimple.rc", + "cp -f input.nml.3x4 input.nml", + "cp -f CAP.rc.1day CAP.rc", + ], + ) + + def prepare_experiment( + self, + input_directory: str, + resolution: str, + geos_directory: str, + geos_install_directory: str, + executable_name: str, + ) -> str: + experiment_dir = copy_input_to_experiment_directory( + input_directory=input_directory, + geos_directory=geos_directory, + resolution=resolution, + trigger_reset=True, + ) + prolog_scripts = PrologScripts( + experiment_directory=experiment_dir, + executable_name=executable_name, + geos_directory=geos_directory, + geos_install_path=geos_install_directory, + ) + return experiment_dir, prolog_scripts + + def simulate( + self, + experiment_directory: str, + executable_name: str, + prolog_scripts: PrologScripts, + slurm_config: SlurmConfiguration, + gtfv3_config: GTFV3Config, + setup_script: ShellScript, + setup_only: bool = False, + ): + srun_script = _make_srun_script( + executable_name=executable_name, + experiment_directory=experiment_directory, + slurm_config=slurm_config, + gtfv3_config=gtfv3_config, + prolog_scripts=prolog_scripts, + ) + + setup_script.execute() + if not setup_only: + srun_script.execute() + else: + Progress.log(f"= = = Skipping {srun_script.name} = = =") + def run_action( self, config: Dict[str, Any], @@ -129,55 +216,40 @@ def run_action( metadata: Dict[str, Any], ): # Setup - geos_install_path = env.get("GEOS_INSTALL_DIRECTORY") + geos_install_directory = env.get("GEOS_INSTALL_DIRECTORY") geos = env.get("GEOS_BASE_DIRECTORY") - executable_name = "./GEOShs.x" + validation_experiment_directory = None + validation_prolog_script = None # # # Validation # # # if ( env.experiment_action == PipelineAction.Validation or env.experiment_action == PipelineAction.All ): - # Get experiment directory ready - experiment_dir = copy_input_to_experiment_directory( + experiment_directory, prolog_scripts = self.prepare_experiment( input_directory=config["input"][VALIDATION_RESOLUTION], - geos_directory=geos, resolution=VALIDATION_RESOLUTION, - trigger_reset=True, - ) - prolog_scripts = PrologScripts( - experiment_directory=experiment_dir, - executable_name=executable_name, geos_directory=geos, - geos_install_path=geos_install_path, + geos_install_directory=geos_install_directory, + executable_name=self.executable_name, ) # Run 1 timestep for cache build - slurm_config = SlurmConfiguration.one_half_nodes_GPU() - slurm_config.output = "validation.cache.dacegpu.%t.out" - srun_script = _make_srun_script( - executable_name=executable_name, - experiment_directory=experiment_dir, - slurm_config=slurm_config, - gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + self.simulate( + experiment_directory=experiment_directory, + executable_name=self.executable_name, prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_GPU( + output="validation.cache.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + setup_script=self._setup_1ts_1node_gtfv3(experiment_directory), + setup_only=env.setup_only, ) - ShellScript( - name="setup_config_1ts_1node_gtfv3", - working_directory=experiment_dir, - ).write( - shell_commands=[ - f"cd {experiment_dir}", - "cp -f AgcmSimple.rc.1x6.gtfv3 AgcmSimple.rc", - "cp -f input.nml.1x1 input.nml", - "cp -f CAP.rc.1ts CAP.rc", - ], - ).execute() - if not env.setup_only: - srun_script.execute() - - # TODO: more to be done to actually check on the results rather then - # just "can run". + + # Cache for benchmark + validation_experiment_directory = experiment_directory + validation_prolog_script = prolog_scripts # # # Benchmark # # # if ( @@ -192,107 +264,55 @@ def run_action( ): # In case validation ran already, we have the experiment dir # and the cache ready to run - experiment_dir = f"{geos}/experiment/{resolution}" - prolog_scripts = PrologScripts( - experiment_directory=experiment_dir, - executable_name=executable_name, - geos_directory=geos, - geos_install_path=geos_install_path, - ) + experiment_dir = validation_experiment_directory + prolog_scripts = validation_prolog_script else: - # Get experiment directory ready - experiment_dir = copy_input_to_experiment_directory( + experiment_dir, prolog_scripts = self.prepare_experiment( input_directory=config["input"][resolution], - geos_directory=geos, resolution=resolution, - trigger_reset=True, - ) - prolog_scripts = PrologScripts( - experiment_directory=experiment_dir, - executable_name=executable_name, geos_directory=geos, - geos_install_path=geos_install_path, + geos_install_directory=geos_install_directory, + executable_name=self.executable_name, ) # Run 1 timestep for cache build - slurm_config = SlurmConfiguration.one_half_nodes_GPU() - slurm_config.output = "benchmark.cache.dacegpu.%t.out" - srun_script = _make_srun_script( - executable_name=executable_name, - experiment_directory=experiment_dir, - slurm_config=slurm_config, - gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + self.simulate( + experiment_directory=experiment_directory, + executable_name=self.executable_name, prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_GPU( + output="benchmark.cache.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + setup_script=self._setup_1ts_1node_gtfv3(experiment_directory), + setup_only=env.setup_only, ) - ShellScript( - name="setup_config_1ts_1node_gtfv3", - working_directory=experiment_dir, - ).write( - shell_commands=[ - f"cd {experiment_dir}", - "cp -f AgcmSimple.rc.1x6.gtfv3 AgcmSimple.rc", - "cp -f input.nml.1x1 input.nml", - "cp -f CAP.rc.1ts CAP.rc", - ], - ).execute() - if not env.setup_only: - srun_script.execute() - else: - Progress.log(f"= = = Skipping {srun_script.name} = = =") - - # Run 1 day - slurm_config = SlurmConfiguration.one_half_nodes_GPU() - slurm_config.output = "benchmark.1day.dacegpu.%t.out" - gtfv3_config = dataclasses.replace(GTFV3Config.dace_gpu_32_bit_BAR()) - gtfv3_config.FV3_DACEMODE = "Run" - srun_script = _make_srun_script( - executable_name=executable_name, - experiment_directory=experiment_dir, - slurm_config=slurm_config, - gtfv3_config=gtfv3_config, + + # Run 1 day gtfv3 + self.simulate( + experiment_directory=experiment_directory, + executable_name=self.executable_name, prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_GPU( + output="benchmark.1day.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(dacemode="Run"), + setup_script=self._setup_1day_1node_gtfv3(experiment_directory), + setup_only=env.setup_only, ) - ShellScript( - name="setup_config_1day_1node_gtfv3", - working_directory=experiment_dir, - ).write( - shell_commands=[ - f"cd {experiment_dir}", - "cp -f AgcmSimple.rc.1x6.gtfv3 AgcmSimple.rc", - "cp -f input.nml.1x1 input.nml", - "cp -f CAP.rc.1day CAP.rc", - ], - ).execute() - if not env.setup_only: - srun_script.execute() - else: - Progress.log(f"= = = Skipping {srun_script.name} = = =") - - # Execute Fortran - slurm_config = SlurmConfiguration.one_half_Nodes_CPU() - slurm_config.output = "benchmark.1day.fortran.%t.out" - srun_script = _make_srun_script( - executable_name=executable_name, - experiment_directory=experiment_dir, - slurm_config=slurm_config, - gtfv3_config=gtfv3_config, + + # Run 1 day Fortran + self.simulate( + experiment_directory=experiment_directory, + executable_name=self.executable_name, prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_Nodes_CPU( + output="benchmark.1day.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(dacemode="Run"), + setup_script=self._setup_1day_1node_gtfv3(experiment_directory), + setup_only=env.setup_only, ) - ShellScript( - name="setup_config_1day_1node_fortran", - working_directory=experiment_dir, - ).write( - shell_commands=[ - f"cd {experiment_dir}", - "cp -f AgcmSimple.rc.3x24.fortran AgcmSimple.rc", - "cp -f input.nml.3x4 input.nml", - "cp -f CAP.rc.1day CAP.rc", - ], - ).execute() - if not env.setup_only: - srun_script.execute() - else: - Progress.log(f"= = = Skipping {srun_script.name} = = =") def check( self, @@ -352,9 +372,16 @@ def check( return True -@click.command() -@click.argument("step") +@click.group() @click.argument("geos_base_directory") +@click.pass_context +def cli(ctx, geos_base_directory): + ctx.ensure_object(dict) + ctx.obj["geos_base_directory"] = geos_base_directory + + +@cli.command() +@click.argument("step") @click.option("--action", default="Validation") @click.option("--artifact", default=".", help="Artifact directory for results storage") @click.option( @@ -362,9 +389,10 @@ def check( is_flag=True, help="Setup the experiment but skip any long running jobs (build, run...)", ) -def cli( - step: str, geos_base_directory: str, action: str, artifact: str, setup_only: bool -): +@click.pass_context +def pipe(ctx, step: str, action: str, artifact: str, setup_only: bool): + geos_base_directory = ctx.obj["geos_base_directory"] + # Validation step if step not in TaskBase.step_options(): raise click.BadArgumentUsage( @@ -399,5 +427,91 @@ def cli( RuntimeError(f"Coding error. Step {step} unknown on HS cli") +@cli.command() +@click.argument("resolution") +@click.option( + "--setup_only", + is_flag=True, + help="Setup the experiment but skip any long running jobs (build, run...)", +) +@click.pass_context +def benchmark( + ctx, + resolution: str, + setup_only: bool, +): + geos_base_directory = ctx.obj["geos_base_directory"] + geos_install_directory = f"{geos_base_directory}/install" + + # Rebuild the basics + experience_name = "geos_hs" + HS = HeldSuarez() + config = get_config(experience_name) + + experiment_directory, prolog_scripts = HS.prepare_experiment( + input_directory=config["input"][resolution], + resolution=resolution, + geos_directory=geos_base_directory, + geos_install_directory=geos_install_directory, + executable_name=HS.executable_name, + ) + + print( + "Running HeldSuarez benchmark:\n" + f" resolution: {resolution}\n" + f" setup only: {setup_only}\n" + f" experiment dir: {experiment_directory}" + ) + + # Run 1 timestep for cache build + HS.simulate( + experiment_directory=experiment_directory, + executable_name=HS.executable_name, + prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_GPU( + output="benchmark.cache.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + setup_script=HS._setup_1ts_1node_gtfv3(experiment_directory), + setup_only=setup_only, + ) + HS.simulate( + experiment_directory=experiment_directory, + executable_name=HS.executable_name, + prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_GPU( + output="benchmark.1day.dacegpu.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(dacemode="Run"), + setup_script=HS._setup_1day_1node_gtfv3(experiment_directory), + setup_only=setup_only, + ) + HS.simulate( + experiment_directory=experiment_directory, + executable_name=HS.executable_name, + prolog_scripts=prolog_scripts, + slurm_config=SlurmConfiguration.one_half_nodes_CPU( + output="benchmark.1day.fortran.%t.out" + ), + gtfv3_config=GTFV3Config.dace_gpu_32_bit_BAR(), + setup_script=HS._setup_1day_1node_fortran(experiment_directory), + setup_only=setup_only, + ) + + # Report + bench_raw_data = [] + bench_raw_data.append( + parse_geos_log(f"{experiment_directory}/benchmark.1day.dacegpu.0.out") + ) + bench_raw_data.append( + parse_geos_log(f"{experiment_directory}/benchmark.1day.fortran.0.out") + ) + + benchmark_report = report(bench_raw_data) + print(benchmark_report) + with open("report_benchmark.out", "w") as f: + f.write(str(benchmark_report)) + + if __name__ == "__main__": cli() From b042d41536fc7546ca102eeadd44c0926ee0c953 Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Wed, 2 Aug 2023 14:59:09 -0400 Subject: [PATCH 09/12] Yaml lint --- .github/workflows/discover-oncomment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/discover-oncomment.yml b/.github/workflows/discover-oncomment.yml index 55d2543..6015fba 100644 --- a/.github/workflows/discover-oncomment.yml +++ b/.github/workflows/discover-oncomment.yml @@ -88,7 +88,7 @@ jobs: comment-results: #Run the job everytime _but_ when upstream has been cancelled - if: success() || failure() + if: success() || failure() needs: [run-discover-ondemand, bot] runs-on: ubuntu-latest steps: From 7de448551fca6f8570507757997e487614dc1e4f Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Wed, 2 Aug 2023 16:13:25 -0400 Subject: [PATCH 10/12] Remove unused check --- geosongpu_ci/pipeline/aquaplanet.py | 12 ------------ geosongpu_ci/pipeline/held_suarez.py | 1 - 2 files changed, 13 deletions(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index e874d96..757fe00 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -27,10 +27,6 @@ def _replace_in_file(url: str, text_to_replace: str, new_text: str): @Registry.register class Aquaplanet(TaskBase): - def __init__(self, skip_metadata=False) -> None: - super().__init__(skip_metadata) - self._gcm_run_experiment = None - def prepare_experiment( self, input_directory: str, @@ -56,7 +52,6 @@ def prepare_experiment( new_text=f"setenv EXPDIR {experiment_directory}", ) - self._gcm_run_experiment = experiment_directory return experiment_directory def simulate( @@ -67,13 +62,6 @@ def simulate( log_pattern: str, fv3_dacemode: str, ): - # Check we have gcm_run prepared correctly - if not self._gcm_run_experiment != experiment_directory: - raise RuntimeError( - f"Aquaplanet setup for experiment {self._gcm_run_experiment} " - f" instead of {experiment_directory}. Abort simulation." - ) - # Execute caching step on 6 GPUs ShellScript("temporary_setup").write( shell_commands=[ diff --git a/geosongpu_ci/pipeline/held_suarez.py b/geosongpu_ci/pipeline/held_suarez.py index 109a149..8b319ad 100644 --- a/geosongpu_ci/pipeline/held_suarez.py +++ b/geosongpu_ci/pipeline/held_suarez.py @@ -16,7 +16,6 @@ from typing import Dict, Any import shutil import os -import dataclasses import glob From c7ad1084b51e7d2f2afcb5145ae820e4a7b697ce Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Thu, 3 Aug 2023 11:50:53 -0400 Subject: [PATCH 11/12] Fix geos log parser for AQ --- .../tools/benchmark/geos_log_parser.py | 56 ++++++++++--------- geosongpu_ci/tools/benchmark/report.py | 18 +++++- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/geosongpu_ci/tools/benchmark/geos_log_parser.py b/geosongpu_ci/tools/benchmark/geos_log_parser.py index 87f1aca..a1dd68e 100644 --- a/geosongpu_ci/tools/benchmark/geos_log_parser.py +++ b/geosongpu_ci/tools/benchmark/geos_log_parser.py @@ -15,6 +15,7 @@ def _grep( exclude_pattern: Optional[bool] = False, start_pattern: Optional[str] = None, end_pattern: Optional[str] = None, + expected: Optional[bool] = True, ) -> Iterable[str]: results = [] check = start_pattern is None @@ -24,11 +25,13 @@ def _grep( check = True if end_pattern and end_pattern in line: break - if pattern in line: + if check and pattern in line: if exclude_pattern and pattern in line: line = "".join(line.split(pattern)[1:]) if line != "": results.append(line) + if expected and results == []: + raise RuntimeError(f"Expecting {pattern} to be found") return results @@ -45,12 +48,14 @@ def parse_geos_log(filename: str) -> BenchmarkRawData: benchmark = BenchmarkRawData() # Get backend - is_gtfv3 = _grep(filename, "RUN_GTFV3:1", exclude_pattern=True) != [] + is_gtfv3 = ( + _grep(filename, "RUN_GTFV3:1", exclude_pattern=True, expected=False) != [] + ) if not is_gtfv3: benchmark.backend = "fortran" else: backend_pattern = "backend: " - grepped = _grep(filename, backend_pattern, exclude_pattern=True) + grepped = _grep(filename, backend_pattern, exclude_pattern=True, expected=False) if grepped == []: benchmark.backend = "gtfv3 (details failed to parse)" else: @@ -59,15 +64,13 @@ def parse_geos_log(filename: str) -> BenchmarkRawData: # Get timings of FV if is_gtfv3: - interface_timings = _grep(filename, "fv3_interf", exclude_pattern=True) + interface_timings = _grep(filename, "0 , geos_gtfv3", exclude_pattern=True) benchmark.fv_dyncore_timings = _extract_numerics(interface_timings) dycore_timings = _grep(filename, "] Run...", exclude_pattern=True) benchmark.inner_dycore_timings = _extract_numerics(dycore_timings) else: - dycore_timings = _grep( - filename, "fv_dynamics: time taken", exclude_pattern=True - ) + dycore_timings = _grep(filename, "0: fv_dynamics", exclude_pattern=True) benchmark.fv_dyncore_timings = _extract_numerics(dycore_timings) # Get setup (grid, nodes) @@ -92,23 +95,23 @@ def parse_geos_log(filename: str) -> BenchmarkRawData: benchmark.node_setup = (NX, int(NY / 6), int(NX * (NY / 6) * 6)) # Get details FV Grid Comp timings - dyn_profiler_entry = "profiler: Times for component " - superdyn_profiler_exit = "profiler: Times for component " + dyn_profiler_entry = "Times for component " + superdyn_profiler_exit = "Times for component " dyn_profiler_patterns = [ - ("profiler: ------RUN", "RUN", ""), - ("profiler: --------DYN_ANA", "DYN_ANA", "RUN"), - ("profiler: --------DYN_PROLOGUE", "DYN_PROLOGUE", "RUN"), - ("profiler: --------DYN_CORE", "DYN_CORE", "RUN"), - ("profiler: ----------PROLOGUE", "PROLOGUE", "DYN_CORE"), - ("profiler: ----------PULL_TRACERS", "PULL_TRACERS", "DYN_CORE"), - ("profiler: ----------STATE_TO_FV", "STATE_TO_FV", "DYN_CORE"), - ("profiler: ----------MAKE_NH", "MAKE_NH", "DYN_CORE"), - ("profiler: ----------MASS_FIX", "MASS_FIX", "DYN_CORE"), - ("profiler: ----------FV_DYNAMICS", "FV_DYNAMICS", "DYN_CORE"), - ("profiler: ----------PUSH_TRACERS", "PUSH_TRACERS", "DYN_CORE"), - ("profiler: ----------FV_TO_STATE", "FV_TO_STATE", "DYN_CORE"), - ("profiler: --------DYN_EPILOGUE", "DYN_EPILOGUE", "RUN"), - ("profiler: ------RUN2", "RUN2", ""), + ("------RUN", "RUN", ""), + ("--------DYN_ANA", "DYN_ANA", "RUN"), + ("--------DYN_PROLOGUE", "DYN_PROLOGUE", "RUN"), + ("--------DYN_CORE", "DYN_CORE", "RUN"), + ("----------PROLOGUE", "PROLOGUE", "DYN_CORE"), + ("----------PULL_TRACERS", "PULL_TRACERS", "DYN_CORE"), + ("----------STATE_TO_FV", "STATE_TO_FV", "DYN_CORE"), + ("----------MAKE_NH", "MAKE_NH", "DYN_CORE"), + ("----------MASS_FIX", "MASS_FIX", "DYN_CORE"), + ("----------FV_DYNAMICS", "FV_DYNAMICS", "DYN_CORE"), + ("----------PUSH_TRACERS", "PUSH_TRACERS", "DYN_CORE"), + ("----------FV_TO_STATE", "FV_TO_STATE", "DYN_CORE"), + ("--------DYN_EPILOGUE", "DYN_EPILOGUE", "RUN"), + ("------RUN2", "RUN2", ""), ] for pattern, shortname, parent in dyn_profiler_patterns: measures = _extract_numerics( @@ -117,6 +120,7 @@ def parse_geos_log(filename: str) -> BenchmarkRawData: pattern, start_pattern=dyn_profiler_entry, end_pattern=superdyn_profiler_exit, + expected=False, ) ) if measures != []: @@ -125,10 +129,8 @@ def parse_geos_log(filename: str) -> BenchmarkRawData: ) # Model throughput - gloabl_profiler_entry = "profiler: Model Throughput" - global_run_time = _grep( - filename, "profiler: --Run", start_pattern=gloabl_profiler_entry - ) + gloabl_profiler_entry = "Model Throughput" + global_run_time = _grep(filename, "--Run", start_pattern=gloabl_profiler_entry) benchmark.global_run_time = _extract_numerics(global_run_time)[1] return benchmark diff --git a/geosongpu_ci/tools/benchmark/report.py b/geosongpu_ci/tools/benchmark/report.py index 1461a12..d8b44aa 100644 --- a/geosongpu_ci/tools/benchmark/report.py +++ b/geosongpu_ci/tools/benchmark/report.py @@ -1,8 +1,10 @@ +import click import itertools -from typing import Any, List +from typing import Any, List, Iterable from dataclasses import dataclass, field import numpy as np from geosongpu_ci.tools.benchmark.raw_data import BenchmarkRawData +from geosongpu_ci.tools.benchmark.geos_log_parser import parse_geos_log @dataclass @@ -150,3 +152,17 @@ def report(raw_data: List[BenchmarkRawData]) -> BenchmarkReport: report.per_backend_per_metric_comparison.append({REPORT_TIME_KEY: time_report}) return report + + +@click.command() +@click.argument("geos_logs", nargs=-1) +def cli(geos_logs: Iterable[str]): + benchmark_raw_data = [] + for log in geos_logs: + benchmark_raw_data.append(parse_geos_log(log)) + r = report(benchmark_raw_data) + print(r) + + +if __name__ == "__main__": + cli() From ce13824d5545214d750d5c80e1688512db90242a Mon Sep 17 00:00:00 2001 From: Florian Deconinck Date: Thu, 3 Aug 2023 12:09:16 -0400 Subject: [PATCH 12/12] setup_only for AQ setup_only for Geos CMake but do not build --- geosongpu_ci/pipeline/aquaplanet.py | 56 ++++++++++++++++------------- geosongpu_ci/pipeline/geos.py | 35 +++++++++++------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/geosongpu_ci/pipeline/aquaplanet.py b/geosongpu_ci/pipeline/aquaplanet.py index 757fe00..b26ae76 100644 --- a/geosongpu_ci/pipeline/aquaplanet.py +++ b/geosongpu_ci/pipeline/aquaplanet.py @@ -7,6 +7,7 @@ from geosongpu_ci.utils.shell import ShellScript from geosongpu_ci.tools.benchmark.geos_log_parser import parse_geos_log from geosongpu_ci.tools.benchmark.report import report +from geosongpu_ci.utils.progress import Progress from typing import Dict, Any import glob import os @@ -61,32 +62,37 @@ def simulate( cap_rc: str, log_pattern: str, fv3_dacemode: str, + setup_only: bool = False, ): - # Execute caching step on 6 GPUs - ShellScript("temporary_setup").write( - shell_commands=[ - f"cd {experiment_directory}", - f"./{setup_sh}", - f"cp -f {cap_rc} CAP.rc", - ] - ).execute(remove_after_execution=True) - _replace_in_file( - url=f"{experiment_directory}/gcm_run.j", - text_to_replace="#SBATCH --output=slurm-%j-%x.out", - new_text=f"#SBATCH --output={log_pattern}", - ) - _replace_in_file( - url=f"{experiment_directory}/gcm_run.j", - text_to_replace="setenv FV3_DACEMODE BuildAndRun", - new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", - ) - ShellScript("run_sbatch_gpu").write( - shell_commands=[ - f"cd {experiment_directory}", - f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", - "sbatch gcm_run.j", - ] - ).execute(sbatch=True) + executor_name = f"{setup_sh.replace('sh', '')}_{cap_rc}.sbatch" + executor_name.replace("/", "-") # sanitize + if setup_only: + ShellScript("temporary_setup").write( + shell_commands=[ + f"cd {experiment_directory}", + f"./{setup_sh}", + f"cp -f {cap_rc} CAP.rc", + ] + ).execute(remove_after_execution=True) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="#SBATCH --output=slurm-%j-%x.out", + new_text=f"#SBATCH --output={log_pattern}", + ) + _replace_in_file( + url=f"{experiment_directory}/gcm_run.j", + text_to_replace="setenv FV3_DACEMODE BuildAndRun", + new_text=f"setenv FV3_DACEMODE {fv3_dacemode}", + ) + ShellScript(executor_name).write( + shell_commands=[ + f"cd {experiment_directory}", + f"export CUPY_CACHE_DIR={experiment_directory}/.cupy", + "sbatch gcm_run.j", + ] + ).execute(sbatch=True) + else: + Progress.log(f"= = = Skipping {executor_name} = = =") def run_action( self, diff --git a/geosongpu_ci/pipeline/geos.py b/geosongpu_ci/pipeline/geos.py index c66cc86..bc30ffb 100644 --- a/geosongpu_ci/pipeline/geos.py +++ b/geosongpu_ci/pipeline/geos.py @@ -104,6 +104,11 @@ def run_action( do_mepo=True, ) + set_env_script = set_python_environment( + geos_directory=f"{env.CI_WORKSPACE}/geos", + geos_install_dir=f"{env.CI_WORKSPACE}/geos/install", + ) + # Build GEOS with GTFV3 interface cmake_cmd = "cmake .." cmake_cmd += " -DBASEDIR=$BASEDIR/Linux" @@ -114,16 +119,7 @@ def run_action( if env.experiment_name == GEOS_AQ_KEY: cmake_cmd += " -DAQUAPLANET=ON" - set_env_script = set_python_environment( - geos_directory=f"{env.CI_WORKSPACE}/geos", - geos_install_dir=f"{env.CI_WORKSPACE}/geos/install", - ) - - build_cmd = ( - f"{one_gpu_srun(log='build.out', time='01:30:00')} make -j48 install" - ) - script = ShellScript("build_geos") - script.write( + ShellScript("CMake_geos").write( modules=[], env_to_source=[ set_env_script, @@ -138,13 +134,28 @@ def run_action( "mkdir $TMP", "echo $TMP", cmake_cmd, + ], + ) + + build_cmd = ( + f"{one_gpu_srun(log='build.out', time='01:30:00')} make -j48 install" + ) + make_script = ShellScript("make_geos") + make_script.write( + modules=[], + env_to_source=[ + set_env_script, + ], + shell_commands=[ + "cd geos/build", + f"export TMP={env.CI_WORKSPACE}/geos/build/tmp", build_cmd, ], ) if not env.setup_only: - script.execute(script) + make_script.execute() else: - Progress.log(f"= = = Skipping {script.name} = = =") + Progress.log(f"= = = Skipping {make_script.name} = = =") _epilogue(env)