Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow more than one dictionary encoded data page per column chunk in parquet #20148

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ impl<K: DictionaryKey> Array for DictionaryArray<K> {

impl<K: DictionaryKey> Splitable for DictionaryArray<K> {
fn check_bound(&self, offset: usize) -> bool {
offset < self.len()
offset <= self.len()
}

unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl Array for FixedSizeBinaryArray {

impl Splitable for FixedSizeBinaryArray {
fn check_bound(&self, offset: usize) -> bool {
offset < self.len()
offset <= self.len()
}

unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-compute/src/unique/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ impl RangedUniqueKernel for BooleanUniqueKernelState {
self.seen == 0b111
}

/// Returns whether both bits covered by the mask `0b011` are set in `seen`.
///
/// The `0b100` bit (the one `has_seen_null` inspects) is deliberately ignored.
fn has_seen_all_ignore_null(&self) -> bool {
    let non_null_mask = 0b011;
    (self.seen & non_null_mask) == non_null_mask
}

/// Returns whether the `0b100` bit of `seen` is set.
fn has_seen_null(&self) -> bool {
    let null_bit = 0b100;
    (self.seen & null_bit) != 0
}

fn append(&mut self, array: &Self::Array) {
if array.len() == 0 {
return;
Expand Down
52 changes: 50 additions & 2 deletions crates/polars-compute/src/unique/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use arrow::array::{Array, DictionaryArray};
use arrow::datatypes::ArrowDataType;
use arrow::scalar::Scalar;

use super::{PrimitiveRangedUniqueState, RangedUniqueKernel};
use crate::filter::filter_with_bitmap;
use crate::min_max::dyn_array_min_max_propagate_nan;

/// A specialized unique kernel for [`DictionaryArray`] for when all values are in a small known
/// range.
Expand All @@ -13,21 +16,66 @@ pub struct DictionaryRangedUniqueState {
impl DictionaryRangedUniqueState {
pub fn new(values: Box<dyn Array>) -> Self {
Self {
key_state: PrimitiveRangedUniqueState::new(0, values.len() as u32 + 1),
key_state: PrimitiveRangedUniqueState::new(0, values.len().saturating_sub(1) as u32),
values,
}
}

/// Gives mutable access to the underlying [`PrimitiveRangedUniqueState`] that tracks the
/// dictionary keys.
pub fn key_state(&mut self) -> &mut PrimitiveRangedUniqueState<u32> {
&mut self.key_state
}

/// Borrows the dictionary values array this state was constructed with.
pub fn values(&self) -> &dyn Array {
self.values.as_ref()
}

/// Returns the dictionary values whose keys have been marked as seen.
///
/// When every key has been seen (nulls ignored) the whole values array is returned as-is;
/// otherwise the values are filtered by the bitmap of seen keys.
pub fn seen_values(&self) -> Box<dyn Array> {
    let dictionary = self.values();

    if !self.has_seen_all_ignore_null() {
        let seen_keys = self.key_state.to_bitmap();
        filter_with_bitmap(dictionary, &seen_keys)
    } else {
        dictionary.to_boxed()
    }
}

/// Computes the minimum and maximum over the dictionary values via
/// `dyn_array_min_max_propagate_nan`.
///
/// If every key has been seen (nulls ignored) the values array is used unchanged. Otherwise
/// the bitmap of seen keys is applied as validity (intersected with any existing validity)
/// before computing the result, so unseen entries are treated as null.
pub fn value_min_max(&self) -> Option<(Box<dyn Scalar>, Box<dyn Scalar>)> {
    let dictionary = self.values();

    if self.has_seen_all_ignore_null() {
        return dyn_array_min_max_propagate_nan(dictionary);
    }

    let seen_keys = self.key_state.to_bitmap();
    let validity = match dictionary.validity() {
        Some(existing) => existing & &seen_keys,
        None => seen_keys,
    };

    let masked = dictionary.with_validity(Some(validity));
    dyn_array_min_max_propagate_nan(masked.as_ref())
}

/// Returns the result of `min_max` on the key state, or `None` if the dictionary has no
/// values at all.
pub fn key_min_max(&self) -> Option<(u32, u32)> {
    if self.values.is_empty() {
        // No dictionary entries exist, so no key can be reported.
        None
    } else {
        self.key_state.min_max()
    }
}
}

impl RangedUniqueKernel for DictionaryRangedUniqueState {
type Array = DictionaryArray<u32>;

fn has_seen_all(&self) -> bool {
self.key_state.has_seen_all()
if self.values().is_empty() {
self.has_seen_null()
} else {
self.key_state.has_seen_all()
}
}

/// Returns whether every dictionary key has been seen, not counting null.
///
/// An empty dictionary is reported as fully seen without consulting the key state.
fn has_seen_all_ignore_null(&self) -> bool {
    let dictionary_is_empty = self.values().is_empty();
    dictionary_is_empty || self.key_state.has_seen_all_ignore_null()
}

/// Returns whether a null has been seen, as reported by the key state.
fn has_seen_null(&self) -> bool {
self.key_state.has_seen_null()
}

fn append(&mut self, array: &Self::Array) {
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-compute/src/unique/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub trait RangedUniqueKernel {

/// Returns whether all the values in the whole range are in the state
fn has_seen_all(&self) -> bool;
/// Returns whether all the values in the whole range are in the state ignoring nulls.
fn has_seen_all_ignore_null(&self) -> bool;
/// Returns whether null has been seen
fn has_seen_null(&self) -> bool;

/// Append an `Array`'s values to the `State`
fn append(&mut self, array: &Self::Array);
Expand Down
100 changes: 74 additions & 26 deletions crates/polars-compute/src/unique/primitive.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ops::{Add, RangeInclusive, Sub};

use arrow::array::PrimitiveArray;
use arrow::bitmap;
use arrow::bitmap::bitmask::BitMask;
use arrow::bitmap::MutableBitmap;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::ArrowDataType;
use arrow::types::NativeType;
use num_traits::{FromPrimitive, ToPrimitive};
Expand All @@ -14,6 +15,7 @@ use super::RangedUniqueKernel;
/// range.
pub struct PrimitiveRangedUniqueState<T: NativeType> {
seen: Seen,
has_seen_null: bool,
range: RangeInclusive<T>,
}

Expand All @@ -22,6 +24,14 @@ enum Seen {
Large(MutableBitmap),
}

/// Returns whether `v` is exactly the mask with its lowest `size` bits set.
///
/// `size == 128` is special-cased because `1 << 128` does not fit in a `u128`.
fn has_seen_up_to_u128(v: u128, size: usize) -> bool {
    let full_mask = match size {
        128 => u128::MAX,
        bits => (1u128 << bits) - 1,
    };

    v == full_mask
}

impl Seen {
pub fn from_size(size: usize) -> Self {
if size <= 128 {
Expand All @@ -38,10 +48,10 @@ impl Seen {
}
}

fn has_seen_null(&self, size: usize) -> bool {
match self {
Self::Small(v) => v >> (size - 1) != 0,
Self::Large(v) => v.get(size - 1),
fn has_seen_up_to(&self, size: usize) -> bool {
match &self {
Seen::Small(v) => has_seen_up_to_u128(*v, size),
Seen::Large(v) => BitMask::new(v.as_slice(), 0, size).unset_bits() == 0,
}
}
}
Expand All @@ -54,11 +64,10 @@ where
let size = (max_value - min_value).to_usize().unwrap();
// Range is inclusive
let size = size + 1;
// One value is left for null
let size = size + 1;

Self {
seen: Seen::from_size(size),
has_seen_null: false,
range: min_value..=max_value,
}
}
Expand All @@ -69,6 +78,46 @@ where
.unwrap()
+ 1
}

/// Produces an immutable [`Bitmap`] from the `seen` state.
///
/// The small representation reinterprets the `u128` as 16 bytes and takes the first
/// `self.size()` bits; this relies on little-endian byte order, which is enforced at compile
/// time. The large representation is cloned and frozen.
pub fn to_bitmap(&self) -> Bitmap {
    #[cfg(not(target_endian = "little"))]
    compile_error!("This assumes little-endian");

    match &self.seen {
        Seen::Large(bits) => bits.clone().freeze(),
        Seen::Small(bits) => {
            let bytes = bytemuck::must_cast_ref::<u128, [u8; 16]>(bits);
            Bitmap::from_u8_slice(bytes, self.size())
        },
    }
}

/// Derives a `(min, max)` pair from the `seen` state, or `None` when the number of unset
/// slots counted from the low end covers the whole range.
///
/// `min` is `range.start()` plus the number of unset slots at the low end, and `max` is
/// `range.end()` minus the number of unset slots at the high end.
/// NOTE(review): presumably slot `i` stands for the value `range.start() + i` — confirm
/// against `append`.
pub fn min_max(&self) -> Option<(T, T)> {
    let size = self.size();

    let (skip_low, skip_high) = match &self.seen {
        Seen::Small(bits) => {
            let low = bits.trailing_zeros() as usize;
            // Only the lowest `size` bits of the `u128` are in use; discount the rest.
            let high = bits.leading_zeros() as usize - (128 - size);
            (low, high)
        },
        Seen::Large(bits) => {
            let bytes = bits.as_slice();
            let low = bitmap::utils::leading_zeros(bytes, 0, size);
            let high = bitmap::utils::trailing_zeros(bytes, 0, size);
            (low, high)
        },
    };

    if skip_low >= size {
        return None;
    }

    let lower = *self.range.start() + T::from_usize(skip_low).unwrap();
    let upper = *self.range.end() - T::from_usize(skip_high).unwrap();

    Some((lower, upper))
}
}

impl<T: NativeType> RangedUniqueKernel for PrimitiveRangedUniqueState<T>
Expand All @@ -78,12 +127,15 @@ where
type Array = PrimitiveArray<T>;

fn has_seen_all(&self) -> bool {
let size = self.size();
match &self.seen {
Seen::Small(v) if size == 128 => !v == 0,
Seen::Small(v) => *v == ((1 << size) - 1),
Seen::Large(v) => BitMask::new(v.as_slice(), 0, size).unset_bits() == 0,
}
self.has_seen_null && self.has_seen_all_ignore_null()
}

/// Returns whether every slot of the range has been marked as seen, not counting null.
fn has_seen_all_ignore_null(&self) -> bool {
    let size = self.size();
    self.seen.has_seen_up_to(size)
}

/// Returns the `has_seen_null` flag stored on this state.
fn has_seen_null(&self) -> bool {
self.has_seen_null
}

fn append(&mut self, array: &Self::Array) {
Expand All @@ -98,7 +150,7 @@ where
match self.seen {
Seen::Small(ref mut seen) => {
// Check every so often whether we have already seen all the values.
while *seen != ((1 << (size - 1)) - 1) && i < values.len() {
while !has_seen_up_to_u128(*seen, size) && i < values.len() {
for v in values[i..].iter().take(STEP_SIZE) {
if cfg!(debug_assertions) {
assert!(TotalOrd::tot_ge(v, self.range.start()));
Expand All @@ -115,7 +167,7 @@ where
},
Seen::Large(ref mut seen) => {
// Check every so often whether we have already seen all the values.
while BitMask::new(seen.as_slice(), 0, size - 1).unset_bits() > 0
while BitMask::new(seen.as_slice(), 0, size).unset_bits() > 0
&& i < values.len()
{
for v in values[i..].iter().take(STEP_SIZE) {
Expand All @@ -136,11 +188,10 @@ where
},
Some(_) => {
let iter = array.non_null_values_iter();
self.has_seen_null = true;

match self.seen {
Seen::Small(ref mut seen) => {
*seen |= 1 << (size - 1);

for v in iter {
if cfg!(debug_assertions) {
assert!(TotalOrd::tot_ge(&v, self.range.start()));
Expand All @@ -153,8 +204,6 @@ where
}
},
Seen::Large(ref mut seen) => {
seen.set(size - 1, true);

for v in iter {
if cfg!(debug_assertions) {
assert!(TotalOrd::tot_ge(&v, self.range.start()));
Expand Down Expand Up @@ -186,10 +235,8 @@ where
}

fn finalize_unique(self) -> Self::Array {
let size = self.size();
let seen = self.seen;

let has_null = seen.has_seen_null(size);
let num_values = seen.num_seen();
let mut values = Vec::with_capacity(num_values);

Expand All @@ -212,12 +259,13 @@ where
},
}

let validity = if has_null {
let validity = if self.has_seen_null {
let mut validity = MutableBitmap::new();
validity.extend_constant(values.len() - 1, true);

// Push the null
validity.push(false);
// The null has already been pushed.
*values.last_mut().unwrap() = T::zeroed();
values.push(T::zeroed());
Some(validity.freeze())
} else {
None
Expand All @@ -227,10 +275,10 @@ where
}

fn finalize_n_unique(&self) -> usize {
self.seen.num_seen()
self.seen.num_seen() + usize::from(self.has_seen_null)
}

fn finalize_n_unique_non_null(&self) -> usize {
self.seen.num_seen() - usize::from(self.seen.has_seen_null(self.size()))
self.seen.num_seen()
}
}
1 change: 0 additions & 1 deletion crates/polars-parquet/src/arrow/write/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ mod basic;
mod nested;

pub use basic::array_to_page;
pub(crate) use basic::{build_statistics, encode_plain};
pub(super) use basic::{encode_non_null_values, ord_binary};
pub use nested::array_to_page as nested_array_to_page;
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/binview/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod basic;
mod nested;

pub(crate) use basic::{array_to_page, build_statistics, encode_plain};
pub(crate) use basic::{array_to_page, encode_plain};
pub use nested::array_to_page as nested_array_to_page;
Loading
Loading