Skip to content

Commit

Permalink
Merge pull request #655 from exacaster/configure-scheduled-tasks
Browse files Browse the repository at this point in the history
Parametrise session properties
  • Loading branch information
pdambrauskas authored Sep 14, 2023
2 parents af23d95 + 3a42f02 commit 64a1f88
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 21 deletions.
6 changes: 4 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ Lighter can be configured by using environment variables. Currently, Lighter sup
| LIGHTER_EXTERNAL_LOGS_URL_TEMPLATE | Template for link to external logs (Grafana, Graylog, etc.) used on the Lighter UI. Allowed placeholders: `{{id}}`, `{{appId}}`, `{{createdTs}}` | |
| LIGHTER_PY_GATEWAY_PORT | Port for live Spark session communication | 25333 |
| LIGHTER_URL | URL which can be used to access Lighter form Spark Job | http://lighter.spark:8080 |
| LIGHTER_SESSION_TIMEOUT_MINUTES | Session lifetime in minutes (from last statement creation). Use negative value to disable | 90 |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_MINUTES` is negative) | false |
| LIGHTER_SESSION_TIMEOUT_INTERVAL | `java.time.Duration` representing session lifetime (from last statement creation). Use `0m` value to disable | 90m |
| LIGHTER_SESSION_TIMEOUT_ACTIVE | Should Lighter kill sessions with waiting statements (obsolete when `LIGHTER_SESSION_TIMEOUT_INTERVAL` is `0m`) | false |
| LIGHTER_SESSION_SCHEDULE_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to initiate scheduled sessions | 1m |
| LIGHTER_SESSION_TRACK_RUNNING_INTERVAL | `java.time.Duration` representing the interval at which a task is triggered to process and update running session state | 2m |
| LIGHTER_STORAGE_JDBC_URL | JDBC url for lighter storage | jdbc:h2:mem:lighter |
| LIGHTER_STORAGE_JDBC_USERNAME | JDBC username | sa |
| LIGHTER_STORAGE_JDBC_PASSWORD | JDBC password | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void keepPermanentSessions() throws InterruptedException {
}

@SchedulerLock(name = "processScheduledSessions")
@Scheduled(fixedRate = "1m")
@Scheduled(fixedRate = "${lighter.session.schedule-interval}")
public void processScheduledSessions() throws InterruptedException {
assertLocked();
var waitables = sessionService.fetchByState(ApplicationState.NOT_STARTED, SortOrder.ASC, 10).stream()
Expand All @@ -93,7 +93,7 @@ private Waitable launchSession(Application session) {
}

@SchedulerLock(name = "trackRunningSessions", lockAtMostFor = "1m")
@Scheduled(fixedRate = "2m")
@Scheduled(fixedRate = "${lighter.session.track-running-interval}")
public void trackRunning() {
assertLocked();
var running = sessionService.fetchRunning();
Expand All @@ -110,14 +110,14 @@ public void trackRunning() {
public void handleTimeout() {
assertLocked();
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeout = sessionConfiguration.getTimeoutMinutes();
if (timeout != null && timeout > 0) {
var timeoutInterval = sessionConfiguration.getTimeoutInterval();
if (timeoutInterval != null && !timeoutInterval.isZero()) {
sessionService.fetchRunning()
.stream()
.filter(s -> isNotPermanent(sessionConfiguration, s))
.filter(s -> sessionConfiguration.shouldTimeoutActive() || !sessionService.isActive(s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minusMinutes(timeout)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeout, s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minus(timeoutInterval)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeoutInterval, s))
.forEach(sessionService::killOne);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
Expand Down Expand Up @@ -140,21 +142,27 @@ public String toString() {
@ConfigurationProperties("session")
public static class SessionConfiguration {

private final Integer timeoutMinutes;
private final Duration timeoutInterval;
private final Boolean timeoutActive;
private final List<PermanentSession> permanentSessions;
private final Duration scheduleInterval;
private final Duration trackRunningInterval;

@ConfigurationInject
public SessionConfiguration(@Nullable Integer timeoutMinutes,
public SessionConfiguration(@Nullable Duration timeoutInterval,
Boolean timeoutActive,
List<PermanentSession> permanentSessions) {
this.timeoutMinutes = timeoutMinutes;
List<PermanentSession> permanentSessions,
Duration scheduleInterval,
Duration trackRunningInterval) {
this.timeoutInterval = timeoutInterval;
this.timeoutActive = timeoutActive;
this.permanentSessions = permanentSessions;
this.scheduleInterval = scheduleInterval;
this.trackRunningInterval = trackRunningInterval;
}

public Integer getTimeoutMinutes() {
return timeoutMinutes;
public Duration getTimeoutInterval() {
return timeoutInterval;
}

public boolean shouldTimeoutActive() {
Expand All @@ -165,11 +173,21 @@ public List<PermanentSession> getPermanentSessions() {
return permanentSessions;
}

public Duration getScheduleInterval() {
return scheduleInterval;
}

public Duration getTrackRunningInterval() {
return trackRunningInterval;
}

@Override
public String toString() {
return new StringJoiner(", ", SessionConfiguration.class.getSimpleName() + "[", "]")
.add("timeoutMinutes=" + timeoutMinutes)
.add("timeoutMinutes=" + timeoutInterval)
.add("permanentSessions=" + permanentSessions)
.add("scheduleIntervalSeconds=" + scheduleInterval)
.add("trackRunningInterval=" + trackRunningInterval)
.toString();
}
}
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ lighter:
py-gateway-port: 25333
url: http://lighter.spark:8080
session:
timeout-minutes: 90
timeout-interval: 90m
timeout-active: false
permanent-sessions: []
schedule-interval: 1m
track-running-interval: 2m
kubernetes:
enabled: false
master: k8s://kubernetes.default.svc.cluster.local:443
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SessionHandlerTest extends Specification {
def "kills timeouted sessions"() {
given:
def oldSession = newSession()
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
service.lastUsed(oldSession.id) >> LocalDateTime.now() - conf.sessionConfiguration.timeoutInterval.plusMinutes(1)

def newSession = app()
service.lastUsed(newSession.id) >> newSession.createdAt
Expand All @@ -62,7 +62,7 @@ class SessionHandlerTest extends Specification {
def "preserves active timeouted sessions"() {
given:
def oldSession = newSession()
service.lastUsed(oldSession.id) >> LocalDateTime.now().minusMinutes(conf.sessionConfiguration.timeoutMinutes + 1)
service.lastUsed(oldSession.id) >> LocalDateTime.now() - conf.sessionConfiguration.timeoutInterval.plusMinutes(1)
service.isActive(oldSession) >> true

1 * service.fetchRunning() >> [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AppConfigurationTest extends Specification {
def "binds properties form yaml"() {
expect:
appConfiguration.maxRunningJobs == 5
appConfiguration.sessionConfiguration.timeoutMinutes == 90
appConfiguration.sessionConfiguration.timeoutInterval.toMinutes() == 90
appConfiguration.sessionConfiguration.permanentSessions.size() == 1
appConfiguration.sessionConfiguration.permanentSessions.get(0).id == "permanentId1"
appConfiguration.sessionConfiguration.permanentSessions.get(0).submitParams.conf == [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.exacaster.lighter.backend.kubernetes.KubernetesProperties
import com.exacaster.lighter.configuration.AppConfiguration
import com.exacaster.lighter.application.SubmitParams

import java.time.Duration
import java.time.LocalDateTime

class Factories {
Expand Down Expand Up @@ -62,8 +63,9 @@ class Factories {
null,
5432,
"http://lighter:8080",
new AppConfiguration.SessionConfiguration(20, false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]),
new AppConfiguration.SessionConfiguration(Duration.ofMinutes(20), false,
[new AppConfiguration.PermanentSession("permanentSessionId", submitParams())]
, Duration.ofMinutes(1), Duration.ofMinutes(2)),
["spark.kubernetes.driverEnv.TEST": "test"],
["spark.kubernetes.driverEnv.TEST": "test"]
)
Expand Down

0 comments on commit 64a1f88

Please sign in to comment.