From 0575418d125f6f340203f40251e283bf4e3bbd5f Mon Sep 17 00:00:00 2001 From: Jo Basevi Date: Thu, 24 Aug 2023 14:02:14 +1000 Subject: [PATCH 1/3] Add `payu sync` cmd for syncing archive to a remote directory - Extended postprocess() to run `payu sync`, if syncing is enabled - Automatically check sync path for storage paths --- bin/payu-sync | 4 + conda/meta.yaml | 1 + payu/cli.py | 9 +- payu/experiment.py | 152 ++++++++++++++++++++++++++++---- payu/schedulers/pbs.py | 6 ++ payu/subcommands/args.py | 24 +++++ payu/subcommands/collate_cmd.py | 3 +- payu/subcommands/sync_cmd.py | 109 +++++++++++++++++++++++ setup.py | 1 + 9 files changed, 290 insertions(+), 19 deletions(-) create mode 100644 bin/payu-sync create mode 100644 payu/subcommands/sync_cmd.py diff --git a/bin/payu-sync b/bin/payu-sync new file mode 100644 index 00000000..644453d2 --- /dev/null +++ b/bin/payu-sync @@ -0,0 +1,4 @@ +#!/usr/bin/env python + +from payu.subcommands import sync_cmd +sync_cmd.runscript() \ No newline at end of file diff --git a/conda/meta.yaml b/conda/meta.yaml index 7b88f4b1..6f293879 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -11,6 +11,7 @@ build: - payu-run = payu.subcommands.run_cmd:runscript - payu-collate = payu.subcommands.collate_cmd:runscript - payu-profile = payu.subcommands.profile_cmd:runscript + - payu-sync = payu.subcommands.sync_cmd:runscript source: git_url: ../ diff --git a/payu/cli.py b/payu/cli.py index a2876a14..05e0237b 100644 --- a/payu/cli.py +++ b/payu/cli.py @@ -89,7 +89,8 @@ def get_model_type(model_type, config): def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, - reproduce=False, force=False, force_prune_restarts=False): + reproduce=False, force=False, force_prune_restarts=False, + sync_path=None, sync_restarts=False): """Construct the environment variables used by payu for resubmissions.""" payu_env_vars = {} @@ -133,6 +134,12 @@ def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, if force: payu_env_vars['PAYU_FORCE'] = force + + if sync_path: + payu_env_vars['PAYU_SYNC_PATH'] = os.path.normpath(sync_path) + + if sync_restarts: + payu_env_vars['PAYU_SYNC_RESTARTS'] = sync_restarts if force_prune_restarts: payu_env_vars['PAYU_FORCE_PRUNE_RESTARTS'] = force_prune_restarts diff --git a/payu/experiment.py b/payu/experiment.py index 15f5f8cb..8159a900 100644 --- a/payu/experiment.py +++ b/payu/experiment.py @@ -794,8 +794,8 @@ def archive(self, force_prune_restarts=False): if archive_script: self.run_userscript(archive_script) - # Ensure postprocess runs if model not collating - if not collating and self.postscript: + # Ensure postprocessing runs if model not collating + if not collating: self.postprocess() def collate(self): @@ -807,16 +807,137 @@ def profile(self): model.profile() def postprocess(self): - """Submit a postprocessing script after collation""" - assert self.postscript - envmod.setup() - envmod.module('load', 'pbs') + """Submit any postprocessing scripts or remote syncing if enabled""" + if self.postscript: + envmod.setup() + envmod.module('load', 'pbs') - cmd = 'qsub {script}'.format(script=self.postscript) + cmd = 'qsub {script}'.format(script=self.postscript) + + cmd = shlex.split(cmd) + rc = sp.call(cmd) + assert rc == 0, 'Postprocessing script submission failed.' 
+ + sync_config = self.config.get('sync', {}) + syncing = sync_config.get('enable', False) + if syncing: + cmd = '{python} {payu} sync -i {expt}'.format( + python=sys.executable, + payu=self.payu_path, + expt=self.counter + ) + + # Add any hard-coded envt variables + dir_path = os.environ.get('PAYU_DIR_PATH') + if dir_path: + cmd += f' -d {dir_path}' + lab_path = os.environ.get('PAYU_LAB_PATH') + if lab_path: + cmd += f' -l {lab_path}' + sync_path = os.environ.get('PAYU_SYNC') + if sync_path: + cmd += f' --syncdir {sync_path}' + sync_restarts = os.environ.get('PAYU_SYNC_RESTARTS') + if sync_restarts: + cmd += f' --restarts' + + sp.check_call(shlex.split(cmd)) + + + def sync(self, max_rsync_attempts=1): + """Sync archive to remote directory""" + #TODO: Could automate directory to sync outputs to e.g. /g/data/{project}/{user}/{experimentname}/archive/? + + sync_config = self.config.get('sync', {}) + + # Directory to sync output to + sync_path = os.environ.get('PAYU_SYNC_PATH', None) + if not sync_path: + sync_path = sync_config.get('directory', None) + + assert sync_path, "no directory to sync outputs configured" + print("remote archive sync path: ", sync_path) + + # Syncing restarts flag + restarts_config = sync_config.get('restarts', {}) + sync_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) + if not sync_restarts: + sync_restarts = restarts_config.get('enable', False) + + # Additional Rsync flags + rsync_flags = sync_config.get('rsync_flags', '') + exclude = sync_config.get('exclude', '') + + # RUN user scripts before syncing archive - any postprocessing + pre_sync_script = self.userscripts.get('sync') + if pre_sync_script: + self.run_userscript(pre_sync_script) + + restart_freq = self.config.get('restart_freq', default_restart_freq) + def syncing_restart(restart_path, counter): + """Return True if syncing restarts is enabled and restart can be archived permanently""" + # TODO: Also use date based restart frequency if implemented + return sync_restarts and (counter % restart_freq) == 0 and os.path.isdir(restart_path) + + # Add output to rsync + src_paths = [] + syncing_output = True + + # Check whether output path has been hard-coded + if os.environ.get('PAYU_DIR_PATH'): + # Check whether output path is actually a restart dir + # e.g. 
when running `payu collate -d archive/restart` + basename = os.path.basename(self.output_path) + if basename.startswith('restart'): + counter = int(basename.lstrip('restart')) + syncing_output = syncing_restart(self.output_path, counter) + sync_restarts = False # Do not attempt syncing restarts later + + if syncing_output: + src_paths.append(self.output_path) + + # Add restart to rsync, if enabled + if sync_restarts: + collate_config = self.config.get('collate', {}) + collated = collate_config.get('enable', True) + restart_path = None + if collated: + collated_prior_restart = collate_config.get('restart', False) + if collated_prior_restart: + # Sync prior restarts as latest restarts aren't collated automatically + restart_path = self.prior_restart_path + else: + restart_path = self.restart_path + + if restart_path and syncing_restart(restart_path, self.counter - 1): + src_paths.append(self.prior_restart_path) + + # Add pbs and error logs to rsync + for log_type in ['error_logs', 'pbs_logs']: + log_path = os.path.join(self.archive_path, log_type) + if os.path.isdir(log_path): + src_paths.append(log_path) + + # Build rsync commands + rsync_cmd = f'rsync -vrltoD --safe-links {rsync_flags}' + rsync_calls = [] + for src in src_paths: + run_cmd = f'{rsync_cmd} {exclude} {src} {sync_path}' + rsync_calls.append(run_cmd) + + # Execute rsync commands + for cmd in rsync_calls: + print(cmd) + cmd = shlex.split(cmd) + + for rsync_attempt in range(max_rsync_attempts): + rc = sp.Popen(cmd).wait() + if rc == 0: + break + else: + print('rsync failed, reattempting') + assert rc == 0 - cmd = shlex.split(cmd) - rc = sp.call(cmd) - assert rc == 0, 'Postprocessing script submission failed.' def remote_archive(self, config_name, archive_url=None, max_rsync_attempts=1, rsync_protocol=None): @@ -943,14 +1064,13 @@ def sweep(self, hard_sweep=False): default_job_name = os.path.basename(os.getcwd()) short_job_name = str(self.config.get('jobname', default_job_name))[:15] + log_filenames = [short_job_name + '.o', short_job_name + '.e'] + for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']: + log_filenames.append(short_job_name[:13] + postfix) + logs = [ f for f in os.listdir(os.curdir) if os.path.isfile(f) and ( - f.startswith(short_job_name + '.o') or - f.startswith(short_job_name + '.e') or - f.startswith(short_job_name[:13] + '_c.o') or - f.startswith(short_job_name[:13] + '_c.e') or - f.startswith(short_job_name[:13] + '_p.o') or - f.startswith(short_job_name[:13] + '_p.e') + f.startswith(tuple(log_filenames)) ) ] diff --git a/payu/schedulers/pbs.py b/payu/schedulers/pbs.py index cae19e42..6a881a9d 100644 --- a/payu/schedulers/pbs.py +++ b/payu/schedulers/pbs.py @@ -108,9 +108,15 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): short_path = pbs_config.get('shortpath', None) if short_path is not None: extra_search_paths.append(short_path) + module_use_paths = pbs_config.get('modules', {}).get('use', []) extra_search_paths.extend(module_use_paths) + remote_sync_directory = pbs_vars.get('PAYU_SYNC_PATH', None) + if remote_sync_directory is None: + remote_sync_directory = pbs_config.get('sync', {}).get('directory', None) + if remote_sync_directory is not None: + extra_search_paths.append(remote_sync_directory) storages.update(find_mounts(extra_search_paths, mounts)) storages.update(find_mounts(get_manifest_paths(), mounts)) diff --git a/payu/subcommands/args.py b/payu/subcommands/args.py index 167fbec6..b01b35cf 100644 --- a/payu/subcommands/args.py +++ b/payu/subcommands/args.py 
@@ -126,3 +126,27 @@ archive, ignoring changes made to configuration.', } } + +# Specify a remote directory to sync output to +sync_path = { + 'flags': {'--syncdir', '-s'}, + 'parameters': { + 'action': 'store', + 'dest': 'sync_path', + 'default': None, + 'help': 'The remote directory to sync output to, this will over-ride the \ + value given in config.yaml', + } +} + +# Flag for enabling syncing restarts +sync_restarts = { + 'flags': {'--restarts', '-R'}, + 'parameters': { + 'action': 'store_true', + 'dest': 'sync_restarts', + 'default': False, + 'help': 'Sync restarts to remote directory, this will over-ride the \ + value given in config.yaml', + } +} diff --git a/payu/subcommands/collate_cmd.py b/payu/subcommands/collate_cmd.py index 4f495aa2..f481b8e9 100644 --- a/payu/subcommands/collate_cmd.py +++ b/payu/subcommands/collate_cmd.py @@ -109,5 +109,4 @@ def runscript(): run_args.lab_path) expt = Experiment(lab) expt.collate() - if expt.postscript: - expt.postprocess() + expt.postprocess() diff --git a/payu/subcommands/sync_cmd.py b/payu/subcommands/sync_cmd.py new file mode 100644 index 00000000..59148ec1 --- /dev/null +++ b/payu/subcommands/sync_cmd.py @@ -0,0 +1,109 @@ +# coding: utf-8 + +# Standard Library +import argparse +import os + +# Local +from payu import cli +from payu.experiment import Experiment +from payu.laboratory import Laboratory +import payu.subcommands.args as args +from payu import fsops + +title = 'sync' +parameters = {'description': 'Sync model output to a remote directory'} + +arguments = [args.model, args.config, args.initial, args.laboratory, + args.dir_path, args.sync_path, args.sync_restarts] + +def runcmd(model_type, config_path, init_run, lab_path, dir_path, sync_path, sync_restarts): + + pbs_config = fsops.read_config(config_path) + + #TODO: Setting script args as env variables vs appending them at the end of qsub call after payu-sync? + # Went with setting env variables as thats whats done elsewhere + # Though with PBSPro can pass arguments after script name and then could be able to pass arguments directly to expt.sync()? 
+ pbs_vars = cli.set_env_vars(init_run=init_run, + lab_path=lab_path, + dir_path=dir_path, + sync_path=sync_path, + sync_restarts=sync_restarts) + + sync_config = pbs_config.get('sync', {}) + + default_ncpus = 1 + default_queue = 'copyq' + default_mem = '2GB' + + pbs_config['queue'] = sync_config.get('queue', default_queue) + + pbs_config['ncpus'] = sync_config.get('ncpus', default_ncpus) + + pbs_config['mem'] = sync_config.get('mem', default_mem) + + sync_jobname = sync_config.get('jobname') + if not sync_jobname: + pbs_jobname = pbs_config.get('jobname') + if not pbs_jobname: + if dir_path and os.path.isdir(dir_path): + pbs_jobname = os.path.basename(dir_path) + else: + pbs_jobname = os.path.basename(os.getcwd()) + + sync_jobname = pbs_jobname[:13] + '_s' + + pbs_config['jobname'] = sync_jobname[:15] + + # Replace (or remove) walltime + walltime = sync_config.get('walltime') + if walltime: + pbs_config['walltime'] = walltime + else: + # Remove the model walltime if set + try: + pbs_config.pop('walltime') + except KeyError: + pass + + # Disable hyperthreading + qsub_flags = [] + iflags = iter(pbs_config.get('qsub_flags', '').split()) + for flag in iflags: + if flag == '-l': + try: + flag += ' ' + next(iflags) + except StopIteration: + break + + if 'hyperthread' not in flag: + qsub_flags.append(flag) + + pbs_config['qsub_flags'] = ' '.join(qsub_flags) + + cli.submit_job('payu-sync', pbs_config, pbs_vars) + + +def runscript(): + # Currently these run_args are only ever set running `payu-sync` with args directly rather than `payu sync` + parser = argparse.ArgumentParser() + for arg in arguments: + parser.add_argument(*arg['flags'], **arg['parameters']) + + run_args = parser.parse_args() + + pbs_vars = cli.set_env_vars(init_run=run_args.init_run, + lab_path=run_args.lab_path, + dir_path=run_args.dir_path, + sync_path=run_args.sync_path, + sync_restarts=run_args.sync_restarts) + + for var in pbs_vars: + os.environ[var] = str(pbs_vars[var]) + + lab = Laboratory(run_args.model_type, + run_args.config_path, + run_args.lab_path) + expt = Experiment(lab) + + expt.sync() diff --git a/setup.py b/setup.py index 5e870aa2..9b7b2dbc 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ 'payu-run = payu.subcommands.run_cmd:runscript', 'payu-collate = payu.subcommands.collate_cmd:runscript', 'payu-profile = payu.subcommands.profile_cmd:runscript', + 'payu-sync = payu.subcommands.sync_cmd:runscript', ] }, classifiers=[ From 1849c26037f0daba65ed16fcf0dc0fd10c9e135c Mon Sep 17 00:00:00 2001 From: Jo Basevi Date: Tue, 3 Oct 2023 08:48:58 +1100 Subject: [PATCH 2/3] Merge changes from date-based restart pruning - If syncing restarts is enabled, only sync restarts that will be permanently archived - Add config options for syncing to remote machine, - Add payu sync cmd options to sync all restarts - Add tests - Change logic to automatically sync all output (rather than just the lastest output) --- payu/cli.py | 8 +- payu/experiment.py | 295 +++++++++++++++-------------------- payu/subcommands/args.py | 27 ++-- payu/subcommands/sync_cmd.py | 34 ++-- test/test_sync.py | 214 +++++++++++++++++++++++++ 5 files changed, 370 insertions(+), 208 deletions(-) create mode 100644 test/test_sync.py diff --git a/payu/cli.py b/payu/cli.py index 05e0237b..433dc216 100644 --- a/payu/cli.py +++ b/payu/cli.py @@ -90,7 +90,7 @@ def get_model_type(model_type, config): def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, reproduce=False, force=False, force_prune_restarts=False, - sync_path=None, 
sync_restarts=False): + sync_restarts=False, sync_ignore_last=False): """Construct the environment variables used by payu for resubmissions.""" payu_env_vars = {} @@ -134,13 +134,13 @@ def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, if force: payu_env_vars['PAYU_FORCE'] = force - - if sync_path: - payu_env_vars['PAYU_SYNC_PATH'] = os.path.normpath(sync_path) if sync_restarts: payu_env_vars['PAYU_SYNC_RESTARTS'] = sync_restarts + if sync_ignore_last: + payu_env_vars['PAYU_SYNC_IGNORE_LAST'] = sync_ignore_last + if force_prune_restarts: payu_env_vars['PAYU_FORCE_PRUNE_RESTARTS'] = force_prune_restarts diff --git a/payu/experiment.py b/payu/experiment.py index 8159a900..df11ac42 100644 --- a/payu/experiment.py +++ b/payu/experiment.py @@ -39,7 +39,6 @@ core_modules = ['python', 'payu'] # Default payu parameters -default_archive_url = 'dc.nci.org.au' default_restart_freq = 5 @@ -200,13 +199,16 @@ def max_output_index(self, output_type="output"): if output_dirs and len(output_dirs): return int(output_dirs[-1].lstrip(output_type)) - def list_output_dirs(self, output_type="output"): + def list_output_dirs(self, output_type="output", full_path=False): """Return a sorted list of restart or output directories in archive""" naming_pattern = re.compile(fr"^{output_type}[0-9][0-9][0-9]+$") dirs = [d for d in os.listdir(self.archive_path) if naming_pattern.match(d)] dirs.sort(key=lambda d: int(d.lstrip(output_type))) - return dirs + + if full_path: + dirs = [os.path.join(self.archive_path, d) for d in dirs] + return dirs def set_stacksize(self, stacksize): @@ -808,6 +810,7 @@ def profile(self): def postprocess(self): """Submit any postprocessing scripts or remote syncing if enabled""" + # First submit postprocessing script if self.postscript: envmod.setup() envmod.module('load', 'pbs') @@ -815,200 +818,150 @@ def postprocess(self): cmd = 'qsub {script}'.format(script=self.postscript) cmd = shlex.split(cmd) - rc = sp.call(cmd) - assert rc == 0, 'Postprocessing script submission failed.' - + sp.check_call(cmd) + + # Submit a sync script if remote syncing is enabled sync_config = self.config.get('sync', {}) syncing = sync_config.get('enable', False) - if syncing: - cmd = '{python} {payu} sync -i {expt}'.format( + if syncing: + cmd = '{python} {payu} sync'.format( python=sys.executable, - payu=self.payu_path, - expt=self.counter + payu=self.payu_path ) - # Add any hard-coded envt variables - dir_path = os.environ.get('PAYU_DIR_PATH') - if dir_path: - cmd += f' -d {dir_path}' - lab_path = os.environ.get('PAYU_LAB_PATH') - if lab_path: - cmd += f' -l {lab_path}' - sync_path = os.environ.get('PAYU_SYNC') - if sync_path: - cmd += f' --syncdir {sync_path}' - sync_restarts = os.environ.get('PAYU_SYNC_RESTARTS') - if sync_restarts: - cmd += f' --restarts' + if self.postscript: + print('payu: warning: postscript is configured, so by default ' + 'the lastest outputs will not be synced. To sync the ' + 'latest output, after the postscript job has completed ' + 'run:\n' + ' payu sync') + cmd += f' --sync-ignore-last' sp.check_call(shlex.split(cmd)) - - def sync(self, max_rsync_attempts=1): - """Sync archive to remote directory""" - #TODO: Could automate directory to sync outputs to e.g. /g/data/{project}/{user}/{experimentname}/archive/? 
- + def get_archive_paths_to_sync(self): + """Returns a list of dirs/files in archive to sync to remote archive""" sync_config = self.config.get('sync', {}) - # Directory to sync output to - sync_path = os.environ.get('PAYU_SYNC_PATH', None) - if not sync_path: - sync_path = sync_config.get('directory', None) - - assert sync_path, "no directory to sync outputs configured" - print("remote archive sync path: ", sync_path) - - # Syncing restarts flag - restarts_config = sync_config.get('restarts', {}) - sync_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) - if not sync_restarts: - sync_restarts = restarts_config.get('enable', False) - - # Additional Rsync flags - rsync_flags = sync_config.get('rsync_flags', '') - exclude = sync_config.get('exclude', '') - - # RUN user scripts before syncing archive - any postprocessing - pre_sync_script = self.userscripts.get('sync') - if pre_sync_script: - self.run_userscript(pre_sync_script) - - restart_freq = self.config.get('restart_freq', default_restart_freq) - def syncing_restart(restart_path, counter): - """Return True if syncing restarts is enabled and restart can be archived permanently""" - # TODO: Also use date based restart frequency if implemented - return sync_restarts and (counter % restart_freq) == 0 and os.path.isdir(restart_path) - - # Add output to rsync - src_paths = [] - syncing_output = True - - # Check whether output path has been hard-coded - if os.environ.get('PAYU_DIR_PATH'): - # Check whether output path is actually a restart dir - # e.g. when running `payu collate -d archive/restart` - basename = os.path.basename(self.output_path) - if basename.startswith('restart'): - counter = int(basename.lstrip('restart')) - syncing_output = syncing_restart(self.output_path, counter) - sync_restarts = False # Do not attempt syncing restarts later - - if syncing_output: - src_paths.append(self.output_path) - - # Add restart to rsync, if enabled - if sync_restarts: - collate_config = self.config.get('collate', {}) - collated = collate_config.get('enable', True) - restart_path = None - if collated: - collated_prior_restart = collate_config.get('restart', False) - if collated_prior_restart: - # Sync prior restarts as latest restarts aren't collated automatically - restart_path = self.prior_restart_path - else: - restart_path = self.restart_path + # Get sorted lists of outputs and restarts in archive + outputs = self.list_output_dirs(output_type='output', full_path=True) + restarts = self.list_output_dirs(output_type='restart', full_path=True) + + # Ignore the latest output/restart if flagged + ignore_last_outputs = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) + if ignore_last_outputs: + if len(outputs) > 0: + outputs.pop() + if len(restarts) > 0: + restarts.pop() + + # Add outputs to rsync paths + src_paths = outputs + + # Get auto-sync restart flag + syncing_restarts = sync_config.get('restarts', False) + syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) + + # Add restarts to rsync paths + if syncing_all_restarts: + # Sync all restarts + src_paths.extend(restarts) + elif syncing_restarts: + # Only sync restarts that will be permanently archived + restarts_to_prune = self.get_restarts_to_prune( + ignore_intermediate_restarts=True) + for restart_path in restarts: + restart = os.path.basename(restart_path) + if restart not in restarts_to_prune: + src_paths.append(restart_path) - if restart_path and syncing_restart(restart_path, self.counter - 1): - src_paths.append(self.prior_restart_path) - # Add pbs and error logs to 
rsync for log_type in ['error_logs', 'pbs_logs']: log_path = os.path.join(self.archive_path, log_type) if os.path.isdir(log_path): src_paths.append(log_path) - # Build rsync commands - rsync_cmd = f'rsync -vrltoD --safe-links {rsync_flags}' - rsync_calls = [] - for src in src_paths: - run_cmd = f'{rsync_cmd} {exclude} {src} {sync_path}' - rsync_calls.append(run_cmd) - - # Execute rsync commands - for cmd in rsync_calls: - print(cmd) - cmd = shlex.split(cmd) - - for rsync_attempt in range(max_rsync_attempts): - rc = sp.Popen(cmd).wait() - if rc == 0: - break - else: - print('rsync failed, reattempting') - assert rc == 0 + return src_paths + def sync(self): + """Sync archive to remote directory""" + # RUN any user scripts before syncing archive + pre_sync_script = self.userscripts.get('sync') + if pre_sync_script: + self.run_userscript(pre_sync_script) - def remote_archive(self, config_name, archive_url=None, - max_rsync_attempts=1, rsync_protocol=None): + sync_config = self.config.get('sync', {}) - if not archive_url: - archive_url = default_archive_url + # Remote archive user + default_user = getpass.getuser() + remote_user = sync_config.get('user', default_user) - archive_address = '{usr}@{url}'.format(usr=getpass.getuser(), - url=archive_url) + # Remote archive url + remote_url = sync_config.get('url', None) + # Flag if syncing to remote machine + remote_syncing = remote_url is not None - ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh', - 'id_rsa_file_transfer') + # Remote path to sync output to + dest_path = sync_config.get('path', None) + if not remote_syncing: + if dest_path is None: + # Automate destination path to: + # /g/data/{project}/{user}/{experiment_name}/archive + project = self.config.get('project', os.environ['PROJECT']) + dest_path = os.path.join('/', 'g', 'data', project, + remote_user, self.name, 'archive') - # Top-level path is implicitly set by the SSH key - # (Usually /projects/[group]) + # Create destination directory + mkdir_p(dest_path) - # Remote mkdir is currently not possible, so any new subdirectories - # must be created before auto-archival + # Build rsync commands + rsync_cmd = f'rsync -vrltoD --safe-links' - remote_path = os.path.join(self.model_name, config_name, self.name) - remote_url = '{addr}:{path}'.format(addr=archive_address, - path=remote_path) + # Add any additional rsync flags, e.g. 
more exclusions + additional_rsync_flags = sync_config.get('rsync_flags', None) + if additional_rsync_flags: + rsync_cmd += f' {additional_rsync_flags}' - # Rsync ouput and restart files - rsync_cmd = ('rsync -a --safe-links -e "ssh -i {key}" ' - ''.format(key=ssh_key_path)) + # Add exclusion for uncollated files + ignore_uncollated_files = sync_config.get('ignore_uncollated', True) + if ignore_uncollated_files: + rsync_cmd += ' --exclude *.nc.*' + # Add rsync protocol, if defined + rsync_protocol = sync_config.get('rsync_protocol', None) if rsync_protocol: - rsync_cmd += '--protocol={0} '.format(rsync_protocol) - - run_cmd = rsync_cmd + '{src} {dst}'.format(src=self.output_path, - dst=remote_url) - rsync_calls = [run_cmd] - - if (self.counter % 5) == 0 and os.path.isdir(self.restart_path): - # Tar restart files before rsyncing - restart_tar_path = self.restart_path + '.tar.gz' - - cmd = ('tar -C {0} -czf {1} {2}' - ''.format(self.archive_path, restart_tar_path, - os.path.basename(self.restart_path))) - sp.check_call(shlex.split(cmd)) - - restart_cmd = ('{0} {1} {2}' - ''.format(rsync_cmd, restart_tar_path, remote_url)) - rsync_calls.append(restart_cmd) - else: - res_tar_path = None - - for model in self.models: - for input_path in self.model.input_paths: - # Using explicit path separators to rename the input directory - input_cmd = rsync_cmd + '{0} {1}'.format( - input_path + os.path.sep, - os.path.join(remote_url, 'input') + os.path.sep) - rsync_calls.append(input_cmd) - - for cmd in rsync_calls: - cmd = shlex.split(cmd) - - for rsync_attempt in range(max_rsync_attempts): - rc = sp.Popen(cmd).wait() - if rc == 0: - break - else: - print('rsync failed, reattempting') - assert rc == 0 - - # TODO: Temporary; this should be integrated with the rsync call - if res_tar_path and os.path.exists(res_tar_path): - os.remove(res_tar_path) + rsync_cmd += f' --protocol={rsync_protocol}' + + # Add remote host rsync options + if remote_syncing: + ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh', + 'id_rsa_file_transfer') + rsync_cmd += f' -e "ssh -i {ssh_key_path}"' + # TODO: Below comments from previous remote_archive- Need to verify + # Top-level path is implicitly set by the SSH key + # (Usually /projects/[group]) + + # Remote mkdir is currently not possible, so any new subdirectories + # must be created before auto-archival + # TODO: If so, need to add instructions to create archive to docs + if dest_path is None: + # TODO: What should be the default path for remote archive + os.path.join(self.model_name, self.name) + dest_path = f'{remote_user}@{remote_url}:{dest_path}' + + # Get archive source paths to sync + src_paths = self.get_archive_paths_to_sync() + + # Run rsync commands + for src_path in src_paths: + run_cmd = f'{rsync_cmd} {src_path} {dest_path}' + cmd = shlex.split(run_cmd) + + rc = sp.Popen(cmd).wait() + if rc != 0: + raise sp.CalledProcessError( + 'payu: Error syncing archive to remote directory: ', + f'rsync failed after with command: {cmd}') def resubmit(self): next_run = self.counter + 1 @@ -1065,9 +1018,9 @@ def sweep(self, hard_sweep=False): short_job_name = str(self.config.get('jobname', default_job_name))[:15] log_filenames = [short_job_name + '.o', short_job_name + '.e'] - for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']: + for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']: log_filenames.append(short_job_name[:13] + postfix) - + logs = [ f for f in os.listdir(os.curdir) if os.path.isfile(f) and ( f.startswith(tuple(log_filenames)) diff --git 
a/payu/subcommands/args.py b/payu/subcommands/args.py index b01b35cf..253701a9 100644 --- a/payu/subcommands/args.py +++ b/payu/subcommands/args.py @@ -127,26 +127,25 @@ } } -# Specify a remote directory to sync output to -sync_path = { - 'flags': {'--syncdir', '-s'}, +# Flag for syncing all restarts +sync_restarts = { + 'flags': {'--sync-restarts'}, 'parameters': { - 'action': 'store', - 'dest': 'sync_path', - 'default': None, - 'help': 'The remote directory to sync output to, this will over-ride the \ - value given in config.yaml', + 'action': 'store_true', + 'dest': 'sync_restarts', + 'default': False, + 'help': 'Sync all restarts in archive to remote directory.', } } -# Flag for enabling syncing restarts -sync_restarts = { - 'flags': {'--restarts', '-R'}, +# Flag for ignoring the latest outputs during syncing +sync_ignore_last = { + 'flags': {'--sync-ignore-last'}, 'parameters': { 'action': 'store_true', - 'dest': 'sync_restarts', + 'dest': 'sync_ignore_last', 'default': False, - 'help': 'Sync restarts to remote directory, this will over-ride the \ - value given in config.yaml', + 'help': 'Ignore the latest outputs and restarts in archive during \ + syncing.', } } diff --git a/payu/subcommands/sync_cmd.py b/payu/subcommands/sync_cmd.py index 59148ec1..9618717e 100644 --- a/payu/subcommands/sync_cmd.py +++ b/payu/subcommands/sync_cmd.py @@ -4,7 +4,7 @@ import argparse import os -# Local +# Local from payu import cli from payu.experiment import Experiment from payu.laboratory import Laboratory @@ -14,24 +14,22 @@ title = 'sync' parameters = {'description': 'Sync model output to a remote directory'} -arguments = [args.model, args.config, args.initial, args.laboratory, - args.dir_path, args.sync_path, args.sync_restarts] +arguments = [args.model, args.config, args.laboratory, args.dir_path, + args.sync_restarts, args.sync_ignore_last] -def runcmd(model_type, config_path, init_run, lab_path, dir_path, sync_path, sync_restarts): + +def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, + sync_ignore_last): pbs_config = fsops.read_config(config_path) - #TODO: Setting script args as env variables vs appending them at the end of qsub call after payu-sync? - # Went with setting env variables as thats whats done elsewhere - # Though with PBSPro can pass arguments after script name and then could be able to pass arguments directly to expt.sync()? 
- pbs_vars = cli.set_env_vars(init_run=init_run, - lab_path=lab_path, + pbs_vars = cli.set_env_vars(lab_path=lab_path, dir_path=dir_path, - sync_path=sync_path, - sync_restarts=sync_restarts) + sync_restarts=sync_restarts, + sync_ignore_last=sync_ignore_last) sync_config = pbs_config.get('sync', {}) - + default_ncpus = 1 default_queue = 'copyq' default_mem = '2GB' @@ -60,7 +58,7 @@ def runcmd(model_type, config_path, init_run, lab_path, dir_path, sync_path, syn if walltime: pbs_config['walltime'] = walltime else: - # Remove the model walltime if set + # Remove walltime if set try: pbs_config.pop('walltime') except KeyError: @@ -85,22 +83,20 @@ def runcmd(model_type, config_path, init_run, lab_path, dir_path, sync_path, syn def runscript(): - # Currently these run_args are only ever set running `payu-sync` with args directly rather than `payu sync` parser = argparse.ArgumentParser() for arg in arguments: parser.add_argument(*arg['flags'], **arg['parameters']) run_args = parser.parse_args() - pbs_vars = cli.set_env_vars(init_run=run_args.init_run, - lab_path=run_args.lab_path, + pbs_vars = cli.set_env_vars(lab_path=run_args.lab_path, dir_path=run_args.dir_path, - sync_path=run_args.sync_path, - sync_restarts=run_args.sync_restarts) + sync_restarts=run_args.sync_restarts, + sync_ignore_last=run_args.sync_ignore_last) for var in pbs_vars: os.environ[var] = str(pbs_vars[var]) - + lab = Laboratory(run_args.model_type, run_args.config_path, run_args.lab_path) diff --git a/test/test_sync.py b/test/test_sync.py new file mode 100644 index 00000000..320c7f08 --- /dev/null +++ b/test/test_sync.py @@ -0,0 +1,214 @@ +import os +import copy +import shutil + +import pytest + +import payu + +from test.common import cd +from test.common import tmpdir, ctrldir, labdir, expt_archive_dir +from test.common import config as config_orig +from test.common import write_config +from test.common import make_all_files +from test.common import make_expt_archive_dir + +verbose = True + +# Global config +config = copy.deepcopy(config_orig) + + +def setup_module(module): + """ + Put any test-wide setup code in here, e.g. creating test files + """ + if verbose: + print("setup_module module:%s" % module.__name__) + + # Should be taken care of by teardown, in case remnants lying around + try: + shutil.rmtree(tmpdir) + except FileNotFoundError: + pass + + try: + tmpdir.mkdir() + labdir.mkdir() + ctrldir.mkdir() + make_all_files() + except Exception as e: + print(e) + + # Create 5 restarts and outputs + for dir_type in ['restart', 'output']: + for i in range(5): + make_expt_archive_dir(type=dir_type, index=i) + + +def teardown_module(module): + """ + Put any test-wide teardown code in here, e.g. 
removing test outputs + """ + if verbose: + print("teardown_module module:%s" % module.__name__) + + try: + shutil.rmtree(tmpdir) + print('removing tmp') + except Exception as e: + print(e) + + +@pytest.mark.parametrize( + "additional_config, expected_dirs_to_sync", + [ + ( + {}, + ['output000', 'output001', 'output002', 'output003', 'output004'] + ), + ( + { + "sync": { + 'restarts': True + }, + "restart_freq": 5 + }, + ['output000', 'output001', 'output002', 'output003', 'output004', + 'restart000'] + ), + ( + { + "sync": { + 'restarts': True + }, + "restart_freq": 2 + }, + ['output000', 'output001', 'output002', 'output003', 'output004', + 'restart000', 'restart002', 'restart004'] + ), + ]) +def test_get_archive_paths_to_sync(additional_config, expected_dirs_to_sync): + # Write config + test_config = copy.deepcopy(config) + test_config.update(additional_config) + write_config(test_config) + + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + expt = payu.experiment.Experiment(lab, reproduce=False) + + # Function to test + src_paths = expt.get_archive_paths_to_sync() + + dirs = [] + for path in src_paths: + assert os.path.dirname(path) == str(expt_archive_dir) + dirs.append(os.path.basename(path)) + + assert dirs == expected_dirs_to_sync + + +@pytest.mark.parametrize( + "set_enviroment_var, expected_dirs_to_sync", + [ + ( + 'PAYU_SYNC_IGNORE_LAST', + ['output000', 'output001', 'output002', 'output003'] + ), + ( + 'PAYU_SYNC_RESTARTS', + ['output000', 'output001', 'output002', 'output003', 'output004', + 'restart000', 'restart001', 'restart002', 'restart003', + 'restart004'] + ), + ]) +def test_get_archive_paths_to_sync_environ_vars(set_enviroment_var, + expected_dirs_to_sync): + # Write config + write_config(config) + + os.environ[set_enviroment_var] = 'True' + + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + expt = payu.experiment.Experiment(lab, reproduce=False) + + # Function to test + src_paths = expt.get_archive_paths_to_sync() + + dirs = [] + for path in src_paths: + assert os.path.dirname(path) == str(expt_archive_dir) + dirs.append(os.path.basename(path)) + + assert dirs == expected_dirs_to_sync + + # Tidy up test + del os.environ[set_enviroment_var] + + +def test_sync(): + # Add some logs + pbs_logs_path = os.path.join(expt_archive_dir, 'pbs_logs') + os.makedirs(pbs_logs_path) + log_filename = 'test_s.e1234' + test_log_content = 'Test log file content' + with open(os.path.join(pbs_logs_path, log_filename), 'w') as f: + f.write(test_log_content) + + # Add nested directories to output000 + nested_output_dirs = os.path.join('output000', 'submodel', 'test_sub-dir') + nested_output_path = os.path.join(expt_archive_dir, nested_output_dirs) + os.makedirs(nested_output_path) + + # Add empty uncollated file + uncollated_file = os.path.join(nested_output_dirs, 'test0.res.nc.0000') + with open(os.path.join(expt_archive_dir, uncollated_file), 'w'): + pass + + # Add empty collated file + collated_file = os.path.join(nested_output_dirs, 'test1.res.nc') + with open(os.path.join(expt_archive_dir, collated_file), 'w'): + pass + + # Remote archive path + remote_archive = tmpdir / 'remote' + + additional_config = { + "sync": { + "path": str(remote_archive) + } + } + + # Write config + test_config = copy.deepcopy(config) + test_config.update(additional_config) + write_config(test_config) + + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + expt = payu.experiment.Experiment(lab, reproduce=False) + + # Function to 
test
+        expt.sync()
+
+    expected_dirs_synced = ['output000', 'output001', 'output002',
+                            'output003', 'output004', 'pbs_logs']
+
+    # Test output is moved to remote dir
+    assert os.listdir(remote_archive) == expected_dirs_synced
+
+    # Test inner log files are copied
+    remote_log_path = os.path.join(remote_archive, 'pbs_logs', log_filename)
+    assert os.path.exists(remote_log_path)
+
+    with open(remote_log_path, 'r') as f:
+        assert test_log_content == f.read()
+
+    # Check nested output dirs are synced
+    assert os.path.exists(os.path.join(remote_archive, nested_output_dirs))
+
+    # Check that uncollated files are not synced by default
+    assert not os.path.exists(os.path.join(remote_archive, uncollated_file))
+    assert os.path.exists(os.path.join(remote_archive, collated_file))

From 9ac075a31bc13b00c1ef0a067151d208e9125b3f Mon Sep 17 00:00:00 2001
From: Jo Basevi
Date: Mon, 16 Oct 2023 10:33:33 +1100
Subject: [PATCH 3/3] Extend sync command and refactor existing changes

- Update storage path check to look for sync path in config.yaml
- Add options for local delete of files/dirs after syncing
- Add protected paths in get_archive_paths_to_sync. This is to protect the
  last output and the last saved restart (needed for date-based restart
  pruning) from the local delete options
- remove_local_files config flag for removing local files once synced
- remove_local_dirs config flag for removing local restart/output dirs once
  synced. This will remove any empty dirs after the rsync operation and any
  files that were excluded from rsync.
- Add exclude options
- Add single or list options to extra paths to sync and exclude
- Add documentation for sync configuration options and usage
- Add runlog option to sync which defaults to True
- Remove hyperthreading in sync command, and explicitly add a default
  walltime
- Raise error when sync path is not defined
- Remove sync ssh keys
- Add flag for syncing uncollated files which defaults to True when
  collation is enabled.
---
 docs/source/config.rst | 79 +++++++++-
 docs/source/usage.rst | 13 ++
 payu/experiment.py | 122 +---------------
 payu/schedulers/pbs.py | 6 +-
 payu/subcommands/sync_cmd.py | 29 +---
 payu/sync.py | 275 +++++++++++++++++++++++++++++++++++
 test/test_sync.py | 260 ++++++++++++++++++++++++---------
 7 files changed, 567 insertions(+), 217 deletions(-)
 create mode 100644 payu/sync.py

diff --git a/docs/source/config.rst b/docs/source/config.rst
index 4f1483a3..a701fab5 100644
--- a/docs/source/config.rst
+++ b/docs/source/config.rst
@@ -211,7 +211,8 @@ configuration.
    able to parse restarts files for a datetime.

 ``restart_history``
-   Specifies how many of the most recent restart files to retain regardless of `restart_freq`
+   Specifies how many of the most recent restart files to retain regardless of
+   ``restart_freq``.

 *The following model-based tags are typically not configured*

@@ -382,12 +383,88 @@ Postprocessing
    ``error``
        User-defined command to be called if model does not run correctly and
        returns an error code. Useful for automatic error postmortem.
+
+   ``sync``
+       User-defined command to be called at the start of the ``sync`` pbs job.
+       This is useful for any post-processing before syncing files to a remote
+       archive.

 ``postscript``
    This is an older, less user-friendly, method to submit a script after
    ``payu collate`` has completed. Unlike the ``userscripts``, it does not
    support user commands. These scripts are always re-submitted via ``qsub``.

+``sync``
+   Sync archive to a remote directory using rsync. Make sure that the
+   configured path to sync output to, i.e. 
``path``, is the correct location + before enabling automatic syncing or before running ``payu sync``. + + If postscript is also configured, the latest output and restart files will + not be automatically synced after a run. + + ``enable`` (*Default:* ``False``): + Controls whether or not a sync job is submitted either after the archive or + collation job, if collation is enabled. + + ``queue`` (*Default:* ``copyq``) + PBS queue used to submit the sync job. + + ``walltime`` (*Default:* ``10:00:00``) + Time required to run the job. + + ``mem`` (*Default:* ``2GB``) + Memory required for the job. + + ``ncpus`` (*Default:* ``1``) + Number of ncpus required for the job. + + ``path`` + Destination path to sync archive outputs to. This must be a unique + absolute path for your experiment, otherwise, outputs will be + overwritten. + + ``restarts`` (*Default:* ``False``) + Sync permanently archived restarts, which are determined by + ``restart_freq``. + + ``rsync_flags`` (*Default:* ``-vrltoD --safe-links``) + Additional flags to add to rsync commands used for syncing files. + + ``exclude`` + Patterns to exclude from rsync commands. This is equivalent to rsync's + ``--exclude PATTERN``. This can be a single pattern or a list of + patterns. If a pattern includes any special characters, + e.g. ``.*+?|[]{}()``, it will need to be quoted. For example:: + + exclude: + - 'iceh.????-??-??.nc' + - '*-IN-PROGRESS' + + ``exclude_uncollated`` (*Default:* ``True`` if collation is enabled) + Flag to exclude uncollated files from being synced. This is equivalent + to adding ``--exclude *.nc.*``. + + ``extra_paths`` + List of ``glob`` patterns which match extra paths to sync to remote + archive. This can be a single pattern or a list of patterns. + Note that these paths will be protected against any local delete options. + + ``remove_local_files`` (*Default:* ``False``) + Remove local files once they are successfully synced to the remote + archive. Files in protected paths will not be deleted. Protected paths + include the ``extra_paths`` (if defined), last output, the last saved + restart (determined by ``restart_freq``), and any subsequent restarts. + + ``remove_local_dirs`` (*Default:* ``False``) + Remove local directories once a directory has been successfully synced. + This will delete any files in local directories that were excluded from + syncing. Similarly to ``remove_local_files``, protected paths will not be + deleted. + + ``runlog`` (*Default:* ``True``) + Create or update a bare git repository clone of the run history, called + ``git-runlog``, in the remote archive directory. + Miscellaneous ============= diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 086b4ca7..773bbf55 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -292,3 +292,16 @@ Alternatively you can directly specify a directory name:: This is useful when the data files have been moved out of the payu directory structure, or if you need to collate restart files, which is necessary when changing processor layout. + +To manually sync experiment output files to a remote archive, firstly ensure +that ``path`` in the ``sync`` namespace in ``config.yaml``, +is correctly configured as it may overwrite any pre-exisiting outputs. +Then run:: + + payu sync + +By default ``payu sync`` will not sync the latest restarts that may be pruned +at a later date. 
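For reference, a minimal ``sync`` block in ``config.yaml`` for this manual workflow might look like the following sketch (the path is an illustrative placeholder and must be replaced with a unique location; only options documented above are used)::

    sync:
        enable: false    # leave automatic syncing off; run payu sync manually
        path: /g/data/PROJECT/USER/EXPERIMENT/archive    # placeholder path
        restarts: true   # also sync restarts that will be permanently archived

With ``restarts: true``, only restarts kept according to ``restart_freq`` are synced unless the ``--sync-restarts`` flag described below is passed.
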
To sync all restarts including the latest restarts, use the +``--sync-restarts`` flag:: + + payu sync --sync-restarts diff --git a/payu/experiment.py b/payu/experiment.py index df11ac42..65d68bea 100644 --- a/payu/experiment.py +++ b/payu/experiment.py @@ -11,7 +11,6 @@ # Standard Library import datetime import errno -import getpass import os import re import resource @@ -33,6 +32,7 @@ from payu.runlog import Runlog from payu.manifest import Manifest from payu.calendar import parse_date_offset +from payu.sync import SyncToRemoteArchive # Environment module support on vayu # TODO: To be removed @@ -839,129 +839,15 @@ def postprocess(self): sp.check_call(shlex.split(cmd)) - def get_archive_paths_to_sync(self): - """Returns a list of dirs/files in archive to sync to remote archive""" - sync_config = self.config.get('sync', {}) - - # Get sorted lists of outputs and restarts in archive - outputs = self.list_output_dirs(output_type='output', full_path=True) - restarts = self.list_output_dirs(output_type='restart', full_path=True) - - # Ignore the latest output/restart if flagged - ignore_last_outputs = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) - if ignore_last_outputs: - if len(outputs) > 0: - outputs.pop() - if len(restarts) > 0: - restarts.pop() - - # Add outputs to rsync paths - src_paths = outputs - - # Get auto-sync restart flag - syncing_restarts = sync_config.get('restarts', False) - syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) - - # Add restarts to rsync paths - if syncing_all_restarts: - # Sync all restarts - src_paths.extend(restarts) - elif syncing_restarts: - # Only sync restarts that will be permanently archived - restarts_to_prune = self.get_restarts_to_prune( - ignore_intermediate_restarts=True) - for restart_path in restarts: - restart = os.path.basename(restart_path) - if restart not in restarts_to_prune: - src_paths.append(restart_path) - - # Add pbs and error logs to rsync - for log_type in ['error_logs', 'pbs_logs']: - log_path = os.path.join(self.archive_path, log_type) - if os.path.isdir(log_path): - src_paths.append(log_path) - - return src_paths - def sync(self): - """Sync archive to remote directory""" # RUN any user scripts before syncing archive + envmod.setup() pre_sync_script = self.userscripts.get('sync') if pre_sync_script: self.run_userscript(pre_sync_script) - sync_config = self.config.get('sync', {}) - - # Remote archive user - default_user = getpass.getuser() - remote_user = sync_config.get('user', default_user) - - # Remote archive url - remote_url = sync_config.get('url', None) - # Flag if syncing to remote machine - remote_syncing = remote_url is not None - - # Remote path to sync output to - dest_path = sync_config.get('path', None) - if not remote_syncing: - if dest_path is None: - # Automate destination path to: - # /g/data/{project}/{user}/{experiment_name}/archive - project = self.config.get('project', os.environ['PROJECT']) - dest_path = os.path.join('/', 'g', 'data', project, - remote_user, self.name, 'archive') - - # Create destination directory - mkdir_p(dest_path) - - # Build rsync commands - rsync_cmd = f'rsync -vrltoD --safe-links' - - # Add any additional rsync flags, e.g. 
more exclusions - additional_rsync_flags = sync_config.get('rsync_flags', None) - if additional_rsync_flags: - rsync_cmd += f' {additional_rsync_flags}' - - # Add exclusion for uncollated files - ignore_uncollated_files = sync_config.get('ignore_uncollated', True) - if ignore_uncollated_files: - rsync_cmd += ' --exclude *.nc.*' - - # Add rsync protocol, if defined - rsync_protocol = sync_config.get('rsync_protocol', None) - if rsync_protocol: - rsync_cmd += f' --protocol={rsync_protocol}' - - # Add remote host rsync options - if remote_syncing: - ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh', - 'id_rsa_file_transfer') - rsync_cmd += f' -e "ssh -i {ssh_key_path}"' - # TODO: Below comments from previous remote_archive- Need to verify - # Top-level path is implicitly set by the SSH key - # (Usually /projects/[group]) - - # Remote mkdir is currently not possible, so any new subdirectories - # must be created before auto-archival - # TODO: If so, need to add instructions to create archive to docs - if dest_path is None: - # TODO: What should be the default path for remote archive - os.path.join(self.model_name, self.name) - dest_path = f'{remote_user}@{remote_url}:{dest_path}' - - # Get archive source paths to sync - src_paths = self.get_archive_paths_to_sync() - - # Run rsync commands - for src_path in src_paths: - run_cmd = f'{rsync_cmd} {src_path} {dest_path}' - cmd = shlex.split(run_cmd) - - rc = sp.Popen(cmd).wait() - if rc != 0: - raise sp.CalledProcessError( - 'payu: Error syncing archive to remote directory: ', - f'rsync failed after with command: {cmd}') + # Run rsync commmands + SyncToRemoteArchive(self).run() def resubmit(self): next_run = self.counter + 1 diff --git a/payu/schedulers/pbs.py b/payu/schedulers/pbs.py index 6a881a9d..931fc891 100644 --- a/payu/schedulers/pbs.py +++ b/payu/schedulers/pbs.py @@ -108,13 +108,11 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): short_path = pbs_config.get('shortpath', None) if short_path is not None: extra_search_paths.append(short_path) - + module_use_paths = pbs_config.get('modules', {}).get('use', []) extra_search_paths.extend(module_use_paths) - remote_sync_directory = pbs_vars.get('PAYU_SYNC_PATH', None) - if remote_sync_directory is None: - remote_sync_directory = pbs_config.get('sync', {}).get('directory', None) + remote_sync_directory = pbs_config.get('sync', {}).get('path', None) if remote_sync_directory is not None: extra_search_paths.append(remote_sync_directory) storages.update(find_mounts(extra_search_paths, mounts)) diff --git a/payu/subcommands/sync_cmd.py b/payu/subcommands/sync_cmd.py index 9618717e..253b3da5 100644 --- a/payu/subcommands/sync_cmd.py +++ b/payu/subcommands/sync_cmd.py @@ -33,6 +33,7 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, default_ncpus = 1 default_queue = 'copyq' default_mem = '2GB' + default_walltime = '10:00:00' pbs_config['queue'] = sync_config.get('queue', default_queue) @@ -40,6 +41,8 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, pbs_config['mem'] = sync_config.get('mem', default_mem) + pbs_config['walltime'] = sync_config.get('walltime', default_walltime) + sync_jobname = sync_config.get('jobname') if not sync_jobname: pbs_jobname = pbs_config.get('jobname') @@ -53,31 +56,7 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, pbs_config['jobname'] = sync_jobname[:15] - # Replace (or remove) walltime - walltime = sync_config.get('walltime') - if walltime: - pbs_config['walltime'] = 
walltime - else: - # Remove walltime if set - try: - pbs_config.pop('walltime') - except KeyError: - pass - - # Disable hyperthreading - qsub_flags = [] - iflags = iter(pbs_config.get('qsub_flags', '').split()) - for flag in iflags: - if flag == '-l': - try: - flag += ' ' + next(iflags) - except StopIteration: - break - - if 'hyperthread' not in flag: - qsub_flags.append(flag) - - pbs_config['qsub_flags'] = ' '.join(qsub_flags) + pbs_config['qsub_flags'] = sync_config.get('qsub_flags', '') cli.submit_job('payu-sync', pbs_config, pbs_vars) diff --git a/payu/sync.py b/payu/sync.py new file mode 100644 index 00000000..88026074 --- /dev/null +++ b/payu/sync.py @@ -0,0 +1,275 @@ +"""Experiment post-processing - syncing archive to a remote directory + +:copyright: Copyright 2011 Marshall Ward, see AUTHORS for details. +:license: Apache License, Version 2.0, see LICENSE for details. +""" + +# Standard +import getpass +import glob +import os +import shutil +import subprocess + + +# Local +from payu.fsops import mkdir_p + + +class SourcePath(): + """Helper class for building rsync commands - stores attributes + of source paths to sync. + Note: Protected paths are paths that shouldn't be removed + locally if still running an experiment - i.e last output or last + permanently archived and subsequent restarts + """ + def __init__(self, path, protected=False, is_log_file=False): + self.protected = protected + self.path = path + self.is_log_file = is_log_file + + +class SyncToRemoteArchive(): + """Class used for archiving experiment outputs to a remote directory""" + + def __init__(self, expt): + self.expt = expt + self.config = self.expt.config.get('sync', {}) + + # Ignore the latest output/restart if flagged + self.ignore_last = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) + + # Use configured url to flag syncing to remote machine + self.remote_url = self.config.get('url', None) + self.remote_syncing = self.remote_url is not None + + self.source_paths = [] + + def add_outputs_to_sync(self): + """Add paths of outputs in archive to sync. The last output is + protected""" + outputs = self.expt.list_output_dirs(output_type='output', + full_path=True) + if len(outputs) > 0: + last_output = outputs.pop() + if not self.ignore_last: + # Protect the last output + self.source_paths.append(SourcePath(path=last_output, + protected=True)) + self.source_paths.extend([SourcePath(path) for path in outputs]) + + def add_restarts_to_sync(self): + """Add paths and protected paths of restarts in archive to sync. 
+ Last permanently-archived restart and subsequent restarts are + protected (as local date-based restart pruning uses the last-saved + restart as a checkpoint for a datetime)""" + syncing_restarts = self.config.get('restarts', False) + syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) + if not (syncing_all_restarts or syncing_restarts): + return + + # Get sorted list of restarts in archive + restarts = self.expt.list_output_dirs(output_type='restart', + full_path=True) + if restarts == []: + return + + # Find all restarts that will be 'permanently archived' + pruned_restarts = self.expt.get_restarts_to_prune( + ignore_intermediate_restarts=True) + saved_restarts = [ + restart for restart in restarts + if os.path.basename(restart) not in pruned_restarts + ] + + # Sync only permanently saved restarts unless flagged to sync all + to_sync = saved_restarts if not syncing_all_restarts else restarts + + # Protect last saved restart and any intermediate restarts + if to_sync and saved_restarts: + last_saved_index = to_sync.index(saved_restarts[-1]) + paths = to_sync[:last_saved_index] + protected_paths = to_sync[last_saved_index:] + else: + protected_paths, paths = to_sync, [] + + if self.ignore_last: + # Remove the last restart from sync paths + if protected_paths and protected_paths[-1] == restarts[-1]: + protected_paths.pop() + + # Add to sync source paths + self.source_paths.extend([SourcePath(path=path, protected=True) + for path in protected_paths]) + self.source_paths.extend([SourcePath(path) for path in paths]) + + def add_extra_source_paths(self): + """Add additional paths to sync to remote archive""" + paths = self.config.get('extra_paths', []) + if isinstance(paths, str): + paths = [paths] + + for path in paths: + matching_paths = glob.glob(path) + # First check if any matching paths exists + if matching_paths: + # Add extra paths to protected paths - so they can't be deleted + self.source_paths.append(SourcePath(path=path, protected=True)) + else: + print(f"payu: error: No paths matching {path} found. " + "Failed to sync path to remote archive") + + def set_destination_path(self): + "set or create destination path to sync archive to" + # Remote path to sync output to + dest_path = self.config.get('path', None) + if dest_path is None: + print("There's is no configured path to sync output to. " + "In config.yaml, set:\n" + " sync:\n path: PATH/TO/REMOTE/ARCHIVE\n" + "Replace PATH/TO/REMOTE/ARCHIVE with a unique absolute path " + "to sync outputs to. 
+                  "overwriting existing output!")
+            raise ValueError("payu: error: Sync path is not defined.")
+
+        if not self.remote_syncing:
+            # Create local destination directory if it does not exist
+            mkdir_p(dest_path)
+        else:
+            # Syncing to a remote machine
+            remote_user = self.config.get('user', None)
+            if remote_user is not None:
+                dest_path = f'{remote_user}@{self.remote_url}:{dest_path}'
+            else:
+                dest_path = f'{self.remote_url}:{dest_path}'
+
+        self.destination_path = dest_path
+
+    def set_excludes_flags(self):
+        """Build --exclude flags for rsync from configured exclude patterns"""
+        # Get any excludes
+        exclude = self.config.get('exclude', [])
+        if isinstance(exclude, str):
+            exclude = [exclude]
+
+        excludes = ' '.join(['--exclude ' + pattern for pattern in exclude])
+
+        # Default to excluding uncollated files if collation is enabled
+        # This can be overridden using the exclude_uncollated config flag
+        exclude_uncollated = self.config.get('exclude_uncollated', None)
+
+        if exclude_uncollated is None:
+            collate_config = self.expt.config.get('collate', {})
+            collating = collate_config.get('enable', True)
+            if collating:
+                exclude_uncollated = True
+
+        exclude_flag = "--exclude *.nc.*"
+        if (exclude_uncollated and exclude_flag not in excludes
+                and exclude_flag not in self.config.get('rsync_flags', [])):
+            excludes += " --exclude *.nc.*"
+
+        self.excludes = excludes
+
+    def build_cmd(self, source_path):
+        """Given a source path to sync, return an rsync command"""
+        if source_path.protected:
+            # No local delete option for protected paths
+            cmd = f'{self.base_rsync_cmd} {self.excludes} '
+        elif source_path.is_log_file:
+            cmd = f'{self.base_rsync_cmd} {self.remove_files} '
+        else:
+            cmd = f'{self.base_rsync_cmd} {self.excludes} {self.remove_files} '
+
+        cmd += f'{source_path.path} {self.destination_path}'
+        return cmd
+
+    def run_cmd(self, source_path):
+        """Given a source path, build and run the rsync command"""
+        cmd = self.build_cmd(source_path)
+        print(cmd)
+        try:
+            subprocess.check_call(cmd, shell=True)
+        except subprocess.CalledProcessError as e:
+            print('payu: Error rsyncing archive to remote directory: '
+                  f'Failed running command: {cmd}.')
+            # TODO: Raise or return?
+            return
+
+        if not source_path.protected and self.remove_local_dirs:
+            # Only delete real directories; ignore symbolic links
+            path = source_path.path
+            if os.path.isdir(path) and not os.path.islink(path):
+                print(f"Removing {path} from local archive")
+                shutil.rmtree(path)
+
+    def git_runlog(self):
+        """Add the git runlog to the remote archive"""
+        add_git_runlog = self.config.get("runlog", True)
+
+        if add_git_runlog:
+            # Currently the runlog is only synced for local destination paths
+            if self.remote_syncing:
+                print("payu: error: Syncing the git runlog is not implemented "
+                      "for syncing to a remote machine")
+                return
+
+            control_path = self.expt.control_path
+            runlog_path = os.path.join(self.destination_path, "git-runlog")
+            if not os.path.exists(runlog_path):
+                # Create a bare repository if it doesn't exist
+                try:
+                    print("Creating git-runlog bare repository clone"
+                          f" at {runlog_path}")
+                    cmd = f"git clone --bare {control_path} {runlog_path}"
+                    subprocess.check_call(cmd, shell=True)
+                except subprocess.CalledProcessError as e:
+                    print("payu: error: Failed to create a bare repository. ",
+                          f"Error: {e}")
+                    return
+            else:
+                # Update the bare git-runlog repository
+                try:
+                    print(f"Updating git-runlog at {runlog_path}")
+                    cmd = f"git push {runlog_path}"
+                    subprocess.check_call(cmd, shell=True, cwd=control_path)
+                except subprocess.CalledProcessError as e:
+                    print("payu: error: Failed to push git runlog to bare "
+                          f"repository. Error: {e}")
+
+    def run(self):
+        """Build and run the rsync commands to sync to the remote archive"""
+        # Add outputs and restarts to source paths to sync
+        self.add_outputs_to_sync()
+        self.add_restarts_to_sync()
+
+        # Add PBS and error logs to paths
+        for log_type in ['error_logs', 'pbs_logs']:
+            log_path = os.path.join(self.expt.archive_path, log_type)
+            if os.path.isdir(log_path):
+                self.source_paths.append(SourcePath(path=log_path,
+                                                    is_log_file=True))
+
+        # Add any additional paths to protected paths
+        self.add_extra_source_paths()
+
+        # Set rsync command components
+        self.set_destination_path()
+        self.set_excludes_flags()
+
+        # Set base rsync command
+        default_flags = '-vrltoD --safe-links'
+        rsync_flags = self.config.get('rsync_flags', default_flags)
+        self.base_rsync_cmd = f'rsync {rsync_flags}'
+
+        # Set remove local files/dirs options
+        remove_files = self.config.get('remove_local_files', False)
+        self.remove_files = '--remove-source-files' if remove_files else ''
+        self.remove_local_dirs = self.config.get('remove_local_dirs', False)
+
+        # Build and run all rsync commands
+        for source_path in self.source_paths:
+            self.run_cmd(source_path)
+
+        # Add git runlog to remote archive
+        self.git_runlog()
diff --git a/test/test_sync.py b/test/test_sync.py
index 320c7f08..e1d27a31 100644
--- a/test/test_sync.py
+++ b/test/test_sync.py
@@ -10,7 +10,7 @@
 from test.common import tmpdir, ctrldir, labdir, expt_archive_dir
 from test.common import config as config_orig
 from test.common import write_config
-from test.common import make_all_files
+from test.common import make_all_files, make_random_file
 from test.common import make_expt_archive_dir
 
 verbose = True
@@ -43,7 +43,8 @@ def setup_module(module):
     # Create 5 restarts and outputs
     for dir_type in ['restart', 'output']:
         for i in range(5):
-            make_expt_archive_dir(type=dir_type, index=i)
+            path = make_expt_archive_dir(type=dir_type, index=i)
+            make_random_file(os.path.join(path, f'test-{dir_type}00{i}-file'))
 
 
 def teardown_module(module):
@@ -60,22 +61,87 @@ def teardown_module(module):
         print(e)
 
 
+def setup_sync(additional_config, add_envt_vars=None):
+    """Given additional configuration and environment variables, return an
+    initialised class used to build/run rsync commands"""
+    # Set experiment config
+    test_config = copy.deepcopy(config)
+    test_config.update(additional_config)
+    write_config(test_config)
+
+    # Set up Experiment
+    with cd(ctrldir):
+        lab = payu.laboratory.Laboratory(lab_path=str(labdir))
+        experiment = payu.experiment.Experiment(lab, reproduce=False)
+
+    # Set environment variables
+    if add_envt_vars is not None:
+        for var, value in add_envt_vars.items():
+            os.environ[var] = value
+
+    return payu.sync.SyncToRemoteArchive(experiment)
+
+
+def assert_expected_archive_paths(source_paths,
+                                  expected_dirs,
+                                  expected_protected_dirs):
+    """Check that the given archive source paths include the expected
+    dirs to sync"""
+    dirs, protected_dirs = [], []
+    for source_path in source_paths:
+        path = source_path.path
+        assert os.path.dirname(path) == str(expt_archive_dir)
+
+        dir = os.path.basename(path)
+        if source_path.protected:
+            protected_dirs.append(dir)
+        else:
+            dirs.append(dir)
+
+    assert dirs == expected_dirs
+    assert protected_dirs == expected_protected_dirs
+
+
 @pytest.mark.parametrize(
-    "additional_config, expected_dirs_to_sync",
+    "envt_vars, expected_outputs, expected_protected_outputs",
     [
         (
             {},
-            ['output000', 'output001', 'output002', 'output003', 'output004']
+            ['output000', 'output001', 'output002', 'output003'], ['output004']
         ),
+        (
+            {'PAYU_SYNC_IGNORE_LAST': 'True'},
+            ['output000', 'output001', 'output002', 'output003'], []
+        ),
+    ])
+def test_add_outputs_to_sync(envt_vars, expected_outputs,
+                             expected_protected_outputs):
+    sync = setup_sync(additional_config={}, add_envt_vars=envt_vars)
+
+    # Test function
+    sync.add_outputs_to_sync()
+
+    # Assert expected outputs and protected outputs are added
+    assert_expected_archive_paths(sync.source_paths,
+                                  expected_outputs,
+                                  expected_protected_outputs)
+
+    # Tidy up test - remove any added environment variables
+    for envt_var in envt_vars.keys():
+        del os.environ[envt_var]
+
+
+@pytest.mark.parametrize(
+    "add_config, envt_vars, expected_restarts, expected_protected_restarts",
+    [
         (
             {
                 "sync": {
                     'restarts': True
                 },
                 "restart_freq": 5
-            },
-            ['output000', 'output001', 'output002', 'output003', 'output004',
-             'restart000']
+            }, {},
+            [], ['restart000']
         ),
         (
             {
@@ -83,69 +149,101 @@ def teardown_module(module):
                 'restarts': True
             },
             "restart_freq": 2
-            },
-            ['output000', 'output001', 'output002', 'output003', 'output004',
-             'restart000', 'restart002', 'restart004']
+            }, {},
+            ['restart000', 'restart002'], ['restart004']
+        ),
+        (
+            {
+                "sync": {
+                    "restarts": True
+                },
+                "restart_freq": 2
+            }, {'PAYU_SYNC_IGNORE_LAST': 'True'},
+            ['restart000', 'restart002'], []
+        ),
+        (
+            {"restart_freq": 3}, {'PAYU_SYNC_RESTARTS': 'True'},
+            ['restart000', 'restart001', 'restart002'],
+            ['restart003', 'restart004']
        ),
     ])
-def test_get_archive_paths_to_sync(additional_config, expected_dirs_to_sync):
-    # Write config
-    test_config = copy.deepcopy(config)
-    test_config.update(additional_config)
-    write_config(test_config)
+def test_restarts_to_sync(add_config, envt_vars,
+                          expected_restarts, expected_protected_restarts):
+    sync = setup_sync(add_config, envt_vars)
 
-    with cd(ctrldir):
-        lab = payu.laboratory.Laboratory(lab_path=str(labdir))
-        expt = payu.experiment.Experiment(lab, reproduce=False)
+    # Test function
+    sync.add_restarts_to_sync()
 
-    # Function to test
-    src_paths = expt.get_archive_paths_to_sync()
+    # Assert expected restarts and protected restarts are added
+    assert_expected_archive_paths(sync.source_paths,
+                                  expected_restarts,
+                                  expected_protected_restarts)
 
-    dirs = []
-    for path in src_paths:
-        assert os.path.dirname(path) == str(expt_archive_dir)
-        dirs.append(os.path.basename(path))
+    # Tidy up test - remove any added environment variables
+    for envt_var in envt_vars.keys():
+        del os.environ[envt_var]
+
+
+def test_set_destination_path():
+    additional_config = {
+        "sync": {
+            "url": "test.domain",
+            "user": "test-usr",
+            "path": "remote/path",
+        }}
+    sync = setup_sync(additional_config=additional_config)
+
+    # Test destination_path
+    sync.set_destination_path()
+    assert sync.destination_path == "test-usr@test.domain:remote/path"
 
-    assert dirs == expected_dirs_to_sync
+    # Test that a ValueError is raised when the path is not set
+    sync = setup_sync(additional_config={})
+    with pytest.raises(ValueError):
+        sync.set_destination_path()
 
 
 @pytest.mark.parametrize(
-    "set_enviroment_var, expected_dirs_to_sync",
+    "add_config, expected_excludes",
     [
         (
-            'PAYU_SYNC_IGNORE_LAST',
-            ['output000', 'output001', 'output002', 'output003']
+            {
+                "sync": {
+                    "exclude": ["iceh.????-??-??.nc", "*-DEPRECATED"]
+                },
+                "collate": {
+                    "enable": True
+                }
+            }, ("--exclude iceh.????-??-??.nc --exclude *-DEPRECATED"
+                " --exclude *.nc.*")
         ),
         (
-            'PAYU_SYNC_RESTARTS',
-            ['output000', 'output001', 'output002', 'output003', 'output004',
-             'restart000', 'restart001', 'restart002', 'restart003',
-             'restart004']
+            {
+                "sync": {
+                    "exclude_uncollated": False
+                },
+                "collate": {
+                    "enable": True
+                }
+            }, ""
         ),
+        (
+            {
+                "sync": {
+                    "exclude": "*-DEPRECATED"
+                },
+                "collate": {
+                    "enable": False
+                }
+            }, "--exclude *-DEPRECATED"
+        )
     ])
-def test_get_archive_paths_to_sync_environ_vars(set_enviroment_var,
-                                                expected_dirs_to_sync):
-    # Write config
-    write_config(config)
-
-    os.environ[set_enviroment_var] = 'True'
-
-    with cd(ctrldir):
-        lab = payu.laboratory.Laboratory(lab_path=str(labdir))
-        expt = payu.experiment.Experiment(lab, reproduce=False)
-
-    # Function to test
-    src_paths = expt.get_archive_paths_to_sync()
-
-    dirs = []
-    for path in src_paths:
-        assert os.path.dirname(path) == str(expt_archive_dir)
-        dirs.append(os.path.basename(path))
-
-    assert dirs == expected_dirs_to_sync
+def test_set_excludes_flags(add_config, expected_excludes):
+    sync = setup_sync(additional_config=add_config)
 
-    # Tidy up test
-    del os.environ[set_enviroment_var]
+    # Test setting excludes
+    sync.set_excludes_flags()
+    assert sync.excludes == expected_excludes
 
 
 def test_sync():
@@ -177,27 +275,20 @@
 
     additional_config = {
         "sync": {
-            "path": str(remote_archive)
+            "path": str(remote_archive),
+            "runlog": False
        }
    }
+    sync = setup_sync(additional_config)
 
-    # Write config
-    test_config = copy.deepcopy(config)
-    test_config.update(additional_config)
-    write_config(test_config)
-
-    with cd(ctrldir):
-        lab = payu.laboratory.Laboratory(lab_path=str(labdir))
-        expt = payu.experiment.Experiment(lab, reproduce=False)
+    # Function to test
+    sync.run()
 
-    # Function to test
-    expt.sync()
-
-    expected_dirs_synced = ['output000', 'output001', 'output002',
-                            'output003', 'output004', 'pbs_logs']
+    expected_dirs_synced = {'output000', 'output001', 'output002',
+                            'output003', 'output004', 'pbs_logs'}
 
     # Test output is moved to remote dir
-    assert os.listdir(remote_archive) == expected_dirs_synced
+    assert set(os.listdir(remote_archive)) == expected_dirs_synced
 
     # Test inner log files are copied
     remote_log_path = os.path.join(remote_archive, 'pbs_logs', log_filename)
@@ -212,3 +303,34 @@
     # Check that uncollated files are not synced by default
     assert not os.path.exists(os.path.join(remote_archive, uncollated_file))
     assert os.path.exists(os.path.join(remote_archive, collated_file))
+
+    # Check synced files still exist locally
+    local_archive_dirs = os.listdir(expt_archive_dir)
+    for dir in expected_dirs_synced:
+        assert dir in local_archive_dirs
+
+    # Test sync with the remove_local_files flag
+    additional_config['sync']['remove_local_files'] = True
+    sync = setup_sync(additional_config)
+    sync.run()
+
+    # Check synced files are removed from the local archive,
+    # except for the protected paths (the last output in this case)
+    for output in ['output000', 'output001', 'output002', 'output003']:
+        file_path = os.path.join(expt_archive_dir, output, f'test-{output}-file')
+        assert not os.path.exists(file_path)
+
+    last_output_path = os.path.join(expt_archive_dir, 'output004')
+    last_output_file = os.path.join(last_output_path, 'test-output004-file')
+    assert os.path.exists(last_output_file)
+
+    # Test sync with the remove_local_dirs flag as well
+    additional_config['sync']['remove_local_dirs'] = True
+    sync = setup_sync(additional_config)
+    sync.run()
+
+    # Assert synced output dirs are removed (except for the last output)
+    local_archive_dirs = os.listdir(expt_archive_dir)
+    for output in ['output000', 'output001', 'output002', 'output003']:
+        assert output not in local_archive_dirs
+    assert 'output004' in local_archive_dirs
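
For reference, the sync options read by SyncToRemoteArchive and exercised in the
tests above can be collected into a single block. The sketch below is illustrative
only (not part of the patch): it is written as the Python dict the tests pass to
write_config(), the values are hypothetical, and the defaults noted in the
comments are the ones set in the code above.

    illustrative_sync_config = {
        "sync": {
            "enable": True,                  # turn on syncing after archiving
            "path": "/remote/archive/path",  # destination; a ValueError is raised if unset
            "url": "test.domain",            # remote machine; omit to sync to a local path
            "user": "test-usr",              # remote user, giving user@url:path
            "restarts": True,                # also sync permanently-archived restarts
            "exclude": ["*-DEPRECATED"],     # extra --exclude patterns
            "exclude_uncollated": True,      # defaults to True when collation is enabled
            "rsync_flags": "-vrltoD --safe-links",  # default base flags
            "extra_paths": ["config.yaml"],  # additional (protected) paths to sync
            "remove_local_files": False,     # add --remove-source-files for unprotected paths
            "remove_local_dirs": False,      # delete unprotected local dirs after syncing
            "runlog": True,                  # clone/push the git runlog to the destination
        }
    }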
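
With options like these, run() and build_cmd() assemble one rsync command per
source path. A minimal sketch of the resulting command for a single unprotected
output directory, again illustrative only: the flag values are the defaults set
in this patch, and the source and destination paths are hypothetical.

    # Sketch only: mirrors build_cmd() for an unprotected, non-log source path
    base_rsync_cmd = 'rsync -vrltoD --safe-links'   # default_flags in run()
    excludes = '--exclude *.nc.*'                   # default from set_excludes_flags()
    remove_files = '--remove-source-files'          # only when remove_local_files is set

    source_path = '/scratch/lab/archive/my-expt/output000'   # hypothetical local path
    destination_path = 'test-usr@test.domain:remote/path'    # hypothetical user@url:path

    cmd = f'{base_rsync_cmd} {excludes} {remove_files} {source_path} {destination_path}'
    print(cmd)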