Issues/127: Updates to compute_coordinate_variable_names for grouped datasets #135

Merged · 11 commits · Dec 13, 2022
29 changes: 22 additions & 7 deletions .github/workflows/build-pipeline.yml
@@ -4,7 +4,7 @@ name: Build
on:
# Triggers the workflow on push events
push:
branches: [ develop, release/**, main, feature/** ]
branches: [ develop, release/**, main, feature/**, issue/**, issues/**, dependabot/** ]

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
@@ -36,7 +36,10 @@ jobs:
echo "pyproject_name=$(poetry version | awk '{print $1}')" >> $GITHUB_ENV
- name: Bump pre-alpha version
# If triggered by push to a feature, issue, or dependabot branch
if: ${{ startsWith(github.ref, 'refs/heads/feature/') }}
if: |
  startsWith(github.ref, 'refs/heads/issue') ||
  startsWith(github.ref, 'refs/heads/dependabot/') ||
  startsWith(github.ref, 'refs/heads/feature/')
run: |
new_ver="${{ steps.get-version.outputs.current_version }}+$(git rev-parse --short ${GITHUB_SHA})"
poetry version $new_ver
@@ -160,6 +163,7 @@ jobs:
name: python-artifact
path: dist/*
- name: Publish to test.pypi.org
id: pypi-test-publish
if: |
github.ref == 'refs/heads/develop' ||
startsWith(github.ref, 'refs/heads/release')
@@ -170,19 +174,24 @@
poetry publish -r testpypi
- name: Publish to pypi.org
if: ${{ github.ref == 'refs/heads/main' }}
id: pypi-publish
env:
POETRY_PYPI_TOKEN_PYPI: ${{secrets.POETRY_PYPI_TOKEN_PYPI}}
run: |
poetry publish
- name: Log in to the Container registry
if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
if: |
steps.pypi-test-publish.conclusion == 'success' ||
steps.pypi-publish.conclusion == 'success'
uses: docker/login-action@v1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
if: |
steps.pypi-test-publish.conclusion == 'success' ||
steps.pypi-publish.conclusion == 'success'
id: meta
uses: docker/metadata-action@v4
with:
@@ -191,12 +200,16 @@
type=semver,pattern={{version}},value=${{ env.software_version }}
type=raw,value=${{ env.venue }}
- name: Wait for package
if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
if: |
steps.pypi-test-publish.conclusion == 'success' ||
steps.pypi-publish.conclusion == 'success'
run: |
pip install tenacity
${GITHUB_WORKSPACE}/.github/workflows/wait-for-pypi.py ${{env.pyproject_name}}[harmony]==${{ env.software_version }}
- name: Build and push Docker image
if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
if: |
steps.pypi-test-publish.conclusion == 'success' ||
steps.pypi-publish.conclusion == 'success'
uses: docker/build-push-action@v3
with:
context: .
@@ -208,7 +221,9 @@
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
- name: Run Snyk on Docker Image
if: ${{ !startsWith(github.ref, 'refs/heads/feature') }}
if: |
steps.pypi-test-publish.conclusion == 'success' ||
steps.pypi-publish.conclusion == 'success'
# Snyk can be used to break the build when it detects vulnerabilities.
# In this case we want to upload the issues to GitHub Code Scanning
continue-on-error: true
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ for variables to not have leading slash in the front
### Removed
### Fixed
- PODAAC-5065: integration with SMAP_RSS_L2_SSS_V5, fix way xarray open granules that have `seconds since 2000-1-1 0:0:0 0` as a time unit.
- [issue/127](https://github.com/podaac/l2ss-py/issues/127): Fixed bug when subsetting variables in grouped datasets. Variable names passed to `subset` will now have `/` replaced by `GROUP_DELIM` so they can be located in flattened datasets
### Security

## [2.2.0]
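As a rough sketch of the renaming described in the changelog entry (the helper below is hypothetical, not the project's code), a variable requested as `/data_01/km/sst` is looked up in the flattened dataset as `__data_01__km__sst`:

```python
GROUP_DELIM = '__'

def flattened_name(requested_name: str) -> str:
    """Map a user-facing variable path to its flattened equivalent."""
    # Replace group separators with GROUP_DELIM so the name matches
    # the variables of the flattened (group-free) dataset.
    return requested_name.replace('/', GROUP_DELIM)

assert flattened_name('/data_01/km/sst') == '__data_01__km__sst'
```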
238 changes: 238 additions & 0 deletions podaac/subsetter/group_handling.py
@@ -0,0 +1,238 @@
"""
group_handling.py

Functions for converting multidimensional data structures
between a group hierarchy and a flat structure
"""
from shutil import copy

import h5py
import netCDF4 as nc
import numpy as np
import xarray as xr

GROUP_DELIM = '__'


def transform_grouped_dataset(nc_dataset, file_to_subset):
"""
Transform a netCDF4 Dataset that has groups to an xarray compatible
dataset. xarray does not work with groups, so this transformation
will flatten the variables in the dataset and use the group path as
the new variable name. For example, data_01 > km > sst would become
'__data_01__km__sst', where GROUP_DELIM is '__'.

This same pattern is applied to dimensions, which are located under
the appropriate group. They are renamed and placed in the root
group.

Parameters
----------
nc_dataset : nc.Dataset
netCDF4 Dataset that contains groups
file_to_subset : str
    Path to the file backing ``nc_dataset``; the file is reopened
    in append mode so it can be flattened in place.

Returns
-------
nc.Dataset
netCDF4 Dataset that does not contain groups and that has been
flattened.
"""

# Close the existing read-only dataset and reopen in append mode
nc_dataset.close()
nc_dataset = nc.Dataset(file_to_subset, 'r+')

dimensions = {}

def walk(group_node, path):
for key, item in group_node.items():
group_path = f'{path}{GROUP_DELIM}{key}'

# If there are variables in this group, copy to root group
# and then delete from current group
if item.variables:
# Copy variables to root group with new name
for var_name, var in item.variables.items():
var_group_name = f'{group_path}{GROUP_DELIM}{var_name}'
nc_dataset.variables[var_group_name] = var
# Delete variables
var_names = list(item.variables.keys())
for var_name in var_names:
del item.variables[var_name]

if item.dimensions:
dims = list(item.dimensions.keys())
for dim_name in dims:
new_dim_name = f'{group_path.replace("/", GROUP_DELIM)}{GROUP_DELIM}{dim_name}'
item.dimensions[new_dim_name] = item.dimensions[dim_name]
dimensions[new_dim_name] = item.dimensions[dim_name]
item.renameDimension(dim_name, new_dim_name)

# If there are subgroups in this group, call this function
# again on that group.
if item.groups:
walk(item.groups, group_path)

# Delete non-root groups
group_names = list(group_node.keys())
for group_name in group_names:
del group_node[group_name]

for var_name in list(nc_dataset.variables.keys()):
new_var_name = f'{GROUP_DELIM}{var_name}'
nc_dataset.variables[new_var_name] = nc_dataset.variables[var_name]
del nc_dataset.variables[var_name]

walk(nc_dataset.groups, '')

# Update the dimensions of the dataset in the root group
nc_dataset.dimensions.update(dimensions)

return nc_dataset

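# Illustrative usage (a sketch, not part of the module): assuming a
# hypothetical granule 'example.nc' whose group hierarchy contains
# data_01 > km > sst, flattening exposes the variable at the root level
# under its delimited group path:
#
#   flat = transform_grouped_dataset(nc.Dataset('example.nc'), 'example.nc')
#   '__data_01__km__sst' in flat.variables  # True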

def recombine_grouped_datasets(datasets, output_file, start_date): # pylint: disable=too-many-branches
"""
Given a list of xarray datasets, combine those datasets into a
single netCDF4 Dataset and write to the disk. Each dataset has been
transformed using its group path and needs to be un-transformed and
placed in the appropriate group.

Parameters
----------
datasets : list (xr.Dataset)
List of xarray datasets to be combined
output_file : str
    Name of the output file to write the resulting NetCDF file to.
start_date : str
    If set, datetime variables are converted to seconds since this
    start date before being written out.
"""

base_dataset = nc.Dataset(output_file, mode='w')

for dataset in datasets:
group_lst = []
for var_name in dataset.variables.keys():  # TODO: handle top-level data that is not in a group
group_lst.append('/'.join(var_name.split(GROUP_DELIM)[:-1]))
group_lst = ['/' if group == '' else group for group in group_lst]
groups = set(group_lst)
for group in groups:
base_dataset.createGroup(group)

for dim_name in list(dataset.dims.keys()):
new_dim_name = dim_name.split(GROUP_DELIM)[-1]
dim_group = _get_nested_group(base_dataset, dim_name)
dim_group.createDimension(new_dim_name, dataset.dims[dim_name])

# Rename variables
_rename_variables(dataset, base_dataset, start_date)

# Remove group vars from base dataset
for var_name in list(base_dataset.variables.keys()):
if GROUP_DELIM in var_name:
del base_dataset.variables[var_name]

# Remove group dims from base dataset
for dim_name in list(base_dataset.dimensions.keys()):
if GROUP_DELIM in dim_name:
del base_dataset.dimensions[dim_name]

# Copy global attributes
base_dataset.setncatts(datasets[0].attrs)
# Write and close
base_dataset.close()

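# Illustrative usage (a sketch, not part of the module): the inverse of
# transform_grouped_dataset. Given a flattened xarray dataset holding
# '__data_01__km__sst', the variable is written back to the 'data_01/km'
# group as 'sst' in a hypothetical output file:
#
#   recombine_grouped_datasets([subsetted_ds], 'subsetted_output.nc', None)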

def _get_nested_group(dataset, group_path):
nested_group = dataset
for group in group_path.strip(GROUP_DELIM).split(GROUP_DELIM)[:-1]:
nested_group = nested_group.groups[group]
return nested_group

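# For example, _get_nested_group(base_dataset, '__data_01__km__sst') descends
# base_dataset.groups['data_01'].groups['km'] and returns the 'km' group; the
# trailing 'sst' component is skipped because it names the variable itself.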

def _rename_variables(dataset, base_dataset, start_date):
for var_name in list(dataset.variables.keys()):
new_var_name = var_name.split(GROUP_DELIM)[-1]
var_group = _get_nested_group(base_dataset, var_name)
variable = dataset.variables[var_name]
var_dims = [x.split(GROUP_DELIM)[-1] for x in dataset.variables[var_name].dims]
if np.issubdtype(
dataset.variables[var_name].dtype, np.dtype(np.datetime64)
) or np.issubdtype(
dataset.variables[var_name].dtype, np.dtype(np.timedelta64)
):
if start_date:
dataset.variables[var_name].values = (dataset.variables[var_name].values - np.datetime64(start_date))/np.timedelta64(1, 's')
variable = dataset.variables[var_name]
else:
cf_dt_coder = xr.coding.times.CFDatetimeCoder()
encoded_var = cf_dt_coder.encode(dataset.variables[var_name])
variable = encoded_var

var_attrs = variable.attrs
fill_value = var_attrs.get('_FillValue')
var_attrs.pop('_FillValue', None)
comp_args = {"zlib": True, "complevel": 1}

if variable.dtype == object:
var_group.createVariable(new_var_name, 'S1', var_dims, fill_value=fill_value, **comp_args)
elif variable.dtype == 'timedelta64[ns]':
var_group.createVariable(new_var_name, 'i4', var_dims, fill_value=fill_value, **comp_args)
else:
var_group.createVariable(new_var_name, variable.dtype, var_dims, fill_value=fill_value, **comp_args)

# Copy attributes
var_group.variables[new_var_name].setncatts(var_attrs)

# Copy data
var_group.variables[new_var_name].set_auto_maskandscale(False)
var_group.variables[new_var_name][:] = variable.data


def h5file_transform(finput):
"""
Transform a h5py Dataset that has groups to an xarray compatible
dataset. xarray does not work with groups, so this transformation
will flatten the variables in the dataset and use the group path as
the new variable name. For example, data_01 > km > sst would become
'__data_01__km__sst', where GROUP_DELIM is '__'.

Parameters
----------
finput : str
    Path to the input HDF5 file to flatten

Returns
-------
tuple of (nc.Dataset, bool)
    netCDF4 Dataset that does not contain groups and that has been
    flattened, and a flag indicating whether the original file
    contained any groups
"""
data_new = h5py.File(finput, 'r+')
del_group_list = list(data_new.keys())
has_groups = bool(data_new['/'])

def walk_h5py(data_new, group):
# flattens h5py file
for key, item in data_new[group].items():
group_path = f'{group}{key}'
if isinstance(item, h5py.Dataset):
new_var_name = group_path.replace('/', '__')

data_new[new_var_name] = data_new[group_path]
del data_new[group_path]

elif isinstance(item, h5py.Group):
if len(list(item.keys())) == 0:
new_group_name = group_path.replace('/', '__')
data_new[new_group_name] = data_new[group_path]

walk_h5py(data_new, data_new[group_path].name + '/')

walk_h5py(data_new, data_new.name)

for del_group in del_group_list:
del data_new[del_group]

finputnc = '.'.join(finput.split('.')[:-1]) + '.nc'

data_new.close() # close the h5py dataset
copy(finput, finputnc) # copy to a nc file

nc_dataset = nc.Dataset(finputnc, mode='r')

return nc_dataset, has_groups
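# Illustrative usage (a sketch, not part of the module): flatten a
# hypothetical HDF5 granule in place, copy it to a sibling '.nc' file, and
# reopen that copy through the netCDF4 API:
#
#   nc_dataset, has_groups = h5file_transform('granule.h5')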