Skip to content

Commit

Permalink
fix(11770): add tests and preliminary code for where to encode the ar…
Browse files Browse the repository at this point in the history
…row_schema into the kv_meta
  • Loading branch information
wiedld committed Dec 20, 2024
1 parent 1ff2b41 commit 8165e05
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::physical_plan::{

use arrow::compute::sum;
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
Expand Down Expand Up @@ -74,9 +73,9 @@ use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use parquet::format::{FileMetaData, KeyValue};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -788,7 +787,26 @@ impl DataSink for ParquetSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
let mut parquet_opts_builder =
WriterPropertiesBuilder::try_from(&self.parquet_options)?;
if self.parquet_options.global.include_arrow_metadata {
let kv_meta = self
.parquet_options
.key_value_metadata
.to_owned()
.drain()
.map(|(key, value)| KeyValue { key, value })
.collect::<Vec<_>>();
// let schema = self.config.output_schema();

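// TODO: ask for `add_encoded_arrow_schema_to_metadata` to be made public in the parquet crate, then call it here to encode the arrow schema into `kv_meta`.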
// https://github.com/apache/arrow-rs/blob/2908a80d9ca3e3fb0414e35b67856f1fb761304c/parquet/src/arrow/schema/mod.rs#L195
// add_encoded_arrow_schema_to_metadata(schema, &mut kv_meta);

parquet_opts_builder =
parquet_opts_builder.set_key_value_metadata(Some(kv_meta));
};
let parquet_props = parquet_opts_builder.build();

let object_store = context
.runtime_env()
Expand Down Expand Up @@ -832,7 +850,7 @@ impl DataSink for ParquetSink {
.create_async_arrow_writer(
&path,
Arc::clone(&object_store),
parquet_props.writer_options().clone(),
parquet_props.clone(),
)
.await?;
let mut reservation =
Expand Down Expand Up @@ -867,7 +885,7 @@ impl DataSink for ParquetSink {
writer,
rx,
schema,
props.writer_options(),
&props,
parallel_options_clone,
pool,
)
Expand Down Expand Up @@ -2389,6 +2407,49 @@ mod tests {
Ok(())
}

/// Writes through the parquet sink with `include_arrow_metadata` enabled, once
/// with single-file parallelism disabled and once with it enabled, and hands
/// the written file metadata to `assert_file_metadata` together with the
/// expected key/value entries (including the `ARROW:schema` entry).
///
/// TODO: update this test once parquet method is exposed.
#[tokio::test]
async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
    // Expected key/value metadata: the encoded schema entry first, followed by
    // the two user-data entries (presumably supplied by the sink helper —
    // confirm against `create_written_parquet_sink_using_config`).
    let schema_entry = KeyValue {
        key: String::from("ARROW:schema"),
        value: Some(String::from("/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA")),
    };
    let data_entry = KeyValue {
        key: String::from("my-data"),
        value: Some(String::from("stuff")),
    };
    let bool_entry = KeyValue {
        key: String::from("my-data-bool-key"),
        value: None,
    };
    let expected = vec![schema_entry, data_entry, bool_entry];

    // Single threaded write configuration.
    let single_threaded = ParquetOptions {
        allow_single_file_parallelism: false,
        include_arrow_metadata: true,
        ..Default::default()
    };
    // Multithreaded write configuration.
    let multithreaded = ParquetOptions {
        allow_single_file_parallelism: true,
        maximum_parallel_row_group_writers: 2,
        maximum_buffered_record_batches_per_stream: 2,
        include_arrow_metadata: true,
        ..Default::default()
    };

    // Both write paths must yield the same key/value metadata.
    for opts in [single_threaded, multithreaded] {
        let sink = create_written_parquet_sink_using_config("file:///", opts).await?;
        let (_, written_meta) = get_written(sink)?;
        assert_file_metadata(written_meta, expected.clone());
    }

    Ok(())
}

#[tokio::test]
async fn parquet_sink_write_with_extension() -> Result<()> {
let filename = "test_file.custom_ext";
Expand Down

0 comments on commit 8165e05

Please sign in to comment.