diff --git a/payu/models/__init__.py b/payu/models/__init__.py index c3b4e971..d1c34391 100644 --- a/payu/models/__init__.py +++ b/payu/models/__init__.py @@ -11,6 +11,7 @@ from payu.models.mom6 import Mom6 from payu.models.nemo import Nemo from payu.models.oasis import Oasis +from payu.models.staged_cable import StagedCable from payu.models.test import Test from payu.models.um import UnifiedModel from payu.models.ww3 import WW3 @@ -38,6 +39,7 @@ 'mom6': Mom6, 'qgcm': Qgcm, 'cable': Cable, + 'staged_cable': StagedCable, # Default 'default': Model, diff --git a/payu/models/staged_cable.py b/payu/models/staged_cable.py new file mode 100644 index 00000000..9b6b8158 --- /dev/null +++ b/payu/models/staged_cable.py @@ -0,0 +1,289 @@ +"""payu.models.staged_cable + ================ + + Driver interface to CABLE-POP_TRENDY branch + + :copyright: Copyright 2021 Marshall Ward, see AUTHORS for details + :license: Apache License, Version 2.0, see LICENSE for details +""" + +# Standard Library +import os +import shutil +import itertools + +# Extensions +import f90nml +import yaml + +# Local +from payu.models.model import Model +from payu.fsops import mkdir_p + + +def deep_update(d_1, d_2): + """Deep update of namelists.""" + for key, value in d_2.items(): + if isinstance(value, dict): + # Nested struct + if key in d_1: + # If the master namelist contains the key, then recursively + # apply + deep_update(d_1[key], d_2[key]) + else: + # Otherwise just set the value from the patch dict + d_1[key] = value + else: + # Is value, just override + d_1[key] = value + + +class StagedCable(Model): + """A driver for running staged CABLE spin-up configurations.""" + + def __init__(self, expt, name, config): + super(StagedCable, self).__init__(expt, name, config) + + self.model_type = 'staged_cable' + self.default_exec = 'cable' + + self.config_files = ['stage_config.yaml'] + self.optional_config_files = ['cable.nml', 'cru.nml', + 'luc.nml', 'met_names.nml'] + + self.configuration_log = {} + + if not os.path.isfile('configuration_log.yaml'): + # Build a new configuration log + self._build_new_configuration_log() + else: + # Read the current configuration log + self._read_configuration_log() + + # Now set the number of runs using the configuration_log + remaining_stages = len(self.configuration_log['queued_stages']) + print("Overriding the remaining number of runs according to the " + + "number of queued stages in the configuration log.") + os.environ['PAYU_N_RUNS'] = str(remaining_stages) + + def _build_new_configuration_log(self): + """Build a new configuration log for the first stage of the run.""" + + # Read the stage_config.yaml file + with open('stage_config.yaml', 'r') as stage_conf_f: + self.stage_config = yaml.safe_load(stage_conf_f) + + # On the first run, we need to read the 'stage_config.yaml' file. + cable_stages = self._prepare_configuration() + + # Build the configuration log + self.configuration_log['queued_stages'] = cable_stages + self.configuration_log['current_stage'] = '' + self.configuration_log['completed_stages'] = [] + + self._save_configuration_log() + + def _read_configuration_log(self): + """Read the existing configuration log.""" + with open('configuration_log.yaml') as conf_log_file: + self.configuration_log = yaml.safe_load(conf_log_file) + + def _prepare_configuration(self): + """Prepare the stages in the CABLE configuration.""" + + # Since Python3.7, dictionary order is guaranteed so we can read + # the entries in order without needing to supply an index + # We just want to populate cable_stages with the list of stages + # to run + cable_stages = [] + for stage_name, stage_opts in self.stage_config.items(): + # Check if stage is a multi-step or single step + if stage_name.startswith('multistep'): + # The multi-step stage can run each internal stage + # a different number of times. For example, a two + # step stage may ask for the first step (S1) 5 times, + # but the second step (S2) only 3 times. The stage + # looks like [S1, S2, S1, S2, S1, S2, S1, S1]. + # So what we need to do is first record the number + # of times each step is run + + # Use recipe from https://stackoverflow.com/questions/48199961 + # Turn the stages into lists of "count" length + steps = [[step_name] * stage_opts[step_name]['count'] + for step_name in stage_opts.keys()] + + cable_stages.extend( + [stage for stage in itertools.chain.from_iterable( + itertools.zip_longest(*steps) + ) if stage is not None] + ) + # Finish handling of multistep stage + + else: + # A single step stage, in general we only want to run this + # once, but check for the count anyway + cable_stages.extend([stage_name] * stage_opts['count']) + + # Finish handling of single step stage + return cable_stages + + def setup(self): + super(StagedCable, self).setup() + + # Prepare the namelists for the stage + stage_name = self._get_stage_name() + self._apply_stage_namelists(stage_name) + + # Make the logging directory + mkdir_p(os.path.join(self.work_path, "logs")) + + # Get the additional restarts from older restart dirs + self._get_further_restarts() + + # Make necessary adjustments to the configuration log + self._handle_configuration_log_setup() + + def _get_further_restarts(self): + """Get the restarts from stages further in the past where necessary.""" + + # Often we take restarts from runs which are not the most recent run as + # inputs for particular science modules, which means we have to extend + # the existing functionality around retrieving restarts. + + # We can't supercede the parent get_prior_restart_files, since the + # files returned by said function are prepended by + # self.prior_restart_path, which is not desirable in this instance. + + num_completed_stages = len(self.configuration_log['completed_stages']) + + for stage_number in reversed(range(num_completed_stages - 1)): + respath = os.path.join( + self.expt.archive_path, + f'restart{stage_number:03d}' + ) + for f_name in os.listdir(respath): + if os.path.isfile(os.path.join(respath, f_name)): + f_orig = os.path.join(respath, f_name) + f_link = os.path.join(self.work_init_path_local, f_name) + # Check whether a given link already exists in the + # manifest, so we don't write over a newer version of a + # restart + if f_link not in self.expt.manifest.manifests['restart']: + self.expt.manifest.add_filepath( + 'restart', + f_link, + f_orig, + self.copy_restarts + ) + + def set_model_pathnames(self): + super(StagedCable, self).set_model_pathnames() + + self.work_restart_path = os.path.join(self.work_path, 'restart') + self.work_output_path = os.path.join(self.work_path, 'outputs') + + def _get_stage_name(self): + """Get the name of the stage being prepared.""" + + if self.configuration_log['current_stage'] != '': + # If the current stage is a non-empty string, it means we exited + # during the running of the previous stage + stage_name = self.configuration_log['current_stage'] + else: + # Pop the stage from the list + stage_name = self.configuration_log['queued_stages'][0] + + # Ensure the directory exists + if not os.path.isdir(os.path.join(self.control_path, stage_name)): + errmsg = f"""Directory containing namelists for stage {stage_name} + does not exist.""" + raise FileNotFoundError(errmsg) + + return stage_name + + def _apply_stage_namelists(self, stage_name): + """Apply the stage namelists to the master namelists. + + The master namelist is the namelist that exists in the control + directory and the stage namelist exists within the directory for the + given stage. If a master version of a given namelist does not exist, + then the stage namelist is taken as is. + + Example: + . + ├── cable.nml + └── cable_stage +    ├── cable.nml +    └── luc.nml + + In this instance, the ```cable.nml``` for ```cable_stage``` would be + a merge of the top level ```cable.nml``` and + ```cable_stage/cable.nml``` (with the latter taking precedence) and + ```luc.nml``` is just ```cable_stage/luc.nml```. + """ + namelists = os.listdir(os.path.join(self.control_path, stage_name)) + + for namelist in namelists: + write_target = os.path.join(self.work_input_path, namelist) + stage_nml = os.path.join(self.control_path, stage_name, namelist) + + if os.path.isfile(os.path.join(self.control_path, namelist)): + # Instance where there is a master and stage namelist + with open(stage_nml) as stage_nml_f: + stage_namelist = f90nml.read(stage_nml_f) + + master_nml = os.path.join(self.control_path, namelist) + f90nml.patch(master_nml, stage_namelist, write_target) + else: + # Instance where there is only a stage namelist + shutil.copy(stage_nml, write_target) + + def _handle_configuration_log_setup(self): + """Make appropriate adjustments to the configuration log to reflect + that the setup of the stage is complete.""" + + if self.configuration_log['current_stage'] != '': + # If the current stage is a non-empty string, it means we exited + # during the running of the previous stage- leave as is + stage_name = self.configuration_log['current_stage'] + else: + # Normal case where we just archived a successful stage. + self.configuration_log['current_stage'] = \ + self.configuration_log['queued_stages'].pop(0) + + self._save_configuration_log() + + # Copy the log to the work directory + shutil.copy('configuration_log.yaml', self.work_input_path) + + def archive(self): + """Store model output to laboratory archive and update the +configuration log.""" + + # Move files from the restart directory within work to the archive + # restart directory. + for f in os.listdir(self.work_restart_path): + shutil.move(os.path.join(self.work_restart_path, f), + self.restart_path) + os.rmdir(self.work_restart_path) + + # Update the configuration log and save it to the working directory + completed_stage = self.configuration_log['current_stage'] + self.configuration_log['current_stage'] = '' + self.configuration_log['completed_stages'].append(completed_stage) + + self._save_configuration_log() + + if len(self.configuration_log["queued_stages"]) == 0: + # Configuration successfully completed + os.remove('configuration_log.yaml') + + super(StagedCable, self).archive() + + def collate(self): + pass + + def _save_configuration_log(self): + """Write the updated configuration log back to the staging area.""" + with open('configuration_log.yaml', 'w+') as config_log_f: + yaml.dump(self.configuration_log, config_log_f) diff --git a/test/common.py b/test/common.py index cbb6b9c9..0f30fc4f 100644 --- a/test/common.py +++ b/test/common.py @@ -137,7 +137,8 @@ def payu_setup(model_type=None, def write_config(config, path=config_path): with path.open('w') as file: - file.write(yaml.dump(config, default_flow_style=False)) + file.write(yaml.dump(config, default_flow_style=False, + sort_keys=False)) def make_exe(): diff --git a/test/models/test_staged_cable.py b/test/models/test_staged_cable.py new file mode 100644 index 00000000..ed7b56c6 --- /dev/null +++ b/test/models/test_staged_cable.py @@ -0,0 +1,172 @@ +import f90nml +import os +import pytest +import shutil + +import payu + +from test.common import cd +from test.common import tmpdir, ctrldir, labdir, expt_workdir, ctrldir_basename +from test.common import archive_dir +from test.common import write_config, write_metadata +from test.common import make_random_file, make_inputs, make_exe + +verbose = True + + +def setup_module(module): + """ + Put any test-wide setup code in here, e.g. creating test files + """ + if verbose: + print("setup_module module:%s" % module.__name__) + + # Should be taken care of by teardown, in case remnants lying around + try: + shutil.rmtree(tmpdir) + except FileNotFoundError: + pass + + try: + tmpdir.mkdir() + labdir.mkdir() + ctrldir.mkdir() + expt_workdir.mkdir(parents=True) + archive_dir.mkdir() + except Exception as e: + print(e) + + config = { + 'laboratory': 'lab', + 'jobname': 'testrun', + 'model': 'staged_cable', + 'exe': 'cable', + 'experiment': ctrldir_basename, + 'metadata': { + 'enable': False + } + } + write_config(config) + + stage1dir = ctrldir / 'stage_1' + stage1dir.mkdir() + stage2dir = ctrldir / 'stage_2' + stage2dir.mkdir() + + # Build a stage config for testing + stage_config = { + 'stage_1': {'count': 1}, + 'stage_2': {'count': 1}, + 'multistep_stage_1': { + 'stage_3': {'count': 3}, + 'stage_4': {'count': 1}, + }, + 'multistep_stage_2': { + 'stage_5': {'count': 1}, + 'stage_6': {'count': 2}, + }, + 'stage_7': {'count': 1} + } + + write_config(stage_config, ctrldir / 'stage_config.yaml') + + # Prepare a master namelist and a stage 1 namelist + master_nml = { + 'cablenml': { + 'option1': 1, + 'struct1': { + 'option2': 2, + 'option3': 3, + }, + 'option4': 4 + } + } + + with open(ctrldir / 'cable.nml', 'w') as master_nml_f: + f90nml.write(master_nml, master_nml_f) + + patch_nml = { + 'cablenml': { + 'option1': 10, + 'struct1': { + 'option2': 20, + 'option5': 50 + }, + 'option6': 60 + } + } + + with open(ctrldir / 'stage_1/cable.nml', 'w') as patch_nml_f: + f90nml.write(patch_nml, patch_nml_f) + + +def teardown_module(module): + """ + Put any test-wide teardown code in here, e.g. removing test outputs + """ + if verbose: + print("teardown_module module:%s" % module.__name__) + + try: + shutil.rmtree(tmpdir) + except Exception as e: + print(e) + + +def test_staged_cable(): + """ + Test the preparing and archiving of a cable_stage. + """ + + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + expt = payu.experiment.Experiment(lab, reproduce=False) + model = expt.models[0] + + # Since we've called the initialiser, we should be able to inspect the + # stages immediately (through the configuration log) + expected_queued_stages = [ + 'stage_1', + 'stage_2', + 'stage_3', + 'stage_4', + 'stage_3', + 'stage_3', + 'stage_5', + 'stage_6', + 'stage_6', + 'stage_7'] + assert model.configuration_log['queued_stages'] == expected_queued_stages + + # Now prepare for a stage- should see changes in the configuration log + # and the patched namelist in the workdir + model.setup() + expected_current_stage = expected_queued_stages.pop(0) + assert model.configuration_log['current_stage'] == expected_current_stage + assert model.configuration_log['queued_stages'] == expected_queued_stages + + # Now check the namelist + expected_namelist = { + 'cablenml': { + 'option1': 10, + 'struct1': { + 'option2': 20, + 'option3': 3, + 'option5': 50 + }, + 'option4': 4, + 'option6': 60 + } + } + + with open(expt_workdir / 'cable.nml') as stage_nml_f: + stage_nml = f90nml.read(stage_nml_f) + + assert stage_nml == expected_namelist + + # Archive the stage and make sure the configuration log is correct + model.archive() + expected_comp_stages = [expected_current_stage] + expected_current_stage = '' + assert model.configuration_log['completed_stages'] == expected_comp_stages + assert model.configuration_log['current_stage'] == expected_current_stage