[SPARK-12221] add cpu time to metrics
Task metrics currently do not include executor CPU time, so there is no way to tell from History Server metrics how much CPU time a stage or task actually consumed. This PR adds CPU time to the reported metrics.

Author: jisookim <[email protected]>

Closes apache#10212 from jisookim0513/add-cpu-time-metric.
jisookim0513 authored and Marcelo Vanzin committed Sep 23, 2016
1 parent 988c714 commit 90a30f4
Showing 30 changed files with 492 additions and 21 deletions.
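Every touched code path uses the same mechanism: read the current thread's CPU time from the JVM's ThreadMXBean before and after a phase and record the delta, falling back to 0 when the JVM does not support per-thread CPU timing. A minimal self-contained sketch of that pattern (illustration only, not code from the patch; the busy loop and sleep are made up to show CPU time diverging from wall-clock time):

import java.lang.management.ManagementFactory

object CpuTimeSketch {
  def main(args: Array[String]): Unit = {
    val threadMXBean = ManagementFactory.getThreadMXBean

    // getCurrentThreadCpuTime reports nanoseconds; fall back to 0L when
    // unsupported, mirroring the patch's `else 0L` branches.
    def cpuNow(): Long =
      if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L

    val startCpu = cpuNow()
    val startWall = System.currentTimeMillis()

    var acc = 0L
    (1 to 10000000).foreach(acc += _) // consumes CPU time and wall time
    Thread.sleep(500)                 // consumes wall time only

    println(s"wall: ${System.currentTimeMillis() - startWall} ms, " +
      s"cpu: ${(cpuNow() - startCpu) / 1000000} ms")
  }
}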
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -31,7 +31,9 @@ private[spark] object InternalAccumulator {

// Names of internal task level metrics
val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime"
val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime"
val RESULT_SIZE = METRICS_PREFIX + "resultSize"
val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -232,13 +232,18 @@ private[spark] class Executor(
}

override def run(): Unit = {
+val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
+val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime
+} else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
+var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()

try {
@@ -269,6 +274,9 @@

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
+taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime
+} else 0L
var threwException = true
val value = try {
val res = task.run(
@@ -302,6 +310,9 @@
}
}
val taskFinish = System.currentTimeMillis()
+val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime
+} else 0L

// If the task has been killed, let's fail it.
if (task.killed) {
@@ -317,8 +328,12 @@
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
+task.metrics.setExecutorDeserializeCpuTime(
+  (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+task.metrics.setExecutorCpuTime(
+  (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

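The two run-time metrics are kept disjoint: deserialization that happens inside Task.run() is added to the deserialize metric and subtracted from the run metric, so no interval is counted twice; the CPU-time accounting is identical in shape. A toy walk-through of the wall-clock arithmetic with made-up millisecond values:

// Hypothetical timestamps, in milliseconds, for one task.
val deserializeStartTime = 0L // executor thread starts deserializing the Task
val taskStart = 40L           // deserialization done, Task.run() begins
val taskFinish = 1040L        // Task.run() returns
val innerDeserialize = 25L    // RDD/closure deserialization inside Task.run()

val executorDeserializeTime = (taskStart - deserializeStartTime) + innerDeserialize // 65
val executorRunTime = (taskFinish - taskStart) - innerDeserialize                   // 975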
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -47,7 +47,9 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, Accumulat
class TaskMetrics private[spark] () extends Serializable {
// Each metric is internally represented as an accumulator
private val _executorDeserializeTime = new LongAccumulator
+private val _executorDeserializeCpuTime = new LongAccumulator
private val _executorRunTime = new LongAccumulator
+private val _executorCpuTime = new LongAccumulator
private val _resultSize = new LongAccumulator
private val _jvmGCTime = new LongAccumulator
private val _resultSerializationTime = new LongAccumulator
@@ -61,11 +63,22 @@ class TaskMetrics private[spark] () extends Serializable {
*/
def executorDeserializeTime: Long = _executorDeserializeTime.sum

+/**
+ * CPU Time taken on the executor to deserialize this task in nanoseconds.
+ */
+def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime.sum

/**
* Time the executor spends actually running the task (including fetching shuffle data).
*/
def executorRunTime: Long = _executorRunTime.sum

+/**
+ * CPU Time the executor spends actually running the task
+ * (including fetching shuffle data) in nanoseconds.
+ */
+def executorCpuTime: Long = _executorCpuTime.sum

/**
* The number of bytes this task transmitted back to the driver as the TaskResult.
*/
Expand Down Expand Up @@ -111,7 +124,10 @@ class TaskMetrics private[spark] () extends Serializable {
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
+private[spark] def setExecutorDeserializeCpuTime(v: Long): Unit =
+  _executorDeserializeCpuTime.setValue(v)
private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
+private[spark] def setExecutorCpuTime(v: Long): Unit = _executorCpuTime.setValue(v)
private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
private[spark] def setResultSerializationTime(v: Long): Unit =
Expand Down Expand Up @@ -188,7 +204,9 @@ class TaskMetrics private[spark] () extends Serializable {
import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
EXECUTOR_RUN_TIME -> _executorRunTime,
+EXECUTOR_CPU_TIME -> _executorCpuTime,
RESULT_SIZE -> _resultSize,
JVM_GC_TIME -> _jvmGCTime,
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
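Two details worth noting in the API above. First, the units are mixed: the existing wall-clock metrics are milliseconds, while the new CPU metrics come straight from getCurrentThreadCpuTime and are nanoseconds, as the added scaladoc spells out. Second, each metric is a plain LongAccumulator registered under a stable name in nameToAccums. A hedged sketch of the write/read round trip, assuming Spark 2.x's public LongAccumulator API:

import org.apache.spark.util.LongAccumulator

// Stand-in for one TaskMetrics field: the executor-side setter writes the
// value once per task; `sum` is what the getters above expose.
val executorCpuTime = new LongAccumulator
executorCpuTime.setValue(1500000000L) // 1.5 s of CPU, in nanoseconds
assert(executorCpuTime.sum == 1500000000L)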
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.io._
+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties

@@ -61,11 +62,18 @@ private[spark] class ResultTask[T, U](

override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
+val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
+val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime
+} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+} else 0L

func(context, rdd.iterator(partition, context))
}
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -17,6 +17,7 @@

package org.apache.spark.scheduler

+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties

@@ -66,11 +67,18 @@ private[spark] class ShuffleMapTask(

override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
+val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
+val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime
+} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+} else 0L

var writer: ShuffleWriter[Any, Any] = null
try {
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -139,6 +139,7 @@ private[spark] abstract class Task[T](
@volatile @transient private var _killed = false

protected var _executorDeserializeTime: Long = 0
+protected var _executorDeserializeCpuTime: Long = 0

/**
* Whether the task has been killed.
@@ -149,6 +150,7 @@
* Returns the amount of time spent deserializing the RDD and function to be run.
*/
def executorDeserializeTime: Long = _executorDeserializeTime
+def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime

/**
* Collect the latest values of accumulators used in this task. If the task failed,
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -101,6 +101,7 @@ private[v1] object AllStagesResource {
numCompleteTasks = stageUiData.numCompleteTasks,
numFailedTasks = stageUiData.numFailedTasks,
executorRunTime = stageUiData.executorRunTime,
+executorCpuTime = stageUiData.executorCpuTime,
submissionTime = stageInfo.submissionTime.map(new Date(_)),
firstTaskLaunchedTime,
completionTime = stageInfo.completionTime.map(new Date(_)),
@@ -220,7 +221,9 @@
new TaskMetricDistributions(
quantiles = quantiles,
executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
+executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
executorRunTime = metricQuantiles(_.executorRunTime),
+executorCpuTime = metricQuantiles(_.executorCpuTime),
resultSize = metricQuantiles(_.resultSize),
jvmGcTime = metricQuantiles(_.jvmGCTime),
resultSerializationTime = metricQuantiles(_.resultSerializationTime),
@@ -241,7 +244,9 @@
def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
new TaskMetrics(
executorDeserializeTime = internal.executorDeserializeTime,
+executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
executorRunTime = internal.executorRunTime,
+executorCpuTime = internal.executorCpuTime,
resultSize = internal.resultSize,
jvmGcTime = internal.jvmGCTime,
resultSerializationTime = internal.resultSerializationTime,
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -128,6 +128,7 @@ class StageData private[spark](
val numFailedTasks: Int,

val executorRunTime: Long,
+val executorCpuTime: Long,
val submissionTime: Option[Date],
val firstTaskLaunchedTime: Option[Date],
val completionTime: Option[Date],
@@ -166,7 +167,9 @@ class TaskData private[spark](

class TaskMetrics private[spark](
val executorDeserializeTime: Long,
+val executorDeserializeCpuTime: Long,
val executorRunTime: Long,
+val executorCpuTime: Long,
val resultSize: Long,
val jvmGcTime: Long,
val resultSerializationTime: Long,
@@ -202,7 +205,9 @@ class TaskMetricDistributions private[spark](
val quantiles: IndexedSeq[Double],

val executorDeserializeTime: IndexedSeq[Double],
+val executorDeserializeCpuTime: IndexedSeq[Double],
val executorRunTime: IndexedSeq[Double],
+val executorCpuTime: IndexedSeq[Double],
val resultSize: IndexedSeq[Double],
val jvmGcTime: IndexedSeq[Double],
val resultSerializationTime: IndexedSeq[Double],
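Because StageData, TaskMetrics, and TaskMetricDistributions back the v1 monitoring REST API, the new fields surface in the stage endpoints without further plumbing. A sketch of reading them, assuming a history server on its default port 18080 and a hypothetical application id:

import scala.io.Source

// /api/v1/applications/[app-id]/stages is Spark's documented stage-list
// resource; the application id below is made up.
val url = "http://localhost:18080/api/v1/applications/app-20160923000000-0000/stages"
val body = Source.fromURL(url).mkString
// Each stage object now carries "executorCpuTime" next to "executorRunTime".
println(body)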
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -503,6 +503,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val timeDelta =
taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L)
stageData.executorRunTime += timeDelta

+val cpuTimeDelta =
+  taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L)
+stageData.executorCpuTime += cpuTimeDelta
}

override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
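The listener updates stage totals incrementally: each executor heartbeat carries a task's running total, so the listener adds only the difference against the value it last saw for that task (getOrElse(0L) covers the first update). A toy model of that rule, with made-up values:

var stageExecutorCpuTime = 0L
var lastSeenCpuTime: Option[Long] = None

def onMetricsUpdate(taskCpuTimeSoFar: Long): Unit = {
  stageExecutorCpuTime += taskCpuTimeSoFar - lastSeenCpuTime.getOrElse(0L)
  lastSeenCpuTime = Some(taskCpuTimeSoFar)
}

onMetricsUpdate(100L) // stage total is now 100
onMetricsUpdate(250L) // stage total is now 250, not 350
assert(stageExecutorCpuTime == 250L)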
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,6 +80,7 @@ private[spark] object UIData {
var numKilledTasks: Int = _

var executorRunTime: Long = _
+var executorCpuTime: Long = _

var inputBytes: Long = _
var inputRecords: Long = _
@@ -137,7 +138,9 @@ private[spark] object UIData {
metrics.map { m =>
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
+executorDeserializeCpuTime = m.executorDeserializeCpuTime,
executorRunTime = m.executorRunTime,
+executorCpuTime = m.executorCpuTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
@@ -179,7 +182,9 @@ private[spark] object UIData {

case class TaskMetricsUIData(
executorDeserializeTime: Long,
+executorDeserializeCpuTime: Long,
executorRunTime: Long,
+executorCpuTime: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -348,7 +348,9 @@ private[spark] object JsonProtocol {
("Status" -> blockStatusToJson(status))
})
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
("Result Size" -> taskMetrics.resultSize) ~
("JVM GC Time" -> taskMetrics.jvmGCTime) ~
("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
@@ -759,7 +761,15 @@
return metrics
}
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match {
+  case JNothing => 0
+  case x => x.extract[Long]
+})
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
+metrics.setExecutorCpuTime((json \ "Executor CPU Time") match {
+  case JNothing => 0
+  case x => x.extract[Long]
+})
metrics.setResultSize((json \ "Result Size").extract[Long])
metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
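The match on JNothing is what keeps old event logs readable: JsonProtocol is built on json4s, where looking up a missing key with \ yields JNothing instead of throwing, so logs written before this change simply report zero CPU time. A small standalone check of that behavior (json4s usage; the JSON literal is made up):

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// A metrics object written before this change: no "Executor CPU Time" key.
val oldLog = parse("""{"Executor Run Time": 162}""")
val cpuTime = (oldLog \ "Executor CPU Time") match {
  case JNothing => 0L // missing in old logs, default to zero
  case x => x.extract[Long]
}
assert(cpuTime == 0L)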
(stage-list JSON test expectation; file path not shown)
@@ -6,6 +6,7 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 162,
"executorCpuTime" : 0,
"submissionTime" : "2015-02-03T16:43:07.191GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:07.191GMT",
"completionTime" : "2015-02-03T16:43:07.226GMT",
@@ -31,6 +32,7 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
"executorCpuTime" : 0,
"submissionTime" : "2015-02-03T16:43:05.829GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
"completionTime" : "2015-02-03T16:43:06.286GMT",
@@ -56,6 +58,7 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 4338,
"executorCpuTime" : 0,
"submissionTime" : "2015-02-03T16:43:04.228GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:04.234GMT",
"completionTime" : "2015-02-03T16:43:04.819GMT",
(failed-stage JSON test expectation; file path not shown)
@@ -6,6 +6,7 @@
"numCompleteTasks" : 7,
"numFailedTasks" : 1,
"executorRunTime" : 278,
"executorCpuTime" : 0,
"submissionTime" : "2015-02-03T16:43:06.296GMT",
"firstTaskLaunchedTime" : "2015-02-03T16:43:06.296GMT",
"completionTime" : "2015-02-03T16:43:06.347GMT",
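The expectation fixtures pin "executorCpuTime" to 0 because the history-server tests replay event logs recorded before this change; the JNothing fallback in JsonProtocol turns the missing field into zero, and the expectations are updated to match.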
(diffs for the remaining changed files not shown)
