Skip to content

Commit

Permalink
Eliminate post groups (NOAA-EMC#2667)
Browse files Browse the repository at this point in the history
Eliminates the post groups used for upp and products jobs so that each
task only processes one forecast hour. This is more efficient and
greatly simplifies downstream dependencies that depend on a specific
forecast hour.

Resolves NOAA-EMC#2666
Refs NOAA-EMC#2642
  • Loading branch information
WalterKolczynski-NOAA authored Jun 20, 2024
1 parent 0b810c8 commit 8993b42
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 101 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ scripts/exemcsfc_global_sfc_prep.sh
scripts/exgdas_global_marine_analysis_ecen.py
scripts/exglobal_prep_ocean_obs.py
# ush symlinks
ush/bufr2ioda_insitu_profile_argo.py
ush/bufr2ioda_insitu_profile_bathy.py
ush/bufr2ioda_insitu_profile_glider.py
ush/bufr2ioda_insitu_profile_marinemammal.py
ush/bufr2ioda_insitu_profile_tesac.py
ush/bufr2ioda_insitu_profile_xbtctd.py
ush/bufr2ioda_insitu_surface_altkob.py
ush/bufr2ioda_insitu_surface_trkob.py
ush/chgres_cube.sh
ush/emcsfc_ice_blend.sh
ush/emcsfc_snow.sh
Expand Down
20 changes: 5 additions & 15 deletions jobs/rocoto/atmos_ensstat.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@ if (( status != 0 )); then exit "${status}"; fi
export job="atmos_ensstat"
export jobid="${job}.$$"

###############################################################
# shellcheck disable=SC2153,SC2001
IFS='_' read -ra fhrs <<< "${FHRLST//f}" # strip off the 'f's and convert to array
export FORECAST_HOUR=$(( 10#${FHR3} ))

#---------------------------------------------------------------
###############################################################
# Execute the JJOB
for fhr in "${fhrs[@]}"; do
# The analysis fhr is -001. Performing math on negative, leading 0 integers is tricky.
# The negative needs to be in front of "10#", so do some regex magic to make it happen.
fhr="10#${fhr}"
fhr=${fhr//10\#-/-10\#}
export FORECAST_HOUR=$(( fhr ))
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT"
status=$?
if (( status != 0 )); then exit "${status}"; fi
done
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT"

exit 0
exit $?
22 changes: 7 additions & 15 deletions jobs/rocoto/atmos_products.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,13 @@ if (( status != 0 )); then exit "${status}"; fi
export job="atmos_products"
export jobid="${job}.$$"

###############################################################
# shellcheck disable=SC2153,SC2001
IFS='_' read -ra fhrs <<< "${FHRLST//f}" # strip off the 'f's and convert to array
# Negatation needs to be before the base
fhr3_base="10#${FHR3}"
export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} ))

#---------------------------------------------------------------
###############################################################
# Execute the JJOB
for fhr in "${fhrs[@]}"; do
# The analysis fhr is -001. Performing math on negative, leading 0 integers is tricky.
# The negative needs to be in front of "10#", so do some regex magic to make it happen.
fhr="10#${fhr}"
fhr=${fhr//10\#-/-10\#}
export FORECAST_HOUR=$(( fhr ))
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
status=$?
if (( status != 0 )); then exit "${status}"; fi
done
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"

exit 0
exit $?
18 changes: 6 additions & 12 deletions jobs/rocoto/oceanice_products.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@ export PYTHONPATH
export job="oceanice_products"
export jobid="${job}.$$"

###############################################################
# shellcheck disable=SC2153,SC2001
IFS='_' read -ra fhrs <<< "${FHRLST//f}" # strip off the 'f's and convert to array
export FORECAST_HOUR=$(( 10#${FHR3} ))

#---------------------------------------------------------------
###############################################################
# Execute the JJOB
for fhr in "${fhrs[@]}"; do
export FORECAST_HOUR=$(( 10#${fhr} ))
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
status=$?
if (( status != 0 )); then exit "${status}"; fi
done

exit 0
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"

exit $?
17 changes: 6 additions & 11 deletions jobs/rocoto/upp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,11 @@ export PYTHONPATH
export job="upp"
export jobid="${job}.$$"

###############################################################
# shellcheck disable=SC2153,SC2001
IFS='_' read -ra fhrs <<< "${FHRLST//f}" # strip off the 'f's convert to array
export FORECAST_HOUR=$(( 10#${FHR3} ))

###############################################################
# Execute the JJOB
for fhr in "${fhrs[@]}"; do
export FORECAST_HOUR=$(( 10#${fhr} ))
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_UPP"
status=$?
if (( status != 0 )); then exit "${status}"; fi
done

exit 0
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_UPP"

exit $?
2 changes: 2 additions & 0 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ export ILPOST=1 # gempak output frequency up to F120
export FHMIN_ENKF=${FHMIN_GFS}
export FHMAX_ENKF=${FHMAX_GFS}
export FHOUT_ENKF=${FHOUT_GFS}
export FHOUT_OCN=${FHOUT_OCN_GFS}
export FHOUT_ICE=${FHOUT_ICE_GFS}

# GFS restart interval in hours
export restart_interval_gfs=12
Expand Down
2 changes: 1 addition & 1 deletion scripts/exgdas_enkf_earc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def main():

# Also import all COMIN* directory and template variables
for key in archive.task_config.keys():
if key.startswith("COMIN"):
if key.startswith("COM"):
archive_dict[key] = archive.task_config[key]

cwd = os.getcwd()
Expand Down
9 changes: 2 additions & 7 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def _atmosoceaniceprod(self, component: str):
postenvars = self.envars.copy()
postenvar_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#',
'FHRLST': '#fhr#',
'FHR3': '#fhr#',
'COMPONENT': component}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))
Expand All @@ -270,11 +270,6 @@ def _atmosoceaniceprod(self, component: str):
'maxtries': '&MAXTRIES;'}

fhrs = self._get_forecast_hours('gefs', self._configs[config], component)

# ocean/ice components do not have fhr 0 as they are averaged output
if component in ['ocean', 'ice'] and 0 in fhrs:
fhrs.remove(0)

fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}

fhr_metatask_dict = {'task_name': f'{component}_prod_#member#',
Expand Down Expand Up @@ -303,7 +298,7 @@ def atmos_ensstat(self):
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

postenvars = self.envars.copy()
postenvar_dict = {'FHRLST': '#fhr#'}
postenvar_dict = {'FHR3': '#fhr#'}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

Expand Down
59 changes: 19 additions & 40 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ def _fcst_cycled(self):

def atmanlupp(self):
postenvars = self.envars.copy()
postenvar_dict = {'FHRLST': 'f000',
postenvar_dict = {'FHR3': '000',
'UPP_RUN': 'analysis'}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))
Expand Down Expand Up @@ -975,7 +975,7 @@ def atmanlupp(self):

def atmanlprod(self):
postenvars = self.envars.copy()
postenvar_dict = {'FHRLST': '-f001'}
postenvar_dict = {'FHR3': '-001'}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

Expand All @@ -1002,24 +1002,6 @@ def atmanlprod(self):

return task

@staticmethod
def _get_ufs_postproc_grps(cdump, config, component='atmos'):

fhrs = Tasks._get_forecast_hours(cdump, config, component=component)

nfhrs_per_grp = config.get('NFHRS_PER_GROUP', 1)
ngrps = len(fhrs) // nfhrs_per_grp if len(fhrs) % nfhrs_per_grp == 0 else len(fhrs) // nfhrs_per_grp + 1

fhrs = [f'f{fhr:03d}' for fhr in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [fhr.tolist() for fhr in fhrs]

grp = ' '.join(f'_{fhr[0]}-{fhr[-1]}' if len(fhr) > 1 else f'_{fhr[0]}' for fhr in fhrs)
dep = ' '.join([fhr[-1] for fhr in fhrs])
lst = ' '.join(['_'.join(fhr) for fhr in fhrs])

return grp, dep, lst

def atmupp(self):
return self._upptask(upp_run='forecast', task_id='atmupp')

Expand All @@ -1032,32 +1014,28 @@ def _upptask(self, upp_run="forecast", task_id="atmupp"):
if upp_run not in VALID_UPP_RUN:
raise KeyError(f"{upp_run} is invalid; UPP_RUN options are: {('|').join(VALID_UPP_RUN)}")

varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = self._get_ufs_postproc_grps(self.cdump, self._configs['upp'])
var_dict = {varname1: varval1, varname2: varval2, varname3: varval3}

postenvars = self.envars.copy()
postenvar_dict = {'FHRLST': '#lst#',
postenvar_dict = {'FHR3': '#fhr#',
'UPP_RUN': upp_run}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

atm_hist_path = self._template_to_rocoto_cycstring(self._base["COM_ATMOS_HISTORY_TMPL"])
deps = []
data = f'{atm_hist_path}/{self.cdump}.t@Hz.atm#dep#.nc'
data = f'{atm_hist_path}/{self.cdump}.t@Hz.atmf#fhr#.nc'
dep_dict = {'type': 'data', 'data': data, 'age': 120}
deps.append(rocoto.add_dependency(dep_dict))
data = f'{atm_hist_path}/{self.cdump}.t@Hz.sfc#dep#.nc'
data = f'{atm_hist_path}/{self.cdump}.t@Hz.sfcf#fhr#.nc'
dep_dict = {'type': 'data', 'data': data, 'age': 120}
deps.append(rocoto.add_dependency(dep_dict))
data = f'{atm_hist_path}/{self.cdump}[email protected].log#dep#.txt'
data = f'{atm_hist_path}/{self.cdump}[email protected].logf#fhr#.txt'
dep_dict = {'type': 'data', 'data': data, 'age': 60}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps, dep_condition='and')
cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
resources = self.get_resource('upp')

task_name = f'{self.cdump}{task_id}#{varname1}#'
task_name = f'{self.cdump}{task_id}_f#fhr#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
Expand All @@ -1069,9 +1047,12 @@ def _upptask(self, upp_run="forecast", task_id="atmupp"):
'maxtries': '&MAXTRIES;'
}

fhrs = self._get_forecast_hours(self.cdump, self._configs['upp'])
fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}

metatask_dict = {'task_name': f'{self.cdump}{task_id}',
'task_dict': task_dict,
'var_dict': var_dict
'var_dict': fhr_var_dict
}

task = rocoto.create_task(metatask_dict)
Expand All @@ -1091,25 +1072,21 @@ def _atmosoceaniceprod(self, component: str):

products_dict = {'atmos': {'config': 'atmos_products',
'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL',
'history_file_tmpl': f'{self.cdump}[email protected].grb2#dep#'},
'history_file_tmpl': f'{self.cdump}[email protected].grb2f#fhr#'},
'ocean': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL',
'history_file_tmpl': f'{self.cdump}[email protected]_avg.#dep#.nc'},
'history_file_tmpl': f'{self.cdump}[email protected]_avg.f#fhr#.nc'},
'ice': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_ICE_HISTORY_TMPL',
'history_file_tmpl': f'{self.cdump}[email protected]_avg.#dep#.nc'}}
'history_file_tmpl': f'{self.cdump}[email protected]_avg.f#fhr#.nc'}}

component_dict = products_dict[component]
config = component_dict['config']
history_path_tmpl = component_dict['history_path_tmpl']
history_file_tmpl = component_dict['history_file_tmpl']

varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = self._get_ufs_postproc_grps(self.cdump, self._configs[config], component=component)
var_dict = {varname1: varval1, varname2: varval2, varname3: varval3}

postenvars = self.envars.copy()
postenvar_dict = {'FHRLST': '#lst#', 'COMPONENT': component}
postenvar_dict = {'FHR3': '#fhr#', 'COMPONENT': component}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

Expand All @@ -1129,7 +1106,7 @@ def _atmosoceaniceprod(self, component: str):
cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
resources = self.get_resource(component_dict['config'])

task_name = f'{self.cdump}{component}_prod#{varname1}#'
task_name = f'{self.cdump}{component}_prod_f#fhr#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
Expand All @@ -1141,9 +1118,11 @@ def _atmosoceaniceprod(self, component: str):
'maxtries': '&MAXTRIES;'
}

fhrs = self._get_forecast_hours(self.cdump, self._configs[config], component)
fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
metatask_dict = {'task_name': f'{self.cdump}{component}_prod',
'task_dict': task_dict,
'var_dict': var_dict
'var_dict': fhr_var_dict
}

task = rocoto.create_task(metatask_dict)
Expand Down

0 comments on commit 8993b42

Please sign in to comment.