[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.

## What changes were proposed in this pull request?
We now kill multiple executors in a single batched request instead of issuing a separate, expensive RPC call for each executor.
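As a rough sketch (hypothetical `client` value of type `ExecutorAllocationClient`, executor ids made up), the batched call replaces a loop of per-executor RPC requests:

```scala
// Hypothetical client: any ExecutorAllocationClient implementation.
val idleExecutorIds: Seq[String] = Seq("1", "7", "12")

// Before this change: one RPC round trip per executor.
idleExecutorIds.foreach { id => client.killExecutor(id) }

// After this change: a single batched request; the return value lists the
// executors the cluster manager actually acknowledged for removal.
val removed: Seq[String] = client.killExecutors(idleExecutorIds)
```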

## How was this patch tested?
Executed a sample Spark job with dynamic allocation enabled and observed executors being killed/removed as expected.
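For context, a minimal sketch of the configuration such a test relies on (app name and values are illustrative, not taken from the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Dynamic allocation with a short idle timeout so executor removal is easy to observe.
val conf = new SparkConf()
  .setAppName("DynamicAllocationSmokeTest")          // illustrative name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")      // required by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "30s")
val sc = new SparkContext(conf)
```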

Author: Dhruve Ashar <[email protected]>
Author: Dhruve Ashar <[email protected]>

Closes apache#15152 from dhruve/impr/SPARK-17365.
Dhruve Ashar authored and Marcelo Vanzin committed Sep 22, 2016
1 parent 8a02410 commit 17b72d3
Showing 11 changed files with 239 additions and 55 deletions.
@@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {

/**
* Request that the cluster manager kill the specified executors.
* @return whether the request is acknowledged by the cluster manager.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
def killExecutor(executorId: String): Boolean = {
val killedExecutors = killExecutors(Seq(executorId))
killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
}
}
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable

import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(

updateAndSyncNumExecutorsTarget(now)

val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
executorIdsToBeRemoved += executorId
}
!expired
}
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}

/**
@@ -391,11 +396,67 @@ private[spark] class ExecutorAllocationManager(
}
}

/**
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
val executorIdsToBeRemoved = new ArrayBuffer[String]

logInfo("Request to remove executorIds: " + executors.mkString(", "))
val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

var newExecutorTotal = numExistingExecutors
executors.foreach { executorIdToBeRemoved =>
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
} else if (canBeKilled(executorIdToBeRemoved)) {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
}
}

if (executorIdsToBeRemoved.isEmpty) {
return Seq.empty[String]
}

// Send a request to the backend to kill these executors
val executorsRemoved = if (testing) {
executorIdsToBeRemoved
} else {
client.killExecutors(executorIdsToBeRemoved)
}
// reset the newExecutorTotal to the existing number of executors
newExecutorTotal = numExistingExecutors
if (testing || executorsRemoved.nonEmpty) {
executorsRemoved.foreach { removedExecutorId =>
newExecutorTotal -= 1
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
executorsPendingToRemove.add(removedExecutorId)
}
executorsRemoved
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
"executorIdsToBeRemoved.mkString(\",\") or no executor eligible to kill!")
Seq.empty[String]
}
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
* Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
val executorsRemoved = removeExecutors(Seq(executorId))
executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
}

/**
* Determine if the given executor can be killed.
*/
private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
return false
}

// Do not kill the executor if we have already reached the lower bound
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
if (numExistingExecutors - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorId because there are only " +
s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
return false
}

// Send a request to the backend to kill this executor
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
s"or no executor eligible to kill!")
false
}
true
}

/**
24 changes: 15 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -73,7 +73,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
class SparkContext(config: SparkConf) extends Logging {

// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
listenerBus.addListener(listener)
}

private[spark] override def getExecutorIds(): Seq[String] = {
private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @return whether the request is acknowledged by the cluster manager.
*/
@DeveloperApi
override def requestTotalExecutors(
def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds, replace = false, force = true)
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))

/**
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = true, force = true)
b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
* @return the ids of the executors acknowledged by the cluster manager to be removed. If the
* list to kill is empty, an empty sequence is returned.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = {
final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}

@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Boolean = {
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }

logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
_ => Future.successful(false)
}

adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)

killResponse.flatMap(killSuccessful =>
Future.successful(if (killSuccessful) executorsToKill else Seq.empty[String])
)(ThreadUtils.sameThread)
}

defaultAskTimeout.awaitResult(response)
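For readers unfamiliar with the Future chaining above, here is a standalone sketch of the same pattern (names are hypothetical): the Boolean acknowledgment from the kill RPC is mapped onto the list of requested executor ids, producing the new Seq[String] result.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical standalone version of the pattern used in killExecutors above:
// turn a Boolean kill acknowledgment into the list of executor ids killed.
def toKilledIds(killResponse: Future[Boolean], executorsToKill: Seq[String])
               (implicit ec: ExecutionContext): Future[Seq[String]] =
  killResponse.map { killSuccessful =>
    if (killSuccessful) executorsToKill else Seq.empty[String]
  }
```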
@@ -1,2 +1,3 @@
org.apache.spark.scheduler.DummyExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
org.apache.spark.DummyLocalExternalClusterManager