Skip to content

Commit

Permalink
Merge pull request payu-org#360 from jo-basevi/200-restoring-remote-a…
Browse files Browse the repository at this point in the history
…rchive

Add `payu sync` cmd for syncing archive to a remote directory
  • Loading branch information
jo-basevi authored Nov 2, 2023
2 parents a336968 + 9ac075a commit bb84f66
Show file tree
Hide file tree
Showing 13 changed files with 872 additions and 89 deletions.
4 changes: 4 additions & 0 deletions bin/payu-sync
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env python

from payu.subcommands import sync_cmd
sync_cmd.runscript()
1 change: 1 addition & 0 deletions conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ build:
- payu-run = payu.subcommands.run_cmd:runscript
- payu-collate = payu.subcommands.collate_cmd:runscript
- payu-profile = payu.subcommands.profile_cmd:runscript
- payu-sync = payu.subcommands.sync_cmd:runscript

source:
git_url: ../
Expand Down
79 changes: 78 additions & 1 deletion docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ configuration.
able to parse restarts files for a datetime.

``restart_history``
Specifies how many of the most recent restart files to retain regardless of `restart_freq`
Specifies how many of the most recent restart files to retain regardless of
``restart_freq``.

*The following model-based tags are typically not configured*

Expand Down Expand Up @@ -382,12 +383,88 @@ Postprocessing
``error``
User-defined command to be called if model does not run correctly and
returns an error code. Useful for automatic error postmortem.

``sync``
User-defined command to be called at the start of the ``sync`` pbs job.
This is useful for any post-processing before syncing files to a remote
archive.

``postscript``
This is an older, less user-friendly, method to submit a script after ``payu
collate`` has completed. Unlike the ``userscripts``, it does not support
user commands. These scripts are always re-submitted via ``qsub``.

``sync``
Sync archive to a remote directory using rsync. Make sure that the
configured path to sync output to, i.e. ``path``, is the correct location
before enabling automatic syncing or before running ``payu sync``.

If postscript is also configured, the latest output and restart files will
not be automatically synced after a run.

``enable`` (*Default:* ``False``):
Controls whether or not a sync job is submitted either after the archive or
collation job, if collation is enabled.

``queue`` (*Default:* ``copyq``)
PBS queue used to submit the sync job.

``walltime`` (*Default:* ``10:00:00``)
Time required to run the job.

``mem`` (*Default:* ``2GB``)
Memory required for the job.

``ncpus`` (*Default:* ``1``)
Number of ncpus required for the job.

``path``
Destination path to sync archive outputs to. This must be a unique
absolute path for your experiment, otherwise, outputs will be
overwritten.

``restarts`` (*Default:* ``False``)
Sync permanently archived restarts, which are determined by
``restart_freq``.

``rsync_flags`` (*Default:* ``-vrltoD --safe-links``)
Additional flags to add to rsync commands used for syncing files.

``exclude``
Patterns to exclude from rsync commands. This is equivalent to rsync's
``--exclude PATTERN``. This can be a single pattern or a list of
patterns. If a pattern includes any special characters,
e.g. ``.*+?|[]{}()``, it will need to be quoted. For example::
exclude:
- 'iceh.????-??-??.nc'
- '*-IN-PROGRESS'

``exclude_uncollated`` (*Default:* ``True`` if collation is enabled)
Flag to exclude uncollated files from being synced. This is equivalent
to adding ``--exclude *.nc.*``.

``extra_paths``
List of ``glob`` patterns which match extra paths to sync to remote
archive. This can be a single pattern or a list of patterns.
Note that these paths will be protected against any local delete options.

``remove_local_files`` (*Default:* ``False``)
Remove local files once they are successfully synced to the remote
archive. Files in protected paths will not be deleted. Protected paths
include the ``extra_paths`` (if defined), last output, the last saved
restart (determined by ``restart_freq``), and any subsequent restarts.

``remove_local_dirs`` (*Default:* ``False``)
Remove local directories once a directory has been successfully synced.
This will delete any files in local directories that were excluded from
syncing. Similarly to ``remove_local_files``, protected paths will not be
deleted.

``runlog`` (*Default:* ``True``)
Create or update a bare git repository clone of the run history, called
``git-runlog``, in the remote archive directory.


Miscellaneous
=============
Expand Down
13 changes: 13 additions & 0 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,16 @@ Alternatively you can directly specify a directory name::
This is useful when the data files have been moved out of the payu
directory structure, or if you need to collate restart files, which is
necessary when changing processor layout.

To manually sync experiment output files to a remote archive, firstly ensure
that ``path`` in the ``sync`` namespace in ``config.yaml``,
is correctly configured as it may overwrite any pre-exisiting outputs.
Then run::

payu sync

By default ``payu sync`` will not sync the latest restarts that may be pruned
at a later date. To sync all restarts including the latest restarts, use the
``--sync-restarts`` flag::

payu sync --sync-restarts
9 changes: 8 additions & 1 deletion payu/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def get_model_type(model_type, config):


def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None,
reproduce=False, force=False, force_prune_restarts=False):
reproduce=False, force=False, force_prune_restarts=False,
sync_restarts=False, sync_ignore_last=False):
"""Construct the environment variables used by payu for resubmissions."""
payu_env_vars = {}

Expand Down Expand Up @@ -134,6 +135,12 @@ def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None,
if force:
payu_env_vars['PAYU_FORCE'] = force

if sync_restarts:
payu_env_vars['PAYU_SYNC_RESTARTS'] = sync_restarts

if sync_ignore_last:
payu_env_vars['PAYU_SYNC_IGNORE_LAST'] = sync_ignore_last

if force_prune_restarts:
payu_env_vars['PAYU_FORCE_PRUNE_RESTARTS'] = force_prune_restarts

Expand Down
129 changes: 44 additions & 85 deletions payu/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# Standard Library
import datetime
import errno
import getpass
import os
import re
import resource
Expand All @@ -33,13 +32,13 @@
from payu.runlog import Runlog
from payu.manifest import Manifest
from payu.calendar import parse_date_offset
from payu.sync import SyncToRemoteArchive

# Environment module support on vayu
# TODO: To be removed
core_modules = ['python', 'payu']

# Default payu parameters
default_archive_url = 'dc.nci.org.au'
default_restart_freq = 5


Expand Down Expand Up @@ -200,13 +199,16 @@ def max_output_index(self, output_type="output"):
if output_dirs and len(output_dirs):
return int(output_dirs[-1].lstrip(output_type))

def list_output_dirs(self, output_type="output"):
def list_output_dirs(self, output_type="output", full_path=False):
"""Return a sorted list of restart or output directories in archive"""
naming_pattern = re.compile(fr"^{output_type}[0-9][0-9][0-9]+$")
dirs = [d for d in os.listdir(self.archive_path)
if naming_pattern.match(d)]
dirs.sort(key=lambda d: int(d.lstrip(output_type)))
return dirs

if full_path:
dirs = [os.path.join(self.archive_path, d) for d in dirs]
return dirs

def set_stacksize(self, stacksize):

Expand Down Expand Up @@ -794,8 +796,8 @@ def archive(self, force_prune_restarts=False):
if archive_script:
self.run_userscript(archive_script)

# Ensure postprocess runs if model not collating
if not collating and self.postscript:
# Ensure postprocessing runs if model not collating
if not collating:
self.postprocess()

def collate(self):
Expand All @@ -807,87 +809,45 @@ def profile(self):
model.profile()

def postprocess(self):
"""Submit a postprocessing script after collation"""
assert self.postscript
envmod.setup()
envmod.module('load', 'pbs')

cmd = 'qsub {script}'.format(script=self.postscript)

cmd = shlex.split(cmd)
rc = sp.call(cmd)
assert rc == 0, 'Postprocessing script submission failed.'

def remote_archive(self, config_name, archive_url=None,
max_rsync_attempts=1, rsync_protocol=None):

if not archive_url:
archive_url = default_archive_url

archive_address = '{usr}@{url}'.format(usr=getpass.getuser(),
url=archive_url)

ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh',
'id_rsa_file_transfer')

# Top-level path is implicitly set by the SSH key
# (Usually /projects/[group])

# Remote mkdir is currently not possible, so any new subdirectories
# must be created before auto-archival

remote_path = os.path.join(self.model_name, config_name, self.name)
remote_url = '{addr}:{path}'.format(addr=archive_address,
path=remote_path)
"""Submit any postprocessing scripts or remote syncing if enabled"""
# First submit postprocessing script
if self.postscript:
envmod.setup()
envmod.module('load', 'pbs')

# Rsync ouput and restart files
rsync_cmd = ('rsync -a --safe-links -e "ssh -i {key}" '
''.format(key=ssh_key_path))
cmd = 'qsub {script}'.format(script=self.postscript)

if rsync_protocol:
rsync_cmd += '--protocol={0} '.format(rsync_protocol)
cmd = shlex.split(cmd)
sp.check_call(cmd)

run_cmd = rsync_cmd + '{src} {dst}'.format(src=self.output_path,
dst=remote_url)
rsync_calls = [run_cmd]
# Submit a sync script if remote syncing is enabled
sync_config = self.config.get('sync', {})
syncing = sync_config.get('enable', False)
if syncing:
cmd = '{python} {payu} sync'.format(
python=sys.executable,
payu=self.payu_path
)

if (self.counter % 5) == 0 and os.path.isdir(self.restart_path):
# Tar restart files before rsyncing
restart_tar_path = self.restart_path + '.tar.gz'
if self.postscript:
print('payu: warning: postscript is configured, so by default '
'the lastest outputs will not be synced. To sync the '
'latest output, after the postscript job has completed '
'run:\n'
' payu sync')
cmd += f' --sync-ignore-last'

cmd = ('tar -C {0} -czf {1} {2}'
''.format(self.archive_path, restart_tar_path,
os.path.basename(self.restart_path)))
sp.check_call(shlex.split(cmd))

restart_cmd = ('{0} {1} {2}'
''.format(rsync_cmd, restart_tar_path, remote_url))
rsync_calls.append(restart_cmd)
else:
res_tar_path = None

for model in self.models:
for input_path in self.model.input_paths:
# Using explicit path separators to rename the input directory
input_cmd = rsync_cmd + '{0} {1}'.format(
input_path + os.path.sep,
os.path.join(remote_url, 'input') + os.path.sep)
rsync_calls.append(input_cmd)

for cmd in rsync_calls:
cmd = shlex.split(cmd)

for rsync_attempt in range(max_rsync_attempts):
rc = sp.Popen(cmd).wait()
if rc == 0:
break
else:
print('rsync failed, reattempting')
assert rc == 0
def sync(self):
# RUN any user scripts before syncing archive
envmod.setup()
pre_sync_script = self.userscripts.get('sync')
if pre_sync_script:
self.run_userscript(pre_sync_script)

# TODO: Temporary; this should be integrated with the rsync call
if res_tar_path and os.path.exists(res_tar_path):
os.remove(res_tar_path)
# Run rsync commmands
SyncToRemoteArchive(self).run()

def resubmit(self):
next_run = self.counter + 1
Expand Down Expand Up @@ -943,14 +903,13 @@ def sweep(self, hard_sweep=False):
default_job_name = os.path.basename(os.getcwd())
short_job_name = str(self.config.get('jobname', default_job_name))[:15]

log_filenames = [short_job_name + '.o', short_job_name + '.e']
for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']:
log_filenames.append(short_job_name[:13] + postfix)

logs = [
f for f in os.listdir(os.curdir) if os.path.isfile(f) and (
f.startswith(short_job_name + '.o') or
f.startswith(short_job_name + '.e') or
f.startswith(short_job_name[:13] + '_c.o') or
f.startswith(short_job_name[:13] + '_c.e') or
f.startswith(short_job_name[:13] + '_p.o') or
f.startswith(short_job_name[:13] + '_p.e')
f.startswith(tuple(log_filenames))
)
]

Expand Down
4 changes: 4 additions & 0 deletions payu/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
short_path = pbs_config.get('shortpath', None)
if short_path is not None:
extra_search_paths.append(short_path)

module_use_paths = pbs_config.get('modules', {}).get('use', [])
extra_search_paths.extend(module_use_paths)

remote_sync_directory = pbs_config.get('sync', {}).get('path', None)
if remote_sync_directory is not None:
extra_search_paths.append(remote_sync_directory)
storages.update(find_mounts(extra_search_paths, mounts))
storages.update(find_mounts(get_manifest_paths(), mounts))

Expand Down
23 changes: 23 additions & 0 deletions payu/subcommands/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,26 @@
archive, ignoring changes made to configuration.',
}
}

# Flag for syncing all restarts
sync_restarts = {
'flags': {'--sync-restarts'},
'parameters': {
'action': 'store_true',
'dest': 'sync_restarts',
'default': False,
'help': 'Sync all restarts in archive to remote directory.',
}
}

# Flag for ignoring the latest outputs during syncing
sync_ignore_last = {
'flags': {'--sync-ignore-last'},
'parameters': {
'action': 'store_true',
'dest': 'sync_ignore_last',
'default': False,
'help': 'Ignore the latest outputs and restarts in archive during \
syncing.',
}
}
3 changes: 1 addition & 2 deletions payu/subcommands/collate_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,4 @@ def runscript():
run_args.lab_path)
expt = Experiment(lab)
expt.collate()
if expt.postscript:
expt.postprocess()
expt.postprocess()
Loading

0 comments on commit bb84f66

Please sign in to comment.