From 57369c1fe1f4231b3156e2516497acc447ca27b0 Mon Sep 17 00:00:00 2001 From: brharrington Date: Wed, 20 Nov 2024 15:56:23 -0600 Subject: [PATCH] eval: support samples with message batches (#1730) Update the compact batch format to support a V2 variant of the datapoint messages that includes samples. --- .../atlas/eval/model/LwcMessages.scala | 24 ++++++++++++++++++- .../atlas/eval/model/LwcMessagesSuite.scala | 5 ++++ .../src/main/resources/reference.conf | 2 +- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala index 36f86da78..121709179 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/LwcMessages.scala @@ -205,6 +205,7 @@ object LwcMessages { private val Heartbeat = 5 private val Event = 6 private val SubscriptionV2 = 7 + private val DatapointV2 = 8 /** * Encode messages using Jackson's smile format into a ByteString. @@ -262,7 +263,7 @@ object LwcMessages { gen.writeNumber(s.step) } gen.writeEndArray() - case msg: LwcDatapoint => + case msg: LwcDatapoint if msg.samples.isEmpty => gen.writeNumber(Datapoint) gen.writeNumber(msg.timestamp) gen.writeString(msg.id) @@ -275,6 +276,20 @@ object LwcMessages { gen.writeString(v) } gen.writeNumber(msg.value) + case msg: LwcDatapoint => + gen.writeNumber(DatapointV2) + gen.writeNumber(msg.timestamp) + gen.writeString(msg.id) + // Should already be sorted, but convert if needed to ensure we can rely on + // the order. It will be a noop if already a SortedTagMap. + val tags = SortedTagMap(msg.tags) + gen.writeNumber(tags.size) + tags.foreachEntry { (k, v) => + gen.writeString(k) + gen.writeString(v) + } + gen.writeNumber(msg.value) + Json.encode(gen, msg.samples) case msg: LwcDiagnosticMessage => gen.writeNumber(LwcDiagnostic) gen.writeString(msg.id) @@ -355,6 +370,13 @@ object LwcMessages { val tags = parseTags(parser, parser.nextIntValue(0)) val value = nextDouble(parser) builder += LwcDatapoint(timestamp, id, tags, value) + case DatapointV2 => + val timestamp = parser.nextLongValue(-1L) + val id = parser.nextTextValue() + val tags = parseTags(parser, parser.nextIntValue(0)) + val value = nextDouble(parser) + val samples = parseSamples(parser) + builder += LwcDatapoint(timestamp, id, tags, value, samples) case LwcDiagnostic => val id = parser.nextTextValue() val typeName = parser.nextTextValue() diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala index 7df03abe2..b4f650727 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/model/LwcMessagesSuite.scala @@ -110,6 +110,11 @@ class LwcMessagesSuite extends FunSuite { val actual = LwcMessages.parse(Json.encode(input)) val expected = input.copy(samples = Json.decode[List[List[JsonNode]]](Json.encode(samples))) assertEquals(actual, expected) + + val encoded = LwcMessages.encodeBatch(List(input)) + val decoded = LwcMessages.parseBatch(encoded) + assertEquals(decoded.size, 1) + assertEquals(decoded.head, expected) } test("datapoint, with samples empty") { diff --git a/atlas-lwc-events/src/main/resources/reference.conf b/atlas-lwc-events/src/main/resources/reference.conf index 0742b5271..96dc933db 100644 --- a/atlas-lwc-events/src/main/resources/reference.conf +++ b/atlas-lwc-events/src/main/resources/reference.conf @@ -26,4 +26,4 @@ atlas.lwc.events { // Maximum size for a group by. Used to avoid OOM for group by with a high cardinality dimension. // If exceeded new groups will be dropped. max-groups = 10000 -} \ No newline at end of file +}