Sliding window for pl.Expr.over function #8976
Comments
+1, great idea. I guess we can rewrite this issue request as a "sliding window" clause. |
Yes, I was looking for something like this PySpark function. |
If you want to address the current row and only the first/last row, you can:
df = pl.DataFrame({
"id": [1, 2, 3, 1, 2, 3, 1, 2, 3],
"value": ["a", "b", "c", "d", "e", "f", "g", "h", "i"]
})
df.with_columns(all = pl.col("value") + pl.first("value").over("id") + pl.last("value").over("id"))
# shape: (9, 3)
# ┌─────┬───────┬─────┐
# │ id ┆ value ┆ all │
# │ --- ┆ --- ┆ --- │
# │ i64 ┆ str ┆ str │
# ╞═════╪═══════╪═════╡
# │ 1 ┆ a ┆ aag │
# │ 2 ┆ b ┆ bbh │
# │ 3 ┆ c ┆ cci │
# │ 1 ┆ d ┆ dag │
# │ 2 ┆ e ┆ ebh │
# │ 3 ┆ f ┆ fci │
# │ 1 ┆ g ┆ gag │
# │ 2 ┆ h ┆ hbh │
# │ 3 ┆ i ┆ ici │
# └─────┴───────┴─────┘
If you have a specific example, perhaps a better suggestion could be made. |
For now, I was thinking of doing this instead
|
I don't want to compare it only with the first and current, but rather with everything in between. |
Basically, I want to rewrite this PySpark function in Polars
|
Are you asking to apply functions to an expanding window, in addition to the rolling window operations in Polars? To clarify: are you asking for this to be done per group as well as over a whole frame/series? |
In short, I want this rowsBetween functionality. Basically, using pl.Expr.over I want to be able to get a different result for each element of the group. For example, getting the accumulated sales for each product: I want to group by product, have each slice sorted by month, and apply a sum operation over the current row and all the preceding ones, so January will have only the sales for January, February the sales for January and February, and so on. It is a function that SQL has, and PySpark as well. I think it could be a useful enhancement. |
That sounds like a cumsum:
df = pl.DataFrame(dict(
product = ["A", "B", "A", "C", "C", "B", "A"],
date = ["2022/12/01", "2022/10/02", "2022/01/11", "2022/02/01", "2022/03/02", "2022/04/22", "2022/06/12"],
sales = [12, 11, 13, 14, 20, 11, 7]
))
(df.with_row_count()
.sort(pl.col("date").str.to_datetime().dt.month())
.with_columns(
cumsum = pl.col("sales").cumsum().over("product")
)
)
# shape: (7, 5)
# ┌────────┬─────────┬────────────┬───────┬────────┐
# │ row_nr ┆ product ┆ date ┆ sales ┆ cumsum │
# │ --- ┆ --- ┆ --- ┆ --- ┆ --- │
# │ u32 ┆ str ┆ str ┆ i64 ┆ i64 │
# ╞════════╪═════════╪════════════╪═══════╪════════╡
# │ 2 ┆ A ┆ 2022/01/11 ┆ 13 ┆ 13 │
# │ 3 ┆ C ┆ 2022/02/01 ┆ 14 ┆ 14 │
# │ 4 ┆ C ┆ 2022/03/02 ┆ 20 ┆ 34 │ # 14 + 20 (row_nr: 3, 4)
# │ 5 ┆ B ┆ 2022/04/22 ┆ 11 ┆ 11 │
# │ 6 ┆ A ┆ 2022/06/12 ┆ 7 ┆ 20 │ # 13 + 7 (row_nr: 2, 6)
# │ 1 ┆ B ┆ 2022/10/02 ┆ 11 ┆ 22 │ # 11 + 11 (row_nr: 5, 1)
# │ 0 ┆ A ┆ 2022/12/01 ┆ 12 ┆ 32 │ # 13 + 7 + 12 (row_nr: 2, 6, 0)
# └────────┴─────────┴────────────┴───────┴────────┘ |
Maybe not the best example then. However, this is not specific to .cumsum(). |
I also think this would be a useful addition to Polars. The SQL and Spark ‘preceding rows’ and ‘following rows’ syntax is very flexible and allows expressing complex calculations in an easy way. It’s probably the only thing I miss from PySpark. |
How does this relate to |
hi @ritchie46, I believe groupby_rolling does not allow one to 'look ahead' the way the 'preceding / following' syntax does. For example, here is a Stack Overflow question about reproducing the 'preceding / following' syntax in Polars: So this operation is possible in Polars using the solutions from SO, but it's more verbose and potentially less clear than a 'preceding / following' syntax. Overall, adding it could reduce the number of Polars functions to learn when first starting with Polars. A Polars syntax could be something like What do you think? |
There seem to be two questions going on here. For the first, which asks for an expanding/cumulative window, there's a min_periods argument you can set to 1, but then you also need to know the number of rows in the data (or per group), which I guess is an extra step. |
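A minimal sketch of that workaround, assuming the expanding window is emulated with Expr.rolling_sum (window size covering the whole frame, min_periods=1) applied per group via .over. The DataFrame and column names are illustrative, not taken from the thread, and the parameter name follows the comment above (newer Polars versions may call it min_samples):

```python
import polars as pl

df = pl.DataFrame({
    "product": ["A", "B", "A", "B", "A"],
    "sales": [10, 5, 20, 7, 30],
})

# Emulate an expanding ("cumulative") sum per group: make the rolling window
# at least as long as the frame, and set min_periods=1 so the short windows
# at the start of each group still produce a value instead of null.
df.with_columns(
    expanding_sum=pl.col("sales")
    .rolling_sum(window_size=df.height, min_periods=1)
    .over("product")
)
```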
hi @magarick, I agree that it's possible to reproduce most transformations in Polars today (using expressions as building blocks is indeed quite powerful). I think the opportunity is for an easy way to express those transformations. I teach Polars to mid-sized and big organizations, and how to do 'preceding / following' is a question that often comes up. There is oftentimes a way to reproduce the calculations with Polars syntax; however, this would simplify things (in my opinion). To wrap it up: I am not arguing that Polars should do something because other tools do it. Polars has the advantage of not taking all its syntax from other tools and keeping a clean API. However, in this case, it might be worth it, in my opinion. As for the example of preceding = 2i and following = unbounded, I did not take it from a specific use case. Curious to hear @ritchie46's thoughts. |
There is some support with groupby_rolling:
df = pl.DataFrame(dict(
id = [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2],
idx = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
val = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]
))
(df.groupby_rolling(index_column="idx", by="id", period="5i", offset="-3i")
.agg(window = pl.col("val"))
.with_columns( # just to display it
pl.col("window").list.to_struct(n_field_strategy="max_width")
)
)
# shape: (11, 3)
# ┌─────┬─────┬─────────────────────────┐
# │ id ┆ idx ┆ window │
# │ --- ┆ --- ┆ --- │
# │ i64 ┆ i64 ┆ struct[5] │
# ╞═════╪═════╪═════════════════════════╡
# │ 1 ┆ 1 ┆ {"a","b","c",null,null} │
# │ 1 ┆ 2 ┆ {"a","b","c","d",null} │
# │ 1 ┆ 3 ┆ {"a","b","c","d","e"} │
# │ 1 ┆ 4 ┆ {"b","c","d","e","f"} │
# │ 1 ┆ 5 ┆ {"c","d","e","f",null} │
# │ 1 ┆ 6 ┆ {"d","e","f",null,null} │
# │ 2 ┆ 7 ┆ {"g","h","i",null,null} │
# │ 2 ┆ 8 ┆ {"g","h","i","j",null} │
# │ 2 ┆ 9 ┆ {"g","h","i","j","k"} │
# │ 2 ┆ 10 ┆ {"h","i","j","k",null} │
# │ 2 ┆ 11 ┆ {"i","j","k",null,null} │
# └─────┴─────┴─────────────────────────┘
You don't get leading nulls though, so the "positions" are not "correct", and I'm not sure if there is a proper way to do unbounded. You can use
I imagine polars-sql will eventually have to support this RANGE BETWEEN FOLLOWING/PRECEDING UNBOUNDED syntax? So it seems like a good idea if a Polars idiom existed for it. |
I ran into this problem while trying to get the count of records that occurred before the current record. It is trivial to get this result using PySpark, but I am having a hard time getting it using Polars. I have created this StackOverflow question, but I am also copying its contents here.

StackOverflow Question

> Given a dataset with records of an event where the event can happen multiple times for the same ID, I want to find the aggregate of the previous records of that ID. Let's say I have the following table:
>
> | id  | datetime       | value |
> |-----|----------------|-------|
> | 123 | 20230101T00:00 | 2     |
> | 123 | 20230101T01:00 | 5     |
> | 123 | 20230101T03:00 | 7     |
> | 123 | 20230101T04:00 | 1     |
> | 456 | 20230201T04:00 | 1     |
> | 456 | 20230201T07:00 | 1     |
> | 456 | 20230205T04:00 | 1     |
>
> I want to create a new column "agg" that adds the previous values of "value" found for that same record to get the following table:
>
> | id  | datetime       | value | agg |
> |-----|----------------|-------|-----|
> | 123 | 20230101T00:00 | 2     | 0   |
> | 123 | 20230101T01:00 | 5     | 2   |
> | 123 | 20230101T03:00 | 7     | 7   |
> | 123 | 20230101T04:00 | 1     | 0   |
> | 456 | 20230201T04:00 | 1     | 1   |
> | 456 | 20230201T07:00 | 1     | 2   |
> | 456 | 20230205T04:00 | 1     | 3   |
>
> [Polars documentation](https://pola-rs.github.io/polars-book/user-guide/expressions/window/) says there is a window function, but it is not clear how to collect just the previous values of the current record. I know it is possible to do this with PySpark using:
>
> ```python
> window = Window.partitionBy('id').orderBy('datetime').rowsBetween(Window.unboundedPreceding, -1)
>
> (
>     df_pyspark
>     .withColumn('agg', f.sum('value').over(window).cast('int'))
>     .fillna(0, subset=['agg'])
> )
> ```
>
> ---
>
> I tried using

EDIT: There is an answer on StackOverflow that works for my case; maybe it solves some other people's problems here too. I still believe the solution isn't as intuitive as the PySpark Window, but it works for now. |
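For reference, a sketch of how "sum of all preceding rows per id" could be written with current Polars expressions; the shifted cum_sum trick is my own suggestion, not something confirmed in this thread:

```python
import polars as pl

df = pl.DataFrame({
    "id": [123, 123, 123, 123, 456, 456, 456],
    "datetime": [
        "20230101T00:00", "20230101T01:00", "20230101T03:00", "20230101T04:00",
        "20230201T04:00", "20230201T07:00", "20230205T04:00",
    ],
    "value": [2, 5, 7, 1, 1, 1, 1],
})

# Sort so the cumulative sum runs in time order, then subtract the current
# row's value to exclude it: effectively "UNBOUNDED PRECEDING .. -1" per id.
df.sort("datetime").with_columns(
    agg=(pl.col("value").cum_sum() - pl.col("value")).over("id")
)
```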
Honestly, this is the biggest drawback holding me back from switching much of my current code from PySpark to Polars. |
The particular example you've given is sort of possible - I believe it is what Pandas calls an "expanding window".
So it can be expressed in terms of rolling:
(I'm also not sure of how "efficient" this ends up being)
pl.Config(fmt_table_cell_list_len=-1)
df = pl.DataFrame(dict(letter=list("abcdefgh"))).with_row_index()
# period = df.height
# offset = -(df.height + 1)
(df.rolling(index_column=pl.col("index").cast(int), period="8i", offset="-9i")
.agg(
window = pl.col("letter")
)
)
# shape: (8, 2)
# ┌───────┬─────────────────────────────────────┐
# │ index ┆ window │
# │ --- ┆ --- │
# │ i64 ┆ list[str] │
# ╞═══════╪═════════════════════════════════════╡
# │ 0 ┆ [] │
# │ 1 ┆ ["a"] │
# │ 2 ┆ ["a", "b"] │
# │ 3 ┆ ["a", "b", "c"] │
# │ 4 ┆ ["a", "b", "c", "d"] │
# │ 5 ┆ ["a", "b", "c", "d", "e"] │
# │ 6 ┆ ["a", "b", "c", "d", "e", "f"] │
# │ 7 ┆ ["a", "b", "c", "d", "e", "f", "g"] │
# └───────┴─────────────────────────────────────┘
(There was an issue for adding dedicated "expanding" functions but it has gone stale: #4799) |
@cmdlineluser I've tried experimenting with offset and period and yeah, for some small window sizes it seems to work. However, trying to reproduce an "unbounded preceding" window causes my machine to crash with an OOM, even on a big cluster. I believe it's just not optimized for this, as it seems unable to compute it in lazy mode (that's just one hypothesis). |
Any update on this? |
I've been using Polars heavily for the past year and it has become my tool of choice. However, the lack of a Polars equivalent of Spark's
from datetime import date
import duckdb
import polars as pl
input_df = pl.DataFrame(
{
"element_id": [1, 1, 1, 2, 2, 2, 2],
"observation_dt": [
date(2024, 5, 25),
date(2024, 6, 1),
date(2024, 6, 7),
date(2024, 5, 7),
date(2024, 5, 12),
date(2024, 5, 14),
date(2024, 5, 20),
],
"feature": [0, 2, 1, 3, 6, 1, 0],
}
)
query = """
SELECT element_id,
observation_dt,
sum(feature) OVER (PARTITION BY element_id ORDER BY observation_dt RANGE BETWEEN INTERVAL 14 DAYS PRECEDING AND INTERVAL 1 DAYS PRECEDING) as transformed_feature
FROM input_df
"""
output_df = duckdb.sql(query).pl()
Where
and
I know that one workaround is to use |
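For this particular 14-days-preceding / 1-day-preceding window, one possible approximation uses DataFrame.rolling with a negative offset. This is a sketch under my own assumptions, not necessarily the workaround the author refers to, and the period/offset/closed combination should be checked against the DuckDB output:

```python
out = (
    input_df.sort("element_id", "observation_dt")
    .rolling(
        index_column="observation_dt",
        period="14d",    # window length of 14 days
        offset="-15d",   # start the window 15 days before the current row...
        closed="right",  # ...excluding that lower bound, so it spans roughly [-14d, -1d]
        group_by="element_id",
    )
    .agg(pl.col("feature").sum().alias("transformed_feature"))
)
```

One likely difference from the SQL result: rows whose window is empty come back with a sum of 0 rather than the NULL DuckDB produces.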
This problem still exists in polars 1.0.0. I believe that the PySpark solution using windows and unbounded preceding is the way to go for the Polars API. |
I hope a PySpark-like solution using rowsBetween will be available soon. |
This cannot work when each "group" has a different number of elements. |
A possible solution comes from using the SQLContext from Polars 1.0:
df = pl.DataFrame({
'NAME': ['a', 'a', 'a', 'b', 'b'],
'INDEX': [0, 1, 2, 0, 2],
'VAL': [1,1,1,1,1]
})
sql = pl.SQLContext()
sql.register('df', df)
sql.execute(
f"""
SELECT
NAME,
INDEX,
SUM(VAL) OVER (PARTITION BY NAME ORDER BY INDEX ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS CUMSUM
FROM
df
"""
).collect()
I get the following
instead of the cumulative sum over the
|
@CarloNicolini I can't find anything in
It seems it's just silently ignored at the moment. |
Yeah, I tried with polars.sql too and the results are wrong. Probably this is intended, as there is no specific Polars API available yet for |
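For comparison, the cumulative sum that the ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW clause is meant to produce can be obtained at the expression level; a sketch reusing the df defined in the comment above, with current Polars naming:

```python
# Order rows by INDEX so cum_sum accumulates in the intended order,
# then compute the running sum within each NAME partition.
df.sort("INDEX").with_columns(
    CUMSUM=pl.col("VAL").cum_sum().over("NAME")
)
```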
Any update on this issue? I’m in the process of porting a Spark code-base with a significant amount of |
(input_df
.with_row_index()
.join_where(input_df,
pl.col("element_id") == pl.col("element_id_right"),
pl.col("observation_dt_right").is_between(
pl.col("observation_dt").dt.offset_by("-14d"),
pl.col("observation_dt").dt.offset_by("-1d"),
)
)
.group_by("index")
.agg(
pl.col("element_id", "observation_dt").first(),
pl.col("feature_right").sum().alias("transformed_feature")
)
)
# shape: (5, 4)
# ┌───────┬────────────┬────────────────┬─────────────────────┐
# │ index ┆ element_id ┆ observation_dt ┆ transformed_feature │
# │ --- ┆ --- ┆ --- ┆ --- │
# │ u32 ┆ i64 ┆ date ┆ i64 │
# ╞═══════╪════════════╪════════════════╪═════════════════════╡
# │ 5 ┆ 2 ┆ 2024-05-14 ┆ 9 │
# │ 4 ┆ 2 ┆ 2024-05-12 ┆ 3 │
# │ 6 ┆ 2 ┆ 2024-05-20 ┆ 10 │
# │ 2 ┆ 1 ┆ 2024-06-07 ┆ 2 │
# │ 1 ┆ 1 ┆ 2024-06-01 ┆ 0 │
# └───────┴────────────┴────────────────┴─────────────────────┘
I suppose you could compare
But I'm guessing an SQL engine uses some other algorithms/optimizations, and these are not actually implemented as just non-equi joins? |
One way to avoid burdening the core Polars project (and its developers) with the panoply of exotic (or not) group-by requests would be to allow users to construct custom groupings (similar in spirit to the expression plugin API) and then pass them to the compute engine for executing aggregations. I filed an issue about this a while ago: #16584. @ritchie46, does something like this sound feasible? |
Problem description
I would like pl.Expr.over to have a parameter that determines the window edges, e.g. taking only the first row through the current row, or the current row through the last. So, not only the whole group, but also just parts of the group.
Maybe there is already such a feature, but I could not find it.
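For aggregate expressions specifically, the two window shapes described here ("first row through current row" and "current row through last row") can already be approximated per group with cumulative expressions; a sketch with made-up column names, not a substitute for the requested parameter:

```python
import polars as pl

df = pl.DataFrame({
    "product": ["A", "A", "A", "B", "B"],
    "sales": [1, 2, 3, 4, 5],
})

df.with_columns(
    # first row .. current row, per group
    from_start=pl.col("sales").cum_sum().over("product"),
    # current row .. last row, per group
    to_end=pl.col("sales").cum_sum(reverse=True).over("product"),
)
```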