Skip to content

Commit

Permalink
add checks of io layout to cmeps driver (#496)
Browse files Browse the repository at this point in the history

This PR implements checks of the processors selected in the nuopc.runconfig file for the access-om3 payu driver. This ensures that the processors requested for normal model operations and parallel IO are within range of the CPUs set in the payu config.yaml and each model "realm" uses processors for IO that are within the range of processors for that model component.

It adds tests for the min/max bounds of each parameter.

Contributes to COSIMA/access-om3#109

---------

Co-authored-by: minghang.li <[email protected]>
Co-authored-by: Aidan Heerdegen <[email protected]>
  • Loading branch information
3 people committed Sep 24, 2024
1 parent 51e60c1 commit eb8e8f8
Show file tree
Hide file tree
Showing 3 changed files with 460 additions and 38 deletions.
125 changes: 109 additions & 16 deletions payu/models/cesm_cmeps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@

import os
import re
import errno
import glob
import shutil
import multiprocessing
from warnings import warn

from payu.fsops import mkdir_p, make_symlink
from payu.models.model import Model
from payu.models.fms import fms_collate
from payu.models.mom6 import mom6_add_parameter_files

NUOPC_CONFIG = "nuopc.runconfig"
NUOPC_RUNSEQ = "nuopc.runseq"

# Add as needed
component_info = {
"mom": {
Expand Down Expand Up @@ -75,8 +76,8 @@ def __init__(self, expt, name, config):
self.config_files = [
"drv_in",
"fd.yaml",
"nuopc.runconfig",
"nuopc.runseq"
NUOPC_CONFIG,
NUOPC_RUNSEQ
]

self.realms = ["ocn", "ice", "wav", "atm", "rof", "cpl"]
Expand All @@ -85,7 +86,7 @@ def __init__(self, expt, name, config):
self.rpointers = [] # To be inferred from nuopc.runconfig

def get_runconfig(self, path):
self.runconfig = Runconfig(os.path.join(path, 'nuopc.runconfig'))
self.runconfig = Runconfig(os.path.join(path, NUOPC_CONFIG))

def get_components(self):
"""Get components from nuopc.runconfig"""
Expand Down Expand Up @@ -156,15 +157,9 @@ def setup(self):

self.runconfig.set("ALLCOMP_attributes", "start_type", start_type)

# Check pelayout makes sense
all_realms = self.realms + ["glc", "lnd"]
cpucount = int(
self.expt.config.get('ncpus', multiprocessing.cpu_count())
)
for realm in all_realms:
ntasks = int(self.runconfig.get("PELAYOUT_attributes", f"{realm}_ntasks"))
assert cpucount >= ntasks, "Insufficient cpus for the pelayout in nuopc.runconfig"

# run checks on nuopc.runfig
self._setup_checks()

# Ensure that restarts will be written at the end of each run
stop_n = self.runconfig.get("CLOCK_attributes", "stop_n")
stop_option = self.runconfig.get("CLOCK_attributes", "stop_option")
Expand All @@ -174,7 +169,7 @@ def setup(self):
mkdir_p(os.path.join(self.work_path, 'log'))
mkdir_p(os.path.join(self.work_path, 'timing'))

self.runconfig.write(os.path.join(self.work_path, 'nuopc.runconfig'))
self.runconfig.write(os.path.join(self.work_path, NUOPC_CONFIG))

# Horrible hack to make a link to the mod_def.ww3 input in the work
# directory
Expand All @@ -190,6 +185,89 @@ def setup(self):
# TODO: copied this from other models. Surely we want to exit here or something
print('payu: error: Unable to find mod_def.ww3 file in input directory')

def _setup_checks(self):
# check pelayout fits within requested cpucount
cpucount = int(self.expt.config.get('ncpus'))
all_realms = self.realms
for realm in all_realms:
ntasks = int(self.runconfig.get("PELAYOUT_attributes", f"{realm}_ntasks"))
nthreads = int(self.runconfig.get("PELAYOUT_attributes", f"{realm}_nthreads"))
rootpe = int(self.runconfig.get("PELAYOUT_attributes", f"{realm}_rootpe"))
pestride = int(self.runconfig.get("PELAYOUT_attributes", f"{realm}_pestride"))

if nthreads < 1:
raise ValueError(f"The number of {realm}_nthreads ({nthreads}) in "
f"{NUOPC_CONFIG} must be at least 1.")
if nthreads > 1:
npes = nthreads*ntasks*pestride
# this is taken from
# https://github.com/ESCOMP/CMEPS/blob/5b7d76978e2fdc661ec2de4ba9834b985decadc6/cesm/driver/esm.F90#L1007
# the correct calculation might be (ntasks-1)*pestride*nthreads + nthreads
else:
npes = (ntasks-1)*pestride + 1

if (rootpe + npes) > cpucount:
raise ValueError(
f"Insufficient cpus for the {realm} pelayout in {NUOPC_CONFIG}"
)

# check iolayout
if realm == "cpl" or realm == "med":
comp = "MED" # med and cpl names are both used in runconfig
else:
comp = realm.upper()

if comp in self.runconfig.get_component_list():
io_section = f"{comp}_modelio"
nc_type = self.runconfig.get(io_section, "pio_typename")
ioroot = int(self.runconfig.get(io_section, "pio_root"))

if ioroot >= npes:
raise ValueError(
f"{io_section} pio_root exceeds available PEs (max: {npes - 1}) "
f"in {NUOPC_CONFIG}."
)

pio_async = self.runconfig.get(io_section, "pio_async_interface")
if pio_async == ".true.":
warn(
"Payu does not do consistency checks for asynchronous pio, as "
f"set in {io_section} of {NUOPC_CONFIG}. Consider adding them"
)
elif pio_async == ".false.":
match nc_type:
case "netcdf":
break
case "netcdf4p" | "pnetcdf":
niotasks = int(self.runconfig.get(io_section, "pio_numiotasks"))
iostride = int(self.runconfig.get(io_section, "pio_stride"))
if (niotasks <= 0):
warn(f"The pio_numiotasks for {io_section} in {NUOPC_CONFIG} is "
"not set, using model default")
if (iostride <= 0):
warn(f"The pio_stride for {io_section} in {NUOPC_CONFIG} is "
"not set, using model default")
if (all([
niotasks > 0,
iostride > 0,
(ioroot + (niotasks-1)*iostride) >= npes
])):
raise ValueError(
f"The iolayout for {io_section} in {NUOPC_CONFIG} is "
"requesting out of range cpus"
)
case "netcdf4c":
raise ValueError(
f"netcdf4c in {io_section} of {NUOPC_CONFIG} is deprecated, "
"use netcdf4p"
)
case _:
raise ValueError(
f"The iotype for {io_section} in {NUOPC_CONFIG} is "
'invalid, valid options: "netcdf", "pnetcdf", "netcdf4p"'
)
return True

def archive(self):
super().archive()

Expand Down Expand Up @@ -313,6 +391,21 @@ def get(self, section, variable, value=None):
else:
return value

def get_component_list(self):
"""
Get the `component_list`
"""
m = re.search(
r'component_list:\s*(.*)',
self.contents
)

if m is not None:
components_str = m.group(1).strip()
return components_str.split()
else:
return None

def set(self, section, variable, new_value):
"""
Overwrite the value of any existing variable
Expand Down
Loading

0 comments on commit eb8e8f8

Please sign in to comment.