Skip to content

Commit

Permalink
eval: support samples with message batches (#1730)
Browse files Browse the repository at this point in the history
Update the compact batch format to support a V2 variant
of the datapoint messages that includes samples.
  • Loading branch information
brharrington authored Nov 20, 2024
1 parent bee62c2 commit 57369c1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ object LwcMessages {
private val Heartbeat = 5
private val Event = 6
private val SubscriptionV2 = 7
private val DatapointV2 = 8

/**
* Encode messages using Jackson's smile format into a ByteString.
Expand Down Expand Up @@ -262,7 +263,7 @@ object LwcMessages {
gen.writeNumber(s.step)
}
gen.writeEndArray()
case msg: LwcDatapoint =>
case msg: LwcDatapoint if msg.samples.isEmpty =>
gen.writeNumber(Datapoint)
gen.writeNumber(msg.timestamp)
gen.writeString(msg.id)
Expand All @@ -275,6 +276,20 @@ object LwcMessages {
gen.writeString(v)
}
gen.writeNumber(msg.value)
case msg: LwcDatapoint =>
gen.writeNumber(DatapointV2)
gen.writeNumber(msg.timestamp)
gen.writeString(msg.id)
// Should already be sorted, but convert if needed to ensure we can rely on
// the order. It will be a noop if already a SortedTagMap.
val tags = SortedTagMap(msg.tags)
gen.writeNumber(tags.size)
tags.foreachEntry { (k, v) =>
gen.writeString(k)
gen.writeString(v)
}
gen.writeNumber(msg.value)
Json.encode(gen, msg.samples)
case msg: LwcDiagnosticMessage =>
gen.writeNumber(LwcDiagnostic)
gen.writeString(msg.id)
Expand Down Expand Up @@ -355,6 +370,13 @@ object LwcMessages {
val tags = parseTags(parser, parser.nextIntValue(0))
val value = nextDouble(parser)
builder += LwcDatapoint(timestamp, id, tags, value)
case DatapointV2 =>
val timestamp = parser.nextLongValue(-1L)
val id = parser.nextTextValue()
val tags = parseTags(parser, parser.nextIntValue(0))
val value = nextDouble(parser)
val samples = parseSamples(parser)
builder += LwcDatapoint(timestamp, id, tags, value, samples)
case LwcDiagnostic =>
val id = parser.nextTextValue()
val typeName = parser.nextTextValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class LwcMessagesSuite extends FunSuite {
val actual = LwcMessages.parse(Json.encode(input))
val expected = input.copy(samples = Json.decode[List[List[JsonNode]]](Json.encode(samples)))
assertEquals(actual, expected)

val encoded = LwcMessages.encodeBatch(List(input))
val decoded = LwcMessages.parseBatch(encoded)
assertEquals(decoded.size, 1)
assertEquals(decoded.head, expected)
}

test("datapoint, with samples empty") {
Expand Down
2 changes: 1 addition & 1 deletion atlas-lwc-events/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ atlas.lwc.events {
// Maximum size for a group by. Used to avoid OOM for group by with a high cardinality dimension.
// If exceeded new groups will be dropped.
max-groups = 10000
}
}

0 comments on commit 57369c1

Please sign in to comment.