From 3ece7a736193a87941a00eb35f3001df282fd075 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Tue, 10 Sep 2024 01:32:04 -1000
Subject: [PATCH] Fix parquet statistics for ListingTable and Utf8View with
 `schema_force_string_view`, rename config option to `schema_force_view_types`
 (#12232)

* chore: move schema_force_string_view upwards to be listed with other reading props

* refactor(12123): have file schema be merged on view types with table schema

* test(12123): test for with, and without schema_force_string_view

* test(12123): demonstrate current upstream failure when reading page stats

* chore(12123): update config.md

* chore: cleanup

* chore(12123): temporarily remove test until next arrow release

* chore(12123): rename all variables to force_view_types

* refactor(12123): make interface ParquetFormat::with_force_view_types public

* chore(12123): rename helper method which coerces the schema (not merging fields)

* chore(12123): add docs to ParquetFormat to clarify exactly how the view types are used

* test(12123): cleanup tests to be more explicit with ForceViews enum

* test(12123): update tests to pass now that latest arrow-rs release is in

* fix: use proper naming on benchmark
---
 benchmarks/src/clickbench.rs                  |   2 +-
 benchmarks/src/tpch/run.rs                    |   6 +-
 benchmarks/src/util/options.rs                |   2 +-
 datafusion/common/src/config.rs               |   8 +-
 .../common/src/file_options/parquet_writer.rs |   7 +-
 .../core/src/datasource/file_format/mod.rs    |  45 ++++
 .../src/datasource/file_format/parquet.rs     | 236 +++++++++++++++---
 .../datasource/physical_plan/parquet/mod.rs   |  34 ++-
 .../physical_plan/parquet/opener.rs           |  11 +-
 datafusion/functions/benches/pad.rs           |   4 +-
 datafusion/functions/benches/repeat.rs        |   4 +-
 datafusion/functions/benches/substr.rs        |   8 +-
 .../proto/datafusion_common.proto             |   2 +-
 datafusion/proto-common/src/from_proto/mod.rs |   2 +-
 .../proto-common/src/generated/pbjson.rs      |  26 +-
 .../proto-common/src/generated/prost.rs       |   2 +-
 datafusion/proto-common/src/to_proto/mod.rs   |   2 +-
 .../src/generated/datafusion_proto_common.rs  |   2 +-
 .../proto/src/logical_plan/file_formats.rs    |   4 +-
 .../test_files/information_schema.slt         |   4 +-
 docs/source/user-guide/configs.md             |   2 +-
 21 files changed, 318 insertions(+), 95 deletions(-)

diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index a0f051d17623..207da4020b58 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -121,7 +121,7 @@ impl RunOpt {
             .options_mut()
             .execution
             .parquet
-            .schema_force_string_view = self.common.string_view;
+            .schema_force_view_types = self.common.force_view_types;
         let ctx = SessionContext::new_with_config(config);
         self.register_hits(&ctx).await?;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index d79eb65649bd..1a1f51f70065 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -124,7 +124,7 @@ impl RunOpt {
             .options_mut()
             .execution
             .parquet
-            .schema_force_string_view = self.common.string_view;
+            .schema_force_view_types = self.common.force_view_types;
         let ctx = SessionContext::new_with_config(config);
         // register tables
@@ -345,7 +345,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
-            string_view: false,
+            force_view_types: false,
         };
         let opt = RunOpt {
             query: Some(query),
@@ -379,7 +379,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
-            string_view: false,
+            force_view_types: false,
         };
         let opt = RunOpt {
             query: Some(query),
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index
02591e293272..efdb074b2461 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -41,7 +41,7 @@ pub struct CommonOpt { /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray /// when reading ParquetFiles #[structopt(long)] - pub string_view: bool, + pub force_view_types: bool, } impl CommonOpt { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d77655e8a77f..1e1c5d5424b0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -380,6 +380,10 @@ config_namespace! { /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, + /// and `Binary/BinaryLarge` with `BinaryView`. + pub schema_force_view_types: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties @@ -483,10 +487,6 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 - - /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, - /// and `Binary/BinaryLarge` with `BinaryView`. - pub schema_force_string_view: bool, default = false } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index e42fb96ed6a5..5d553d59da4e 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -175,7 +175,7 @@ impl ParquetOptions { maximum_parallel_row_group_writers: _, maximum_buffered_record_batches_per_stream: _, bloom_filter_on_read: _, // reads not used for writer props - schema_force_string_view: _, + schema_force_view_types: _, } = self; let mut builder = WriterProperties::builder() @@ -441,7 +441,7 @@ mod tests { maximum_buffered_record_batches_per_stream: defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: defaults.bloom_filter_on_read, - schema_force_string_view: defaults.schema_force_string_view, + schema_force_view_types: defaults.schema_force_view_types, } } @@ -542,8 +542,7 @@ mod tests { maximum_buffered_record_batches_per_stream: global_options_defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, - schema_force_string_view: global_options_defaults - .schema_force_string_view, + schema_force_view_types: global_options_defaults.schema_force_view_types, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index d21464b74b53..1dcf480cf4f2 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -229,6 +229,51 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) } +/// Coerces the file schema if the table schema uses a view type. 
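+/// Returns `None` when the table schema does not use `Utf8View`; otherwise returns a copy
+/// of the file schema in which matching `Utf8`/`LargeUtf8` (and `Binary`/`LargeBinary`)
+/// fields are rewritten to `Utf8View` (`BinaryView`).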
+pub(crate) fn coerce_file_schema_to_view_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transform = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| {
+            let dt = f.data_type();
+            if dt.equals_datatype(&DataType::Utf8View) {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
+        .collect();
+    if !transform {
+        return None;
+    }
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                (Some(DataType::Utf8View), DataType::Utf8)
+                | (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
+                    Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
+                ),
+                (Some(DataType::BinaryView), DataType::Binary)
+                | (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
+                    Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
+                ),
+                _ => field.clone(),
+            },
+        )
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 76e8ad9da559..2a862dd6dcb3 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -24,7 +24,10 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
-use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig};
+use super::{
+    coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
+    FileFormatFactory, FileScanConfig,
+};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -227,6 +230,29 @@ impl ParquetFormat {
     pub fn options(&self) -> &TableParquetOptions {
         &self.options
     }
+
+    /// Return `true` if view types should be used.
+    ///
+    /// If this returns true, DataFusion will instruct the parquet reader
+    /// to read string / binary columns using the view types `StringView` or `BinaryView`
+    /// if the table schema specifies those types, regardless of any embedded metadata
+    /// that may specify an alternate Arrow type. The parquet reader is optimized
+    /// for reading `StringView` and `BinaryView` and such queries are significantly faster.
+    ///
+    /// If this returns false, the parquet reader will read the columns according to the
+    /// defaults or any embedded Arrow type information. This may result in reading
+    /// `StringArrays` and then casting to `StringViewArray`, which is less efficient.
+    pub fn force_view_types(&self) -> bool {
+        self.options.global.schema_force_view_types
+    }
+
+    /// If true, will use view types (StringView and BinaryView).
+    ///
+    /// Refer to [`Self::force_view_types`].
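+    /// The corresponding session setting,
+    /// `datafusion.execution.parquet.schema_force_view_types`, defaults to `false`.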
+ pub fn with_force_view_types(mut self, use_views: bool) -> Self { + self.options.global.schema_force_view_types = use_views; + self + } } /// Clears all metadata (Schema level and field level) on an iterator @@ -317,12 +343,7 @@ impl FileFormat for ParquetFormat { Schema::try_merge(schemas) }?; - let schema = if state - .config_options() - .execution - .parquet - .schema_force_string_view - { + let schema = if self.force_view_types() { transform_schema_to_view(&schema) } else { schema @@ -515,10 +536,13 @@ pub fn statistics_from_parquet_meta_calc( statistics.total_byte_size = Precision::Exact(total_byte_size); let file_metadata = metadata.file_metadata(); - let file_schema = parquet_to_arrow_schema( + let mut file_schema = parquet_to_arrow_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) { + file_schema = merged; + } statistics.column_statistics = if has_statistics { let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); @@ -1249,8 +1273,8 @@ mod tests { use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion_common::cast::{ - as_binary_array, as_boolean_array, as_float32_array, as_float64_array, - as_int32_array, as_timestamp_nanosecond_array, + as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array, + as_float64_array, as_int32_array, as_timestamp_nanosecond_array, }; use datafusion_common::config::ParquetOptions; use datafusion_common::ScalarValue; @@ -1271,8 +1295,12 @@ mod tests { use parquet::file::page_index::index::Index; use tokio::fs::File; - #[tokio::test] - async fn read_merged_batches() -> Result<()> { + enum ForceViews { + Yes, + No, + } + + async fn _run_read_merged_batches(force_views: ForceViews) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1286,7 +1314,11 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default(); + let force_views = match force_views { + ForceViews::Yes => true, + ForceViews::No => false, + }; + let format = ParquetFormat::default().with_force_view_types(force_views); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = @@ -1316,6 +1348,14 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_merged_batches() -> Result<()> { + _run_read_merged_batches(ForceViews::No).await?; + _run_read_merged_batches(ForceViews::Yes).await?; + + Ok(()) + } + #[tokio::test] async fn is_schema_stable() -> Result<()> { let c1: ArrayRef = @@ -1446,8 +1486,7 @@ mod tests { } } - #[tokio::test] - async fn fetch_metadata_with_size_hint() -> Result<()> { + async fn _run_fetch_metadata_with_size_hint(force_views: ForceViews) -> Result<()> { let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1471,7 +1510,13 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::default().with_metadata_size_hint(Some(9)); + let force_views = match force_views { + ForceViews::Yes => true, + ForceViews::No => false, + }; + let format = ParquetFormat::default() + .with_metadata_size_hint(Some(9)) + .with_force_view_types(force_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1501,7 +1546,9 @@ mod tests { // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); - let format = 
ParquetFormat::default().with_metadata_size_hint(Some(size_hint)); + let format = ParquetFormat::default() + .with_metadata_size_hint(Some(size_hint)) + .with_force_view_types(force_views); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1536,6 +1583,14 @@ mod tests { Ok(()) } + #[tokio::test] + async fn fetch_metadata_with_size_hint() -> Result<()> { + _run_fetch_metadata_with_size_hint(ForceViews::No).await?; + _run_fetch_metadata_with_size_hint(ForceViews::Yes).await?; + + Ok(()) + } + #[tokio::test] async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> { // Data for column c_dic: ["a", "b", "c", "d"] @@ -1578,8 +1633,9 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_statistics_from_parquet_metadata() -> Result<()> { + async fn _run_test_statistics_from_parquet_metadata( + force_views: ForceViews, + ) -> Result<()> { // Data for column c1: ["Foo", null, "bar"] let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); @@ -1597,28 +1653,42 @@ mod tests { let store = Arc::new(LocalFileSystem::new()) as _; let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?; - let state = SessionContext::new().state(); - let format = ParquetFormat::default(); + let force_views = match force_views { + ForceViews::Yes => true, + ForceViews::No => false, + }; + + let mut state = SessionContext::new().state(); + state = set_view_state(state, force_views); + let format = ParquetFormat::default().with_force_view_types(force_views); let schema = format.infer_schema(&state, &store, &files).await.unwrap(); let null_i64 = ScalarValue::Int64(None); - let null_utf8 = ScalarValue::Utf8(None); + let null_utf8 = if force_views { + ScalarValue::Utf8View(None) + } else { + ScalarValue::Utf8(None) + }; // Fetch statistics for first file let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?; let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; - // assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 let c1_stats = &stats.column_statistics[0]; assert_eq!(c1_stats.null_count, Precision::Exact(1)); + let expected_type = if force_views { + ScalarValue::Utf8View + } else { + ScalarValue::Utf8 + }; assert_eq!( c1_stats.max_value, - Precision::Exact(ScalarValue::Utf8(Some("bar".to_string()))) + Precision::Exact(expected_type(Some("bar".to_string()))) ); assert_eq!( c1_stats.min_value, - Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string()))) + Precision::Exact(expected_type(Some("Foo".to_string()))) ); // column c2: missing from the file so the table treats all 3 rows as null let c2_stats = &stats.column_statistics[1]; @@ -1644,6 +1714,15 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_statistics_from_parquet_metadata() -> Result<()> { + _run_test_statistics_from_parquet_metadata(ForceViews::No).await?; + + _run_test_statistics_from_parquet_metadata(ForceViews::Yes).await?; + + Ok(()) + } + #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); @@ -1718,10 +1797,31 @@ mod tests { Ok(()) } - #[tokio::test] - async fn read_alltypes_plain_parquet() -> Result<()> { + fn set_view_state(mut state: SessionState, use_views: bool) -> SessionState { + let mut options = TableParquetOptions::default(); + options.global.schema_force_view_types = use_views; + state + .register_file_format( + Arc::new(ParquetFormatFactory::new_with_options(options)), + true, + ) + .expect("ok"); + state + } + + async fn 
_run_read_alltypes_plain_parquet(
+        force_views: ForceViews,
+        expected: &str,
+    ) -> Result<()> {
+        let force_views = match force_views {
+            ForceViews::Yes => true,
+            ForceViews::No => false,
+        };
+
         let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
+        let mut state = session_ctx.state();
+        state = set_view_state(state, force_views);
+
         let task_ctx = state.task_ctx();
         let projection = None;
         let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
             .fields()
             .iter()
             .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
             .collect();
         let y = x.join("\n");
-        assert_eq!(
-            "id: Int32\n\
+        assert_eq!(expected, y);
+
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(11, batches[0].num_columns());
+        assert_eq!(8, batches[0].num_rows());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_alltypes_plain_parquet() -> Result<()> {
+        let no_views = "id: Int32\n\
             bool_col: Boolean\n\
             tinyint_col: Int32\n\
             smallint_col: Int32\n\
             int_col: Int32\n\
             bigint_col: Int64\n\
             float_col: Float32\n\
             double_col: Float64\n\
             date_string_col: Binary\n\
             string_col: Binary\n\
-            timestamp_col: Timestamp(Nanosecond, None)",
-            y
-        );
+            timestamp_col: Timestamp(Nanosecond, None)";
+        _run_read_alltypes_plain_parquet(ForceViews::No, no_views).await?;
 
-        let batches = collect(exec, task_ctx).await?;
-
-        assert_eq!(1, batches.len());
-        assert_eq!(11, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
+        let with_views = "id: Int32\n\
+            bool_col: Boolean\n\
+            tinyint_col: Int32\n\
+            smallint_col: Int32\n\
+            int_col: Int32\n\
+            bigint_col: Int64\n\
+            float_col: Float32\n\
+            double_col: Float64\n\
+            date_string_col: BinaryView\n\
+            string_col: BinaryView\n\
+            timestamp_col: Timestamp(Nanosecond, None)";
+        _run_read_alltypes_plain_parquet(ForceViews::Yes, with_views).await?;
 
         Ok(())
     }
@@ -1889,7 +2007,9 @@ mod tests {
     #[tokio::test]
     async fn read_binary_alltypes_plain_parquet() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let state = session_ctx.state();
+        let mut state = session_ctx.state();
+        state = set_view_state(state, false);
+
         let task_ctx = state.task_ctx();
         let projection = Some(vec![9]);
         let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
@@ -1913,6 +2033,35 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_binaryview_alltypes_plain_parquet() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let mut state = session_ctx.state();
+        state = set_view_state(state, true);
+
+        let task_ctx = state.task_ctx();
+        let projection = Some(vec![9]);
+        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(1, batches.len());
+        assert_eq!(1, batches[0].num_columns());
+        assert_eq!(8, batches[0].num_rows());
+
+        let array = as_binary_view_array(batches[0].column(0))?;
+        let mut values: Vec<&str> = vec![];
+        for i in 0..batches[0].num_rows() {
+            values.push(std::str::from_utf8(array.value(i)).unwrap());
+        }
+
+        assert_eq!(
+            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
+            format!("{values:?}")
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_decimal_parquet() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -2047,8 +2196,13 @@ mod tests {
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::default();
-        scan_format(state, &format, &testdata, file_name, projection, limit).await
+
+        let format = state
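+            // Use the parquet FileFormatFactory registered on the session state, if any
+            // (the tests register one via `set_view_state`); otherwise fall back to the
+            // default ParquetFormat.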
.get_file_format_factory("parquet") + .map(|factory| factory.create(state, &Default::default()).unwrap()) + .unwrap_or(Arc::new(ParquetFormat::new())); + + scan_format(state, &*format, &testdata, file_name, projection, limit).await } fn build_ctx(store_url: &url::Url) -> Arc { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index b2f86db742f2..54d4d7262a8e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -726,10 +726,6 @@ impl ExecutionPlan for ParquetExec { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), schema_adapter_factory, - schema_force_string_view: self - .table_parquet_options - .global - .schema_force_string_view, }; let stream = @@ -2095,6 +2091,36 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_struct_filter_parquet_with_view_types() -> Result<()> { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet"; + write_file(&path); + + let ctx = SessionContext::new(); + + let mut options = TableParquetOptions::default(); + options.global.schema_force_view_types = true; + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default().with_options(options))); + + ctx.register_listing_table("base_table", path, opt, None, None) + .await + .unwrap(); + let sql = "select * from base_table where name='test02'"; + let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap(); + assert_eq!(batch.len(), 1); + let expected = [ + "+---------------------+----+--------+", + "| struct | id | name |", + "+---------------------+----+--------+", + "| {id: 4, name: aaa2} | 2 | test02 |", + "+---------------------+----+--------+", + ]; + crate::assert_batches_eq!(expected, &batch); + Ok(()) + } + fn write_file(file: &String) { let struct_fields = Fields::from(vec![ Field::new("id", DataType::Int64, false), diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 7e331293eb19..a12e60eb414a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -17,7 +17,7 @@ //! 
 
-use crate::datasource::file_format::transform_schema_to_view;
+use crate::datasource::file_format::coerce_file_schema_to_view_type;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
@@ -57,7 +57,6 @@ pub(super) struct ParquetOpener {
     pub enable_page_index: bool,
     pub enable_bloom_filter: bool,
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    pub schema_force_string_view: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -93,7 +92,6 @@ impl FileOpener for ParquetOpener {
         );
         let enable_bloom_filter = self.enable_bloom_filter;
         let limit = self.limit;
-        let schema_force_string_view = self.schema_force_string_view;
 
         Ok(Box::pin(async move {
             let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
@@ -101,9 +99,10 @@ impl FileOpener for ParquetOpener {
             let metadata =
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
             let mut schema = metadata.schema().clone();
-
-            if schema_force_string_view {
-                schema = Arc::new(transform_schema_to_view(&schema));
+            // read with view types
+            if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
+            {
+                schema = Arc::new(merged);
             }
 
             let options = ArrowReaderOptions::new()
diff --git a/datafusion/functions/benches/pad.rs b/datafusion/functions/benches/pad.rs
index 0c496bc63347..71fa68762c1e 100644
--- a/datafusion/functions/benches/pad.rs
+++ b/datafusion/functions/benches/pad.rs
@@ -67,11 +67,11 @@ where
 fn create_args<O: OffsetSizeTrait>(
     size: usize,
     str_len: usize,
-    use_string_view: bool,
+    force_view_types: bool,
 ) -> Vec<ColumnarValue> {
     let length_array = Arc::new(create_primitive_array::<Int64Type>(size, 0.0, str_len));
 
-    if !use_string_view {
+    if !force_view_types {
         let string_array =
             Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len));
         let fill_array = Arc::new(create_string_array_with_len::<O>(size, 0.1, str_len));
diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs
index e45313660ea2..5643ccf07133 100644
--- a/datafusion/functions/benches/repeat.rs
+++ b/datafusion/functions/benches/repeat.rs
@@ -31,13 +31,13 @@ fn create_args<O: OffsetSizeTrait>(
     size: usize,
     str_len: usize,
     repeat_times: i64,
-    use_string_view: bool,
+    force_view_types: bool,
 ) -> Vec<ColumnarValue> {
     let number_array = Arc::new(Int64Array::from(
         (0..size).map(|_| repeat_times).collect::<Vec<_>>(),
     ));
 
-    if use_string_view {
+    if force_view_types {
         let string_array =
             Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false));
         vec![
diff --git a/datafusion/functions/benches/substr.rs b/datafusion/functions/benches/substr.rs
index 1a696520c3ad..90ba75c1e8a5 100644
--- a/datafusion/functions/benches/substr.rs
+++ b/datafusion/functions/benches/substr.rs
@@ -30,7 +30,7 @@ fn create_args_without_count<O: OffsetSizeTrait>(
     size: usize,
     str_len: usize,
     start_half_way: bool,
-    use_string_view: bool,
+    force_view_types: bool,
 ) -> Vec<ColumnarValue> {
     let start_array = Arc::new(Int64Array::from(
         (0..size)
             .map(|_| if start_half_way { (str_len / 2) as i64 } else { 1i64 })
             .collect::<Vec<_>>(),
     ));
 
-    if use_string_view {
+    if force_view_types {
         let string_array =
             Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false));
         vec![
@@ -66,7 +66,7 @@ fn create_args_with_count<O: OffsetSizeTrait>(
     size: usize,
     str_len: usize,
     count_max: usize,
-    use_string_view: bool,
+    force_view_types: bool,
 ) -> Vec<ColumnarValue> {
     let start_array =
         Arc::new(Int64Array::from((0..size).map(|_| 1).collect::<Vec<_>>()));
     let count_array = Arc::new(Int64Array::from(
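         // one identical count value per row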
         (0..size).map(|_| count).collect::<Vec<_>>(),
     ));
 
-    if use_string_view {
+    if force_view_types {
         let string_array =
             Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false));
         vec![
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 51e94d2caaf4..d1506fcd64f0 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -493,7 +493,7 @@ message ParquetOptions {
   uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
-  bool schema_force_string_view = 28; // default = false
+  bool schema_force_view_types = 28; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index 45d275fb488e..d1b4374fc0e7 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -957,7 +957,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
-            schema_force_string_view: value.schema_force_string_view,
+            schema_force_view_types: value.schema_force_view_types,
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 78ba829f8c50..fa5d1f442754 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4891,7 +4891,7 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             len += 1;
         }
-        if self.schema_force_string_view {
+        if self.schema_force_view_types {
             len += 1;
         }
         if self.dictionary_page_size_limit != 0 {
@@ -4977,8 +4977,8 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_on_write {
             struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?;
         }
-        if self.schema_force_string_view {
-            struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?;
+        if self.schema_force_view_types {
+            struct_ser.serialize_field("schemaForceViewTypes", &self.schema_force_view_types)?;
         }
         if self.dictionary_page_size_limit != 0 {
             #[allow(clippy::needless_borrow)]
@@ -5097,8 +5097,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "bloomFilterOnRead",
             "bloom_filter_on_write",
             "bloomFilterOnWrite",
-            "schema_force_string_view",
-            "schemaForceStringView",
+            "schema_force_view_types",
+            "schemaForceViewTypes",
             "dictionary_page_size_limit",
             "dictionaryPageSizeLimit",
             "data_page_row_count_limit",
@@ -5140,7 +5140,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             MaximumBufferedRecordBatchesPerStream,
             BloomFilterOnRead,
             BloomFilterOnWrite,
-            SchemaForceStringView,
+            SchemaForceViewTypes,
             DictionaryPageSizeLimit,
             DataPageRowCountLimit,
             MaxRowGroupSize,
@@ -5188,7 +5188,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream),
                             "bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead),
                             "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite),
-                            "schemaForceStringView" | "schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView),
+                            "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes),
                             "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
                             "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
                             "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5234,7 +5234,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut maximum_buffered_record_batches_per_stream__ = None;
                 let mut bloom_filter_on_read__ = None;
                 let mut bloom_filter_on_write__ = None;
-                let mut schema_force_string_view__ = None;
+                let mut schema_force_view_types__ = None;
                 let mut dictionary_page_size_limit__ = None;
                 let mut data_page_row_count_limit__ = None;
                 let mut max_row_group_size__ = None;
@@ -5336,11 +5336,11 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                             bloom_filter_on_write__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::SchemaForceStringView => {
-                            if schema_force_string_view__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("schemaForceStringView"));
+                        GeneratedField::SchemaForceViewTypes => {
+                            if schema_force_view_types__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("schemaForceViewTypes"));
                             }
-                            schema_force_string_view__ = Some(map_.next_value()?);
+                            schema_force_view_types__ = Some(map_.next_value()?);
                         }
                         GeneratedField::DictionaryPageSizeLimit => {
                             if dictionary_page_size_limit__.is_some() {
@@ -5442,7 +5442,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(),
                     bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(),
                     bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(),
-                    schema_force_string_view: schema_force_string_view__.unwrap_or_default(),
+                    schema_force_view_types: schema_force_view_types__.unwrap_or_default(),
                     dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
                     data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
                     max_row_group_size: max_row_group_size__.unwrap_or_default(),
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index cb8f86a022a6..d6f982278d67 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -819,7 +819,7 @@ pub struct ParquetOptions {
     pub bloom_filter_on_write: bool,
     /// default = false
     #[prost(bool, tag = "28")]
-    pub schema_force_string_view: bool,
+    pub schema_force_view_types: bool,
     #[prost(uint64, tag = "12")]
     pub dictionary_page_size_limit: u64,
     #[prost(uint64, tag = "18")]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index e608caf0ecf8..ebb53ae7577c 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -830,7 +830,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             allow_single_file_parallelism: value.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
-            schema_force_string_view: value.schema_force_string_view,
+            schema_force_view_types:
value.schema_force_view_types, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index dc8d0017d3fd..be12d5b8e30e 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -819,7 +819,7 @@ pub struct ParquetOptions { pub bloom_filter_on_write: bool, /// default = false #[prost(bool, tag = "28")] - pub schema_force_string_view: bool, + pub schema_force_view_types: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 2e3476da6ac0..0f9f9d335afe 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -406,7 +406,7 @@ impl TableParquetOptionsProto { allow_single_file_parallelism: global_options.global.allow_single_file_parallelism, maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64, maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, - schema_force_string_view: global_options.global.schema_force_string_view, + schema_force_view_types: global_options.global.schema_force_view_types, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -496,7 +496,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { allow_single_file_parallelism: proto.allow_single_file_parallelism, maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize, maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, - schema_force_string_view: proto.schema_force_string_view, + schema_force_view_types: proto.schema_force_view_types, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index beefa24ba4c6..7acdf25b6596 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -200,7 +200,7 @@ datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false -datafusion.execution.parquet.schema_force_string_view false +datafusion.execution.parquet.schema_force_view_types false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -290,7 +290,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. 
If false, the filters are applied in the same order as written in the query -datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.schema_force_view_types false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f7e25bd55850..4cb43fb89016 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | @@ -76,7 +77,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. 
You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
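
A minimal sketch of enabling the renamed option from user code, mirroring what
`benchmarks/src/clickbench.rs` does above (`SessionConfig`, `SessionContext`, and
`ParquetFormat` are the DataFusion APIs already exercised in this patch):

    // Session-wide: read Utf8/Binary parquet columns as Utf8View/BinaryView.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.schema_force_view_types = true;
    let ctx = SessionContext::new_with_config(config);

    // Per-format: the same toggle through the now-public builder API.
    let format = ParquetFormat::default().with_force_view_types(true);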