refactor: Use Column in Row Encoding #20312

Merged 2 commits on Dec 16, 2024
Changes from all commits
14 changes: 8 additions & 6 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
@@ -10,7 +10,7 @@ use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
pub fn encode_rows_vertical_par_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);
@@ -33,7 +33,7 @@ pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryO

// Almost the same but broadcast nulls to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
by: &[Series],
by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
@@ -51,7 +51,8 @@ pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
.flat_map(|s| {
let s = s.rechunk();
#[allow(clippy::unnecessary_to_owned)]
s.chunks()
s.as_materialized_series()
.chunks()
.to_vec()
.into_iter()
.map(|arr| arr.validity().cloned())
@@ -186,15 +187,15 @@ pub fn get_row_encoding_dictionary(dtype: &DataType) -> Option<RowEncodingContex
}
}

pub fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
pub fn encode_rows_unordered(by: &[Column]) -> PolarsResult<BinaryOffsetChunked> {
let rows = _get_rows_encoded_unordered(by)?;
Ok(BinaryOffsetChunked::with_chunk(
PlSmallStr::EMPTY,
rows.into_array(),
))
}

pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
pub fn _get_rows_encoded_unordered(by: &[Column]) -> PolarsResult<RowsEncoded> {
let mut cols = Vec::with_capacity(by.len());
let mut opts = Vec::with_capacity(by.len());
let mut dicts = Vec::with_capacity(by.len());
@@ -206,6 +207,7 @@ pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
for by in by {
debug_assert_eq!(by.len(), num_rows);

let by = by.as_materialized_series();
let arr = by.to_physical_repr().rechunk().chunks()[0].to_boxed();
let opt = RowEncodingOptions::new_unsorted();
let dict = get_row_encoding_dictionary(by.dtype());
@@ -268,7 +270,7 @@ pub fn _get_rows_encoded_arr(

pub fn _get_rows_encoded_ca_unordered(
name: PlSmallStr,
by: &[Series],
by: &[Column],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded_unordered(by)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
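With the signature changes above, the row-encoding helpers accept `&[Column]` directly, so callers holding a `DataFrame` no longer need to materialize every column into a `Series` first. Below is a minimal sketch of the new call pattern, assuming the `polars_core::prelude::row_encode` re-export used later in this PR; the `encode_keys` helper and the "keys" name are hypothetical, and the body simply mirrors what `encode_rows_unordered` does internally.

```rust
use polars_core::prelude::*;
use polars_core::prelude::row_encode::_get_rows_encoded_unordered;

// Hypothetical helper (not part of this PR): row-encode every column of a
// DataFrame into one binary-offset column. `DataFrame::get_columns` yields
// `&[Column]`, which now matches the helpers' parameter type, so the old
// per-column `as_materialized_series().clone()` step (removed from
// hash_keys.rs further down) is no longer needed.
fn encode_keys(df: &DataFrame) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(df.get_columns())?;
    Ok(BinaryOffsetChunked::with_chunk(
        PlSmallStr::from_static("keys"),
        rows.into_array(),
    ))
}
```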
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/into_groups.rs
@@ -320,7 +320,7 @@ impl IntoGroupsProxy for ListChunked {
sorted: bool,
) -> PolarsResult<GroupsProxy> {
multithreaded &= POOL.current_num_threads() > 1;
let by = &[self.clone().into_series()];
let by = &[self.clone().into_column()];
let ca = if multithreaded {
encode_rows_vertical_par_unordered(by).unwrap()
} else {
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/mod.rs
@@ -76,7 +76,7 @@ impl DataFrame {
let by = by
.iter()
.filter(|s| !s.dtype().is_null())
.map(|c| c.as_materialized_series().clone())
.cloned()
.collect::<Vec<_>>();
if by.is_empty() {
let groups = if self.is_empty() {
9 changes: 2 additions & 7 deletions crates/polars-expr/src/hash_keys.rs
@@ -5,7 +5,6 @@ use polars_core::prelude::row_encode::_get_rows_encoded_unordered;
use polars_core::prelude::PlRandomState;
use polars_core::series::Series;
use polars_utils::hashing::HashPartitioner;
use polars_utils::itertools::Itertools;
use polars_utils::vec::PushUnchecked;
use polars_utils::IdxSize;

@@ -20,12 +19,8 @@ pub enum HashKeys {
impl HashKeys {
pub fn from_df(df: &DataFrame, random_state: PlRandomState, force_row_encoding: bool) -> Self {
if df.width() > 1 || force_row_encoding {
let keys = df
.get_columns()
.iter()
.map(|c| c.as_materialized_series().clone())
.collect_vec();
let keys_encoded = _get_rows_encoded_unordered(&keys[..]).unwrap().into_array();
let keys = df.get_columns();
let keys_encoded = _get_rows_encoded_unordered(keys).unwrap().into_array();
assert!(keys_encoded.len() == df.height());

// TODO: use vechash? Not supported yet for lists.
6 changes: 3 additions & 3 deletions crates/polars-ops/src/frame/join/mod.rs
@@ -572,9 +572,9 @@ fn prepare_keys_multiple(s: &[Series], join_nulls: bool) -> PolarsResult<BinaryO
.map(|s| {
let phys = s.to_physical_repr();
match phys.dtype() {
DataType::Float32 => phys.f32().unwrap().to_canonical().into_series(),
DataType::Float64 => phys.f64().unwrap().to_canonical().into_series(),
_ => phys.into_owned(),
DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
_ => phys.into_owned().into_column(),
}
})
.collect::<Vec<_>>();
57 changes: 18 additions & 39 deletions crates/polars-python/src/dataframe/general.rs
@@ -5,14 +5,13 @@ use polars::export::arrow::bitmap::MutableBitmap;
use polars::prelude::*;
#[cfg(feature = "pivot")]
use polars_lazy::frame::pivot::{pivot, pivot_stable};
use polars_row::RowEncodingOptions;
use pyo3::exceptions::PyIndexError;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyList;
use pyo3::IntoPyObjectExt;

use self::row_encode::get_row_encoding_dictionary;
use self::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
use super::PyDataFrame;
use crate::conversion::Wrap;
use crate::error::PyPolarsErr;
@@ -684,45 +683,25 @@
opts: Vec<(bool, bool, bool)>,
) -> PyResult<PySeries> {
py.allow_threads(|| {
let mut df = self.df.clone();
df.rechunk_mut();

let dicts = df
.get_columns()
.iter()
.map(|c| get_row_encoding_dictionary(c.dtype()))
.collect::<Vec<_>>();

assert_eq!(df.width(), opts.len());

let chunks = df
.get_columns()
.iter()
.map(|c| c.as_materialized_series().to_physical_repr().chunks()[0].to_boxed())
.collect::<Vec<_>>();
let opts = opts
.into_iter()
.map(|(descending, nulls_last, no_order)| {
let mut opt = RowEncodingOptions::default();

opt.set(RowEncodingOptions::DESCENDING, descending);
opt.set(RowEncodingOptions::NULLS_LAST, nulls_last);
opt.set(RowEncodingOptions::NO_ORDER, no_order);

opt
})
.collect::<Vec<_>>();

let rows = polars_row::convert_columns(df.height(), &chunks, &opts, &dicts);

Ok(unsafe {
Series::from_chunks_and_dtype_unchecked(
PlSmallStr::from_static("row_enc"),
vec![rows.into_array().boxed()],
&DataType::BinaryOffset,
let name = PlSmallStr::from_static("row_enc");
let is_unordered = opts.first().is_some_and(|(_, _, v)| *v);

let ca = if is_unordered {
_get_rows_encoded_ca_unordered(name, self.df.get_columns())
} else {
let descending = opts.iter().map(|(v, _, _)| *v).collect::<Vec<_>>();
let nulls_last = opts.iter().map(|(_, v, _)| *v).collect::<Vec<_>>();

_get_rows_encoded_ca(
name,
self.df.get_columns(),
descending.as_slice(),
nulls_last.as_slice(),
)
}
.into())
.map_err(PyPolarsErr::from)?;

Ok(ca.into_series().into())
})
}
}
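For reference, the rewritten binding above keys its branch off the third flag of the first option tuple: the unordered encoding takes no per-column ordering flags, so (assuming the Python caller passes the same `no_order` value for every column, which this code relies on) inspecting one tuple is enough. A small restatement with a hypothetical function name:

```rust
// Hypothetical restatement (not part of the PR) of how the
// (descending, nulls_last, no_order) triples are split for the two helpers.
fn split_row_encoding_opts(opts: &[(bool, bool, bool)]) -> (bool, Vec<bool>, Vec<bool>) {
    // The unordered encoding ignores per-column ordering, so only the first
    // triple's `no_order` flag needs to be checked.
    let unordered = opts.first().is_some_and(|&(_, _, no_order)| no_order);
    let descending = opts.iter().map(|&(d, _, _)| d).collect();
    let nulls_last = opts.iter().map(|&(_, n, _)| n).collect();
    (unordered, descending, nulls_last)
}
```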
18 changes: 9 additions & 9 deletions crates/polars-python/src/series/general.rs
@@ -582,17 +582,17 @@ impl PySeries {
let columns = columns
.into_iter()
.zip(dtypes)
.map(|(arr, (name, dtype))| {
unsafe {
Series::from_chunks_and_dtype_unchecked(
PlSmallStr::from(name),
vec![arr],
&dtype.0,
)
}
.map(|(arr, (name, dtype))| unsafe {
Series::from_chunks_and_dtype_unchecked(
PlSmallStr::from(name),
vec![arr],
&dtype.0.to_physical(),
)
.into_column()
.from_physical_unchecked(&dtype.0)
})
.collect::<Vec<_>>();
.collect::<PolarsResult<Vec<_>>>()
.map_err(PyPolarsErr::from)?;
Ok(DataFrame::new(columns).map_err(PyPolarsErr::from)?.into())
})
}
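The last hunk builds each `Series` at its physical dtype and then converts back to the logical dtype on the resulting `Column`, which is why the collect now goes through `PolarsResult` and is mapped into a Python error. A minimal sketch of that pattern under the same assumptions as the diff (only the polars APIs shown above; the helper name is hypothetical):

```rust
use polars::export::arrow::array::Array;
use polars::prelude::*;

// Hypothetical helper (not part of the PR): wrap one physical chunk as a
// Column with its logical dtype restored.
// Safety: as in the closure above, the caller must guarantee that `arr`
// really is the physical representation of `dtype`.
unsafe fn column_from_physical_chunk(
    name: PlSmallStr,
    arr: Box<dyn Array>,
    dtype: &DataType,
) -> PolarsResult<Column> {
    Series::from_chunks_and_dtype_unchecked(name, vec![arr], &dtype.to_physical())
        .into_column()
        .from_physical_unchecked(dtype)
}
```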