Skip to content

Commit

Permalink
Remove unused RPCs and disable reconfiguration (#2970)
Browse files Browse the repository at this point in the history
As discussed in #2950, we plan to remove obsolete RPCs and reconfiguration-related RPCs in this PR. We will bring reconfiguration back after #2950.

Removed:
1. QueryCurrentInputTuple.
2. ShutdownDPThread.

Disabled, will be added later:
1. Reconfiguration.
2. UpdateExecutor.
3. UpdateMultipleExecutors.
  • Loading branch information
shengquan-ni authored and PurelyBlank committed Dec 4, 2024
1 parent 8f9031d commit 1c6d095
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class ControllerAsyncRPCHandlerInitializer(
with PortCompletedHandler
with ConsoleMessageHandler
with RetryWorkflowHandler
with ModifyLogicHandler
with EvaluatePythonExpressionHandler
with DebugCommandHandler
with TakeGlobalCheckpointHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package edu.uci.ics.amber.engine.architecture.controller

import edu.uci.ics.amber.engine.common.model.tuple.Tuple
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.engine.common.workflowruntimestate.{
Expand All @@ -16,11 +15,6 @@ object ControllerEvent {
operatorMetrics: Map[String, OperatorMetrics]
) extends ControlCommand[Unit]

case class ReportCurrentProcessingTuple(
operatorID: String,
tuple: Array[(Tuple, ActorVirtualIdentity)]
) extends ControlCommand[Unit]

case class WorkerAssignmentUpdate(workerMapping: Map[String, Seq[String]])
extends ControlCommand[Unit]

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ package edu.uci.ics.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import edu.uci.ics.amber.engine.architecture.controller.ControllerEvent.{
ExecutionStateUpdate,
ExecutionStatsUpdate,
ReportCurrentProcessingTuple
ExecutionStatsUpdate
}
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.PauseHandler.PauseWorkflow
import edu.uci.ics.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.PauseHandler.PauseWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryCurrentInputTupleHandler.QueryCurrentInputTuple
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryStatisticsHandler.QueryStatistics
import edu.uci.ics.amber.engine.common.model.tuple.Tuple
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity

import scala.collection.mutable

object PauseHandler {

Expand All @@ -38,9 +32,6 @@ trait PauseHandler {
.flatMap(_.getAllOperatorExecutions)
.map {
case (physicalOpId, opExecution) =>
// create a buffer for the current input tuple
// since we need to show them on the frontend
val buffer = mutable.ArrayBuffer[(Tuple, ActorVirtualIdentity)]()
Future
.collect(
opExecution.getWorkerIds
Expand All @@ -52,22 +43,13 @@ trait PauseHandler {
send(PauseWorker(), worker).flatMap { state =>
workerExecution.setState(state)
send(QueryStatistics(), worker)
.join(send(QueryCurrentInputTuple(), worker))
// get the stats and current input tuple from the worker
.map {
case (metrics, tuple) =>
workerExecution.setStats(metrics.workerStatistics)
buffer.append((tuple, worker))
.map { metrics =>
workerExecution.setStats(metrics.workerStatistics)
}
}
}.toSeq
)
.map { _ =>
// for each paused operator, send the input tuple
sendToClient(
ReportCurrentProcessingTuple(physicalOpId.logicalOpId.id, buffer.toArray)
)
}
}
.toSeq
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with OpenExecutorHandler
with PauseHandler
with AddPartitioningHandler
with QueryCurrentInputTupleHandler
with QueryStatisticsHandler
with ResumeHandler
with StartHandler
with AssignPortHandler
with AddInputChannelHandler
with ShutdownDPThreadHandler
with FlushNetworkBufferHandler
with UpdateExecutorHandler
with RetrieveStateHandler
with PrepareCheckpointHandler
with FinalizeCheckpointHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AddPartition
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHandler.AssignPort
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.OpenExecutorHandler.OpenExecutor
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.PauseHandler.PauseWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryCurrentInputTupleHandler.QueryCurrentInputTuple
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryStatisticsHandler.QueryStatistics
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeHandler.ResumeWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.StartHandler.StartWorker
Expand Down Expand Up @@ -48,8 +47,6 @@ object ControlCommandConvertUtils {
AddInputChannelV2(channelId, portId)
case QueryStatistics() =>
QueryStatisticsV2()
case QueryCurrentInputTuple() =>
QueryCurrentInputTupleV2()
case InitializeExecutor(_, opExecInitInfo, isSource) =>
val (code, language) = opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0)
InitializeExecutorV2(
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import edu.uci.ics.texera.web.model.websocket.response.{HeartBeatResponse, Modif
new Type(value = classOf[OperatorStatisticsUpdateEvent]),
new Type(value = classOf[WebResultUpdateEvent]),
new Type(value = classOf[ConsoleUpdateEvent]),
new Type(value = classOf[OperatorCurrentTuplesUpdateEvent]),
new Type(value = classOf[CacheStatusUpdateEvent]),
new Type(value = classOf[PaginatedResultEvent]),
new Type(value = classOf[PythonExpressionEvaluateResponse]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ import edu.uci.ics.amber.engine.architecture.worker.controlcommands.ConsoleMessa
import edu.uci.ics.amber.engine.common.{AmberConfig, VirtualIdentityUtils}
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.engine.common.workflowruntimestate.WorkflowAggregatedState.{
RESUMING,
RUNNING
}
import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent
import edu.uci.ics.texera.web.model.websocket.event.python.ConsoleUpdateEvent
import edu.uci.ics.texera.web.model.websocket.request.RetryRequest
import edu.uci.ics.texera.web.model.websocket.request.python.{
DebugCommandRequest,
PythonExpressionEvaluateRequest
}
import edu.uci.ics.texera.web.model.websocket.response.python.PythonExpressionEvaluateResponse
import edu.uci.ics.texera.web.storage.ExecutionStateStore
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.amber.engine.common.workflowruntimestate.WorkflowAggregatedState.{
RESUMING,
RUNNING
}
import edu.uci.ics.amber.engine.common.workflowruntimestate.{
EvaluatedValueList,
ExecutionConsoleStore,
OperatorConsole
}
import edu.uci.ics.texera.web.model.websocket.request.RetryRequest
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.texera.web.{SubscriptionManager, WebsocketInput}

import java.time.Instant
Expand Down
Loading

0 comments on commit 1c6d095

Please sign in to comment.