Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Model log compression #532

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,21 @@ section for details.
POSIX filesystem.


Archiving
---------

``archiving``
On completion of a model run, payu moves model output, restart, and log
files from the temporary work area to the experiment archive directory.
The following settings control the steps taken during the archive step:
``enable`` (*Default:* ``True``)
Flag to enable/disable the archive step. If ``False`` all output, restart,
and log files will remain in the work directory, and any collation, post-processing,
and syncing will not be run.
``compress_logs`` (*Default:* ``True``)
Compress model log files into a tarball. Currently only implemented for CICE4.


Collation
---------

Expand Down
21 changes: 19 additions & 2 deletions payu/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ def setup(self, force_archive=False):

# Check restart pruning for valid configuration values and
# warns user if more restarts than expected would be pruned
if self.config.get('archive', True):
if self.archiving():
self.get_restarts_to_prune()

def run(self, *user_flags):
Expand Down Expand Up @@ -769,8 +769,25 @@ def run(self, *user_flags):
if run_script:
self.run_userscript(run_script)

def archiving(self):
"""
Determine whether to run archive step based on config.yaml settings.
Default to True when archive settings are absent.
"""
archive_config = self.config.get('archive', {})
if isinstance(archive_config, dict):
return archive_config.get('enable', True)

# Backwards compatibility for configs with boolean archive setting
elif isinstance(archive_config, bool):
return archive_config

else:
msg = "Incorrect format for archive settings in config.yaml"
raise RuntimeError(msg)

aidanheerdegen marked this conversation as resolved.
Show resolved Hide resolved
def archive(self, force_prune_restarts=False):
if not self.config.get('archive', True):
if not self.archiving():
print('payu: not archiving due to config.yaml setting.')
return

Expand Down
53 changes: 53 additions & 0 deletions payu/models/cice.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import sys
import shutil
import datetime
import re
import tarfile

# Extensions
import f90nml
Expand Down Expand Up @@ -51,6 +53,13 @@ def __init__(self, expt, name, config):

self.copy_inputs = False

# regex patterns for matching log files. When empty, no logs compressed
self.logs_to_compress = [r"iceout[0-9]{3}",
r"debug\.root\.[0-9]{2}",
r"ice_diag\.d",
r"ice_diag_out"]
self.log_tar_name = "logfiles.tar.gz"

def set_model_pathnames(self):
super(Cice, self).set_model_pathnames()

Expand Down Expand Up @@ -333,6 +342,50 @@ def archive(self, **kwargs):
else:
shutil.rmtree(self.work_input_path)

if self.compression_enabled():
self.compress_log_files()

def compression_enabled(self):
"""
Determine whether to run log compression based on config.yaml settings.
Default to True when 'compress_logs' setting is absent.
"""
archive_config = self.expt.config.get('archive', {})
if isinstance(archive_config, dict):
return archive_config.get('compress_logs', True)
else:
return True

aidanheerdegen marked this conversation as resolved.
Show resolved Hide resolved
def get_log_files(self):
"""
Find model log files in the work directory based on regex patterns
in self.logs_to_compress.

Returns
-------
log_files: list of paths to model log files.
"""
log_files = []
for filename in os.listdir(self.work_path):
if any((re.match(pattern, filename)
for pattern in self.logs_to_compress)):
log_files.append(os.path.join(self.work_path, filename))
return log_files

def compress_log_files(self):
"""
Compress model log files into tarball.
"""
log_files = self.get_log_files()
with tarfile.open(name=os.path.join(self.work_path, self.log_tar_name),
mode="w:gz") as tar:
for file in log_files:
tar.add(file, arcname=os.path.basename(file))

# Delete files after tarball is written
for file in log_files:
os.remove(file)

def collate(self):
pass

Expand Down
3 changes: 3 additions & 0 deletions payu/models/cice5.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __init__(self, expt, name, config):
self.copy_restarts = True
self.copy_inputs = True

# Empty list means no log files will be compressed
self.logs_to_compress = []

def set_local_timestep(self, t_step):
dt = self.ice_in['setup_nml']['dt']
npt = self.ice_in['setup_nml']['npt']
Expand Down
147 changes: 127 additions & 20 deletions test/models/test_cice.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import pytest
import f90nml
import tarfile
from pathlib import Path

import payu

Expand Down Expand Up @@ -124,41 +126,48 @@ def empty_workdir():
workdir.symlink_to(expt_workdir)

yield expt_workdir
shutil.rmtree(expt_workdir)
try:
shutil.rmtree(expt_workdir)
except FileNotFoundError:
pass
workdir.unlink()


@pytest.fixture
def cice_nml():
nml_path = os.path.join(ctrldir, CICE_NML_NAME)
f90nml.write(DEFAULT_CICE_NML, nml_path)

yield nml_path

# Cleanup
os.remove(nml_path)

aidanheerdegen marked this conversation as resolved.
Show resolved Hide resolved

# Important to test None case without separate ice history file
@pytest.fixture(params=[None,
{"icefields_nml": {"f_icy": "m"}},
{"icefields_nml": {"f_icy": "m", "f_new": "y"}}])
def cice_config_files(request):
def cice_history_nml(request):
"""
Write the default cice_in.nml namelist, and if included, separate ice
history namelist used by ESM1.5.
Write separate ice history namelist used by ESM1.5, if provided.
"""
cice_nml = DEFAULT_CICE_NML
ice_history = request.param
ice_history_path = os.path.join(ctrldir, HIST_NML_NAME)

with cd(ctrldir):
# 2. Create config.nml
f90nml.write(cice_nml, CICE_NML_NAME)

if ice_history:
f90nml.write(ice_history, HIST_NML_NAME)
if ice_history:
f90nml.write(ice_history, ice_history_path)

yield {'ice_history': ice_history}

# cleanup
with cd(ctrldir):
os.remove(CICE_NML_NAME)
if ice_history:
os.remove(HIST_NML_NAME)
if ice_history:
os.remove(ice_history_path)


@pytest.mark.parametrize("config", [DEFAULT_CONFIG],
indirect=True)
def test_setup(config, cice_config_files):
def test_setup(config, cice_nml, cice_history_nml):
"""
Confirm that
1: payu overwrites cice_in with ice_history
Expand All @@ -183,9 +192,9 @@ def test_setup(config, cice_config_files):
# Check cice_in was patched with ice_history
work_input_fpath = os.path.join(model.work_path, CICE_NML_NAME)
input_nml = f90nml.read(work_input_fpath)
if cice_config_files['ice_history']:
if cice_history_nml['ice_history']:
assert (input_nml["icefields_nml"] ==
cice_config_files["ice_history"]["icefields_nml"])
cice_history_nml["ice_history"]["icefields_nml"])
else:
assert input_nml["icefields_nml"] == DEFAULT_CICE_NML["icefields_nml"]

Expand Down Expand Up @@ -238,7 +247,7 @@ def prior_restart_cice4(run_timing_params):

@pytest.mark.parametrize("config", [CONFIG_WITH_RESTART],
indirect=True)
def test_restart_setup(config, cice_config_files, prior_restart_cice4,
def test_restart_setup(config, cice_nml, cice_history_nml, prior_restart_cice4,
run_timing_params):
"""
Test that seting up an experiment from a cloned control directory
Expand Down Expand Up @@ -280,7 +289,7 @@ def test_restart_setup(config, cice_config_files, prior_restart_cice4,

@pytest.mark.parametrize("config", [DEFAULT_CONFIG],
indirect=True)
def test_no_restart_ptr(config, cice_config_files):
def test_no_restart_ptr(config, cice_nml, cice_history_nml):
"""
Test that payu raises an error if no prior restart path is specified,
restart is `true` in cice_in.nml, and the restart pointer is missing.
Expand All @@ -300,3 +309,101 @@ def test_no_restart_ptr(config, cice_config_files):
with pytest.raises(RuntimeError,
match="Cannot find previous restart file"):
model.setup()


CONFIG_WITH_COMPRESSION = {
"laboratory": "lab",
"jobname": "testrun",
"model": "cice",
"exe": "test.exe",
"experiment": ctrldir_basename,
"metadata": {"enable": False},
"compress_logs": True
}


@pytest.fixture
def cice4_log_files():
"""
Create cice log files based on ESM1.5 logs.
"""
non_pe_logs = {
"ice_diag_out": "block id, proc, local_block:",
"ice_diag.d": "istep0 = ******",
"debug.root.03": "oasis_io_read_avfile:av2_isst_ia:NetCDF:"
}
pe_logs = {
f'iceout{x:03d}': "Fake iceout file {x}"
for x in range(85, 96)
}

log_files = non_pe_logs | pe_logs

log_paths = []
for log_name, log_contents in log_files.items():
log_path = Path(expt_workdir/log_name)
with open(log_path, "w") as log:
log.write(log_contents)
log_paths.append(log_path)

yield log_files

# Cleanup
for log_file in log_paths:
try:
log_file.unlink()
except FileNotFoundError:
pass


@pytest.fixture
def non_log_file():
"""
Create a cice4 output file to be ignored by log compression.
Use cice_in.nml which is copied to the work directory in ESM1.5.
"""
non_log_path = Path(expt_workdir)/CICE_NML_NAME
non_log_path.touch()

yield non_log_path

# Cleanup
non_log_path.unlink()


@pytest.mark.parametrize("config", [CONFIG_WITH_COMPRESSION],
indirect=True)
def test_log_compression(config, cice4_log_files, non_log_file,
cice_nml # Required by expt.__init__
):
"""
Test that logfiles produced by cice during ESM1.5 simulations are
properly compressed into a tarball by cice.compress_log_files().
"""
with cd(ctrldir):
# Initialise laboratory and experiment
lab = payu.laboratory.Laboratory(lab_path=str(labdir))
expt = payu.experiment.Experiment(lab, reproduce=False)
model = expt.models[0]

# Function to test
model.compress_log_files()
aidanheerdegen marked this conversation as resolved.
Show resolved Hide resolved

# Check that log tarball created and no original logs remain
assert set(os.listdir(expt_workdir)) == {model.log_tar_name,
non_log_file.name}

# Check all logs present in tarball
log_file_names = {log_name for
log_name in cice4_log_files}

with tarfile.open(os.path.join(expt_workdir, model.log_tar_name),
mode="r") as tar:
assert set(tar.getnames()) == log_file_names
aidanheerdegen marked this conversation as resolved.
Show resolved Hide resolved

# Check contents of compressed files
for entry in tar:
entry_name = entry.name
with tar.extractfile(entry) as open_entry:
file_contents = open_entry.read().decode("utf-8")
assert file_contents == cice4_log_files[entry_name]
Loading