Skip to content

Commit

Permalink
Sort Apps, add workaround for Yarn (#46)
Browse files Browse the repository at this point in the history
* Sort apps

* Fix review comments
  • Loading branch information
pdambrauskas authored May 18, 2022
1 parent 718b2d1 commit 7127b24
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public enum ApplicationState {
Expand Down Expand Up @@ -42,6 +43,14 @@ public static List<ApplicationState> runningStates() {
return RUNNING_STATES;
}

public static Optional<ApplicationState> from(String state) {
try {
return Optional.of(ApplicationState.valueOf(state));
} catch (IllegalArgumentException e) {
return Optional.empty();
}
}

public boolean isComplete() {
return isComplete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public ApplicationState processApplicationRunning(Application app) {
}

public void processApplicationError(Application application, Throwable error) {
// Workaround for case when Spark launcher logs
// `DEBUG Configuration: Handling deprecation for hive.stats.ndv.error`
// on org.apache.spark.launcher.OutputRedirector#redirect() it marks all stdout lines
// containing `error` string as Exceptions.
if (error.getMessage().contains("hive.stats.ndv.error")) {
LOG.debug("Skipping", error);
return;
}

LOG.warn("Application {} error occurred", application, error);
var appId = backend.getInfo(application).map(ApplicationInfo::getApplicationId)
.orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.spark.ConfigModifier;
import com.exacaster.lighter.spark.SparkApp;
import com.exacaster.lighter.storage.SortOrder;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import java.util.List;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void processScheduledBatches() throws InterruptedException {
var emptySlots = countEmptySlots();
var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots);
LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake);
var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, slotsToTake)
var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, slotsToTake)
.stream()
.map(batch -> {
LOG.info("Launching {}", batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.spark.SubmitParams;
import com.exacaster.lighter.storage.ApplicationStorage;
import com.exacaster.lighter.storage.SortOrder;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -43,13 +44,13 @@ public Application update(Application application) {
return applicationStorage.saveApplication(application);
}

public List<Application> fetchByState(ApplicationState state, Integer limit) {
return applicationStorage.findApplicationsByStates(ApplicationType.BATCH, List.of(state), limit);
public List<Application> fetchByState(ApplicationState state, SortOrder order, Integer from, Integer limit) {
return applicationStorage.findApplicationsByStates(ApplicationType.BATCH, List.of(state), order, from, limit);
}

public List<Application> fetchRunning() {
return applicationStorage
.findApplicationsByStates(ApplicationType.BATCH, ApplicationState.runningStates(), Integer.MAX_VALUE);
.findApplicationsByStates(ApplicationType.BATCH, ApplicationState.runningStates(), SortOrder.ASC, 0, Integer.MAX_VALUE);
}

public Optional<Application> fetchOne(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.spark.ConfigModifier;
import com.exacaster.lighter.spark.SparkApp;
import com.exacaster.lighter.storage.SortOrder;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void keepPermanentSessions() throws InterruptedException {
@Scheduled(fixedRate = "1m")
public void processScheduledSessions() throws InterruptedException {
assertLocked();
var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).stream()
var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 10).stream()
.map(this::launchSession)
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.spark.SubmitParams;
import com.exacaster.lighter.storage.ApplicationStorage;
import com.exacaster.lighter.storage.SortOrder;
import com.exacaster.lighter.storage.StatementStorage;
import jakarta.inject.Singleton;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -58,11 +59,11 @@ public Application createSession(SubmitParams params, String sessionId) {

public List<Application> fetchRunning() {
return applicationStorage
.findApplicationsByStates(ApplicationType.SESSION, ApplicationState.runningStates(), Integer.MAX_VALUE);
.findApplicationsByStates(ApplicationType.SESSION, ApplicationState.runningStates(), SortOrder.ASC, 0, Integer.MAX_VALUE);
}

public List<Application> fetchByState(ApplicationState state, Integer limit) {
return applicationStorage.findApplicationsByStates(ApplicationType.SESSION, List.of(state), limit);
public List<Application> fetchByState(ApplicationState state, SortOrder order, Integer limit) {
return applicationStorage.findApplicationsByStates(ApplicationType.SESSION, List.of(state), order, 0, limit);
}

public Optional<Application> fetchOne(String id, boolean liveStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationList;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.batch.BatchService;
import com.exacaster.lighter.log.Log;
import com.exacaster.lighter.log.LogService;
import com.exacaster.lighter.spark.SubmitParams;
import com.exacaster.lighter.storage.SortOrder;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
Expand Down Expand Up @@ -35,8 +37,12 @@ public Application create(@Valid @Body SubmitParams batch) {
}

@Get
public ApplicationList get(@QueryValue(defaultValue = "0") Integer from, @QueryValue(defaultValue = "100") Integer size) {
var batches = batchService.fetch(from, size);
public ApplicationList get(@QueryValue(defaultValue = "0") Integer from,
@QueryValue(defaultValue = "100") Integer size,
@QueryValue String state) {
var batches = ApplicationState.from(state)
.map(st -> batchService.fetchByState(st, SortOrder.DESC, from, size))
.orElseGet(() -> batchService.fetch(from, size));
return new ApplicationList(from, batches.size(), batches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ public void stateChanged(SparkAppHandle handle) {
var state = handle.getState();
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), state);
handle.getError().ifPresent(errorHandler);
// Disconnect when final or running.

// Disconnect when final or submitted.
// In case app fails after detach, status will be retrieved by ApplicationStatusHandler.
if (state != null && (state.isFinal() || State.RUNNING.equals(state))) {
if (state != null && (state.isFinal() || State.SUBMITTED.equals(state))) {
handle.disconnect();
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationType;
import com.exacaster.lighter.log.Log;
import java.util.List;
import java.util.Optional;

public interface ApplicationStorage {

Optional<Application> findApplication(String internalApplicationId);

List<Application> findApplications(ApplicationType type, Integer from, Integer size);

void deleteApplication(String internalApplicationId);

Application saveApplication(Application application);
List<Application> findApplicationsByStates(ApplicationType type, List<ApplicationState> states, Integer limit);

List<Application> findApplicationsByStates(ApplicationType type, List<ApplicationState> states, SortOrder order,
Integer from, Integer size);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.exacaster.lighter.storage;

public enum SortOrder {
DESC, ASC
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
import com.exacaster.lighter.application.ApplicationType;
import com.exacaster.lighter.spark.SubmitParams;
import com.exacaster.lighter.storage.ApplicationStorage;
import com.exacaster.lighter.storage.SortOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import jakarta.inject.Singleton;
import javax.sql.DataSource;
import javax.transaction.Transactional;
import org.jdbi.v3.core.Jdbi;
Expand Down Expand Up @@ -80,10 +81,10 @@ public Application saveApplication(Application application) {
// TODO
}
var updated = handle.createUpdate("UPDATE application SET "
+ "app_id=:app_id, "
+ "app_info=:app_info, "
+ "state=:state, "
+ "contacted_at=:contacted_at WHERE id=:id")
+ "app_id=:app_id, "
+ "app_info=:app_info, "
+ "state=:state, "
+ "contacted_at=:contacted_at WHERE id=:id")
.bind("state", application.getState().name())
.bind("app_id", application.getAppId())
.bind("app_info", application.getAppInfo())
Expand Down Expand Up @@ -113,12 +114,14 @@ public Application saveApplication(Application application) {
@Override
@Transactional
public List<Application> findApplicationsByStates(ApplicationType type,
List<ApplicationState> states, Integer limit) {
List<ApplicationState> states, SortOrder order, Integer from, Integer size) {
return jdbi.withHandle(handle -> handle
.createQuery("SELECT * FROM application WHERE type=:type AND state IN (<states>) LIMIT :limit")
.createQuery("SELECT * FROM application WHERE type=:type AND state IN (<states>) ORDER BY created_at "
+ order.name() + " LIMIT :limit OFFSET :offset")
.bind("type", type.name())
.bindList("states", states.stream().map(ApplicationState::name).collect(Collectors.toList()))
.bind("limit", limit)
.bind("limit", size)
.bind("offset", from)
.map(this)
.list()
);
Expand All @@ -133,7 +136,8 @@ public Application map(ResultSet rs, StatementContext ctx) throws SQLException {
// TODO
}

var contactedAt = rs.getTimestamp("contacted_at") != null ? rs.getTimestamp("contacted_at").toLocalDateTime() : null;
var contactedAt =
rs.getTimestamp("contacted_at") != null ? rs.getTimestamp("contacted_at").toLocalDateTime() : null;
return ApplicationBuilder.builder()
.setId(rs.getString("id"))
.setType(ApplicationType.valueOf(rs.getString("type")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.exacaster.lighter.application.ApplicationStatusHandler
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.concurrency.EmptyWaitable
import com.exacaster.lighter.configuration.AppConfiguration
import com.exacaster.lighter.storage.SortOrder
import net.javacrumbs.shedlock.core.LockAssert
import spock.lang.Specification
import spock.lang.Subject
Expand Down Expand Up @@ -37,7 +38,7 @@ class BatchHandlerTest extends Specification {

then:
_ * service.fetchRunning() >> []
1 * service.fetchByState(ApplicationState.NOT_STARTED, _) >> [app]
1 * service.fetchByState(ApplicationState.NOT_STARTED, *_) >> [app]
1 * handler.launch(app, _) >> EmptyWaitable.INSTANCE
}

Expand All @@ -50,7 +51,7 @@ class BatchHandlerTest extends Specification {

then:
_ * service.fetchRunning() >> [app]
_ * service.fetchByState(ApplicationState.NOT_STARTED, config.getMaxRunningJobs() - 1) >> []
_ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - 1) >> []
0 * handler.launch(app, _) >> { }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.exacaster.lighter.application.batch
import com.exacaster.lighter.application.ApplicationBuilder
import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.storage.SortOrder
import com.exacaster.lighter.test.InMemoryStorage
import spock.lang.Specification
import spock.lang.Subject
Expand Down Expand Up @@ -37,13 +38,13 @@ class BatchServiceTest extends Specification {
resultList.size() == 1

when: "fetch by status"
resultList = service.fetchByState(ApplicationState.DEAD, 10)
resultList = service.fetchByState(ApplicationState.DEAD, SortOrder.DESC, 0, 10)

then: "returns list"
resultList.size() == 1

when: "fetch by missing status"
resultList = service.fetchByState(ApplicationState.SUCCESS, 10)
resultList = service.fetchByState(ApplicationState.SUCCESS, SortOrder.DESC, 0, 10)

then: "returns empty list"
resultList.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.sessions.processors.StatementHandler
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.storage.ApplicationStorage
import com.exacaster.lighter.storage.SortOrder
import com.exacaster.lighter.storage.StatementStorage
import com.exacaster.lighter.test.InMemoryStorage
import spock.lang.Specification
Expand Down Expand Up @@ -43,13 +44,13 @@ class SessionServiceTest extends Specification {
sessions.isEmpty()

when: "fetch by state"
sessions = service.fetchByState(created.state, 1)
sessions = service.fetchByState(created.state, SortOrder.DESC, 1)

then: "returns existing app"
sessions == [created]

when: "fetch by other state"
sessions = service.fetchByState(ApplicationState.IDLE, 1)
sessions = service.fetchByState(ApplicationState.IDLE, SortOrder.DESC, 1)

then: "returns empty list"
sessions.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.exacaster.lighter.storage.jdbc
import com.exacaster.lighter.application.ApplicationBuilder
import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.ApplicationType
import com.exacaster.lighter.storage.SortOrder
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import javax.transaction.Transactional
Expand Down Expand Up @@ -48,14 +49,14 @@ class JdbcApplicationStorageTest extends Specification {
apps.get(0).state == saved.getState()

when: "fetch by state"
apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.ERROR], 10)
apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.ERROR], SortOrder.DESC, 0, 10)

then: "returns apps"
apps.size() == 1
apps.get(0).id == saved.getId()

when: "fetch by missing state"
apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.SHUTTING_DOWN], 1)
apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.SHUTTING_DOWN], SortOrder.DESC, 0, 1)

then: "returns empty"
apps.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.exacaster.lighter.log.Log
import com.exacaster.lighter.storage.ApplicationStorage
import com.exacaster.lighter.storage.Entity
import com.exacaster.lighter.storage.LogStorage
import com.exacaster.lighter.storage.SortOrder

import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
Expand Down Expand Up @@ -39,8 +40,10 @@ class InMemoryStorage implements ApplicationStorage, LogStorage {
}

@Override
List<Application> findApplicationsByStates(ApplicationType type, List<ApplicationState> states, Integer limit) {
List<Application> findApplicationsByStates(ApplicationType type, List<ApplicationState> states, SortOrder order, Integer offset, Integer limit) {
return findMany({ type == it.getType() && states.contains(it.getState()) }, Application.class)
.sorted((app1, app2) -> order == SortOrder.DESC ? app1.createdAt <=> app2.createdAt : app2.createdAt <=> app1.createdAt)
.skip(offset)
.limit(limit)
.collect(Collectors.toList())
}
Expand Down

0 comments on commit 7127b24

Please sign in to comment.