Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for parallel scans into tempest2 #190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions docs/guide/query_scan.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,19 +514,7 @@ worker can be a thread (in programming languages that support multithreading) or
system process. To perform a parallel scan, each worker issues its own Scan request with a
unique `WorkerId`.

=== "Kotlin - SDK 2.x"

```kotlin
Not supported
```

=== "Java - SDK 2.x"

```java
Not supported
```

=== "Kotlin - SDK 1.x"
=== "Kotlin"
abmargb marked this conversation as resolved.
Show resolved Hide resolved

```kotlin
private val table: MusicTable
Expand All @@ -545,7 +533,7 @@ unique `WorkerId`.
}
```

=== "Java - SDK 1.x"
=== "Java"

```java
private final MusicTable table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,28 @@
import app.cash.tempest2.Offset;
import app.cash.tempest2.Page;
import app.cash.tempest2.QueryConfig;
import app.cash.tempest2.ScanConfig;
import app.cash.tempest2.WorkerId;
import app.cash.tempest2.musiclibrary.java.AlbumTrack;
import app.cash.tempest2.musiclibrary.java.MusicTable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.awssdk.enhanced.dynamodb.Expression;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class QueryNScan {

private final MusicTable table;
private final ExecutorService executor;

public QueryNScan(MusicTable table) {
public QueryNScan(MusicTable table, ExecutorService executor) {
this.table = table;
this.executor = executor;
}

// Query - Key Condition - Partition Key and Entity Type.
Expand Down Expand Up @@ -164,5 +171,25 @@ public List<AlbumTrack> loadAllAlbumTracks() {
}

// Scan - Parallel.
// Not supported.
public List<AlbumTrack> loadAllAlbumTracks2() {
  // Scans two segments of the table in parallel and merges the results.
  // DynamoDB segment numbers are zero-based, so with totalSegments = 2 the only
  // valid worker ids are 0 and 1 — WorkerId rejects segment >= totalSegments,
  // so the previous (1, 2) pair threw IllegalArgumentException at runtime.
  Future<List<AlbumTrack>> segment1 = executor.submit(() -> loadSegment(0));
  Future<List<AlbumTrack>> segment2 = executor.submit(() -> loadSegment(1));
  List<AlbumTrack> results = new ArrayList<>();
  try {
    results.addAll(segment1.get());
    results.addAll(segment2.get());
  } catch (InterruptedException e) {
    // Restore the interrupt status so callers up the stack can observe it.
    Thread.currentThread().interrupt();
    throw new IllegalStateException("Failed to load tracks", e);
  } catch (ExecutionException e) {
    throw new IllegalStateException("Failed to load tracks", e);
  }
  return results;
}

// Loads one slice of a parallel scan. `segment` is the zero-based index of this
// worker's slice; the scan is split across 2 segments in total.
private List<AlbumTrack> loadSegment(int segment) {
  ScanConfig config = new ScanConfig.Builder()
      .workerId(new WorkerId(segment, /* totalSegments */ 2))
      .build();
  Page<AlbumTrack.Key, AlbumTrack> page = table.albumTracks().scan(config);
  return page.getContents();
}
}
100 changes: 76 additions & 24 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,41 @@ interface Scannable<K : Any, I : Any> {
/**
* Scans up to the [pageSize] items or a maximum of 1 MB of data. This limit applies before the
* filter expression is evaluated.
*
* @param workerId identifies a tuple of `segment` and `totalSegments` in the context of parallel
* scans.
*/
fun scan(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null
initialOffset: Offset<K>? = null,
workerId: WorkerId? = null,
): Page<K, I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scan() = scan(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scan(initialOffset: Offset<K>?) = scan(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scan(config: ScanConfig) = scan(
config,
config = config,
initialOffset = null
)

/**
 * Overload for Java callers (Kotlin interfaces do not support `@JvmOverloads`):
 * unpacks [config] into the primary [scan] parameters. Note that, unlike the
 * `scanAll`/`scanAllContents` overloads, this forwards [ScanConfig.workerId],
 * so it can participate in a parallel scan.
 */
fun scan(config: ScanConfig, initialOffset: Offset<K>?) = scan(
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset,
workerId = config.workerId
)

/**
Expand All @@ -68,26 +73,40 @@ interface Scannable<K : Any, I : Any> {
// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAll() = scanAll(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAll(initialOffset: Offset<K>?) = scanAll(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAll(config: ScanConfig) = scanAll(
config,
config = config,
initialOffset = null
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAll(config: ScanConfig, initialOffset: Offset<K>?): Sequence<Page<K, I>> {
return scanAll(
// config.workerId is intentionally NOT forwarded here: scanAll fetches pages
// sequentially and does not support parallel scans (see the KDoc on this overload).
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset
)
}

Expand All @@ -105,39 +124,69 @@ interface Scannable<K : Any, I : Any> {
// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAllContents() = scanAllContents(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAllContents(initialOffset: Offset<K>?) = scanAllContents(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAllContents(config: ScanConfig) = scanAllContents(
config,
config = config,
initialOffset = null
)

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAllContents(config: ScanConfig, initialOffset: Offset<K>?): Sequence<I> {
return scanAllContents(
// config.workerId is intentionally NOT forwarded here: scanAllContents enumerates
// pages sequentially and does not support parallel scans (see the KDoc on this overload).
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset
)
}
}

/**
 * Identifies one worker's slice of a parallel scan.
 *
 * In the context of parallel scans, a worker is analogous to a thread or an operating
 * system process. Each worker issues its own Scan request with a unique [WorkerId], which
 * represents a tuple of `segment` and `totalSegments`.
 *
 * [segment] is the zero-based index of this worker's slice; [totalSegments] is the total
 * number of workers participating in the scan.
 *
 * @throws IllegalArgumentException if the tuple does not describe a valid slice.
 */
data class WorkerId(
  val segment: Int,
  val totalSegments: Int
) {
  init {
    // DynamoDB requires a positive segment count and a zero-based segment index;
    // the original check alone admitted invalid negative segments such as (-1, 2).
    require(totalSegments > 0) { "Expected totalSegments to be positive but was $totalSegments" }
    require(segment >= 0) { "Expected segment to be non-negative but was $segment" }
    require(segment < totalSegments) { "Expect $segment to be less than $totalSegments" }
  }
}

data class ScanConfig internal constructor(
val pageSize: Int,
val consistentRead: Boolean,
val filterExpression: Expression?
val filterExpression: Expression?,
val workerId: WorkerId?
) {
class Builder {
private var pageSize = 100
private var consistentRead = false
private var filterExpression: Expression? = null
private var workerId: WorkerId? = null

fun pageSize(pageSize: Int) = apply { this.pageSize = pageSize }

Expand All @@ -146,10 +195,13 @@ data class ScanConfig internal constructor(
fun filterExpression(filterExpression: Expression) =
apply { this.filterExpression = filterExpression }

fun workerId(workerId: WorkerId) = apply { this.workerId = workerId }

fun build() = ScanConfig(
pageSize,
consistentRead,
filterExpression
filterExpression,
workerId
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import app.cash.tempest2.AsyncScannable
import app.cash.tempest2.Offset
import app.cash.tempest2.Page
import app.cash.tempest2.Scannable
import app.cash.tempest2.WorkerId
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
Expand Down Expand Up @@ -50,9 +51,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId?
): Page<K, I> {
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset)
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset, workerId)
val page = if (secondaryIndexName != null) {
dynamoDbTable.index(secondaryIndexName).scan(request)
} else {
Expand Down Expand Up @@ -118,7 +120,8 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
consistentRead: Boolean,
pageSize: Int,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId? = null
): ScanEnhancedRequest {
val scan = ScanEnhancedRequest.builder()
.consistentRead(consistentRead)
Expand All @@ -130,6 +133,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
if (initialOffset != null) {
scan.exclusiveStartKey(initialOffset.encodeOffset())
}
if (workerId != null) {
scan.segment(workerId.segment)
scan.totalSegments(workerId.totalSegments)
}
return scan.build()
}

Expand Down
Loading
Loading