Skip to content

Commit

Permalink
Consolidate staging settings
Browse files Browse the repository at this point in the history
- Move cycle, rRUN, model_start_date_current_cycle, and
member start/stop settings into new configure function
and out of JJOB.
- Address some reviewer feedback and make corrections.

Refs #2475
  • Loading branch information
KateFriedman-NOAA committed Aug 13, 2024
1 parent ff1246b commit 7ae9ea4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 68 deletions.
21 changes: 1 addition & 20 deletions jobs/JGLOBAL_STAGE_IC
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,12 @@
source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "stage_ic" -c "base stage_ic"

# Define significant cycles
# shellcheck disable=SC2153
half_window=$(( assim_freq / 2 ))
current_cycle="${PDY}${cyc}"
previous_cycle=$(date --utc -d "${PDY} ${cyc} - ${assim_freq} hours" +%Y%m%d%H)
current_cycle_begin=$(date --utc -d "${PDY} ${cyc} - ${half_window} hours" +%Y%m%d%H)
current_cycle_end=$(date --utc -d "${PDY} ${cyc} + ${half_window} hours" +%Y%m%d%H)

# Define model start date for current_cycle as the time the forecast will start
if [[ "${DOIAU:-NO}" == "YES" ]] && [[ "${MODE}" == "cycled" ]] ; then
model_start_date_current_cycle="${current_cycle_begin}"
else
if [[ "${REPLAY_ICS:-NO}" == "YES" ]]; then
model_start_date_current_cycle=${current_cycle_end}
else
model_start_date_current_cycle=${current_cycle}
fi
fi
export current_cycle previous_cycle model_start_date_current_cycle

# Initialize return code
err=0

# Execute staging
"${SCRgfs}/exglobal_stage_ic.py"
err=$?

###############################################################
# Check for errors and exit if any of the above failed
Expand Down
60 changes: 30 additions & 30 deletions parm/stage/stage.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
{% set current_cycle_HH = current_cycle | strftime("%H") %}
{% set previous_cycle_YMD = previous_cycle | to_YMD %}
{% set previous_cycle_HH = previous_cycle | strftime("%H") %}
{% set m_prefix = model_start_date_current_cycle | to_YMD + "." + model_start_date_current_cycle | strftime("%H") + "0000" %}
{% set p_prefix = previous_cycle | to_YMD + "." + previous_cycle | strftime("%H") + "0000" %}
{% set m_prefix = model_start_date_current_cycle | strftime("%Y%m%d.%H0000") %}
{% set p_prefix = previous_cycle | strftime("%Y%m%d.%H0000") %}

####################################################################
# Initial condition to stage
Expand All @@ -37,26 +37,26 @@
{% else %}
{% set mem_char = '' %}
{% endif %}
{% set current_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':current_cycle_YMD,
'${HH}':current_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':rRUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_run_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set current_cycle_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':current_cycle_YMD,
'${HH}':current_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_cycle_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':rRUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_cycle_and_run_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}

# Initial condition definitions

{% if MODE == "cycled" and RUN == "gdas" %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %}
analysis:
mkdir:
- "{{ COMOUT_ATMOS_ANALYSIS }}"
Expand All @@ -69,7 +69,7 @@ analysis:
{% endif %}

{% if EXP_WARM_START == True %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %}
atmosphere_warm:
mkdir:
- "{{ COMOUT_ATMOS_RESTART_PREV }}"
Expand All @@ -88,7 +88,7 @@ atmosphere_warm:
{% endif %} # path_exists
{% endfor %} # ntile
{% else %} # cold start
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_cycle_dict) %}
atmosphere_cold:
mkdir:
- "{{ COMOUT_ATMOS_INPUT }}"
Expand All @@ -102,7 +102,7 @@ atmosphere_cold:
{% endif %}

{% if REPLAY_ICS == True %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %}
atmosphere_perturbation:
mkdir:
- "{{ COMOUT_ATMOS_ANALYSIS }}"
Expand All @@ -114,15 +114,15 @@ atmosphere_perturbation:
atmosphere_nest:
{% set ntile = 7 %}
{% if EXP_WARM_START == True %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %}
mkdir:
- "{{ COMOUT_ATMOS_RESTART_PREV }}"
copy:
{% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}/{{ m_prefix }}.{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"]
{% endfor %}
{% else %} # cold start
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_cycle_dict) %}
mkdir:
- "{{ COMOUT_ATMOS_INPUT }}"
copy:
Expand All @@ -134,14 +134,14 @@ atmosphere_nest:

{% if DO_ICE %}
{% if DO_JEDIOCNVAR == True %}
{% set COMOUT_ICE_ANALYSIS = COM_ICE_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_ICE_ANALYSIS = COM_ICE_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %}
ice:
mkdir:
- "{{ COMOUT_ICE_ANALYSIS }}"
copy:
- ["{{ ICSDIR }}/{{ COMOUT_ICE_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.cice_model_anl.res.nc", "{{ COMOUT_ICE_ANALYSIS }}"]
{% else %}
{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %}
ice:
mkdir:
- "{{ COMOUT_ICE_RESTART_PREV }}"
Expand All @@ -151,7 +151,7 @@ ice:
{% endif %}

{% if DO_OCN %}
{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %}
ocean:
mkdir:
- "{{ COMOUT_OCEAN_RESTART_PREV }}"
Expand All @@ -164,7 +164,7 @@ ocean:
{% endif %}

{% if DO_JEDIOCNVAR == True %}
{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %}
rerun:
mkdir:
- "{{ COMOUT_OCEAN_ANALYSIS }}"
Expand All @@ -173,7 +173,7 @@ rerun:
{% endif %}

{% if REPLAY_ICS == True %}
{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_cycle_dict) %}
replay:
mkdir:
- "{{ COMOUT_OCEAN_ANALYSIS }}"
Expand All @@ -182,7 +182,7 @@ replay:
{% endif %}

{% if EXP_WARM_START == True %}
{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_cycle_dict) %}
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_MED_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ m_prefix ~ ".ufs.cpld.cpl.r.nc") %}
mediator:
mkdir:
Expand All @@ -195,7 +195,7 @@ mediator:
{% endif %} # DO_OCN=True

{% if DO_WAVE %}
{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_run_dict) %}
{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_cycle_and_run_dict) %}
wave:
mkdir:
- "{{ COMOUT_WAVE_RESTART_PREV }}"
Expand Down
8 changes: 5 additions & 3 deletions scripts/exglobal_stage_ic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os

from pygfs.task.stage import Stage
from pygfs.task.stage_ic import Stage
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# Initialize root logger
Expand All @@ -19,8 +19,7 @@ def main():

# Pull out all the configuration keys needed to run stage job
keys = ['RUN', 'MODE', 'EXP_WARM_START', 'NMEM_ENS',
'previous_cycle', 'current_cycle',
'model_start_date_current_cycle',
'current_cycle', 'previous_cycle',
'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL',
'OCNRES', 'waveGRD', 'ntiles', 'DO_JEDIOCNVAR',
'REPLAY_ICS', 'DO_WAVE', 'DO_OCN', 'DO_ICE', 'DO_NEST']
Expand All @@ -34,6 +33,9 @@ def main():
if key.startswith("COM"):
stage_dict[key] = stage.task_config[key]

# Configure staging
stage_dict = stage.configure(stage_dict)

# Stage ICs
stage.execute_stage(stage_dict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Task, cast_strdict_as_dtypedict,
logit, parse_j2yaml, strftime,
to_YMD, to_YMDH, Template, TemplateConstants)
logit, parse_j2yaml, strftime, to_YMD,
add_to_datetime, to_timedelta, Template, TemplateConstants)

logger = getLogger(__name__.split('.')[-1])

Expand All @@ -32,13 +32,11 @@ def __init__(self, config: Dict[str, Any]) -> None:
"""
super().__init__(config)

rotdir = self.task_config.ROTDIR + os.sep

self.task_config = AttrDict(**self.task_config)

@logit(logger)
def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
"""Perform local staging of initial condition files.
def configure(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any]):
"""Determine stage settings based on configuration and runtime options.
Parameters
----------
Expand All @@ -47,22 +45,38 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
Returns
-------
None
stage_dict : Dict[str, Any]
Configuration dictionary updated
"""

if not os.path.isdir(stage_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!")

# Add the os.path.exists function to the dict for yaml parsing
#-------------------------------------------------------------
stage_dict['path_exists'] = os.path.exists

# Determine model start date
#---------------------------
current_cycle_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2)
current_cycle_end = add_to_datetime(self.task_config.current_cycle, to_timedelta(f"{self.task_config['assim_freq']}H") / 2)

if self.task_config.DOIAU == True and self.task_config.MODE == "cycled":
model_start_date_current_cycle = current_cycle_begin
else:
if self.task_config.REPLAY_ICS == True:
model_start_date_current_cycle = current_cycle_end
else:
model_start_date_current_cycle = self.task_config.current_cycle

stage_dict['model_start_date_current_cycle'] = model_start_date_current_cycle

# Determine restart RUN
#----------------------
rRUN = self.task_config.RUN
if self.task_config.RUN == "gfs":
rRUN = "gdas"
stage_dict['rRUN'] = rRUN

# Determine ensemble member settings
#-----------------------------------
MEM_START = -1 # Deterministic default, no members
if self.task_config.NMEM_ENS > 0:
if self.task_config.RUN == "gefs":
Expand All @@ -71,14 +85,33 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
MEM_START = 1

if MEM_START >= 0: # Ensemble RUN
first_mem = MEM_START
last_mem = self.task_config.NMEM_ENS
stage_dict['first_mem'] = MEM_START
stage_dict['last_mem'] = self.task_config.NMEM_ENS
else: # Deteministic RUN
first_mem = MEM_START
last_mem = MEM_START
stage_dict['first_mem'] = MEM_START
stage_dict['last_mem'] = MEM_START

return stage_dict

@logit(logger)
def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
"""Perform local staging of initial condition files.
Parameters
----------
stage_dict : Dict[str, Any]
Configuration dictionary
Returns
-------
None
"""

if not os.path.isdir(stage_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!")

# Loop over members
for mem in range(first_mem, last_mem + 1):
for mem in range(stage_dict.first_mem, stage_dict.last_mem + 1):
stage_dict['mem'] = mem
stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, allow_missing=False)
# Copy files to ROTDIR
Expand Down

0 comments on commit 7ae9ea4

Please sign in to comment.