Skip to content

Commit

Permalink
Handle unprocessed deletes without throwing an IllegalArgumentExcepti…
Browse files Browse the repository at this point in the history
…on (#187)

* Handle unprocessed deletes correctly

* Add test

* Fix test class name typo
  • Loading branch information
szabado-faire authored Jun 6, 2024
1 parent 2a33625 commit 397e6af
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,8 @@ internal class DynamoDbLogicalDb(
for (rawItem in rawClobbersItems) {
unprocessedClobbers.add(rawItem.rawItemKey().key)
}
val rawDeleteItems = results.flatMap { it.unprocessedDeleteItemsForTable(table) }
for (rawItem in rawDeleteItems) {
unprocessedDeletes.add(rawItem.rawItemKey().key)
}
val unprocessedDeletesForTable = results.flatMap { it.unprocessedDeleteItemsForTable(table) }.filterNotNull()
unprocessedDeletes.addAll(unprocessedDeletesForTable)
}

return app.cash.tempest2.BatchWriteResult(
Expand Down
98 changes: 98 additions & 0 deletions tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbFailureTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2021 Square Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package app.cash.tempest2

import app.cash.tempest2.musiclibrary.AlbumTrack
import app.cash.tempest2.musiclibrary.MusicDb
import app.cash.tempest2.musiclibrary.testDb
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import java.time.Duration
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteItemEnhancedRequest
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest

class LogicalDbFailureTest {

@RegisterExtension
@JvmField
val db = testDb()

// Fake client that prevents batch deletes from succeeding
class FakeDynamoDbEnhancedClient(private val realClient: DynamoDbEnhancedClient) :
DynamoDbEnhancedClient by realClient {
override fun batchWriteItem(request: BatchWriteItemEnhancedRequest?): BatchWriteResult {
val unprocessedDeletes = request!!.writeBatches().map { writeBatch ->
writeBatch.tableName() to writeBatch.writeRequests().map { writeRequest ->
val key = writeRequest.deleteRequest().key()
WriteRequest.builder()
.deleteRequest(
DeleteRequest.builder()
.key(key)
.build()
)
.build()
}
}

val results = unprocessedDeletes.associate { (tableName, keys) ->
tableName to keys
}
return BatchWriteResult.builder()
.unprocessedRequests(results)
.build()
}
}

private val musicDb by lazy {
val enhancedClient = DynamoDbEnhancedClient.builder()
.dynamoDbClient(db.dynamoDb)
.extensions(listOf())
.build()
val fakeEnhancedClient = FakeDynamoDbEnhancedClient(enhancedClient)
LogicalDb.create(MusicDb::class, fakeEnhancedClient)
}
private val musicTable by lazy { musicDb.music }

@Test
fun `batch write handles unprocessed deletes accurately`() {
val albumTracks = listOf(
AlbumTrack("ALBUM_1", 1, "dreamin'", Duration.parse("PT3M28S")),
AlbumTrack("ALBUM_1", 2, "what you do to me", Duration.parse("PT3M24S")),
AlbumTrack("ALBUM_1", 3, "too slow", Duration.parse("PT2M27S"))
)
for (albumTrack in albumTracks) {
musicTable.albumTracks.save(albumTrack)
}

val batchWriteResult = musicDb.batchWrite(
BatchWriteSet.Builder()
.delete(albumTracks.map { it.key })
.build()
)

assertThat(batchWriteResult.unprocessedDeletes.map { it.partitionKeyValue().s() to it.sortKeyValue().get().s() })
.containsExactlyInAnyOrderElementsOf(
albumTracks.map {
it.album_token to "TRACK_${it.track_token}"
}
)
}
}

0 comments on commit 397e6af

Please sign in to comment.