chore: deprecate window_ops (#410)
jmoralez authored Nov 21, 2024
1 parent 151f9e8 commit 2342dea
Showing 13 changed files with 466 additions and 393 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -18,7 +18,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID_NIXTLA_TMP }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY_NIXTLA_TMP }}
@@ -44,7 +44,7 @@ jobs:
fail-fast: false
matrix:
os: [macos-13, macos-14, windows-latest]
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
steps:
- name: Clone repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
1 change: 1 addition & 0 deletions README.md
@@ -57,6 +57,7 @@ for best practices.**
- [m4](https://www.kaggle.com/code/lemuz90/m4-competition)
- [m4-cv](https://www.kaggle.com/code/lemuz90/m4-competition-cv)
- [favorita](https://www.kaggle.com/code/lemuz90/mlforecast-favorita)
- [VN1](https://colab.research.google.com/drive/1UdhCAk49k6HgMezG-U_1ETnAB5pYvZk9)

## Why?

8 changes: 0 additions & 8 deletions mlforecast/_modidx.py
@@ -236,18 +236,10 @@
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array.GroupedArray.expand_target': ( 'grouped_array.html#groupedarray.expand_target',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array.GroupedArray.restore_fitted_difference': ( 'grouped_array.html#groupedarray.restore_fitted_difference',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array.GroupedArray.take': ( 'grouped_array.html#groupedarray.take',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array.GroupedArray.take_from_groups': ( 'grouped_array.html#groupedarray.take_from_groups',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array._append_several': ( 'grouped_array.html#_append_several',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array._expand_target': ( 'grouped_array.html#_expand_target',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array._restore_fitted_difference': ( 'grouped_array.html#_restore_fitted_difference',
'mlforecast/grouped_array.py'),
'mlforecast.grouped_array._transform_series': ( 'grouped_array.html#_transform_series',
'mlforecast/grouped_array.py')},
'mlforecast.lag_transforms': { 'mlforecast.lag_transforms.Combine': ( 'lag_transforms.html#combine',
11 changes: 10 additions & 1 deletion mlforecast/core.py
@@ -154,19 +154,28 @@ def _parse_transforms(
namer = _build_transform_name
for lag in lags:
transforms[f"lag{lag}"] = Lag(lag)
has_fns = False
for lag in lag_transforms.keys():
for tfm in lag_transforms[lag]:
if isinstance(tfm, _BaseLagTransform):
tfm_name = namer(tfm, lag)
transforms[tfm_name] = clone(tfm)._set_core_tfm(lag)
else:
has_fns = True
tfm, *args = _as_tuple(tfm)
assert callable(tfm)
tfm_name = namer(tfm, lag, *args)
transforms[tfm_name] = (lag, tfm, *args)
if has_fns:
warnings.warn(
"The `window_ops` package (and thus `numba`) will no longer be "
"a dependency in a future version.\n"
"Please make sure to add it to your requirements to ensure compatibility.",
category=FutureWarning,
)
return transforms

# %% ../nbs/core.ipynb 21
# %% ../nbs/core.ipynb 22
class TimeSeries:
"""Utility class for storing and transforming time series data."""

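For readers migrating: a minimal sketch of the change this warning nudges users toward, replacing a `window_ops` function with a built-in lag transform. The model, frequency, and window size here are illustrative, not part of this commit:

```python
# Minimal migration sketch (illustrative values, not part of this commit).
from sklearn.linear_model import LinearRegression

from mlforecast import MLForecast
from mlforecast.lag_transforms import RollingMean

# Old style -- a plain function from window_ops, which triggers the
# FutureWarning added above and requires numba:
# from window_ops.rolling import rolling_mean
# fcst = MLForecast(
#     models=[LinearRegression()],
#     freq="D",
#     lag_transforms={1: [(rolling_mean, 7)]},
# )

# New style -- the built-in transform, no numba needed:
fcst = MLForecast(
    models=[LinearRegression()],
    freq="D",
    lag_transforms={1: [RollingMean(window_size=7)]},
)
```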
147 changes: 52 additions & 95 deletions mlforecast/grouped_array.py
@@ -3,18 +3,18 @@
# %% auto 0
__all__ = ['GroupedArray']

# %% ../nbs/grouped_array.ipynb 1
# %% ../nbs/grouped_array.ipynb 2
import concurrent.futures
from typing import Any, Dict, Mapping, Tuple, Union

from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
import numpy as np
from coreforecast.grouped_array import GroupedArray as CoreGroupedArray
from utilsforecast.compat import njit

from .compat import shift_array
from .lag_transforms import _BaseLagTransform

# %% ../nbs/grouped_array.ipynb 2
# %% ../nbs/grouped_array.ipynb 3
@njit(nogil=True)
def _transform_series(data, indptr, updates_only, lag, func, *args) -> np.ndarray:
"""Shifts every group in `data` by `lag` and computes `func(shifted, *args)`.
@@ -34,66 +34,6 @@ def _transform_series(data, indptr, updates_only, lag, func, *args) -> np.ndarray:
out[indptr[i] : indptr[i + 1]] = func(lagged, *args)
return out


@njit
def _restore_fitted_difference(diffs_data, diffs_indptr, data, indptr, d):
n_series = len(indptr) - 1
for i in range(n_series):
serie = data[indptr[i] : indptr[i + 1]]
diffs_size = diffs_indptr[i + 1] - diffs_indptr[i]
dropped_rows = diffs_size - serie.size
start_idx = max(0, d - dropped_rows)
for j in range(start_idx, serie.size):
serie[j] += diffs_data[diffs_indptr[i + 1] - serie.size - d + j]


@njit
def _expand_target(data, indptr, max_horizon):
out = np.empty((data.size, max_horizon), dtype=data.dtype)
n_series = len(indptr) - 1
n = 0
for i in range(n_series):
serie = data[indptr[i] : indptr[i + 1]]
for j in range(serie.size):
upper = min(serie.size - j, max_horizon)
for k in range(upper):
out[n, k] = serie[j + k]
for k in range(upper, max_horizon):
out[n, k] = np.nan
n += 1
return out


@njit
def _append_several(
data: np.ndarray,
indptr: np.ndarray,
new_sizes: np.ndarray,
new_values: np.ndarray,
new_groups: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
new_data = np.empty(data.size + new_values.size, dtype=data.dtype)
new_indptr = np.empty(new_sizes.size + 1, dtype=indptr.dtype)
new_indptr[0] = 0
old_indptr_idx = 0
new_vals_idx = 0
for i, is_new in enumerate(new_groups):
new_size = new_sizes[i]
if is_new:
old_size = 0
else:
prev_slice = slice(indptr[old_indptr_idx], indptr[old_indptr_idx + 1])
old_indptr_idx += 1
old_size = prev_slice.stop - prev_slice.start
new_size += old_size
new_data[new_indptr[i] : new_indptr[i] + old_size] = data[prev_slice]
new_indptr[i + 1] = new_indptr[i] + new_size
new_data[new_indptr[i] + old_size : new_indptr[i + 1]] = new_values[
new_vals_idx : new_vals_idx + new_sizes[i]
]
new_vals_idx += new_sizes[i]
return new_data, new_indptr
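
As a reading aid: every kernel in this file operates on the same CSR-style layout, a flat `data` array plus an `indptr` array of group boundaries. A minimal sketch with illustrative values:

```python
import numpy as np

# Two groups stored contiguously: group 0 -> data[0:3], group 1 -> data[3:5].
data = np.array([1.0, 2.0, 3.0, 10.0, 20.0])
indptr = np.array([0, 3, 5])

for i in range(indptr.size - 1):
    print(i, data[indptr[i] : indptr[i + 1]])
# 0 [1. 2. 3.]
# 1 [10. 20.]
```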

# %% ../nbs/grouped_array.ipynb 4
class GroupedArray:
"""Array made up of different groups. Can be thought of (and iterated) as a list of arrays.
@@ -175,21 +115,22 @@ def apply_multithreaded_transforms(
core_tfms[name] = tfm
else:
numba_tfms[name] = tfm
with concurrent.futures.ThreadPoolExecutor(num_threads) as executor:
for tfm_name, (lag, tfm, *args) in numba_tfms.items():
future = executor.submit(
_transform_series,
self.data,
self.indptr,
updates_only,
lag - offset,
tfm,
*args,
)
future_to_result[future] = tfm_name
for future in concurrent.futures.as_completed(future_to_result):
tfm_name = future_to_result[future]
results[tfm_name] = future.result()
if numba_tfms:
with concurrent.futures.ThreadPoolExecutor(num_threads) as executor:
for tfm_name, (lag, tfm, *args) in numba_tfms.items():
future = executor.submit(
_transform_series,
self.data,
self.indptr,
updates_only,
lag - offset,
tfm,
*args,
)
future_to_result[future] = tfm_name
for future in concurrent.futures.as_completed(future_to_result):
tfm_name = future_to_result[future]
results[tfm_name] = future.result()
if core_tfms:
core_ga = CoreGroupedArray(self.data, self.indptr, num_threads)
for name, tfm in core_tfms.items():
@@ -199,21 +140,16 @@ def apply_multithreaded_transforms(
results[name] = tfm.transform(core_ga)
return results

def restore_fitted_difference(
self, series_data: np.ndarray, series_indptr: np.ndarray, d: int
) -> None:
if len(self.indptr) != len(series_indptr):
raise ValueError("Found different number of groups in fitted differences.")
_restore_fitted_difference(
self.data,
self.indptr,
series_data,
series_indptr,
d,
)

def expand_target(self, max_horizon: int) -> np.ndarray:
return _expand_target(self.data, self.indptr, max_horizon)
out = np.full_like(
self.data, np.nan, shape=(self.data.size, max_horizon), order="F"
)
for j in range(max_horizon):
for i in range(self.n_groups):
out[self.indptr[i] : self.indptr[i + 1] - j, j] = self.data[
self.indptr[i] + j : self.indptr[i + 1]
]
return out
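
The rewritten `expand_target` builds the horizon matrix column by column: column `j` holds each series shifted forward by `j` steps, with the last `j` rows of every group left as NaN. A single-group sketch of the same loop (illustrative values):

```python
import numpy as np

# One group of length 3, expanded to a 2-step horizon.
data = np.array([1.0, 2.0, 3.0])
max_horizon = 2

out = np.full((data.size, max_horizon), np.nan)
for j in range(max_horizon):
    out[: data.size - j, j] = data[j:]
print(out)
# [[ 1.  2.]
#  [ 2.  3.]
#  [ 3. nan]]
```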

def take_from_groups(self, idx: Union[int, slice]) -> "GroupedArray":
"""Takes `idx` from each group in the array."""
@@ -240,9 +176,30 @@ def append(self, new_data: np.ndarray) -> "GroupedArray":
def append_several(
self, new_sizes: np.ndarray, new_values: np.ndarray, new_groups: np.ndarray
) -> "GroupedArray":
new_data, new_indptr = _append_several(
self.data, self.indptr, new_sizes, new_values, new_groups
)
new_data = np.empty(self.data.size + new_values.size, dtype=self.data.dtype)
new_indptr = np.empty(new_sizes.size + 1, dtype=self.indptr.dtype)
new_indptr[0] = 0
old_indptr_idx = 0
new_vals_idx = 0
for i, is_new in enumerate(new_groups):
new_size = new_sizes[i]
if is_new:
old_size = 0
else:
prev_slice = slice(
self.indptr[old_indptr_idx], self.indptr[old_indptr_idx + 1]
)
old_indptr_idx += 1
old_size = prev_slice.stop - prev_slice.start
new_size += old_size
new_data[new_indptr[i] : new_indptr[i] + old_size] = self.data[
prev_slice
]
new_indptr[i + 1] = new_indptr[i] + new_size
new_data[new_indptr[i] + old_size : new_indptr[i + 1]] = new_values[
new_vals_idx : new_vals_idx + new_sizes[i]
]
new_vals_idx += new_sizes[i]
return GroupedArray(new_data, new_indptr)
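
A small, hedged usage sketch of `append_several`: one value appended to an existing group plus a brand-new group of two values. Inputs and expected outputs are illustrative, derived from the loop above:

```python
import numpy as np

from mlforecast.grouped_array import GroupedArray

ga = GroupedArray(np.array([1.0, 2.0, 3.0]), np.array([0, 3]))
new = ga.append_several(
    new_sizes=np.array([1, 2]),           # one value for group 0, two for the new group
    new_values=np.array([4.0, 10.0, 20.0]),
    new_groups=np.array([False, True]),   # second entry creates a new group
)
print(new.data)    # expected: [ 1.  2.  3.  4. 10. 20.]
print(new.indptr)  # expected: [0 4 6]
```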

def __repr__(self) -> str:
50 changes: 37 additions & 13 deletions mlforecast/target_transforms.py
@@ -86,21 +86,22 @@ def __init__(self, differences: Iterable[int]):
self.differences = list(differences)

def fit_transform(self, ga: GroupedArray) -> GroupedArray:
self.fitted_: List[GroupedArray] = []
self.fitted_: List[np.ndarray] = []
self.fitted_indptr_: Optional[np.ndarray] = None
original_sizes = np.diff(ga.indptr)
total_diffs = sum(self.differences)
small_series = original_sizes < total_diffs
if small_series.any():
raise _ShortSeriesException(np.arange(ga.n_groups)[small_series])
raise _ShortSeriesException(np.where(small_series)[0])
self.scalers_ = []
core_ga = CoreGroupedArray(ga.data, ga.indptr, self.num_threads)
for d in self.differences:
if self.store_fitted:
# these are saved in order to be able to perform a correct
# inverse transform when trying to retrieve the fitted values.
self.fitted_.append(
GroupedArray(core_ga.data.copy(), core_ga.indptr.copy())
)
self.fitted_.append(core_ga.data.copy())
if self.fitted_indptr_ is None:
self.fitted_indptr_ = core_ga.indptr.copy()
scaler = core_scalers.Difference(d)
transformed = scaler.fit_transform(core_ga)
self.scalers_.append(scaler)
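
For intuition about what `fit_transform` stores and removes: a difference of order `d` replaces each value with its change from `d` steps earlier, and the pre-difference series is kept (when `store_fitted`) so fitted values can be restored later. A pure-numpy sketch with illustrative values:

```python
import numpy as np

y = np.array([1.0, 3.0, 6.0, 10.0])  # original series
d = 1
diffed = y[d:] - y[:-d]  # what the model trains on: [2. 3. 4.]

# Inverting for fitted values: add back the value d steps earlier.
restored = diffed + y[:-d]
print(restored)  # [ 3.  6. 10.] == y[d:]
```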
@@ -122,14 +123,34 @@ def inverse_transform(self, ga: GroupedArray) -> GroupedArray:
return GroupedArray(transformed, ga.indptr)

def inverse_transform_fitted(self, ga: GroupedArray) -> GroupedArray:
ga = copy.copy(ga)
if self.fitted_[0].size < ga.data.size:
raise ValueError("fitted differences are smaller than provided target.")
transformed = ga.data
for d, fitted in zip(reversed(self.differences), reversed(self.fitted_)):
fitted.restore_fitted_difference(ga.data, ga.indptr, d)
return ga
fitted_ga = CoreGroupedArray(fitted, self.fitted_indptr_)
adds = fitted_ga._lag(d)
if adds.size > ga.data.size:
adds = CoreGroupedArray(adds, self.fitted_indptr_)._tails(ga.indptr)
transformed = transformed + adds
return GroupedArray(transformed, ga.indptr)
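
The restore step above adds, for each difference `d`, the stored pre-difference series lagged by `d`, trimming to the tail when the differenced target has fewer rows. `_lag` and `_tails` are private coreforecast helpers; the sketch below only mirrors their effect in plain numpy for `d = 1` (illustrative values):

```python
import numpy as np

fitted = np.array([1.0, 3.0, 6.0, 10.0])  # stored pre-difference series
d = 1
target = fitted[d:] - fitted[:-d]  # differenced fitted values, one row dropped

lagged = np.concatenate([[np.nan], fitted[:-d]])  # fitted lagged by d (d = 1 case)
adds = lagged[-target.size :]  # keep only the tail that lines up with the target
print(target + adds)  # [ 3.  6. 10.] == fitted[d:]
```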

def take(self, idxs: np.ndarray) -> "Differences":
out = Differences(self.differences)
out.fitted_ = [ga.take(idxs) for ga in self.fitted_]
if self.fitted_indptr_ is None:
out.fitted_ = []
out.fitted_indptr_ = None
else:
out.fitted_ = [
np.hstack(
[
data[self.fitted_indptr_[i] : self.fitted_indptr_[i + 1]]
for i in idxs
]
)
for data in self.fitted_
]
sizes = np.diff(self.fitted_indptr_)[idxs]
out.fitted_indptr_ = np.append(0, sizes.cumsum())
out.scalers_ = [scaler.take(idxs) for scaler in self.scalers_]
return out

@@ -140,10 +161,13 @@ def stack(scalers: Sequence["Differences"]) -> "Differences":  # type: ignore[override]
diffs = first_scaler.differences
out = Differences(diffs)
out.fitted_ = []
for i in range(len(scalers[0].fitted_)):
data = np.hstack([sc.fitted_[i].data for sc in scalers])
sizes = np.hstack([np.diff(sc.fitted_[i].indptr) for sc in scalers])
out.fitted_.append(GroupedArray(data, np.append(0, sizes.cumsum())))
if first_scaler.fitted_indptr_ is None:
out.fitted_indptr_ = None
else:
for i in range(len(scalers[0].fitted_)):
out.fitted_.append(np.hstack([sc.fitted_[i] for sc in scalers]))
sizes = np.hstack([np.diff(sc.fitted_indptr_) for sc in scalers])
out.fitted_indptr_ = np.append(0, sizes.cumsum())
out.scalers_ = [
core_scaler.stack([sc.scalers_[i] for sc in scalers])
for i in range(len(diffs))