Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for query all and scan all #180

Merged
merged 10 commits into from
May 2, 2024
98 changes: 98 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,104 @@ interface Queryable<K : Any, I : Any> {
initialOffset,
config.returnConsumedCapacity
)

/**
* Executes a query and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAll(keyCondition: KeyCondition<K>) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAll(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAll(keyCondition: KeyCondition<K>, config: QueryConfig): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config = config,
initialOffset = null,
)
}

fun queryAll(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}

/**
* Executes a query and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAllContents(keyCondition: KeyCondition<K>) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAllContents(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAllContents(keyCondition: KeyCondition<K>, config: QueryConfig) = queryAllContents(
keyCondition,
config = config,
initialOffset = null,
)

fun queryAllContents(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<I> {
return queryAllContents(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}
}

data class QueryConfig internal constructor(
Expand Down
74 changes: 74 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,80 @@ interface Scannable<K : Any, I : Any> {
config.filterExpression,
initialOffset
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun scanAll(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAll() = scanAll(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAll(initialOffset: Offset<K>?) = scanAll(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAll(config: ScanConfig) = scanAll(
config,
initialOffset = null
)

fun scanAll(config: ScanConfig, initialOffset: Offset<K>?): Sequence<Page<K, I>> {
return scanAll(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)
}

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun scanAllContents(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAllContents() = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAllContents(initialOffset: Offset<K>?) = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAllContents(config: ScanConfig) = scanAllContents(
config,
initialOffset = null
)

fun scanAllContents(config: ScanConfig, initialOffset: Offset<K>?): Sequence<I> {
return scanAllContents(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)
}
}

data class ScanConfig internal constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,36 @@ internal class DynamoDbQueryable<K : Any, I : Any, R : Any>(
.iterator().next()
return toQueryResponse(page)
}

override fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
query(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
query(keyCondition, asc, pageSize, consistentRead, filterExpression, offset)
}
}
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return queryAll(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
.iterator().next()
return toScanResponse(page)
}

override fun scanAll(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
scan(pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
scan(pageSize, consistentRead, filterExpression, offset)
}
}
}

override fun scanAllContents(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return scanAll(pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,26 @@ internal class UnsupportedQueryable<K : Any, I : Any>(
): Page<K, I> {
throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)")
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
TODO("Not yet implemented")
}

override fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,101 @@ class DynamoDbQueryableTest {
assertThat(page1.consumedCapacity?.capacityUnits()).isGreaterThan(0.0)
}

@Test
fun queryAll() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val page = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
).iterator().next()

assertThat(page.hasMorePages).isFalse()
assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4))
}

@Test
fun queryAllPagination() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val itr = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
pageSize = 2
).iterator()

val page1 = itr.next()
assertThat(page1.hasMorePages).isTrue()
assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..1))

val page2 = itr.next()
assertThat(page2.hasMorePages).isTrue()
assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..3))

val page3 = itr.next()
assertThat(page3.hasMorePages).isFalse()
assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(4..4))
}

@Test
fun queryAllDesc() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val page = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false
).iterator().next()

assertThat(page.hasMorePages).isFalse()
assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed())
}

@Test
fun queryAllDescPagination() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val itr = musicTable.albumTracks.queryAll(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false,
pageSize = 2
).iterator()

val page1 = itr.next()
assertThat(page1.hasMorePages).isTrue()
assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(0..1))

val page2 = itr.next()
assertThat(page2.hasMorePages).isTrue()
assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(2..3))

val page3 = itr.next()
assertThat(page3.hasMorePages).isFalse()
assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(4..4))
}

@Test
fun queryAllContents() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val sequence = musicTable.albumTracks.queryAllContents(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
)

assertThat(sequence.map { it.track_title }
.toList()).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..4))
}

@Test
fun queryAllContentsDesc() {
musicTable.givenAlbums(AFTER_HOURS_EP)

val sequence = musicTable.albumTracks.queryAllContents(
keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")),
asc = false
)

assertThat(sequence.map { it.track_title }
.toList()).containsAll(AFTER_HOURS_EP.trackTitles.reversed())
}

private fun runLengthLongerThan(duration: Duration): Expression {
return Expression.builder()
.expression("run_length > :duration")
Expand Down
Loading
Loading