From 0907fff65cb236897731972da49b7118a8c40a09 Mon Sep 17 00:00:00 2001 From: Timothy Potter Date: Tue, 29 Jun 2021 17:03:38 -0600 Subject: [PATCH] SOLR-15489: Implement OFFSET & FETCH for LIMIT SQL queries (#191) --- solr/CHANGES.txt | 2 + .../apache/solr/handler/sql/LimitStream.java | 23 ++++- .../apache/solr/handler/sql/SolrMethod.java | 1 + .../org/apache/solr/handler/sql/SolrRel.java | 5 ++ .../org/apache/solr/handler/sql/SolrSort.java | 6 +- .../apache/solr/handler/sql/SolrTable.java | 44 +++++++--- .../sql/SolrToEnumerableConverter.java | 3 +- solr/core/src/test-files/log4j2.xml | 2 +- .../apache/solr/handler/TestSQLHandler.java | 84 +++++++++++++++---- .../src/parallel-sql-interface.adoc | 14 ++++ 10 files changed, 153 insertions(+), 31 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 3c9269fe750a..47389a4d8b84 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -151,6 +151,8 @@ when told to. The admin UI now tells it to. (Nazerke Seidan, David Smiley) * SOLR-15456: Get field type info from luke for custom fields instead of defaulting to String in Parallel SQL (Timothy Potter) +* SOLR-15489: Implement OFFSET & FETCH for LIMIT SQL queries (Timothy Potter) + Other Changes ---------------------- * SOLR-14656: Autoscaling framework removed (Ishan Chattopadhyaya, noble, Ilan Ginzburg) diff --git a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java index 772f639a762b..5bdf3e7aa25d 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java @@ -32,11 +32,18 @@ class LimitStream extends TupleStream { private final TupleStream stream; private final int limit; + private final int offset; private int count; LimitStream(TupleStream stream, int limit) { + this(stream, limit, 0); + } + + LimitStream(TupleStream stream, int limit, int offset) { this.stream = stream; this.limit = limit; + 
this.offset = offset > 0 ? offset : 0; + this.count = 0; } public void open() throws IOException { @@ -75,8 +82,20 @@ public Explanation toExplanation(StreamFactory factory) throws IOException { } public Tuple read() throws IOException { - ++count; - if(count > limit) { + + if (count == 0 && offset > 0) { + // skip offset # of sorted tuples (indexes 0 to offset-1) so that the first tuple returned is the one at index offset + while (count < offset) { + ++count; // increment in the loop body, i.e. only after the loop-condition compare has passed + Tuple skip = stream.read(); + if (skip.EOF) { + return skip; + } + } + } + + // done once we've reached the tuple after limit + offset + if (++count > (limit + offset)) { return Tuple.EOF(); } diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java index b130fdf2795e..660482217334 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java @@ -34,6 +34,7 @@ enum SolrMethod { List.class, String.class, String.class, + String.class, String.class); public final Method method; diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java index 370de16d886e..3467e4a49c1f 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java @@ -40,6 +40,7 @@ class Implementor { String havingPredicate; boolean negativeQuery; String limitValue = null; + String offsetValue = null; final List> orders = new ArrayList<>(); final List buckets = new ArrayList<>(); final List> metricPairs = new ArrayList<>(); @@ -98,6 +99,10 @@ void setLimit(String limit) { limitValue = limit; } + void setOffset(String offset) { + this.offsetValue = offset; + } + void visitChild(int ordinal, RelNode input) { assert ordinal == 0; ((SolrRel) input).implement(this); diff --git
a/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java index 1c5274a2ca6b..ca161cecd320 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java @@ -72,8 +72,12 @@ public void implement(Implementor implementor) { } - if(fetch != null) { + if (fetch != null) { implementor.setLimit(((RexLiteral) fetch).getValue().toString()); } + + if (offset != null && offset instanceof RexLiteral) { + implementor.setOffset(((RexLiteral) offset).getValue2().toString()); + } } } diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java index dcc1302678b0..b3b0d5675e14 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java @@ -86,7 +86,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { private Enumerable query(final Properties properties) { return query(properties, Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), - Collections.emptyList(), null, null, null); + Collections.emptyList(), null, null, null, null); } /** Executes a Solr query on the underlying table. 
@@ -104,7 +104,8 @@ private Enumerable query(final Properties properties, final List> metricPairs, final String limit, final String negativeQuery, - final String havingPredicate) { + final String havingPredicate, + final String offset) { // SolrParams should be a ModifiableParams instead of a map boolean mapReduce = "map_reduce".equals(properties.getProperty("aggregationMode")); boolean negative = Boolean.parseBoolean(negativeQuery); @@ -125,7 +126,7 @@ private Enumerable query(final Properties properties, String zk = properties.getProperty("zk"); try { if (metricPairs.isEmpty() && buckets.isEmpty()) { - tupleStream = handleSelect(zk, collection, q, fields, orders, limit); + tupleStream = handleSelect(zk, collection, q, fields, orders, limit, offset); } else { if(buckets.isEmpty()) { tupleStream = handleStats(zk, collection, q, metricPairs, fields); @@ -256,7 +257,8 @@ private TupleStream handleSelect(String zk, String query, List>> fields, List> orders, - String limit) throws IOException { + String limit, + String offset) throws IOException { ModifiableSolrParams params = new ModifiableSolrParams(); params.add(CommonParams.Q, query); @@ -275,25 +277,43 @@ private TupleStream handleSelect(String zk, String fl = getFields(fields); - if(orders.size() > 0) { + if (!orders.isEmpty()) { params.add(SORT, getSort(orders)); } else { - if(limit == null) { + if (limit == null) { params.add(SORT, "_version_ desc"); fl = fl+",_version_"; } else { params.add(SORT, "score desc"); - if(fl.indexOf("score") == -1) { + if (!fl.contains("score")) { fl = fl + ",score"; } } } params.add(CommonParams.FL, fl); - + + if (offset != null && limit == null) { + throw new IOException("OFFSET without LIMIT not supported by Solr! 
Specify desired limit using 'FETCH NEXT ROWS ONLY'"); + } + if (limit != null) { - params.add(CommonParams.ROWS, limit); - return new LimitStream(new CloudSolrStream(zk, collection, params), Integer.parseInt(limit)); + int limitInt = Integer.parseInt(limit); + // if there's an offset, then we need to fetch offset + limit rows from each shard and then sort accordingly + LimitStream limitStream; + if (offset != null) { + int offsetInt = Integer.parseInt(offset); + int rows = limitInt + offsetInt; + params.add(CommonParams.START, "0"); // tricky ... we need all rows up to limit + offset + params.add(CommonParams.ROWS, String.valueOf(rows)); + // re-sort all the streams back from the shards + StreamComparator streamSorter = new MultipleFieldComparator(getComps(orders)); + limitStream = new LimitStream(new SortStream(new CloudSolrStream(zk, collection, params), streamSorter), limitInt, offsetInt); + } else { + params.add(CommonParams.ROWS, limit); + limitStream = new LimitStream(new CloudSolrStream(zk, collection, params), limitInt); + } + return limitStream; } else { params.add(CommonParams.QT, "/export"); return new CloudSolrStream(zk, collection, params); @@ -856,8 +876,8 @@ private Properties getProperties() { */ @SuppressWarnings({"UnusedDeclaration"}) public Enumerable query(List>> fields, String query, List> order, - List buckets, List> metricPairs, String limit, String negativeQuery, String havingPredicate) { - return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate); + List buckets, List> metricPairs, String limit, String negativeQuery, String havingPredicate, String offset) { + return getTable().query(getProperties(), fields, query, order, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset); } } diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java index 
45f131ce5c05..acea8bd08077 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java @@ -85,8 +85,9 @@ public int size() { final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue)); final Expression negativeQuery = list.append("negativeQuery", Expressions.constant(Boolean.toString(solrImplementor.negativeQuery), String.class)); final Expression havingPredicate = list.append("havingTest", Expressions.constant(solrImplementor.havingPredicate, String.class)); + final Expression offset = list.append("offset", Expressions.constant(solrImplementor.offsetValue)); Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method, - fields, query, orders, buckets, metricPairs, limit, negativeQuery, havingPredicate)); + fields, query, orders, buckets, metricPairs, limit, negativeQuery, havingPredicate, offset)); Hook.QUERY_PLAN.run(query); list.add(Expressions.return_(null, enumerable)); return implementor.result(physType, list.toBlock()); diff --git a/solr/core/src/test-files/log4j2.xml b/solr/core/src/test-files/log4j2.xml index d2e2dfa41b4d..53dcae5c7480 100644 --- a/solr/core/src/test-files/log4j2.xml +++ b/solr/core/src/test-files/log4j2.xml @@ -34,7 +34,7 @@ - + diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java index 60990c07e190..68defe739066 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java @@ -17,8 +17,8 @@ package org.apache.solr.handler; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -1854,21 +1854,18 @@ public void testParallelTimeSeriesGrouping() throws Exception { } 
protected List getTuples(final SolrParams params, String baseUrl) throws IOException { - //log.info("Tuples from params: {}", params); - TupleStream tupleStream = new SolrStream(baseUrl, params); - - tupleStream.open(); - List tuples = new ArrayList<>(); - for (; ; ) { - Tuple t = tupleStream.read(); - //log.info(" ... {}", t.fields); - if (t.EOF) { - break; - } else { - tuples.add(t); + List tuples = new LinkedList<>(); + try (TupleStream tupleStream = new SolrStream(baseUrl, params)) { + tupleStream.open(); + for (; ; ) { + Tuple t = tupleStream.read(); + if (t.EOF) { + break; + } else { + tuples.add(t); + } } } - tupleStream.close(); return tuples; } @@ -2122,4 +2119,63 @@ private String min(String type) { private String max(String type) { return String.format(Locale.ROOT, "max(%s) as max_%s", type, type); } + + @Test + public void testOffsetAndFetch() throws Exception { + new UpdateRequest() + .add("id", "01") + .add("id", "02") + .add("id", "03") + .add("id", "04") + .add("id", "05") + .add("id", "06") + .add("id", "07") + .add("id", "08") + .add("id", "09") + .add("id", "10") + .add("id", "11") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + final int numDocs = 11; + + List results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 0 FETCH NEXT 5 ROWS ONLY", 5); + assertEquals("11", results.get(0).getString("id")); + assertEquals("10", results.get(1).getString("id")); + assertEquals("09", results.get(2).getString("id")); + assertEquals("08", results.get(3).getString("id")); + assertEquals("07", results.get(4).getString("id")); + + // no explicit offset, but defaults to 0 if using FETCH! 
+ results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC FETCH NEXT 5 ROWS ONLY", 5); + assertEquals("11", results.get(0).getString("id")); + assertEquals("10", results.get(1).getString("id")); + assertEquals("09", results.get(2).getString("id")); + assertEquals("08", results.get(3).getString("id")); + assertEquals("07", results.get(4).getString("id")); + + results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 5 FETCH NEXT 5 ROWS ONLY", 5); + assertEquals("06", results.get(0).getString("id")); + assertEquals("05", results.get(1).getString("id")); + assertEquals("04", results.get(2).getString("id")); + assertEquals("03", results.get(3).getString("id")); + assertEquals("02", results.get(4).getString("id")); + + results = expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 10 FETCH NEXT 5 ROWS ONLY", 1); + assertEquals("01", results.get(0).getString("id")); + + expectResults("SELECT id FROM $ALIAS ORDER BY id DESC LIMIT "+numDocs, numDocs); + + for (int i=0; i < numDocs; i++) { + results = expectResults("SELECT id FROM $ALIAS ORDER BY id ASC OFFSET "+i+" FETCH NEXT 1 ROW ONLY", 1); + String id = results.get(0).getString("id"); + if (id.startsWith("0")) id = id.substring(1); + assertEquals(i+1, Integer.parseInt(id)); + } + + // just past the end of the results + expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET "+numDocs+" FETCH NEXT 5 ROWS ONLY", 0); + + // Solr doesn't support OFFSET w/o LIMIT + expectThrows(IOException.class, () -> expectResults("SELECT id FROM $ALIAS ORDER BY id DESC OFFSET 5", 5)); + } } diff --git a/solr/solr-ref-guide/src/parallel-sql-interface.adoc b/solr/solr-ref-guide/src/parallel-sql-interface.adoc index 357122158476..9947835d4c13 100644 --- a/solr/solr-ref-guide/src/parallel-sql-interface.adoc +++ b/solr/solr-ref-guide/src/parallel-sql-interface.adoc @@ -275,6 +275,20 @@ If the `ORDER BY` clause contains the exact fields in the `GROUP BY` clause, the Order by fields are case sensitive. 
+==== OFFSET with FETCH + +Queries that specify an `ORDER BY` clause may also use the `OFFSET` (0-based index) and `FETCH` clauses to page through results; `OFFSET` without `FETCH` is not supported and generates an exception. +For example, the following query requests the second page of 10 results: +[source,sql] +---- +ORDER BY ... OFFSET 10 FETCH NEXT 10 ROWS ONLY +---- +Paging with SQL suffers the same performance penalty as paging in Solr queries using `start` and `rows`: the distributed query must +over-fetch `OFFSET` + `LIMIT` documents from each shard and then sort the results from each shard to generate the page of results returned to the client. +Consequently, this feature should only be used for small `OFFSET` / `FETCH` sizes, such as paging up to 10,000 documents per shard. Solr SQL does not enforce any hard limits, but the deeper you go into the results, +the longer each subsequent page request takes and the more resources it consumes. Solr's `cursorMark` feature for deep paging is not supported in SQL; use a SQL query without a `LIMIT` to stream large result sets through the `/export` handler instead. +SQL `OFFSET` is not intended for deep-paging use cases. + === LIMIT Clause Limits the result set to the specified size. In the example above the clause `LIMIT 100` will limit the result set to 100 records.