From b74e6965726f7cbaf52807b00e429131c5a83500 Mon Sep 17 00:00:00 2001 From: "kate.friedman" Date: Mon, 12 Aug 2024 14:03:47 +0000 Subject: [PATCH] Refactor staging job for mem loop and COM declare - Move rRUN, mem list, and COM declares inside python/yaml - Move mem loop inside python - Add path_exists checks to some initial conditions - Create dicts for replace_tmpl COM declares Refs #2475 --- jobs/JGLOBAL_STAGE_IC | 55 +----------------------- parm/stage/stage.yaml.j2 | 77 +++++++++++++++++++++++++++------- scripts/exglobal_stage_ic.py | 2 +- ush/python/pygfs/task/stage.py | 34 ++++++++++++--- 4 files changed, 92 insertions(+), 76 deletions(-) diff --git a/jobs/JGLOBAL_STAGE_IC b/jobs/JGLOBAL_STAGE_IC index 33b44f2a70..3346db0827 100755 --- a/jobs/JGLOBAL_STAGE_IC +++ b/jobs/JGLOBAL_STAGE_IC @@ -3,12 +3,6 @@ source "${HOMEgfs}/ush/preamble.sh" source "${HOMEgfs}/ush/jjob_header.sh" -e "stage_ic" -c "base stage_ic" -# Restart conditions for GFS cycle come from GDAS -# shellcheck disable=SC2153 -rRUN=${RUN} -# shellcheck disable=SC2153 -[[ ${RUN} == "gfs" ]] && rRUN="gdas" - # Define significant cycles # shellcheck disable=SC2153 half_window=$(( assim_freq / 2 )) @@ -30,56 +24,11 @@ else fi export current_cycle previous_cycle current_cycle_offset model_start_date_current_cycle -# Define MEMDIR_ARRAY -MEMDIR_ARRAY=() -if [[ "${RUN:-}" = "enkfgdas" || "${RUN:-}" = "gefs" ]]; then - if [[ "${RUN:-}" = "gefs" ]]; then - ii_start=0 - elif [[ "${RUN:-}" = "enkfgdas" ]]; then - ii_start=1 - fi - # Populate the member_dirs array based on the value of NMEM_ENS - for ((ii = "${ii_start}"; ii <= "${NMEM_ENS:-0}"; ii++)); do - MEMDIR_ARRAY+=("mem$(printf "%03d" "${ii}")") - done -else - MEMDIR_ARRAY+=("") -fi - # Initialize return code err=0 -############################################################### -for MEMDIR in "${MEMDIR_ARRAY[@]}"; do - - # Declare COMs - if [[ "${MODE}" = "cycled" && "${RUN}" = "gdas" ]]; then - YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL - fi - - if [[ ${EXP_WARM_START:-".false."} = ".true." ]]; then - MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_RESTART_PREV:COM_ATMOS_RESTART_TMPL - MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_MED_RESTART_PREV:COM_MED_RESTART_TMPL - else - MEMDIR=${MEMDIR} YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_INPUT:COM_ATMOS_INPUT_TMPL - fi - if [[ "${DO_OCN:-}" = "YES" ]]; then - MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_OCEAN_RESTART_PREV:COM_OCEAN_RESTART_TMPL - if [[ "${REPLAY_ICS:-NO}" = "YES" ]]; then - MEMDIR=${MEMDIR} YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL - fi - fi - if [[ "${DO_ICE:-}" = "YES" ]]; then - MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_ICE_RESTART_PREV:COM_ICE_RESTART_TMPL - fi - if [[ "${DO_WAVE:-}" = "YES" ]]; then - MEMDIR=${MEMDIR} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_WAVE_RESTART_PREV:COM_WAVE_RESTART_TMPL - fi - - # Execute staging - "${SCRgfs}/exglobal_stage_ic.py" - -done # for MEMDIR in "${MEMDIR_ARRAY[@]}"; do +# Execute staging +"${SCRgfs}/exglobal_stage_ic.py" ############################################################### # Check for errors and exit if any of the above failed diff --git a/parm/stage/stage.yaml.j2 b/parm/stage/stage.yaml.j2 index a8e86a8899..56f9eda35b 100644 --- a/parm/stage/stage.yaml.j2 +++ b/parm/stage/stage.yaml.j2 @@ -19,43 +19,77 @@ ############################################################# # Set variables used below -{% set cycle_HH = current_cycle | strftime("%H") %} +{% set current_cycle_YMD = current_cycle | to_YMD %} +{% set current_cycle_HH = current_cycle | strftime("%H") %} +{% set previous_cycle_YMD = previous_cycle | to_YMD %} +{% set previous_cycle_HH = previous_cycle | strftime("%H") %} {% set m_prefix = model_start_date_current_cycle | to_YMD + "." + model_start_date_current_cycle | strftime("%H") + "0000" %} {% set o_prefix = current_cycle_offset | to_YMD + "." + current_cycle_offset | strftime("%H") + "0000" %} {% set p_prefix = previous_cycle | to_YMD + "." + previous_cycle | strftime("%H") + "0000" %} -############################################################# +#################################################################### # Initial condition to stage +#################################################################### + +# Declare a dict of search and replace terms to run on each template + +{% if mem >= 0 %} + {% set mem_char = 'mem%03d' | format(mem) %} +{% else %} + {% set mem_char = '' %} +{% endif %} +{% set current_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':current_cycle_YMD, + '${HH}':current_cycle_HH, + '${MEMDIR}': mem_char }) %} +{% set previous_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':rRUN, + '${YMD}':previous_cycle_YMD, + '${HH}':previous_cycle_HH, + '${MEMDIR}': mem_char }) %} +{% set previous_run_dict = ({ '${ROTDIR}':ROTDIR, + '${RUN}':RUN, + '${YMD}':previous_cycle_YMD, + '${HH}':previous_cycle_HH, + '${MEMDIR}': mem_char }) %} + +# Initial condition definitions {% if MODE == "cycled" and RUN == "gdas" %} +{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %} analysis: mkdir: - "{{ COMOUT_ATMOS_ANALYSIS }}" copy: {% for ftype in ["abias", "abias_air", "abias_int", "abias_pc", "atminc.nc", "radstat"] %} - {% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ cycle_HH ~ "z." ~ ftype) %} - - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ RUN }}.t{{ cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS }}"] + {% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ current_cycle_HH ~ "z." ~ ftype) %} + - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ RUN }}.t{{ current_cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS }}"] {% endif %} {% endfor %} {% endif %} {% if EXP_WARM_START == True %} +{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %} atmosphere_warm: mkdir: - "{{ COMOUT_ATMOS_RESTART_PREV }}" copy: {% for ftype in ["coupler.res", "fv_core.res.nc"] %} - - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}", "{{ COMOUT_ATMOS_RESTART_PREV }}"] + - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}", "{{ COMOUT_ATMOS_RESTART_PREV }}"] {% endfor %} {% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %} {% for ntile in range(1, ntiles + 1) %} - - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"] + - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"] {% endfor %} # ntile {% endfor %} # ftype {% for ntile in range(1, ntiles + 1) %} + {% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ p_prefix ~ ".sfcanl_data.tile" ~ ntile ~ ".nc") %} - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ p_prefix }}.sfcanl_data.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"] + {% endif %} # path_exists {% endfor %} # ntile -{% else %} +{% else %} # cold start +{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %} atmosphere_cold: mkdir: - "{{ COMOUT_ATMOS_INPUT }}" @@ -69,31 +103,38 @@ atmosphere_cold: {% endif %} {% if REPLAY_ICS == "YES" %} +{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %} atmosphere_perturbation: mkdir: - "{{ COMOUT_ATMOS_ANALYSIS }}" copy: - - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.fv3_perturbation.nc", "{{ COMOUT_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ cycle_HH }}z.atminc.nc"] + - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.fv3_perturbation.nc", "{{ COMOUT_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle_HH }}z.atminc.nc"] {% endif %} {% if DO_NEST %} atmosphere_nest: - {% set ntile = 7 %} + {% set ntile = 7 %} + {% if EXP_WARM_START == True %} + {% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %} mkdir: - "{{ COMOUT_ATMOS_RESTART_PREV }}" copy: - {% if EXP_WARM_START == True %} - {% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %} + {% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %} - ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}/{{ o_prefix }}.{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"] - {% endfor %} # ftype - {% else %} - {% for ftype in ["gfs_data", "sfc_data"] %} + {% endfor %} + {% else %} # cold start + {% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %} + mkdir: + - "{{ COMOUT_ATMOS_INPUT }}" + copy: + {% for ftype in ["gfs_data", "sfc_data"] %} - ["{{ COMOUT_ATMOS_INPUT }}/{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_INPUT }}/{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"] - {% endfor %} # ftype - {% endif %} # cold-start + {% endfor %} + {% endif %} {% endif %} {% if DO_ICE %} +{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_dict) %} ice: mkdir: - "{{ COMOUT_ICE_RESTART_PREV }}" @@ -102,6 +143,7 @@ ice: {% endif %} {% if DO_OCN %} +{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_dict) %} ocean: mkdir: - "{{ COMOUT_OCEAN_RESTART_PREV }}" @@ -114,6 +156,7 @@ ocean: {% endif %} {% if REPLAY_ICS == "YES" %} +{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %} replay: mkdir: - "{{ COMOUT_OCEAN_ANALYSIS }}" @@ -122,6 +165,7 @@ replay: {% endif %} {% if EXP_WARM_START == True %} +{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_dict) %} {% if path_exists(ICSDIR ~ "/" ~ COMOUT_MED_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ m_prefix ~ ".ufs.cpld.cpl.r.nc") %} mediator: mkdir: @@ -134,6 +178,7 @@ mediator: {% endif %} # DO_OCN=YES {% if DO_WAVE %} +{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_run_dict) %} wave: mkdir: - "{{ COMOUT_WAVE_RESTART_PREV }}" diff --git a/scripts/exglobal_stage_ic.py b/scripts/exglobal_stage_ic.py index d5be8e8ee1..8e0832de4b 100755 --- a/scripts/exglobal_stage_ic.py +++ b/scripts/exglobal_stage_ic.py @@ -18,7 +18,7 @@ def main(): stage = Stage(config) # Pull out all the configuration keys needed to run stage job - keys = ['RUN', 'MODE', 'EXP_WARM_START', + keys = ['RUN', 'MODE', 'EXP_WARM_START', 'NMEM_ENS', 'previous_cycle', 'current_cycle', 'current_cycle_offset', 'model_start_date_current_cycle', 'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL', diff --git a/ush/python/pygfs/task/stage.py b/ush/python/pygfs/task/stage.py index e1459fdee0..edc12540cb 100644 --- a/ush/python/pygfs/task/stage.py +++ b/ush/python/pygfs/task/stage.py @@ -56,9 +56,31 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None: # Add the os.path.exists function to the dict for yaml parsing stage_dict['path_exists'] = os.path.exists - stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, - allow_missing=False) - - # Copy files to ROTDIR - for key in stage_set.keys(): - FileHandler(stage_set[key]).sync() + # Determine restart RUN + rRUN = self.task_config.RUN + if self.task_config.RUN == "gfs": + rRUN = "gdas" + stage_dict['rRUN'] = rRUN + + # Determine ensemble member settings + MEM_START = -1 # Deterministic default, no members + if self.task_config.NMEM_ENS > 0: + if self.task_config.RUN == "gefs": + MEM_START = 0 + elif self.task_config.RUN == "enkfgdas": + MEM_START = 1 + + if MEM_START >= 0: # Ensemble RUN + first_mem = MEM_START + last_mem = self.task_config.NMEM_ENS + else: # Deteministic RUN + first_mem = MEM_START + last_mem = MEM_START + + # Loop over members + for mem in range(first_mem, last_mem + 1): + stage_dict['mem'] = mem + stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, allow_missing=False) + # Copy files to ROTDIR + for key in stage_set.keys(): + FileHandler(stage_set[key]).sync()