From 1d53953a308361561147899fc18e1aaaf1ccab03 Mon Sep 17 00:00:00 2001 From: Walter Kolczynski - NOAA Date: Mon, 12 Aug 2024 13:50:39 -0400 Subject: [PATCH] Add capability to run forecast in segments (#2795) Adds the ability to run a forecast in segments instead of all at once. To accomplish this, a new local `breakpnts` variable (populated from the `FCST_BREAKPOINTS` setting) is introduced to `config.base` to contain a comma-separated list of intermediate stopping points for the forecast. This is combined with `FHMIN_GFS` and `FHMAX_GFS` to create a comma-separated string `FCST_SEGMENTS` with all the start/end points that is used by `config.fcst` and the rocoto workflow. Capability to parse these into python lists was added to wxflow in an accompanying PR. If `breakpnts` is an empty string, this will result in a single-segment forecast. To accommodate the new segment metatasks that must be run serially, the capability of `create_task()` was expanded to allow a dictionary key of `is_serial`, which controls whether a metatask is parallel or serial using pre-existing capability in rocoto. The default when not given is parallel (i.e. most metatasks). 
Resolves #2274 Refs NOAA-EMC/wxflow#39 Refs NOAA-EMC/wxflow#40 --- ci/cases/yamls/gfs_extended_ci.yaml | 1 + jobs/JGLOBAL_FORECAST | 13 +++- parm/archive/enkf.yaml.j2 | 4 +- parm/archive/enkf_restarta_grp.yaml.j2 | 4 +- parm/archive/gdas.yaml.j2 | 4 +- parm/archive/gdas_restarta.yaml.j2 | 2 +- parm/archive/gfs_netcdfa.yaml.j2 | 2 +- parm/archive/master_enkf.yaml.j2 | 22 ------ parm/archive/master_gdas.yaml.j2 | 10 --- parm/archive/master_gfs.yaml.j2 | 12 --- parm/config/gefs/config.base | 7 +- parm/config/gefs/config.fcst | 13 +++- parm/config/gefs/yaml/defaults.yaml | 1 + parm/config/gfs/config.aeroanl | 2 +- parm/config/gfs/config.base | 13 ++-- parm/config/gfs/config.fcst | 11 ++- parm/config/gfs/yaml/defaults.yaml | 1 + sorc/wxflow | 2 +- ush/calcanl_gfs.py | 3 +- ush/forecast_predet.sh | 2 +- ush/python/pygfs/task/aero_analysis.py | 2 +- ush/python/pygfs/task/aero_prepobs.py | 2 +- workflow/applications/applications.py | 27 +++++++ workflow/rocoto/gefs_tasks.py | 103 +++++++++++++++++-------- workflow/rocoto/gfs_tasks.py | 63 +++++++++++---- workflow/rocoto/rocoto.py | 3 +- 26 files changed, 209 insertions(+), 120 deletions(-) diff --git a/ci/cases/yamls/gfs_extended_ci.yaml b/ci/cases/yamls/gfs_extended_ci.yaml index 42ee612f3a..8caa942eed 100644 --- a/ci/cases/yamls/gfs_extended_ci.yaml +++ b/ci/cases/yamls/gfs_extended_ci.yaml @@ -9,5 +9,6 @@ base: DO_AWIPS: "NO" DO_NPOESS: "YES" DO_GENESIS_FSU: "NO" + FCST_BREAKPOINTS: 192 FHMAX_GFS: 384 FHMAX_HF_GFS: 120 diff --git a/jobs/JGLOBAL_FORECAST b/jobs/JGLOBAL_FORECAST index 9998470618..e64a91d21c 100755 --- a/jobs/JGLOBAL_FORECAST +++ b/jobs/JGLOBAL_FORECAST @@ -116,6 +116,17 @@ fi # Remove the Temporary working directory ########################################## cd "${DATAROOT}" || true -[[ "${KEEPDATA}" == "NO" ]] && rm -rf "${DATA}" "${DATArestart}" # do not remove DATAjob. It contains DATAoutput +# do not remove DATAjob. 
It contains DATAoutput +if [[ "${KEEPDATA}" == "NO" ]]; then + rm -rf "${DATA}" + + # Determine if this is the last segment + commas="${FCST_SEGMENTS//[^,]}" + n_segs=${#commas} + if (( n_segs - 1 == ${FCST_SEGMENT:-0} )); then + # Only delete temporary restarts if it is the last segment + rm -rf "${DATArestart}" + fi +fi exit 0 diff --git a/parm/archive/enkf.yaml.j2 b/parm/archive/enkf.yaml.j2 index bc5ef03cb8..92ed0095af 100644 --- a/parm/archive/enkf.yaml.j2 +++ b/parm/archive/enkf.yaml.j2 @@ -11,7 +11,7 @@ enkf: {% endfor %} - "logs/{{ cycle_YMDH }}/{{ RUN }}echgres.log" - "logs/{{ cycle_YMDH }}/{{ RUN }}esfc.log" - {% for grp in range(iaufhrs | length) %} + {% for grp in range(IAUFHRS | length) %} - "logs/{{ cycle_YMDH }}/{{ RUN }}ecen{{ '%03d' % grp }}.log" {% endfor %} @@ -68,7 +68,7 @@ enkf: {% if DOIAU %} # IAU increments/analyses - {% for fhr in iaufhrs if fhr != 6 %} + {% for fhr in IAUFHRS if fhr != 6 %} {% if do_calc_increment %} # Store analyses instead of increments - "{{ COMIN_ATMOS_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % fhr }}.ensmean.nc" diff --git a/parm/archive/enkf_restarta_grp.yaml.j2 b/parm/archive/enkf_restarta_grp.yaml.j2 index 41e03edc92..13c49d4239 100644 --- a/parm/archive/enkf_restarta_grp.yaml.j2 +++ b/parm/archive/enkf_restarta_grp.yaml.j2 @@ -36,14 +36,14 @@ enkf_restarta_grp: {% endif %} # Member increments - {% for iaufhr in iaufhrs if iaufhr != 6 %} + {% for iaufhr in IAUFHRS if iaufhr != 6 %} {% set iaufhr = iaufhr %} {% if do_calc_increment %} - "{{ COMIN_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % iaufhr }}.nc" {% else %} - "{{ COMIN_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ head }}ratmi{{ '%03d' % iaufhr }}.nc" {% endif %} - {% endfor %} # iaufhr in iaufhrs + {% endfor %} # iaufhr in IAUFHRS # Conventional data {% if not lobsdiag_forenkf and not DO_JEDIATMENS %} diff --git a/parm/archive/gdas.yaml.j2 b/parm/archive/gdas.yaml.j2 index ce5054a82f..db92141ede 100644 --- 
a/parm/archive/gdas.yaml.j2 +++ b/parm/archive/gdas.yaml.j2 @@ -49,7 +49,7 @@ gdas: - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmanl.ensres.nc" {% if DOIAU %} # Ensemble IAU analysis residuals - {% for fhr in iaufhrs if fhr != 6 %} + {% for fhr in IAUFHRS if fhr != 6 %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % fhr }}.ensres.nc" {% endfor %} {% endif %} @@ -108,7 +108,7 @@ gdas: {% endif %} # End of cycled data # Forecast and post logs - - "logs/{{ cycle_YMDH }}/{{ RUN }}fcst.log" + - "logs/{{ cycle_YMDH }}/{{ RUN }}fcst_seg0.log" {% for fhr in range(0, FHMAX + 1, 3) %} {% set fhr3 = '%03d' % fhr %} diff --git a/parm/archive/gdas_restarta.yaml.j2 b/parm/archive/gdas_restarta.yaml.j2 index 4c0522fed7..9d86292065 100644 --- a/parm/archive/gdas_restarta.yaml.j2 +++ b/parm/archive/gdas_restarta.yaml.j2 @@ -6,7 +6,7 @@ gdas_restarta: # Deterministic analysis increments - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atminc.nc" # IAU increments - {% for iaufhr in iaufhrs if iaufhr != 6 %} + {% for iaufhr in IAUFHRS if iaufhr != 6 %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmi{{ "%03d" % iaufhr }}.nc" {% endfor %} diff --git a/parm/archive/gfs_netcdfa.yaml.j2 b/parm/archive/gfs_netcdfa.yaml.j2 index 8c0d4a813f..5a51f86148 100644 --- a/parm/archive/gfs_netcdfa.yaml.j2 +++ b/parm/archive/gfs_netcdfa.yaml.j2 @@ -6,7 +6,7 @@ gfs_netcdfa: - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmanl.nc" - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}sfcanl.nc" - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atminc.nc" - {% for iauhr in iaufhrs if iauhr != 6 %} + {% for iauhr in IAUFHRS if iauhr != 6 %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmi{{ "%03d" % iauhr }}.nc" {% endfor %} optional: diff --git a/parm/archive/master_enkf.yaml.j2 b/parm/archive/master_enkf.yaml.j2 index 3ebd52dbad..bb8b36c3e0 100644 --- a/parm/archive/master_enkf.yaml.j2 +++ 
b/parm/archive/master_enkf.yaml.j2 @@ -4,28 +4,6 @@ {% set cycle_YMDH = current_cycle | to_YMDH %} {% set head = RUN + ".t" + cycle_HH + "z." %} -# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer) -{% if IAUFHRS is string %} - # "3,6,9" - {% set iaufhrs = [] %} - {% for iaufhr in IAUFHRS.split(",") %} - {% do iaufhrs.append(iaufhr | int) %} - {% endfor %} -{% else %} - # 6 (integer) - {% set iaufhrs = [IAUFHRS] %} -{% endif %} - -# Repeat for IAUFHRS_ENKF -{% if IAUFHRS_ENKF is string %} - {% set iaufhrs_enkf = [] %} - {% for iaufhr in IAUFHRS_ENKF.split(",") %} - {% do iaufhrs_enkf.append(iaufhr | int) %} - {% endfor %} -{% else %} - {% set iaufhrs_enkf = [IAUFHRS_ENKF] %} -{% endif %} - # Determine which data to archive datasets: {% if ENSGRP == 0 %} diff --git a/parm/archive/master_gdas.yaml.j2 b/parm/archive/master_gdas.yaml.j2 index 30a2175653..11e83d387b 100644 --- a/parm/archive/master_gdas.yaml.j2 +++ b/parm/archive/master_gdas.yaml.j2 @@ -3,16 +3,6 @@ {% set cycle_YMDH = current_cycle | to_YMDH %} {% set head = "gdas.t" + cycle_HH + "z." 
%} -# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer) -{% if IAUFHRS is string %} - {% set iaufhrs = [] %} - {% for iaufhr in IAUFHRS.split(",") %} - {% do iaufhrs.append(iaufhr | int) %} - {% endfor %} -{% else %} - {% set iaufhrs = [IAUFHRS] %} -{% endif %} - datasets: # Always archive atmosphere forecast/analysis data {% filter indent(width=4) %} diff --git a/parm/archive/master_gfs.yaml.j2 b/parm/archive/master_gfs.yaml.j2 index b789598fac..ab9a00c95e 100644 --- a/parm/archive/master_gfs.yaml.j2 +++ b/parm/archive/master_gfs.yaml.j2 @@ -3,18 +3,6 @@ {% set cycle_YMD = current_cycle | to_YMD %} {% set cycle_YMDH = current_cycle | to_YMDH %} -# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer) -{% if IAUFHRS is string %} - # "3,6,9" - {% set iaufhrs = [] %} - {% for iaufhr in IAUFHRS.split(",") %} - {% do iaufhrs.append(iaufhr | int) %} - {% endfor %} -{% else %} - # 6 (integer) - {% set iaufhrs = [IAUFHRS] %} -{% endif %} - # Determine which data to archive datasets: # Always archive atmosphere forecast/analysis data diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index 735743b568..fad9e3421a 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -229,8 +229,11 @@ export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: # GFS output and frequency export FHMIN_GFS=0 -export FHMIN=${FHMIN_GFS} -export FHMAX_GFS=@FHMAX_GFS@ +export FHMAX_GFS="@FHMAX_GFS@" +# Intermediate times to stop forecast when running in segments +breakpnts="@FCST_BREAKPOINTS@" +export FCST_SEGMENTS="${FHMIN_GFS},${breakpnts:+${breakpnts},}${FHMAX_GFS}" + export FHOUT_GFS=6 export FHMAX_HF_GFS=@FHMAX_HF_GFS@ export FHOUT_HF_GFS=1 diff --git a/parm/config/gefs/config.fcst b/parm/config/gefs/config.fcst index e66fc15f87..e6dc335b79 100644 --- a/parm/config/gefs/config.fcst +++ b/parm/config/gefs/config.fcst @@ -30,14 +30,19 @@ string="--fv3 ${CASE}" # shellcheck disable=SC2086 source 
"${EXPDIR}/config.ufs" ${string} -# shellcheck disable=SC2153 -export FHMAX=${FHMAX_GFS} +# Convert comma-separated string into bash array +IFS=', ' read -ra segments <<< "${FCST_SEGMENTS}" +# Determine MIN and MAX based on the forecast segment +export FHMIN=${segments[${FCST_SEGMENT}]} +export FHMAX=${segments[${FCST_SEGMENT}+1]} +# Cap other FHMAX variables at FHMAX for the segment +export FHMAX_HF=$(( FHMAX_HF_GFS > FHMAX ? FHMAX : FHMAX_HF_GFS )) +export FHMAX_WAV=$(( FHMAX_WAV > FHMAX ? FHMAX : FHMAX_WAV )) # shellcheck disable=SC2153 export FHOUT=${FHOUT_GFS} -export FHMAX_HF=${FHMAX_HF_GFS} export FHOUT_HF=${FHOUT_HF_GFS} export FHOUT_OCN=${FHOUT_OCN_GFS} -export FHOUT_ICE=${FHOUT_ICE_GFS} +export FHOUT_ICE=${FHOUT_ICE_GFS} # Get task specific resources source "${EXPDIR}/config.resources" fcst diff --git a/parm/config/gefs/yaml/defaults.yaml b/parm/config/gefs/yaml/defaults.yaml index d2b486e7ca..e4666d1aba 100644 --- a/parm/config/gefs/yaml/defaults.yaml +++ b/parm/config/gefs/yaml/defaults.yaml @@ -11,5 +11,6 @@ base: DO_EXTRACTVARS: "NO" FHMAX_GFS: 120 FHMAX_HF_GFS: 0 + FCST_BREAKPOINTS: "48" REPLAY_ICS: "NO" USE_OCN_PERTURB_FILES: "false" diff --git a/parm/config/gfs/config.aeroanl b/parm/config/gfs/config.aeroanl index 24a5e92644..a1b7e1d44b 100644 --- a/parm/config/gfs/config.aeroanl +++ b/parm/config/gfs/config.aeroanl @@ -24,7 +24,7 @@ if [[ "${DOIAU}" == "YES" ]]; then export aero_bkg_times="3,6,9" export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_fgat_gfs_aero.yaml.j2" else - export aero_bkg_times="6" + export aero_bkg_times="6," # Trailing comma is necessary so this is treated as a list export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_gfs_aero.yaml.j2" fi diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 56005199aa..2a7ffab0dd 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -285,7 +285,10 @@ export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: # GFS 
output and frequency export FHMIN_GFS=0 -export FHMAX_GFS=@FHMAX_GFS@ +export FHMAX_GFS="@FHMAX_GFS@" +# Intermediate times to stop forecast when running in segments +breakpnts="@FCST_BREAKPOINTS@" +export FCST_SEGMENTS="${FHMIN_GFS},${breakpnts:+${breakpnts},}${FHMAX_GFS}" export FHOUT_GFS=3 # 3 for ops export FHMAX_HF_GFS=@FHMAX_HF_GFS@ export FHOUT_HF_GFS=1 @@ -384,10 +387,10 @@ fi # if 3DVAR and IAU if [[ ${DOHYBVAR} == "NO" && ${DOIAU} == "YES" ]]; then - export IAUFHRS="6" + export IAUFHRS="6," export IAU_FHROT="3" export IAU_FILTER_INCREMENTS=".true." - export IAUFHRS_ENKF="6" + export IAUFHRS_ENKF="6," fi # Generate post-processing ensemble spread files @@ -397,10 +400,10 @@ export ENKF_SPREAD="YES" if [[ "${MODE}" = "cycled" && "${SDATE}" = "${PDY}${cyc}" && ${EXP_WARM_START} = ".false." ]] || [[ "${DOIAU}" = "NO" ]] || [[ "${MODE}" = "forecast-only" && ${EXP_WARM_START} = ".false." ]] ; then export IAU_OFFSET=0 export IAU_FHROT=0 - export IAUFHRS="6" + export IAUFHRS="6," fi -if [[ "${DOIAU_ENKF}" = "NO" ]]; then export IAUFHRS_ENKF="6"; fi +if [[ "${DOIAU_ENKF}" = "NO" ]]; then export IAUFHRS_ENKF="6,"; fi # Determine restart intervals # For IAU, write restarts at beginning of window also diff --git a/parm/config/gfs/config.fcst b/parm/config/gfs/config.fcst index 4982b8f6e6..2743ea0745 100644 --- a/parm/config/gfs/config.fcst +++ b/parm/config/gfs/config.fcst @@ -33,11 +33,16 @@ source "${EXPDIR}/config.ufs" ${string} # Forecast length for GFS forecast case ${RUN} in *gfs) - # shellcheck disable=SC2153 - export FHMAX=${FHMAX_GFS} + # Convert comma-separated string into bash array + IFS=', ' read -ra segments <<< "${FCST_SEGMENTS}" + # Determine MIN and MAX based on the forecast segment + export FHMIN=${segments[${FCST_SEGMENT}]} + export FHMAX=${segments[${FCST_SEGMENT}+1]} + # Cap other FHMAX variables at FHMAX for the segment + export FHMAX_HF=$(( FHMAX_HF_GFS > FHMAX ? FHMAX : FHMAX_HF_GFS )) + export FHMAX_WAV=$(( FHMAX_WAV > FHMAX ? 
FHMAX : FHMAX_WAV )) # shellcheck disable=SC2153 export FHOUT=${FHOUT_GFS} - export FHMAX_HF=${FHMAX_HF_GFS} export FHOUT_HF=${FHOUT_HF_GFS} export FHOUT_OCN=${FHOUT_OCN_GFS} export FHOUT_ICE=${FHOUT_ICE_GFS} diff --git a/parm/config/gfs/yaml/defaults.yaml b/parm/config/gfs/yaml/defaults.yaml index da4d587dff..24729ac43e 100644 --- a/parm/config/gfs/yaml/defaults.yaml +++ b/parm/config/gfs/yaml/defaults.yaml @@ -16,6 +16,7 @@ base: DO_METP: "YES" FHMAX_GFS: 120 FHMAX_HF_GFS: 0 + FCST_BREAKPOINTS: "" DO_VRFY_OCEANDA: "NO" GSI_SOILANAL: "NO" EUPD_CYC: "gdas" diff --git a/sorc/wxflow b/sorc/wxflow index d314e06510..e1ef697430 160000 --- a/sorc/wxflow +++ b/sorc/wxflow @@ -1 +1 @@ -Subproject commit d314e065101041a4d45e5a11ec19cd2dc5f38c67 +Subproject commit e1ef697430c09d2b1a0560f21f11c7a32ed5f3e2 diff --git a/ush/calcanl_gfs.py b/ush/calcanl_gfs.py index 5d97d25dfd..9dc6ff9fa6 100755 --- a/ush/calcanl_gfs.py +++ b/ush/calcanl_gfs.py @@ -11,6 +11,7 @@ import gsi_utils from collections import OrderedDict import datetime +from wxflow import cast_as_dtype python2fortran_bool = {True: '.true.', False: '.false.'} @@ -358,7 +359,7 @@ def calcanl_gfs(DoIAU, l4DEnsVar, Write4Danl, ComOut, APrefix, ExecAnl = os.getenv('CALCANLEXEC', './calc_analysis.x') ExecChgresInc = os.getenv('CHGRESINCEXEC', './interp_inc.x') NEMSGet = os.getenv('NEMSIOGET', 'nemsio_get') - IAUHrs = list(map(int, os.getenv('IAUFHRS', '6').split(','))) + IAUHrs = cast_as_dtype(os.getenv('IAUFHRS', '6,')) Run = os.getenv('RUN', 'gdas') JEDI = gsi_utils.isTrue(os.getenv('DO_JEDIATMVAR', 'YES')) diff --git a/ush/forecast_predet.sh b/ush/forecast_predet.sh index ebf7cfd282..6b72f574d8 100755 --- a/ush/forecast_predet.sh +++ b/ush/forecast_predet.sh @@ -77,6 +77,7 @@ common_predet(){ CDATE=${CDATE:-"${PDY}${cyc}"} ENSMEM=${ENSMEM:-000} + MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER # Define significant cycles half_window=$(( assim_freq / 2 )) @@ -154,7 +155,6 @@ 
FV3_predet(){ FV3_OUTPUT_FH="${FV3_OUTPUT_FH} $(seq -s ' ' "${fhr}" "${FHOUT}" "${FHMAX}")" # Other options - MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER PREFIX_ATMINC=${PREFIX_ATMINC:-""} # allow ensemble to use recentered increment # IAU options diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py index 69a992d7d4..ccc5fb601a 100644 --- a/ush/python/pygfs/task/aero_analysis.py +++ b/ush/python/pygfs/task/aero_analysis.py @@ -46,7 +46,7 @@ def __init__(self, config): 'npz_anl': self.task_config['LEVS'] - 1, 'AERO_WINDOW_BEGIN': _window_begin, 'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", - 'aero_bkg_fhr': map(int, str(self.task_config['aero_bkg_times']).split(',')), + 'aero_bkg_fhr': self.task_config['aero_bkg_times'], 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", diff --git a/ush/python/pygfs/task/aero_prepobs.py b/ush/python/pygfs/task/aero_prepobs.py index d8396fe3ca..be58fa43a5 100644 --- a/ush/python/pygfs/task/aero_prepobs.py +++ b/ush/python/pygfs/task/aero_prepobs.py @@ -31,7 +31,7 @@ def __init__(self, config: Dict[str, Any]) -> None: { 'window_begin': _window_begin, 'window_end': _window_end, - 'sensors': str(self.task_config['SENSORS']).split(','), + 'sensors': self.task_config['SENSORS'], 'data_dir': self.task_config['VIIRS_DATA_DIR'], 'input_files': '', 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index 97a77c2c21..8c1f69735e 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -75,6 +75,10 @@ def __init__(self, conf: Configuration) -> None: self.do_hpssarch = _base.get('HPSSARCH', False) self.nens = _base.get('NMEM_ENS', 0) + 
self.fcst_segments = _base.get('FCST_SEGMENTS', None) + + if not AppConfig.is_monotonic(self.fcst_segments): + raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') self.wave_runs = None if self.do_wave: @@ -208,3 +212,26 @@ def get_gfs_interval(gfs_cyc: int) -> timedelta: return to_timedelta(gfs_internal_map[str(gfs_cyc)]) except KeyError: raise KeyError(f'Invalid gfs_cyc = {gfs_cyc}') + + @staticmethod + def is_monotonic(test_list: List, check_decreasing: bool = False) -> bool: + """ + Determine if an array is monotonically increasing or decreasing + + TODO: Move this into wxflow somewhere + + Inputs + test_list: List + A list of comparable values to check + check_decreasing: bool [default: False] + Check whether list is monotonically decreasing + + Returns + bool: Whether the list is monotonically increasing (if check_decreasing + if False) or decreasing (if check_decreasing is True) + + """ + if check_decreasing: + return all(x > y for x, y in zip(test_list, test_list[1:])) + else: + return all(x < y for x, y in zip(test_list, test_list[1:])) diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index e78ac96d83..f0f73d1173 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -138,19 +138,34 @@ def fcst(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) + num_fcst_segments = len(self.app_config.fcst_segments) - 1 + + fcst_vars = self.envars.copy() + fcst_envars_dict = {'FCST_SEGMENT': '#seg#'} + for key, value in fcst_envars_dict.items(): + fcst_vars.append(rocoto.create_envar(name=key, value=str(value))) + resources = self.get_resource('fcst') - task_name = f'fcst_mem000' + task_name = f'fcst_mem000_seg#seg#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, - 'envars': self.envars, + 'envars': fcst_vars, 'cycledef': 'gefs', 'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh', 'job_name': 
f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;' } - task = rocoto.create_task(task_dict) + + seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])} + metatask_dict = {'task_name': f'fcst_mem000', + 'is_serial': True, + 'var_dict': seg_var_dict, + 'task_dict': task_dict + } + + task = rocoto.create_task(metatask_dict) return task @@ -169,36 +184,60 @@ def efcs(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) - efcsenvars = self.envars.copy() - efcsenvars_dict = {'ENSMEM': '#member#', - 'MEMDIR': 'mem#member#' - } - for key, value in efcsenvars_dict.items(): - efcsenvars.append(rocoto.create_envar(name=key, value=str(value))) - + num_fcst_segments = len(self.app_config.fcst_segments) - 1 resources = self.get_resource('efcs') - task_name = f'fcst_mem#member#' - task_dict = {'task_name': task_name, - 'resources': resources, - 'dependency': dependencies, - 'envars': efcsenvars, - 'cycledef': 'gefs', - 'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh', - 'job_name': f'{self.pslot}_{task_name}_@H', - 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', - 'maxtries': '&MAXTRIES;' - } - - member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])} - metatask_dict = {'task_name': 'fcst_ens', - 'var_dict': member_var_dict, - 'task_dict': task_dict + # Kludge to work around bug in rocoto with serial metatasks nested + # in a parallel one (see christopherwharrop/rocoto#109). For now, + # loop over member to create a separate metatask for each instead + # of a metatask of a metatask. 
+ # + tasks = [] + for member in [f"{mem:03d}" for mem in range(1, self.nmem + 1)]: + + efcsenvars = self.envars.copy() + efcsenvars_dict = {'ENSMEM': f'{member}', + 'MEMDIR': f'mem{member}', + 'FCST_SEGMENT': '#seg#' + } + for key, value in efcsenvars_dict.items(): + efcsenvars.append(rocoto.create_envar(name=key, value=str(value))) + + task_name = f'fcst_mem{member}_seg#seg#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': efcsenvars, + 'cycledef': 'gefs', + 'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' } - task = rocoto.create_task(metatask_dict) + seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])} + seg_metatask_dict = {'task_name': f'fcst_mem{member}', + 'is_serial': True, + 'var_dict': seg_var_dict, + 'task_dict': task_dict + } - return task + tasks.append(rocoto.create_task(seg_metatask_dict)) + + return '\n'.join(tasks) + + # Keeping this in hopes the kludge is no longer necessary at some point + # + # member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])} + # mem_metatask_dict = {'task_name': 'fcst_ens', + # 'is_serial': False, + # 'var_dict': member_var_dict, + # 'task_dict': seg_metatask_dict + # } + + # task = rocoto.create_task(mem_metatask_dict) + + # return task def atmos_prod(self): return self._atmosoceaniceprod('atmos') @@ -236,7 +275,7 @@ def _atmosoceaniceprod(self, component: str): if component in ['ocean']: dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'task', 'name': 'fcst_mem#member#'} + dep_dict = {'type': 'metatask', 'name': 'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') elif component in ['ice']: @@ -384,7 +423,7 @@ def 
wavepostsbs(self): def wavepostbndpnt(self): deps = [] - dep_dict = {'type': 'task', 'name': f'fcst_mem#member#'} + dep_dict = {'type': 'metatask', 'name': f'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) @@ -429,7 +468,7 @@ def wavepostbndpntbll(self): dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'task', 'name': f'fcst_mem#member#'} + dep_dict = {'type': 'metatask', 'name': f'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) @@ -465,7 +504,7 @@ def wavepostbndpntbll(self): def wavepostpnt(self): deps = [] - dep_dict = {'type': 'task', 'name': f'fcst_mem#member#'} + dep_dict = {'type': 'metatask', 'name': f'fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) if self.app_config.do_wave_bnd: dep_dict = {'type': 'task', 'name': f'wave_post_bndpnt_bull_mem#member#'} diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 960a7548ab..9d9b28fb17 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -688,7 +688,7 @@ def ocnanalprep(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}marinebmat'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'task', 'name': 'gdasfcst', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} + dep_dict = {'type': 'metatask', 'name': 'gdasfcst', 'offset': f"-{timedelta_to_HMS(self._base['cycle_interval'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -880,12 +880,22 @@ def _fcst_forecast_only(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) + if self.run in ['gfs']: + num_fcst_segments = len(self.app_config.fcst_segments) - 1 + else: + num_fcst_segments = 1 + + fcst_vars = 
self.envars.copy() + fcst_envars_dict = {'FCST_SEGMENT': '#seg#'} + for key, value in fcst_envars_dict.items(): + fcst_vars.append(rocoto.create_envar(name=key, value=str(value))) + resources = self.get_resource('fcst') - task_name = f'{self.run}fcst' + task_name = f'{self.run}fcst_seg#seg#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, - 'envars': self.envars, + 'envars': fcst_vars, 'cycledef': self.run.replace('enkf', ''), 'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh', 'job_name': f'{self.pslot}_{task_name}_@H', @@ -893,7 +903,14 @@ def _fcst_forecast_only(self): 'maxtries': '&MAXTRIES;' } - task = rocoto.create_task(task_dict) + seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])} + metatask_dict = {'task_name': f'{self.run}fcst', + 'is_serial': True, + 'var_dict': seg_var_dict, + 'task_dict': task_dict + } + + task = rocoto.create_task(metatask_dict) return task @@ -929,12 +946,22 @@ def _fcst_cycled(self): cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run + if self.run in ['gfs']: + num_fcst_segments = len(self.app_config.fcst_segments) - 1 + else: + num_fcst_segments = 1 + + fcst_vars = self.envars.copy() + fcst_envars_dict = {'FCST_SEGMENT': '#seg#'} + for key, value in fcst_envars_dict.items(): + fcst_vars.append(rocoto.create_envar(name=key, value=str(value))) + resources = self.get_resource('fcst') - task_name = f'{self.run}fcst' + task_name = f'{self.run}fcst_seg#seg#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, - 'envars': self.envars, + 'envars': fcst_vars, 'cycledef': cycledef, 'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh', 'job_name': f'{self.pslot}_{task_name}_@H', @@ -942,7 +969,14 @@ def _fcst_cycled(self): 'maxtries': '&MAXTRIES;' } - task = rocoto.create_task(task_dict) + seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])} + metatask_dict = {'task_name': f'{self.run}fcst', + 
'is_serial': True, + 'var_dict': seg_var_dict, + 'task_dict': task_dict + } + + task = rocoto.create_task(metatask_dict) return task @@ -1104,7 +1138,7 @@ def _atmosoceaniceprod(self, component: str): data = f'{history_path}/{history_file_tmpl}' dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'task', 'name': f'{self.run}fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') @@ -1169,7 +1203,7 @@ def wavepostsbs(self): def wavepostbndpnt(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) @@ -1221,7 +1255,7 @@ def wavepostbndpntbll(self): def wavepostpnt(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'} deps.append(rocoto.add_dependency(dep_dict)) if self.app_config.do_wave_bnd: dep_dict = {'type': 'task', 'name': f'{self.run}wavepostbndpntbll'} @@ -1318,7 +1352,7 @@ def waveawipsgridded(self): def postsnd(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) @@ -1824,8 +1858,9 @@ def metp(self): } metatask_dict = {'task_name': f'{self.run}metp', + 'is_serial': True, 'task_dict': task_dict, - 'var_dict': var_dict + 'var_dict': var_dict, } task = rocoto.create_task(metatask_dict) @@ -2524,7 +2559,7 @@ def ecen(self): def _get_ecengroups(): if self._base.get('DOIAU_ENKF', False): - fhrs = list(self._base.get('IAUFHRS', '6').split(',')) + fhrs = self._base.get('IAUFHRS', '[6]') necengrp = self._configs['ecen']['NECENGRP'] ngrps = necengrp if 
len(fhrs) > necengrp else len(fhrs) @@ -2666,7 +2701,7 @@ def echgres(self): self._is_this_a_gdas_task(self.run, 'echgres') deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}fcst'} + dep_dict = {'type': 'metatask', 'name': f'{self.run.replace("enkf","")}fcst'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}fcst_mem001'} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/rocoto/rocoto.py b/workflow/rocoto/rocoto.py index 0abb56cafb..2a20820da8 100644 --- a/workflow/rocoto/rocoto.py +++ b/workflow/rocoto/rocoto.py @@ -56,9 +56,10 @@ def create_task(task_dict: Dict[str, Any]) -> List[str]: else: # There is a nested task_dict, so this is a metatask metataskname = f"{task_dict.get('task_name', 'demometatask')}" + metataskmode = 'serial' if task_dict.get('is_serial', False) else 'parallel' var_dict = task_dict.get('var_dict', None) - strings = [f'\n', + strings = [f'\n', '\n'] if var_dict is None: