diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala index 2b5579763..2cf5858fc 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala @@ -94,6 +94,7 @@ private[events] object DatapointConverter { v * v } + @scala.annotation.tailrec private[events] def toDouble(value: Any, dflt: Any): Double = { value match { case v: Boolean => if (v) 1.0 else 0.0 @@ -118,17 +119,17 @@ private[events] object DatapointConverter { } } - private[events] def addNaN(value: AtomicDouble, amount: Double): Unit = { + private[events] def addNaN(now: Long, value: StepDouble, amount: Double): Unit = { if (amount.isNaN) return var set = false while (!set) { - val v = value.get() + val v = value.getCurrent(now) if (v.isNaN) { - set = value.compareAndSet(v, amount) + set = value.compareAndSet(now, v, amount) } else { - value.addAndGet(amount) + value.addAndGet(now, amount) set = true } } @@ -154,7 +155,7 @@ private[events] object DatapointConverter { override def update(value: Double): Unit = { if (value.isFinite && value >= 0.0) { - addNaN(buffer.getCurrent, value) + addNaN(params.clock.wallTime(), buffer, value) } } @@ -169,7 +170,7 @@ private[events] object DatapointConverter { override def hasNoData: Boolean = { val now = params.clock.wallTime() - buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN + buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN } } @@ -183,7 +184,7 @@ private[events] object DatapointConverter { } override def update(value: Double): Unit = { - addNaN(buffer.getCurrent, 1.0) + addNaN(params.clock.wallTime(), buffer, 1.0) } override def flush(timestamp: Long): Unit = { @@ -197,7 +198,7 @@ private[events] object DatapointConverter { override def hasNoData: Boolean = { val now = params.clock.wallTime() - buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN + buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN } } @@ -211,9 +212,7 @@ private[events] object DatapointConverter { } override def update(value: Double): Unit = { - if (value.isFinite) { - buffer.getCurrent.max(value) - } + buffer.max(params.clock.wallTime(), value) } override def flush(timestamp: Long): Unit = { @@ -227,7 +226,7 @@ private[events] object DatapointConverter { override def hasNoData: Boolean = { val now = params.clock.wallTime() - buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN + buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN } } @@ -241,9 +240,7 @@ private[events] object DatapointConverter { } override def update(value: Double): Unit = { - if (value.isFinite) { - min(buffer.getCurrent, value) - } + buffer.min(params.clock.wallTime(), value) } private def min(current: AtomicDouble, value: Double): Unit = { @@ -270,7 +267,7 @@ private[events] object DatapointConverter { override def hasNoData: Boolean = { val now = params.clock.wallTime() - buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN + buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN } } diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/StreamMetadata.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/StreamMetadata.scala index ec75cc885..10eefda87 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/StreamMetadata.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/StreamMetadata.scala @@ -38,16 +38,17 @@ import com.netflix.spectator.impl.StepLong case class StreamMetadata( streamId: String, remoteAddress: String = "unknown", + clock: Clock = Clock.SYSTEM, receivedMessages: StepLong = new StepLong(0, Clock.SYSTEM, 60_000), droppedMessages: StepLong = new StepLong(0, Clock.SYSTEM, 60_000) ) extends JsonSupport { def updateReceived(n: Int): Unit = { - receivedMessages.getCurrent.addAndGet(n) + receivedMessages.addAndGet(clock.wallTime(), n) } def updateDropped(n: Int): Unit = { - droppedMessages.getCurrent.addAndGet(n) + droppedMessages.addAndGet(clock.wallTime(), n) } override def hasCustomEncoding: Boolean = true diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamMetadataSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamMetadataSuite.scala index 3ed491084..8bade7e0b 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamMetadataSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamMetadataSuite.scala @@ -25,7 +25,7 @@ class StreamMetadataSuite extends FunSuite { val clock = new ManualClock() val step = 60_000L val meta = - StreamMetadata("id", "addr", new StepLong(0, clock, step), new StepLong(0, clock, step)) + StreamMetadata("id", "addr", clock, new StepLong(0, clock, step), new StepLong(0, clock, step)) meta.updateReceived(100) meta.updateDropped(2) diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala index 26a7e4c56..329a9c73a 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala @@ -252,7 +252,7 @@ class SubscriptionManagerSuite extends FunSuite { val sm = new SubscriptionManager[Integer](registry) val meta = - StreamMetadata("a", "test", new StepLong(0, clock, step), new StepLong(0, clock, step)) + StreamMetadata("a", "test", clock, new StepLong(0, clock, step), new StepLong(0, clock, step)) assert(sm.register(meta, 1)) val ok = Id.create("atlas.lwcapi.currentStreams").withTag("state", "ok") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 88f532751..ea61147b7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,12 +6,12 @@ object Dependencies { object Versions { val pekko = "1.0.3" val pekkoHttpV = "1.0.1" - val iep = "5.0.26" + val iep = "5.0.27" val jackson = "2.18.0" val log4j = "2.24.1" val scala = "2.13.15" val slf4j = "2.0.16" - val spectator = "1.7.19" + val spectator = "1.8.0" val spring = "6.1.12" val crossScala = Seq(scala, "3.4.1")