diff --git a/crates/polars-arrow/src/array/dictionary/mod.rs b/crates/polars-arrow/src/array/dictionary/mod.rs index 3f44dd604980..718cf7a324ce 100644 --- a/crates/polars-arrow/src/array/dictionary/mod.rs +++ b/crates/polars-arrow/src/array/dictionary/mod.rs @@ -422,7 +422,7 @@ impl Array for DictionaryArray { impl Splitable for DictionaryArray { fn check_bound(&self, offset: usize) -> bool { - offset < self.len() + offset <= self.len() } unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) { diff --git a/crates/polars-arrow/src/array/fixed_size_binary/mod.rs b/crates/polars-arrow/src/array/fixed_size_binary/mod.rs index 8217964ea44d..de2c9fb8dbd8 100644 --- a/crates/polars-arrow/src/array/fixed_size_binary/mod.rs +++ b/crates/polars-arrow/src/array/fixed_size_binary/mod.rs @@ -232,7 +232,7 @@ impl Array for FixedSizeBinaryArray { impl Splitable for FixedSizeBinaryArray { fn check_bound(&self, offset: usize) -> bool { - offset < self.len() + offset <= self.len() } unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) { diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 487cbcd29def..bc3dd716d4dd 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -1,5 +1,6 @@ use arrow::array::{ - Array, BinaryViewArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8ViewArray, + Array, BinaryViewArray, DictionaryArray, DictionaryKey, PrimitiveArray, Splitable, + Utf8ViewArray, }; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::buffer::Buffer; @@ -20,7 +21,10 @@ use super::pages::PrimitiveNested; use super::primitive::{ build_statistics as primitive_build_statistics, encode_plain as primitive_encode_plain, }; -use super::{binview, nested, EncodeNullability, Nested, WriteOptions}; +use super::{ + binview, estimated_byte_size_to_values_per_page, nested, EncodeNullability, Nested, + WriteOptions, 
+}; use crate::arrow::read::schema::is_nullable; use crate::arrow::write::{slice_nested_leaf, utils}; use crate::parquet::encoding::hybrid_rle::encode; @@ -348,13 +352,77 @@ fn normalized_validity(array: &DictionaryArray) -> Option( +fn serialize_keys_flat( + array: &DictionaryArray, + type_: PrimitiveType, + _statistics: Option, + options: WriteOptions, +) -> PolarsResult> { + // Parquet only accepts a single validity - we "&" the validities into a single one + // and ignore keys whose _value_ is null. + // It's important that we slice before normalizing. + let validity = normalized_validity(array); + let mut array = array.clone(); + array.set_validity(validity); + + let estimated_bits_per_value = array.values().len().next_power_of_two().trailing_zeros() + 1; + let estimated_bits_per_value = estimated_bits_per_value as usize; + let estimated_byte_size = (estimated_bits_per_value * array.len()).div_ceil(8); + + let rows_per_page = estimated_byte_size_to_values_per_page( + array.len(), + estimated_byte_size, + options.data_page_size, + ); + + let num_pages = array.len().div_ceil(rows_per_page); + let mut data_pages = Vec::with_capacity(num_pages); + + while !array.is_empty() { + let num_page_rows = rows_per_page.min(array.len()); + + let page_array; + (page_array, array) = array.split_at(num_page_rows); + + let mut buffer = vec![]; + + let is_optional = is_nullable(&type_.field_info); + serialize_def_levels_simple( + page_array.validity(), + num_page_rows, + is_optional, + options, + &mut buffer, + )?; + let definition_levels_byte_length = buffer.len(); + + serialize_keys_values(&page_array, page_array.validity(), &mut buffer)?; + + let page = utils::build_plain_page( + buffer, + num_page_rows, // num_values == num_rows when flat + num_page_rows, + page_array.null_count(), + 0, // flat means no repetition values + definition_levels_byte_length, + None, // we don't support writing page level statistics atm + type_.clone(), + options, + Encoding::RleDictionary, + )?; 
+ data_pages.push(Page::Data(page)); + } + + Ok(data_pages) +} + +fn serialize_keys_nested( array: &DictionaryArray, type_: PrimitiveType, nested: &[Nested], statistics: Option, options: WriteOptions, -) -> PolarsResult { +) -> PolarsResult> { let mut buffer = vec![]; let (start, len) = slice_nested_leaf(nested); @@ -382,25 +450,36 @@ fn serialize_keys( serialize_keys_values(&array, validity.as_ref(), &mut buffer)?; - let (num_values, num_rows) = if nested.len() == 1 { - (array.len(), array.len()) - } else { - (nested::num_values(&nested), nested[0].len()) - }; + let num_values = array.len(); + let num_rows = nested[0].len(); - utils::build_plain_page( + let page = utils::build_plain_page( buffer, num_values, num_rows, - array.null_count(), + array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, statistics, type_, options, Encoding::RleDictionary, - ) - .map(Page::Data) + )?; + Ok(vec![Page::Data(page)]) +} + +fn serialize_keys( + array: &DictionaryArray, + type_: PrimitiveType, + nested: &[Nested], + statistics: Option, + options: WriteOptions, +) -> PolarsResult> { + if nested.len() == 1 { + serialize_keys_flat(array, type_, statistics, options) + } else { + serialize_keys_nested(array, type_, nested, statistics, options) + } } macro_rules! 
dyn_prim { @@ -582,12 +661,11 @@ pub fn array_to_pages( } // write DataPage pointing to DictPage - let data_page = - serialize_keys(array, type_, nested, statistics, options)?.unwrap_data(); + let data_pages = serialize_keys(array, type_, nested, statistics, options)?; Ok(DynIter::new( - [Page::Dict(dict_page), Page::Data(data_page)] - .into_iter() + std::iter::once(Page::Dict(dict_page)) + .chain(data_pages) .map(Ok), )) }, diff --git a/crates/polars-parquet/src/arrow/write/file.rs b/crates/polars-parquet/src/arrow/write/file.rs index 0fd32deb5b07..b087aaa4cce5 100644 --- a/crates/polars-parquet/src/arrow/write/file.rs +++ b/crates/polars-parquet/src/arrow/write/file.rs @@ -61,6 +61,7 @@ impl FileWriter { parquet_schema, FileWriteOptions { version: options.version, + write_indexes: false, write_statistics: options.has_statistics(), }, created_by, diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index 87d849ae6db0..9cc8025ba856 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -265,6 +265,29 @@ pub fn slice_parquet_array( } } +pub(crate) fn estimated_byte_size_to_estimated_num_pages( + estimated_byte_size: usize, + data_page_size: Option, +) -> usize { + const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; + const MAX_PAGE_SIZE: usize = 2usize.pow(31) - 2usize.pow(25); + + let page_size = data_page_size.unwrap_or(DEFAULT_PAGE_SIZE); + let page_size = page_size.min(MAX_PAGE_SIZE); + + let estimated_amount_of_pages = estimated_byte_size.div_ceil(page_size); + estimated_amount_of_pages.max(1) +} + +pub(crate) fn estimated_byte_size_to_values_per_page( + number_of_values: usize, + estimated_byte_size: usize, + data_page_size: Option, +) -> usize { + number_of_values + / estimated_byte_size_to_estimated_num_pages(estimated_byte_size, data_page_size) +} + /// Get the length of [`Array`] that should be sliced. 
pub fn get_max_length(nested: &[Nested]) -> usize { let mut length = 0; @@ -319,16 +342,8 @@ pub fn array_to_pages( // note: this is not correct if the array is sliced - the estimation should happen on the // primitive after sliced for parquet let byte_size = estimated_bytes_size(primitive_array); - - const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; - let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE); - let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size - let bytes_per_row = if number_of_rows == 0 { - 0 - } else { - ((byte_size as f64) / (number_of_rows as f64)) as usize - }; - let rows_per_page = (max_page_size / (bytes_per_row + 1)).max(1); + let rows_per_page = + estimated_byte_size_to_values_per_page(number_of_rows, byte_size, options.data_page_size); let row_iter = (0..number_of_rows) .step_by(rows_per_page) diff --git a/crates/polars-parquet/src/arrow/write/sink.rs b/crates/polars-parquet/src/arrow/write/sink.rs index 3c60ff9e9f70..be1da7c2e813 100644 --- a/crates/polars-parquet/src/arrow/write/sink.rs +++ b/crates/polars-parquet/src/arrow/write/sink.rs @@ -58,6 +58,7 @@ where parquet_schema.clone(), ParquetWriteOptions { version: options.version, + write_indexes: false, write_statistics: options.has_statistics(), }, created_by, diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index 400bdfc4a0f7..1012ae48c938 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -250,13 +250,6 @@ impl Page { Self::Dict(page) => page.buffer.to_mut(), } } - - pub(crate) fn unwrap_data(self) -> DataPage { - match self { - Self::Data(page) => page, - _ => panic!(), - } - } } /// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. 
It holds actual data diff --git a/crates/polars-parquet/src/parquet/write/file.rs b/crates/polars-parquet/src/parquet/write/file.rs index d46f85dd3138..095664cf60bb 100644 --- a/crates/polars-parquet/src/parquet/write/file.rs +++ b/crates/polars-parquet/src/parquet/write/file.rs @@ -188,7 +188,7 @@ impl FileWriter { // compute file stats let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum(); - if self.options.write_statistics { + if self.options.write_indexes { // write column indexes (require page statistics) self.row_groups .iter_mut() diff --git a/crates/polars-parquet/src/parquet/write/mod.rs b/crates/polars-parquet/src/parquet/write/mod.rs index e8f92b46e2eb..8bc6a29c5d87 100644 --- a/crates/polars-parquet/src/parquet/write/mod.rs +++ b/crates/polars-parquet/src/parquet/write/mod.rs @@ -28,8 +28,10 @@ pub type RowGroupIter<'a, E> = DynIter<'a, RowGroupIterColumns<'a, E>>; /// Write options of different interfaces on this crate #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct WriteOptions { - /// Whether to write statistics, including indexes + /// Whether to write statistics pub write_statistics: bool, + /// Whether to write indexes + pub write_indexes: bool, /// Which Parquet version to use pub version: Version, } diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs index e98a0223937f..1b090c2ca1fc 100644 --- a/crates/polars/tests/it/io/parquet/write/mod.rs +++ b/crates/polars/tests/it/io/parquet/write/mod.rs @@ -52,6 +52,7 @@ fn test_column(column: &str, compression: CompressionOptions) -> ParquetResult<( let options = WriteOptions { write_statistics: true, + write_indexes: false, version: Version::V1, }; @@ -177,6 +178,7 @@ fn basic() -> ParquetResult<()> { let options = WriteOptions { write_statistics: false, + write_indexes: false, version: Version::V1, }; diff --git a/crates/polars/tests/it/io/parquet/write/sidecar.rs 
b/crates/polars/tests/it/io/parquet/write/sidecar.rs index 00f4397ba6f4..b26d7a6be961 100644 --- a/crates/polars/tests/it/io/parquet/write/sidecar.rs +++ b/crates/polars/tests/it/io/parquet/write/sidecar.rs @@ -20,6 +20,7 @@ fn basic() -> Result<(), ParquetError> { schema.clone(), WriteOptions { write_statistics: true, + write_indexes: false, version: Version::V2, }, None,