Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add strict parameter to pl.concat(how='horizontal') #20019

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from

Conversation

nimit
Copy link

@nimit nimit commented Nov 27, 2024

PR that closes #19133
Made changes to the python package so that if how='horizontal', the number of rows in the first element are checked with the rest of the elements for both: lazy and eager DataFrames.
strict is set to False by default

Also added unit tests for the changes for cases:

  1. strict=True, rows don't match in eager DataFrame
  2. strict=True, rows don't match in lazy DataFrame
  3. strict=False, rows don't match

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature python Related to Python Polars labels Nov 27, 2024
@coastalwhite
Copy link
Collaborator

Heya, thank you for the PR.
I do think this needs to happen on the Rust side and not on the Python side. This way there is no way we can reason about it in the query optimizer / engine.

Copy link

codecov bot commented Nov 27, 2024

Codecov Report

Attention: Patch coverage is 92.00000% with 2 lines in your changes missing coverage. Please review.

Project coverage is 79.53%. Comparing base (4c1c51c) to head (6078964).

Files with missing lines Patch % Lines
crates/polars-stream/src/nodes/zip.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main   #20019   +/-   ##
=======================================
  Coverage   79.52%   79.53%           
=======================================
  Files        1563     1563           
  Lines      217104   217121   +17     
  Branches     2464     2464           
=======================================
+ Hits       172659   172690   +31     
+ Misses      43885    43871   -14     
  Partials      560      560           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -231,6 +240,14 @@ def concat(
)
)
elif how == "horizontal":
if strict:
nrows = first.select(F.len()).collect()[0, 0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason this should be implemented on the rust side is that this collect here could trigger a massive computation if the query plan is complex, which then gets tossed. The check should be performed when the concatenation operation is actually applied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. When I initially thought about it, I failed to take into account how I would compare the number of rows on Lazyframes.

@nimit nimit requested a review from orlp as a code owner December 1, 2024 18:16
@nimit
Copy link
Author

nimit commented Dec 2, 2024

@mcrumiller @coastalwhite
Apologies for the delay. I had no idea about some Rust language features.
Can you please have a look at this? I think I made the required changes in concat_df_horizontal, changed the UnionArgs struct to include strict as a parameter and modified the function call in the HConcat plan executor (also made a few changes to other functions that call concat_df_horizontal but they pass None)
The tests should appropriately raise the polars.exceptions.ShapeError when heights don't match.

Unfortunately, my machine is not capable of running make test & make pre-commit in the cargo directory and GitHub Codespaces runs out of disk space every time I try it there.

@mcrumiller
Copy link
Contributor

@nimit I'm not a repo member, I just lurk here a lot, but I can try to help you get things working--what's the issue with running make on your end?

@nimit
Copy link
Author

nimit commented Dec 2, 2024

@nimit I'm not a repo member, I just lurk here a lot, but I can try to help you get things working--what's the issue with running make on your end?

Thanks for your help!
It's running for >4hours with terrible progress (my guess is because I have an older machine)

@mcrumiller
Copy link
Contributor

mcrumiller commented Dec 3, 2024

@nimit it's failing on the new streaming engine:

~/projects/polars/py-polars$ export POLARS_AUTO_NEW_STREAMING=1
~/projects/polars/py-polars$ pytest /home/mcrumiller/projects/polars/py-polars/tests/unit/functions/test_concat.py
=========================================================================================================================================================== test session starts ===========================================================================================================================================================
platform linux -- Python 3.12.6, pytest-8.3.2, pluggy-1.5.0
codspeed: 3.0.0 (disabled, mode: walltime, timer_resolution: 1.0ns)
rootdir: /home/mcrumiller/projects/polars/py-polars
configfile: pyproject.toml
plugins: cov-6.0.0, codspeed-3.0.0, hypothesis-6.119.4, xdist-3.6.1
collected 4 items / 2 deselected / 2 selected                                                                                                                                                                                                                                                                                             

tests/unit/functions/test_concat.py F.                                                                                                                                                                                                                                                                                              [100%]

================================================================================================================================================================ FAILURES =================================================================================================================================================================
_____________________________________________________________________________________________________________________________________________________ test_concat_horizontally_strict _____________________________________________________________________________________________________________________________________________________
tests/unit/functions/test_concat.py:32: in test_concat_horizontally_strict
    with pytest.raises(pl.exceptions.ShapeError):
E   Failed: DID NOT RAISE <class 'polars.exceptions.ShapeError'>
========================================================================================================================================================= short test summary info =========================================================================================================================================================
FAILED tests/unit/functions/test_concat.py::test_concat_horizontally_strict - Failed: DID NOT RAISE <class 'polars.exceptions.ShapeError'>
================================================================================================================================================ 1 failed, 1 passed, 2 deselected in 0.13s ================================================================================================================================================

I'll look into it.

@mcrumiller
Copy link
Contributor

@nimit can you set this PR to draft until we can get this working?

@nimit nimit marked this pull request as draft December 3, 2024 02:31
@mcrumiller
Copy link
Contributor

I'm not familiar at all with the new streaming engine. After taking a look, it looks like there is a parameter in there called null_extend which is the equivalent of our strict from the non-streaming side (with null_extend=False when strict=True).

We need to figure out how to propagate this parameter. Two places to start are polars-stream/src/physical_plan/lower_ir.rs in IR::HConcat. Another is in polars-stream/src/nodes/zip.rs. The calls to concat_df_horizontal(&out, false, false)?; are where we're not propagating our parameter (we need to access the parameter here).

@nimit
Copy link
Author

nimit commented Dec 3, 2024

I believe this should work?
polars-stream/src/physical_plan/lower_ir.rs line 299

IR::HConcat {
            inputs,
            schema: _,
            options,
        } => {
            let null_extend = !options.strict;
            let inputs = inputs
                .clone() // Needed to borrow ir_arena mutably.
                .into_iter()
                .map(|input| lower_ir!(input))
                .collect::<Result<_, _>>()?;
            PhysNodeKind::Zip {
                inputs,
                null_extend: null_extend,
            }
        },

And in polars-stream/src/nodes/zip.rs (line 278),

let strict_concat = !self.null_extend;
let out_df = concat_df_horizontal(&out, false, strict_concat)?;

@mcrumiller
Copy link
Contributor

@nimit yep! That does it. The linter may complain about null_extend: null_extend where you can just use field init shorthand since the parameter's the same name as the field name. Also note that there are two spots where concat_df_horizontal is called and I think we need to pass the strict parameter to both (the other is around line 325).

@nimit
Copy link
Author

nimit commented Dec 4, 2024

@mcrumiller any idea about the error? It is again failing using the streaming engine

@mcrumiller
Copy link
Contributor

I believe that if one of the frames is length-1 there is some broadcasting happening. If we try lengths 2 and 3, it does raise:

df1 = pl.LazyFrame({"a": [0, 1, 2], "b": [1, 2, 3]})
df2 = pl.LazyFrame({"c": [11, 22], "d": [33, 44]})
df = pl.concat([df1.lazy(), df2.lazy()], how="horizontal", strict=True).collect()
# polars.exceptions.ShapeError: zip node received non-equal length inputs

If I set df2 to length 1, we get broadcasting:

df1 = pl.LazyFrame({"a": [0, 1, 2], "b": [1, 2, 3]})
df2 = pl.LazyFrame({"c": [11], "d": [33]})
df = pl.concat([df1.lazy(), df2.lazy()], how="horizontal", strict=True).collect()
# shape: (3, 4)
┌─────┬─────┬─────┬─────┐
│ abcd   │
│ ------------ │
│ i64i64i64i64 │
╞═════╪═════╪═════╪═════╡
│ 011133  │
│ 121133  │
│ 231133  │
└─────┴─────┴─────┴─────┘

Need to understand how the broadcasting here works.

@nimit
Copy link
Author

nimit commented Dec 4, 2024

Is the broadcasting expected behavior though? Maybe it wouldn't broadcast if we declare df2 as, df2 = pl.LazyFrame({"c": 11, "d": 33})
I will check this out though..

Also, the second variant (without broadcasting), generates an error but it doesn't say anything about strict being true. Should we change that? I have added "cannot concat dataframes with different heights in 'strict' mode" when raising the ShapeError but it doesn't seem to be using that

polars.exceptions.ShapeError: zip node received non-equal length inputs

@mcrumiller
Copy link
Contributor

mcrumiller commented Dec 4, 2024

.map(|s| InputHead::new(s, !null_extend))

So negating !null_extend to null_extend here appears to fix everything, but I think we may need @ritchie46 to help us here what the original intent was.

I think the intent here is, when faced with a length-1 df concatenated with other dfs, then either we can 1) broadcast, or 2) fill nulls. Our strict parameter doesn't allow for either of these, which is a problem. However, either something isn't properly implemented, or this parameter was inadvertently negated, because it appears that negating it completely solves the issue:

  1. If we pass a length 1 df with a length > 1 df, and strict=False, we broadcast (is this what we want?).
  2. If we pass a length 1 df with a length > df, and strict=True, we raise correctly.
  3. If we pass a length 2 df with a length 3 df, and strict=False, we get null-filled (is this what we want?).
  4. If we pass a length 2 df with a length 3 df, and strict=True, we raise correctly.

I wonder if the intent is to have some sort of broadcast parameter for the length-1 df case. If not, then we can flip this boolean and all seems well.

@nimit
Copy link
Author

nimit commented Dec 4, 2024

This pull request adds the broadcasting/null extend.
Unfortunately, there is no description so either @orlp (author of the pull request) or @ritchie46 needs to confirm.
But... I agree, it does seem that may_broadcast in the InputHead constructor is getting the wrong value here.

@mcrumiller
Copy link
Contributor

mcrumiller commented Dec 4, 2024

@nimit why don't you:

  1. Flip that boolean so all tests pass (at least, I think they will).
  2. Beef up the test to include more scenarios, i.e. df height 1 vs N, versus df height 2 vs N to distinguish "potential broadcasting" cases. This way we can really spell out what the desired behavior is with regarding to broadcasting vs null-filling.
  3. If all pass, mark as ready for review, so one of the code owners can address our question.

@mcrumiller
Copy link
Contributor

I'm at work so can't rust but I can help looking into the failures tonight.

@nimit
Copy link
Author

nimit commented Dec 4, 2024

I think I figured out the intention behind the negation of null_extend
Zip will always convert singleton elements to same length arrays.
If null_extend is true, it will extend with null elements otherwise it will broadcast the singleton element
So basically, for us it will make the strict parameter redundant in the function call to concat_df_horizontal

@mcrumiller
Copy link
Contributor

mcrumiller commented Dec 4, 2024

That was my guess in my prior message. We may have to make this a new parameter in the streaming, since Zip may be intended to be used elsewhere in addition to horizontal concat. But I'm not sure.

@nimit
Copy link
Author

nimit commented Dec 5, 2024

@mcrumiller I cannot think of a very low impact way of dealing with this.
Should we maybe try and figure out where the call to Zip is made and try to add a height check there?

@mcrumiller
Copy link
Contributor

@mcrumiller I cannot think of a very low impact way of dealing with this. Should we maybe try and figure out where the call to Zip is made and try to add a height check there?

We can't. The entire point of streaming is that it applies to operations where the data "streams" in. You don't know that the heights are different until one df ends and the other doesn't. This has to be a parameter.

@nimit
Copy link
Author

nimit commented Dec 5, 2024

Oh right... That must also mean that all multi-df operations would use Zip (right?).
Should we add a strict_shape or no_modify parameter then?
We would need to amend all Zip calls..

@mcrumiller
Copy link
Contributor

mcrumiller commented Dec 5, 2024

Looking at the Zip node definition, it's this:

    Zip {
        inputs: Vec<PhysNodeKey>,
        /// If true shorter inputs are extended with nulls to the longest input,
        /// if false all inputs must be the same length, or have length 1 in
        /// which case they are broadcast.
        null_extend: bool,
    },

This assumes two possible behaviors when confronted with a 1-row frame against a multi-row frame: broadcast or null-fill. When strict=True, we want to error. So what we need is three options: broadcast, null-fill, or raise.

I think the way to go is to replace this parameter with an Enum called something like extend or extend_behavior with three options: null, broadcast, and raise (or error). We'd have to find where Zip nodes are used elsewhere and make sure to pass the correct value in those cases.

Let's see if we can get Ritchie's input here.

@coastalwhite
Copy link
Collaborator

@orlp might be able to produce some input next week

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature python Related to Python Polars rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

pl.concat(how='horizontal') should be strict by default
8 participants