Commit 3255ad5

fix: Allow more than one dictionary encoded data page per column chunk in parquet

Fixes pola-rs#20141.
coastalwhite committed Dec 17, 2024
1 parent 2f6c23b commit 3255ad5
Showing 11 changed files with 131 additions and 38 deletions.

crates/polars-arrow/src/array/dictionary/mod.rs (2 changes: 1 addition & 1 deletion)

@@ -422,7 +422,7 @@ impl<K: DictionaryKey> Array for DictionaryArray<K> {
 
 impl<K: DictionaryKey> Splitable for DictionaryArray<K> {
     fn check_bound(&self, offset: usize) -> bool {
-        offset < self.len()
+        offset <= self.len()
     }
 
     unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {

crates/polars-arrow/src/array/fixed_size_binary/mod.rs (2 changes: 1 addition & 1 deletion)

@@ -232,7 +232,7 @@ impl Array for FixedSizeBinaryArray {
 
 impl Splitable for FixedSizeBinaryArray {
     fn check_bound(&self, offset: usize) -> bool {
-        offset < self.len()
+        offset <= self.len()
    }
 
     unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
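
The two `check_bound` relaxations above exist so that `Splitable::split_at` can be called with `offset == len()`: the new page loop in `dictionary.rs` (next file) splits off pages until the remainder is empty, and its final iteration splits at exactly the remaining length. A minimal sketch of that pattern, using a plain `Vec` stand-in rather than the arrow `Splitable` API:

```rust
// Hypothetical stand-in for Splitable::split_at; only the bound check matters.
fn split_at(v: Vec<u32>, offset: usize) -> (Vec<u32>, Vec<u32>) {
    // The old check, `offset < v.len()`, would reject the final split below
    // (offset == 2 on a 2-element remainder) even though it is well-defined.
    assert!(offset <= v.len());
    let head = v[..offset].to_vec();
    let tail = v[offset..].to_vec();
    (head, tail)
}

fn main() {
    let mut keys: Vec<u32> = (0..10).collect();
    let rows_per_page = 4;
    // Mirrors the `while !array.is_empty()` loop in serialize_keys_flat:
    // emits chunks of 4, 4, and 2 keys, the last split landing on len().
    while !keys.is_empty() {
        let page;
        (page, keys) = split_at(keys, rows_per_page.min(keys.len()));
        println!("data page with {} keys: {:?}", page.len(), page);
    }
}
```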

crates/polars-parquet/src/arrow/write/dictionary.rs (112 changes: 95 additions & 17 deletions)

@@ -1,5 +1,6 @@
 use arrow::array::{
-    Array, BinaryViewArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8ViewArray,
+    Array, BinaryViewArray, DictionaryArray, DictionaryKey, PrimitiveArray, Splitable,
+    Utf8ViewArray,
 };
 use arrow::bitmap::{Bitmap, MutableBitmap};
 use arrow::buffer::Buffer;
@@ -20,7 +21,10 @@ use super::pages::PrimitiveNested;
 use super::primitive::{
     build_statistics as primitive_build_statistics, encode_plain as primitive_encode_plain,
 };
-use super::{binview, nested, EncodeNullability, Nested, WriteOptions};
+use super::{
+    binview, estimated_byte_size_to_values_per_page, nested, EncodeNullability, Nested,
+    WriteOptions,
+};
 use crate::arrow::read::schema::is_nullable;
 use crate::arrow::write::{slice_nested_leaf, utils};
 use crate::parquet::encoding::hybrid_rle::encode;
@@ -348,13 +352,77 @@ fn normalized_validity<K: DictionaryKey>(array: &DictionaryArray<K>) -> Option<Bitmap> {
     }
 }
 
-fn serialize_keys<K: DictionaryKey>(
+fn serialize_keys_flat<K: DictionaryKey>(
+    array: &DictionaryArray<K>,
+    type_: PrimitiveType,
+    _statistics: Option<ParquetStatistics>,
+    options: WriteOptions,
+) -> PolarsResult<Vec<Page>> {
+    // Parquet only accepts a single validity - we "&" the validities into a single one
+    // and ignore keys whose _value_ is null.
+    // It's important that we slice before normalizing.
+    let validity = normalized_validity(array);
+    let mut array = array.clone();
+    array.set_validity(validity);
+
+    let estimated_bits_per_value = array.values().len().next_power_of_two().trailing_zeros() + 1;
+    let estimated_bits_per_value = estimated_bits_per_value as usize;
+    let estimated_byte_size = (estimated_bits_per_value * array.len()).div_ceil(8);
+
+    let rows_per_page = estimated_byte_size_to_values_per_page(
+        array.len(),
+        estimated_byte_size,
+        options.data_page_size,
+    );
+
+    let num_pages = array.len().div_ceil(rows_per_page);
+    let mut data_pages = Vec::with_capacity(num_pages);
+
+    while !array.is_empty() {
+        let num_page_rows = rows_per_page.min(array.len());
+
+        let page_array;
+        (page_array, array) = array.split_at(num_page_rows);
+
+        let mut buffer = vec![];
+
+        let is_optional = is_nullable(&type_.field_info);
+        serialize_def_levels_simple(
+            page_array.validity(),
+            num_page_rows,
+            is_optional,
+            options,
+            &mut buffer,
+        )?;
+        let definition_levels_byte_length = buffer.len();
+
+        serialize_keys_values(&page_array, page_array.validity(), &mut buffer)?;
+
+        let page = utils::build_plain_page(
+            buffer,
+            num_page_rows, // num_values == num_rows when flat
+            num_page_rows,
+            page_array.null_count(),
+            0, // flat means no repetition values
+            definition_levels_byte_length,
+            None, // we don't support writing page level statistics atm
+            type_.clone(),
+            options,
+            Encoding::RleDictionary,
+        )?;
+        data_pages.push(Page::Data(page));
+    }
+
+    Ok(data_pages)
+}
+
+fn serialize_keys_nested<K: DictionaryKey>(
     array: &DictionaryArray<K>,
     type_: PrimitiveType,
     nested: &[Nested],
     statistics: Option<ParquetStatistics>,
     options: WriteOptions,
-) -> PolarsResult<Page> {
+) -> PolarsResult<Vec<Page>> {
     let mut buffer = vec![];
 
     let (start, len) = slice_nested_leaf(nested);
@@ -382,25 +450,36 @@ fn serialize_keys<K: DictionaryKey>(
 
     serialize_keys_values(&array, validity.as_ref(), &mut buffer)?;
 
-    let (num_values, num_rows) = if nested.len() == 1 {
-        (array.len(), array.len())
-    } else {
-        (nested::num_values(&nested), nested[0].len())
-    };
+    let num_values = array.len();
+    let num_rows = nested[0].len();
 
-    utils::build_plain_page(
+    let page = utils::build_plain_page(
         buffer,
         num_values,
         num_rows,
-        array.null_count(),
+        array.clone().null_count(),
         repetition_levels_byte_length,
         definition_levels_byte_length,
         statistics,
         type_,
         options,
         Encoding::RleDictionary,
-    )
-    .map(Page::Data)
+    )?;
+    Ok(vec![Page::Data(page)])
 }
+
+fn serialize_keys<K: DictionaryKey>(
+    array: &DictionaryArray<K>,
+    type_: PrimitiveType,
+    nested: &[Nested],
+    statistics: Option<ParquetStatistics>,
+    options: WriteOptions,
+) -> PolarsResult<Vec<Page>> {
+    if nested.len() == 1 {
+        serialize_keys_flat(array, type_, statistics, options)
+    } else {
+        serialize_keys_nested(array, type_, nested, statistics, options)
+    }
+}
 
 macro_rules! dyn_prim {
@@ -582,12 +661,11 @@ pub fn array_to_pages<K: DictionaryKey>(
             }
 
             // write DataPage pointing to DictPage
-            let data_page =
-                serialize_keys(array, type_, nested, statistics, options)?.unwrap_data();
+            let data_pages = serialize_keys(array, type_, nested, statistics, options)?;
 
             Ok(DynIter::new(
-                [Page::Dict(dict_page), Page::Data(data_page)]
-                    .into_iter()
+                std::iter::once(Page::Dict(dict_page))
+                    .chain(data_pages)
                     .map(Ok),
             ))
         },
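
To put numbers on the sizing logic in `serialize_keys_flat` above, here is a worked example as a standalone sketch (illustrative inputs; the page arithmetic mirrors `estimated_byte_size_to_values_per_page` with the default 1 MiB page size, see `write/mod.rs` below):

```rust
fn main() {
    // Illustrative: a dictionary of 1_000 distinct values referenced by
    // 1_000_000 keys.
    let num_dict_values: usize = 1_000;
    let num_keys: usize = 1_000_000;

    // Keys are bit-packed, so their width is driven by the dictionary size:
    // next_power_of_two(1_000) = 1024, trailing_zeros = 10, +1 = 11 bits.
    let estimated_bits_per_value =
        (num_dict_values.next_power_of_two().trailing_zeros() + 1) as usize;
    let estimated_byte_size = (estimated_bits_per_value * num_keys).div_ceil(8);
    assert_eq!(estimated_byte_size, 1_375_000); // ~1.31 MiB of key data

    // ceil(1_375_000 / 1_048_576) = 2 pages, so 500_000 keys per page.
    let page_size: usize = 1024 * 1024;
    let num_pages = estimated_byte_size.div_ceil(page_size).max(1);
    let rows_per_page = num_keys / num_pages;
    assert_eq!((num_pages, rows_per_page), (2, 500_000));

    // The column chunk is now written as two dictionary-encoded data pages
    // instead of a single oversized one, which is the point of this commit.
    println!("{num_pages} pages, {rows_per_page} rows each");
}
```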

crates/polars-parquet/src/arrow/write/file.rs (1 change: 1 addition & 0 deletions)

@@ -61,6 +61,7 @@ impl<W: Write> FileWriter<W> {
             parquet_schema,
             FileWriteOptions {
                 version: options.version,
+                write_indexes: false,
                 write_statistics: options.has_statistics(),
             },
             created_by,

crates/polars-parquet/src/arrow/write/mod.rs (35 changes: 25 additions & 10 deletions)

@@ -265,6 +265,29 @@ pub fn slice_parquet_array(
     }
 }
 
+pub(crate) fn estimated_byte_size_to_estimated_num_pages(
+    estimated_byte_size: usize,
+    data_page_size: Option<usize>,
+) -> usize {
+    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
+    const MAX_PAGE_SIZE: usize = 2usize.pow(31) - 2usize.pow(25);
+
+    let page_size = data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
+    let page_size = page_size.min(MAX_PAGE_SIZE);
+
+    let estimated_amount_of_pages = estimated_byte_size.div_ceil(page_size);
+    estimated_amount_of_pages.max(1)
+}
+
+pub(crate) fn estimated_byte_size_to_values_per_page(
+    number_of_values: usize,
+    estimated_byte_size: usize,
+    data_page_size: Option<usize>,
+) -> usize {
+    number_of_values
+        / estimated_byte_size_to_estimated_num_pages(estimated_byte_size, data_page_size)
+}
+
 /// Get the length of [`Array`] that should be sliced.
 pub fn get_max_length(nested: &[Nested]) -> usize {
     let mut length = 0;
@@ -319,16 +342,8 @@ pub fn array_to_pages(
         // note: this is not correct if the array is sliced - the estimation should happen on the
         // primitive after sliced for parquet
         let byte_size = estimated_bytes_size(primitive_array);
-
-        const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
-        let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
-        let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
-        let bytes_per_row = if number_of_rows == 0 {
-            0
-        } else {
-            ((byte_size as f64) / (number_of_rows as f64)) as usize
-        };
-        let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1);
+        let rows_per_page =
+            estimated_byte_size_to_values_per_page(number_of_rows, byte_size, options.data_page_size);
 
         let row_iter = (0..number_of_rows)
             .step_by(rows_per_page)
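
As a standalone check of the two helpers added above: `MAX_PAGE_SIZE` works out to 2^31 - 2^25 = 2,113,929,216 bytes, and the `.max(1)` guard means a zero-byte estimate still yields one page rather than a division by zero in `estimated_byte_size_to_values_per_page`. A sketch with the first helper's body lightly condensed from the diff and made-up sizes:

```rust
fn estimated_byte_size_to_estimated_num_pages(
    estimated_byte_size: usize,
    data_page_size: Option<usize>,
) -> usize {
    // Default to 1 MiB pages, capped at 2^31 - 2^25 bytes.
    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
    const MAX_PAGE_SIZE: usize = 2usize.pow(31) - 2usize.pow(25);

    let page_size = data_page_size.unwrap_or(DEFAULT_PAGE_SIZE).min(MAX_PAGE_SIZE);

    // div_ceil rounds up; max(1) guarantees at least one page.
    estimated_byte_size.div_ceil(page_size).max(1)
}

fn main() {
    // 5 MB estimated with the default 1 MiB page size -> 5 pages.
    assert_eq!(estimated_byte_size_to_estimated_num_pages(5_000_000, None), 5);
    // An explicit 64 KiB page size -> ceil(5_000_000 / 65_536) = 77 pages.
    assert_eq!(
        estimated_byte_size_to_estimated_num_pages(5_000_000, Some(64 * 1024)),
        77
    );
    // An empty chunk still gets one page.
    assert_eq!(estimated_byte_size_to_estimated_num_pages(0, None), 1);
}
```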

crates/polars-parquet/src/arrow/write/sink.rs (1 change: 1 addition & 0 deletions)

@@ -58,6 +58,7 @@
             parquet_schema.clone(),
             ParquetWriteOptions {
                 version: options.version,
+                write_indexes: false,
                 write_statistics: options.has_statistics(),
             },
             created_by,

crates/polars-parquet/src/parquet/page/mod.rs (7 changes: 0 additions & 7 deletions)

@@ -250,13 +250,6 @@ impl Page {
             Self::Dict(page) => page.buffer.to_mut(),
         }
     }
-
-    pub(crate) fn unwrap_data(self) -> DataPage {
-        match self {
-            Self::Data(page) => page,
-            _ => panic!(),
-        }
-    }
 }
 
 /// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data
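
`unwrap_data` existed only to pull the single `DataPage` out of the old `serialize_keys` result; with `serialize_keys` now returning `Vec<Page>` and `array_to_pages` chaining the pages directly (see the `dictionary.rs` hunk above), the panicking helper has no callers left. A self-contained model of the new iterator shape, using a stand-in enum rather than the real `Page` type:

```rust
// Stand-in for the polars-parquet Page enum, just to show the iterator shape.
#[derive(Debug)]
enum Page {
    Dict(&'static str),
    Data(usize),
}

fn main() {
    // serialize_keys may now return any number of data pages.
    let data_pages = vec![Page::Data(500_000), Page::Data(500_000)];
    // Replaces `[Page::Dict(..), Page::Data(..)].into_iter()`, which hard-coded
    // exactly one data page per column chunk.
    let pages: Vec<Page> = std::iter::once(Page::Dict("dict page"))
        .chain(data_pages)
        .collect();
    assert_eq!(pages.len(), 3);
    println!("{pages:?}");
}
```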

crates/polars-parquet/src/parquet/write/file.rs (2 changes: 1 addition & 1 deletion)

@@ -188,7 +188,7 @@ impl<W: Write> FileWriter<W> {
         // compute file stats
         let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
 
-        if self.options.write_statistics {
+        if self.options.write_indexes {
             // write column indexes (require page statistics)
             self.row_groups
                 .iter_mut()

crates/polars-parquet/src/parquet/write/mod.rs (4 changes: 3 additions & 1 deletion)

@@ -28,8 +28,10 @@ pub type RowGroupIter<'a, E> = DynIter<'a, RowGroupIterColumns<'a, E>>;
 /// Write options of different interfaces on this crate
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
 pub struct WriteOptions {
-    /// Whether to write statistics, including indexes
+    /// Whether to write statistics
     pub write_statistics: bool,
+    /// Whether to write indexes
+    pub write_indexes: bool,
     /// Which Parquet version to use
     pub version: Version,
 }
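
With statistics and index writing now independent flags, every `WriteOptions` construction site has to set both, as the test updates below show. A minimal sketch, assuming `polars_parquet::parquet::write` as the import path for both types:

```rust
use polars_parquet::parquet::write::{Version, WriteOptions};

fn main() {
    // Page/column statistics stay on, but column indexes are skipped; this is
    // the combination the commit threads through the arrow-level writers.
    let options = WriteOptions {
        write_statistics: true,
        write_indexes: false,
        version: Version::V2,
    };
    assert!(options.write_statistics && !options.write_indexes);
}
```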

crates/polars/tests/it/io/parquet/write/mod.rs (2 changes: 2 additions & 0 deletions)

@@ -52,6 +52,7 @@ fn test_column(column: &str, compression: CompressionOptions) -> ParquetResult<()> {
 
     let options = WriteOptions {
         write_statistics: true,
+        write_indexes: false,
         version: Version::V1,
     };
 
@@ -177,6 +178,7 @@ fn basic() -> ParquetResult<()> {
 
     let options = WriteOptions {
         write_statistics: false,
+        write_indexes: false,
         version: Version::V1,
     };

crates/polars/tests/it/io/parquet/write/sidecar.rs (1 change: 1 addition & 0 deletions)

@@ -20,6 +20,7 @@ fn basic() -> Result<(), ParquetError> {
             schema.clone(),
             WriteOptions {
                 write_statistics: true,
+                write_indexes: false,
                 version: Version::V2,
             },
             None,
