From 720061e373436d2b7fb7291a52a31dff3c60c024 Mon Sep 17 00:00:00 2001 From: brharrington Date: Tue, 10 Dec 2024 13:37:57 -0600 Subject: [PATCH] lwc-events: fix race condition for samples (#1737) In certain cases update and flush could happen such that the sample would be missing. Add a lock to avoid the race condition. May need some further optimization in the future to reduce contention, but for initial tests it is sufficient for now. --- .../atlas/lwc/events/DatapointConverter.scala | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala index ae8ee26fa..bd95a9e74 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala @@ -28,6 +28,7 @@ import com.typesafe.scalalogging.StrictLogging import java.time.Duration import java.util.Locale import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock /** * Helper to convert a sequence of events into a data point. @@ -161,13 +162,24 @@ private[events] object DatapointConverter { private val buffer = new StepDouble(Double.NaN, params.clock, params.step) + // Lock to avoid race condition between update and flush that can otherwise result + // in the samples occasionally being missing for an aggregate. + private val lock = new ReentrantLock() private val sampleMapper: LwcEvent => List[Any] = params.sampleMapper.orNull - @volatile private var sample: List[Any] = Nil + private var sample: List[Any] = Nil override def update(event: LwcEvent): Unit = { - update(params.valueMapper(event)) - if (sampleMapper != null && sample.isEmpty) { - sample = sampleMapper(event) + if (sampleMapper == null) { + update(params.valueMapper(event)) + } else { + lock.lock() + try { + update(params.valueMapper(event)) + if (sample.isEmpty) + sample = sampleMapper(event) + } finally { + lock.unlock() + } } } @@ -177,14 +189,25 @@ private[events] object DatapointConverter { } } + private def getAndResetSample(): List[List[Any]] = { + if (sampleMapper == null) { + Nil + } else { + lock.lock() + try { + val s = sample + sample = Nil + List(s) + } finally { + lock.unlock() + } + } + } + override def flush(timestamp: Long): Unit = { val value = buffer.pollAsRate(timestamp) if (value.isFinite) { - var s = List.empty[List[Any]] - if (sampleMapper != null) { - s = List(sample) - sample = Nil - } + val s = getAndResetSample() val ts = timestamp / params.step * params.step val event = DatapointEvent(params.id, params.tags, ts, value, s) params.consumer(params.id, event)