Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and simplify online archiving #2687

Merged
merged 11 commits into from
Jun 21, 2024
Merged
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ ush/fv3gfs_make_grid.sh
ush/fv3gfs_make_orog.sh
ush/gen_bufr2ioda_json.py
ush/gen_bufr2ioda_yaml.py
ush/bufr2ioda_insitu_profile*.py
ush/bufr2ioda_insitu_surface*.py
ush/global_chgres.sh
ush/global_chgres_driver.sh
ush/global_cycle.sh
Expand Down
250 changes: 141 additions & 109 deletions parm/archive/arcdir.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,121 +4,153 @@
{% set head = RUN + ".t" + cycle_HH + "z." %}

# Select data to store in the ARCDIR and VFYARC from deterministic runs
Base: &base # GDAS, GFS, ENKFGDAS, or ENKFGFS
common:
mkdir:
- "{{ ARCDIR }}"

# Common files to be added to both the gfs and gdas keys below
Deterministic: &deterministic
cyclone:
copy:
# Cyclone forecasts, produced for both gdas and gfs cycles
## Only created if tracking is on and there were systems to track
{% if path_exists(COMIN_ATMOS_TRACK ~ "/atcfunix." ~ RUN ~ "." ~ cycle_YMDH) %}
- ["{{ COMIN_ATMOS_TRACK }}/atcfunix.{{ RUN }}.{{ cycle_YMDH }}", "{{ ARCDIR }}/atcfunix.{{ RUN }}.{{ cycle_YMDH }}"]
- ["{{ COMIN_ATMOS_TRACK }}/atcfunixp.{{ RUN }}.{{ cycle_YMDH }}", "{{ ARCDIR }}/atcfunixp.{{ RUN }}.{{ cycle_YMDH }}"]
{% endif %}

# Cyclone tracking data
{% for basin in ["epac", "natl"] %}
{% if path_exists(COMIN_ATMOS_TRACK + "/" + basin) %}
- ["{{ COMIN_ATMOS_TRACK }}/{{ basin }}", "{{ ARCDIR }}/{{ basin }}"]
{% endif %}
{% endfor %}
# This file set will contain all source-destination pairs to send to the FileHandler for copying
{% set file_set = [] %}

# Declare the VFYARC where Fit2Obs data will be sent
{% set VFYARC = ROTDIR ~ "/vrfyarch" %}

# Deterministic files
{% if "enkf" not in RUN %}
# Common files to be added to both the gfs and gdas keys below
{% set det_files = [] %}
# Cyclone forecasts, produced for both gdas and gfs cycles
## Only created if tracking is on and there were systems to track
{% if path_exists(COMIN_ATMOS_TRACK ~ "/atcfunix." ~ RUN ~ "." ~ cycle_YMDH) %}
{% do det_files.append([COMIN_ATMOS_TRACK ~ "/atcfunix." ~ RUN ~ "." ~ cycle_YMDH,
ARCDIR ~"/atcfunix." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% do det_files.append([COMIN_ATMOS_TRACK ~ "/atcfunixp." ~ RUN ~ "." ~ cycle_YMDH,
ARCDIR ~ "/atcfunixp." ~ RUN ~ "." ~ cycle_YMDH]) %}
{% endif %}

{% if MODE == "cycled" %}
analysis:
copy:
# Analysis data (if we are running in cycled mode)
- ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pgrb2.1p00.anl", "{{ ARCDIR }}/pgbanl.{{ RUN }}.{{ cycle_YMDH }}.grib2"]

{% if DO_JEDIATMVAR %}
- ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}atmstat", "{{ ARCDIR }}/atmstat.{{ RUN }}.{{ cycle_YMDH }}"]
{% else %}
- ["{{ COMIN_ATMOS_ANALYSIS }}/{{ head }}gsistat", "{{ ARCDIR }}/gsistat.{{ RUN }}.{{ cycle_YMDH }}"]
{% endif %}

{% if DO_JEDISNOWDA %}
- ["{{ COMIN_SNOW_ANALYSIS }}/{{ head }}snowstat.tgz", "{{ ARCDIR }}/snowstat.{{ RUN }}.{{ cycle_YMDH }}.tgz"]
{% endif %}

{% if AERO_ANL_CDUMP == RUN or AERO_ANL_CDUMP == "both" %}
- ["{{ COMIN_CHEM_ANALYSIS }}/{{ head }}aerostat", "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}"]
{% endif %}

{% if DO_PREP_OBS_AERO %}
- ["{{ COMIN_OBS }}/{{ head }}aeroobs", "{{ ARCDIR }}/aeroobs.{{ RUN }}.{{ cycle_YMDH }}"]
- ["{{ COMIN_OBS }}/{{ head }}aerorawobs", "{{ ARCDIR }}/aerorawobs.{{ RUN }}.{{ cycle_YMDH }}"]
{% endif %}
# Cyclone tracking data
{% for basin in ["epac", "natl"] %}
{% if path_exists(COMIN_ATMOS_TRACK + "/" + basin) %}
{% do det_files.append([COMIN_ATMOS_TRACK ~ "/" ~ basin,
ARCDIR ~ "/" ~ basin ]) %}
{% endif %}
{% endfor %}

# Deterministic analysis files (generated for cycled experiments)
{% set det_anl_files = [] %}
# Analysis data (if we are running in cycled mode)
{% do det_anl_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pgrb2.1p00.anl",
ARCDIR ~ "/pgbanl." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %}

{% if DO_JEDIATMVAR == True %}
{% do det_anl_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "atmstat",
ARCDIR ~ "/atmstat." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% else %}
{% do det_anl_files.append([COMIN_ATMOS_ANALYSIS ~ "/" ~ head ~ "gsistat",
ARCDIR ~ "/gsistat." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% endif %}

{% if DO_JEDISNOWDA == True %}
{% do det_anl_files.append([COMIN_SNOW_ANALYSIS ~ "/" ~ head ~ "snowstat.tgz",
ARCDIR ~ "/snowstat." ~ RUN ~ "." ~ cycle_YMDH ~ ".tgz"]) %}
{% endif %}

{% if RUN == "gfs" %}
gfs: # GFS specific
<<: *base
<<: *deterministic

gfs:
copy:
{% for fhr in range(0, FHMAX_GFS + 1, FHOUT_GFS) %}
- ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pgrb2.1p00.f{{ '%03d' % fhr }}", "{{ ARCDIR }}/pgbf{{ '%02d' % fhr }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"]
{% endfor %}

# Cyclone genesis data (only present if there are storms)
{% if path_exists(COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH) %}
- ["{{ COMIN_ATMOS_GENESIS }}/storms.gfso.atcf_gen.{{ cycle_YMDH }}", "{{ ARCDIR }}/storms.gfso.atcf_gen.{{ cycle_YMDH }}"]
- ["{{ COMIN_ATMOS_GENESIS }}/storms.gfso.atcf_gen.altg.{{ cycle_YMDH }}", "{{ ARCDIR }}/storms.gfso.atcf_gen.altg.{{ cycle_YMDH }}"]
{% endif %}

{% if path_exists(COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix." ~ cycle_YMDH) %}
- ["{{ COMIN_ATMOS_GENESIS }}/trak.gfso.atcfunix.{{ cycle_YMDH }}", "{{ ARCDIR }}/trak.gfso.atcfunix.{{ cycle_YMDH }}"]
- ["{{ COMIN_ATMOS_GENESIS }}/trak.gfso.atcfunix.altg.{{ cycle_YMDH }}", "{{ ARCDIR }}/trak.gfso.atcfunix.altg.{{ cycle_YMDH }}"]
{% endif %}

{% if DO_FIT2OBS %}
fit2obs:

mkdir:
{% set VFYARC = ROTDIR + "/vrfyarch" %}
- "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}"

copy:
{% for fhr in range(0, FHMAX_FITS + 1, 6) %}
{% set sfcfile = "/" + head + "sfcf" + '%03d'|format(fhr) + ".nc" %}
{% set sigfile = "/" + head + "atmf" + '%03d'|format(fhr) + ".nc" %}
- ["{{COMIN_ATMOS_HISTORY}}/{{ sfcfile }}", "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ sfcfile }}"]
- ["{{COMIN_ATMOS_HISTORY}}/{{ sigfile }}", "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}/{{ sigfile }}"]
{% endfor %}
{% if AERO_ANL_CDUMP == RUN or AERO_ANL_CDUMP == "both" %}
{% do det_anl_files.append([COMIN_CHEM_ANALYSIS ~ "/" ~ head ~ "aerostat",
ARCDIR ~ "/aerostat." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% endif %}

{% if DO_PREP_OBS_AERO == True %}
# Aerosol observation files (only added when DO_PREP_OBS_AERO is enabled)
{% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aeroobs",
ARCDIR ~ "/aeroobs." ~ RUN ~ "." ~ cycle_YMDH]) %}
{% do det_anl_files.append([COMIN_OBS ~ "/" ~ head ~ "aerorawobs",
ARCDIR ~ "/aerorawobs." ~ RUN ~ "." ~ cycle_YMDH]) %}
{% endif %}
{% endif %}

{% if RUN == "gdas" %}
gdas: # GDAS specific
<<: *base
<<: *deterministic
gdas:
copy:
{% for fhr in range(0, FHMAX + 1, FHOUT) %}
- ["{{ COMIN_ATMOS_GRIB_1p00 }}/{{ head }}pgrb2.1p00.f{{ '%03d' % fhr }}", "{{ ARCDIR }}/pgbf{{ '%02d' % fhr }}.{{ RUN }}.{{ cycle_YMDH }}.grib2"]
{% endfor %}
{% endif %}
# GFS-specific files
{% set gfs_files = [] %}
{% for fhr in range(0, FHMAX_GFS + 1, FHOUT_GFS) %}
{% do gfs_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pgrb2.1p00.f" ~ '%03d'|format(fhr),
ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %}
{% endfor %}

# Cyclone genesis data (only present if there are storms)
{% if path_exists(COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH) %}
{% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH,
ARCDIR ~ "/storms.gfso.atcf_gen." ~ cycle_YMDH ]) %}
{% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/storms.gfso.atcf_gen.altg." ~ cycle_YMDH,
ARCDIR ~ "/storms.gfso.atcf_gen.altg." ~ cycle_YMDH ]) %}
{% endif %}

{% if path_exists(COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix." ~ cycle_YMDH) %}
{% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix." ~ cycle_YMDH,
ARCDIR ~ "/trak.gfso.atcfunix." ~ cycle_YMDH ]) %}
{% do gfs_files.append([COMIN_ATMOS_GENESIS ~ "/trak.gfso.atcfunix.altg." ~ cycle_YMDH,
ARCDIR ~ "/trak.gfso.atcfunix.altg." ~ cycle_YMDH ]) %}
{% endif %}

# GFS Fit2Obs data
{% set fit2obs_files = [] %}
{% for fhr in range(0, FHMAX_FITS + 1, 6) %}
{% set sfcfile = "/" + head + "sfcf" + '%03d'|format(fhr) + ".nc" %}
{% set sigfile = "/" + head + "atmf" + '%03d'|format(fhr) + ".nc" %}
{% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sfcfile,
VFYARC ~ "/" ~ RUN ~ "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sfcfile ]) %}
{% do fit2obs_files.append([COMIN_ATMOS_HISTORY ~ "/" ~ sigfile,
VFYARC ~ "/" ~ RUN ~ "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/" ~ sigfile ]) %}
{% endfor %}

# GDAS-specific files
{% set gdas_files = [] %}
{% for fhr in range(0, FHMAX + 1, FHOUT) %}
{% do gdas_files.append([COMIN_ATMOS_GRIB_1p00 ~ "/" ~ head ~ "pgrb2.1p00.f" ~ '%03d'|format(fhr),
ARCDIR ~ "/pgbf" ~ '%02d'|format(fhr) ~ "." ~ RUN ~ "." ~ cycle_YMDH ~ ".grib2"]) %}
{% endfor %}

# Now append the necessary file pairs to file_set
# Common deterministic files
{% set file_set = file_set + det_files %}
{% if MODE == "cycled" %}
{% set file_set = file_set + det_anl_files %}
{% endif %}

# Run-specific deterministic files
{% if RUN == "gfs" %}
{% set file_set = file_set + gfs_files %}
# Fit2Obs files
{% if DO_FIT2OBS == True %}
{% set file_set = file_set + fit2obs_files %}
{% endif %}
{% elif RUN == "gdas" %}
{% set file_set = file_set + gdas_files %}
{% endif %}

{% else %} # End of deterministic files

# Ensemble analysis files
{% set enkf_files = [] %}
{% if DO_JEDIATMENS == True %}
{% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "atmensstat",
ARCDIR ~ "/atmensstat." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "atminc.ensmean.nc",
ARCDIR ~ "/atmensstat." ~ RUN ~ "." ~ cycle_YMDH ~ ".ensmean.nc"]) %}
{% else %}
{% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "enkfstat",
ARCDIR ~ "/enkfstat." ~ RUN ~ "." ~ cycle_YMDH ]) %}
{% do enkf_files.append([COMIN_ATMOS_ANALYSIS_ENSSTAT ~ "/" ~ head ~ "gsistat.ensmean",
ARCDIR ~ "/gsistat." ~ RUN ~ "." ~ cycle_YMDH ~ ".ensmean"]) %}
{% endif %}

# Construct the final file set
{% set file_set = file_set + enkf_files %}

Ensemble: &ensemble # ENKFGDAS or ENKFGFS
analysis:
copy:
# Copy ensemble analyses
{% if DO_JEDIATMENS %}
- ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}atmensstat", "{{ ARCDIR }}/atmensstat.{{ RUN }}.{{ cycle_YMDH }}"]
- ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}atminc.ensmean.nc", "{{ ARCDIR }}/atmensstat.{{ RUN }}.{{ cycle_YMDH }}.ensmean.nc"]
{% else %}
- ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}enkfstat", "{{ ARCDIR }}/enkfstat.{{ RUN }}.{{ cycle_YMDH }}"]
- ["{{ COMIN_ATMOS_ANALYSIS_ENSSTAT }}/{{ head }}gsistat.ensmean", "{{ ARCDIR }}/gsistat.{{ RUN }}.{{ cycle_YMDH }}.ensmean"]
{% endif %}

{% if "enkf" in RUN %}
{{ RUN }}:
<<: *base
<<: *ensemble
{% endif %}


# Actually write the yaml
mkdir:
- "{{ ARCDIR }}"

{% if DO_FIT2OBS == True %}
- "{{ VFYARC }}/{{ RUN }}.{{ cycle_YMD }}/{{ cycle_HH }}"
{% endif %}

copy:
{% for source_dest_pair in file_set %}
- {{ source_dest_pair }}
{% endfor %}
24 changes: 8 additions & 16 deletions ush/python/pygfs/task/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, cast_strdict_as_dtypedict,
from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime,
to_YMD, to_YMDH, Template, TemplateConstants)
to_YMDH)

logger = getLogger(__name__.split('.')[-1])

Expand Down Expand Up @@ -348,19 +348,11 @@ def _construct_arcdir_set(arcdir_j2yaml, arch_dict) -> Dict:
files need to be copied to the ARCDIR and the Fit2Obs directory.
"""

# Parse the input jinja yaml template
arcdir_yaml = parse_j2yaml(arcdir_j2yaml,
arch_dict,
allow_missing=True)

# Collect the needed FileHandler dicts and construct arcdir_set
arcdir_set = {}
for key, handler in arcdir_yaml[arch_dict.RUN].items():
# Different RUNs can have different filesets that need to be copied.
# Each fileset is stored as a dictionary. Collect the contents of
# each (which should be 'mkdir' and/or 'copy') to produce singular
# mkdir and copy lists.
arcdir_set.update(handler)
# Get the FileHandler dictionary for creating directories and copying
# to the ARCDIR and VFYARC directories.
arcdir_set = parse_j2yaml(arcdir_j2yaml,
arch_dict,
allow_missing=True)

return arcdir_set

Expand Down Expand Up @@ -421,7 +413,7 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s
with open(filename_in) as old_file:
lines = old_file.readlines()

out_lines = [line.replace(search, replace) for line in lines]
out_lines = [line.replace(search_str, replace_str) for line in lines]

with open("/tmp/track_file", "w") as new_file:
new_file.writelines(out_lines)
Expand Down
Loading