-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Pushdown of complex operations
The intent of this document is to capture high-level thoughts and ideas about how to add support for pushing down complex operations (filters, projections, aggregations, joins, etc) into connectors and allowing connectors to surface additional filters (for row-level security) or projections (for masking) during query planning.
- This is explicitly a short-term solution. In the long term, we want connectors to be able to provide transformation rules that get evaluated during the optimization loop.
- It's not a goal to build a generic mechanism to support every operation.
- We need to be able to start simple and improve iteratively (near-term improvements should, ideally, be additive).
- It should be implemented using the existing
Rule
/Iterative
optimizer framework instead of visitor-basedPlanOptimizer
s.
The general idea is to introduce a new set of transformation rules, each of which fires on patterns such as filter(tableScan(...))
, project(tableScan(...))
, etc. Each rule would be responsible for pushing down the corresponding type of operation into the table scan. Examples (names TBD) would be: PushFilterIntoConnector
,
PushProjectionIntoConnector
, PushAggregationIntoConnector
, PushJoinIntoConnector
.
These rules interact with connectors via a set of specialized metadata calls. Below is a straw-man for what they might look like (names and concrete signatures TBD). If a connector doesn't support or understand the action, result of the call would indicate so. The language to express filters, projections, aggregates, join criteria, etc. is also TBD.
Metadata.pushFilter(TableHandle, Filter) => TableHandle + remaining filter + new projections
Returns a new table handle and any part of the filter that the connector doesn't understand or support. The result TableHandle represents a table with the same schema as the input TableHandle.
The optimization rule uses this method to transform filter(f1, tablescan(th1))
into filter(f2, tablescan(th2))
.
Let's say a connector knows how to handle LIKE expressions. Given the following plan fragment,
- Filter (a like 'xy%z%' AND f(b)) :: (a varchar, b bigint)
- TableScan (TH(0)) :: (a varchar, b bigint)
a = CH(0)
b = CH(1)
PushFilterIntoTableScan
would call:
Metadata.pushFilter(
TH(0),
call("and",
call("like", CH(0), 'xy%z%'),
call("f", CH(1))))
which would return
new table handle: TH(0')
remaining filter: call("f", CH(1))
The rule would then replace the original fragment with:
- Filter (f(b)) :: (a varchar, b bigint)
- TableScan (TH(0')) :: (a varchar, b bigint)
a = CH(0)
b = CH(1)
Connector knows how to handle sub-expression in filter.
- Filter (a.x > 10) :: (a row(x bigint, y bigint))
- TableScan (TH(0)) :: (a row(x bigint, y bigint))
a = CH(0)
PushFilterIntoTableScan
would call:
Metadata.pushFilter(
TH(0),
call(">",
dereference(CH(0), "x"),
10)
which would return
new table handle: TH(0')
remaining filter: call(">", CH(1), 10)
new projections: [ CH(1) :: bigint ]
The rule would then replace the original fragment with:
- Project [a] :: (a row(x bigint, y bigint))
- Filter (s > 10) :: (a row(x bigint, y bigint), s bigint)
- TableScan (TH(0')) :: (a row(x bigint, y bigint), s bigint)
a = CH(0)
s = CH(1)
Metadata.pushProjection(TableHandle, Projections) -> TableHandle + new projections
Returns a new table handle and a new set of projections that include any projection that aren't supported by the connector.
Let's say a connector knows how to handle dereference expressions. Given the following plan fragment,
- Project [a.x, b + 1] :: (r varchar, s bigint)
- TableScan (TH(0)) :: (a row(x varchar, y bigint), b bigint)
a = CH(0)
b = CH(1)
PushProjectIntoTableScan
would call:
Metadata.pushProjection(
TH(0),
[
dereference(CH(0), "x"),
add(CH(1), 1)
])
which would return
new table handle: TH(0')
new projections: [
CH(0') :: varchar,
add(CH(1), 1) :: bigint
]
The rule would then replace the original fragment with:
- Project [c, b + 1] :: (r varchar, s bigint)
- TableScan (TH(0')) :: (c varchar, b bigint)
c = CH(0')
b = CH(1)
Metadata.pushAggregation(TableHandle, partial?, <group-by columns>, <aggregates>) -> TableHandle + new column handles for aggregates
Given this initial plan fragment:
- Aggregation[SingleStep] :: (k varchar, x bigint, y double)
group by: [k]
aggregates: [x = sum(a), y = avg(b)]
- TableScan(TH(0)) :: (k varchar, a bigint, b bigint)
k = CH(0)
a = CH(1)
b = CH(2)
The rule calls:
Metadata.pushAggregation(
TH(0),
false
[CH(0)],
[
aggregate("sum", CH(1)),
aggregate("avg", CH(2))
])
which returns:
new table handle: TH(0')
new column handles for aggregates: [CH(3), CH(4)]
The fragment is rewritten to:
- TableScan(TH(0')) :: (k varchar, x bigint, y double)
k = CH(0)
x = CH(3)
z = CH(4)
TODO
Metadata.pushJoin(TableHandle, TableHandle, JoinType, Criteria) -> TableHandle + ???
During analysis and initial query planning, connectors can provide additional filters or projections to evaluate immediately after the data is produced during query execution. This can be used to implement row-level filtering based on security rules or masking for sensitive data.
Metadata.getAdditionalFilter(TableHandle) -> Filter
TODO: projections for masking
- We should deprecate
TableLayout
s. They complicate the optimizer and can be replaced by a combination of "pushFilter" above plus a an API to obtain (physical) properties of a table during query optimization. E.g.,
Metadata.getProperties(TableHandle)
-
TupleDomain
is an abstraction we added to deal with the limitation of not being able to communicate more complex expressions to connectors. With these new APIs,TupleDomain
is no longer necessary and we can remove it from the APIs. We'll need to add utility libraries or a toolkit for connector writers to manipulate and extract features out of the expressions.
- How to handle a plan like this one when the filter cannot be pushed into the connector? The generalized predicate pushdown rules moves filters below projections, so adding a rule that moves a projection below a filter would result in non-converging optimization under the IterativeOptimizer. In the long term, it won't be an issue since the optimizer will be able to consider multiple plans simultaneously. One option in the short term is to have specialized rules that match "project(filter(tablescan)))" and handle that specific scenario.
- Project (a.x)
- Filter (f(b))
- TableScan (TH(0))