From 7be0a9e903145eb13dc341f43eaaf48f8e47b0ee Mon Sep 17 00:00:00 2001 From: Jais Cheema Date: Fri, 22 Mar 2024 09:18:35 +1100 Subject: [PATCH] Add APIs to execute batch, query and load requests along with consumed capacity (#173) * Add API to get the consumed capacity with batch requests * Add configuration to query to request consumed capacity * Add attributes to load items with capacity usage * Add test for queryable interface * Include used capacity in the ItemSet class * Add default value to new argument to make it backwards compatible * Add test for the newly added load with capacity method * Add more test * Change method name and add some more test * Replace custom data class with a pair * Make consumed capacity nullable * refactor loadWithCapacity to be load() overload --------- Co-authored-by: Tim Kye --- gradle/libs.versions.toml | 4 +- .../app/cash/tempest2/AsyncLogicalDb.kt | 24 +++++++---- .../kotlin/app/cash/tempest2/AsyncQuery.kt | 22 +++++++--- .../kotlin/app/cash/tempest2/AsyncView.kt | 14 +++++++ .../kotlin/app/cash/tempest2/LogicalDb.kt | 16 ++++---- .../main/kotlin/app/cash/tempest2/Model.kt | 6 ++- .../main/kotlin/app/cash/tempest2/Paging.kt | 5 ++- .../main/kotlin/app/cash/tempest2/Query.kt | 23 +++++++---- .../src/main/kotlin/app/cash/tempest2/View.kt | 8 ++++ .../tempest2/internal/DynamoDbLogicalDb.kt | 36 ++++++++++++---- .../tempest2/internal/DynamoDbQueryable.kt | 16 +++++--- .../tempest2/internal/DynamoDbScannable.kt | 2 +- .../cash/tempest2/internal/DynamoDbView.kt | 41 +++++++++++++++---- .../internal/UnsupportedAsyncQueryable.kt | 4 +- .../tempest2/internal/UnsupportedQueryable.kt | 4 +- .../tempest2/DynamoDbAsyncQueryableTest.kt | 40 ++++++++++++++++++ .../cash/tempest2/DynamoDbAsyncViewTest.kt | 28 +++++++++++++ .../cash/tempest2/DynamoDbQueryableTest.kt | 35 ++++++++++++++++ .../app/cash/tempest2/DynamoDbViewTest.kt | 28 +++++++++++++ .../app/cash/tempest2/LogicalDbBatchTest.kt | 36 ++++++++++++++++ 20 files changed, 334 insertions(+), 58 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f64bbb067..c8eef9938 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,8 +4,8 @@ kotlin = "1.9.23" [libraries] assertj = { module = "org.assertj:assertj-core", version = "3.23.1" } -aws2Dynamodb = { module = "software.amazon.awssdk:dynamodb", version = "2.17.134" } -aws2DynamodbEnhanced = { module = "software.amazon.awssdk:dynamodb-enhanced", version = "2.17.134" } +aws2Dynamodb = { module = "software.amazon.awssdk:dynamodb", version = "2.25.11" } +aws2DynamodbEnhanced = { module = "software.amazon.awssdk:dynamodb-enhanced", version = "2.25.11" } awsDynamodb = { module = "com.amazonaws:aws-java-sdk-dynamodb", version = "1.11.960" } awsDynamodbLocal = { module = "com.amazonaws:DynamoDBLocal", version = "1.13.5" } clikt = { module = "com.github.ajalt:clikt", version = "2.8.0" } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt index ec447625b..e5a8c7f2d 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt @@ -24,6 +24,7 @@ import org.reactivestreams.Publisher import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.util.concurrent.CompletableFuture import javax.annotation.CheckReturnValue import kotlin.reflect.KClass @@ -49,9 +50,10 @@ interface AsyncLogicalDb : AsyncLogicalTable.Factory { suspend fun batchLoad( keys: KeySet, consistentReads: Boolean = false, - maxPageSize: Int = MAX_BATCH_READ + maxPageSize: Int = MAX_BATCH_READ, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE ): ItemSet = - batchLoadAsync(keys, consistentReads, maxPageSize).asFlow().reduce { acc, item -> + batchLoadAsync(keys, consistentReads, maxPageSize, returnConsumedCapacity).asFlow().reduce { acc, item -> ItemSet(acc.getAllItems() + item.getAllItems()) } @@ -159,23 +161,27 @@ interface AsyncLogicalDb : AsyncLogicalTable.Factory { fun batchLoadAsync( keys: KeySet, consistentReads: Boolean, - maxPageSize: Int + maxPageSize: Int, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE ): Publisher fun batchLoadAsync( keys: KeySet, - consistentReads: Boolean - ) = batchLoadAsync(keys, consistentReads, MAX_BATCH_READ) + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE + ) = batchLoadAsync(keys, consistentReads, MAX_BATCH_READ, returnConsumedCapacity) fun batchLoadAsync( keys: Iterable, - consistentReads: Boolean - ) = batchLoadAsync(KeySet(keys), consistentReads, MAX_BATCH_READ) + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE + ) = batchLoadAsync(KeySet(keys), consistentReads, returnConsumedCapacity) fun batchLoadAsync( vararg keys: Any, - consistentReads: Boolean - ) = batchLoadAsync(keys.toList(), consistentReads) + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE + ) = batchLoadAsync(keys.toList(), consistentReads, returnConsumedCapacity) fun batchLoadAsync( keys: Iterable diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt index 3d7cbc76c..48e391043 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt @@ -19,6 +19,7 @@ package app.cash.tempest2 import kotlinx.coroutines.reactive.awaitFirst import org.reactivestreams.Publisher import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity interface AsyncQueryable { @@ -32,8 +33,17 @@ interface AsyncQueryable { pageSize: Int = 100, consistentRead: Boolean = false, filterExpression: Expression? = null, - initialOffset: Offset? = null - ): Page = queryAsync(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset).awaitFirst() + initialOffset: Offset? = null, + returnConsumedCapacity: ReturnConsumedCapacity? = null, + ): Page = queryAsync( + keyCondition, + asc, + pageSize, + consistentRead, + filterExpression, + initialOffset, + returnConsumedCapacity + ).awaitFirst() // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). @@ -43,7 +53,8 @@ interface AsyncQueryable { pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + returnConsumedCapacity: ReturnConsumedCapacity?, ): Publisher> fun queryAsync(keyCondition: KeyCondition) = queryAsync( @@ -67,13 +78,14 @@ interface AsyncQueryable { fun queryAsync( keyCondition: KeyCondition, config: QueryConfig, - initialOffset: Offset? + initialOffset: Offset?, ) = queryAsync( keyCondition, config.asc, config.pageSize, config.consistentRead, config.filterExpression, - initialOffset + initialOffset, + config.returnConsumedCapacity ) } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt index 744b89a44..9968c8478 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt @@ -20,6 +20,8 @@ import kotlinx.coroutines.future.await import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.util.concurrent.CompletableFuture interface AsyncView { @@ -29,6 +31,12 @@ interface AsyncView { */ suspend fun load(key: K, consistentReads: Boolean = false): I? = loadAsync(key, consistentReads).await() + suspend fun load( + key: K, + consistentReads: Boolean = false, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.TOTAL + ): Pair = loadAsync(key, consistentReads, returnConsumedCapacity).await() + /** * Saves an item in DynamoDB. This method uses [DynamoDbClient.putItem] to clear * and replace all attributes, including unmodeled ones, on save. Partial update, i.e. @@ -68,6 +76,12 @@ interface AsyncView { fun loadAsync(key: K, consistentReads: Boolean): CompletableFuture + fun loadAsync( + key: K, + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity + ): CompletableFuture> + fun loadAsync(key: K) = loadAsync(key, false) fun saveAsync( diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/LogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/LogicalDb.kt index ac03aedc5..39df00cc1 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/LogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/LogicalDb.kt @@ -20,6 +20,7 @@ import app.cash.tempest2.internal.LogicalDbFactory import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import javax.annotation.CheckReturnValue import kotlin.reflect.KClass @@ -57,27 +58,28 @@ interface LogicalDb : LogicalTable.Factory { fun batchLoad( keys: KeySet, consistentReads: Boolean = false, - maxPageSize: Int = MAX_BATCH_READ + maxPageSize: Int = MAX_BATCH_READ, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE ): ItemSet fun batchLoad( keys: Iterable, consistentReads: Boolean = false, - maxPageSize: Int = MAX_BATCH_READ + maxPageSize: Int = MAX_BATCH_READ, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE ): ItemSet { - return batchLoad(KeySet(keys), consistentReads, maxPageSize) + return batchLoad(KeySet(keys), consistentReads, maxPageSize, returnConsumedCapacity) } fun batchLoad( vararg keys: Any, consistentReads: Boolean = false, - maxPageSize: Int = MAX_BATCH_READ + maxPageSize: Int = MAX_BATCH_READ, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.NONE ): ItemSet { - return batchLoad(keys.toList(), consistentReads, maxPageSize) + return batchLoad(keys.toList(), consistentReads, maxPageSize, returnConsumedCapacity) } - - /** * Saves and deletes the objects given using one or more calls to the * [DynamoDbClient.batchWriteItem] API. **Callers should always check the returned diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt index 60e8416b9..9b212e44c 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt @@ -18,6 +18,7 @@ package app.cash.tempest2 import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.Key +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity import kotlin.reflect.KClass /** @@ -212,10 +213,11 @@ class KeySet private constructor( * A collection of items across tables. */ class ItemSet private constructor( - private val contents: Set + private val contents: Set, + val consumedCapacity: List ) : Set by contents { - constructor(contents: Iterable) : this(contents.toSet()) + constructor(contents: Iterable, consumedCapacity: List = emptyList()) : this(contents.toSet(), consumedCapacity) fun getItems( itemType: KClass diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Paging.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Paging.kt index bcbc3d2ba..8608d39d2 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Paging.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Paging.kt @@ -16,9 +16,12 @@ package app.cash.tempest2 +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity + data class Page internal constructor( val contents: List, - val offset: Offset? + val offset: Offset?, + val consumedCapacity: ConsumedCapacity? ) { val hasMorePages: Boolean get() = offset != null diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt index ed87c56c9..d43ffdf79 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Query.kt @@ -17,6 +17,7 @@ package app.cash.tempest2 import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity interface Queryable { @@ -30,7 +31,8 @@ interface Queryable { pageSize: Int = 100, consistentRead: Boolean = false, filterExpression: Expression? = null, - initialOffset: Offset? = null + initialOffset: Offset? = null, + returnConsumedCapacity: ReturnConsumedCapacity? = null, ): Page // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). @@ -56,14 +58,15 @@ interface Queryable { fun query( keyCondition: KeyCondition, config: QueryConfig, - initialOffset: Offset? + initialOffset: Offset?, ) = query( keyCondition, config.asc, config.pageSize, config.consistentRead, config.filterExpression, - initialOffset + initialOffset, + config.returnConsumedCapacity ) } @@ -71,13 +74,15 @@ data class QueryConfig internal constructor( val asc: Boolean, val pageSize: Int, val consistentRead: Boolean, - val filterExpression: Expression? + val filterExpression: Expression?, + val returnConsumedCapacity: ReturnConsumedCapacity?, ) { class Builder { private var asc = true private var pageSize = 100 private var consistentRead = false private var filterExpression: Expression? = null + private var returnConsumedCapacity: ReturnConsumedCapacity? = null fun asc(asc: Boolean) = apply { this.asc = asc } @@ -88,11 +93,15 @@ data class QueryConfig internal constructor( fun filterExpression(filterExpression: Expression) = apply { this.filterExpression = filterExpression } + fun returnConsumedCapacity(returnConsumedCapacity: ReturnConsumedCapacity) = + apply { this.returnConsumedCapacity = returnConsumedCapacity } + fun build() = QueryConfig( asc, pageSize, consistentRead, - filterExpression + filterExpression, + returnConsumedCapacity ) } } @@ -107,7 +116,7 @@ sealed class KeyCondition * - begins_with (a, substr)— true if the value of attribute a begins with a particular substring. */ data class BeginsWith( - val prefix: K + val prefix: K, ) : KeyCondition() /** @@ -116,5 +125,5 @@ data class BeginsWith( */ data class Between( val startInclusive: K, - val endInclusive: K + val endInclusive: K, ) : KeyCondition() diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/View.kt b/tempest2/src/main/kotlin/app/cash/tempest2/View.kt index 6592752bf..98527d38a 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/View.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/View.kt @@ -19,6 +19,8 @@ package app.cash.tempest2 import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity interface View { /** @@ -27,6 +29,12 @@ interface View { */ fun load(key: K, consistentReads: Boolean = false): I? + fun load( + key: K, + consistentReads: Boolean = false, + returnConsumedCapacity: ReturnConsumedCapacity = ReturnConsumedCapacity.TOTAL + ): Pair + /** * Saves an item in DynamoDB. This method uses [DynamoDbClient.putItem] to clear * and replace all attributes, including unmodeled ones, on save. Partial update, i.e. diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt index 85d50bb79..7384ef3e8 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt @@ -56,6 +56,8 @@ import software.amazon.awssdk.enhanced.dynamodb.model.TransactWriteItemsEnhanced import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException import java.util.concurrent.CompletableFuture import kotlin.reflect.KClass @@ -89,16 +91,24 @@ internal class DynamoDbLogicalDb( override fun batchLoad( keys: KeySet, consistentReads: Boolean, - maxPageSize: Int + maxPageSize: Int, + returnConsumedCapacity: ReturnConsumedCapacity ): ItemSet { + val (requestKeys, keysByTable, batchRequests) = toBatchLoadRequests( + keys, + consistentReads, + maxPageSize, + returnConsumedCapacity + ) - val (requestKeys, keysByTable, batchRequests) = toBatchLoadRequests(keys, consistentReads, maxPageSize) val pages = batchRequests.map { dynamoDbEnhancedClient.batchGetItem(it).iterator().next() } + return toBatchLoadResponse(keysByTable, requestKeys, pages) } + override fun batchWrite( writeSet: BatchWriteSet, maxPageSize: Int @@ -136,14 +146,18 @@ internal class DynamoDbLogicalDb( override fun batchLoadAsync( keys: KeySet, consistentReads: Boolean, - maxPageSize: Int + maxPageSize: Int, + returnConsumedCapacity: ReturnConsumedCapacity ): Publisher { - val (requests, requestsByTable, batchRequests) = toBatchLoadRequests(keys, consistentReads, maxPageSize) + val (requests, requestsByTable, batchRequests) = toBatchLoadRequests( + keys, + consistentReads, + maxPageSize, + returnConsumedCapacity + ) return batchRequests - .map { request -> - dynamoDbEnhancedClient.batchGetItem(request).limit(1).asFlow() - } + .map { request -> dynamoDbEnhancedClient.batchGetItem(request).limit(1).asFlow() } .reduce { acc, item -> merge(acc, item) } .map { page -> toBatchLoadResponse(requestsByTable, requests, listOf(page)) } .asPublisher() @@ -191,7 +205,8 @@ internal class DynamoDbLogicalDb( private fun toBatchLoadRequests( keys: KeySet, consistentReads: Boolean, - maxPageSize: Int + maxPageSize: Int, + returnConsumedCapacity: ReturnConsumedCapacity? ): Triple, Map, List>, List> { val requestKeys = keys.map { LoadRequest(it.encodeAsKey().rawItemKey(), it.expectedItemType()) } val keysByTable = mutableMapOf, List>() @@ -200,6 +215,7 @@ internal class DynamoDbLogicalDb( val batchByTable = chunk.groupBy { it.tableType } keysByTable.putAll(batchByTable) BatchGetItemEnhancedRequest.builder() + .returnConsumedCapacity(returnConsumedCapacity) .readBatches( batchByTable.map { (tableType, requestsForTable) -> ReadBatch.builder(tableType.java) @@ -223,9 +239,11 @@ internal class DynamoDbLogicalDb( pages: List ): ItemSet { val results = mutableSetOf() + val consumedCapacity = mutableListOf() val tableTypes = keysByTable.keys val resultTypes = requestKeys.associate { it.key to it.resultType } for (page in pages) { + consumedCapacity.addAll(page.consumedCapacity()) for (tableType in tableTypes) { for (result in page.resultsForTable(mappedTableResource(tableType))) { val resultType = resultTypes[result.rawItemKey()]!! @@ -234,7 +252,7 @@ internal class DynamoDbLogicalDb( } } } - return ItemSet(results) + return ItemSet(results, consumedCapacity) } private fun toBatchWriteRequests( diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt index 648fac11d..98c6502bb 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt @@ -38,6 +38,7 @@ import software.amazon.awssdk.enhanced.dynamodb.internal.EnhancedClientUtils import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity internal class DynamoDbQueryable( private val secondaryIndexName: String?, @@ -59,9 +60,10 @@ internal class DynamoDbQueryable( pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + returnConsumedCapacity: ReturnConsumedCapacity? ): Page { - val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset) + val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset, returnConsumedCapacity) val page = if (secondaryIndexName != null) { dynamoDbTable.index(secondaryIndexName).query(request) } else { @@ -77,8 +79,8 @@ internal class DynamoDbQueryable( inner class Async( private val dynamoDbTable: DynamoDbAsyncTable ) : AsyncQueryable { - override fun queryAsync(keyCondition: KeyCondition, asc: Boolean, pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, initialOffset: Offset?): Publisher> { - val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset) + override fun queryAsync(keyCondition: KeyCondition, asc: Boolean, pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, initialOffset: Offset?, returnConsumedCapacity: ReturnConsumedCapacity?): Publisher> { + val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset, returnConsumedCapacity) return if (secondaryIndexName != null) { dynamoDbTable.index(secondaryIndexName).query(request) } else { @@ -97,13 +99,15 @@ internal class DynamoDbQueryable( consistentRead: Boolean, pageSize: Int, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + returnConsumedCapacity: ReturnConsumedCapacity? ): QueryEnhancedRequest { val query = QueryEnhancedRequest.builder() .queryConditional(toQueryConditional(keyCondition)) .scanIndexForward(asc) .consistentRead(consistentRead) .limit(pageSize) + .returnConsumedCapacity(returnConsumedCapacity) .attributesToProject(specificAttributeNames) if (filterExpression != null) { query.filterExpression(filterExpression) @@ -117,7 +121,7 @@ internal class DynamoDbQueryable( private fun toQueryResponse(page: software.amazon.awssdk.enhanced.dynamodb.model.Page): Page { val contents = page.items().map { itemCodec.toApp(it) } val offset = page.lastEvaluatedKey()?.decodeOffset() - return Page(contents, offset) + return Page(contents, offset, page.consumedCapacity()) } private fun toQueryConditional(keyCondition: KeyCondition): QueryConditional { diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt index 887aa64c3..1727420df 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt @@ -110,7 +110,7 @@ internal class DynamoDbScannable( private fun toScanResponse(page: software.amazon.awssdk.enhanced.dynamodb.model.Page): Page { val contents = page.items().map { itemCodec.toApp(it) } val offset = page.lastEvaluatedKey()?.decodeOffset() - return Page(contents, offset) + return Page(contents, offset, page.consumedCapacity()) } private fun Offset.encodeOffset(): Map { diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt index 12eaa19f5..943d2300e 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt @@ -29,6 +29,8 @@ import software.amazon.awssdk.enhanced.dynamodb.internal.EnhancedClientUtils import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest +import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.util.concurrent.CompletableFuture internal class DynamoDbView( @@ -48,6 +50,17 @@ internal class DynamoDbView( return toLoadResponse(itemObject) } + override fun load( + key: K, + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity + ): Pair { + val request = toLoadRequest(key, consistentReads, returnConsumedCapacity) + val response = dynamoDbTable.getItemWithResponse(request) + val item = toLoadResponse(response.attributes()) + return Pair(item, response.consumedCapacity()) + } + override fun save( item: I, saveExpression: Expression? @@ -62,7 +75,7 @@ internal class DynamoDbView( ): I? { val request = toDeleteKeyRequest(key, deleteExpression) val itemObject = dynamoDbTable.deleteItem(request) - return toDeleteResponse(itemObject) + return toItem(itemObject) } override fun delete( @@ -71,7 +84,7 @@ internal class DynamoDbView( ): I? { val request = toDeleteItemRequest(item, deleteExpression) val itemObject = dynamoDbTable.deleteItem(request) - return toDeleteResponse(itemObject) + return toItem(itemObject) } } @@ -82,7 +95,20 @@ internal class DynamoDbView( ) : AsyncView { override fun loadAsync(key: K, consistentReads: Boolean): CompletableFuture { val request = toLoadRequest(key, consistentReads) - return dynamoDbTable.getItem(request).thenApply(::toDeleteResponse) + return dynamoDbTable.getItem(request).thenApply(::toItem) + } + + override fun loadAsync( + key: K, + consistentReads: Boolean, + returnConsumedCapacity: ReturnConsumedCapacity + ): CompletableFuture> { + val request = toLoadRequest(key, consistentReads, returnConsumedCapacity) + return dynamoDbTable.getItemWithResponse(request) + .thenApply { response -> + val item = toItem(response.attributes()) + Pair(item, response.consumedCapacity()) + } } override fun saveAsync( @@ -98,7 +124,7 @@ internal class DynamoDbView( deleteExpression: Expression? ): CompletableFuture { val request = toDeleteKeyRequest(key, deleteExpression) - return dynamoDbTable.deleteItem(request).thenApply(::toDeleteResponse) + return dynamoDbTable.deleteItem(request).thenApply(::toItem) } override fun deleteAsync( @@ -106,7 +132,7 @@ internal class DynamoDbView( deleteExpression: Expression? ): CompletableFuture { val request = toDeleteItemRequest(item, deleteExpression) - return dynamoDbTable.deleteItem(request).thenApply(::toDeleteResponse) + return dynamoDbTable.deleteItem(request).thenApply(::toItem) } } @@ -117,11 +143,12 @@ internal class DynamoDbView( ) } - private fun toLoadRequest(key: K, consistentReads: Boolean): GetItemEnhancedRequest { + private fun toLoadRequest(key: K, consistentReads: Boolean, returnConsumedCapacity: ReturnConsumedCapacity? = null): GetItemEnhancedRequest { val keyObject = keyCodec.toDb(key) return GetItemEnhancedRequest.builder() .key(keyObject.key()) .consistentRead(consistentReads) + .returnConsumedCapacity(returnConsumedCapacity) .build() } @@ -151,5 +178,5 @@ internal class DynamoDbView( .build() } - private fun toDeleteResponse(itemObject: R?) = if (itemObject != null) itemCodec.toApp(itemObject) else null + private fun toItem(itemObject: R?) = if (itemObject != null) itemCodec.toApp(itemObject) else null } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt index f290a1433..3a608237b 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt @@ -24,6 +24,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.reactive.asPublisher import org.reactivestreams.Publisher import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import kotlin.reflect.KClass internal class UnsupportedAsyncQueryable( @@ -35,7 +36,8 @@ internal class UnsupportedAsyncQueryable( pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + returnConsumedCapacity: ReturnConsumedCapacity? ): Publisher> { return flow> { throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)") diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt index bd2fd49e8..7bf8be075 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedQueryable.kt @@ -21,6 +21,7 @@ import app.cash.tempest2.Offset import app.cash.tempest2.Page import app.cash.tempest2.Queryable import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.lang.UnsupportedOperationException import kotlin.reflect.KClass @@ -33,7 +34,8 @@ internal class UnsupportedQueryable( pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, - initialOffset: Offset? + initialOffset: Offset?, + returnConsumedCapacity: ReturnConsumedCapacity? ): Page { throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)") } diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt index a65a249e0..0b031da54 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt @@ -41,6 +41,7 @@ import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.time.Duration import java.time.LocalDate import java.util.concurrent.TimeUnit @@ -106,6 +107,7 @@ class DynamoDbAsyncQueryableTest { ) assertThat(page1.hasMorePages).isFalse() assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..0)) + assertThat(page1.consumedCapacity).isNull() val page2 = musicTable.albumTracks.query( keyCondition = Between( @@ -126,6 +128,44 @@ class DynamoDbAsyncQueryableTest { assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..2)) } + @Test + fun `returns consumed capacity with the result`() = runBlockingTest { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1) + ), + returnConsumedCapacity = ReturnConsumedCapacity.TOTAL + ) + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..0)) + assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + + val page2 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 2), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 3) + ), + returnConsumedCapacity = ReturnConsumedCapacity.NONE + ) + assertThat(page2.hasMorePages).isFalse() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(1..2)) + assertThat(page2.consumedCapacity).isNull() + + val page3 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 3) + ), + returnConsumedCapacity = ReturnConsumedCapacity.INDEXES + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..2)) + assertThat(page3.consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + } + @Test fun primaryIndexBeginsWith() = runBlockingTest { musicTable.givenAlbums(AFTER_HOURS_EP) diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt index 33f8bc5bc..d58c6c7cd 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt @@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.RegisterExtension import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.services.dynamodb.model.AttributeValue import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.time.LocalDate class DynamoDbAsyncViewTest { @@ -59,6 +60,33 @@ class DynamoDbAsyncViewTest { assertThat(loadedAlbumInfo.genre_name).isEqualTo(albumInfo.genre_name) } + @Test + fun loadAfterSaveWithConsumedCapacity() = runBlockingTest { + val albumInfo = AlbumInfo( + "ALBUM_1", + "after hours - EP", + "53 Thieves", + LocalDate.of(2020, 2, 21), + "Contemporary R&B" + ) + musicTable.albumInfo.save(albumInfo) + + // Query the movies created. + val (loadedAlbumInfo, consumedCapacity) = musicTable.albumInfo.load(albumInfo.key, returnConsumedCapacity = ReturnConsumedCapacity.TOTAL) + + assertThat(loadedAlbumInfo!!.album_token).isEqualTo(albumInfo.album_token) + assertThat(loadedAlbumInfo.artist_name).isEqualTo(albumInfo.artist_name) + assertThat(loadedAlbumInfo.release_date).isEqualTo(albumInfo.release_date) + assertThat(loadedAlbumInfo.genre_name).isEqualTo(albumInfo.genre_name) + assertThat(consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + + val (_, consumedCapacity2) = musicTable.albumInfo.load( + albumInfo.key, + returnConsumedCapacity = ReturnConsumedCapacity.NONE + ) + assertThat(consumedCapacity2).isNull() + } + @Test fun saveIfNotExist() = runBlockingTest { val albumInfo = AlbumInfo( diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt index ab5d9fce0..ae962b606 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbQueryableTest.kt @@ -39,6 +39,7 @@ import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.time.Duration import java.time.LocalDate @@ -334,6 +335,40 @@ class DynamoDbQueryableTest { assertThat(sparseGsiPage.contents.single().label_name).isEqualTo(SPIRIT_WORLD_FIELD_GUIDE.label) } + @Test + fun `returns consumed capacity with the response`() { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2, + returnConsumedCapacity = ReturnConsumedCapacity.TOTAL + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..1)) + assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + + val page2 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2, + initialOffset = page1.offset, + returnConsumedCapacity = ReturnConsumedCapacity.NONE + ) + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..3)) + assertThat(page2.consumedCapacity).isNull() + + val page3 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2, + initialOffset = page2.offset, + returnConsumedCapacity = ReturnConsumedCapacity.INDEXES + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(4..4)) + assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + } + private fun runLengthLongerThan(duration: Duration): Expression { return Expression.builder() .expression("run_length > :duration") diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt index db2102268..789dabf24 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt @@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.RegisterExtension import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.services.dynamodb.model.AttributeValue import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.time.LocalDate class DynamoDbViewTest { @@ -59,6 +60,33 @@ class DynamoDbViewTest { assertThat(loadedAlbumInfo.genre_name).isEqualTo(albumInfo.genre_name) } + @Test + fun loadWithCapacityAfterSave() { + val albumInfo = AlbumInfo( + "ALBUM_1", + "after hours - EP", + "53 Thieves", + LocalDate.of(2020, 2, 21), + "Contemporary R&B" + ) + musicTable.albumInfo.save(albumInfo) + + // Query the movies created. + val (loadedAlbumInfo, consumedCapacity) = musicTable.albumInfo.load(albumInfo.key, returnConsumedCapacity = ReturnConsumedCapacity.TOTAL) + + assertThat(loadedAlbumInfo!!.album_token).isEqualTo(albumInfo.album_token) + assertThat(loadedAlbumInfo.artist_name).isEqualTo(albumInfo.artist_name) + assertThat(loadedAlbumInfo.release_date).isEqualTo(albumInfo.release_date) + assertThat(loadedAlbumInfo.genre_name).isEqualTo(albumInfo.genre_name) + assertThat(consumedCapacity?.capacityUnits()).isGreaterThan(0.0) + + val (_, consumedCapacity2) = musicTable.albumInfo.load( + albumInfo.key, + returnConsumedCapacity = ReturnConsumedCapacity.NONE + ) + assertThat(consumedCapacity2).isNull() + } + @Test fun saveIfNotExist() { val albumInfo = AlbumInfo( diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbBatchTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbBatchTest.kt index 2663460e0..59fa0d36a 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbBatchTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbBatchTest.kt @@ -24,6 +24,7 @@ import app.cash.tempest2.testing.logicalDb import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension +import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity import java.time.Duration class LogicalDbBatchTest { @@ -162,4 +163,39 @@ class LogicalDbBatchTest { ) assertThat(items).containsExactly(t3, t1) } + + @Test + fun `batchLoad with return capacity requested`() = runBlockingTest { + val albumTracks = (1..(MAX_BATCH_READ + 5)).map { + AlbumTrack("ALBUM_1", it.toLong(), "track $it", Duration.parse("PT3M28S")) + } + for (albumTrack in albumTracks) { + musicTable.albumTracks.save(albumTrack) + } + val playlistInfo = PlaylistInfo( + playlist_token = "PLAYLIST_1", + playlist_name = "WFH Music", + playlist_tracks = listOf(AlbumTrack.Key("ALBUM_1", 1)) + ) + musicTable.playlistInfo.save(playlistInfo) + + val loadedItems = musicDb.batchLoad( + PlaylistInfo.Key("PLAYLIST_1"), + *(albumTracks.map { AlbumTrack.Key("ALBUM_1", track_number = it.track_number) }.toTypedArray()), + returnConsumedCapacity = ReturnConsumedCapacity.TOTAL + ) + assertThat(loadedItems.getItems()).containsAll(albumTracks) + assertThat(loadedItems.getItems()).containsExactly(playlistInfo) + + assertThat(loadedItems.consumedCapacity) + .extracting { it.capacityUnits() } + .contains(50.0, 3.0) + + val loadedWithoutCapacity = musicDb.batchLoad( + PlaylistInfo.Key("PLAYLIST_1"), + *(albumTracks.map { AlbumTrack.Key("ALBUM_1", track_number = it.track_number) }.toTypedArray()), + returnConsumedCapacity = ReturnConsumedCapacity.NONE + ) + assertThat(loadedWithoutCapacity.consumedCapacity).isEmpty() + } }