Skip to content

Commit

Permalink
Add APIs to execute batch, query and load requests along with consume…
Browse files Browse the repository at this point in the history
…d capacity (#173)

* Add API to get the consumed capacity with batch requests

* Add configuration to query to request consumed capacity

* Add attributes to load items with capacity usage

* Add test for queryable interface

* Include used capacity in the ItemSet class

* Add default value to new argument to make it backwards compatible

* Add test for the newly added load with capacity method

* Add more test

* Change method name and add some more test

* Replace custom data class with a pair

* Make consumed capacity nullable

* refactor loadWithCapacity to be load() overload

---------

Co-authored-by: Tim Kye <[email protected]>
  • Loading branch information
jaischeema and kyeotic authored Mar 21, 2024
1 parent ffaf9b5 commit 7be0a9e
Show file tree
Hide file tree
Showing 20 changed files with 334 additions and 58 deletions.
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ kotlin = "1.9.23"

[libraries]
assertj = { module = "org.assertj:assertj-core", version = "3.23.1" }
aws2Dynamodb = { module = "software.amazon.awssdk:dynamodb", version = "2.17.134" }
aws2DynamodbEnhanced = { module = "software.amazon.awssdk:dynamodb-enhanced", version = "2.17.134" }
aws2Dynamodb = { module = "software.amazon.awssdk:dynamodb", version = "2.25.11" }
aws2DynamodbEnhanced = { module = "software.amazon.awssdk:dynamodb-enhanced", version = "2.25.11" }
awsDynamodb = { module = "com.amazonaws:aws-java-sdk-dynamodb", version = "1.11.960" }
awsDynamodbLocal = { module = "com.amazonaws:DynamoDBLocal", version = "1.13.5" }
clikt = { module = "com.github.ajalt:clikt", version = "2.8.0" }
Expand Down
24 changes: 15 additions & 9 deletions tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.reactivestreams.Publisher
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient
import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import java.util.concurrent.CompletableFuture
import javax.annotation.CheckReturnValue
import kotlin.reflect.KClass
Expand All @@ -49,9 +50,10 @@ interface AsyncLogicalDb : AsyncLogicalTable.Factory {
suspend fun batchLoad(
keys: KeySet,
consistentReads: Boolean = false,
maxPageSize: Int = MAX_BATCH_READ
maxPageSize: Int = MAX_BATCH_READ,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
): ItemSet =
batchLoadAsync(keys, consistentReads, maxPageSize).asFlow().reduce { acc, item ->
batchLoadAsync(keys, consistentReads, maxPageSize, returnConsumedCapacity).asFlow().reduce { acc, item ->
ItemSet(acc.getAllItems() + item.getAllItems())
}

Expand Down Expand Up @@ -159,23 +161,27 @@ interface AsyncLogicalDb : AsyncLogicalTable.Factory {
fun batchLoadAsync(
keys: KeySet,
consistentReads: Boolean,
maxPageSize: Int
maxPageSize: Int,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
): Publisher<ItemSet>

fun batchLoadAsync(
keys: KeySet,
consistentReads: Boolean
) = batchLoadAsync(keys, consistentReads, MAX_BATCH_READ)
consistentReads: Boolean,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
) = batchLoadAsync(keys, consistentReads, MAX_BATCH_READ, returnConsumedCapacity)

fun batchLoadAsync(
keys: Iterable<Any>,
consistentReads: Boolean
) = batchLoadAsync(KeySet(keys), consistentReads, MAX_BATCH_READ)
consistentReads: Boolean,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
) = batchLoadAsync(KeySet(keys), consistentReads, returnConsumedCapacity)

fun batchLoadAsync(
vararg keys: Any,
consistentReads: Boolean
) = batchLoadAsync(keys.toList(), consistentReads)
consistentReads: Boolean,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
) = batchLoadAsync(keys.toList(), consistentReads, returnConsumedCapacity)

fun batchLoadAsync(
keys: Iterable<Any>
Expand Down
22 changes: 17 additions & 5 deletions tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package app.cash.tempest2
import kotlinx.coroutines.reactive.awaitFirst
import org.reactivestreams.Publisher
import software.amazon.awssdk.enhanced.dynamodb.Expression
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity

interface AsyncQueryable<K : Any, I : Any> {

Expand All @@ -32,8 +33,17 @@ interface AsyncQueryable<K : Any, I : Any> {
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null
): Page<K, I> = queryAsync(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset).awaitFirst()
initialOffset: Offset<K>? = null,
returnConsumedCapacity: ReturnConsumedCapacity? = null,
): Page<K, I> = queryAsync(
keyCondition,
asc,
pageSize,
consistentRead,
filterExpression,
initialOffset,
returnConsumedCapacity
).awaitFirst()

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

Expand All @@ -43,7 +53,8 @@ interface AsyncQueryable<K : Any, I : Any> {
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
returnConsumedCapacity: ReturnConsumedCapacity?,
): Publisher<Page<K, I>>

fun queryAsync(keyCondition: KeyCondition<K>) = queryAsync(
Expand All @@ -67,13 +78,14 @@ interface AsyncQueryable<K : Any, I : Any> {
fun queryAsync(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
) = queryAsync(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
initialOffset,
config.returnConsumedCapacity
)
}
14 changes: 14 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import kotlinx.coroutines.future.await
import software.amazon.awssdk.enhanced.dynamodb.Expression
import software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import java.util.concurrent.CompletableFuture

interface AsyncView<K : Any, I : Any> {
Expand All @@ -29,6 +31,12 @@ interface AsyncView<K : Any, I : Any> {
*/
suspend fun load(key: K, consistentReads: Boolean = false): I? = loadAsync(key, consistentReads).await()

suspend fun load(
key: K,
consistentReads: Boolean = false,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.TOTAL
): Pair<I?, ConsumedCapacity?> = loadAsync(key, consistentReads, returnConsumedCapacity).await()

/**
* Saves an item in DynamoDB. This method uses [DynamoDbClient.putItem] to clear
* and replace all attributes, including unmodeled ones, on save. Partial update, i.e.
Expand Down Expand Up @@ -68,6 +76,12 @@ interface AsyncView<K : Any, I : Any> {

fun loadAsync(key: K, consistentReads: Boolean): CompletableFuture<I?>

fun loadAsync(
key: K,
consistentReads: Boolean,
returnConsumedCapacity: ReturnConsumedCapacity
): CompletableFuture<Pair<I?, ConsumedCapacity?>>

fun loadAsync(key: K) = loadAsync(key, false)

fun saveAsync(
Expand Down
16 changes: 9 additions & 7 deletions tempest2/src/main/kotlin/app/cash/tempest2/LogicalDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import app.cash.tempest2.internal.LogicalDbFactory
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import javax.annotation.CheckReturnValue
import kotlin.reflect.KClass

Expand Down Expand Up @@ -57,27 +58,28 @@ interface LogicalDb : LogicalTable.Factory {
fun batchLoad(
keys: KeySet,
consistentReads: Boolean = false,
maxPageSize: Int = MAX_BATCH_READ
maxPageSize: Int = MAX_BATCH_READ,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
): ItemSet

fun batchLoad(
keys: Iterable<Any>,
consistentReads: Boolean = false,
maxPageSize: Int = MAX_BATCH_READ
maxPageSize: Int = MAX_BATCH_READ,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
): ItemSet {
return batchLoad(KeySet(keys), consistentReads, maxPageSize)
return batchLoad(KeySet(keys), consistentReads, maxPageSize, returnConsumedCapacity)
}

fun batchLoad(
vararg keys: Any,
consistentReads: Boolean = false,
maxPageSize: Int = MAX_BATCH_READ
maxPageSize: Int = MAX_BATCH_READ,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE
): ItemSet {
return batchLoad(keys.toList(), consistentReads, maxPageSize)
return batchLoad(keys.toList(), consistentReads, maxPageSize, returnConsumedCapacity)
}



/**
* Saves and deletes the objects given using one or more calls to the
* [DynamoDbClient.batchWriteItem] API. **Callers should always check the returned
Expand Down
6 changes: 4 additions & 2 deletions tempest2/src/main/kotlin/app/cash/tempest2/Model.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package app.cash.tempest2

import software.amazon.awssdk.enhanced.dynamodb.Expression
import software.amazon.awssdk.enhanced.dynamodb.Key
import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity
import kotlin.reflect.KClass

/**
Expand Down Expand Up @@ -212,10 +213,11 @@ class KeySet private constructor(
* A collection of items across tables.
*/
class ItemSet private constructor(
private val contents: Set<Any>
private val contents: Set<Any>,
val consumedCapacity: List<ConsumedCapacity>
) : Set<Any> by contents {

constructor(contents: Iterable<Any>) : this(contents.toSet())
constructor(contents: Iterable<Any>, consumedCapacity: List<ConsumedCapacity> = emptyList()) : this(contents.toSet(), consumedCapacity)

fun <I : Any> getItems(
itemType: KClass<I>
Expand Down
5 changes: 4 additions & 1 deletion tempest2/src/main/kotlin/app/cash/tempest2/Paging.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package app.cash.tempest2

import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity

data class Page<K, T> internal constructor(
val contents: List<T>,
val offset: Offset<K>?
val offset: Offset<K>?,
val consumedCapacity: ConsumedCapacity?
) {
val hasMorePages: Boolean
get() = offset != null
Expand Down
23 changes: 16 additions & 7 deletions tempest2/src/main/kotlin/app/cash/tempest2/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package app.cash.tempest2

import software.amazon.awssdk.enhanced.dynamodb.Expression
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity

interface Queryable<K : Any, I : Any> {

Expand All @@ -30,7 +31,8 @@ interface Queryable<K : Any, I : Any> {
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null
initialOffset: Offset<K>? = null,
returnConsumedCapacity: ReturnConsumedCapacity? = null,
): Page<K, I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).
Expand All @@ -56,28 +58,31 @@ interface Queryable<K : Any, I : Any> {
fun query(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
) = query(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
initialOffset,
config.returnConsumedCapacity
)
}

data class QueryConfig internal constructor(
val asc: Boolean,
val pageSize: Int,
val consistentRead: Boolean,
val filterExpression: Expression?
val filterExpression: Expression?,
val returnConsumedCapacity: ReturnConsumedCapacity?,
) {
class Builder {
private var asc = true
private var pageSize = 100
private var consistentRead = false
private var filterExpression: Expression? = null
private var returnConsumedCapacity: ReturnConsumedCapacity? = null

fun asc(asc: Boolean) = apply { this.asc = asc }

Expand All @@ -88,11 +93,15 @@ data class QueryConfig internal constructor(
fun filterExpression(filterExpression: Expression) =
apply { this.filterExpression = filterExpression }

fun returnConsumedCapacity(returnConsumedCapacity: ReturnConsumedCapacity) =
apply { this.returnConsumedCapacity = returnConsumedCapacity }

fun build() = QueryConfig(
asc,
pageSize,
consistentRead,
filterExpression
filterExpression,
returnConsumedCapacity
)
}
}
Expand All @@ -107,7 +116,7 @@ sealed class KeyCondition<K>
* - begins_with (a, substr)— true if the value of attribute a begins with a particular substring.
*/
data class BeginsWith<K>(
val prefix: K
val prefix: K,
) : KeyCondition<K>()

/**
Expand All @@ -116,5 +125,5 @@ data class BeginsWith<K>(
*/
data class Between<K>(
val startInclusive: K,
val endInclusive: K
val endInclusive: K,
) : KeyCondition<K>()
8 changes: 8 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/View.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package app.cash.tempest2
import software.amazon.awssdk.enhanced.dynamodb.Expression
import software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity

interface View<K : Any, I : Any> {
/**
Expand All @@ -27,6 +29,12 @@ interface View<K : Any, I : Any> {
*/
fun load(key: K, consistentReads: Boolean = false): I?

fun load(
key: K,
consistentReads: Boolean = false,
returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.TOTAL
): Pair<I?, ConsumedCapacity?>

/**
* Saves an item in DynamoDB. This method uses [DynamoDbClient.putItem] to clear
* and replace all attributes, including unmodeled ones, on save. Partial update, i.e.
Expand Down
Loading

0 comments on commit 7be0a9e

Please sign in to comment.