diff --git a/docs/guide/query_scan.md b/docs/guide/query_scan.md index 11ece7f9..229297f8 100644 --- a/docs/guide/query_scan.md +++ b/docs/guide/query_scan.md @@ -514,19 +514,7 @@ worker can be a thread (in programming languages that support multithreading) or system process. To perform a parallel scan, each worker issues its own Scan request with an unique `WorkerId`. -=== "Kotlin - SDK 2.x" - - ```kotlin - Not supported - ``` - -=== "Java - SDK 2.x" - - ```java - Not supported - ``` - -=== "Kotlin - SDK 1.x" +=== "Kotlin" ```kotlin private val table: MusicTable @@ -545,7 +533,7 @@ unique `WorkerId`. } ``` -=== "Java - SDK 1.x" +=== "Java" ```java private final MusicTable table; diff --git a/samples/guides2/src/main/java/app/cash/tempest2/guides/java/QueryNScan.java b/samples/guides2/src/main/java/app/cash/tempest2/guides/java/QueryNScan.java index b0f0256f..3cf263c2 100644 --- a/samples/guides2/src/main/java/app/cash/tempest2/guides/java/QueryNScan.java +++ b/samples/guides2/src/main/java/app/cash/tempest2/guides/java/QueryNScan.java @@ -21,21 +21,28 @@ import app.cash.tempest2.Offset; import app.cash.tempest2.Page; import app.cash.tempest2.QueryConfig; +import app.cash.tempest2.ScanConfig; +import app.cash.tempest2.WorkerId; import app.cash.tempest2.musiclibrary.java.AlbumTrack; import app.cash.tempest2.musiclibrary.java.MusicTable; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import software.amazon.awssdk.enhanced.dynamodb.Expression; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; public class QueryNScan { private final MusicTable table; + private final ExecutorService executor; - public QueryNScan(MusicTable table) { + public QueryNScan(MusicTable table, ExecutorService executor) { this.table = table; + this.executor = executor; } // Query - Key 
Condition - Partition Key and Entity Type. @@ -164,5 +171,25 @@ public List loadAllAlbumTracks() { } // Scan - Parallel. - // Not supported. + public List loadAllAlbumTracks2() { + Future> segment1 = executor.submit(() -> loadSegment(0)); + Future> segment2 = executor.submit(() -> loadSegment(1)); + List results = new ArrayList<>(); + try { + results.addAll(segment1.get()); + results.addAll(segment2.get()); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException("Failed to load tracks", e); + } + return results; + } + + private List loadSegment(int segment) { + Page page = table.albumTracks().scan( + new ScanConfig.Builder() + .workerId(new WorkerId(segment, /* totalSegments */ 2)) + .build() + ); + return page.getContents(); + } } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt index 7fec51ef..81cbf723 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt @@ -22,36 +22,41 @@ interface Scannable { /** * Scans up to the [pageSize] items or a maximum of 1 MB of data. This limit applies before the * filter expression is evaluated. + * + * @param workerId identifies a tuple of `segment` and `totalSegments` in the context of parallel + * scans. */ fun scan( pageSize: Int = 100, consistentRead: Boolean = false, filterExpression: Expression? = null, - initialOffset: Offset? = null + initialOffset: Offset? = null, + workerId: WorkerId? = null, ): Page // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). fun scan() = scan( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = null ) fun scan(initialOffset: Offset?) 
= scan( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = initialOffset ) fun scan(config: ScanConfig) = scan( - config, + config = config, initialOffset = null ) fun scan(config: ScanConfig, initialOffset: Offset?) = scan( - config.pageSize, - config.consistentRead, - config.filterExpression, - initialOffset + pageSize = config.pageSize, + consistentRead = config.consistentRead, + filterExpression = config.filterExpression, + initialOffset = initialOffset, + workerId = config.workerId ) /** @@ -68,26 +73,40 @@ interface Scannable { // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). fun scanAll() = scanAll( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = null ) fun scanAll(initialOffset: Offset?) = scanAll( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = initialOffset ) + /** + * Executes a scan and returns a sequence of pages that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + * + * This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will + * be ignored. + */ fun scanAll(config: ScanConfig) = scanAll( - config, + config = config, initialOffset = null ) + /** + * Executes a scan and returns a sequence of pages that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + * + * This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will + * be ignored. 
+ */ fun scanAll(config: ScanConfig, initialOffset: Offset?): Sequence> { return scanAll( - config.pageSize, - config.consistentRead, - config.filterExpression, - initialOffset + pageSize = config.pageSize, + consistentRead = config.consistentRead, + filterExpression = config.filterExpression, + initialOffset = initialOffset ) } @@ -105,39 +124,69 @@ interface Scannable { // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). fun scanAllContents() = scanAllContents( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = null ) fun scanAllContents(initialOffset: Offset?) = scanAllContents( - ScanConfig.Builder().build(), + config = ScanConfig.Builder().build(), initialOffset = initialOffset ) + /** + * Executes a scan and returns a sequence that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + * + * This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will + * be ignored. + */ fun scanAllContents(config: ScanConfig) = scanAllContents( - config, + config = config, initialOffset = null ) + /** + * Executes a scan and returns a sequence that contains all results, regardless of page size. + * New pages will be fetched as needed when the resulting sequence is enumerated. + * + * This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will + * be ignored. + */ fun scanAllContents(config: ScanConfig, initialOffset: Offset?): Sequence { return scanAllContents( - config.pageSize, - config.consistentRead, - config.filterExpression, - initialOffset + pageSize = config.pageSize, + consistentRead = config.consistentRead, + filterExpression = config.filterExpression, + initialOffset = initialOffset ) } } +/** + * In the context of parallel scans, a worker is analogous to a thread or an operating + * system process. 
Each worker then issues its own Scan request with a unique [WorkerId], which + * represents a tuple of `segment` and `totalSegments`. + */ +data class WorkerId( + val segment: Int, + val totalSegments: Int +) { + init { + require(segment in 0 until totalSegments) { "Expect $segment to be non-negative and less than $totalSegments" } + } +} + data class ScanConfig internal constructor( val pageSize: Int, val consistentRead: Boolean, - val filterExpression: Expression? + val filterExpression: Expression?, + val workerId: WorkerId? ) { class Builder { private var pageSize = 100 private var consistentRead = false private var filterExpression: Expression? = null + private var workerId: WorkerId? = null fun pageSize(pageSize: Int) = apply { this.pageSize = pageSize } @@ -146,10 +195,13 @@ data class ScanConfig internal constructor( fun filterExpression(filterExpression: Expression) = apply { this.filterExpression = filterExpression } + fun workerId(workerId: WorkerId) = apply { this.workerId = workerId } + fun build() = ScanConfig( pageSize, consistentRead, - filterExpression + filterExpression, + workerId ) } } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt index 2e783269..010b3f54 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt @@ -21,6 +21,7 @@ import app.cash.tempest2.AsyncScannable import app.cash.tempest2.Offset import app.cash.tempest2.Page import app.cash.tempest2.Scannable +import app.cash.tempest2.WorkerId import kotlinx.coroutines.flow.map import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.asPublisher @@ -50,9 +51,10 @@ internal class DynamoDbScannable( pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + workerId: WorkerId? 
): Page { - val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset) + val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset, workerId) val page = if (secondaryIndexName != null) { dynamoDbTable.index(secondaryIndexName).scan(request) } else { @@ -118,7 +120,8 @@ internal class DynamoDbScannable( consistentRead: Boolean, pageSize: Int, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + workerId: WorkerId? = null ): ScanEnhancedRequest { val scan = ScanEnhancedRequest.builder() .consistentRead(consistentRead) @@ -130,6 +133,10 @@ internal class DynamoDbScannable( if (initialOffset != null) { scan.exclusiveStartKey(initialOffset.encodeOffset()) } + if (workerId != null) { + scan.segment(workerId.segment) + scan.totalSegments(workerId.totalSegments) + } return scan.build() } diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt index df0aad4d..4d61672a 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbScannableTest.kt @@ -105,6 +105,59 @@ class DynamoDbScannableTest { ) } + @Test + fun primaryIndexParallelScan() { + musicTable.givenAlbums(THE_WALL) + + val worker1Page1 = musicTable.albumTracks.scan( + workerId = WorkerId(0, 2), + pageSize = 50, + filterExpression = isTrack() + ) + assertThat(worker1Page1.hasMorePages).isFalse() + + val worker2Page1 = musicTable.albumTracks.scan( + workerId = WorkerId(1, 2), + pageSize = 50, + filterExpression = isTrack() + ) + assertThat(worker2Page1.hasMorePages).isFalse() + + assertThat(worker1Page1.trackTitles + worker2Page1.trackTitles) + .containsExactlyInAnyOrderElementsOf(THE_WALL.trackTitles) + } + + @Test + fun primaryIndexParallelScanNoFilter() { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + 
AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val worker1Page1 = musicTable.albumInfoByArtist.scan( + workerId = WorkerId(0, 2), + pageSize = 50 + ) + assertThat(worker1Page1.hasMorePages).isFalse() + + val worker2Page1 = musicTable.albumInfoByArtist.scan( + workerId = WorkerId(1, 2), + pageSize = 50 + ) + assertThat(worker2Page1.hasMorePages).isFalse() + + assertThat(worker1Page1.albumTitles + worker2Page1.albumTitles).containsExactlyInAnyOrder( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + @Test fun localSecondaryIndex() { musicTable.givenAlbums(THE_WALL) @@ -205,6 +258,43 @@ class DynamoDbScannableTest { ) } + @Test + fun scanAllWorkerIdShouldBeIgnored() { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val worker1Page1 = musicTable.albumInfoByArtist.scanAll( + ScanConfig.Builder() + .workerId(WorkerId(0, 2)) + .pageSize(50) + .build() + ).iterator().next() + assertThat(worker1Page1.hasMorePages).isFalse() + + val worker2Page1 = musicTable.albumInfoByArtist.scanAll( + ScanConfig.Builder() + .workerId(WorkerId(1, 2)) + .pageSize(50) + .build() + ).iterator().next() + assertThat(worker2Page1.hasMorePages).isFalse() + + assertThat(worker1Page1.albumTitles).containsExactlyInAnyOrderElementsOf( + worker2Page1.albumTitles) + assertThat(worker1Page1.albumTitles).containsExactlyInAnyOrder( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + @Test fun scanAllPagination() { musicTable.givenAlbums(