Introduce support for parallel scans into tempest2 #190

Merged
16 changes: 2 additions & 14 deletions docs/guide/query_scan.md
@@ -514,19 +514,7 @@ worker can be a thread (in programming languages that support multithreading) or an operating
system process. To perform a parallel scan, each worker issues its own Scan request with a
unique `WorkerId`.

=== "Kotlin - SDK 2.x"

```kotlin
Not supported
```

=== "Java - SDK 2.x"

```java
Not supported
```

=== "Kotlin - SDK 1.x"
=== "Kotlin"

```kotlin
private val table: MusicTable
@@ -545,7 +533,7 @@ unique `WorkerId`.
}
```

=== "Java - SDK 1.x"
=== "Java"

```java
private final MusicTable table;
@@ -21,21 +21,28 @@
import app.cash.tempest2.Offset;
import app.cash.tempest2.Page;
import app.cash.tempest2.QueryConfig;
import app.cash.tempest2.ScanConfig;
import app.cash.tempest2.WorkerId;
import app.cash.tempest2.musiclibrary.java.AlbumTrack;
import app.cash.tempest2.musiclibrary.java.MusicTable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.awssdk.enhanced.dynamodb.Expression;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class QueryNScan {

private final MusicTable table;
private final ExecutorService executor;

public QueryNScan(MusicTable table) {
public QueryNScan(MusicTable table, ExecutorService executor) {
this.table = table;
this.executor = executor;
}

// Query - Key Condition - Partition Key and Entity Type.
@@ -164,5 +171,25 @@ public List<AlbumTrack> loadAllAlbumTracks() {
}

// Scan - Parallel.
// Not supported.
public List<AlbumTrack> loadAllAlbumTracks2() {
Future<List<AlbumTrack>> segment0 = executor.submit(() -> loadSegment(0));
Future<List<AlbumTrack>> segment1 = executor.submit(() -> loadSegment(1));
List<AlbumTrack> results = new ArrayList<>();
try {
results.addAll(segment0.get());
results.addAll(segment1.get());
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to load tracks", e);
}
return results;
}

private List<AlbumTrack> loadSegment(int segment) {
Page<AlbumTrack.Key, AlbumTrack> page = table.albumTracks().scan(
new ScanConfig.Builder()
.workerId(new WorkerId(segment, /* totalSegments */ 2))
.build()
);
return page.getContents();
}
}
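
The Java sample above fixes the fan-out at two segments. As a point of comparison, here is a minimal Kotlin sketch, not part of this PR, that generalizes to N segments. It assumes kotlinx-coroutines is available, reuses the `scanAll(workerId = ...)` overload exercised by the tests below, and borrows the sample music library's `MusicTable`/`AlbumTrack` types (imports for those are omitted); the helper name is hypothetical.

```kotlin
import app.cash.tempest2.WorkerId
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical helper: fan out one scan per segment and merge the results.
fun loadAllAlbumTracksInParallel(table: MusicTable, totalSegments: Int = 2): List<AlbumTrack> =
  runBlocking {
    (0 until totalSegments).map { segment ->
      async(Dispatchers.IO) {
        // Each worker reads only its own zero-based segment of the table.
        table.albumTracks.scanAll(workerId = WorkerId(segment, totalSegments))
          .map { page -> page.contents }
          .flatten()
          .toList()
      }
    }.awaitAll().flatten()
  }
```

The fan-out mechanism (coroutines, threads, or an `ExecutorService`) is orthogonal to the API; the only requirement is that every worker uses the same `totalSegments` and a distinct, zero-based `segment`.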
59 changes: 53 additions & 6 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
@@ -27,7 +27,8 @@ interface Scannable<K : Any, I : Any> {
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null
initialOffset: Offset<K>? = null,
workerId: WorkerId? = null,
): Page<K, I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).
@@ -51,7 +52,8 @@ interface Scannable<K : Any, I : Any> {
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
initialOffset,
config.workerId
)

/**
@@ -63,6 +65,7 @@ interface Scannable<K : Any, I : Any> {
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
workerId: WorkerId? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).
@@ -87,7 +90,8 @@ interface Scannable<K : Any, I : Any> {
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
initialOffset,
config.workerId
)
}

@@ -100,6 +104,7 @@ interface Scannable<K : Any, I : Any> {
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
workerId: WorkerId? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).
@@ -124,20 +129,59 @@ interface Scannable<K : Any, I : Any> {
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
initialOffset,
config.workerId
)
}
}

/**
* By default, the Scan operation processes data sequentially. Amazon DynamoDB returns data to the
* application in 1 MB increments, and an application performs additional Scan operations to
* retrieve the next 1 MB of data.
*
* The larger the table or index being scanned, the more time the Scan takes to complete. In
* addition, a sequential Scan might not always be able to fully use the provisioned read throughput
* capacity: Even though DynamoDB distributes a large table's data across multiple physical
* partitions, a Scan operation can only read one partition at a time. For this reason, the
* throughput of a Scan is constrained by the maximum throughput of a single partition.
*
* To address these issues, the Scan operation can logically divide a table or secondary index into
* multiple segments, with multiple application workers scanning the segments in parallel. Each
* worker can be a thread (in programming languages that support multithreading) or an operating
* system process. To perform a parallel scan, each worker issues its own Scan request with a
* unique [WorkerId].
*/
data class WorkerId(
/**
* A segment to be scanned by a particular worker. Each worker should use a different value for
* Segment.
*
* Segments are zero-based, so the first number is always 0.
*/
val segment: Int,
/**
* The total number of segments for the parallel scan. This value must be the same as the number
* of workers that your application will use.
*/
val totalSegments: Int
) {
init {
require(segment < totalSegments) { "Expected segment ($segment) to be less than totalSegments ($totalSegments)" }
}
}

data class ScanConfig internal constructor(
val pageSize: Int,
val consistentRead: Boolean,
val filterExpression: Expression?
val filterExpression: Expression?,
val workerId: WorkerId?
) {
class Builder {
private var pageSize = 100
private var consistentRead = false
private var filterExpression: Expression? = null
private var workerId: WorkerId? = null

fun pageSize(pageSize: Int) = apply { this.pageSize = pageSize }

@@ -146,10 +190,13 @@ data class ScanConfig internal constructor(
fun filterExpression(filterExpression: Expression) =
apply { this.filterExpression = filterExpression }

fun workerId(workerId: WorkerId) = apply { this.workerId = workerId }

fun build() = ScanConfig(
pageSize,
consistentRead,
filterExpression
filterExpression,
workerId
)
}
}
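
As a small illustration of the zero-based segment contract that `WorkerId` enforces in its `init` block (illustrative only, not part of the diff):

```kotlin
import app.cash.tempest2.WorkerId

// For two workers, segments 0 and 1 are the only valid ids.
val workers = listOf(
  WorkerId(segment = 0, totalSegments = 2),
  WorkerId(segment = 1, totalSegments = 2)
)

// Out-of-range segments fail fast:
// WorkerId(segment = 2, totalSegments = 2) // throws IllegalArgumentException
```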
@@ -21,6 +21,7 @@ import app.cash.tempest2.AsyncScannable
import app.cash.tempest2.Offset
import app.cash.tempest2.Page
import app.cash.tempest2.Scannable
import app.cash.tempest2.WorkerId
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
@@ -50,9 +51,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId?
): Page<K, I> {
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset)
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset, workerId)
val page = if (secondaryIndexName != null) {
dynamoDbTable.index(secondaryIndexName).scan(request)
} else {
@@ -66,13 +68,14 @@
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId?
): Sequence<Page<K, I>> {
return generateSequence(
scan(pageSize, consistentRead, filterExpression, initialOffset)
scan(pageSize, consistentRead, filterExpression, initialOffset, workerId)
) { page ->
page.offset?.let { offset ->
scan(pageSize, consistentRead, filterExpression, offset)
scan(pageSize, consistentRead, filterExpression, offset, workerId)
}
}
}
@@ -81,9 +84,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId?
): Sequence<I> {
return scanAll(pageSize, consistentRead, filterExpression, initialOffset)
return scanAll(pageSize, consistentRead, filterExpression, initialOffset, workerId)
.map { it.contents }
.flatten()
}
@@ -99,7 +103,7 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
): Publisher<Page<K, I>> {
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset)
return if (secondaryIndexName != null) {
@@ -118,7 +122,8 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
consistentRead: Boolean,
pageSize: Int,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId? = null
): ScanEnhancedRequest {
val scan = ScanEnhancedRequest.builder()
.consistentRead(consistentRead)
@@ -130,6 +135,10 @@
if (initialOffset != null) {
scan.exclusiveStartKey(initialOffset.encodeOffset())
}
if (workerId != null) {
scan.segment(workerId.segment)
scan.totalSegments(workerId.totalSegments)
}
return scan.build()
}
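
For reference, a rough sketch (illustrative only, not taken from the diff) of the request that `toScanRequest` ends up building for `WorkerId(1, 2)` with default `ScanConfig` values; the `limit()` call is an assumed stand-in for the `pageSize` handling in the truncated part of the hunk above:

```kotlin
import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest

// WorkerId maps directly onto DynamoDB's native Segment/TotalSegments parameters.
val request: ScanEnhancedRequest = ScanEnhancedRequest.builder()
  .consistentRead(false) // ScanConfig.consistentRead default
  .limit(100)            // assumed mapping of ScanConfig.pageSize (not shown above)
  .segment(1)            // WorkerId.segment
  .totalSegments(2)      // WorkerId.totalSegments
  .build()
```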

@@ -105,6 +105,28 @@ class DynamoDbScannableTest {
)
}

@Test
fun primaryIndexParallelScan() {
musicTable.givenAlbums(THE_WALL)

val worker1Page1 = musicTable.albumTracks.scan(
workerId = WorkerId(0, 2),
pageSize = 50,
filterExpression = isTrack()
)
assertThat(worker1Page1.hasMorePages).isFalse()

val worker2Page1 = musicTable.albumTracks.scan(
workerId = WorkerId(1, 2),
pageSize = 50,
filterExpression = isTrack()
)
assertThat(worker2Page1.hasMorePages).isFalse()

assertThat(worker1Page1.trackTitles + worker2Page1.trackTitles)
.containsExactlyInAnyOrderElementsOf(THE_WALL.trackTitles)
}

@Test
fun localSecondaryIndex() {
musicTable.givenAlbums(THE_WALL)
@@ -205,6 +227,73 @@
)
}

@Test
fun scanAllParallelScan() {
musicTable.givenAlbums(
THE_DARK_SIDE_OF_THE_MOON,
THE_WALL,
WHAT_YOU_DO_TO_ME_SINGLE,
AFTER_HOURS_EP,
LOCKDOWN_SINGLE
)

val worker1Page1 = musicTable.albumInfoByArtist.scanAll(
workerId = WorkerId(0, 2),
pageSize = 50
).iterator().next()
assertThat(worker1Page1.hasMorePages).isFalse()

val worker2Page1 = musicTable.albumInfoByArtist.scanAll(
workerId = WorkerId(1, 2),
pageSize = 50
).iterator().next()
assertThat(worker2Page1.hasMorePages).isFalse()

assertThat(worker1Page1.albumTitles + worker2Page1.albumTitles).containsExactlyInAnyOrder(
THE_DARK_SIDE_OF_THE_MOON.album_title,
THE_WALL.album_title,
WHAT_YOU_DO_TO_ME_SINGLE.album_title,
AFTER_HOURS_EP.album_title,
LOCKDOWN_SINGLE.album_title
)
}

@Test
fun scanAllParallelScanAndScanConfig() {
musicTable.givenAlbums(
THE_DARK_SIDE_OF_THE_MOON,
THE_WALL,
WHAT_YOU_DO_TO_ME_SINGLE,
AFTER_HOURS_EP,
LOCKDOWN_SINGLE
)

val worker1Page1 = musicTable.albumInfoByArtist.scanAll(
ScanConfig.Builder()
.workerId(WorkerId(0, 2))
.pageSize(50)
.build()
).iterator().next()
assertThat(worker1Page1.hasMorePages).isFalse()

val worker2Page1 = musicTable.albumInfoByArtist.scanAll(
ScanConfig.Builder()
.workerId(WorkerId(1, 2))
.pageSize(50)
.build()
).iterator().next()
assertThat(worker2Page1.hasMorePages).isFalse()

assertThat(worker1Page1.albumTitles.intersect(worker2Page1.albumTitles)).isEmpty()
assertThat(worker1Page1.albumTitles + worker2Page1.albumTitles).containsExactlyInAnyOrder(
THE_DARK_SIDE_OF_THE_MOON.album_title,
THE_WALL.album_title,
WHAT_YOU_DO_TO_ME_SINGLE.album_title,
AFTER_HOURS_EP.album_title,
LOCKDOWN_SINGLE.album_title
)
}

@Test
fun scanAllPagination() {
musicTable.givenAlbums(