SOLR-15489: Implement OFFSET & FETCH for LIMIT SQL queries (apache#191)
thelabdude authored Jun 29, 2021
1 parent e89593c commit 0907fff
Showing 10 changed files with 153 additions and 31 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt

@@ -151,6 +151,8 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley)
 
 * SOLR-15456: Get field type info from luke for custom fields instead of defaulting to String in Parallel SQL (Timothy Potter)
 
+* SOLR-15489: Implement OFFSET & FETCH for LIMIT SQL queries (Timothy Potter)
+
 Other Changes
 ----------------------
 * SOLR-14656: Autoscaling framework removed (Ishan Chattopadhyaya, noble, Ilan Ginzburg)
23 changes: 21 additions & 2 deletions solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java

@@ -32,11 +32,18 @@ class LimitStream extends TupleStream {
 
   private final TupleStream stream;
   private final int limit;
+  private final int offset;
   private int count;
 
   LimitStream(TupleStream stream, int limit) {
+    this(stream, limit, 0);
+  }
+
+  LimitStream(TupleStream stream, int limit, int offset) {
     this.stream = stream;
     this.limit = limit;
+    this.offset = offset > 0 ? offset : 0;
+    this.count = 0;
   }
 
   public void open() throws IOException {
@@ -75,8 +82,20 @@ public Explanation toExplanation(StreamFactory factory) throws IOException {
   }
 
   public Tuple read() throws IOException {
-    ++count;
-    if(count > limit) {
+
+    if (count == 0 && offset > 0) {
+      // skip offset # of sorted tuples (indexes 0 to offset-1) so that the first tuple returned is the one at the offset position
+      while (count < offset) {
+        ++count; // don't increment until after the compare ...
+        Tuple skip = stream.read();
+        if (skip.EOF) {
+          return skip;
+        }
+      }
+    }
+
+    // done once we've reached the tuple after limit + offset
+    if (++count > (limit + offset)) {
       return Tuple.EOF();
     }
 
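The new read() logic skips the first offset sorted tuples and then emits tuples until limit + offset have been consumed. A minimal standalone sketch of the same skip-then-limit semantics over a plain iterator (class and method names here are invented for illustration, not part of the patch):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of LimitStream's skip-then-limit semantics over an in-memory iterator.
public class LimitOffsetSketch {
  static <T> List<T> page(Iterator<T> it, int limit, int offset) {
    List<T> out = new ArrayList<>();
    int count = 0;
    while (count < offset && it.hasNext()) { // skip items 0..offset-1
      it.next();
      ++count;
    }
    while (it.hasNext() && ++count <= limit + offset) { // emit until limit + offset consumed
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> docs = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
    System.out.println(page(docs.iterator(), 5, 5)); // prints [6, 7, 8, 9, 10]
  }
}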
solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java

@@ -34,6 +34,7 @@ enum SolrMethod {
       List.class,
       String.class,
       String.class,
+      String.class,
       String.class);
 
   public final Method method;
5 changes: 5 additions & 0 deletions solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java

@@ -40,6 +40,7 @@ class Implementor {
     String havingPredicate;
     boolean negativeQuery;
     String limitValue = null;
+    String offsetValue = null;
     final List<Pair<String, String>> orders = new ArrayList<>();
     final List<String> buckets = new ArrayList<>();
     final List<Pair<String, String>> metricPairs = new ArrayList<>();

@@ -98,6 +99,10 @@ void setLimit(String limit) {
       limitValue = limit;
     }
 
+    void setOffset(String offset) {
+      this.offsetValue = offset;
+    }
+
     void visitChild(int ordinal, RelNode input) {
       assert ordinal == 0;
       ((SolrRel) input).implement(this);
6 changes: 5 additions & 1 deletion solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java

@@ -72,8 +72,12 @@ public void implement(Implementor implementor) {
       }
     }
 
-    if(fetch != null) {
+    if (fetch != null) {
       implementor.setLimit(((RexLiteral) fetch).getValue().toString());
     }
+
+    if (offset != null && offset instanceof RexLiteral) {
+      implementor.setOffset(((RexLiteral) offset).getValue2().toString());
+    }
   }
 }
44 changes: 32 additions & 12 deletions solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java

@@ -86,7 +86,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
 
   private Enumerable<Object> query(final Properties properties) {
     return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(),
-        Collections.emptyList(), null, null, null);
+        Collections.emptyList(), null, null, null, null);
   }
 
   /** Executes a Solr query on the underlying table.
@@ -104,7 +104,8 @@ private Enumerable<Object> query(final Properties properties,
                                    final List<Pair<String, String>> metricPairs,
                                    final String limit,
                                    final String negativeQuery,
-                                   final String havingPredicate) {
+                                   final String havingPredicate,
+                                   final String offset) {
     // SolrParams should be a ModifiableParams instead of a map
     boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode"));
     boolean negative = Boolean.parseBoolean(negativeQuery);
@@ -125,7 +126,7 @@ private Enumerable<Object> query(final Properties properties,
     String zk = properties.getProperty("zk");
     try {
       if (metricPairs.isEmpty() && buckets.isEmpty()) {
-        tupleStream = handleSelect(zk, collection, q, fields, orders, limit);
+        tupleStream = handleSelect(zk, collection, q, fields, orders, limit, offset);
       } else {
         if(buckets.isEmpty()) {
           tupleStream = handleStats(zk, collection, q, metricPairs, fields);
@@ -256,7 +257,8 @@ private TupleStream handleSelect(String zk,
                                    String query,
                                    List<Map.Entry<String, Class<?>>> fields,
                                    List<Pair<String, String>> orders,
-                                   String limit) throws IOException {
+                                   String limit,
+                                   String offset) throws IOException {
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add(CommonParams.Q, query);
@@ -275,25 +277,43 @@ private TupleStream handleSelect(String zk,
 
     String fl = getFields(fields);
 
-    if(orders.size() > 0) {
+    if (!orders.isEmpty()) {
       params.add(SORT, getSort(orders));
     } else {
-      if(limit == null) {
+      if (limit == null) {
         params.add(SORT, "_version_ desc");
         fl = fl+",_version_";
       } else {
         params.add(SORT, "score desc");
-        if(fl.indexOf("score") == -1) {
+        if (!fl.contains("score")) {
           fl = fl + ",score";
         }
       }
     }
 
     params.add(CommonParams.FL, fl);
 
+    if (offset != null && limit == null) {
+      throw new IOException("OFFSET without LIMIT not supported by Solr! Specify desired limit using 'FETCH NEXT <LIMIT> ROWS ONLY'");
+    }
+
     if (limit != null) {
-      params.add(CommonParams.ROWS, limit);
-      return new LimitStream(new CloudSolrStream(zk, collection, params), Integer.parseInt(limit));
+      int limitInt = Integer.parseInt(limit);
+      // if there's an offset, then we need to fetch offset + limit rows from each shard and then sort accordingly
+      LimitStream limitStream;
+      if (offset != null) {
+        int offsetInt = Integer.parseInt(offset);
+        int rows = limitInt + offsetInt;
+        params.add(CommonParams.START, "0"); // tricky ... we need all rows up to limit + offset
+        params.add(CommonParams.ROWS, String.valueOf(rows));
+        // re-sort all the streams back from the shards
+        StreamComparator streamSorter = new MultipleFieldComparator(getComps(orders));
+        limitStream = new LimitStream(new SortStream(new CloudSolrStream(zk, collection, params), streamSorter), limitInt, offsetInt);
+      } else {
+        params.add(CommonParams.ROWS, limit);
+        limitStream = new LimitStream(new CloudSolrStream(zk, collection, params), limitInt);
+      }
+      return limitStream;
     } else {
       params.add(CommonParams.QT, "/export");
       return new CloudSolrStream(zk, collection, params);
@@ -856,8 +876,8 @@ private Properties getProperties() {
    */
   @SuppressWarnings({"UnusedDeclaration"})
   public Enumerable<Object> query(List<Map.Entry<String, Class<?>>> fields, String query, List<Pair<String, String>> order,
-      List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery, String havingPredicate) {
-    return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate);
+      List<String> buckets, List<Pair<String, String>> metricPairs, String limit, String negativeQuery, String havingPredicate, String offset) {
+    return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset);
   }
 }
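The handleSelect changes implement OFFSET by over-fetching: each shard is asked for limit + offset rows (start=0, rows=limit+offset), the streams coming back from the shards are re-sorted with a MultipleFieldComparator, and LimitStream then discards the first offset tuples. A minimal sketch of that strategy, with plain lists standing in for per-shard streams (names are illustrative, not the patch's API):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the over-fetch + re-sort strategy: each shard contributes
// its top (limit + offset) rows, then the coordinator sorts and windows.
public class ShardPagingSketch {
  public static void main(String[] args) {
    int limit = 2, offset = 3;
    // each "shard" returns its top (limit + offset) = 5 ids, already sorted
    List<List<Integer>> shards = List.of(
        List.of(1, 4, 7, 10, 13),
        List.of(2, 5, 8, 11, 14));
    List<Integer> merged = new ArrayList<>();
    shards.forEach(merged::addAll);
    merged.sort(Comparator.naturalOrder()); // re-sort across shards (SortStream's job)
    // apply the offset/limit window (LimitStream's job)
    List<Integer> page = merged.subList(offset, Math.min(offset + limit, merged.size()));
    System.out.println(page); // prints [5, 7]
  }
}

This is also why the ref-guide change below warns about deep paging: every page requires fetching and sorting offset + limit rows from each shard.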
solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java

@@ -85,8 +85,9 @@ public int size() {
     final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue));
     final Expression negativeQuery = list.append("negativeQuery", Expressions.constant(Boolean.toString(solrImplementor.negativeQuery), String.class));
     final Expression havingPredicate = list.append("havingTest", Expressions.constant(solrImplementor.havingPredicate, String.class));
+    final Expression offset = list.append("offset", Expressions.constant(solrImplementor.offsetValue));
     Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method,
-        fields, query, orders, buckets, metricPairs, limit, negativeQuery, havingPredicate));
+        fields, query, orders, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset));
     Hook.QUERY_PLAN.run(query);
     list.add(Expressions.return_(null, enumerable));
     return implementor.result(physType, list.toBlock());
2 changes: 1 addition & 1 deletion solr/core/src/test-files/log4j2.xml

@@ -34,7 +34,7 @@
   <Logger name="org.apache.directory" level="WARN"/>
   <Logger name="org.apache.solr.hadoop" level="INFO"/>
   <Logger name="org.eclipse.jetty" level="INFO"/>
-  <Logger name="org.apache.calcite" level="DEBUG"/>
+  <Logger name="org.apache.calcite" level="INFO"/>
 
   <Root level="INFO">
     <AppenderRef ref="STDERR"/>
84 changes: 70 additions & 14 deletions solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java

@@ -17,8 +17,8 @@
 package org.apache.solr.handler;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 
@@ -1854,21 +1854,18 @@ public void testParallelTimeSeriesGrouping() throws Exception {
   }
 
   protected List<Tuple> getTuples(final SolrParams params, String baseUrl) throws IOException {
-    //log.info("Tuples from params: {}", params);
-    TupleStream tupleStream = new SolrStream(baseUrl, params);
-
-    tupleStream.open();
-    List<Tuple> tuples = new ArrayList<>();
-    for (; ; ) {
-      Tuple t = tupleStream.read();
-      //log.info(" ... {}", t.fields);
-      if (t.EOF) {
-        break;
-      } else {
-        tuples.add(t);
-      }
-    }
-    tupleStream.close();
+    List<Tuple> tuples = new LinkedList<>();
+    try (TupleStream tupleStream = new SolrStream(baseUrl, params)) {
+      tupleStream.open();
+      for (; ; ) {
+        Tuple t = tupleStream.read();
+        if (t.EOF) {
+          break;
+        } else {
+          tuples.add(t);
+        }
+      }
+    }
     return tuples;
   }
 
@@ -2122,4 +2119,63 @@ private String min(String type) {
   private String max(String type) {
     return String.format(Locale.ROOT, "max(%s) as max_%s", type, type);
   }
+
+  @Test
+  public void testOffsetAndFetch() throws Exception {
+    new UpdateRequest()
+        .add("id", "01")
+        .add("id", "02")
+        .add("id", "03")
+        .add("id", "04")
+        .add("id", "05")
+        .add("id", "06")
+        .add("id", "07")
+        .add("id", "08")
+        .add("id", "09")
+        .add("id", "10")
+        .add("id", "11")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    final int numDocs = 11;
+
+    List<Tuple> results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 0 FETCH NEXT 5 ROWS ONLY", 5);
+    assertEquals("11", results.get(0).getString("id"));
+    assertEquals("10", results.get(1).getString("id"));
+    assertEquals("09", results.get(2).getString("id"));
+    assertEquals("08", results.get(3).getString("id"));
+    assertEquals("07", results.get(4).getString("id"));
+
+    // no explicit offset, but defaults to 0 if using FETCH!
+    results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC FETCH NEXT 5 ROWS ONLY", 5);
+    assertEquals("11", results.get(0).getString("id"));
+    assertEquals("10", results.get(1).getString("id"));
+    assertEquals("09", results.get(2).getString("id"));
+    assertEquals("08", results.get(3).getString("id"));
+    assertEquals("07", results.get(4).getString("id"));
+
+    results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 5 FETCH NEXT 5 ROWS ONLY", 5);
+    assertEquals("06", results.get(0).getString("id"));
+    assertEquals("05", results.get(1).getString("id"));
+    assertEquals("04", results.get(2).getString("id"));
+    assertEquals("03", results.get(3).getString("id"));
+    assertEquals("02", results.get(4).getString("id"));
+
+    results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 10 FETCH NEXT 5 ROWS ONLY", 1);
+    assertEquals("01", results.get(0).getString("id"));
+
+    expectResults("SELECT id FROM $ALIAS ORDER BY id DESC LIMIT "+numDocs, numDocs);
+
+    for (int i=0; i < numDocs; i++) {
+      results = expectResults("SELECT id FROM $ALIAS ORDER BY id ASC OFFSET "+i+" FETCH NEXT 1 ROW ONLY", 1);
+      String id = results.get(0).getString("id");
+      if (id.startsWith("0")) id = id.substring(1);
+      assertEquals(i+1, Integer.parseInt(id));
+    }
+
+    // just past the end of the results
+    expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET "+numDocs+" FETCH NEXT 5 ROWS ONLY", 0);
+
+    // Solr doesn't support OFFSET w/o LIMIT
+    expectThrows(IOException.class, () -> expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 5", 5));
+  }
 }
14 changes: 14 additions & 0 deletions solr/solr-ref-guide/src/parallel-sql-interface.adoc

@@ -275,6 +275,20 @@ If the `ORDER BY` clause contains the exact fields in the `GROUP BY` clause, the
 
 Order by fields are case sensitive.
 
+==== OFFSET with FETCH
+
+Queries that specify an `ORDER BY` clause may also use the `OFFSET` (0-based index) and `FETCH` operators to page through results; `OFFSET` without `FETCH` is not supported and generates an exception.
+For example, the following query requests the second page of 10 results:
+[source,sql]
+----
+ORDER BY ... OFFSET 10 FETCH NEXT 10 ROWS ONLY
+----
+Paging with SQL incurs the same performance penalty as paging in Solr queries using `start` and `rows`: the distributed query must
+over-fetch `OFFSET` + `LIMIT` documents from each shard and then sort the results from each shard to generate the page of results returned to the client.
+Consequently, this feature should only be used for small OFFSET / FETCH sizes, such as paging up to 10,000 documents per shard. Solr SQL does not enforce any hard limits, but the deeper you page into the results,
+the longer each subsequent page request takes and the more resources it consumes. Solr's `cursorMark` feature for deep paging is not supported in SQL; use a SQL query without a `LIMIT` to stream large result sets through the `/export` handler instead.
+SQL `OFFSET` is not intended for deep-paging use cases.
+
 === LIMIT Clause
 
 Limits the result set to the specified size. In the example above the clause `LIMIT 100` will limit the result set to 100 records.
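As a worked example of computing the offset for a given page (the collection and field names below are illustrative): with a page size of 10, page N starts at offset (N - 1) * 10, so the third page would be requested with:

[source,sql]
----
SELECT id, price FROM techproducts ORDER BY price DESC OFFSET 20 FETCH NEXT 10 ROWS ONLY
----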
