Skip to content

Commit

Permalink
CLN: rolling step followups (pandas-dev#46191)
Browse files (browse the repository at this point in the history)
  • Loading branch information
mroeschke authored Mar 2, 2022
1 parent 31c553f commit 0ccad38
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 185 deletions.
1 change: 0 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
6 changes: 1 addition & 5 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -39,9 +38,6 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed
step : int64
Spacing between windows
index : ndarray[int64]
time series index to roll over
Expand Down Expand Up @@ -150,4 +146,4 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start[::step], end[::step]
return start, end
25 changes: 5 additions & 20 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError("step not implemented for variable window")

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
# error: Argument 6 to "calculate_variable_window_bounds" has incompatible
Expand All @@ -128,7 +125,6 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
1,
self.index_array, # type: ignore[arg-type]
)

Expand Down Expand Up @@ -234,12 +230,10 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError("step not implemented for expanding window")

end = np.arange(1, num_values + 1, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
return start, end
return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)


class FixedForwardWindowIndexer(BaseIndexer):
Expand Down Expand Up @@ -343,8 +337,6 @@ def get_window_bounds(
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
if step is not None:
raise NotImplementedError("step not implemented for groupby window")

# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
Expand Down Expand Up @@ -404,11 +396,4 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError(
"step not implemented for exponentail moving window"
)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
)
return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
60 changes: 24 additions & 36 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ def _validate(self) -> None:
)
if self.method not in ["table", "single"]:
raise ValueError("method must be 'table' or 'single")
if self.step is not None:
if not is_integer(self.step):
raise ValueError("step must be an integer")
elif self.step < 0:
raise ValueError("step must be >= 0")

def _check_window_bounds(
self, start: np.ndarray, end: np.ndarray, num_vals: int
Expand All @@ -238,16 +243,14 @@ def _check_window_bounds(
f"start ({len(start)}) and end ({len(end)}) bounds must be the "
f"same length"
)
elif not isinstance(self._get_window_indexer(), GroupbyIndexer) and len(
start
) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
elif len(start) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
raise ValueError(
f"start and end bounds ({len(start)}) must be the same length "
f"as the object ({num_vals}) divided by the step ({self.step}) "
f"if given and rounded up"
)

def _slice_index(self, index: Index, result: Sized | None = None) -> Index:
def _slice_axis_for_step(self, index: Index, result: Sized | None = None) -> Index:
"""
Slices the index for a given result and the preset step.
"""
Expand Down Expand Up @@ -446,7 +449,7 @@ def _apply_series(
raise DataError("No numeric types to aggregate") from err

result = homogeneous_func(values)
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
return obj._constructor(result, index=index, name=obj.name)

def _apply_blockwise(
Expand Down Expand Up @@ -484,7 +487,7 @@ def hfunc(values: ArrayLike) -> ArrayLike:
res_values.append(res)
taker.append(i)

index = self._slice_index(
index = self._slice_axis_for_step(
obj.index, res_values[0] if len(res_values) > 0 else None
)
df = type(obj)._from_arrays(
Expand Down Expand Up @@ -524,7 +527,7 @@ def _apply_tablewise(
values = values.T if self.axis == 1 else values
result = homogeneous_func(values)
result = result.T if self.axis == 1 else result
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
columns = (
obj.columns
if result.shape[1] == len(obj.columns)
Expand Down Expand Up @@ -644,13 +647,13 @@ def _numba_apply(
)
result = aggregator(values, start, end, min_periods, *func_args)
result = result.T if self.axis == 1 else result
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
if obj.ndim == 1:
result = result.squeeze()
out = obj._constructor(result, index=index, name=obj.name)
return out
else:
columns = self._slice_index(obj.columns, result.T)
columns = self._slice_axis_for_step(obj.columns, result.T)
out = obj._constructor(result, index=index, columns=columns)
return self._resolve_output(out, obj)

Expand Down Expand Up @@ -692,7 +695,7 @@ def __init__(
obj = obj.drop(columns=self._grouper.names, errors="ignore")
# GH 15354
if kwargs.get("step") is not None:
raise NotImplementedError("step not implemented for rolling groupby")
raise NotImplementedError("step not implemented for groupby")
super().__init__(obj, *args, **kwargs)

def _apply(
Expand Down Expand Up @@ -938,14 +941,12 @@ class Window(BaseWindow):
The closed parameter with fixed windows is now supported.
step : int, default None
When supported, applies ``[::step]`` to the resulting sequence of windows, in a
computationally efficient manner. Currently supported only with fixed-length
window indexers. Note that using a step argument other than None or 1 will
produce a result with a different shape than the input.
.. versionadded:: 1.5
.. versionadded:: 1.5.0
The step parameter is only supported with fixed windows.
Evaluate the window at every ``step`` result, equivalent to slicing as
``[::step]``. ``window`` must be an integer. Using a step argument other
than None or 1 will produce a result with a different shape than the input.
method : str {'single', 'table'}, default 'single'
Expand Down Expand Up @@ -1605,9 +1606,7 @@ def cov(
**kwargs,
):
if self.step is not None:
raise NotImplementedError(
"step not implemented for rolling and expanding cov"
)
raise NotImplementedError("step not implemented for cov")

from pandas import Series

Expand Down Expand Up @@ -1650,11 +1649,8 @@ def corr(
ddof: int = 1,
**kwargs,
):

if self.step is not None:
raise NotImplementedError(
"step not implemented for rolling and expanding corr"
)
raise NotImplementedError("step not implemented for corr")

from pandas import Series

Expand Down Expand Up @@ -1749,24 +1745,16 @@ def _validate(self):
if self.min_periods is None:
self.min_periods = 1

if self.step is not None:
raise NotImplementedError(
"step is not supported with frequency windows"
)

elif isinstance(self.window, BaseIndexer):
# Passed BaseIndexer subclass should handle all other rolling kwargs
pass
elif not is_integer(self.window) or self.window < 0:
raise ValueError("window must be an integer 0 or greater")
# GH 15354:
# validate window indexer parameters do not raise in get_window_bounds
# this cannot be done in BaseWindow._validate because there _get_window_indexer
# would erroneously create a fixed window given a window argument like "1s" due
# to _win_freq_i8 not being set
indexer = self._get_window_indexer()
indexer.get_window_bounds(
num_values=0,
min_periods=self.min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)

def _validate_datetimelike_monotonic(self):
"""
Expand Down
35 changes: 13 additions & 22 deletions pandas/tests/window/test_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def test_rolling_apply_out_of_bounds(engine_and_raw):


@pytest.mark.parametrize("window", [2, "2s"])
@pytest.mark.parametrize("step", [None])
def test_rolling_apply_with_pandas_objects(window, step):
def test_rolling_apply_with_pandas_objects(window):
# 5071
df = DataFrame(
{"A": np.random.randn(5), "B": np.random.randint(0, 10, size=5)},
Expand All @@ -67,8 +66,8 @@ def f(x):
return np.nan
return x.iloc[-1]

result = df.rolling(window, step=step).apply(f, raw=False)
expected = df.iloc[2:].reindex_like(df)[::step]
result = df.rolling(window).apply(f, raw=False)
expected = df.iloc[2:].reindex_like(df)
tm.assert_frame_equal(result, expected)

with tm.external_error_raised(AttributeError):
Expand Down Expand Up @@ -96,8 +95,7 @@ def test_rolling_apply(engine_and_raw, step):
tm.assert_series_equal(result, expected)


@pytest.mark.parametrize("step", [None])
def test_all_apply(engine_and_raw, step):
def test_all_apply(engine_and_raw):
engine, raw = engine_and_raw

df = (
Expand All @@ -106,16 +104,15 @@ def test_all_apply(engine_and_raw, step):
).set_index("A")
* 2
)
er = df.rolling(window=1, step=step)
r = df.rolling(window="1s", step=step)
er = df.rolling(window=1)
r = df.rolling(window="1s")

result = r.apply(lambda x: 1, engine=engine, raw=raw)
expected = er.apply(lambda x: 1, engine=engine, raw=raw)
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize("step", [None])
def test_ragged_apply(engine_and_raw, step):
def test_ragged_apply(engine_and_raw):
engine, raw = engine_and_raw

df = DataFrame({"B": range(5)})
Expand All @@ -128,24 +125,18 @@ def test_ragged_apply(engine_and_raw, step):
]

f = lambda x: 1
result = df.rolling(window="1s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="1s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

result = df.rolling(window="2s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="2s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

result = df.rolling(window="5s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="5s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

Expand Down
7 changes: 3 additions & 4 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,13 @@ def test_rolling_forward_cov_corr(func, expected):
["left", [0.0, 0.0, 1.0, 2.0, 5.0, 9.0, 5.0, 6.0, 7.0, 8.0]],
],
)
@pytest.mark.parametrize("step", [None])
def test_non_fixed_variable_window_indexer(closed, expected_data, step):
def test_non_fixed_variable_window_indexer(closed, expected_data):
index = date_range("2020", periods=10)
df = DataFrame(range(10), index=index)
offset = BusinessDay(1)
indexer = VariableOffsetWindowIndexer(index=index, offset=offset)
result = df.rolling(indexer, closed=closed, step=step).sum()
expected = DataFrame(expected_data, index=index)[::step]
result = df.rolling(indexer, closed=closed).sum()
expected = DataFrame(expected_data, index=index)
tm.assert_frame_equal(result, expected)


Expand Down
Loading

0 comments on commit 0ccad38

Please sign in to comment.