From 7127b24632c0877142ef35fd6cd5b12183f5df1a Mon Sep 17 00:00:00 2001 From: Paulius Date: Wed, 18 May 2022 13:17:24 +0300 Subject: [PATCH] Sort Apps, add workaround for Yarn (#46) * Sort apps * Fix review comments --- .../lighter/application/ApplicationState.java | 9 ++++++++ .../application/ApplicationStatusHandler.java | 9 ++++++++ .../application/batch/BatchHandler.java | 3 ++- .../application/batch/BatchService.java | 7 +++--- .../application/sessions/SessionHandler.java | 3 ++- .../application/sessions/SessionService.java | 7 +++--- .../lighter/rest/BatchController.java | 10 +++++++-- .../lighter/spark/SparkListener.java | 5 +++-- .../lighter/storage/ApplicationStorage.java | 9 ++++++-- .../exacaster/lighter/storage/SortOrder.java | 5 +++++ .../storage/jdbc/JdbcApplicationStorage.java | 22 +++++++++++-------- .../application/batch/BatchHandlerTest.groovy | 5 +++-- .../application/batch/BatchServiceTest.groovy | 5 +++-- .../sessions/SessionServiceTest.groovy | 5 +++-- .../jdbc/JdbcApplicationStorageTest.groovy | 5 +++-- .../lighter/test/InMemoryStorage.groovy | 5 ++++- 16 files changed, 82 insertions(+), 32 deletions(-) create mode 100644 server/src/main/java/com/exacaster/lighter/storage/SortOrder.java diff --git a/server/src/main/java/com/exacaster/lighter/application/ApplicationState.java b/server/src/main/java/com/exacaster/lighter/application/ApplicationState.java index 999de62d..a0fe7d72 100644 --- a/server/src/main/java/com/exacaster/lighter/application/ApplicationState.java +++ b/server/src/main/java/com/exacaster/lighter/application/ApplicationState.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonValue; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public enum ApplicationState { @@ -42,6 +43,14 @@ public static List runningStates() { return RUNNING_STATES; } + public static Optional from(String state) { + try { + return Optional.of(ApplicationState.valueOf(state)); + } catch (IllegalArgumentException e) { + return Optional.empty(); + } + } + public boolean isComplete() { return isComplete; } diff --git a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java index 8cb52816..a55f5d2f 100644 --- a/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/ApplicationStatusHandler.java @@ -51,6 +51,15 @@ public ApplicationState processApplicationRunning(Application app) { } public void processApplicationError(Application application, Throwable error) { + // Workaround for case when Spark launcher logs + // `DEBUG Configuration: Handling deprecation for hive.stats.ndv.error` + // on org.apache.spark.launcher.OutputRedirector#redirect() it marks all stdout lines + // containing `error` string as Exceptions. + if (error.getMessage().contains("hive.stats.ndv.error")) { + LOG.debug("Skipping", error); + return; + } + LOG.warn("Application {} error occurred", application, error); var appId = backend.getInfo(application).map(ApplicationInfo::getApplicationId) .orElse(null); diff --git a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java index a4ea612a..1c426ab4 100644 --- a/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/batch/BatchHandler.java @@ -12,6 +12,7 @@ import com.exacaster.lighter.configuration.AppConfiguration; import com.exacaster.lighter.spark.ConfigModifier; import com.exacaster.lighter.spark.SparkApp; +import com.exacaster.lighter.storage.SortOrder; import io.micronaut.scheduling.annotation.Scheduled; import jakarta.inject.Singleton; import java.util.List; @@ -55,7 +56,7 @@ public void processScheduledBatches() throws InterruptedException { var emptySlots = countEmptySlots(); var slotsToTake = Math.min(MAX_SLOTS_PER_ITERATION, emptySlots); LOG.info("Processing scheduled batches, found empty slots: {}, using {}", emptySlots, slotsToTake); - var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, slotsToTake) + var waitables = batchService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, slotsToTake) .stream() .map(batch -> { LOG.info("Launching {}", batch); diff --git a/server/src/main/java/com/exacaster/lighter/application/batch/BatchService.java b/server/src/main/java/com/exacaster/lighter/application/batch/BatchService.java index 8daccf7d..295f746d 100644 --- a/server/src/main/java/com/exacaster/lighter/application/batch/BatchService.java +++ b/server/src/main/java/com/exacaster/lighter/application/batch/BatchService.java @@ -7,6 +7,7 @@ import com.exacaster.lighter.backend.Backend; import com.exacaster.lighter.spark.SubmitParams; import com.exacaster.lighter.storage.ApplicationStorage; +import com.exacaster.lighter.storage.SortOrder; import java.time.LocalDateTime; import java.util.List; import java.util.Optional; @@ -43,13 +44,13 @@ public Application update(Application application) { return applicationStorage.saveApplication(application); } - public List fetchByState(ApplicationState state, Integer limit) { - return applicationStorage.findApplicationsByStates(ApplicationType.BATCH, List.of(state), limit); + public List fetchByState(ApplicationState state, SortOrder order, Integer from, Integer limit) { + return applicationStorage.findApplicationsByStates(ApplicationType.BATCH, List.of(state), order, from, limit); } public List fetchRunning() { return applicationStorage - .findApplicationsByStates(ApplicationType.BATCH, ApplicationState.runningStates(), Integer.MAX_VALUE); + .findApplicationsByStates(ApplicationType.BATCH, ApplicationState.runningStates(), SortOrder.ASC, 0, Integer.MAX_VALUE); } public Optional fetchOne(String id) { diff --git a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java index dd0ac896..2b9a86bc 100644 --- a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java +++ b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionHandler.java @@ -15,6 +15,7 @@ import com.exacaster.lighter.configuration.AppConfiguration; import com.exacaster.lighter.spark.ConfigModifier; import com.exacaster.lighter.spark.SparkApp; +import com.exacaster.lighter.storage.SortOrder; import io.micronaut.scheduling.annotation.Scheduled; import jakarta.inject.Singleton; import java.time.LocalDateTime; @@ -83,7 +84,7 @@ public void keepPermanentSessions() throws InterruptedException { @Scheduled(fixedRate = "1m") public void processScheduledSessions() throws InterruptedException { assertLocked(); - var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, 10).stream() + var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 10).stream() .map(this::launchSession) .collect(Collectors.toList()); diff --git a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java index 13decb1b..33dc414b 100644 --- a/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java +++ b/server/src/main/java/com/exacaster/lighter/application/sessions/SessionService.java @@ -10,6 +10,7 @@ import com.exacaster.lighter.backend.Backend; import com.exacaster.lighter.spark.SubmitParams; import com.exacaster.lighter.storage.ApplicationStorage; +import com.exacaster.lighter.storage.SortOrder; import com.exacaster.lighter.storage.StatementStorage; import jakarta.inject.Singleton; import java.time.LocalDateTime; @@ -58,11 +59,11 @@ public Application createSession(SubmitParams params, String sessionId) { public List fetchRunning() { return applicationStorage - .findApplicationsByStates(ApplicationType.SESSION, ApplicationState.runningStates(), Integer.MAX_VALUE); + .findApplicationsByStates(ApplicationType.SESSION, ApplicationState.runningStates(), SortOrder.ASC, 0, Integer.MAX_VALUE); } - public List fetchByState(ApplicationState state, Integer limit) { - return applicationStorage.findApplicationsByStates(ApplicationType.SESSION, List.of(state), limit); + public List fetchByState(ApplicationState state, SortOrder order, Integer limit) { + return applicationStorage.findApplicationsByStates(ApplicationType.SESSION, List.of(state), order, 0, limit); } public Optional fetchOne(String id, boolean liveStatus) { diff --git a/server/src/main/java/com/exacaster/lighter/rest/BatchController.java b/server/src/main/java/com/exacaster/lighter/rest/BatchController.java index 3e809af3..2943572c 100644 --- a/server/src/main/java/com/exacaster/lighter/rest/BatchController.java +++ b/server/src/main/java/com/exacaster/lighter/rest/BatchController.java @@ -2,10 +2,12 @@ import com.exacaster.lighter.application.Application; import com.exacaster.lighter.application.ApplicationList; +import com.exacaster.lighter.application.ApplicationState; import com.exacaster.lighter.application.batch.BatchService; import com.exacaster.lighter.log.Log; import com.exacaster.lighter.log.LogService; import com.exacaster.lighter.spark.SubmitParams; +import com.exacaster.lighter.storage.SortOrder; import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; @@ -35,8 +37,12 @@ public Application create(@Valid @Body SubmitParams batch) { } @Get - public ApplicationList get(@QueryValue(defaultValue = "0") Integer from, @QueryValue(defaultValue = "100") Integer size) { - var batches = batchService.fetch(from, size); + public ApplicationList get(@QueryValue(defaultValue = "0") Integer from, + @QueryValue(defaultValue = "100") Integer size, + @QueryValue String state) { + var batches = ApplicationState.from(state) + .map(st -> batchService.fetchByState(st, SortOrder.DESC, from, size)) + .orElseGet(() -> batchService.fetch(from, size)); return new ApplicationList(from, batches.size(), batches); } diff --git a/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java index 0d161d85..ce327d8e 100644 --- a/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java +++ b/server/src/main/java/com/exacaster/lighter/spark/SparkListener.java @@ -26,9 +26,10 @@ public void stateChanged(SparkAppHandle handle) { var state = handle.getState(); LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), state); handle.getError().ifPresent(errorHandler); - // Disconnect when final or running. + + // Disconnect when final or submitted. // In case app fails after detach, status will be retrieved by ApplicationStatusHandler. - if (state != null && (state.isFinal() || State.RUNNING.equals(state))) { + if (state != null && (state.isFinal() || State.SUBMITTED.equals(state))) { handle.disconnect(); latch.countDown(); } diff --git a/server/src/main/java/com/exacaster/lighter/storage/ApplicationStorage.java b/server/src/main/java/com/exacaster/lighter/storage/ApplicationStorage.java index 07b4e6b3..31a653a2 100644 --- a/server/src/main/java/com/exacaster/lighter/storage/ApplicationStorage.java +++ b/server/src/main/java/com/exacaster/lighter/storage/ApplicationStorage.java @@ -3,14 +3,19 @@ import com.exacaster.lighter.application.Application; import com.exacaster.lighter.application.ApplicationState; import com.exacaster.lighter.application.ApplicationType; -import com.exacaster.lighter.log.Log; import java.util.List; import java.util.Optional; public interface ApplicationStorage { + Optional findApplication(String internalApplicationId); + List findApplications(ApplicationType type, Integer from, Integer size); + void deleteApplication(String internalApplicationId); + Application saveApplication(Application application); - List findApplicationsByStates(ApplicationType type, List states, Integer limit); + + List findApplicationsByStates(ApplicationType type, List states, SortOrder order, + Integer from, Integer size); } diff --git a/server/src/main/java/com/exacaster/lighter/storage/SortOrder.java b/server/src/main/java/com/exacaster/lighter/storage/SortOrder.java new file mode 100644 index 00000000..3769b12b --- /dev/null +++ b/server/src/main/java/com/exacaster/lighter/storage/SortOrder.java @@ -0,0 +1,5 @@ +package com.exacaster.lighter.storage; + +public enum SortOrder { + DESC, ASC +} diff --git a/server/src/main/java/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorage.java b/server/src/main/java/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorage.java index c6e0c378..5a1d08bd 100644 --- a/server/src/main/java/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorage.java +++ b/server/src/main/java/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorage.java @@ -6,16 +6,17 @@ import com.exacaster.lighter.application.ApplicationType; import com.exacaster.lighter.spark.SubmitParams; import com.exacaster.lighter.storage.ApplicationStorage; +import com.exacaster.lighter.storage.SortOrder; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.micronaut.context.annotation.Requires; +import jakarta.inject.Singleton; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import jakarta.inject.Singleton; import javax.sql.DataSource; import javax.transaction.Transactional; import org.jdbi.v3.core.Jdbi; @@ -80,10 +81,10 @@ public Application saveApplication(Application application) { // TODO } var updated = handle.createUpdate("UPDATE application SET " - + "app_id=:app_id, " - + "app_info=:app_info, " - + "state=:state, " - + "contacted_at=:contacted_at WHERE id=:id") + + "app_id=:app_id, " + + "app_info=:app_info, " + + "state=:state, " + + "contacted_at=:contacted_at WHERE id=:id") .bind("state", application.getState().name()) .bind("app_id", application.getAppId()) .bind("app_info", application.getAppInfo()) @@ -113,12 +114,14 @@ public Application saveApplication(Application application) { @Override @Transactional public List findApplicationsByStates(ApplicationType type, - List states, Integer limit) { + List states, SortOrder order, Integer from, Integer size) { return jdbi.withHandle(handle -> handle - .createQuery("SELECT * FROM application WHERE type=:type AND state IN () LIMIT :limit") + .createQuery("SELECT * FROM application WHERE type=:type AND state IN () ORDER BY created_at " + + order.name() + " LIMIT :limit OFFSET :offset") .bind("type", type.name()) .bindList("states", states.stream().map(ApplicationState::name).collect(Collectors.toList())) - .bind("limit", limit) + .bind("limit", size) + .bind("offset", from) .map(this) .list() ); @@ -133,7 +136,8 @@ public Application map(ResultSet rs, StatementContext ctx) throws SQLException { // TODO } - var contactedAt = rs.getTimestamp("contacted_at") != null ? rs.getTimestamp("contacted_at").toLocalDateTime() : null; + var contactedAt = + rs.getTimestamp("contacted_at") != null ? rs.getTimestamp("contacted_at").toLocalDateTime() : null; return ApplicationBuilder.builder() .setId(rs.getString("id")) .setType(ApplicationType.valueOf(rs.getString("type"))) diff --git a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy index 0f95e8a1..4cd7002a 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchHandlerTest.groovy @@ -5,6 +5,7 @@ import com.exacaster.lighter.application.ApplicationStatusHandler import com.exacaster.lighter.backend.Backend import com.exacaster.lighter.concurrency.EmptyWaitable import com.exacaster.lighter.configuration.AppConfiguration +import com.exacaster.lighter.storage.SortOrder import net.javacrumbs.shedlock.core.LockAssert import spock.lang.Specification import spock.lang.Subject @@ -37,7 +38,7 @@ class BatchHandlerTest extends Specification { then: _ * service.fetchRunning() >> [] - 1 * service.fetchByState(ApplicationState.NOT_STARTED, _) >> [app] + 1 * service.fetchByState(ApplicationState.NOT_STARTED, *_) >> [app] 1 * handler.launch(app, _) >> EmptyWaitable.INSTANCE } @@ -50,7 +51,7 @@ class BatchHandlerTest extends Specification { then: _ * service.fetchRunning() >> [app] - _ * service.fetchByState(ApplicationState.NOT_STARTED, config.getMaxRunningJobs() - 1) >> [] + _ * service.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 0, config.getMaxRunningJobs() - 1) >> [] 0 * handler.launch(app, _) >> { } } diff --git a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchServiceTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchServiceTest.groovy index 79b5408d..bcdab238 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchServiceTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/batch/BatchServiceTest.groovy @@ -3,6 +3,7 @@ package com.exacaster.lighter.application.batch import com.exacaster.lighter.application.ApplicationBuilder import com.exacaster.lighter.application.ApplicationState import com.exacaster.lighter.backend.Backend +import com.exacaster.lighter.storage.SortOrder import com.exacaster.lighter.test.InMemoryStorage import spock.lang.Specification import spock.lang.Subject @@ -37,13 +38,13 @@ class BatchServiceTest extends Specification { resultList.size() == 1 when: "fetch by status" - resultList = service.fetchByState(ApplicationState.DEAD, 10) + resultList = service.fetchByState(ApplicationState.DEAD, SortOrder.DESC, 0, 10) then: "returns list" resultList.size() == 1 when: "fetch by missing status" - resultList = service.fetchByState(ApplicationState.SUCCESS, 10) + resultList = service.fetchByState(ApplicationState.SUCCESS, SortOrder.DESC, 0, 10) then: "returns empty list" resultList.isEmpty() diff --git a/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionServiceTest.groovy b/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionServiceTest.groovy index 90e248e9..3482c9a2 100644 --- a/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionServiceTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/application/sessions/SessionServiceTest.groovy @@ -4,6 +4,7 @@ import com.exacaster.lighter.application.ApplicationState import com.exacaster.lighter.application.sessions.processors.StatementHandler import com.exacaster.lighter.backend.Backend import com.exacaster.lighter.storage.ApplicationStorage +import com.exacaster.lighter.storage.SortOrder import com.exacaster.lighter.storage.StatementStorage import com.exacaster.lighter.test.InMemoryStorage import spock.lang.Specification @@ -43,13 +44,13 @@ class SessionServiceTest extends Specification { sessions.isEmpty() when: "fetch by state" - sessions = service.fetchByState(created.state, 1) + sessions = service.fetchByState(created.state, SortOrder.DESC, 1) then: "returns existing app" sessions == [created] when: "fetch by other state" - sessions = service.fetchByState(ApplicationState.IDLE, 1) + sessions = service.fetchByState(ApplicationState.IDLE, SortOrder.DESC, 1) then: "returns empty list" sessions.isEmpty() diff --git a/server/src/test/groovy/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorageTest.groovy b/server/src/test/groovy/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorageTest.groovy index 82ac7fb9..3fff2b1a 100644 --- a/server/src/test/groovy/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorageTest.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/storage/jdbc/JdbcApplicationStorageTest.groovy @@ -3,6 +3,7 @@ package com.exacaster.lighter.storage.jdbc import com.exacaster.lighter.application.ApplicationBuilder import com.exacaster.lighter.application.ApplicationState import com.exacaster.lighter.application.ApplicationType +import com.exacaster.lighter.storage.SortOrder import io.micronaut.test.extensions.spock.annotation.MicronautTest import jakarta.inject.Inject import javax.transaction.Transactional @@ -48,14 +49,14 @@ class JdbcApplicationStorageTest extends Specification { apps.get(0).state == saved.getState() when: "fetch by state" - apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.ERROR], 10) + apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.ERROR], SortOrder.DESC, 0, 10) then: "returns apps" apps.size() == 1 apps.get(0).id == saved.getId() when: "fetch by missing state" - apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.SHUTTING_DOWN], 1) + apps = storage.findApplicationsByStates(ApplicationType.BATCH, [ApplicationState.SHUTTING_DOWN], SortOrder.DESC, 0, 1) then: "returns empty" apps.isEmpty() diff --git a/server/src/test/groovy/com/exacaster/lighter/test/InMemoryStorage.groovy b/server/src/test/groovy/com/exacaster/lighter/test/InMemoryStorage.groovy index 40eddcf0..edd2391d 100644 --- a/server/src/test/groovy/com/exacaster/lighter/test/InMemoryStorage.groovy +++ b/server/src/test/groovy/com/exacaster/lighter/test/InMemoryStorage.groovy @@ -7,6 +7,7 @@ import com.exacaster.lighter.log.Log import com.exacaster.lighter.storage.ApplicationStorage import com.exacaster.lighter.storage.Entity import com.exacaster.lighter.storage.LogStorage +import com.exacaster.lighter.storage.SortOrder import java.util.concurrent.ConcurrentHashMap import java.util.function.Predicate @@ -39,8 +40,10 @@ class InMemoryStorage implements ApplicationStorage, LogStorage { } @Override - List findApplicationsByStates(ApplicationType type, List states, Integer limit) { + List findApplicationsByStates(ApplicationType type, List states, SortOrder order, Integer offset, Integer limit) { return findMany({ type == it.getType() && states.contains(it.getState()) }, Application.class) + .sorted((app1, app2) -> order == SortOrder.DESC ? app1.createdAt <=> app2.createdAt : app2.createdAt <=> app1.createdAt) + .skip(offset) .limit(limit) .collect(Collectors.toList()) }