Skip to content

Commit

Permalink
add support for query all and scan all (#180)
Browse files Browse the repository at this point in the history
* add support for query all and scan all

* add tests for queryAll and queryAllContents

* update query tests

* add tests for scanAll

* update scan tests

* use filterExpression parameter for scans

* update comments

* try using GSI

* convert scan to GSI to stop type failure

* remove gradle files

---------

Co-authored-by: Tim Kye <[email protected]>
  • Loading branch information
danieloh0714 and kyeotic authored May 2, 2024
1 parent 057472e commit 0c53a3f
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 0 deletions.
98 changes: 98 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,104 @@ interface Queryable<K : Any, I : Any> {
initialOffset,
config.returnConsumedCapacity
)

/**
* Executes a query and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAll(keyCondition: KeyCondition<K>) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAll(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAll(keyCondition: KeyCondition<K>, config: QueryConfig): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config = config,
initialOffset = null,
)
}

fun queryAll(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}

/**
* Executes a query and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAllContents(keyCondition: KeyCondition<K>) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAllContents(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAllContents(keyCondition: KeyCondition<K>, config: QueryConfig) = queryAllContents(
keyCondition,
config = config,
initialOffset = null,
)

fun queryAllContents(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<I> {
return queryAllContents(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}
}

data class QueryConfig internal constructor(
Expand Down
74 changes: 74 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,80 @@ interface Scannable<K : Any, I : Any> {
config.filterExpression,
initialOffset
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun scanAll(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAll() = scanAll(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAll(initialOffset: Offset<K>?) = scanAll(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAll(config: ScanConfig) = scanAll(
config,
initialOffset = null
)

fun scanAll(config: ScanConfig, initialOffset: Offset<K>?): Sequence<Page<K, I>> {
return scanAll(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)
}

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun scanAllContents(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAllContents() = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAllContents(initialOffset: Offset<K>?) = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAllContents(config: ScanConfig) = scanAllContents(
config,
initialOffset = null
)

fun scanAllContents(config: ScanConfig, initialOffset: Offset<K>?): Sequence<I> {
return scanAllContents(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)
}
}

data class ScanConfig internal constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,36 @@ internal class DynamoDbQueryable<K : Any, I : Any, R : Any>(
.iterator().next()
return toQueryResponse(page)
}

override fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
query(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
query(keyCondition, asc, pageSize, consistentRead, filterExpression, offset)
}
}
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return queryAll(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
.iterator().next()
return toScanResponse(page)
}

override fun scanAll(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
scan(pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
scan(pageSize, consistentRead, filterExpression, offset)
}
}
}

override fun scanAllContents(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return scanAll(pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,26 @@ internal class UnsupportedQueryable<K : Any, I : Any>(
): Page<K, I> {
throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)")
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
TODO("Not yet implemented")
}

override fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,101 @@ class DynamoDbQueryableTest {
assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0)
}

@Test
fun queryAll() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val page = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
).iterator().next()

assertThat(page.hasMorePages).isFalse()
assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4))
}

@Test
fun queryAllPagination() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val itr = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
pageSize = 2
).iterator()

val page1 = itr.next()
assertThat(page1.hasMorePages).isTrue()
assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..1))

val page2 = itr.next()
assertThat(page2.hasMorePages).isTrue()
assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..3))

val page3 = itr.next()
assertThat(page3.hasMorePages).isFalse()
assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(4..4))
}

@Test
fun queryAllDesc() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val page = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false
).iterator().next()

assertThat(page.hasMorePages).isFalse()
assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed())
}

@Test
fun queryAllDescPagination() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val itr = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false,
pageSize = 2
).iterator()

val page1 = itr.next()
assertThat(page1.hasMorePages).isTrue()
assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(0..1))

val page2 = itr.next()
assertThat(page2.hasMorePages).isTrue()
assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(2..3))

val page3 = itr.next()
assertThat(page3.hasMorePages).isFalse()
assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(4..4))
}

@Test
fun queryAllContents() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val sequence = musicTable.albumTracks.queryAllContents(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
)

assertThat(sequence.map { it.track_title }
.toList()).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4))
}

@Test
fun queryAllContentsDesc() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val sequence = musicTable.albumTracks.queryAllContents(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false
)

assertThat(sequence.map { it.track_title }
.toList()).containsAll(AFTER_HOURS_EP.trackTitles.reversed())
}

private fun runLengthLongerThan(duration: Duration): Expression {
return Expression.builder()
.expression("run_length > :duration")
Expand Down
Loading

0 comments on commit 0c53a3f

Please sign in to comment.