Skip to content

Commit

Permalink
Add capability to run forecast in segments
Browse files Browse the repository at this point in the history
Adds the ability to run a forecast in segments instead of all at
once. To accomplish this, the `FHMIN_GFS` and `FHMAX_GFS` settings
have been replaced as user-setable variables in favor of
`FCST_SEGMENTS_STR_GFS`, a comma-separated list of the segment
boundaries (thus there will be one more than the number of segments).
For a traditional single-segment forecast, this would just be set to
`"${FHMIN_GFS},${FHMAX_GFS}"`.

The comma-separated list had to be used instead of a bash array as
the variable must be exported in order for the rocoto generator to
see it, and arrays cannot be exported from shell. Capabilty to parse
these into python lists was added to wxflow in an accompanying PR.

To accomodate the new segment metatasks that must be run serially,
the capability of `create_task()` was expanded to allow a dictionary
key of `is_serial`, which controls whether a metatask is parallel or
serial using pre-existing capability in rocoto. The default when not
given is parallel (i.e. most metatasks).

Resolves #2274
Refs NOAA-EMC/wxflow#39
  • Loading branch information
WalterKolczynski-NOAA committed Jul 26, 2024
1 parent f156a78 commit 8ce6f94
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 47 deletions.
16 changes: 13 additions & 3 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,19 @@ export FHOUT_ICE=3
export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: all 4 cycles.

# GFS output and frequency
export FHMIN_GFS=0
export FHMIN=${FHMIN_GFS}
export FHMAX_GFS=@FHMAX_GFS@
# Forecast hour intervals to run the forecast over
# For a single-segment forecast, this is simply "$FHMIN_GFS,$FHMAX_GFS"
export FCST_SEGMENTS_STR_GFS="@FCST_SEGMENTS_GFS@"
IFS=', ' read -ra FCST_SEGMENTS_GFS <<< "${FCST_SEGMENTS_STR_GFS}"
if (( ${FCST_SEGMENT:- -1} < 0 )); then
# Jobs other than the forecast don't care about segments, only the
# absolute start and end
declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[0]}
declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[-1]}
else
declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}]}
declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}+1]}
fi
export FHOUT_GFS=6
export FHMAX_HF_GFS=@FHMAX_HF_GFS@
export FHOUT_HF_GFS=1
Expand Down
1 change: 1 addition & 0 deletions parm/config/gefs/config.fcst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ string="--fv3 ${CASE}"
# shellcheck disable=SC2086
source "${EXPDIR}/config.ufs" ${string}

export FHMIN=${FHMIN_GFS}
# shellcheck disable=SC2153
export FHMAX=${FHMAX_GFS}
# shellcheck disable=SC2153
Expand Down
2 changes: 1 addition & 1 deletion parm/config/gefs/yaml/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ base:
DO_AWIPS: "NO"
KEEPDATA: "NO"
DO_EXTRACTVARS: "NO"
FHMAX_GFS: 120
FCST_SEGMENTS_GFS: "0,48,120"
FHMAX_HF_GFS: 0
REPLAY_ICS: "NO"
USE_OCN_PERTURB_FILES: "false"
15 changes: 13 additions & 2 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,19 @@ export EUPD_CYC="@EUPD_CYC@"
export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: all 4 cycles.

# GFS output and frequency
export FHMIN_GFS=0
export FHMAX_GFS=@FHMAX_GFS@
# Comma-separated forecast hour intervals to run the forecast over
# For a single-segment forecast, this is simply "$FHMIN_GFS,$FHMAX_GFS"
export FCST_SEGMENTS_STR_GFS="@FCST_SEGMENTS_GFS@"
IFS=', ' read -ra FCST_SEGMENTS_GFS <<< "${FCST_SEGMENTS_STR_GFS}"
if (( ${FCST_SEGMENT:- -1} < 0 )); then
# Jobs other than the forecast don't care about segments, only the
# absolute start and end
declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[0]}
declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[-1]}
else
declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}]}
declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}+1]}
fi
export FHOUT_GFS=3 # 3 for ops
export FHMAX_HF_GFS=@FHMAX_HF_GFS@
export FHOUT_HF_GFS=1
Expand Down
1 change: 1 addition & 0 deletions parm/config/gfs/config.fcst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ source "${EXPDIR}/config.ufs" ${string}
# Forecast length for GFS forecast
case ${RUN} in
*gfs)
export FHMIN=${FHMIN_GFS}
# shellcheck disable=SC2153
export FHMAX=${FHMAX_GFS}
# shellcheck disable=SC2153
Expand Down
2 changes: 1 addition & 1 deletion parm/config/gfs/yaml/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ base:
DO_GENESIS: "YES"
DO_GENESIS_FSU: "NO"
DO_METP: "YES"
FHMAX_GFS: 120
FCST_SEGMENTS_GFS: "0,120"
FHMAX_HF_GFS: 0
DO_VRFY_OCEANDA: "NO"
GSI_SOILANAL: "NO"
Expand Down
2 changes: 1 addition & 1 deletion sorc/wxflow
2 changes: 1 addition & 1 deletion ush/forecast_predet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ common_predet(){

CDATE=${CDATE:-"${PDY}${cyc}"}
ENSMEM=${ENSMEM:-000}
MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER

# Define significant cycles
half_window=$(( assim_freq / 2 ))
Expand Down Expand Up @@ -154,7 +155,6 @@ FV3_predet(){
FV3_OUTPUT_FH="${FV3_OUTPUT_FH} $(seq -s ' ' "${fhr}" "${FHOUT}" "${FHMAX}")"

# Other options
MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER
PREFIX_ATMINC=${PREFIX_ATMINC:-""} # allow ensemble to use recentered increment

# IAU options
Expand Down
1 change: 1 addition & 0 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, conf: Configuration) -> None:
self.do_hpssarch = _base.get('HPSSARCH', False)

self.nens = _base.get('NMEM_ENS', 0)
self.fcst_segments = _base.get('FCST_SEGMENTS_STR_GFS', None)

self.wave_cdumps = None
if self.do_wave:
Expand Down
96 changes: 68 additions & 28 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,35 @@ def fcst(self):

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

num_fcst_segments = len(self.app_config.fcst_segments) - 1

fcst_vars = self.envars.copy()
fcst_envars_dict = {'FCST_SEGMENT': '#seg#'
}
for key, value in fcst_envars_dict.items():
fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))

resources = self.get_resource('fcst')
task_name = f'fcst_mem000'
task_name = f'fcst_mem000_seg#seg#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'envars': fcst_vars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}
task = rocoto.create_task(task_dict)

seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
metatask_dict = {'task_name': f'fcst_mem000',
'is_serial': True,
'var_dict': seg_var_dict,
'task_dict': task_dict
}

task = rocoto.create_task(metatask_dict)

return task

Expand All @@ -169,36 +185,60 @@ def efcs(self):

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

efcsenvars = self.envars.copy()
efcsenvars_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#'
}
for key, value in efcsenvars_dict.items():
efcsenvars.append(rocoto.create_envar(name=key, value=str(value)))

num_fcst_segments = len(self.app_config.fcst_segments) - 1
resources = self.get_resource('efcs')

task_name = f'fcst_mem#member#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': efcsenvars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])}
metatask_dict = {'task_name': 'fcst_ens',
'var_dict': member_var_dict,
'task_dict': task_dict
# Kludge to work around bug in rocoto with serial metatasks nested
# in a parallel one (see christopherwharrop/rocoto#109). For now,
# loop over member to create a separate metatask for each instead
# of a metatask of a metatask.
#
tasks=[]
for member in [f"{mem:03d}" for mem in range(1, self.nmem + 1)]:

efcsenvars = self.envars.copy()
efcsenvars_dict = {'ENSMEM': f'{member}',
'MEMDIR': f'mem{member}',
'FCST_SEGMENT': '#seg#'
}
for key, value in efcsenvars_dict.items():
efcsenvars.append(rocoto.create_envar(name=key, value=str(value)))

task_name = f'fcst_mem{member}_seg#seg#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': efcsenvars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

task = rocoto.create_task(metatask_dict)
seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
seg_metatask_dict = {'task_name': f'fcst_mem{member}',
'is_serial': True,
'var_dict': seg_var_dict,
'task_dict': task_dict
}

return task
tasks.append(rocoto.create_task(seg_metatask_dict))

return '\n'.join(tasks)

# Keeping this in hopes the kludge is no longer necessary at some point
#
# member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])}
# mem_metatask_dict = {'task_name': 'fcst_ens',
# 'is_serial': False,
# 'var_dict': member_var_dict,
# 'task_dict': seg_metatask_dict
# }

# task = rocoto.create_task(mem_metatask_dict)

# return task

def atmos_prod(self):
return self._atmosoceaniceprod('atmos')
Expand Down
54 changes: 45 additions & 9 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,20 +880,38 @@ def _fcst_forecast_only(self):

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

if self.cdump in ['gfs']:
num_fcst_segments = len(self.app_config.fcst_segments) - 1
else:
num_fcst_segments = 1

fcst_vars = self.envars.copy()
fcst_envars_dict = {'FCST_SEGMENT': '#seg#'
}
for key, value in fcst_envars_dict.items():
fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))

resources = self.get_resource('fcst')
task_name = f'{self.cdump}fcst'
task_name = f'{self.cdump}fcst_seg#seg#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'envars': fcst_vars,
'cycledef': self.cdump.replace('enkf', ''),
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

task = rocoto.create_task(task_dict)
seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
metatask_dict = {'task_name': f'{self.cdump}fcst',
'is_serial': True,
'var_dict': seg_var_dict,
'task_dict': task_dict
}

task = rocoto.create_task(metatask_dict)

return task

Expand Down Expand Up @@ -929,20 +947,38 @@ def _fcst_cycled(self):

cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump

if self.cdump in ['gfs']:
num_fcst_segments = len(self.app_config.fcst_segments) - 1
else:
num_fcst_segments = 1

fcst_vars = self.envars.copy()
fcst_envars_dict = {'FCST_SEGMENT', '#seg#'
}
for key, value in fcst_envars_dict.items():
fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))

resources = self.get_resource('fcst')
task_name = f'{self.cdump}fcst'
task_name = f'{self.cdump}fcst_seg#seg#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'envars': fcst_vars,
'cycledef': cycledef,
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

task = rocoto.create_task(task_dict)
seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
metatask_dict = {'task_name': f'{self.cdump}fcst',
'is_serial': True,
'var_dict': seg_var_dict,
'task_dict': task_dict
}

task = rocoto.create_task(metatask_dict)

return task

Expand Down Expand Up @@ -1166,7 +1202,7 @@ def wavepostsbs(self):

def wavepostbndpnt(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'}
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

Expand Down Expand Up @@ -1218,7 +1254,7 @@ def wavepostbndpntbll(self):

def wavepostpnt(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'}
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_wave_bnd:
dep_dict = {'type': 'task', 'name': f'{self.cdump}wavepostbndpntbll'}
Expand Down Expand Up @@ -1315,7 +1351,7 @@ def waveawipsgridded(self):

def postsnd(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}fcst'}
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

Expand Down
3 changes: 2 additions & 1 deletion workflow/rocoto/rocoto.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ def create_task(task_dict: Dict[str, Any]) -> List[str]:
else:
# There is a nested task_dict, so this is a metatask
metataskname = f"{task_dict.get('task_name', 'demometatask')}"
metataskmode = 'serial' if task_dict.get('is_serial', False) else 'parallel'
var_dict = task_dict.get('var_dict', None)

strings = [f'<metatask name="{metataskname}">\n',
strings = [f'<metatask name="{metataskname}" mode="{metataskmode}">\n',
'\n']

if var_dict is None:
Expand Down

0 comments on commit 8ce6f94

Please sign in to comment.