Skip to content

Commit

Permalink
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveList…
Browse files Browse the repository at this point in the history
…enerBus

## What changes were proposed in this pull request?

Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#15220 from zsxwing/SPARK-17649.
  • Loading branch information
zsxwing committed Sep 26, 2016
1 parent f234b7c commit bde85f8
Showing 1 changed file with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

Expand Down Expand Up @@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
Expand Down Expand Up @@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}

Expand Down

0 comments on commit bde85f8

Please sign in to comment.