Skip to content

Commit

Permalink
lwc-events: fix race condition for samples (#1737)
Browse files Browse the repository at this point in the history
In certain cases update and flush could happen such that
the sample would be missing. Add a lock to avoid the race
condition. May need some further optimization in the
future to reduce contention, but for initial tests it is
sufficient for now.
  • Loading branch information
brharrington authored Dec 10, 2024
1 parent 0e0e604 commit 720061e
Showing 1 changed file with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.typesafe.scalalogging.StrictLogging
import java.time.Duration
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

/**
* Helper to convert a sequence of events into a data point.
Expand Down Expand Up @@ -161,13 +162,24 @@ private[events] object DatapointConverter {

private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

// Lock to avoid race condition between update and flush that can otherwise result
// in the samples occasionally being missing for an aggregate.
private val lock = new ReentrantLock()
private val sampleMapper: LwcEvent => List[Any] = params.sampleMapper.orNull
@volatile private var sample: List[Any] = Nil
private var sample: List[Any] = Nil

override def update(event: LwcEvent): Unit = {
update(params.valueMapper(event))
if (sampleMapper != null && sample.isEmpty) {
sample = sampleMapper(event)
if (sampleMapper == null) {
update(params.valueMapper(event))
} else {
lock.lock()
try {
update(params.valueMapper(event))
if (sample.isEmpty)
sample = sampleMapper(event)
} finally {
lock.unlock()
}
}
}

Expand All @@ -177,14 +189,25 @@ private[events] object DatapointConverter {
}
}

private def getAndResetSample(): List[List[Any]] = {
if (sampleMapper == null) {
Nil
} else {
lock.lock()
try {
val s = sample
sample = Nil
List(s)
} finally {
lock.unlock()
}
}
}

override def flush(timestamp: Long): Unit = {
val value = buffer.pollAsRate(timestamp)
if (value.isFinite) {
var s = List.empty[List[Any]]
if (sampleMapper != null) {
s = List(sample)
sample = Nil
}
val s = getAndResetSample()
val ts = timestamp / params.step * params.step
val event = DatapointEvent(params.id, params.tags, ts, value, s)
params.consumer(params.id, event)
Expand Down

0 comments on commit 720061e

Please sign in to comment.