Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add staged CABLE driver for CABLE configurations #461

Merged
merged 20 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 103 additions & 47 deletions payu/models/staged_cable.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""payu.models.cable
"""payu.models.staged_cable
================

Driver interface to CABLE-POP_TRENDY branch
Expand All @@ -10,22 +10,31 @@
# Standard Library
import os
import shutil
import itertools

# Extensions
import f90nml
import yaml
# import xarray

# Local
from payu.models.model import Model
from payu.fsops import mkdir_p


def deep_update(d_1, d_2):
"""Deep update of namelists."""
for key, value in d_2.items():
if isinstance(value, dict):
deep_update(d_1[key], d_2[key])
# Nested struct
if key in d_1:
# If the master namelist contains the key, then recursively
# apply
deep_update(d_1[key], d_2[key])
else:
# Otherwise just set the value from the patch dict
d_1[key] = value
else:
# Is value, just override
d_1[key] = value


Expand All @@ -41,6 +50,7 @@ def __init__(self, expt, name, config):
self.config_files = ['stage_config.yaml']
self.optional_config_files = ['cable.nml', 'cru.nml',
'luc.nml', 'met_names.nml']

self.configuration_log = {}

# Read the stage_config.yaml file
Expand All @@ -54,14 +64,10 @@ def __init__(self, expt, name, config):
# Read the current configuration log
self._read_configuration_log()

# Add the restart directories to inputs
num_completed_stages = len(self.configuration_log['completed_stages'])
for restart_dir in reversed(range(num_completed_stages)):
config['input'].append(os.path.realpath(
f'archive/output{restart_dir:03d}/restart'))

# Now set the number of runs using the configuration_log
remaining_stages = len(self.configuration_log['queued_stages'])
print("Overriding the remaining number of runs according to the " +
"number of queued stages in the configuration log.")
os.environ['PAYU_N_RUNS'] = str(remaining_stages)

def _build_new_configuration_log(self):
Expand Down Expand Up @@ -101,41 +107,78 @@ def _prepare_configuration(self):
# looks like [S1, S2, S1, S2, S1, S2, S1, S1].
# So what we need to do is first record the number
# of times each step is run
step_names = []
step_counts = []
for step_name, step_opts in stage_opts.items():
step_names.append(step_name)
step_counts.append(step_opts['count'])

# Now iterate to the maximum number of steps
for step in range(max(step_counts)):
for step_id, _ in enumerate(step_names):
if step_counts[step_id] > step:
cable_stages.append(step_names[step_id])
# Finish handling of multistep stage

# Use recipe from https://stackoverflow.com/questions/48199961
# Turn the stages into lists of "count" length
steps = [[step_name] * stage_opts[step_name]['count']
for step_name in stage_opts.keys()]

cable_stages.extend(
[stage for stage in itertools.chain.from_iterable(
itertools.zip_longest(*steps)
) if stage is not None]
)
# Finish handling of multistep stage

else:
# A single step stage, in general we only want to run this
# once, but check for the count anyway
for _ in range(stage_opts['count']):
cable_stages.append(stage_name)
cable_stages.extend([stage_name] * stage_opts['count'])

# Finish handling of single step stage
# Finish handling of single step stage
return cable_stages

def setup(self):
super(StagedCable, self).setup()

# Directories required by CABLE for outputs
os.makedirs(os.path.join(self.work_output_path, 'logs'),
exist_ok=True)
os.makedirs(os.path.join(self.work_output_path, 'restart'),
exist_ok=True)
os.makedirs(os.path.join(self.work_output_path, 'outputs'),
exist_ok=True)
# Make the logging directory
mkdir_p(os.path.join(self.work_path, "logs"))

# Build the namelists for the stage
self._prepare_stage()

# Get the additional restarts from older restart dirs
self._get_further_restarts()

def _get_further_restarts(self):
"""Get the restarts from stages further in the past where necessary."""

# Often we take restarts from runs which are not the most recent run as
# inputs for particular science modules, which means we have to extend
# the existing functionality around retrieving restarts.

# We can't supersede the parent get_prior_restart_files, since the
# files returned by said function are prefixed with
# self.prior_restart_path, which is not desirable in this instance.

num_completed_stages = len(self.configuration_log['completed_stages'])

for stage_number in reversed(range(num_completed_stages - 1)):
respath = os.path.join(
self.expt.archive_path,
f'restart{stage_number:03d}'
)
for f_name in os.listdir(respath):
if os.path.isfile(os.path.join(respath, f_name)):
f_orig = os.path.join(respath, f_name)
f_link = os.path.join(self.work_init_path_local, f_name)
# Check whether a given link already exists in the
# manifest, so we don't write over a newer version of a
# restart
if f_link not in self.expt.manifest.manifests['restart']:
self.expt.manifest.add_filepath(
'restart',
f_link,
f_orig,
self.copy_restarts
)

def set_model_pathnames(self):
super(StagedCable, self).set_model_pathnames()

self.work_restart_path = os.path.join(self.work_path, 'restart')
self.work_output_path = os.path.join(self.work_path, 'outputs')

def _prepare_stage(self):
"""Apply the stage namelist to the master namelist."""

Expand All @@ -152,7 +195,7 @@ def _prepare_stage(self):
yaml.dump(self.configuration_log, conf_log_f)

Whyborn marked this conversation as resolved.
Show resolved Hide resolved
# Ensure the directory exists
if not os.path.isdir(stage_name):
if not os.path.isdir(os.path.join(self.control_path, stage_name)):
errmsg = f"""Directory containing namelists for stage {stage_name}
does not exist."""
raise FileNotFoundError(errmsg)
Expand All @@ -165,41 +208,54 @@ def _prepare_stage(self):

def _apply_stage_namelists(self, stage_name):
"""Apply the stage namelists to the master namelists."""
namelists = os.listdir(stage_name)
namelists = os.listdir(os.path.join(self.control_path, stage_name))

for namelist in namelists:
with open(namelist, 'r') as master_nml_f:
master_namelist = f90nml.read(master_nml_f)

with open(os.path.join(stage_name, namelist), 'r') as stage_nml_f:
stage_namelist = f90nml.read(stage_nml_f)

deep_update(master_namelist, stage_namelist)

# Write the namelist to the work directory
master_namelist.write(os.path.join(self.work_input_path, namelist),
force=True)
write_target = os.path.join(self.work_input_path, namelist)
stage_nml = os.path.join(self.control_path, stage_name, namelist)

if os.path.isfile(os.path.join(self.control_path, namelist)):
# Usually, there will be a master version of a namelist and the
# stage version, in which case we need to patch the stage
# namelist over the master namelist.
with open(stage_nml) as stage_nml_f:
Whyborn marked this conversation as resolved.
Show resolved Hide resolved
stage_namelist = f90nml.read(stage_nml_f)

master_nml = os.path.join(self.control_path, namelist)
f90nml.patch(master_nml, stage_namelist, write_target)
else:
# But it's also possible to just provide a stage namelist, in
# which case we just copy it into the work directory as is
shutil.copy(stage_nml, write_target)
Whyborn marked this conversation as resolved.
Show resolved Hide resolved

def archive(self):
"""Store model output to laboratory archive and update the
configuration log."""

# Move files from the restart directory within work to the archive
# restart directory.
for f in os.listdir(self.work_restart_path):
shutil.move(os.path.join(self.work_restart_path, f),
self.restart_path)
os.rmdir(self.work_restart_path)

# Update the configuration log and save it to the working directory
completed_stage = self.configuration_log['current_stage']
self.configuration_log['current_stage'] = ''
self.configuration_log['completed_stages'].append(completed_stage)

self.save_configuration_log()
self._save_configuration_log()

if int(os.environ['PAYU_N_RUNS']) == 1:
if len(self.configuration_log["queued_stages"]) == 0:
# Configuration successfully completed
os.remove('configuration_log.yaml')

super(StagedCable, self).archive()

def collate(self):
pass

def save_configuration_log(self):
def _save_configuration_log(self):
"""Write the updated configuration log back to the staging area."""
with open('configuration_log.yaml', 'w+') as config_log_f:
yaml.dump(self.configuration_log, config_log_f)
3 changes: 2 additions & 1 deletion test/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def payu_setup(model_type=None,

def write_config(config, path=config_path):
with path.open('w') as file:
file.write(yaml.dump(config, default_flow_style=False))
file.write(yaml.dump(config, default_flow_style=False,
sort_keys=False))


def make_exe():
Expand Down
Loading
Loading