Issues/127: Updates to compute_coordinate_variable_names for grouped datasets (#135)

* Update tests to use parameterized pytest. Some tests failing on new TEMPO test dataset

* Replace / in all var names if the dataset is grouped

* perform flattening as necessary in test suite (#130)

* move methods for flattening netcdf and hdf group structures to separate module

* feature/PODAAC-5065 (#129)

* fix the way xarray opens granules that have `seconds since 2000-1-1 0:0:0 0` as a time unit

* fix pylint

* change function to use the original parsing when the time units can be parsed; only change the units when they cannot be parsed

* make xarray override into its own function

* add test for override_decode_cf_datetime function

* disable pylint for one line instead of globally

* Update podaac/subsetter/subset.py

Co-authored-by: Frank Greguska <[email protected]>

* add missing parameter to docstring

* typo in docstring

* extract netcdf opening procedure from beginning of `subset()` into a new function

* update tests to use netcdf opening wrapper function, to prevent errors with tempo data

* /version 2.3.0-alpha.5

* update `test_specified_variables()` to use netcdf opening wrapper function in multiple places, to prevent errors with tempo data

* cosmetic

* clean up comment and use `decode_times=True` for test

* feature/issue 126 (#131)

* Add variable leading slash flexibility

* Add tests back to test file

* changelog added and updated

* Update podaac/subsetter/subset.py

Co-authored-by: Frank Greguska <[email protected]>

* update syntax

* resolve conflict

Co-authored-by: nlensse1 <[email protected]>
Co-authored-by: Frank Greguska <[email protected]>

* /version 2.3.0-alpha.6

* Update build-pipeline.yml

* /version 2.3.0-alpha.7

* Merge changes from origin/develop

* Merge changes from issues/127

Co-authored-by: sliu008 <[email protected]>
Co-authored-by: Frank Greguska <[email protected]>
Co-authored-by: l2ss-py bot <[email protected]>
Co-authored-by: Nick Lenssen <[email protected]>
Co-authored-by: nlensse1 <[email protected]>

* Run verification on issue branches

* Updated changelog

* add module docstring

* fix import statements

* fix import statements

* Fix tests

Co-authored-by: Daniel Kaufman <[email protected]>
Co-authored-by: sliu008 <[email protected]>
Co-authored-by: l2ss-py bot <[email protected]>
Co-authored-by: Nick Lenssen <[email protected]>
Co-authored-by: nlensse1 <[email protected]>
Co-authored-by: danielfromearth <[email protected]>
7 people committed Dec 13, 2022
1 parent 13c5eeb commit e36f1e7
Showing 8 changed files with 2,274 additions and 2,214 deletions.
29 changes: 22 additions & 7 deletions .github/workflows/build-pipeline.yml
@@ -4,7 +4,7 @@ name: Build
on:
# Triggers the workflow on push events
push:
- branches: [ develop, release/**, main, feature/** ]
+ branches: [ develop, release/**, main, feature/**, issue/**, issues/**, dependabot/** ]

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
@@ -36,7 +36,10 @@ jobs:
echo "pyproject_name=$(poetry version | awk '{print $1}')" >> $GITHUB_ENV
- name: Bump pre-alpha version
# If triggered by push to a feature branch
- if: ${{ startsWith(github.ref, 'refs/heads/feature/') }}
+ if: |
+   ${{ startsWith(github.ref, 'refs/heads/issue') }} ||
+   ${{ startsWith(github.ref, 'refs/heads/dependabot/') }} ||
+   ${{ startsWith(github.ref, 'refs/heads/feature/') }}
run: |
new_ver="${{ steps.get-version.outputs.current_version }}+$(git rev-parse --short ${GITHUB_SHA})"
poetry version $new_ver
@@ -160,6 +163,7 @@ jobs:
name: python-artifact
path: dist/*
- name: Publish to test.pypi.org
+ id: pypi-test-publish
if: |
github.ref == 'refs/heads/develop' ||
startsWith(github.ref, 'refs/heads/release')
@@ -170,19 +174,24 @@
poetry publish -r testpypi
- name: Publish to pypi.org
if: ${{ github.ref == 'refs/heads/main' }}
+ id: pypi-publish
env:
POETRY_PYPI_TOKEN_PYPI: ${{secrets.POETRY_PYPI_TOKEN_PYPI}}
run: |
poetry publish
- name: Log in to the Container registry
- if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
+ if: |
+   steps.pypi-test-publish.conclusion == 'success' ||
+   steps.pypi-publish.conclusion == 'success'
uses: docker/login-action@v1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
- if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
+ if: |
+   steps.pypi-test-publish.conclusion == 'success' ||
+   steps.pypi-publish.conclusion == 'success'
id: meta
uses: docker/metadata-action@v4
with:
@@ -191,12 +200,16 @@
type=semver,pattern={{version}},value=${{ env.software_version }}
type=raw,value=${{ env.venue }}
- name: Wait for package
- if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
+ if: |
+   steps.pypi-test-publish.conclusion == 'success' ||
+   steps.pypi-publish.conclusion == 'success'
run: |
pip install tenacity
${GITHUB_WORKSPACE}/.github/workflows/wait-for-pypi.py ${{env.pyproject_name}}[harmony]==${{ env.software_version }}
- name: Build and push Docker image
- if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
+ if: |
+   steps.pypi-test-publish.conclusion == 'success' ||
+   steps.pypi-publish.conclusion == 'success'
uses: docker/build-push-action@v3
with:
context: .
@@ -208,7 +221,9 @@
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
- name: Run Snyk on Docker Image
- if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
+ if: |
+   steps.pypi-test-publish.conclusion == 'success' ||
+   steps.pypi-publish.conclusion == 'success'
# Snyk can be used to break the build when it detects vulnerabilities.
# In this case we want to upload the issues to GitHub Code Scanning
continue-on-error: true
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ for variables to not have leading slash in the front
### Removed
### Fixed
- PODAAC-5065: integration with SMAP_RSS_L2_SSS_V5, fix way xarray open granules that have `seconds since 2000-1-1 0:0:0 0` as a time unit.
+ - [issue/127](https://github.com/podaac/l2ss-py/issues/127): Fixed bug when subsetting variables in grouped datasets. Variable names passed to `subset` will now have `/` replaced by `GROUP_DELIM` so they can be located in flattened datasets
### Security

## [2.2.0]
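
The issue/127 changelog entry above describes replacing `/` with `GROUP_DELIM` in variable names passed to `subset` so they can be located in the flattened dataset. As a rough illustration only, here is a hypothetical helper (not part of this commit) that follows the naming convention used by the new group_handling module shown below:

```python
GROUP_DELIM = '__'  # same delimiter the new group_handling module defines


def flatten_variable_name(var_name: str) -> str:
    """Translate a group-qualified name like '/data_01/ku/sst' into the name
    it gets once the group hierarchy is flattened ('__data_01__ku__sst')."""
    # Drop any leading slash, swap the remaining '/' separators for the
    # delimiter, and prepend it, matching how transform_grouped_dataset
    # renames variables moved into the root group.
    return GROUP_DELIM + var_name.lstrip('/').replace('/', GROUP_DELIM)


print(flatten_variable_name('/data_01/ku/sst'))  # __data_01__ku__sst
print(flatten_variable_name('data_01/ku/sst'))   # __data_01__ku__sst
print(flatten_variable_name('sst'))              # __sst
```
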
238 changes: 238 additions & 0 deletions podaac/subsetter/group_handling.py
@@ -0,0 +1,238 @@
"""
group_handling.py
Functions for converting multidimensional data structures
between a group hierarchy and a flat structure
"""
from shutil import copy

import h5py
import netCDF4 as nc
import numpy as np
import xarray as xr

GROUP_DELIM = '__'


def transform_grouped_dataset(nc_dataset, file_to_subset):
"""
Transform a netCDF4 Dataset that has groups to an xarray compatible
dataset. xarray does not work with groups, so this transformation
will flatten the variables in the dataset and use the group path as
the new variable name. For example, data_01 > km > sst would become
'data_01__km__sst', where GROUP_DELIM is __.
This same pattern is applied to dimensions, which are located under
the appropriate group. They are renamed and placed in the root
group.
Parameters
----------
nc_dataset : nc.Dataset
netCDF4 Dataset that contains groups
file_to_subset : str
Path to the file on disk from which nc_dataset was read; the file is reopened in append mode so it can be modified.
Returns
-------
nc.Dataset
netCDF4 Dataset that does not contain groups and that has been
flattened.
"""

# Close the existing read-only dataset and reopen in append mode
nc_dataset.close()
nc_dataset = nc.Dataset(file_to_subset, 'r+')

dimensions = {}

def walk(group_node, path):
for key, item in group_node.items():
group_path = f'{path}{GROUP_DELIM}{key}'

# If there are variables in this group, copy to root group
# and then delete from current group
if item.variables:
# Copy variables to root group with new name
for var_name, var in item.variables.items():
var_group_name = f'{group_path}{GROUP_DELIM}{var_name}'
nc_dataset.variables[var_group_name] = var
# Delete variables
var_names = list(item.variables.keys())
for var_name in var_names:
del item.variables[var_name]

if item.dimensions:
dims = list(item.dimensions.keys())
for dim_name in dims:
new_dim_name = f'{group_path.replace("/", GROUP_DELIM)}{GROUP_DELIM}{dim_name}'
item.dimensions[new_dim_name] = item.dimensions[dim_name]
dimensions[new_dim_name] = item.dimensions[dim_name]
item.renameDimension(dim_name, new_dim_name)

# If there are subgroups in this group, call this function
# again on that group.
if item.groups:
walk(item.groups, group_path)

# Delete non-root groups
group_names = list(group_node.keys())
for group_name in group_names:
del group_node[group_name]

for var_name in list(nc_dataset.variables.keys()):
new_var_name = f'{GROUP_DELIM}{var_name}'
nc_dataset.variables[new_var_name] = nc_dataset.variables[var_name]
del nc_dataset.variables[var_name]

walk(nc_dataset.groups, '')

# Update the dimensions of the dataset in the root group
nc_dataset.dimensions.update(dimensions)

return nc_dataset


def recombine_grouped_datasets(datasets, output_file, start_date): # pylint: disable=too-many-branches
"""
Given a list of xarray datasets, combine those datasets into a
single netCDF4 Dataset and write to the disk. Each dataset has been
transformed using its group path and needs to be un-transformed and
placed in the appropriate group.
Parameters
----------
datasets : list (xr.Dataset)
List of xarray datasets to be combined
output_file : str
Name of the output file to write the resulting NetCDF file to.
start_date : str or datetime-like
Reference date; when provided, datetime variables are re-encoded as seconds since this date.
"""

base_dataset = nc.Dataset(output_file, mode='w')

for dataset in datasets:
group_lst = []
for var_name in dataset.variables.keys(): # need logic if there is data in the top level not in a group
group_lst.append('/'.join(var_name.split(GROUP_DELIM)[:-1]))
group_lst = ['/' if group == '' else group for group in group_lst]
groups = set(group_lst)
for group in groups:
base_dataset.createGroup(group)

for dim_name in list(dataset.dims.keys()):
new_dim_name = dim_name.split(GROUP_DELIM)[-1]
dim_group = _get_nested_group(base_dataset, dim_name)
dim_group.createDimension(new_dim_name, dataset.dims[dim_name])

# Rename variables
_rename_variables(dataset, base_dataset, start_date)

# Remove group vars from base dataset
for var_name in list(base_dataset.variables.keys()):
if GROUP_DELIM in var_name:
del base_dataset.variables[var_name]

# Remove group dims from base dataset
for dim_name in list(base_dataset.dimensions.keys()):
if GROUP_DELIM in dim_name:
del base_dataset.dimensions[dim_name]

# Copy global attributes
base_dataset.setncatts(datasets[0].attrs)
# Write and close
base_dataset.close()


def _get_nested_group(dataset, group_path):
nested_group = dataset
for group in group_path.strip(GROUP_DELIM).split(GROUP_DELIM)[:-1]:
nested_group = nested_group.groups[group]
return nested_group


def _rename_variables(dataset, base_dataset, start_date):
for var_name in list(dataset.variables.keys()):
new_var_name = var_name.split(GROUP_DELIM)[-1]
var_group = _get_nested_group(base_dataset, var_name)
variable = dataset.variables[var_name]
var_dims = [x.split(GROUP_DELIM)[-1] for x in dataset.variables[var_name].dims]
if np.issubdtype(
dataset.variables[var_name].dtype, np.dtype(np.datetime64)
) or np.issubdtype(
dataset.variables[var_name].dtype, np.dtype(np.timedelta64)
):
if start_date:
dataset.variables[var_name].values = (dataset.variables[var_name].values - np.datetime64(start_date))/np.timedelta64(1, 's')
variable = dataset.variables[var_name]
else:
cf_dt_coder = xr.coding.times.CFDatetimeCoder()
encoded_var = cf_dt_coder.encode(dataset.variables[var_name])
variable = encoded_var

var_attrs = variable.attrs
fill_value = var_attrs.get('_FillValue')
var_attrs.pop('_FillValue', None)
comp_args = {"zlib": True, "complevel": 1}

if variable.dtype == object:
var_group.createVariable(new_var_name, 'S1', var_dims, fill_value=fill_value, **comp_args)
elif variable.dtype == 'timedelta64[ns]':
var_group.createVariable(new_var_name, 'i4', var_dims, fill_value=fill_value, **comp_args)
else:
var_group.createVariable(new_var_name, variable.dtype, var_dims, fill_value=fill_value, **comp_args)

# Copy attributes
var_group.variables[new_var_name].setncatts(var_attrs)

# Copy data
var_group.variables[new_var_name].set_auto_maskandscale(False)
var_group.variables[new_var_name][:] = variable.data


def h5file_transform(finput):
"""
Transform a h5py Dataset that has groups to an xarray compatible
dataset. xarray does not work with groups, so this transformation
will flatten the variables in the dataset and use the group path as
the new variable name. For example, data_01 > km > sst would become
'data_01__km__sst', where GROUP_DELIM is __.
Parameters
----------
finput : str
Path to the input HDF5 file; it is flattened in place and then copied to a matching .nc file.
Returns
-------
nc.Dataset
netCDF4 Dataset that does not contain groups and that has been
flattened.
"""
data_new = h5py.File(finput, 'r+')
del_group_list = list(data_new.keys())
has_groups = bool(data_new['/'])

def walk_h5py(data_new, group):
# flattens h5py file
for key, item in data_new[group].items():
group_path = f'{group}{key}'
if isinstance(item, h5py.Dataset):
new_var_name = group_path.replace('/', '__')

data_new[new_var_name] = data_new[group_path]
del data_new[group_path]

elif isinstance(item, h5py.Group):
if len(list(item.keys())) == 0:
new_group_name = group_path.replace('/', '__')
data_new[new_group_name] = data_new[group_path]

walk_h5py(data_new, data_new[group_path].name + '/')

walk_h5py(data_new, data_new.name)

for del_group in del_group_list:
del data_new[del_group]

finputnc = '.'.join(finput.split('.')[:-1]) + '.nc'

data_new.close() # close the h5py dataset
copy(finput, finputnc) # copy to a nc file

nc_dataset = nc.Dataset(finputnc, mode='r')

return nc_dataset, has_groups
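
For orientation, here is a minimal, hypothetical sketch of how the new group_handling helpers could be chained together, loosely following the flatten, subset, and recombine flow described in the commit messages; the file paths and the pass-through "subset" step are placeholders, not code from this commit:

```python
import netCDF4 as nc
import xarray as xr

from podaac.subsetter.group_handling import (
    transform_grouped_dataset,
    recombine_grouped_datasets,
)

in_file, out_file = 'input.nc', 'output.nc'  # hypothetical paths

nc_dataset = nc.Dataset(in_file, 'r')
if nc_dataset.groups:
    # Flatten the group hierarchy so xarray can see every variable; note that
    # transform_grouped_dataset reopens in_file in append mode and rewrites it.
    nc_dataset = transform_grouped_dataset(nc_dataset, in_file)

# Hand the flattened netCDF4 dataset to xarray through its NetCDF4 data store.
dataset = xr.open_dataset(xr.backends.NetCDF4DataStore(nc_dataset), decode_times=False)

# ... bounding-box / variable subsetting would happen here ...
subsetted = dataset

# Un-flatten: recreate the group hierarchy ('__' back to '/') and write to disk.
recombine_grouped_datasets([subsetted], out_file, None)
```
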
