Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for query all and scan all #180

Merged
merged 10 commits into from
May 2, 2024
97 changes: 97 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,103 @@ interface Queryable<K : Any, I : Any> {
initialOffset,
config.returnConsumedCapacity
)

/**
* Executes a query and returns a sequence of pages that contains all results.
*/
fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAll(keyCondition: KeyCondition<K>) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAll(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAll(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAll(keyCondition: KeyCondition<K>, config: QueryConfig): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config = config,
initialOffset = null,
)
}

fun queryAll(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<Page<K, I>> {
return queryAll(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}

/**
* Executes a query and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean = true,
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun queryAllContents(keyCondition: KeyCondition<K>) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = null,
)

fun queryAllContents(keyCondition: KeyCondition<K>, initialOffset: Offset<K>?) = queryAllContents(
keyCondition,
config = QueryConfig.Builder().build(),
initialOffset = initialOffset,
)

fun queryAllContents(keyCondition: KeyCondition<K>, config: QueryConfig) = queryAllContents(
keyCondition,
config = config,
initialOffset = null,
)

fun queryAllContents(
keyCondition: KeyCondition<K>,
config: QueryConfig,
initialOffset: Offset<K>?,
): Sequence<I> {
return queryAllContents(
keyCondition,
config.asc,
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset,
)
}
}

data class QueryConfig internal constructor(
Expand Down
69 changes: 69 additions & 0 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,75 @@ interface Scannable<K : Any, I : Any> {
config.filterExpression,
initialOffset
)

/**
* Executes a scan and returns a sequence of pages that contains all results.
*/
fun scanAll(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<Page<K, I>>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAll() = scanAll(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAll(initialOffset: Offset<K>?) = scanAll(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAll(config: ScanConfig) = scanAll(
config,
initialOffset = null
)

fun scanAll(config: ScanConfig, initialOffset: Offset<K>?) = scanAll(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*/
fun scanAllContents(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null,
): Sequence<I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAllContents() = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAllContents(initialOffset: Offset<K>?) = scanAllContents(
ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scanAllContents(config: ScanConfig) = scanAllContents(
config,
initialOffset = null
)

fun scanAllContents(config: ScanConfig, initialOffset: Offset<K>?) = scanAllContents(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
)
}

data class ScanConfig internal constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,38 @@ internal class DynamoDbQueryable<K : Any, I : Any, R : Any>(
.iterator().next()
return toQueryResponse(page)
}

override fun queryAll(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
query(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
query(
keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset = offset
)
}
}
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return queryAll(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
.iterator().next()
return toScanResponse(page)
}

override fun scanAll(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<Page<K, I>> {
return generateSequence(
scan(pageSize, consistentRead, filterExpression, initialOffset)
) { page ->
page.offset?.let { offset ->
scan(offset)
}
}
}

override fun scanAllContents(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
return scanAll(pageSize, consistentRead, filterExpression, initialOffset)
.map { it.contents }
.flatten()
}
}

fun async(dynamoDbTable: DynamoDbAsyncTable<R>) = Async(dynamoDbTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ internal class UnsupportedQueryable<K : Any, I : Any>(
): Page<K, I> {
throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)")
}

override fun queryAllContents(
keyCondition: KeyCondition<K>,
asc: Boolean,
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
): Sequence<I> {
TODO("Not yet implemented")
}
}