From 7088a91e4f5461d96d97429f67c8a3519db6d3d0 Mon Sep 17 00:00:00 2001 From: AntonMFernando-NOAA <167725623+AntonMFernando-NOAA@users.noreply.github.com> Date: Wed, 25 Sep 2024 10:47:03 -0400 Subject: [PATCH] Add an archive task to GEFS system to archive files in HPSS (#2895) - This task is an extension of the arch job previously merged that archives files in ROTDIR (#2816 https://github.com/AntonMFernando-NOAA/global-workflow/commit/2816c3b69d87425c538baa1215ba29719bdcde47) - This feature adds an archive task to GEFS system to archive files in HPSSARCH and LOCALARCH. Resolves #2698 Refs #2816 #2772 #832 --------- Co-authored-by: David Huber --- parm/archive/gefs_extracted_atmos.yaml.j2 | 86 +++++++++++++++++++++++ parm/archive/gefs_extracted_ice.yaml.j2 | 33 +++++++++ parm/archive/gefs_extracted_ocean.yaml.j2 | 40 +++++++++++ parm/archive/gefs_extracted_wave.yaml.j2 | 51 ++++++++++++++ parm/archive/master_gefs.yaml.j2 | 12 ++++ parm/config/gefs/yaml/defaults.yaml | 1 - parm/config/gfs/config.resources | 11 +-- parm/config/gfs/config.resources.GAEA | 5 ++ parm/config/gfs/config.resources.HERA | 5 ++ parm/config/gfs/config.resources.JET | 5 ++ parm/config/gfs/config.resources.ORION | 6 ++ parm/config/gfs/config.resources.S4 | 6 ++ scripts/exglobal_archive.py | 5 +- ush/python/pygfs/task/archive.py | 3 - workflow/applications/gefs.py | 4 +- workflow/rocoto/gefs_tasks.py | 30 ++++++-- 16 files changed, 286 insertions(+), 17 deletions(-) create mode 100644 parm/archive/gefs_extracted_atmos.yaml.j2 create mode 100644 parm/archive/gefs_extracted_ice.yaml.j2 create mode 100644 parm/archive/gefs_extracted_ocean.yaml.j2 create mode 100644 parm/archive/gefs_extracted_wave.yaml.j2 create mode 100644 parm/archive/master_gefs.yaml.j2 diff --git a/parm/archive/gefs_extracted_atmos.yaml.j2 b/parm/archive/gefs_extracted_atmos.yaml.j2 new file mode 100644 index 0000000000..7ceba551bf --- /dev/null +++ b/parm/archive/gefs_extracted_atmos.yaml.j2 @@ -0,0 +1,86 @@ +{% set cycle_HH = current_cycle | strftime("%H") %} +{% set cycle_YMD = current_cycle | to_YMD %} +{% set cycle_YMDH = current_cycle | to_YMDH %} +{% set head = RUN + ".t" + cycle_HH + "z." %} + +gefs_atmos: + name: "GEFS_ATMOS" + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/gefs_atmos.tar" + required: +#select mem%03d and ensstat files required +{% set members = ["ensstat"] %} +{% for mem_nm in range(0, NMEM_ENS + 1) %} + {% do members.append("mem" ~ '%03d' % mem_nm ) %} +{% endfor %} + +{% if REPLAY_ICS %} + {% set ofst_hr = OFFSET_START_HOUR %} +{% else %} + {% set ofst_hr = FHMIN_GFS %} +{% endif %} + +{% for mem in members %} + {% for res in ['0p25', '0p50', '1p00'] %} + {% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':cycle_YMD, + '${HH}':cycle_HH, + '${GRID}':res, + '${MEMDIR}':mem }) %} + + {% set COMIN_ATMOS_GRIB = COM_ATMOS_GRIB_GRID_TMPL | replace_tmpl(tmpl_dict) %} + +# Select pgrb and grib files to copy to the atardir + {% if path_exists(COMIN_ATMOS_GRIB) %} + {% if FHMAX_HF_GFS == 0 %} + {% for fhr in range(ofst_hr, FHMAX_GFS + FHOUT_GFS, FHOUT_GFS) %} + {% if mem=="ensstat" %} + {% set file_name = head ~ "mean.pres_." ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2" %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% else %} + {% set file_name = head ~ "pgrb2." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% set file_name = head ~ "pgrb2b." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endif %} + {% endfor %} + {% else %} + {% if res == "0p25" %} + {% for fhr in range(ofst_hr, FHMAX_HF_GFS + FHOUT_HF_GFS, FHOUT_HF_GFS) %} + {% if mem=="ensstat" %} + {% set file_name = head ~ "mean.pres_." ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2" %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% else %} + {% set file_name = head ~ "pgrb2." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% set file_name = head ~ "pgrb2b." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endif %} + {% endfor %} + {% endif %} + {% if res == "0p50" %} + {% for fhr in range(FHMAX_HF_GFS + FHOUT_GFS, FHMAX_GFS + FHOUT_GFS, FHOUT_GFS) %} + {% if mem=="ensstat" %} + {% set file_name = head ~ "mean.pres_." ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2" %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% else %} + {% set file_name = head ~ "pgrb2." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% set file_name = head ~ "pgrb2b." ~ res ~ ".f" ~ '%03d'|format(fhr) %} + {% set file_path = COMIN_ATMOS_GRIB ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endif %} + {% endfor %} + {% endif %} + {% endif %} + {% endif %} + {% endfor %} +{% endfor %} diff --git a/parm/archive/gefs_extracted_ice.yaml.j2 b/parm/archive/gefs_extracted_ice.yaml.j2 new file mode 100644 index 0000000000..786d502f23 --- /dev/null +++ b/parm/archive/gefs_extracted_ice.yaml.j2 @@ -0,0 +1,33 @@ +{% set cycle_HH = current_cycle | strftime("%H") %} +{% set cycle_YMD = current_cycle | to_YMD %} +{% set cycle_YMDH = current_cycle | to_YMDH %} +{% set head = RUN + ".ice.t" + cycle_HH + "z." %} + +gefs_ice: + name: "GEFS_ICE" + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/gefs_ice.tar" + required: +#select mem%03d and ensstat files required +{% set members = [] %} +{% for mem_nm in range(0, NMEM_ENS + 1) %} + {% do members.append("mem" ~ '%03d' % mem_nm ) %} +{% endfor %} + +{% for mem in members %} + {% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':cycle_YMD, + '${HH}':cycle_HH, + '${MEMDIR}':mem }) %} + + {% set COMIN_ICE_HISTORY = COM_ICE_HISTORY_TMPL | replace_tmpl(tmpl_dict) %} + +# Select netcdf files to copy to the atardir + {% if path_exists(COMIN_ICE_HISTORY) %} + {% for fhr in range(FHMIN_GFS + FHOUT_ICE_GFS, FHMAX_GFS + FHOUT_ICE_GFS, FHOUT_ICE_GFS) %} + {% set file_name = head ~ FHOUT_ICE_GFS ~ "hr_avg" ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %} + {% set file_path = COMIN_ICE_HISTORY ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endfor %} + {% endif %} +{% endfor %} diff --git a/parm/archive/gefs_extracted_ocean.yaml.j2 b/parm/archive/gefs_extracted_ocean.yaml.j2 new file mode 100644 index 0000000000..e5e3b36e3b --- /dev/null +++ b/parm/archive/gefs_extracted_ocean.yaml.j2 @@ -0,0 +1,40 @@ +{% set cycle_HH = current_cycle | strftime("%H") %} +{% set cycle_YMD = current_cycle | to_YMD %} +{% set cycle_YMDH = current_cycle | to_YMDH %} +{% set head = RUN + ".ocean.t" + cycle_HH + "z." %} + +gefs_ocean: + name: "GEFS_OCEAN" + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/gefs_ocean.tar" + required: +#select mem%03d and ensstat files required +{% set members = [] %} +{% for mem_nm in range(0, NMEM_ENS + 1) %} + {% do members.append("mem" ~ '%03d' % mem_nm ) %} +{% endfor %} + +{% if OCNRES == "025" %} + {% set res = "1p00" %} +{% else %} + {% set res = (OCNRES|string())[0] ~ "p" ~ (OCNRES|string())[-2:] %} +{% endif %} + +{% for mem in members %} + {% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':cycle_YMD, + '${HH}':cycle_HH, + '${MEMDIR}':mem }) %} + + {% set COMIN_OCEAN_NETCDF = COM_OCEAN_NETCDF_TMPL | replace_tmpl(tmpl_dict) %} + + # Select netcdf files to copy to the atardir + {% set netcdf_grid_dir = COMIN_OCEAN_NETCDF ~ "/" ~ res %} + {% if path_exists(netcdf_grid_dir) %} + {% for fhr in range(FHMIN_GFS + FHOUT_OCN_GFS, FHMAX_GFS + FHOUT_OCN_GFS, FHOUT_OCN_GFS) %} + {% set file_name = head ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %} + {% set file_path = netcdf_grid_dir ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endfor %} + {% endif %} +{% endfor %} diff --git a/parm/archive/gefs_extracted_wave.yaml.j2 b/parm/archive/gefs_extracted_wave.yaml.j2 new file mode 100644 index 0000000000..e0aa07c816 --- /dev/null +++ b/parm/archive/gefs_extracted_wave.yaml.j2 @@ -0,0 +1,51 @@ +{% set cycle_HH = current_cycle | strftime("%H") %} +{% set cycle_YMD = current_cycle | to_YMD %} +{% set cycle_YMDH = current_cycle | to_YMDH %} +{% set head = RUN + "wave.t" + cycle_HH + "z." %} + +gefs_wave: + name: "GEFS_WAVE" + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/gefs_wave.tar" + required: +{% if REPLAY_ICS %} + {% set ofst_hr = OFFSET_START_HOUR %} +{% else %} + {% set ofst_hr = FHMIN_GFS %} +{% endif %} + +{% set res = (waveGRD[-3:])[0] ~ "p" ~ (waveGRD[-3:])[-2:] %} + +#select mem%03d and ensstat files required +{% set members = [] %} +{% for mem_nm in range(0, NMEM_ENS + 1) %} + {% do members.append("mem" ~ '%03d' % mem_nm ) %} +{% endfor %} + +{% for mem in members %} + {% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':cycle_YMD, + '${HH}':cycle_HH, + '${MEMDIR}':mem }) %} + + {% set COMIN_WAVE_GRID = COM_WAVE_GRID_TMPL | replace_tmpl(tmpl_dict) %} + # Select grib2 files to copy to the atardir + {% if path_exists(COMIN_WAVE_GRID) %} + {% for fhr in range(ofst_hr, FHMAX_GFS + FHOUT_WAV, FHOUT_WAV) %} + {% set file_name = head ~ "global." ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2" %} + {% set file_path = COMIN_WAVE_GRID ~ "/" ~ file_name %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endfor %} + {% endif %} + + {% set COMIN_WAVE_STATION = COM_WAVE_STATION_TMPL | replace_tmpl(tmpl_dict) %} + # Select station files to copy to the atardir + {% if path_exists(COMIN_WAVE_STATION) %} + {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.spec_tar.gz" %} + - "{{ file_path | relpath(ROTDIR)}}" + {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.cbull_tar" %} + - "{{ file_path | relpath(ROTDIR)}}" + {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.bull_tar" %} + - "{{ file_path | relpath(ROTDIR)}}" + {% endif %} +{% endfor %} diff --git a/parm/archive/master_gefs.yaml.j2 b/parm/archive/master_gefs.yaml.j2 new file mode 100644 index 0000000000..5dc046dcfd --- /dev/null +++ b/parm/archive/master_gefs.yaml.j2 @@ -0,0 +1,12 @@ +# Set variables/lists needed to parse the gefs templates +{% set cycle_HH = current_cycle | strftime("%H") %} +{% set cycle_YMD = current_cycle | to_YMD %} +{% set cycle_YMDH = current_cycle | to_YMDH %} + +datasets: +{% filter indent(width=4) %} + {% include "gefs_extracted_atmos.yaml.j2" %} + {% include "gefs_extracted_ocean.yaml.j2" %} + {% include "gefs_extracted_ice.yaml.j2" %} + {% include "gefs_extracted_wave.yaml.j2" %} +{% endfilter %} diff --git a/parm/config/gefs/yaml/defaults.yaml b/parm/config/gefs/yaml/defaults.yaml index 5ecf690e18..382e60ee12 100644 --- a/parm/config/gefs/yaml/defaults.yaml +++ b/parm/config/gefs/yaml/defaults.yaml @@ -14,5 +14,4 @@ base: FCST_BREAKPOINTS: "48" REPLAY_ICS: "NO" USE_OCN_PERTURB_FILES: "false" - HPSSARCH: "NO" LOCALARCH: "NO" diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index afc5939fcd..0479543ebc 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -38,6 +38,7 @@ echo "BEGIN: config.resources" case ${machine} in "WCOSS2") max_tasks_per_node=128 + # WCOSS2 nodes have 512GB of RAM, but only 500GB are reservable # shellcheck disable=SC2034 mem_node_max="500GB" ;; @@ -54,12 +55,12 @@ case ${machine} in "ORION") max_tasks_per_node=40 # shellcheck disable=SC2034 - mem_node_max="192GB" + mem_node_max="180GB" ;; "HERCULES") max_tasks_per_node=80 # shellcheck disable=SC2034 - mem_node_max="512GB" + mem_node_max="500GB" ;; "JET") case ${PARTITION_BATCH} in @@ -145,10 +146,10 @@ export max_tasks_per_node case ${step} in "prep") walltime='00:30:00' - ntasks=4 - tasks_per_node=2 + ntasks=14 + tasks_per_node=14 threads_per_task=1 - memory="40GB" + memory="${mem_node_max}" ;; "prepsnowobs") diff --git a/parm/config/gfs/config.resources.GAEA b/parm/config/gfs/config.resources.GAEA index a4d4ddfece..c50601da00 100644 --- a/parm/config/gfs/config.resources.GAEA +++ b/parm/config/gfs/config.resources.GAEA @@ -3,6 +3,11 @@ # Gaea-specific job resources case ${step} in + "prep") + # Run on two nodes (requires ~400GB total) + tasks_per_node=7 + ;; + "eobs") # The number of tasks and cores used must be the same for eobs # See https://github.com/NOAA-EMC/global-workflow/issues/2092 for details diff --git a/parm/config/gfs/config.resources.HERA b/parm/config/gfs/config.resources.HERA index d1b09fcc32..ac3067d9f9 100644 --- a/parm/config/gfs/config.resources.HERA +++ b/parm/config/gfs/config.resources.HERA @@ -3,6 +3,11 @@ # Hera-specific job resources case ${step} in + "prep") + # Run on 7 nodes for memory requirement + tasks_per_node=2 + ;; + "anal") if [[ "${CASE}" == "C384" ]]; then export ntasks_gdas=270 diff --git a/parm/config/gfs/config.resources.JET b/parm/config/gfs/config.resources.JET index 47b953c0f4..bbd308f439 100644 --- a/parm/config/gfs/config.resources.JET +++ b/parm/config/gfs/config.resources.JET @@ -3,6 +3,11 @@ # Jet-specific job resources case ${step} in + "prep") + # Run on 7 nodes for memory requirement + tasks_per_node=2 + ;; + "anal") if [[ "${CASE}" == "C384" ]]; then export ntasks=270 diff --git a/parm/config/gfs/config.resources.ORION b/parm/config/gfs/config.resources.ORION index 6b42d780d4..d761df7b73 100644 --- a/parm/config/gfs/config.resources.ORION +++ b/parm/config/gfs/config.resources.ORION @@ -3,6 +3,12 @@ # Orion-specific job resources case ${step} in + "prep") + # Run on 2 nodes for memory requirement + # This may not be enough and may need to run on more nodes. + export tasks_per_node=7 + ;; + "anal") # TODO: # On Orion, after Rocky 9 upgrade, GSI performance is degraded. diff --git a/parm/config/gfs/config.resources.S4 b/parm/config/gfs/config.resources.S4 index 817494c7cd..fa64068e81 100644 --- a/parm/config/gfs/config.resources.S4 +++ b/parm/config/gfs/config.resources.S4 @@ -3,6 +3,12 @@ # S4-specific job resources case ${step} in + "prep") + # Run on two nodes for memory requirement + # This may not be enough memory. Decrease tasks/node to 2 if necessary. + tasks_per_node=7 + ;; + "anal") case ${CASE} in "C384") diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 793fa1c1ac..4ee9e5ed0e 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -30,8 +30,9 @@ def main(): 'FHOUT_HF_WAV', 'FHMAX_WAV', 'FHMAX_HF_WAV', 'FHMAX_WAV_GFS', 'restart_interval_gdas', 'restart_interval_gfs', 'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR', - 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', - 'IAUFHRS', 'DO_FIT2OBS', 'NET'] + 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', + 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', + 'OFFSET_START_HOUR'] archive_dict = AttrDict() for key in keys: diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 14cd015601..d138474e9a 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -114,9 +114,6 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str self.tar_cmd = "" return arcdir_set, [] - if arch_dict.NET == "gefs": - raise NotImplementedError("GEFS archiving is not yet implemented!") - master_yaml = "master_" + arch_dict.RUN + ".yaml.j2" parsed_sets = parse_j2yaml(os.path.join(archive_parm, master_yaml), diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index afb4072596..c15786a2e8 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -80,8 +80,8 @@ def get_task_names(self): tasks += ['wavepostpnt'] if self.do_extractvars: - tasks += ['extractvars'] + tasks += ['extractvars', 'arch'] - tasks += ['arch', 'cleanup'] + tasks += ['cleanup'] return {f"{self.run}": tasks} diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 3b72677a58..e214bb8c19 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -560,10 +560,32 @@ def arch(self): def cleanup(self): deps = [] - dep_dict = {'type': 'task', 'name': 'arch'} - deps.append(rocoto.add_dependency(dep_dict)) - - dependencies = rocoto.create_dependency(dep=deps) + if self.app_config.do_extractvars: + dep_dict = {'type': 'task', 'name': 'arch'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + else: + dep_dict = {'type': 'metatask', 'name': 'atmos_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'atmos_ensstat'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_ice: + dep_dict = {'type': 'metatask', 'name': 'ice_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_ocean: + dep_dict = {'type': 'metatask', 'name': 'ocean_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_wave: + dep_dict = {'type': 'metatask', 'name': 'wave_post_grid'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'wave_post_pnt'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_wave_bnd: + dep_dict = {'type': 'metatask', 'name': 'wave_post_bndpnt'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'wave_post_bndpnt_bull'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') resources = self.get_resource('cleanup') task_name = 'cleanup'