Skip to content

Commit

Permalink
Merge commit '202f415811c0a559dde63108be967855844c14cb' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-1-2
  • Loading branch information
appletreeisyellow committed Apr 22, 2024
2 parents 7c8d77a + 202f415 commit 555c388
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 134 deletions.
31 changes: 24 additions & 7 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ impl AggregateUDF {
self.inner.accumulator(acc_args)
}

/// Return the fields of the intermediate state used by this aggregator, given
/// its state name, value type and ordering fields. See [`AggregateUDFImpl::state_fields`]
/// for more details. Supports multi-phase aggregations
/// Return the fields used to store the intermediate state for this aggregator, given
/// the name of the aggregate, value type and ordering fields. See [`AggregateUDFImpl::state_fields`]
/// for more details.
///
/// This is used to support multi-phase aggregations
pub fn state_fields(
&self,
name: &str,
Expand Down Expand Up @@ -283,13 +285,28 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// `acc_args`: the arguments to the accumulator. See [`AccumulatorArgs`] for more details.
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>>;

/// Return the fields of the intermediate state.
/// Return the fields used to store the intermediate state of this accumulator.
///
/// # Arguments:
/// 1. `name`: the name of the expression (e.g. AVG, SUM, etc)
/// 2. `value_type`: the type of the aggregate's output (returned by [`Self::return_type`])
/// 3. `ordering_fields`: the fields used to order the input arguments, if any.
/// Empty if no ordering expression is provided.
///
/// # Notes:
///
/// name: the name of the state
/// The default implementation returns a single state field named `name`
/// with the same type as `value_type`. This is suitable for aggregates such
/// as `SUM` or `MIN` where partial state can be combined by applying the
/// same aggregate.
///
/// value_type: the type of the value, it should be the result of the `return_type`
/// For aggregates such as `AVG` where the partial state is more complex
/// (e.g. a COUNT and a SUM), this method is used to define the additional
/// fields.
///
/// ordering_fields: the fields used for ordering, empty if no ordering expression is provided
/// The names of the fields must be unique within the query and thus should
/// be derived from `name`. See [`format_state_name`] for a utility function
/// to generate a unique name.
fn state_fields(
&self,
name: &str,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ regex = { version = "1.8", optional = true }
sha2 = { version = "^0.10.1", optional = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
criterion = "0.5"
rand = { workspace = true }
rstest = { workspace = true }
Expand All @@ -81,3 +82,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
[[bench]]
harness = false
name = "in_list"

[[bench]]
harness = false
name = "concat"
47 changes: 47 additions & 0 deletions datafusion/physical-expr/benches/concat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::util::bench_util::create_string_array_with_len;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::string_expressions::concat;
use std::sync::Arc;

/// Builds the three arguments fed to `concat` in this benchmark:
/// a string array, the scalar separator `", "`, and the same string array again.
///
/// * `size` - forwarded to `create_string_array_with_len` as its first argument.
/// * `str_len` - forwarded to `create_string_array_with_len` as its third argument.
///
/// NOTE(review): the `0.2` literal is presumably the null density expected by
/// arrow's `bench_util` helper; confirm against the arrow documentation.
fn create_args(size: usize, str_len: usize) -> Vec<ColumnarValue> {
    let strings = Arc::new(create_string_array_with_len::<i32>(size, 0.2, str_len));

    // Both array arguments share one allocation; only the `Arc` handle is cloned.
    let left = ColumnarValue::Array(strings.clone());
    let separator = ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from(", "))));
    let right = ColumnarValue::Array(strings);

    vec![left, separator, right]
}

/// Benchmarks `concat` on the arguments produced by `create_args` for array
/// sizes 1024, 4096 and 8192, using strings of length 32.
///
/// Every size is registered in a single `"concat function"` group under the
/// id `concat/<size>`. The group is opened once and finished once so that
/// Criterion can report the different sizes together, rather than finalizing
/// a separate one-entry group per size.
fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("concat function");

    for size in [1024, 4096, 8192] {
        // Inputs are built outside the timed closure so only `concat` is measured.
        let args = create_args(size, 32);
        group.bench_function(BenchmarkId::new("concat", size), |b| {
            // `black_box` keeps the optimizer from discarding the result.
            b.iter(|| criterion::black_box(concat(&args).unwrap()))
        });
    }

    group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ pub fn create_physical_fun(
// string functions
BuiltinScalarFunction::Coalesce => Arc::new(conditional_expressions::coalesce),
BuiltinScalarFunction::Concat => Arc::new(string_expressions::concat),
BuiltinScalarFunction::ConcatWithSeparator => Arc::new(|args| {
make_scalar_function_inner(string_expressions::concat_ws)(args)
}),
BuiltinScalarFunction::ConcatWithSeparator => {
Arc::new(string_expressions::concat_ws)
}
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)
Expand Down
Loading

0 comments on commit 555c388

Please sign in to comment.