diff --git a/test/test_um2netcdf.py b/test/test_um2netcdf.py index df1781f..e962d5e 100644 --- a/test/test_um2netcdf.py +++ b/test/test_um2netcdf.py @@ -5,7 +5,272 @@ import umpost.um2netcdf as um2nc import pytest +import numpy as np + import mule +import mule.ff +import iris.cube + + +@pytest.fixture +def z_sea_rho_data(): + # data ripped from aiihca.paa1jan.subset: ff.level_dependent_constants.zsea_at_rho + # TODO: dtype is object, should it be float? + data = np.array([9.9982061118072, 49.998881525751194, 130.00023235363918, + 249.99833311211358, 410.00103476788956, 610.000486354252, + 850.0006133545584, 1130.0014157688088, 1449.9989681136456, + 1810.0011213557837, 2210.0000245285087, 2649.9996031151773, + 3129.9998571157903, 3650.000786530347, 4209.99846587549, + 4810.000746117935, 5449.999776290966, 6129.999481877941, + 6849.999862878861, 7610.000919293723, 8409.998725639172, + 9250.001132881924, 10130.00029005526, 11050.000122642545, + 12010.000630643768, 13010.001814058938, 14050.40014670717, + 15137.719781928794, 16284.973697054254, 17506.96881530842, + 18820.820244130424, 20246.59897992768, 21808.13663216417, + 23542.18357603375, 25520.960854349545, 27901.358260464756, + 31063.888598164976, 36081.76331548462, -1073741824.0], dtype=object) + return data + + +@pytest.fixture +def z_sea_theta_data(): + # data ripped from aiihca.paa1jan.subset: ff.level_dependent_constants.zsea_at_theta + # TODO: dtype is object, should it be float? + data = np.array([0.0, 20.000337706971997, 80.00135082788799, 179.9991138793904, + 320.00147782819437, 500.00059170758476, 720.0003810009191, + 980.0008457081975, 1279.9980603460624, 1619.9998758812287, + 1999.9984413469813, 2420.001607710036, 2880.0015240036764, + 3379.9981902279037, 3919.9994573494323, 4500.001399884905, + 5120.000092350965, 5779.999460230968, 6479.999503524915, + 7220.000222232806, 8000.001616354641, 8819.999760407061, + 9679.998579873429, 10579.998074753737, 11519.998245047991, + 12499.999090756188, 13520.000611878331, 14580.799681536007, + 15694.639882321579, 16875.311437270288, 18138.62619334655, + 19503.01036943094, 20990.18759042441, 22626.081748420565, + 24458.285403646936, 26583.640230535515, 29219.080215877355, + 32908.69305496925, 39254.833576], dtype=object) + return data + + +@pytest.fixture +def mule_vars(z_sea_rho_data, z_sea_theta_data): + """Simulate mule variables from aiihca.paa1jan.subset data.""" + d_lat = 1.25 # spacing manually copied from aiihca.paa1jan.subset file + d_lon = 1.875 + return um2nc.MuleVars(um2nc.GRID_NEW_DYNAMICS, d_lat, d_lon, z_sea_rho_data, z_sea_theta_data) + + +def set_default_attrs(cube, item_code: int, var_name: str): + """Add subset of default attributes to flesh out cube like objects.""" + cube.__dict__.update({"item_code": item_code, + "var_name": var_name, + "long_name": "", + "coord": {"latitude": 0.0, # TODO: real val = ? + "longitude": 0.0}, # TODO: real val + "cell_methods": [], + "data": None, + }) + + section, item = um2nc.to_stash_code(item_code) + cube.attributes = {um2nc.STASH: DummyStash(section, item)} + + +@pytest.fixture +def air_temp_cube(): + # data copied from aiihca.paa1jan.subset file + name = "air_temperature" + m_air_temp = mock.NonCallableMagicMock(spec=iris.cube.Cube, name=name) + set_default_attrs(m_air_temp, 30204, name) + return m_air_temp + + +@pytest.fixture +def precipitation_flux_cube(): + # copied from aiihca.paa1jan.subset file + name = "precipitation_flux" + m_flux = mock.NonCallableMagicMock(spec=iris.cube.Cube, name=name) + set_default_attrs(m_flux, 5216, name) + return m_flux + + +# create cube requiring heaviside_t masking +@pytest.fixture +def geo_potential_cube(): + name = "geopotential_height" + m_geo_potential = mock.NonCallableMagicMock(spec=iris.cube.Cube, name=name) + set_default_attrs(m_geo_potential, 30297, name) + return m_geo_potential + + +@pytest.fixture +def std_args(): + # TODO: make args namedtuple? + args = mock.Mock() + args.nomask = False # perform masking if possible + args.nohist = False + args.nckind = 3 + args.include_list = None + args.exclude_list = None + args.simple = False + args.verbose = False + return args + + +@pytest.fixture +def fake_in_path(): + # use junk paths to protect against accidentally touching filesystems + return "/tmp-does-not-exist/fake_input_fields_file" + + +@pytest.fixture +def fake_out_path(): + # use junk paths to protect against accidentally touching filesystems + return "/tmp-does-not-exist/fake_input_fields_file.nc" + + +def test_process_no_heaviside_drop_cubes(air_temp_cube, precipitation_flux_cube, + geo_potential_cube, mule_vars, std_args, + fake_in_path, fake_out_path): + """Attempt end-to-end process() test, dropping cubes requiring masking.""" + + # FIXME: this convoluted setup is a code smell + # use these tests to gradually refactor process() + # TODO: move towards a design where input & output I/O is extracted from process() + # process() should eventually operate on *data only* args + with ( + # use mocks to prevent mule data extraction file I/O + mock.patch("mule.load_umfile"), + mock.patch("umpost.um2netcdf.process_mule_vars") as m_mule_vars, + + mock.patch("iris.load") as m_iris_load, + mock.patch("iris.fileformats.netcdf.Saver") as m_saver, # prevent I/O + + # TODO: lat/long & level coord fixes require more internal data attrs + # skip temporarily to manage test complexity + mock.patch("umpost.um2netcdf.fix_latlon_coord"), + mock.patch("umpost.um2netcdf.fix_level_coord"), + mock.patch("umpost.um2netcdf.cubewrite"), + ): + m_mule_vars.return_value = mule_vars + + # include cubes requiring both heaviside uv & t cubes to filter, to + # ensure both uv/t dependent cubes are dropped + cubes = [air_temp_cube, precipitation_flux_cube, geo_potential_cube] + + m_iris_load.return_value = cubes + m_saver().__enter__ = mock.Mock(name="mock_sman") + std_args.verbose = True # test some warning branches + + # trying to mask None will break in numpy + assert precipitation_flux_cube.data is None + + # air temp & geo potential should be dropped in process() + processed = um2nc.process(fake_in_path, fake_out_path, std_args) + assert len(processed) == 1 + cube = processed[0] + + assert cube is precipitation_flux_cube + + # contrived testing: if the masking code was reached for some reason, + # the test would fail during process() + assert cube.data is None # masking wasn't called/nothing changed + + +def test_process_all_cubes_filtered(air_temp_cube, geo_potential_cube, + mule_vars, std_args, + fake_in_path, fake_out_path): + """Ensure process() exits early if all cubes are removed in filtering.""" + with ( + mock.patch("mule.load_umfile"), + mock.patch("umpost.um2netcdf.process_mule_vars") as m_mule_vars, + + mock.patch("iris.load") as m_iris_load, + mock.patch("iris.fileformats.netcdf.Saver") as m_saver, # prevent I/O + ): + m_mule_vars.return_value = mule_vars + m_iris_load.return_value = [air_temp_cube, geo_potential_cube] + m_saver().__enter__ = mock.Mock(name="mock_sman") + + # all cubes should be dropped + assert um2nc.process(fake_in_path, fake_out_path, std_args) == [] + + +def test_process_mask_with_heaviside(air_temp_cube, precipitation_flux_cube, + heaviside_uv_cube, heaviside_t_cube, + geo_potential_cube, mule_vars, + std_args, fake_in_path, fake_out_path): + """Run process() with pressure level masking cubes present.""" + with ( + mock.patch("mule.load_umfile"), + mock.patch("umpost.um2netcdf.process_mule_vars") as m_mule_vars, + + mock.patch("iris.load") as m_iris_load, + mock.patch("iris.fileformats.netcdf.Saver") as m_saver, # prevent I/O + mock.patch("umpost.um2netcdf.fix_latlon_coord"), + mock.patch("umpost.um2netcdf.fix_level_coord"), + mock.patch("umpost.um2netcdf.apply_mask"), # TODO: eventually call real version + mock.patch("umpost.um2netcdf.cubewrite"), + ): + m_mule_vars.return_value = mule_vars + + # air temp requires heaviside_uv & geo_potential_cube requires heaviside_t + # masking, include both to enable code execution for both masks + cubes = [air_temp_cube, precipitation_flux_cube, geo_potential_cube, + heaviside_uv_cube, heaviside_t_cube] + + # TODO: convert heaviside cubes to NonCallableMagicMock like other fixtures? + for c in [heaviside_uv_cube, heaviside_t_cube]: + # add attrs to mimic real cubes + attrs = {um2nc.STASH: DummyStash(*um2nc.to_stash_code(c.item_code))} + c.attributes = attrs + c.cell_methods = [] + + m_iris_load.return_value = cubes + m_saver().__enter__ = mock.Mock(name="mock_sman") + + # all cubes should be processed & not dropped + processed = um2nc.process(fake_in_path, fake_out_path, std_args) + assert len(processed) == len(cubes) + + for pc in processed: + assert pc in cubes + + +def test_process_no_masking_keep_all_cubes(air_temp_cube, precipitation_flux_cube, + geo_potential_cube, mule_vars, std_args, + fake_in_path, fake_out_path): + """Run process() with masking off, ensuring all cubes are kept & modified.""" + with ( + mock.patch("mule.load_umfile"), + mock.patch("umpost.um2netcdf.process_mule_vars") as m_mule_vars, + + mock.patch("iris.load") as m_iris_load, + mock.patch("iris.fileformats.netcdf.Saver") as m_saver, # prevent I/O + + mock.patch("umpost.um2netcdf.fix_latlon_coord"), + mock.patch("umpost.um2netcdf.fix_level_coord"), + mock.patch("umpost.um2netcdf.cubewrite"), + ): + m_mule_vars.return_value = mule_vars + + # air temp and geo potential would need heaviside uv & t respectively + cubes = [air_temp_cube, precipitation_flux_cube, geo_potential_cube] + + m_iris_load.return_value = cubes + m_saver().__enter__ = mock.Mock(name="mock_sman") + std_args.nomask = True + + # all cubes should be kept with masking off + processed = um2nc.process(fake_in_path, fake_out_path, std_args) + assert len(processed) == len(cubes) + + for pc in processed: + assert pc in cubes + + +def test_to_stash_code(): + assert um2nc.to_stash_code(5126) == (5, 126) + assert um2nc.to_stash_code(30204) == (30, 204) def test_get_eg_grid_type(): @@ -79,7 +344,7 @@ def test_stash_code_to_item_code_conversion(): assert result == 30255 -@dataclass +@dataclass(frozen=True) class DummyStash: """ Partial Stash representation for testing. @@ -127,12 +392,23 @@ def __init__(self, item_code, var_name=None, attributes=None, units=None): self.var_name = var_name or "unknown_var" self.attributes = attributes self.units = None or units + self.standard_name = None + self.long_name = None + self.coord = {} + def name(self): + # mimic iris API + return self.var_name -def test_set_item_codes_fail_on_overwrite(): - cubes = [DummyCube(1007, "fake_var")] - with pytest.raises(NotImplementedError): - um2nc.set_item_codes(cubes) + +def test_set_item_codes_avoid_overwrite(): + item_code = 1007 + item_code2 = 51006 + + cubes = [DummyCube(item_code, "fake_var"), DummyCube(item_code2, "fake_var2")] + um2nc.set_item_codes(cubes) + assert cubes[0].item_code == item_code + assert cubes[1].item_code == item_code2 @pytest.fixture @@ -150,46 +426,9 @@ def ta_plev_cube(): return DummyCube(30294, "ta_plev") -def test_check_pressure_level_masking_need_heaviside_uv(ua_plev_cube, - heaviside_uv_cube): - cubes = [ua_plev_cube, heaviside_uv_cube] - - (need_heaviside_uv, heaviside_uv, - need_heaviside_t, heaviside_t) = um2nc.check_pressure_level_masking(cubes) - - assert need_heaviside_uv - assert heaviside_uv - assert need_heaviside_t is False - assert heaviside_t is None - - -def test_check_pressure_level_masking_missing_heaviside_uv(ua_plev_cube): - cubes = [ua_plev_cube] - need_heaviside_uv, heaviside_uv, _, _ = um2nc.check_pressure_level_masking(cubes) - - assert need_heaviside_uv - assert heaviside_uv is None - - -def test_check_pressure_level_masking_need_heaviside_t(ta_plev_cube): - heaviside_t_cube = DummyCube(30304) - cubes = (ta_plev_cube, heaviside_t_cube) - - (need_heaviside_uv, heaviside_uv, - need_heaviside_t, heaviside_t) = um2nc.check_pressure_level_masking(cubes) - - assert need_heaviside_uv is False - assert heaviside_uv is None - assert need_heaviside_t - assert heaviside_t - - -def test_check_pressure_level_masking_missing_heaviside_t(ta_plev_cube): - cubes = (ta_plev_cube, ) - _, _, need_heaviside_t, heaviside_t = um2nc.check_pressure_level_masking(cubes) - - assert need_heaviside_t - assert heaviside_t is None +@pytest.fixture +def heaviside_t_cube(): + return DummyCube(30304, "heaviside_t") # cube filtering tests diff --git a/umpost/um2netcdf.py b/umpost/um2netcdf.py index eaba138..31c0482 100644 --- a/umpost/um2netcdf.py +++ b/umpost/um2netcdf.py @@ -11,10 +11,10 @@ """ import os -import sys import argparse import datetime import warnings +import collections from umpost import stashvar_cmip6 as stashvar @@ -40,6 +40,13 @@ # TODO: what is this limit & does it still exist? XCONV_LONG_NAME_LIMIT = 110 +NC_FORMATS = { + 1: 'NETCDF3_CLASSIC', + 2: 'NETCDF3_64BIT', + 3: 'NETCDF4', + 4: 'NETCDF4_CLASSIC' +} + class PostProcessingError(Exception): """Generic class for um2nc specific errors.""" @@ -161,6 +168,7 @@ def fix_level_coord(cube, z_rho, z_theta): c_sigma.var_name = 'sigma_theta' +# TODO: split cube ops into functions, this will likely increase process() workflow steps def cubewrite(cube, sman, compression, use64bit, verbose): try: plevs = cube.coord('pressure') @@ -300,38 +308,29 @@ def apply_mask(c, heaviside, hcrit): def process(infile, outfile, args): - # Use mule to get the model levels to help with dimension naming - # mule 2020.01.1 doesn't handle pathlib Paths properly ff = mule.load_umfile(str(infile)) - - if isinstance(ff, mule.ancil.AncilFile): - raise NotImplementedError('Ancillary files are currently not supported') - - # TODO: eventually move these calls closer to their usage - grid_type = get_grid_type(ff) - dlat, dlon = get_grid_spacing(ff) - z_rho, z_theta = get_z_sea_constants(ff) + mv = process_mule_vars(ff) cubes = iris.load(infile) set_item_codes(cubes) cubes.sort(key=lambda cs: cs.item_code) - (need_heaviside_uv, heaviside_uv, - need_heaviside_t, heaviside_t) = check_pressure_level_masking(cubes) + if args.include_list or args.exclude_list: + cubes = [c for c in filtered_cubes(cubes, args.include_list, args.exclude_list)] - do_mask = not args.nomask # make warning logic more readable + do_masking = not args.nomask + heaviside_uv, heaviside_t = get_heaviside_cubes(cubes) - if do_mask: - # TODO: rename func to better name - check_pressure_warnings(need_heaviside_uv, heaviside_uv, - need_heaviside_t, heaviside_t) + if do_masking: + # drop cubes which cannot be pressure masked if heaviside uv or t is missing + # otherwise keep all cubes when masking is off + cubes = list(non_masking_cubes(cubes, heaviside_uv, heaviside_t, args.verbose)) - # TODO: can NC type be a single arg? - # defer to new process() API - nc_formats = {1: 'NETCDF3_CLASSIC', 2: 'NETCDF3_64BIT', - 3: 'NETCDF4', 4: 'NETCDF4_CLASSIC'} + if not cubes: + print("No cubes left to process after filtering") + return cubes - with iris.fileformats.netcdf.Saver(outfile, nc_formats[args.nckind]) as sman: + with iris.fileformats.netcdf.Saver(outfile, NC_FORMATS[args.nckind]) as sman: # TODO: move attribute mods to end of process() to group sman ops # do when sman ops refactored into a write function # Add global attributes @@ -340,7 +339,7 @@ def process(infile, outfile, args): sman.update_global_attributes({'Conventions': 'CF-1.6'}) - for c in filtered_cubes(cubes, args.include_list, args.exclude_list): + for c in cubes: umvar = stashvar.StashVar(c.item_code) # TODO: rename with `stash` as it's from stash codes fix_var_name(c, umvar.uniquename, args.simple) @@ -352,33 +351,63 @@ def process(infile, outfile, args): c.cell_methods = fix_cell_methods(c.cell_methods) try: - fix_latlon_coord(c, grid_type, dlat, dlon) + fix_latlon_coord(c, mv.grid_type, mv.d_lat, mv.d_lon) except iris.exceptions.CoordinateNotFoundError: print('\nMissing lat/lon coordinates for variable (possible timeseries?)\n') print(c) raise Exception("Variable can not be processed") - fix_level_coord(c, z_rho, z_theta) + fix_level_coord(c, mv.z_rho, mv.z_theta) - if do_mask: + if do_masking: # Pressure level data should be masked - if require_heaviside_uv(c.item_code): - if heaviside_uv: - apply_mask(c, heaviside_uv, args.hcrit) - else: - continue - - if require_heaviside_t(c.item_code): - if heaviside_t: - apply_mask(c, heaviside_t, args.hcrit) - else: - continue + if require_heaviside_uv(c.item_code) and heaviside_uv: + apply_mask(c, heaviside_uv, args.hcrit) + + if require_heaviside_t(c.item_code) and heaviside_t: + apply_mask(c, heaviside_t, args.hcrit) if args.verbose: print(c.name(), c.item_code) + # TODO: split cubewrite ops into funcs & bring those steps into process() workflow + # or a sub process workflow function (like process_mule_vars()) cubewrite(c, sman, args.compression, args.use64bit, args.verbose) + return cubes + + +MuleVars = collections.namedtuple("MuleVars", "grid_type, d_lat, d_lon, z_rho, z_theta") + + +# TODO: rename this function, it's *getting* variables +def process_mule_vars(fields_file: mule.ff.FieldsFile): + """ + Extract model levels and grid structure with Mule. + + The model levels help with workflow dimension naming. + + Parameters + ---------- + fields_file : an open mule fields file. + + Returns + ------- + A MuleVars data structure. + """ + if isinstance(fields_file, mule.ancil.AncilFile): + raise NotImplementedError('Ancillary files are currently not supported') + + if mule.__version__ == "2020.01.1": + msg = "mule 2020.01.1 doesn't handle pathlib Paths properly" + raise NotImplementedError(msg) # fail fast + + grid_type = get_grid_type(fields_file) + d_lat, d_lon = get_grid_spacing(fields_file) + z_rho, z_theta = get_z_sea_constants(fields_file) + + return MuleVars(grid_type, d_lat, d_lon, z_rho, z_theta) + def get_grid_type(ff): """ @@ -462,52 +491,46 @@ def to_item_code(stash_code): return 1000 * stash_code.section + stash_code.item +def to_stash_code(item_code: int): + """Helper: convert item code back to older section & item components.""" + return item_code // 1000, item_code % 1000 + + def set_item_codes(cubes): for cube in cubes: if hasattr(cube, ITEM_CODE): - msg = f"Cube {cube.var_name} already has 'item_code' attribute" - raise NotImplementedError(msg) + msg = f"Cube {cube.var_name} already has 'item_code' attribute, skipping." + warnings.warn(msg) + continue # hack: manually store item_code in cubes item_code = to_item_code(cube.attributes[STASH]) setattr(cube, ITEM_CODE, item_code) -def check_pressure_level_masking(cubes): +def get_heaviside_cubes(cubes): """ - Examines cubes for heaviside uv/t pressure level masking components. + Finds heaviside_uv, heaviside_t cubes in given sequence. Parameters ---------- - cubes : sequence iris Cube objects. + cubes : sequence of cubes. Returns ------- - Tuple: (need_heaviside_uv [bool], heaviside_uv [iris cube or None], - need_heaviside_t [bool], heaviside_t [iris cube or None]) - + (heaviside_uv, heaviside_t) tuple, or None for either cube where the + heaviside_uv/t cubes not found. """ - # Check whether there are any pressure level fields that should be masked. - # Can use temperature to mask instantaneous fields, so really should check - # whether these are time means - need_heaviside_uv = need_heaviside_t = False heaviside_uv = None heaviside_t = None for cube in cubes: - if require_heaviside_uv(cube.item_code): - need_heaviside_uv = True - if is_heaviside_uv(cube.item_code): heaviside_uv = cube - - if require_heaviside_t(cube.item_code): - need_heaviside_t = True - - if is_heaviside_t(cube.item_code): + elif is_heaviside_t(cube.item_code): heaviside_t = cube - return need_heaviside_uv, heaviside_uv, need_heaviside_t, heaviside_t + return heaviside_uv, heaviside_t def require_heaviside_uv(item_code): @@ -530,24 +553,36 @@ def is_heaviside_t(item_code): return item_code == 30304 -def check_pressure_warnings(need_heaviside_uv, heaviside_uv, need_heaviside_t, heaviside_t): +def non_masking_cubes(cubes, heaviside_uv, heaviside_t, verbose: bool): """ - Prints warnings if either of heaviside uv/t are required and not present. + Yields cubes that: + * do not require pressure level masking + * require pressure level masking & the relevant masking cube exists + + This provides filtering to remove cubes from workflows for efficiency. Parameters ---------- - need_heaviside_uv : (bool) - heaviside_uv : iris Cube or None - need_heaviside_t : (bool) - heaviside_t : iris Cube or None + cubes : sequence of iris cubes for filtering + heaviside_uv : heaviside_uv cube or None if it's missing + heaviside_t : heaviside_t cube or None if it's missing + verbose : True to emit warnings to indicate a cube has been removed """ - if need_heaviside_uv and heaviside_uv is None: - print("Warning: heaviside_uv field needed for masking pressure level data is not present. " - "These fields will be skipped") + msg = ("{} field needed for masking pressure level data is missing. " + "Excluding cube '{}' as it cannot be masked") + + for c in cubes: + if require_heaviside_uv(c.item_code) and heaviside_uv is None: + if verbose: + warnings.warn(msg.format("heaviside_uv", c.name())) + continue + + elif require_heaviside_t(c.item_code) and heaviside_t is None: + if verbose: + warnings.warn(msg.format("heaviside_t", c.name())) + continue - if need_heaviside_t and heaviside_t is None: - print("Warning: heaviside_t field needed for masking pressure level data is not present. " - "These fields will be skipped") + yield c def filtered_cubes(cubes, include=None, exclude=None):