Skip to content

Commit

Permalink
[FLINK-27876][table-planner] Choose the right side as build side when…
Browse files Browse the repository at this point in the history
… using default shuffle hash strategy if left size is equal with right (apache#19866)

[FLINK-27876][table-planner] Choose the right side as build side when using default shuffle hash strategy if left size is equal to right
  • Loading branch information
xuyangzhong committed Dec 17, 2023
1 parent ac88acf commit f7ababa
Show file tree
Hide file tree
Showing 22 changed files with 880 additions and 880 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ trait BatchPhysicalJoinRuleBase {
val leftIsBuild = if (leftSize == null || rightSize == null || leftSize == rightSize) {
// use left to build hash table if leftSize or rightSize is unknown or equal size.
// choose right to build if join is SEMI/ANTI.
!join.getJoinType.projectsRight
join.getJoinType.projectsRight
} else {
leftSize < rightSize
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ LogicalProject(a=[$0], EXPR$1=[$1], d=[$2], EXPR$10=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[right])
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[left])
:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])
: +- Exchange(distribution=[hash[a]])
: +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
Expand All @@ -43,7 +43,7 @@ HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10],
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f])
== Optimized Execution Plan ==
MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, EXPR$1, d, EXPR$10], build=[right])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])\n: +- [#1] Exchange(distribution=[hash[a]])\n+- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_SUM(sum$0) AS EXPR$1])\n +- [#2] Exchange(distribution=[hash[d]])\n])
MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, EXPR$1, d, EXPR$10], build=[left])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])\n: +- [#1] Exchange(distribution=[hash[a]])\n+- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_SUM(sum$0) AS EXPR$1])\n +- [#2] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
: +- Calc(select=[a, b])
Expand All @@ -69,7 +69,7 @@ LogicalProject(a=[$0], EXPR$1=[$1], d=[$2], EXPR$10=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[right]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[left]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
: +- Exchange(distribution=[hash[a]]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
: +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
Expand All @@ -82,7 +82,7 @@ HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10],
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
== Optimized Execution Plan ==
MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, EXPR$1, d, EXPR$10], build=[right])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])\n: +- [#1] Exchange(distribution=[hash[a]])\n+- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_SUM(sum$0) AS EXPR$1])\n +- [#2] Exchange(distribution=[hash[d]])\n])
MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, EXPR$1, d, EXPR$10], build=[left])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])\n: +- [#1] Exchange(distribution=[hash[a]])\n+- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_SUM(sum$0) AS EXPR$1])\n +- [#2] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
: +- Calc(select=[a, b])
Expand All @@ -94,7 +94,7 @@ MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[
]]>
</Resource>
</TestCase>
<TestCase name="testExplainWithAgg[extended=false]">
<TestCase name="testExplainWithAgg[extended=true]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[$1])
Expand All @@ -103,12 +103,12 @@ LogicalProject(EXPR$0=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
== Optimized Physical Plan ==
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c])
Calc(select=[EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[a]]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Calc(select=[a]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
== Optimized Execution Plan ==
Calc(select=[EXPR$0])
Expand All @@ -120,7 +120,7 @@ Calc(select=[EXPR$0])
]]>
</Resource>
</TestCase>
<TestCase name="testExplainWithAgg[extended=true]">
<TestCase name="testExplainWithAgg[extended=false]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[$1])
Expand All @@ -129,12 +129,12 @@ LogicalProject(EXPR$0=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
== Optimized Physical Plan ==
Calc(select=[EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[a]]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Calc(select=[a]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c])
== Optimized Execution Plan ==
Calc(select=[EXPR$0])
Expand Down Expand Up @@ -446,28 +446,28 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
]]>
</Resource>
</TestCase>
<TestCase name="testExplainWithTableSourceScan[extended=false]">
<TestCase name="testExplainWithTableSourceScan[extended=true]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testExplainWithTableSourceScan[extended=true]">
<TestCase name="testExplainWithTableSourceScan[extended=false]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ LegacySink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c,
+- Correlate(invocation=[split($cor0.c)], correlate=[table(split($cor0.c))], select=[a,b,c,d,e,f,i,j,k,l,m,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER d, BIGINT e, VARCHAR(2147483647) f, INTEGER i, BIGINT j, INTEGER k, VARCHAR(2147483647) l, BIGINT m, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+- HashJoin(joinType=[InnerJoin], where=[(a = i)], select=[a, b, c, d, e, f, i, j, k, l, m], build=[left])
:- Exchange(distribution=[hash[a]])
: +- HashJoin(joinType=[InnerJoin], where=[(b = e)], select=[a, b, c, d, e, f], build=[right])
: +- HashJoin(joinType=[InnerJoin], where=[(b = e)], select=[a, b, c, d, e, f], build=[left])
: :- Exchange(distribution=[hash[b]])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[e]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ LogicalIntersect(all=[false])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
+- Exchange(distribution=[keep_input_as_is[hash[a, b, c]]])
+- HashJoin(joinType=[LeftSemiJoin], where=[(((a = a0) OR (a IS NULL AND a0 IS NULL)) AND ((b = b0) OR (b IS NULL AND b0 IS NULL)) AND ((c = c0) OR (c IS NULL AND c0 IS NULL)))], select=[a, b, c], build=[left])
:- Exchange(distribution=[hash[a, b, c]])
+- HashJoin(joinType=[LeftSemiJoin], where=[(((a = a0) OR (a IS NULL AND a0 IS NULL)) AND ((b = b0) OR (b IS NULL AND b0 IS NULL)) AND ((c = c0) OR (c IS NULL AND c0 IS NULL)))], select=[a, b, c], build=[right])
:- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH])
: +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+- Exchange(distribution=[hash[a, b, c]], shuffle_mode=[BATCH])
+- Exchange(distribution=[hash[a, b, c]])
+- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
]]>
</Resource>
Expand Down Expand Up @@ -319,19 +319,19 @@ LogicalUnion(all=[true])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
MultipleInput(readOrder=[0,2,1], members=[\nUnion(all=[true], union=[cnt1, cnt2])\n:- Calc(select=[CAST(cnt1 AS BIGINT) AS cnt1, cnt2])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, cnt1, d, cnt2], build=[right])\n: :- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt1])(reuse_id=[1])\n: : +- [#3] Exchange(distribution=[hash[a]])\n: +- [#1] HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt2])\n+- Calc(select=[cnt1, CAST(cnt2 AS BIGINT) AS cnt2])\n +- HashJoin(joinType=[LeftOuterJoin], where=[(d = a)], select=[d, cnt2, a, cnt1], build=[right])\n :- [#2] Exchange(distribution=[hash[d]], shuffle_mode=[BATCH])\n +- Reused(reference_id=[1])\n])
:- Exchange(distribution=[keep_input_as_is[hash[d]]])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt2])(reuse_id=[1])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
: +- Calc(select=[d])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
:- Exchange(distribution=[hash[d]], shuffle_mode=[BATCH])
MultipleInput(readOrder=[0,2,1], members=[\nUnion(all=[true], union=[cnt1, cnt2])\n:- Calc(select=[CAST(cnt1 AS BIGINT) AS cnt1, cnt2])\n: +- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, cnt1, d, cnt2], build=[left])\n: :- [#1] HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt1])\n: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt2])(reuse_id=[1])\n: +- [#3] Exchange(distribution=[hash[d]])\n+- Calc(select=[cnt1, CAST(cnt2 AS BIGINT) AS cnt2])\n +- HashJoin(joinType=[LeftOuterJoin], where=[(d = a)], select=[d, cnt2, a, cnt1], build=[left])\n :- Reused(reference_id=[1])\n +- [#2] Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])\n])
:- Exchange(distribution=[keep_input_as_is[hash[a]]])
: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt1])(reuse_id=[1])
: +- Exchange(distribution=[hash[a]])
: +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
: +- Calc(select=[a])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
+- Calc(select=[a])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
+- Calc(select=[d])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -369,7 +369,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5], a1=[$6], b1=[$
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[(c = c1)], select=[a, b, c, a0, b0, c0, a1, b1, c1, a00, b00, c00], build=[right])
HashJoin(joinType=[InnerJoin], where=[(c = c1)], select=[a, b, c, a0, b0, c0, a1, b1, c1, a00, b00, c00], build=[left])
:- Exchange(distribution=[hash[c]])
: +- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, c, a0, b0, c0], build=[left])
: :- Exchange(distribution=[keep_input_as_is[hash[a]]])
Expand Down Expand Up @@ -416,11 +416,11 @@ LogicalProject(a=[$0], a0=[$1])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, a0], build=[right])
:- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, a0], build=[left])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])(reuse_id=[1])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[a]])
+- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
+- Reused(reference_id=[1])
]]>
</Resource>
Expand Down Expand Up @@ -451,17 +451,17 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3], a0=[$4], b0=[$5], d0=[$6], e0=[$7
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[(b = e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[right])
:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
: +- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, d, e], build=[right])
HashJoin(joinType=[InnerJoin], where=[(b = e0)], select=[a, b, d, e, a0, b0, d0, e0], build=[left])
:- Exchange(distribution=[hash[b]])
: +- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, d, e], build=[left])
: :- Exchange(distribution=[hash[a]])
: : +- Calc(select=[a, b], where=[(a < 10)])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])(reuse_id=[1])
: +- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, e], where=[(d < 10)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])(reuse_id=[2])
+- Exchange(distribution=[hash[e]])
+- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, d, e], build=[right])
+- Exchange(distribution=[hash[e]], shuffle_mode=[BATCH])
+- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, d, e], build=[left])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, b], where=[(a > 5)])
: +- Reused(reference_id=[1])
Expand Down Expand Up @@ -539,8 +539,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[(c = c0)], select=[a, b, c, a0, b0, c0], build=[right])
:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
HashJoin(joinType=[InnerJoin], where=[(c = c0)], select=[a, b, c, a0, b0, c0], build=[left])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[w0$o0 AS a, b, c])
: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0])
: +- Exchange(distribution=[forward])
Expand All @@ -551,7 +551,7 @@ HashJoin(joinType=[InnerJoin], where=[(c = c0)], select=[a, b, c, a0, b0, c0], b
: +- Sort(orderBy=[b ASC])
: +- Exchange(distribution=[hash[b]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[c]])
+- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
+- Calc(select=[w0$o0 AS a, b, c])
+- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0])
+- Exchange(distribution=[forward])
Expand Down
Loading

0 comments on commit f7ababa

Please sign in to comment.