From 0c53a3f5314aafa35aca5f5432974b0e3d5f59b6 Mon Sep 17 00:00:00 2001 From: Daniel Oh <57500135+danieloh0714@users.noreply.github.com> Date: Thu, 2 May 2024 17:00:37 -0500 Subject: [PATCH] add support for query all and scan all (#180) * add support for query all and scan all * add tests for queryAll and queryAllContents * update query tests * add tests for scanAll * update scan tests * use filterExpression parameter for scans * update comments * try using GSI * convert scan to GSI to stop type failure * remove gradle files --------- Co-authored-by: Tim Kye --- .../main/kotlin/app/cash/tempest2/Query.kt | 98 +++++++++++++++++++ .../src/main/kotlin/app/cash/tempest2/Scan.kt | 74 ++++++++++++++ .../tempest2/internal/DynamoDbQueryable.kt | 30 ++++++ .../tempest2/internal/DynamoDbScannable.kt | 26 +++++ .../tempest2/internal/UnsupportedQueryable.kt | 22 +++++ .../cash/tempest2/DynamoDbQueryableTest.kt | 95 ++++++++++++++++++ .../cash/tempest2/DynamoDbScannableTest.kt | 77 +++++++++++++++ 7 files changed, 422 insertions(+) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt index d43ffdf79..68ff54085 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt @@ -68,6 +68,104 @@ interface Queryable { initialOffset, config.returnConsumedCapacity ) + + /** + * Executes a query and returns a sequence of pages that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + */ + fun queryAll( + keyCondition: KeyCondition, + asc: Boolean = true, + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null, + ): Sequence> + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun queryAll(keyCondition: KeyCondition) = queryAll( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = null, + ) + + fun queryAll(keyCondition: KeyCondition, initialOffset: Offset?) = queryAll( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = initialOffset, + ) + + fun queryAll(keyCondition: KeyCondition, config: QueryConfig): Sequence> { + return queryAll( + keyCondition, + config = config, + initialOffset = null, + ) + } + + fun queryAll( + keyCondition: KeyCondition, + config: QueryConfig, + initialOffset: Offset?, + ): Sequence> { + return queryAll( + keyCondition, + config.asc, + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset, + ) + } + + /** + * Executes a query and returns a sequence that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + */ + fun queryAllContents( + keyCondition: KeyCondition, + asc: Boolean = true, + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null, + ): Sequence + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun queryAllContents(keyCondition: KeyCondition) = queryAllContents( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = null, + ) + + fun queryAllContents(keyCondition: KeyCondition, initialOffset: Offset?) = queryAllContents( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = initialOffset, + ) + + fun queryAllContents(keyCondition: KeyCondition, config: QueryConfig) = queryAllContents( + keyCondition, + config = config, + initialOffset = null, + ) + + fun queryAllContents( + keyCondition: KeyCondition, + config: QueryConfig, + initialOffset: Offset?, + ): Sequence { + return queryAllContents( + keyCondition, + config.asc, + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset, + ) + } } data class QueryConfig internal constructor( diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt index 69fd2b41e..7fec51ef4 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt @@ -53,6 +53,80 @@ interface Scannable { config.filterExpression, initialOffset ) + + /** + * Executes a scan and returns a sequence of pages that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + */ + fun scanAll( + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null, + ): Sequence> + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun scanAll() = scanAll( + ScanConfig.Builder().build(), + initialOffset = null + ) + + fun scanAll(initialOffset: Offset?) = scanAll( + ScanConfig.Builder().build(), + initialOffset = initialOffset + ) + + fun scanAll(config: ScanConfig) = scanAll( + config, + initialOffset = null + ) + + fun scanAll(config: ScanConfig, initialOffset: Offset?): Sequence> { + return scanAll( + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset + ) + } + + /** + * Executes a scan and returns a sequence that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + */ + fun scanAllContents( + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null, + ): Sequence + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun scanAllContents() = scanAllContents( + ScanConfig.Builder().build(), + initialOffset = null + ) + + fun scanAllContents(initialOffset: Offset?) = scanAllContents( + ScanConfig.Builder().build(), + initialOffset = initialOffset + ) + + fun scanAllContents(config: ScanConfig) = scanAllContents( + config, + initialOffset = null + ) + + fun scanAllContents(config: ScanConfig, initialOffset: Offset?): Sequence { + return scanAllContents( + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset + ) + } } data class ScanConfig internal constructor( diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt index 98c6502bb..741715f26 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt @@ -72,6 +72,36 @@ internal class DynamoDbQueryable( .iterator().next() return toQueryResponse(page) } + + override fun queryAll( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence> { + return generateSequence( + query(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset) + ) { page -> + page.offset?.let { offset -> + query(keyCondition, asc, pageSize, consistentRead, filterExpression, offset) + } + } + } + + override fun queryAllContents( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence { + return queryAll(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset) + .map { it.contents } + .flatten() + } } fun async(dynamoDbTable: DynamoDbAsyncTable) = Async(dynamoDbTable) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt index 1727420df..2e7832695 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt @@ -61,6 +61,32 @@ internal class DynamoDbScannable( .iterator().next() return toScanResponse(page) } + + override fun scanAll( + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence> { + return generateSequence( + scan(pageSize, consistentRead, filterExpression, initialOffset) + ) { page -> + page.offset?.let { offset -> + scan(pageSize, consistentRead, filterExpression, offset) + } + } + } + + override fun scanAllContents( + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence { + return scanAll(pageSize, consistentRead, filterExpression, initialOffset) + .map { it.contents } + .flatten() + } } fun async(dynamoDbTable: DynamoDbAsyncTable) = Async(dynamoDbTable) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt index 7bf8be075..60f5be0c6 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt @@ -39,4 +39,26 @@ internal class UnsupportedQueryable( ): Page { throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)") } + + override fun queryAllContents( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence { + TODO("Not yet implemented") + } + + override fun queryAll( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Sequence> { + TODO("Not yet implemented") + } } diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt index ae962b606..50c87dcaa 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt @@ -369,6 +369,101 @@ class DynamoDbQueryableTest { assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0) } + @Test + fun queryAll() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page = musicTable.albumTracks.queryAll( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + ).iterator().next() + + assertThat(page.hasMorePages).isFalse() + assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4)) + } + + @Test + fun queryAllPagination() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val itr = musicTable.albumTracks.queryAll( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2 + ).iterator() + + val page1 = itr.next() + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..1)) + + val page2 = itr.next() + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..3)) + + val page3 = itr.next() + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(4..4)) + } + + @Test + fun queryAllDesc() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page = musicTable.albumTracks.queryAll( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false + ).iterator().next() + + assertThat(page.hasMorePages).isFalse() + assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed()) + } + + @Test + fun queryAllDescPagination() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val itr = musicTable.albumTracks.queryAll( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false, + pageSize = 2 + ).iterator() + + val page1 = itr.next() + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(0..1)) + + val page2 = itr.next() + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(2..3)) + + val page3 = itr.next() + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(4..4)) + } + + @Test + fun queryAllContents() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val sequence = musicTable.albumTracks.queryAllContents( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + ) + + assertThat(sequence.map { it.track_title } + .toList()).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4)) + } + + @Test + fun queryAllContentsDesc() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val sequence = musicTable.albumTracks.queryAllContents( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false + ) + + assertThat(sequence.map { it.track_title } + .toList()).containsAll(AFTER_HOURS_EP.trackTitles.reversed()) + } + private fun runLengthLongerThan(duration: Duration): Expression { return Expression.builder() .expression("run_length > :duration") diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt index b56b32cfa..df0aad4dc 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt @@ -183,6 +183,83 @@ class DynamoDbScannableTest { ) } + @Test + fun scanAll() { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val page = musicTable.albumInfoByArtist.scanAll().iterator().next() + + assertThat(page.hasMorePages).isFalse() + assertThat(page.albumTitles).containsExactlyInAnyOrder( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + + @Test + fun scanAllPagination() { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val itr = musicTable.albumInfoByArtist.scanAll(pageSize = 2).iterator() + val items = mutableListOf() + + val page1 = itr.next() + assertThat(page1.hasMorePages).isTrue() + items.addAll(page1.contents) + + val page2 = itr.next() + assertThat(page2.hasMorePages).isTrue() + items.addAll(page2.contents) + + val page3 = itr.next() + assertThat(page3.hasMorePages).isFalse() + items.addAll(page3.contents) + + assertThat(items.map { it.album_title }).containsExactlyInAnyOrder( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + + @Test + fun scanAllContents() { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val sequence = musicTable.albumInfoByArtist.scanAllContents() + + assertThat(sequence.map { it.album_title }.toList()).containsExactlyInAnyOrder( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + private fun releaseYearIs(year: Int): Expression { return Expression.builder() .expression("begins_with(release_date, :year)")