Refactor sections property #197

Merged · 5 commits · Aug 14, 2023
2 changes: 2 additions & 0 deletions .gitignore
@@ -64,6 +64,8 @@ docs/_build
 .build
 .ve
 .env
+.venv
+.vscode
 .cache
 .pytest
 .bootstrap
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -111,6 +111,7 @@ lint = [
 ]
 format = ["black .", "isort .", "lint",]
 test = ["pytest ./src/ ./tests/",] # --doctest-modules
+fast-test = ["pytest ./tests/ -m \"not slow\"",]
 coverage = [
     "pytest --cov --cov-report term --cov-report xml --junitxml=xunit-result.xml tests/",
 ]
@@ -134,6 +135,9 @@ test = ["pytest ./src/ ./tests/",] # --doctest-modules
 
 [tool.pytest.ini_options]
 testpaths = ["tests"]
+markers = [
+    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
+]
 
 [tool.ruff]
 select = [ # It would be nice to have the commented out checks working.
@@ -181,13 +185,14 @@ max-complexity = 10
 py_version=39
 force_single_line = true
 known_first_party = ["dtscalibration"]
-skip = [".gitignore", ".tox", "docs"]
+skip = [".gitignore", ".tox", "docs", ".venv"]
 src_paths = ["src", "tests"]
 line_length = 120
 
 [tool.black]
 line-length = 88
 target-version = ['py39', 'py310', 'py311']
+extend-exclude = ".venv"
 
 [tool.mypy]
 ignore_missing_imports = true # Preferably false, but matplotlib, scipy and statsmodels are missing typing stubs
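For context: the new `markers` entry registers the `slow` marker that the new `fast-test` script deselects. A test opts in by carrying the marker; a minimal sketch (the test name is invented, not from this PR):

import pytest


@pytest.mark.slow
def test_full_calibration_run():
    # Long-running end-to-end test; deselected by `pytest ./tests/ -m "not slow"`.
    ...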
223 changes: 223 additions & 0 deletions src/dtscalibration/calibration/section_utils.py
@@ -0,0 +1,223 @@
from typing import Dict
from typing import List

import numpy as np
import xarray as xr
import yaml

from dtscalibration.datastore_utils import ufunc_per_section_helper


def set_sections(ds: xr.Dataset, sections: Dict[str, List[slice]]) -> xr.Dataset:
Review comments on this line:

Collaborator: Maybe change the name to set_sections_to_ds() to better describe what it does.

Author: That feels a bit redundant to me. It's a function that takes a dataset and sections, and returns a dataset.

Collaborator: Ok! Let's merge.
    sections_validated = None

    if sections is not None:
        sections_validated = validate_sections(ds, sections=sections)

    ds.attrs["_sections"] = yaml.dump(sections_validated)
    return ds

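# Editor's illustration (not part of this PR); the reference-series name and
# stretch coordinates below are invented:
#
#     sections = {"probe1Temperature": [slice(6.0, 12.0), slice(18.0, 24.0)]}
#     ds = set_sections(ds, sections)
#     # The validated dict is now serialized as YAML under ds.attrs["_sections"].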

def validate_sections(ds: xr.Dataset, sections: Dict[str, List[slice]]):
    assert isinstance(sections, dict)

    # Be less restrictive for capitalized labels: build a flat list of
    # (lower-case, original-case) label pairs.
    labels = np.reshape([[s.lower(), s] for s in ds.data_vars.keys()], (-1,)).tolist()

    sections_fix = dict()
    for k, v in sections.items():
        if k.lower() in labels:
            i_lower_case = labels.index(k.lower())
            i_normal_case = i_lower_case + 1
            k_normal_case = labels[i_normal_case]
            sections_fix[k_normal_case] = v
        else:
            assert k in ds.data_vars, (
                "The keys of the sections-dictionary should refer to a "
                "valid time series already stored in ds.data_vars"
            )

    sections_fix_slice_fixed = dict()

    for k, v in sections_fix.items():
        assert isinstance(v, (list, tuple)), (
            "The values of the sections-dictionary should be lists of slice objects."
        )

        for vi in v:
            assert isinstance(vi, slice), (
                "The values of the sections-dictionary should "
                "be lists of slice objects."
            )

            assert ds.x.sel(x=vi).size > 0, (
                f"Better define the {k} section. You tried {vi}, "
                "which is not within the x-dimension"
            )

        # Sort the stretches by their start coordinate.
        stretch_unsort = [slice(float(vi.start), float(vi.stop)) for vi in v]
        stretch_start = [i.start for i in stretch_unsort]
        stretch_i_sorted = np.argsort(stretch_start)
        sections_fix_slice_fixed[k] = [stretch_unsort[i] for i in stretch_i_sorted]

    # Prevent overlapping slices
    ix_sec = ufunc_per_section(
        ds, sections=sections_fix_slice_fixed, x_indices=True, calc_per="all"
    )
    assert np.unique(ix_sec).size == ix_sec.size, "The sections are overlapping"

    return sections_fix_slice_fixed

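# Editor's illustration of the normalization above (invented names and values):
# keys are matched case-insensitively against ds.data_vars, and the stretches
# are returned sorted by their start coordinate:
#
#     validate_sections(ds, {"PROBE1TEMPERATURE": [slice(18.0, 24.0), slice(6.0, 12.0)]})
#     # -> {"probe1Temperature": [slice(6.0, 12.0), slice(18.0, 24.0)]}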

def ufunc_per_section(
    ds: xr.Dataset,
    sections,
    func=None,
    label=None,
    subtract_from_label=None,
    temp_err=False,
    x_indices=False,
    ref_temp_broadcasted=False,
    calc_per="stretch",
    **func_kwargs,
):
"""
User function applied to parts of the cable. Super useful,
many options and slightly
complicated.

The function `func` is taken over all the timesteps and calculated
per `calc_per`. This
is returned as a dictionary

Parameters
----------
sections : Dict[str, List[slice]], optional
If `None` is supplied, `ds.sections` is used. Define calibration
sections. Each section requires a reference temperature time series,
such as the temperature measured by an external temperature sensor.
They should already be part of the DataStore object. `sections`
is defined with a dictionary with its keywords of the
names of the reference temperature time series. Its values are
lists of slice objects, where each slice object is a fiber stretch
that has the reference temperature. Afterwards, `sections` is stored
under `ds.sections`.
func : callable, str
A numpy function, or lambda function to apple to each 'calc_per'.
label
subtract_from_label
temp_err : bool
The argument of the function is label minus the reference
temperature.
x_indices : bool
To retreive an integer array with the indices of the
x-coordinates in the section/stretch. The indices are sorted.
ref_temp_broadcasted : bool
calc_per : {'all', 'section', 'stretch'}
func_kwargs : dict
Dictionary with options that are passed to func

TODO: Spend time on creating a slice instead of appendng everything\
to a list and concatenating after.


Returns
-------

Examples
--------

1. Calculate the variance of the residuals in the along ALL the\
reference sections wrt the temperature of the water baths

>>> tmpf_var = d.ufunc_per_section(
>>> func='var',
>>> calc_per='all',
>>> label='tmpf',
>>> temp_err=True)

2. Calculate the variance of the residuals in the along PER\
reference section wrt the temperature of the water baths

>>> tmpf_var = d.ufunc_per_section(
>>> func='var',
>>> calc_per='stretch',
>>> label='tmpf',
>>> temp_err=True)

3. Calculate the variance of the residuals in the along PER\
water bath wrt the temperature of the water baths

>>> tmpf_var = d.ufunc_per_section(
>>> func='var',
>>> calc_per='section',
>>> label='tmpf',
>>> temp_err=True)

4. Obtain the coordinates of the measurements per section

>>> locs = d.ufunc_per_section(
>>> func=None,
>>> label='x',
>>> temp_err=False,
>>> ref_temp_broadcasted=False,
>>> calc_per='stretch')

5. Number of observations per stretch

>>> nlocs = d.ufunc_per_section(
>>> func=len,
>>> label='x',
>>> temp_err=False,
>>> ref_temp_broadcasted=False,
>>> calc_per='stretch')

6. broadcast the temperature of the reference sections to\
stretch/section/all dimensions. The value of the reference\
temperature (a timeseries) is broadcasted to the shape of self[\
label]. The self[label] is not used for anything else.

>>> temp_ref = d.ufunc_per_section(
>>> label='st',
>>> ref_temp_broadcasted=True,
>>> calc_per='all')

7. x-coordinate index

>>> ix_loc = d.ufunc_per_section(x_indices=True)


Note
----
If `self[label]` or `self[subtract_from_label]` is a Dask array, a Dask
array is returned else a numpy array is returned
"""
    if label is None:
        dataarray = None
    else:
        dataarray = ds[label]

    if x_indices:
        x_coords = ds.x
        reference_dataset = None
    else:
        x_coords = None
        reference_dataset = {k: ds[k] for k in sections}

    return ufunc_per_section_helper(
        x_coords=x_coords,
        sections=sections,
        func=func,
        dataarray=dataarray,
        subtract_from_dataarray=subtract_from_label,
        reference_dataset=reference_dataset,
        subtract_reference_from_dataarray=temp_err,
        ref_temp_broadcasted=ref_temp_broadcasted,
        calc_per=calc_per,
        **func_kwargs,
    )
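To tie the pieces together, here is a minimal end-to-end sketch in the new free-function style (an editor's illustration, not part of the diff; the coordinates, array sizes, and the "probe1Temperature" name are invented):

import numpy as np
import xarray as xr

from dtscalibration.calibration.section_utils import set_sections, ufunc_per_section

# A toy dataset: a temperature field over fiber position x and time, plus one
# reference time series measured in a water bath.
x = np.linspace(0.0, 30.0, 61)
time = np.arange(5)
ds = xr.Dataset(
    {
        "tmpf": (("x", "time"), 20.0 + np.random.randn(x.size, time.size)),
        "probe1Temperature": ("time", np.full(time.size, 20.0)),
    },
    coords={"x": x, "time": time},
)

sections = {"probe1Temperature": [slice(6.0, 12.0), slice(18.0, 24.0)]}
ds = set_sections(ds, sections)  # validates and stores the sections

# Docstring example 5 in free-function form: observations per stretch.
nlocs = ufunc_per_section(ds, sections, func=len, label="x", calc_per="stretch")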