Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patched DataFusion, Oct 30, 2024 (almost) #44

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ impl Statistics {
self
}

/// Project the statistics to the given column indices.
///
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
    if let Some(indices) = projection {
        // todo: it would be nice to avoid cloning column statistics if
        // possible (e.g. if the projection did not contain duplicates)
        //
        // Cloning per index also handles duplicate and reordered indices,
        // since the same source entry may be selected more than once.
        // NOTE(review): an out-of-range index panics here — assumed to be
        // an upheld invariant of callers; confirm against call sites.
        let projected: Vec<_> = indices
            .iter()
            .map(|&idx| self.column_statistics[idx].clone())
            .collect();
        self.column_statistics = projected;
    }
    self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use itertools::izip;

/// The SanityCheckPlan rule rejects the following query plans:
Expand Down Expand Up @@ -126,6 +128,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering().iter(),
plan.required_input_distribution().iter()
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
let stats = Self::statistics_helper(
&self.input,
self.predicate(),
self.default_selectivity,
)?;
Ok(stats.project(self.projection.as_ref()))
}

fn cardinality_effect(&self) -> CardinalityEffect {
Expand Down
13 changes: 2 additions & 11 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,24 +779,15 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let mut stats = estimate_join_statistics(
let stats = estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
// Project statistics if there is a projection
if let Some(projection) = &self.projection {
stats.column_statistics = stats
.column_statistics
.into_iter()
.enumerate()
.filter(|(i, _)| projection.contains(i))
.map(|(_, s)| s)
.collect();
}
Ok(stats)
Ok(stats.project(self.projection.as_ref()))
}
}

Expand Down
49 changes: 49 additions & 0 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91

statement ok
DROP TABLE test_non_utf8_binary;


## Tests for https://github.com/apache/datafusion/issues/13186
statement ok
create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);

statement ok
insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);

# The data must be copied into a Parquet file so the scan has column statistics.
statement ok
copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';

# Run queries against parquet files
statement ok
create external table cpu_parquet
stored as parquet
location 'test_files/scratch/parquet/cpu.parquet';

# Double filtering
#
# Expect 1 row for both queries
query PI
select time, rn
from (
select time, row_number() OVER (ORDER BY usage_idle, time) as rn
from cpu
where cpu = 3
) where rn > 0;
----
1970-01-01T00:00:00 1

query PI
select time, rn
from (
select time, row_number() OVER (ORDER BY usage_idle, time) as rn
from cpu_parquet
where cpu = 3
) where rn > 0;
----
1970-01-01T00:00:00 1


# Clean up
statement ok
drop table cpu;

statement ok
drop table cpu_parquet;
37 changes: 37 additions & 0 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ physical_plan
# Clean up after the test
########

statement ok
drop table t

statement ok
drop table t1;

Expand Down Expand Up @@ -761,3 +764,37 @@ SELECT NULL WHERE FALSE;
----
0.5
1

###
# Test for https://github.com/apache/datafusion/issues/11492
###

# Input data is
# a,b,c
# 1,2,3

statement ok
CREATE EXTERNAL TABLE t (
a INT,
b INT,
c INT
)
STORED AS CSV
LOCATION '../core/tests/data/example.csv'
WITH ORDER (a ASC)
OPTIONS ('format.has_header' 'true');

query T
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a;
----
1
bar

query I
SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a;
----
1
NULL

statement ok
drop table t
Loading