Skip to content

Commit

Permalink
Round robin polling between tied winners in sort preserving merge (ap…
Browse files Browse the repository at this point in the history
…ache#13133)

* first draft

Signed-off-by: jayzhan211 <[email protected]>

* add data

Signed-off-by: jayzhan211 <[email protected]>

* fix benchmark

Signed-off-by: jayzhan211 <[email protected]>

* add more bencmark data

Signed-off-by: jayzhan211 <[email protected]>

* fix benchmark

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* get max size

Signed-off-by: jayzhan211 <[email protected]>

* add license

Signed-off-by: jayzhan211 <[email protected]>

* rm code for merge

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* update poll count only we have tie

Signed-off-by: jayzhan211 <[email protected]>

* upd comment

Signed-off-by: jayzhan211 <[email protected]>

* fix logic

Signed-off-by: jayzhan211 <[email protected]>

* configurable

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* add mem limit test

Signed-off-by: jayzhan211 <[email protected]>

* rm test

Signed-off-by: jayzhan211 <[email protected]>

* escape bracket

Signed-off-by: jayzhan211 <[email protected]>

* add test

Signed-off-by: jayzhan211 <[email protected]>

* rm per consumer record

Signed-off-by: jayzhan211 <[email protected]>

* repartition limit

Signed-off-by: jayzhan211 <[email protected]>

* add benchmark

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* benchmark with parameter

Signed-off-by: jayzhan211 <[email protected]>

* only calculate consumer pool if the limit is set

Signed-off-by: jayzhan211 <[email protected]>

* combine eq and gt

Signed-off-by: jayzhan211 <[email protected]>

* review part 1

* Update merge.rs

* upd doc

Signed-off-by: jayzhan211 <[email protected]>

* no need index comparison

Signed-off-by: jayzhan211 <[email protected]>

* combine handle tie and eq check

Signed-off-by: jayzhan211 <[email protected]>

* upd doc

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* add more comment

Signed-off-by: jayzhan211 <[email protected]>

* remove flag

Signed-off-by: jayzhan211 <[email protected]>

* upd comment

Signed-off-by: jayzhan211 <[email protected]>

* Revert "remove flag"

This reverts commit 8d6c0a6.

* Revert "upd comment"

This reverts commit a18cba8.

* add more comment

Signed-off-by: jayzhan211 <[email protected]>

* add more comment

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* simpliy mem pool

Signed-off-by: jayzhan211 <[email protected]>

* clippy

Signed-off-by: jayzhan211 <[email protected]>

* Update merge.rs

* minor

* add comment

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
  • Loading branch information
2 people authored and wiedld committed Dec 13, 2024
1 parent 922b399 commit 0ee794c
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 24 deletions.
5 changes: 5 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ rand = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_futures"] }
datafusion-functions-aggregate = { workspace = true }
rstest = { workspace = true }
rstest_reuse = "0.7.0"
Expand All @@ -76,3 +77,7 @@ tokio = { workspace = true, features = [
"fs",
"parking_lot",
] }

[[bench]]
harness = false
name = "spm"
145 changes: 145 additions & 0 deletions datafusion/physical-plan/benches/spm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::memory::MemoryExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{collect, ExecutionPlan};

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn generate_spm_for_round_robin_tie_breaker(
has_same_value: bool,
enable_round_robin_repartition: bool,
batch_count: usize,
partition_count: usize,
) -> SortPreservingMergeExec {
let row_size = 256;
let rb = if has_same_value {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size]));
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
} else {
let v = (0i32..row_size as i32).collect::<Vec<_>>();
let a: ArrayRef = Arc::new(Int32Array::from(v));

// Use alphanumeric characters
let charset: Vec<char> =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
.chars()
.collect();

let mut strings = Vec::new();
for i in 0..256 {
let mut s = String::new();
s.push(charset[i % charset.len()]);
s.push(charset[(i / charset.len()) % charset.len()]);
strings.push(Some(s));
}

let b: ArrayRef = Arc::new(StringArray::from_iter(strings));

let v = (0i64..row_size as i64).collect::<Vec<_>>();
let c: ArrayRef = Arc::new(Int64Array::from_iter(v));
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
};

let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>();
let partitiones = vec![rbs.clone(); partition_count];

let schema = rb.schema();
let sort = vec![
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
options: Default::default(),
},
PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: Default::default(),
},
];

let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap();
SortPreservingMergeExec::new(sort, Arc::new(exec))
.with_round_robin_repartition(enable_round_robin_repartition)
}

fn run_bench(
c: &mut Criterion,
has_same_value: bool,
enable_round_robin_repartition: bool,
batch_count: usize,
partition_count: usize,
description: &str,
) {
let task_ctx = TaskContext::default();
let task_ctx = Arc::new(task_ctx);

let spm = Arc::new(generate_spm_for_round_robin_tie_breaker(
has_same_value,
enable_round_robin_repartition,
batch_count,
partition_count,
)) as Arc<dyn ExecutionPlan>;

c.bench_function(description, |b| {
b.to_async(FuturesExecutor)
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx))))
});
}

fn criterion_benchmark(c: &mut Criterion) {
let params = [
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker
];

let batch_counts = [1, 25, 625];
let partition_counts = [2, 8, 32];

for &(has_same_value, enable_round_robin_repartition, cardinality_label) in &params {
for &batch_count in &batch_counts {
for &partition_count in &partition_counts {
let description = format!(
"{}_batch_count_{}_partition_count_{}",
cardinality_label, batch_count, partition_count
);
run_bench(
c,
has_same_value,
enable_round_robin_repartition,
batch_count,
partition_count,
&description,
);
}
}
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
54 changes: 54 additions & 0 deletions datafusion/physical-plan/src/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub trait CursorValues {
/// Returns true if `l[l_idx] == r[r_idx]`
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;

/// Returns true if `row[idx] == row[idx - 1]`
/// Given `idx` should be greater than 0
fn eq_to_previous(cursor: &Self, idx: usize) -> bool;

/// Returns comparison of `l[l_idx]` and `r[r_idx]`
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
}
Expand Down Expand Up @@ -95,6 +99,16 @@ impl<T: CursorValues> Cursor<T> {
self.offset += 1;
t
}

pub fn is_eq_to_prev_one(&self, prev_cursor: Option<&Cursor<T>>) -> bool {
if self.offset > 0 {
self.is_eq_to_prev_row()
} else if let Some(prev_cursor) = prev_cursor {
self.is_eq_to_prev_row_in_prev_batch(prev_cursor)
} else {
false
}
}
}

impl<T: CursorValues> PartialEq for Cursor<T> {
Expand All @@ -103,6 +117,22 @@ impl<T: CursorValues> PartialEq for Cursor<T> {
}
}

impl<T: CursorValues> Cursor<T> {
fn is_eq_to_prev_row(&self) -> bool {
T::eq_to_previous(&self.values, self.offset)
}

fn is_eq_to_prev_row_in_prev_batch(&self, other: &Self) -> bool {
assert_eq!(self.offset, 0);
T::eq(
&self.values,
self.offset,
&other.values,
other.values.len() - 1,
)
}
}

impl<T: CursorValues> Eq for Cursor<T> {}

impl<T: CursorValues> PartialOrd for Cursor<T> {
Expand Down Expand Up @@ -156,6 +186,11 @@ impl CursorValues for RowValues {
l.rows.row(l_idx) == r.rows.row(r_idx)
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.rows.row(idx) == cursor.rows.row(idx - 1)
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.rows.row(l_idx).cmp(&r.rows.row(r_idx))
}
Expand Down Expand Up @@ -188,6 +223,11 @@ impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
l.0[l_idx].is_eq(r.0[r_idx])
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.0[idx].is_eq(cursor.0[idx - 1])
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.0[l_idx].compare(r.0[r_idx])
}
Expand Down Expand Up @@ -219,6 +259,11 @@ impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
l.value(l_idx) == r.value(r_idx)
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
cursor.value(idx) == cursor.value(idx - 1)
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
l.value(l_idx).cmp(r.value(r_idx))
}
Expand Down Expand Up @@ -284,6 +329,15 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
}
}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
assert!(idx > 0);
match (cursor.is_null(idx), cursor.is_null(idx - 1)) {
(true, true) => true,
(false, false) => T::eq(&cursor.values, idx, &cursor.values, idx - 1),
_ => false,
}
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
match (l.is_null(l_idx), r.is_null(r_idx)) {
(true, true) => Ordering::Equal,
Expand Down
Loading

0 comments on commit 0ee794c

Please sign in to comment.