Skip to content

Commit

Permalink
clean up and move optimizer module & add/modify some comments (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
hozan23 authored Sep 23, 2024
1 parent 36b4742 commit 538eed9
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -1,132 +1,65 @@
mod scan_result;

use std::sync::Arc;

use datafusion::common::not_impl_err;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::logical_expr::Extension;
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion::{
common::not_impl_err,
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
datasource::source_as_provider,
error::Result,
logical_expr::{Expr, LogicalPlan, Projection, TableScan, TableSource},
logical_expr::{Expr, Extension, LogicalPlan, Projection, TableScan, TableSource},
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
};

use crate::{
FederatedTableProviderAdaptor, FederatedTableSource, FederationProvider, FederationProviderRef,
};

use scan_result::ScanResult;

/// An optimizer rule that identifies sub-plans to federate.
///
/// The optimizer logic walks over the plan and looks for the largest subtrees that only have
/// TableScans from the same [`FederationProvider`]. These 'largest sub-trees' are passed to their
/// respective [`FederationProvider::optimizer`].
#[derive(Default)]
pub struct FederationOptimizerRule {}

impl OptimizerRule for FederationOptimizerRule {
// Walk over the plan, look for the largest subtrees that only have
// TableScans from the same FederationProvider.
// There 'largest sub-trees' are passed to their respective FederationProvider.optimizer.
fn try_optimize(
/// Try to rewrite `plan` to an optimized form, returning `Transformed::yes`
/// if the plan was rewritten and `Transformed::no` if it was not.
///
/// Note: this function is only called if [`Self::supports_rewrite`] returns
/// true. Otherwise the Optimizer calls [`Self::try_optimize`]
fn rewrite(
&self,
plan: &LogicalPlan,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let (optimized, _) = self.optimize_plan_recursively(plan, true, config)?;
Ok(optimized)
}

/// A human readable name for this optimizer rule
fn name(&self) -> &str {
"federation_optimizer_rule"
) -> Result<Transformed<LogicalPlan>> {
match self.optimize_plan_recursively(&plan, true, config)? {
(Some(optimized_plan), _) => Ok(Transformed::yes(optimized_plan)),
(None, _) => Ok(Transformed::no(plan)),
}
}

/// XXX
/// Does this rule support rewriting owned plans (rather than by reference)?
fn supports_rewrite(&self) -> bool {
false
}
}

enum ScanResult {
None,
Distinct(FederationProviderRef),
Ambiguous,
}

impl ScanResult {
fn merge(&mut self, other: Self) {
match (&self, &other) {
(_, ScanResult::None) => {}
(ScanResult::None, _) => *self = other,
(ScanResult::Ambiguous, _) | (_, ScanResult::Ambiguous) => {
*self = ScanResult::Ambiguous
}
(ScanResult::Distinct(provider), ScanResult::Distinct(other_provider)) => {
if provider != other_provider {
*self = ScanResult::Ambiguous
}
}
}
}
fn add(&mut self, provider: Option<FederationProviderRef>) {
self.merge(ScanResult::from(provider))
}
fn is_ambiguous(&self) -> bool {
matches!(self, ScanResult::Ambiguous)
true
}
fn is_none(&self) -> bool {
matches!(self, ScanResult::None)
}
fn is_some(&self) -> bool {
!self.is_none()
}
fn unwrap(self) -> Option<FederationProviderRef> {
match self {
ScanResult::None => None,
ScanResult::Distinct(provider) => Some(provider),
ScanResult::Ambiguous => panic!("called `ScanResult::unwrap()` on a `Ambiguous` value"),
}
}
fn check_recursion(&self) -> TreeNodeRecursion {
if self.is_ambiguous() {
TreeNodeRecursion::Stop
} else {
TreeNodeRecursion::Continue
}
}
}

impl From<Option<FederationProviderRef>> for ScanResult {
fn from(provider: Option<FederationProviderRef>) -> Self {
match provider {
Some(provider) => ScanResult::Distinct(provider),
None => ScanResult::None,
}
}
}

impl PartialEq<Option<FederationProviderRef>> for ScanResult {
fn eq(&self, other: &Option<FederationProviderRef>) -> bool {
match (self, other) {
(ScanResult::None, None) => true,
(ScanResult::Distinct(provider), Some(other_provider)) => provider == other_provider,
_ => false,
}
}
}

impl Clone for ScanResult {
fn clone(&self) -> Self {
match self {
ScanResult::None => ScanResult::None,
ScanResult::Distinct(provider) => ScanResult::Distinct(provider.clone()),
ScanResult::Ambiguous => ScanResult::Ambiguous,
}
/// A human readable name for this optimizer rule
fn name(&self) -> &str {
"federation_optimizer_rule"
}
}

impl FederationOptimizerRule {
/// Creates a new [`FederationOptimizerRule`]
pub fn new() -> Self {
Self::default()
}

// scans a plan to see if it belongs to a single FederationProvider
/// Scans a plan to see if it belongs to a single [`FederationProvider`].
fn scan_plan_recursively(&self, plan: &LogicalPlan) -> Result<ScanResult> {
let mut sole_provider: ScanResult = ScanResult::None;

Expand All @@ -147,7 +80,7 @@ impl FederationOptimizerRule {
Ok(sole_provider)
}

// scans a plan's expressions to see if it belongs to a single FederationProvider
/// Scans a plan's expressions to see if it belongs to a single [`FederationProvider`].
fn scan_plan_exprs(&self, plan: &LogicalPlan) -> Result<ScanResult> {
let mut sole_provider: ScanResult = ScanResult::None;

Expand All @@ -164,7 +97,7 @@ impl FederationOptimizerRule {
Ok(sole_provider)
}

// scans an expression to see if it belongs to a single FederationProvider
/// Scans an expression to see if it belongs to a single [`FederationProvider`].
fn scan_expr_recursively(&self, expr: &Expr) -> Result<ScanResult> {
let mut sole_provider: ScanResult = ScanResult::None;

Expand Down Expand Up @@ -192,18 +125,18 @@ impl FederationOptimizerRule {
Ok(sole_provider)
}

// optimize_recursively recursively finds the largest sub-plans that can be federated
// to a single FederationProvider.
// Returns a plan if a sub-tree was federated, otherwise None.
// Returns a ScanResult of all FederationProviders in the subtree.
/// Recursively finds the largest sub-plans that can be federated
/// to a single FederationProvider.
///
/// Returns a plan if a sub-tree was federated, otherwise None.
///
/// Returns a ScanResult of all FederationProviders in the subtree.
fn optimize_plan_recursively(
&self,
plan: &LogicalPlan,
is_root: bool,
_config: &dyn OptimizerConfig,
) -> Result<(Option<LogicalPlan>, ScanResult)> {
// Used to track if all sources, including tableScan, plan inputs and
// expressions, represents an un-ambiguous or 'sole' FederationProvider
let mut sole_provider: ScanResult = ScanResult::None;

if let LogicalPlan::Extension(Extension { ref node }) = plan {
Expand Down Expand Up @@ -322,7 +255,7 @@ impl FederationOptimizerRule {
Ok((Some(new_plan), ScanResult::Ambiguous))
}

// Optimize all exprs of a plan
/// Optimizes all exprs of a plan
fn optimize_plan_exprs(
&self,
plan: &LogicalPlan,
Expand All @@ -339,8 +272,8 @@ impl FederationOptimizerRule {
.collect::<Result<Vec<_>>>()
}

// recursively optimize expressions
// Current logic: individually federate every sub-query.
/// Recursively optimizes expressions.
/// Current logic: individually federate every sub-query.
fn optimize_expr_recursively(
&self,
expr: Expr,
Expand All @@ -364,8 +297,8 @@ impl FederationOptimizerRule {
}
}

/// NopFederationProvider is used to represent tables that are not federated, but
/// are resolved by DataFusion. This simplifies the logic of the optimizer rule.
struct NopFederationProvider {}

impl FederationProvider for NopFederationProvider {
Expand Down
98 changes: 98 additions & 0 deletions datafusion-federation/src/optimizer/scan_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use datafusion::common::tree_node::TreeNodeRecursion;

use crate::FederationProviderRef;

/// Tracks which [`crate::FederationProvider`] the sources of a plan (table scans,
/// plan inputs and expressions) resolve to: none, a single ('sole') provider, or an
/// ambiguous mix of providers.
pub enum ScanResult {
/// No provider has been recorded.
None,
/// Exactly one provider has been recorded.
Distinct(FederationProviderRef),
/// Providers that do not compare equal have been recorded.
Ambiguous,
}

impl ScanResult {
    /// Folds `other` into `self`.
    ///
    /// * If `other` is `None`, `self` is left untouched.
    /// * If `self` is `None`, it takes over `other` as-is.
    /// * Otherwise `self` is kept only when both sides are `Distinct` and their providers
    ///   compare equal; every other combination turns `self` into `Ambiguous`.
    pub fn merge(&mut self, other: Self) {
        // Nothing to fold in.
        if other.is_none() {
            return;
        }
        // First non-empty result seen: adopt it verbatim.
        if self.is_none() {
            *self = other;
            return;
        }

        // Both sides are non-`None` here, so any non-`Distinct` side is `Ambiguous`.
        let conflicting = match (&*self, &other) {
            (ScanResult::Distinct(current), ScanResult::Distinct(incoming)) => current != incoming,
            _ => true,
        };
        if conflicting {
            *self = ScanResult::Ambiguous;
        }
    }

    /// Converts `provider` into a `ScanResult` and merges it into `self`.
    pub fn add(&mut self, provider: Option<FederationProviderRef>) {
        let incoming: ScanResult = provider.into();
        self.merge(incoming)
    }

    /// Returns `true` only for the `Ambiguous` variant.
    pub fn is_ambiguous(&self) -> bool {
        match self {
            ScanResult::Ambiguous => true,
            ScanResult::None | ScanResult::Distinct(_) => false,
        }
    }

    /// Returns `true` only for the `None` variant.
    pub fn is_none(&self) -> bool {
        match self {
            ScanResult::None => true,
            ScanResult::Distinct(_) | ScanResult::Ambiguous => false,
        }
    }

    /// Returns `true` for every variant except `None` (this includes `Ambiguous`).
    pub fn is_some(&self) -> bool {
        !matches!(self, ScanResult::None)
    }

    /// Extracts the recorded provider, if any.
    ///
    /// # Panics
    ///
    /// Panics when called on the `Ambiguous` variant.
    pub fn unwrap(self) -> Option<FederationProviderRef> {
        match self {
            ScanResult::Distinct(provider) => Some(provider),
            ScanResult::None => None,
            ScanResult::Ambiguous => {
                panic!("called `ScanResult::unwrap()` on a `Ambiguous` value")
            }
        }
    }

    /// Maps this result to a tree-walk directive: `Stop` when ambiguous, `Continue` otherwise.
    pub fn check_recursion(&self) -> TreeNodeRecursion {
        match self {
            ScanResult::Ambiguous => TreeNodeRecursion::Stop,
            ScanResult::None | ScanResult::Distinct(_) => TreeNodeRecursion::Continue,
        }
    }
}

impl From<Option<FederationProviderRef>> for ScanResult {
    /// Maps `Some(provider)` to [`ScanResult::Distinct`] and `None` to [`ScanResult::None`].
    fn from(provider: Option<FederationProviderRef>) -> Self {
        provider.map_or(ScanResult::None, ScanResult::Distinct)
    }
}

impl PartialEq<Option<FederationProviderRef>> for ScanResult {
    /// Compares a scan result with an optional provider.
    ///
    /// `None` equals `None`, `Distinct(p)` equals `Some(q)` when `p == q`, and
    /// `Ambiguous` never equals anything.
    fn eq(&self, other: &Option<FederationProviderRef>) -> bool {
        match self {
            ScanResult::None => other.is_none(),
            ScanResult::Distinct(provider) => other
                .as_ref()
                .map_or(false, |candidate| provider == candidate),
            ScanResult::Ambiguous => false,
        }
    }
}

impl std::fmt::Debug for ScanResult {
    /// Writes the variant name; for `Distinct` the provider's `name()` is included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Distinct(provider) => write!(f, "ScanResult::Distinct({})", provider.name()),
            Self::None => f.write_str("ScanResult::None"),
            Self::Ambiguous => f.write_str("ScanResult::Ambiguous"),
        }
    }
}

impl Clone for ScanResult {
fn clone(&self) -> Self {
match self {
ScanResult::None => ScanResult::None,
ScanResult::Distinct(provider) => ScanResult::Distinct(provider.clone()),
ScanResult::Ambiguous => ScanResult::Ambiguous,
}
}
}
14 changes: 9 additions & 5 deletions datafusion-federation/src/sql/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,33 @@ pub type AstAnalyzer = Box<dyn Fn(ast::Statement) -> Result<ast::Statement>>;

/// Interface a SQL engine implements: it exposes its identity (name and compute
/// context), its SQL dialect, query execution, and table/schema discovery.
#[async_trait]
pub trait SQLExecutor: Sync + Send {
// Context
/// Executor name
fn name(&self) -> &str;

/// Executor compute context allows differentiating the remote compute context
/// such as authorization or active database.
///
/// Note: returning None here may cause incorrect federation with other providers of the
/// same name that also have a compute_context of None.
/// Instead try to return a unique string that will never match any other
/// provider's context.
fn compute_context(&self) -> Option<String>;

/// The specific SQL dialect (currently supports 'sqlite', 'postgres', 'flight')
fn dialect(&self) -> Arc<dyn Dialect>;

/// Returns an AST analyzer specific for this engine to modify the AST before execution.
///
/// The default implementation returns `None`.
fn ast_analyzer(&self) -> Option<AstAnalyzer> {
None
}

// Execution
/// Execute a SQL query
// NOTE(review): `schema` is presumably the expected schema of the result stream — confirm.
fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;

// Schema inference
/// Returns the tables provided by the remote
async fn table_names(&self) -> Result<Vec<String>>;

/// Returns the schema of table_name within this [`SQLExecutor`]
async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef>;
}

Expand Down
Loading

0 comments on commit 538eed9

Please sign in to comment.