Skip to content

Commit

Permalink
update deps & use datafusion 43
Browse files Browse the repository at this point in the history
  • Loading branch information
hozan23 committed Nov 10, 2024
1 parent c5e3149 commit b46dd10
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 19 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ readme = "README.md"
repository = "https://github.com/datafusion-contrib/datafusion-federation"

[workspace.dependencies]
arrow = "53.0.0"
arrow-flight = { version = "53.0.0", features = ["flight-sql-experimental"] }
arrow-json = "53.0.0"
arrow = "53.2"
arrow-flight = { version = "53.2", features = ["flight-sql-experimental"] }
arrow-json = "53.2"
async-stream = "0.3.5"
async-trait = "0.1.81"
datafusion = "42.0.0"
async-trait = "0.1.83"
datafusion = "43.0.0"
datafusion-federation = { path = "./datafusion-federation", version = "0.3.0" }
datafusion-substrait = "42.0.0"
futures = "0.3.30"
tokio = { version = "1.39.3", features = ["full"] }
tonic = { version = "0.12.1", features = [
datafusion-substrait = "43.0.0"
futures = "0.3.31"
tokio = { version = "1.41", features = ["full"] }
tonic = { version = "0.12", features = [
"tls",
"transport",
"codegen",
Expand Down
2 changes: 1 addition & 1 deletion datafusion-federation/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use scan_result::ScanResult;
/// The optimizer logic walks over the plan, look for the largest subtrees that only have
/// TableScans from the same [`FederationProvider`]. There 'largest sub-trees' are passed to their
/// respective [`FederationProvider::optimizer`].
#[derive(Default)]
#[derive(Default, Debug)]
pub struct FederationOptimizerRule {}

impl OptimizerRule for FederationOptimizerRule {
Expand Down
15 changes: 13 additions & 2 deletions datafusion-federation/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode {
}
}

#[derive(Default)]

#[derive(Default, Debug)]
pub struct FederatedQueryPlanner {}

impl FederatedQueryPlanner {
Expand Down Expand Up @@ -110,13 +109,25 @@ pub trait FederationPlanner: Send + Sync {
) -> Result<Arc<dyn ExecutionPlan>>;
}

impl std::fmt::Debug for dyn FederationPlanner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FederationPlanner")
}
}

impl PartialEq<FederatedPlanNode> for FederatedPlanNode {
/// Comparing name, args and return_type
fn eq(&self, other: &FederatedPlanNode) -> bool {
self.plan == other.plan
}
}

impl PartialOrd<FederatedPlanNode> for FederatedPlanNode {
fn partial_cmp(&self, other: &FederatedPlanNode) -> Option<std::cmp::Ordering> {
self.plan.partial_cmp(&other.plan)
}
}

impl Eq for FederatedPlanNode {}

impl Hash for FederatedPlanNode {
Expand Down
6 changes: 4 additions & 2 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
// extern crate derive_builder;

// SQLFederationProvider provides federation to SQL DMBSs.
#[derive(Debug)]
pub struct SQLFederationProvider {
optimizer: Arc<Optimizer>,
executor: Arc<dyn SQLExecutor>,
Expand Down Expand Up @@ -70,6 +71,7 @@ impl FederationProvider for SQLFederationProvider {
}
}

#[derive(Debug)]
struct SQLFederationOptimizerRule {
planner: Arc<dyn FederationPlanner>,
}
Expand Down Expand Up @@ -926,8 +928,8 @@ mod tests {
),
// different tables in single aggregation expression
(
"SELECT COUNT(CASE WHEN app_table.a > 0 THEN app_table.a ELSE foo.df_table.a END) FROM app_table, foo.df_table",
r#"SELECT count(CASE WHEN (remote_table.a > 0) THEN remote_table.a ELSE remote_table.a END) FROM remote_table JOIN remote_table ON true"#,
"SELECT COUNT(CASE WHEN appt.a > 0 THEN appt.a ELSE dft.a END) FROM app_table as appt, foo.df_table as dft",
"SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt JOIN remote_table AS dft"
),
];

Expand Down
3 changes: 3 additions & 0 deletions datafusion-federation/src/sql/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
FederationProvider,
};

#[derive(Debug)]
pub struct SQLSchemaProvider {
// provider: Arc<SQLFederationProvider>,
tables: Vec<Arc<SQLTableSource>>,
Expand Down Expand Up @@ -73,6 +74,7 @@ impl SchemaProvider for SQLSchemaProvider {
}
}

#[derive(Debug)]
pub struct MultiSchemaProvider {
children: Vec<Arc<dyn SchemaProvider>>,
}
Expand Down Expand Up @@ -107,6 +109,7 @@ impl SchemaProvider for MultiSchemaProvider {
}
}

#[derive(Debug)]
pub struct SQLTableSource {
provider: Arc<SQLFederationProvider>,
table_name: String,
Expand Down
19 changes: 16 additions & 3 deletions datafusion-federation/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use datafusion::{
common::Constraints,
datasource::TableProvider,
error::{DataFusionError, Result},
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType},
logical_expr::{
dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType,
},
physical_plan::ExecutionPlan,
};

use crate::FederationProvider;

// FederatedTableSourceWrapper helps to recover the FederatedTableSource
// from a TableScan. This wrapper may be avoidable.
#[derive(Debug)]
pub struct FederatedTableProviderAdaptor {
pub source: Arc<dyn FederatedTableSource>,
pub table_provider: Option<Arc<dyn TableProvider>>,
Expand Down Expand Up @@ -124,10 +127,10 @@ impl TableProvider for FederatedTableProviderAdaptor {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(table_provider) = &self.table_provider {
return table_provider.insert_into(_state, input, overwrite).await;
return table_provider.insert_into(_state, input, insert_op).await;
}

Err(DataFusionError::NotImplemented(
Expand All @@ -143,3 +146,13 @@ pub trait FederatedTableSource: TableSource {
// Return the FederationProvider associated with this Table
fn federation_provider(&self) -> Arc<dyn FederationProvider>;
}

impl std::fmt::Debug for dyn FederatedTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"FederatedTableSource: {:?}",
self.federation_provider().name()
)
}
}
4 changes: 2 additions & 2 deletions datafusion-flight-sql-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ impl FlightSqlService {
/// Replaces the sql_options with the provided options.
/// These options are used to verify all SQL queries.
/// When None the default [`SQLOptions`] are used.
pub fn with_sql_options(self, sql_options: Option<SQLOptions>) -> Self {
pub fn with_sql_options(self, sql_options: SQLOptions) -> Self {
Self {
sql_options,
sql_options: Some(sql_options),
..self
}
}
Expand Down

0 comments on commit b46dd10

Please sign in to comment.