From f4af1ffaf49381a8a789faf77d75ee19ab89fb8e Mon Sep 17 00:00:00 2001 From: Lachlan Whyborn Date: Wed, 25 Sep 2024 16:00:50 +1000 Subject: [PATCH] Add staged CABLE driver for CABLE configurations (#461) * Add CABLE-POP driver * Support for multi-stage CABLE-POP spinups * Added CablePOP model import * Moved cablepop to staged_cable, now runs as desired. staged_cable driver works for running arbitrary staged CABLE spin-up configurations. * Removed old cablepop.py * PEP8 compliance * Apply suggestions from PR * Missed bracket * Add testing for the staged_cable model. * Apply some suggestions from Aidan and some corrections caught by testing * PEP8 compliance * Applied autopep8 aggressively, hopefully catches everything * Set the restart and output paths for the driver. * Minor adjustments to retrieving of restarts and cleanup. * Moved bracket and renamed test restart locations * Added a get_further_restarts function to retrieve older restarts The files retrieved by get_prior_restart_files are automatically prepended by the experiment restart directory, which is the most recent restart. Adding '..' to redirect to the older restart directories is also not an option, as the generated links will inherit the same redirection. Also handed situation where user only provides a stage namelist, with no corresponding master. * Remove now-superfluous tests * Minor change in setting up directories * Extended some comments for clarity * Restructure the setup stage and be more verbose in doc strings --- payu/models/__init__.py | 2 + payu/models/staged_cable.py | 289 +++++++++++++++++++++++++++++++ test/common.py | 3 +- test/models/test_staged_cable.py | 172 ++++++++++++++++++ 4 files changed, 465 insertions(+), 1 deletion(-) create mode 100644 payu/models/staged_cable.py create mode 100644 test/models/test_staged_cable.py diff --git a/payu/models/__init__.py b/payu/models/__init__.py index c3b4e971..d1c34391 100644 --- a/payu/models/__init__.py +++ b/payu/models/__init__.py @@ -11,6 +11,7 @@ from payu.models.mom6 import Mom6 from payu.models.nemo import Nemo from payu.models.oasis import Oasis +from payu.models.staged_cable import StagedCable from payu.models.test import Test from payu.models.um import UnifiedModel from payu.models.ww3 import WW3 @@ -38,6 +39,7 @@ 'mom6': Mom6, 'qgcm': Qgcm, 'cable': Cable, + 'staged_cable': StagedCable, # Default 'default': Model, diff --git a/payu/models/staged_cable.py b/payu/models/staged_cable.py new file mode 100644 index 00000000..9b6b8158 --- /dev/null +++ b/payu/models/staged_cable.py @@ -0,0 +1,289 @@ +"""payu.models.staged_cable + ================ + + Driver interface to CABLE-POP_TRENDY branch + + :copyright: Copyright 2021 Marshall Ward, see AUTHORS for details + :license: Apache License, Version 2.0, see LICENSE for details +""" + +# Standard Library +import os +import shutil +import itertools + +# Extensions +import f90nml +import yaml + +# Local +from payu.models.model import Model +from payu.fsops import mkdir_p + + +def deep_update(d_1, d_2): + """Deep update of namelists.""" + for key, value in d_2.items(): + if isinstance(value, dict): + # Nested struct + if key in d_1: + # If the master namelist contains the key, then recursively + # apply + deep_update(d_1[key], d_2[key]) + else: + # Otherwise just set the value from the patch dict + d_1[key] = value + else: + # Is value, just override + d_1[key] = value + + +class StagedCable(Model): + """A driver for running staged CABLE spin-up configurations.""" + + def __init__(self, expt, name, config): + super(StagedCable, self).__init__(expt, name, config) + + self.model_type = 'staged_cable' + self.default_exec = 'cable' + + self.config_files = ['stage_config.yaml'] + self.optional_config_files = ['cable.nml', 'cru.nml', + 'luc.nml', 'met_names.nml'] + + self.configuration_log = {} + + if not os.path.isfile('configuration_log.yaml'): + # Build a new configuration log + self._build_new_configuration_log() + else: + # Read the current configuration log + self._read_configuration_log() + + # Now set the number of runs using the configuration_log + remaining_stages = len(self.configuration_log['queued_stages']) + print("Overriding the remaining number of runs according to the " + + "number of queued stages in the configuration log.") + os.environ['PAYU_N_RUNS'] = str(remaining_stages) + + def _build_new_configuration_log(self): + """Build a new configuration log for the first stage of the run.""" + + # Read the stage_config.yaml file + with open('stage_config.yaml', 'r') as stage_conf_f: + self.stage_config = yaml.safe_load(stage_conf_f) + + # On the first run, we need to read the 'stage_config.yaml' file. + cable_stages = self._prepare_configuration() + + # Build the configuration log + self.configuration_log['queued_stages'] = cable_stages + self.configuration_log['current_stage'] = '' + self.configuration_log['completed_stages'] = [] + + self._save_configuration_log() + + def _read_configuration_log(self): + """Read the existing configuration log.""" + with open('configuration_log.yaml') as conf_log_file: + self.configuration_log = yaml.safe_load(conf_log_file) + + def _prepare_configuration(self): + """Prepare the stages in the CABLE configuration.""" + + # Since Python3.7, dictionary order is guaranteed so we can read + # the entries in order without needing to supply an index + # We just want to populate cable_stages with the list of stages + # to run + cable_stages = [] + for stage_name, stage_opts in self.stage_config.items(): + # Check if stage is a multi-step or single step + if stage_name.startswith('multistep'): + # The multi-step stage can run each internal stage + # a different number of times. For example, a two + # step stage may ask for the first step (S1) 5 times, + # but the second step (S2) only 3 times. The stage + # looks like [S1, S2, S1, S2, S1, S2, S1, S1]. + # So what we need to do is first record the number + # of times each step is run + + # Use recipe from https://stackoverflow.com/questions/48199961 + # Turn the stages into lists of "count" length + steps = [[step_name] * stage_opts[step_name]['count'] + for step_name in stage_opts.keys()] + + cable_stages.extend( + [stage for stage in itertools.chain.from_iterable( + itertools.zip_longest(*steps) + ) if stage is not None] + ) + # Finish handling of multistep stage + + else: + # A single step stage, in general we only want to run this + # once, but check for the count anyway + cable_stages.extend([stage_name] * stage_opts['count']) + + # Finish handling of single step stage + return cable_stages + + def setup(self): + super(StagedCable, self).setup() + + # Prepare the namelists for the stage + stage_name = self._get_stage_name() + self._apply_stage_namelists(stage_name) + + # Make the logging directory + mkdir_p(os.path.join(self.work_path, "logs")) + + # Get the additional restarts from older restart dirs + self._get_further_restarts() + + # Make necessary adjustments to the configuration log + self._handle_configuration_log_setup() + + def _get_further_restarts(self): + """Get the restarts from stages further in the past where necessary.""" + + # Often we take restarts from runs which are not the most recent run as + # inputs for particular science modules, which means we have to extend + # the existing functionality around retrieving restarts. + + # We can't supercede the parent get_prior_restart_files, since the + # files returned by said function are prepended by + # self.prior_restart_path, which is not desirable in this instance. + + num_completed_stages = len(self.configuration_log['completed_stages']) + + for stage_number in reversed(range(num_completed_stages - 1)): + respath = os.path.join( + self.expt.archive_path, + f'restart{stage_number:03d}' + ) + for f_name in os.listdir(respath): + if os.path.isfile(os.path.join(respath, f_name)): + f_orig = os.path.join(respath, f_name) + f_link = os.path.join(self.work_init_path_local, f_name) + # Check whether a given link already exists in the + # manifest, so we don't write over a newer version of a + # restart + if f_link not in self.expt.manifest.manifests['restart']: + self.expt.manifest.add_filepath( + 'restart', + f_link, + f_orig, + self.copy_restarts + ) + + def set_model_pathnames(self): + super(StagedCable, self).set_model_pathnames() + + self.work_restart_path = os.path.join(self.work_path, 'restart') + self.work_output_path = os.path.join(self.work_path, 'outputs') + + def _get_stage_name(self): + """Get the name of the stage being prepared.""" + + if self.configuration_log['current_stage'] != '': + # If the current stage is a non-empty string, it means we exited + # during the running of the previous stage + stage_name = self.configuration_log['current_stage'] + else: + # Pop the stage from the list + stage_name = self.configuration_log['queued_stages'][0] + + # Ensure the directory exists + if not os.path.isdir(os.path.join(self.control_path, stage_name)): + errmsg = f"""Directory containing namelists for stage {stage_name} + does not exist.""" + raise FileNotFoundError(errmsg) + + return stage_name + + def _apply_stage_namelists(self, stage_name): + """Apply the stage namelists to the master namelists. + + The master namelist is the namelist that exists in the control + directory and the stage namelist exists within the directory for the + given stage. If a master version of a given namelist does not exist, + then the stage namelist is taken as is. + + Example: + . + ├── cable.nml + └── cable_stage +    ├── cable.nml +    └── luc.nml + + In this instance, the ```cable.nml``` for ```cable_stage``` would be + a merge of the top level ```cable.nml``` and + ```cable_stage/cable.nml``` (with the latter taking precedence) and + ```luc.nml``` is just ```cable_stage/luc.nml```. + """ + namelists = os.listdir(os.path.join(self.control_path, stage_name)) + + for namelist in namelists: + write_target = os.path.join(self.work_input_path, namelist) + stage_nml = os.path.join(self.control_path, stage_name, namelist) + + if os.path.isfile(os.path.join(self.control_path, namelist)): + # Instance where there is a master and stage namelist + with open(stage_nml) as stage_nml_f: + stage_namelist = f90nml.read(stage_nml_f) + + master_nml = os.path.join(self.control_path, namelist) + f90nml.patch(master_nml, stage_namelist, write_target) + else: + # Instance where there is only a stage namelist + shutil.copy(stage_nml, write_target) + + def _handle_configuration_log_setup(self): + """Make appropriate adjustments to the configuration log to reflect + that the setup of the stage is complete.""" + + if self.configuration_log['current_stage'] != '': + # If the current stage is a non-empty string, it means we exited + # during the running of the previous stage- leave as is + stage_name = self.configuration_log['current_stage'] + else: + # Normal case where we just archived a successful stage. + self.configuration_log['current_stage'] = \ + self.configuration_log['queued_stages'].pop(0) + + self._save_configuration_log() + + # Copy the log to the work directory + shutil.copy('configuration_log.yaml', self.work_input_path) + + def archive(self): + """Store model output to laboratory archive and update the +configuration log.""" + + # Move files from the restart directory within work to the archive + # restart directory. + for f in os.listdir(self.work_restart_path): + shutil.move(os.path.join(self.work_restart_path, f), + self.restart_path) + os.rmdir(self.work_restart_path) + + # Update the configuration log and save it to the working directory + completed_stage = self.configuration_log['current_stage'] + self.configuration_log['current_stage'] = '' + self.configuration_log['completed_stages'].append(completed_stage) + + self._save_configuration_log() + + if len(self.configuration_log["queued_stages"]) == 0: + # Configuration successfully completed + os.remove('configuration_log.yaml') + + super(StagedCable, self).archive() + + def collate(self): + pass + + def _save_configuration_log(self): + """Write the updated configuration log back to the staging area.""" + with open('configuration_log.yaml', 'w+') as config_log_f: + yaml.dump(self.configuration_log, config_log_f) diff --git a/test/common.py b/test/common.py index 97d91416..2ef76b3b 100644 --- a/test/common.py +++ b/test/common.py @@ -140,7 +140,8 @@ def payu_setup(model_type=None, def write_config(config, path=config_path): with path.open('w') as file: - file.write(yaml.dump(config, default_flow_style=False)) + file.write(yaml.dump(config, default_flow_style=False, + sort_keys=False)) def make_exe(exe_name=None): diff --git a/test/models/test_staged_cable.py b/test/models/test_staged_cable.py new file mode 100644 index 00000000..ed7b56c6 --- /dev/null +++ b/test/models/test_staged_cable.py @@ -0,0 +1,172 @@ +import f90nml +import os +import pytest +import shutil + +import payu + +from test.common import cd +from test.common import tmpdir, ctrldir, labdir, expt_workdir, ctrldir_basename +from test.common import archive_dir +from test.common import write_config, write_metadata +from test.common import make_random_file, make_inputs, make_exe + +verbose = True + + +def setup_module(module): + """ + Put any test-wide setup code in here, e.g. creating test files + """ + if verbose: + print("setup_module module:%s" % module.__name__) + + # Should be taken care of by teardown, in case remnants lying around + try: + shutil.rmtree(tmpdir) + except FileNotFoundError: + pass + + try: + tmpdir.mkdir() + labdir.mkdir() + ctrldir.mkdir() + expt_workdir.mkdir(parents=True) + archive_dir.mkdir() + except Exception as e: + print(e) + + config = { + 'laboratory': 'lab', + 'jobname': 'testrun', + 'model': 'staged_cable', + 'exe': 'cable', + 'experiment': ctrldir_basename, + 'metadata': { + 'enable': False + } + } + write_config(config) + + stage1dir = ctrldir / 'stage_1' + stage1dir.mkdir() + stage2dir = ctrldir / 'stage_2' + stage2dir.mkdir() + + # Build a stage config for testing + stage_config = { + 'stage_1': {'count': 1}, + 'stage_2': {'count': 1}, + 'multistep_stage_1': { + 'stage_3': {'count': 3}, + 'stage_4': {'count': 1}, + }, + 'multistep_stage_2': { + 'stage_5': {'count': 1}, + 'stage_6': {'count': 2}, + }, + 'stage_7': {'count': 1} + } + + write_config(stage_config, ctrldir / 'stage_config.yaml') + + # Prepare a master namelist and a stage 1 namelist + master_nml = { + 'cablenml': { + 'option1': 1, + 'struct1': { + 'option2': 2, + 'option3': 3, + }, + 'option4': 4 + } + } + + with open(ctrldir / 'cable.nml', 'w') as master_nml_f: + f90nml.write(master_nml, master_nml_f) + + patch_nml = { + 'cablenml': { + 'option1': 10, + 'struct1': { + 'option2': 20, + 'option5': 50 + }, + 'option6': 60 + } + } + + with open(ctrldir / 'stage_1/cable.nml', 'w') as patch_nml_f: + f90nml.write(patch_nml, patch_nml_f) + + +def teardown_module(module): + """ + Put any test-wide teardown code in here, e.g. removing test outputs + """ + if verbose: + print("teardown_module module:%s" % module.__name__) + + try: + shutil.rmtree(tmpdir) + except Exception as e: + print(e) + + +def test_staged_cable(): + """ + Test the preparing and archiving of a cable_stage. + """ + + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + expt = payu.experiment.Experiment(lab, reproduce=False) + model = expt.models[0] + + # Since we've called the initialiser, we should be able to inspect the + # stages immediately (through the configuration log) + expected_queued_stages = [ + 'stage_1', + 'stage_2', + 'stage_3', + 'stage_4', + 'stage_3', + 'stage_3', + 'stage_5', + 'stage_6', + 'stage_6', + 'stage_7'] + assert model.configuration_log['queued_stages'] == expected_queued_stages + + # Now prepare for a stage- should see changes in the configuration log + # and the patched namelist in the workdir + model.setup() + expected_current_stage = expected_queued_stages.pop(0) + assert model.configuration_log['current_stage'] == expected_current_stage + assert model.configuration_log['queued_stages'] == expected_queued_stages + + # Now check the namelist + expected_namelist = { + 'cablenml': { + 'option1': 10, + 'struct1': { + 'option2': 20, + 'option3': 3, + 'option5': 50 + }, + 'option4': 4, + 'option6': 60 + } + } + + with open(expt_workdir / 'cable.nml') as stage_nml_f: + stage_nml = f90nml.read(stage_nml_f) + + assert stage_nml == expected_namelist + + # Archive the stage and make sure the configuration log is correct + model.archive() + expected_comp_stages = [expected_current_stage] + expected_current_stage = '' + assert model.configuration_log['completed_stages'] == expected_comp_stages + assert model.configuration_log['current_stage'] == expected_current_stage