Skip to content

Commit

Permalink
Detach when app is running (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdambrauskas authored May 11, 2022
1 parent a8f5824 commit 718b2d1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,27 @@ public Waitable launch(Application application, Consumer<Throwable> errorHandler
return app.launch();
}

@SchedulerLock(name = "keepPermanentSession")
@SchedulerLock(name = "keepPermanentSession", lockAtLeastFor = "1m")
@Scheduled(fixedRate = "1m")
public void keepPermanentSessions() throws InterruptedException {
assertLocked();
LOG.info("Start provisioning permanent sessions.");
for (var sessionConf : appConfiguration.getSessionConfiguration().getPermanentSessions()) {
var session = sessionService.fetchOne(sessionConf.getId());
if (session.map(Application::getState).filter(this::running).isEmpty() ||
session.flatMap(backend::getInfo).map(ApplicationInfo::getState).filter(this::running).isEmpty()) {
sessionService.deleteOne(sessionConf.getId());
launchSession(sessionService.createSession(sessionConf.getSubmitParams(), sessionConf.getId()))
.waitCompletion();
LOG.info("Permanent session {} needs to be (re)started.", sessionConf.getId());
var sessionToLaunch = sessionService.createSession(
sessionConf.getSubmitParams(),
sessionConf.getId()
);

sessionService.deleteOne(sessionToLaunch);
launchSession(sessionToLaunch).waitCompletion();
LOG.info("Permanent session {} (re)started.", sessionConf.getId());
}
}
LOG.info("End provisioning permanent sessions.");
}

@SchedulerLock(name = "processScheduledSessions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

@Singleton
public class SessionService {

private final ApplicationStorage applicationStorage;
private final StatementStorage statementStorage;
private final Backend backend;
Expand Down Expand Up @@ -86,10 +87,12 @@ public Optional<Application> fetchOne(String id) {
}

public void deleteOne(String id) {
this.fetchOne(id).ifPresent(app -> {
backend.kill(app);
applicationStorage.deleteApplication(id);
});
this.fetchOne(id).ifPresent(this::deleteOne);
}

public void deleteOne(Application app) {
backend.kill(app);
applicationStorage.deleteApplication(app.getId());
}

public void killOne(Application app) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.slf4j.Logger;

public class SparkListener implements Listener, Waitable {
Expand All @@ -22,9 +23,12 @@ public SparkListener(Consumer<Throwable> errorHandler) {

@Override
public void stateChanged(SparkAppHandle handle) {
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), handle.getState());
var state = handle.getState();
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), state);
handle.getError().ifPresent(errorHandler);
if (handle.getState() != null && handle.getState().isFinal()) {
// Disconnect when final or running.
// In case app fails after detach, status will be retrieved by ApplicationStatusHandler.
if (state != null && (state.isFinal() || State.RUNNING.equals(state))) {
handle.disconnect();
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class SessionHandlerTest extends Specification {
handler.keepPermanentSessions()

then: "restart permanent session"
1 * service.deleteOne(session.id)
1 * service.deleteOne({ it -> it.getId() == session.getId() })
1 * service.createSession(session.submitParams, session.id) >> permanentSession
1 * handler.launch(permanentSession, _) >> EmptyWaitable.INSTANCE
}
Expand Down

0 comments on commit 718b2d1

Please sign in to comment.