Skip to content

Commit

Permalink
[TH2-5046] Correct error reporting for group download (#63)
Browse files Browse the repository at this point in the history
* [TH2-5046] Correct error handling when loading group

* [TH2-5046] Update readme
  • Loading branch information
OptimumCode authored Sep 20, 2023
1 parent 98f37bf commit 1a764b3
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 9 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ spec:
+ Add support for requesting message groups in reversed order
+ Add filter by stream to gRPC API for group search request
### Fixed:
+ error reporting when executing group search (if error occurs during a call to the Cradle API stream were closed before the error was reported)
## 2.1.0
+ Updated bom: `4.5.0-dev`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,8 @@ class SearchMessagesHandler(
markerAsGroup = true,
limit = request.limit,
)
try {
rootSink.use { sink ->

rootSink.use { sink ->
try {
val parameters = CradleGroupRequest(
preFilter = createInitialPrefilter(request),
)
Expand Down Expand Up @@ -301,10 +300,10 @@ class SearchMessagesHandler(
}
} while (keepPulling)
}
} catch (ex: Exception) {
logger.error("Error getting messages group", ex)
rootSink.onError(ex)
}
} catch (ex: Exception) {
logger.error("Error getting messages group", ex)
rootSink.onError(ex)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,31 @@ class FileDownloadHandler(
) {
val matchedPath = ctx.matchedPath()
var dataSent = 0
ctx.status(HttpStatus.OK)
.contentType(JSON_STREAM_CONTENT_TYPE)
.header(Header.TRANSFER_ENCODING, "chunked")

var writeHeader = true
var status: HttpStatus = HttpStatus.OK

fun writeHeader() {
if (writeHeader) {
ctx.status(status)
.contentType(JSON_STREAM_CONTENT_TYPE)
.header(Header.TRANSFER_ENCODING, "chunked")
writeHeader = false
}
}

val output = ctx.res().outputStream.buffered()
try {
do {
dataMeasurement.start("process_sse_event").use {
val nextEvent = queue.take()
ResponseQueue.currentSize(matchedPath, queue.size)
val sseEvent = dataMeasurement.start("await_convert_to_json").use { nextEvent.get() }
if (writeHeader && sseEvent is SseEvent.ErrorData.SimpleError) {
// something happened during request
status = HttpStatus.INTERNAL_SERVER_ERROR
}
writeHeader()
when (sseEvent.event) {
EventType.KEEP_ALIVE -> output.flush()
EventType.CLOSE -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.exactpro.th2.lwdataprovider.http
import com.exactpro.cradle.Direction
import com.exactpro.cradle.Order
import com.exactpro.cradle.messages.StoredMessageIdUtils
import com.exactpro.cradle.utils.CradleStorageException
import com.exactpro.th2.common.message.addField
import com.exactpro.th2.common.message.message
import com.exactpro.th2.common.message.setMetadata
Expand All @@ -27,8 +28,10 @@ import com.exactpro.th2.lwdataprovider.util.GroupBatch
import com.exactpro.th2.lwdataprovider.util.createCradleStoredMessage
import io.javalin.http.HttpStatus
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.argThat
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.doThrow
import org.mockito.kotlin.whenever
import strikt.api.expectThat
import strikt.assertions.isEqualTo
Expand Down Expand Up @@ -297,4 +300,31 @@ class TestFileDownloadHandler : AbstractHttpHandlerTest<FileDownloadHandler>() {
}
}
}

@Test
fun `respond with error and correct status if first cradle call throws an exception`() {
whenever(storage.getGroupedMessageBatches(any())) doThrow CradleStorageException("ignore")

startTest { _, client ->
val now = Instant.now().toEpochMilli()
val response = client.get(
"/download/messages?" +
"startTimestamp=${now}&endTimestamp=${now + 100}" +
"&group=test-group" +
"&bookId=test-book" +
"&responseFormat=BASE_64"
)

expectThat(response) {
get { code } isEqualTo HttpStatus.INTERNAL_SERVER_ERROR.code
get { body?.bytes()?.toString(Charsets.UTF_8) }
.isNotNull()
.isEqualTo(
"""{"error":"ignore"}
|
""".trimMargin()
)
}
}
}
}

0 comments on commit 1a764b3

Please sign in to comment.