From 9ac075a31bc13b00c1ef0a067151d208e9125b3f Mon Sep 17 00:00:00 2001 From: Jo Basevi Date: Mon, 16 Oct 2023 10:33:33 +1100 Subject: [PATCH] Extend sync command and refactor existing changes - Update storage path check to look for sync path in config.yaml - Add options for local delete of files/dirs after syncing - Add protected paths in get_archive_paths_to_sync. This is protect the last output, and last saved restart (needed for date-based restart pruning) from delete local options - remove_local_files config flag for removing local files once synced - remove_local_dirs config flag for removing local restart/output dirs onced synced. This will remove any empty dirs after rsync operation and any files that were excluded from rsync. - Add excludes options - Add single or list options to extra paths to sync and exclude - Add documention for sync configuration options and usage - Add runlog option to sync which defaults to True - Remove hyperthreading in sync command, and explicitly add a default walltime - Raise error when sync path is not defined - Remove sync ssh keys - Add flag for syncing uncollated files which defaults to True when collation is enabled. --- docs/source/config.rst | 79 +++++++++- docs/source/usage.rst | 13 ++ payu/experiment.py | 122 +--------------- payu/schedulers/pbs.py | 6 +- payu/subcommands/sync_cmd.py | 29 +--- payu/sync.py | 275 +++++++++++++++++++++++++++++++++++ test/test_sync.py | 260 ++++++++++++++++++++++++--------- 7 files changed, 567 insertions(+), 217 deletions(-) create mode 100644 payu/sync.py diff --git a/docs/source/config.rst b/docs/source/config.rst index 4f1483a3..a701fab5 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -211,7 +211,8 @@ configuration. able to parse restarts files for a datetime. 
``restart_history`` - Specifies how many of the most recent restart files to retain regardless of `restart_freq` + Specifies how many of the most recent restart files to retain regardless of + ``restart_freq``. *The following model-based tags are typically not configured* @@ -382,12 +383,88 @@ Postprocessing ``error`` User-defined command to be called if model does not run correctly and returns an error code. Useful for automatic error postmortem. + + ``sync`` + User-defined command to be called at the start of the ``sync`` pbs job. + This is useful for any post-processing before syncing files to a remote + archive. ``postscript`` This is an older, less user-friendly, method to submit a script after ``payu collate`` has completed. Unlike the ``userscripts``, it does not support user commands. These scripts are always re-submitted via ``qsub``. +``sync`` + Sync archive to a remote directory using rsync. Make sure that the + configured path to sync output to, i.e. ``path``, is the correct location + before enabling automatic syncing or before running ``payu sync``. + + If postscript is also configured, the latest output and restart files will + not be automatically synced after a run. + + ``enable`` (*Default:* ``False``): + Controls whether or not a sync job is submitted either after the archive or + collation job, if collation is enabled. + + ``queue`` (*Default:* ``copyq``) + PBS queue used to submit the sync job. + + ``walltime`` (*Default:* ``10:00:00``) + Time required to run the job. + + ``mem`` (*Default:* ``2GB``) + Memory required for the job. + + ``ncpus`` (*Default:* ``1``) + Number of ncpus required for the job. + + ``path`` + Destination path to sync archive outputs to. This must be a unique + absolute path for your experiment, otherwise, outputs will be + overwritten. + + ``restarts`` (*Default:* ``False``) + Sync permanently archived restarts, which are determined by + ``restart_freq``. 
+ + ``rsync_flags`` (*Default:* ``-vrltoD --safe-links``) + Additional flags to add to rsync commands used for syncing files. + + ``exclude`` + Patterns to exclude from rsync commands. This is equivalent to rsync's + ``--exclude PATTERN``. This can be a single pattern or a list of + patterns. If a pattern includes any special characters, + e.g. ``.*+?|[]{}()``, it will need to be quoted. For example:: + + exclude: + - 'iceh.????-??-??.nc' + - '*-IN-PROGRESS' + + ``exclude_uncollated`` (*Default:* ``True`` if collation is enabled) + Flag to exclude uncollated files from being synced. This is equivalent + to adding ``--exclude *.nc.*``. + + ``extra_paths`` + List of ``glob`` patterns which match extra paths to sync to remote + archive. This can be a single pattern or a list of patterns. + Note that these paths will be protected against any local delete options. + + ``remove_local_files`` (*Default:* ``False``) + Remove local files once they are successfully synced to the remote + archive. Files in protected paths will not be deleted. Protected paths + include the ``extra_paths`` (if defined), last output, the last saved + restart (determined by ``restart_freq``), and any subsequent restarts. + + ``remove_local_dirs`` (*Default:* ``False``) + Remove local directories once a directory has been successfully synced. + This will delete any files in local directories that were excluded from + syncing. Similarly to ``remove_local_files``, protected paths will not be + deleted. + + ``runlog`` (*Default:* ``True``) + Create or update a bare git repository clone of the run history, called + ``git-runlog``, in the remote archive directory. 
+ Miscellaneous ============= diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 086b4ca7..773bbf55 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -292,3 +292,16 @@ Alternatively you can directly specify a directory name:: This is useful when the data files have been moved out of the payu directory structure, or if you need to collate restart files, which is necessary when changing processor layout. + +To manually sync experiment output files to a remote archive, firstly ensure +that ``path`` in the ``sync`` namespace in ``config.yaml``, +is correctly configured as it may overwrite any pre-exisiting outputs. +Then run:: + + payu sync + +By default ``payu sync`` will not sync the latest restarts that may be pruned +at a later date. To sync all restarts including the latest restarts, use the +``--sync-restarts`` flag:: + + payu sync --sync-restarts diff --git a/payu/experiment.py b/payu/experiment.py index df11ac42..65d68bea 100644 --- a/payu/experiment.py +++ b/payu/experiment.py @@ -11,7 +11,6 @@ # Standard Library import datetime import errno -import getpass import os import re import resource @@ -33,6 +32,7 @@ from payu.runlog import Runlog from payu.manifest import Manifest from payu.calendar import parse_date_offset +from payu.sync import SyncToRemoteArchive # Environment module support on vayu # TODO: To be removed @@ -839,129 +839,15 @@ def postprocess(self): sp.check_call(shlex.split(cmd)) - def get_archive_paths_to_sync(self): - """Returns a list of dirs/files in archive to sync to remote archive""" - sync_config = self.config.get('sync', {}) - - # Get sorted lists of outputs and restarts in archive - outputs = self.list_output_dirs(output_type='output', full_path=True) - restarts = self.list_output_dirs(output_type='restart', full_path=True) - - # Ignore the latest output/restart if flagged - ignore_last_outputs = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) - if ignore_last_outputs: - if len(outputs) > 0: - outputs.pop() 
- if len(restarts) > 0: - restarts.pop() - - # Add outputs to rsync paths - src_paths = outputs - - # Get auto-sync restart flag - syncing_restarts = sync_config.get('restarts', False) - syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) - - # Add restarts to rsync paths - if syncing_all_restarts: - # Sync all restarts - src_paths.extend(restarts) - elif syncing_restarts: - # Only sync restarts that will be permanently archived - restarts_to_prune = self.get_restarts_to_prune( - ignore_intermediate_restarts=True) - for restart_path in restarts: - restart = os.path.basename(restart_path) - if restart not in restarts_to_prune: - src_paths.append(restart_path) - - # Add pbs and error logs to rsync - for log_type in ['error_logs', 'pbs_logs']: - log_path = os.path.join(self.archive_path, log_type) - if os.path.isdir(log_path): - src_paths.append(log_path) - - return src_paths - def sync(self): - """Sync archive to remote directory""" # RUN any user scripts before syncing archive + envmod.setup() pre_sync_script = self.userscripts.get('sync') if pre_sync_script: self.run_userscript(pre_sync_script) - sync_config = self.config.get('sync', {}) - - # Remote archive user - default_user = getpass.getuser() - remote_user = sync_config.get('user', default_user) - - # Remote archive url - remote_url = sync_config.get('url', None) - # Flag if syncing to remote machine - remote_syncing = remote_url is not None - - # Remote path to sync output to - dest_path = sync_config.get('path', None) - if not remote_syncing: - if dest_path is None: - # Automate destination path to: - # /g/data/{project}/{user}/{experiment_name}/archive - project = self.config.get('project', os.environ['PROJECT']) - dest_path = os.path.join('/', 'g', 'data', project, - remote_user, self.name, 'archive') - - # Create destination directory - mkdir_p(dest_path) - - # Build rsync commands - rsync_cmd = f'rsync -vrltoD --safe-links' - - # Add any additional rsync flags, e.g. 
more exclusions - additional_rsync_flags = sync_config.get('rsync_flags', None) - if additional_rsync_flags: - rsync_cmd += f' {additional_rsync_flags}' - - # Add exclusion for uncollated files - ignore_uncollated_files = sync_config.get('ignore_uncollated', True) - if ignore_uncollated_files: - rsync_cmd += ' --exclude *.nc.*' - - # Add rsync protocol, if defined - rsync_protocol = sync_config.get('rsync_protocol', None) - if rsync_protocol: - rsync_cmd += f' --protocol={rsync_protocol}' - - # Add remote host rsync options - if remote_syncing: - ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh', - 'id_rsa_file_transfer') - rsync_cmd += f' -e "ssh -i {ssh_key_path}"' - # TODO: Below comments from previous remote_archive- Need to verify - # Top-level path is implicitly set by the SSH key - # (Usually /projects/[group]) - - # Remote mkdir is currently not possible, so any new subdirectories - # must be created before auto-archival - # TODO: If so, need to add instructions to create archive to docs - if dest_path is None: - # TODO: What should be the default path for remote archive - os.path.join(self.model_name, self.name) - dest_path = f'{remote_user}@{remote_url}:{dest_path}' - - # Get archive source paths to sync - src_paths = self.get_archive_paths_to_sync() - - # Run rsync commands - for src_path in src_paths: - run_cmd = f'{rsync_cmd} {src_path} {dest_path}' - cmd = shlex.split(run_cmd) - - rc = sp.Popen(cmd).wait() - if rc != 0: - raise sp.CalledProcessError( - 'payu: Error syncing archive to remote directory: ', - f'rsync failed after with command: {cmd}') + # Run rsync commmands + SyncToRemoteArchive(self).run() def resubmit(self): next_run = self.counter + 1 diff --git a/payu/schedulers/pbs.py b/payu/schedulers/pbs.py index 6a881a9d..931fc891 100644 --- a/payu/schedulers/pbs.py +++ b/payu/schedulers/pbs.py @@ -108,13 +108,11 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): short_path = pbs_config.get('shortpath', None) if 
short_path is not None: extra_search_paths.append(short_path) - + module_use_paths = pbs_config.get('modules', {}).get('use', []) extra_search_paths.extend(module_use_paths) - remote_sync_directory = pbs_vars.get('PAYU_SYNC_PATH', None) - if remote_sync_directory is None: - remote_sync_directory = pbs_config.get('sync', {}).get('directory', None) + remote_sync_directory = pbs_config.get('sync', {}).get('path', None) if remote_sync_directory is not None: extra_search_paths.append(remote_sync_directory) storages.update(find_mounts(extra_search_paths, mounts)) diff --git a/payu/subcommands/sync_cmd.py b/payu/subcommands/sync_cmd.py index 9618717e..253b3da5 100644 --- a/payu/subcommands/sync_cmd.py +++ b/payu/subcommands/sync_cmd.py @@ -33,6 +33,7 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, default_ncpus = 1 default_queue = 'copyq' default_mem = '2GB' + default_walltime = '10:00:00' pbs_config['queue'] = sync_config.get('queue', default_queue) @@ -40,6 +41,8 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, pbs_config['mem'] = sync_config.get('mem', default_mem) + pbs_config['walltime'] = sync_config.get('walltime', default_walltime) + sync_jobname = sync_config.get('jobname') if not sync_jobname: pbs_jobname = pbs_config.get('jobname') @@ -53,31 +56,7 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, pbs_config['jobname'] = sync_jobname[:15] - # Replace (or remove) walltime - walltime = sync_config.get('walltime') - if walltime: - pbs_config['walltime'] = walltime - else: - # Remove walltime if set - try: - pbs_config.pop('walltime') - except KeyError: - pass - - # Disable hyperthreading - qsub_flags = [] - iflags = iter(pbs_config.get('qsub_flags', '').split()) - for flag in iflags: - if flag == '-l': - try: - flag += ' ' + next(iflags) - except StopIteration: - break - - if 'hyperthread' not in flag: - qsub_flags.append(flag) - - pbs_config['qsub_flags'] = ' '.join(qsub_flags) + 
pbs_config['qsub_flags'] = sync_config.get('qsub_flags', '') cli.submit_job('payu-sync', pbs_config, pbs_vars) diff --git a/payu/sync.py b/payu/sync.py new file mode 100644 index 00000000..88026074 --- /dev/null +++ b/payu/sync.py @@ -0,0 +1,275 @@ +"""Experiment post-processing - syncing archive to a remote directory + +:copyright: Copyright 2011 Marshall Ward, see AUTHORS for details. +:license: Apache License, Version 2.0, see LICENSE for details. +""" + +# Standard +import getpass +import glob +import os +import shutil +import subprocess + + +# Local +from payu.fsops import mkdir_p + + +class SourcePath(): + """Helper class for building rsync commands - stores attributes + of source paths to sync. + Note: Protected paths are paths that shouldn't be removed + locally if still running an experiment - i.e last output or last + permanently archived and subsequent restarts + """ + def __init__(self, path, protected=False, is_log_file=False): + self.protected = protected + self.path = path + self.is_log_file = is_log_file + + +class SyncToRemoteArchive(): + """Class used for archiving experiment outputs to a remote directory""" + + def __init__(self, expt): + self.expt = expt + self.config = self.expt.config.get('sync', {}) + + # Ignore the latest output/restart if flagged + self.ignore_last = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) + + # Use configured url to flag syncing to remote machine + self.remote_url = self.config.get('url', None) + self.remote_syncing = self.remote_url is not None + + self.source_paths = [] + + def add_outputs_to_sync(self): + """Add paths of outputs in archive to sync. 
The last output is + protected""" + outputs = self.expt.list_output_dirs(output_type='output', + full_path=True) + if len(outputs) > 0: + last_output = outputs.pop() + if not self.ignore_last: + # Protect the last output + self.source_paths.append(SourcePath(path=last_output, + protected=True)) + self.source_paths.extend([SourcePath(path) for path in outputs]) + + def add_restarts_to_sync(self): + """Add paths and protected paths of restarts in archive to sync. + Last permanently-archived restart and subsequent restarts are + protected (as local date-based restart pruning uses the last-saved + restart as a checkpoint for a datetime)""" + syncing_restarts = self.config.get('restarts', False) + syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) + if not (syncing_all_restarts or syncing_restarts): + return + + # Get sorted list of restarts in archive + restarts = self.expt.list_output_dirs(output_type='restart', + full_path=True) + if restarts == []: + return + + # Find all restarts that will be 'permanently archived' + pruned_restarts = self.expt.get_restarts_to_prune( + ignore_intermediate_restarts=True) + saved_restarts = [ + restart for restart in restarts + if os.path.basename(restart) not in pruned_restarts + ] + + # Sync only permanently saved restarts unless flagged to sync all + to_sync = saved_restarts if not syncing_all_restarts else restarts + + # Protect last saved restart and any intermediate restarts + if to_sync and saved_restarts: + last_saved_index = to_sync.index(saved_restarts[-1]) + paths = to_sync[:last_saved_index] + protected_paths = to_sync[last_saved_index:] + else: + protected_paths, paths = to_sync, [] + + if self.ignore_last: + # Remove the last restart from sync paths + if protected_paths and protected_paths[-1] == restarts[-1]: + protected_paths.pop() + + # Add to sync source paths + self.source_paths.extend([SourcePath(path=path, protected=True) + for path in protected_paths]) + self.source_paths.extend([SourcePath(path) 
for path in paths]) + + def add_extra_source_paths(self): + """Add additional paths to sync to remote archive""" + paths = self.config.get('extra_paths', []) + if isinstance(paths, str): + paths = [paths] + + for path in paths: + matching_paths = glob.glob(path) + # First check if any matching paths exist + if matching_paths: + # Add extra paths to protected paths - so they can't be deleted + self.source_paths.append(SourcePath(path=path, protected=True)) + else: + print(f"payu: error: No paths matching {path} found. " + "Failed to sync path to remote archive") + + def set_destination_path(self): + "set or create destination path to sync archive to" + # Remote path to sync output to + dest_path = self.config.get('path', None) + if dest_path is None: + print("There is no configured path to sync output to. " + "In config.yaml, set:\n" + " sync:\n path: PATH/TO/REMOTE/ARCHIVE\n" + "Replace PATH/TO/REMOTE/ARCHIVE with a unique absolute path " + "to sync outputs to. Ensure path is unique to avoid " + "overwriting existing output!") + raise ValueError("payu: error: Sync path is not defined.") + + if not self.remote_syncing: + # Create local destination directory if it does not exist + mkdir_p(dest_path) + else: + # Syncing to remote machine + remote_user = self.config.get('user', None) + if remote_user is not None: + dest_path = f'{remote_user}@{self.remote_url}:{dest_path}' + else: + dest_path = f'{self.remote_url}:{dest_path}' + + self.destination_path = dest_path + + def set_excludes_flags(self): + """Add lists of patterns of filepaths to exclude from sync commands""" + # Get any excludes + exclude = self.config.get('exclude', []) + if isinstance(exclude, str): + exclude = [exclude] + + excludes = ' '.join(['--exclude ' + pattern for pattern in exclude]) + + # Default to exclude uncollated files if collation is enabled + # This can be overridden using exclude_uncollated config flag + exclude_uncollated = self.config.get('exclude_uncollated', None) + + if 
exclude_uncollated is None: + collate_config = self.expt.config.get('collate', {}) + collating = collate_config.get('enable', True) + if collating: + exclude_uncollated = True + + exclude_flag = "--exclude *.nc.*" + if (exclude_uncollated and exclude_flag not in excludes + and exclude_flag not in self.config.get('rsync_flags', [])): + excludes += " --exclude *.nc.*" + + self.excludes = excludes + + def build_cmd(self, source_path): + """Given a source path to sync, return a rsync command""" + if source_path.protected: + # No local delete option for protected paths + cmd = f'{self.base_rsync_cmd} {self.excludes} ' + elif source_path.is_log_file: + cmd = f'{self.base_rsync_cmd} {self.remove_files} ' + else: + cmd = f'{self.base_rsync_cmd} {self.excludes} {self.remove_files} ' + + cmd += f'{source_path.path} {self.destination_path}' + return cmd + + def run_cmd(self, source_path): + """Given an source path, build and run rsync command""" + cmd = self.build_cmd(source_path) + print(cmd) + try: + subprocess.check_call(cmd, shell=True) + except subprocess.CalledProcessError as e: + print('payu: Error rsyncing archive to remote directory: ' + f'Failed running command: {cmd}.') + # TODO: Raise or return? 
+ return + + if not source_path.protected and self.remove_local_dirs: + # Only delete real directories; ignore symbolic links + path = source_path.path + if os.path.isdir(path) and not os.path.islink(path): + print(f"Removing {path} from local archive") + shutil.rmtree(path) + + def git_runlog(self): + """Add git runlog to remote archive""" + add_git_runlog = self.config.get("runlog", True) + + if add_git_runlog: + # Currently runlog is only set up for local remote archive + if self.remote_syncing: + print("payu: error: Syncing the git runlog is not implemented " + "for syncing to a remote machine") + return + + control_path = self.expt.control_path + runlog_path = os.path.join(self.destination_path, "git-runlog") + if not os.path.exists(runlog_path): + # Create a bare repository, if it doesn't exist + try: + print("Creating git-runlog bare repository clone" + f" at {runlog_path}") + cmd = f"git clone --bare {control_path} {runlog_path}" + subprocess.check_call(cmd, shell=True) + except subprocess.CalledProcessError as e: + print("payu: error: Failed to create a bare repository. ", + f"Error: {e}") + return + else: + # Update bare gitlog repo + try: + print(f"Updating git-runlog at {runlog_path}") + cmd = f"git push {runlog_path}" + subprocess.check_call(cmd, shell=True, cwd=control_path) + except subprocess.CalledProcessError as e: + print("payu: error: Failed to push git runlog to bare " + f"repository. 
Error: {e}") + + def run(self): + """Build and run rsync cmds to remote remote archive """ + # Add outputs and restarts to source paths to sync + self.add_outputs_to_sync() + self.add_restarts_to_sync() + + # Add pbs and error logs to paths + for log_type in ['error_logs', 'pbs_logs']: + log_path = os.path.join(self.expt.archive_path, log_type) + if os.path.isdir(log_path): + self.source_paths.append(SourcePath(path=log_path, + is_log_file=True)) + + # Add any additional paths to protected paths + self.add_extra_source_paths() + + # Set rsync command components + self.set_destination_path() + self.set_excludes_flags() + + # Set base rsync command + default_flags = '-vrltoD --safe-links' + rsync_flags = self.config.get('rsync_flags', default_flags) + self.base_rsync_cmd = f'rsync {rsync_flags}' + + # Set remove local files/dirs options + remove_files = self.config.get('remove_local_files', False) + self.remove_files = '--remove-source-files' if remove_files else '' + self.remove_local_dirs = self.config.get('remove_local_dirs', False) + + # Build and run all rsync commands + for source_path in self.source_paths: + self.run_cmd(source_path) + + # Add git runlog to remote archive + self.git_runlog() diff --git a/test/test_sync.py b/test/test_sync.py index 320c7f08..e1d27a31 100644 --- a/test/test_sync.py +++ b/test/test_sync.py @@ -10,7 +10,7 @@ from test.common import tmpdir, ctrldir, labdir, expt_archive_dir from test.common import config as config_orig from test.common import write_config -from test.common import make_all_files +from test.common import make_all_files, make_random_file from test.common import make_expt_archive_dir verbose = True @@ -43,7 +43,8 @@ def setup_module(module): # Create 5 restarts and outputs for dir_type in ['restart', 'output']: for i in range(5): - make_expt_archive_dir(type=dir_type, index=i) + path = make_expt_archive_dir(type=dir_type, index=i) + make_random_file(os.path.join(path, f'test-{dir_type}00{i}-file')) def 
teardown_module(module): @@ -60,22 +61,87 @@ def teardown_module(module): print(e) +def setup_sync(additional_config, add_envt_vars=None): + """Given additional configuration and envt_vars, return initialised + class used to build/run rsync commands""" + # Set experiment config + test_config = copy.deepcopy(config) + test_config.update(additional_config) + write_config(test_config) + + # Set up Experiment + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + experiment = payu.experiment.Experiment(lab, reproduce=False) + + # Set enviroment vars + if add_envt_vars is not None: + for var, value in add_envt_vars.items(): + os.environ[var] = value + + return payu.sync.SyncToRemoteArchive(experiment) + + +def assert_expected_archive_paths(source_paths, + expected_dirs, + expected_protected_dirs): + """Check given source archive source paths that it includes + the expected dirs to sync""" + dirs, protected_dirs = [], [] + for source_path in source_paths: + path = source_path.path + assert os.path.dirname(path) == str(expt_archive_dir) + + dir = os.path.basename(path) + if source_path.protected: + protected_dirs.append(dir) + else: + dirs.append(dir) + + assert dirs == expected_dirs + assert protected_dirs == expected_protected_dirs + + @pytest.mark.parametrize( - "additional_config, expected_dirs_to_sync", + "envt_vars, expected_outputs, expected_protected_outputs", [ ( {}, - ['output000', 'output001', 'output002', 'output003', 'output004'] + ['output000', 'output001', 'output002', 'output003'], ['output004'] ), + ( + {'PAYU_SYNC_IGNORE_LAST': 'True'}, + ['output000', 'output001', 'output002', 'output003'], [] + ), + ]) +def test_add_outputs_to_sync(envt_vars, expected_outputs, + expected_protected_outputs): + sync = setup_sync(additional_config={}, add_envt_vars=envt_vars) + + # Test function + sync.add_outputs_to_sync() + + # Assert expected outputs and protected outputs are added + assert_expected_archive_paths(sync.source_paths, + 
expected_outputs, + expected_protected_outputs) + + # Tidy up test - Remove any added enviroment variables + for envt_var in envt_vars.keys(): + del os.environ[envt_var] + + +@pytest.mark.parametrize( + "add_config, envt_vars, expected_restarts, expected_protected_restarts", + [ ( { "sync": { 'restarts': True }, "restart_freq": 5 - }, - ['output000', 'output001', 'output002', 'output003', 'output004', - 'restart000'] + }, {}, + [], ['restart000'] ), ( { @@ -83,69 +149,101 @@ def teardown_module(module): 'restarts': True }, "restart_freq": 2 - }, - ['output000', 'output001', 'output002', 'output003', 'output004', - 'restart000', 'restart002', 'restart004'] + }, {}, + ['restart000', 'restart002'], ['restart004'] + ), + ( + { + "sync": { + "restarts": True + }, + "restart_freq": 2 + }, {'PAYU_SYNC_IGNORE_LAST': 'True'}, + ['restart000', 'restart002'], [] + ), + ( + {"restart_freq": 3}, {'PAYU_SYNC_RESTARTS': 'True'}, + ['restart000', 'restart001', 'restart002'], + ['restart003', 'restart004'] ), ]) -def test_get_archive_paths_to_sync(additional_config, expected_dirs_to_sync): - # Write config - test_config = copy.deepcopy(config) - test_config.update(additional_config) - write_config(test_config) +def test_restarts_to_sync(add_config, envt_vars, + expected_restarts, expected_protected_restarts): + sync = setup_sync(add_config, envt_vars) - with cd(ctrldir): - lab = payu.laboratory.Laboratory(lab_path=str(labdir)) - expt = payu.experiment.Experiment(lab, reproduce=False) + # Test function + sync.add_restarts_to_sync() - # Function to test - src_paths = expt.get_archive_paths_to_sync() + # Assert expected restarts and protected restarts are added + assert_expected_archive_paths(sync.source_paths, + expected_restarts, + expected_protected_restarts) - dirs = [] - for path in src_paths: - assert os.path.dirname(path) == str(expt_archive_dir) - dirs.append(os.path.basename(path)) + # Tidy up test - Remove any added enviroment variables + for envt_var in envt_vars.keys(): + 
del os.environ[envt_var] + + +def test_set_destination_path(): + additional_config = { + "sync": { + "url": "test.domain", + "user": "test-usr", + "path": "remote/path", + }} + sync = setup_sync(additional_config=additional_config) + + # Test destination_path + sync.set_destination_path() + assert sync.destination_path == "test-usr@test.domain:remote/path" - assert dirs == expected_dirs_to_sync + # Test value error raised when path is not set + sync = setup_sync(additional_config={}) + with pytest.raises(ValueError): + sync.set_destination_path() @pytest.mark.parametrize( - "set_enviroment_var, expected_dirs_to_sync", + "add_config, expected_excludes", [ ( - 'PAYU_SYNC_IGNORE_LAST', - ['output000', 'output001', 'output002', 'output003'] + { + "sync": { + "exclude": ["iceh.????-??-??.nc", "*-DEPRECATED"] + }, + "collate": { + "enable": True + } + }, ("--exclude iceh.????-??-??.nc --exclude *-DEPRECATED" + " --exclude *.nc.*") ), ( - 'PAYU_SYNC_RESTARTS', - ['output000', 'output001', 'output002', 'output003', 'output004', - 'restart000', 'restart001', 'restart002', 'restart003', - 'restart004'] + { + "sync": { + "exclude_uncollated": False + }, + "collate": { + "enable": True + } + }, "" ), + ( + { + "sync": { + "exclude": "*-DEPRECATED" + }, + "collate": { + "enable": False + } + }, "--exclude *-DEPRECATED" + ) ]) -def test_get_archive_paths_to_sync_environ_vars(set_enviroment_var, - expected_dirs_to_sync): - # Write config - write_config(config) - - os.environ[set_enviroment_var] = 'True' - - with cd(ctrldir): - lab = payu.laboratory.Laboratory(lab_path=str(labdir)) - expt = payu.experiment.Experiment(lab, reproduce=False) - - # Function to test - src_paths = expt.get_archive_paths_to_sync() - - dirs = [] - for path in src_paths: - assert os.path.dirname(path) == str(expt_archive_dir) - dirs.append(os.path.basename(path)) - - assert dirs == expected_dirs_to_sync +def test_set_excludes_flags(add_config, expected_excludes): + sync = 
setup_sync(additional_config=add_config) - # Tidy up test - del os.environ[set_enviroment_var] + # Test setting excludes + sync.set_excludes_flags() + assert sync.excludes == expected_excludes def test_sync(): @@ -177,27 +275,20 @@ def test_sync(): additional_config = { "sync": { - "path": str(remote_archive) + "path": str(remote_archive), + "runlog": False } } + sync = setup_sync(additional_config) - # Write config - test_config = copy.deepcopy(config) - test_config.update(additional_config) - write_config(test_config) - - with cd(ctrldir): - lab = payu.laboratory.Laboratory(lab_path=str(labdir)) - expt = payu.experiment.Experiment(lab, reproduce=False) + # Function to test + sync.run() - # Function to test - expt.sync() - - expected_dirs_synced = ['output000', 'output001', 'output002', - 'output003', 'output004', 'pbs_logs'] + expected_dirs_synced = {'output000', 'output001', 'output002', + 'output003', 'output004', 'pbs_logs'} # Test output is moved to remote dir - assert os.listdir(remote_archive) == expected_dirs_synced + assert set(os.listdir(remote_archive)) == expected_dirs_synced # Test inner log files are copied remote_log_path = os.path.join(remote_archive, 'pbs_logs', log_filename) @@ -212,3 +303,34 @@ def test_sync(): # Check that uncollated files are not synced by default assert not os.path.exists(os.path.join(remote_archive, uncollated_file)) assert os.path.exists(os.path.join(remote_archive, collated_file)) + + # Check synced file still exist locally + local_archive_dirs = os.listdir(expt_archive_dir) + for dir in expected_dirs_synced: + assert dir in local_archive_dirs + + # Test sync with remove synced files locally flag + additional_config['sync']['remove_local_files'] = True + sync = setup_sync(additional_config) + sync.run() + + # Check synced files are removed from local archive + # Except for the protected paths (last output in this case) + for output in ['output000', 'output001', 'output002', 'output003']: + file_path = 
os.path.join(expt_archive_dir, dir, f'test-{output}-file') + assert not os.path.exists(file_path) + + last_output_path = os.path.join(expt_archive_dir, 'output004') + last_output_file = os.path.join(last_output_path, f'test-output004-file') + assert os.path.exists(last_output_file) + + # Test sync with remove synced dirs flag as well + additional_config['sync']['remove_local_dirs'] = True + sync = setup_sync(additional_config) + sync.run() + + # Assert synced output dirs removed (except for the last output) + local_archive_dirs = os.listdir(expt_archive_dir) + for output in ['output000', 'output001', 'output002', 'output003']: + assert output not in local_archive_dirs + assert 'output004' in local_archive_dirs