Skip to content

Commit

Permalink
Merge pull request #167 from exacaster/local_backend
Browse files Browse the repository at this point in the history
Add Local backend
  • Loading branch information
pdambrauskas authored Oct 14, 2022
2 parents e2a0e2d + feca7a4 commit 6288a79
Show file tree
Hide file tree
Showing 22 changed files with 473 additions and 58 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.gradle
.env
build
bin
out
generated
lib
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ WORKDIR /home/app/
COPY server/ ./server/

WORKDIR /home/app/server/
RUN ./gradlew build -PSPARK_VERSION=${SPARK_VERSION}
RUN ./gradlew build -x test -PSPARK_VERSION=${SPARK_VERSION}

FROM node:lts-alpine3.14 as frontend

Expand Down
23 changes: 23 additions & 0 deletions dev/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Lighter on local environment
Besides K8s and YARN cluster modes, it is also possible to run Lighter in local mode.
Local mode is meant for local testing and demo purposes. You can start Lighter locally
by executing the `docker-compose up` command inside the `./dev/` folder.

When Lighter is running, you can execute example applications provided by Apache Spark.
For example, the `Spark PI` application can be started by executing this `curl` command:

```bash
curl -X 'POST' \
'http://localhost:8080/lighter/api/batches' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Spark PI",
"file": "/home/app/spark/examples/jars/spark-examples_2.12-3.3.0.jar",
"mainClass": "org.apache.spark.examples.SparkPi",
"args": ["100"]
}'
```

Lighter UI can be accessed at [http://localhost:8080/lighter](http://localhost:8080/lighter).\
You can also explore the Lighter API by visiting the Swagger UI at [http://localhost:8080/swagger-ui/](http://localhost:8080/swagger-ui/).
6 changes: 6 additions & 0 deletions dev/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.9"
services:
lighter:
build: ../
ports:
- "8080:8080"
4 changes: 3 additions & 1 deletion frontend/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Lighter UI

Frontend application used for tracking Apache Spark applications sbmitted via Lighter API. Application based on React framework.
Frontend application used for tracking Apache Spark applications submitted via the Lighter API.
The application is based on the React framework.

## Available Scripts

Expand All @@ -13,6 +14,7 @@ Installs all dependencies, needed to run the application.
### `yarn start`

Lighter API application should be running on `http://localhost:8080` for UI to work.
[Click here](../dev/README.md) to see how to start Lighter backend locally.

Runs the app in the development mode.\
Open [http://localhost:3000](http://localhost:3000) to view it in the browser.
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/setupProxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const {createProxyMiddleware} = require('http-proxy-middleware');
require('dotenv').config();

const opts = {
target: process.env.LIGHTER_DEV_PROXY_URL,
target: process.env.LIGHTER_DEV_PROXY_URL || 'http://localhost:8080',
secure: true,
cookieDomainRewrite: 'localhost',
preserveHeaderKeyCase: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.exacaster.lighter.backend;

import static org.slf4j.LoggerFactory.getLogger;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.slf4j.Logger;

/**
 * {@link SparkListener} used for cluster deployments (K8s / YARN).
 * Forwards launch failures to the supplied error handler and releases
 * {@link #waitCompletion()} once the application is detached from the launcher.
 */
public class ClusterSparkListener implements SparkListener {

    private static final Logger LOG = getLogger(ClusterSparkListener.class);

    private final Consumer<Throwable> errorHandler;
    private final CountDownLatch completionLatch = new CountDownLatch(1);

    /**
     * @param errorHandler callback invoked when the launched application fails
     */
    public ClusterSparkListener(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        var currentState = handle.getState();
        LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), currentState);
        handle.getError().ifPresent(error -> {
            LOG.warn("State changed with error: {} ", error.getMessage());
            // Only a FAILED state is treated as a launch error worth propagating.
            if (State.FAILED.equals(currentState)) {
                onError(error);
            }
        });
        // Disconnect when final or submitted.
        // In case app fails after detach, status will be retrieved by ApplicationStatusHandler.
        if (shouldDetach(currentState)) {
            handle.disconnect();
            completionLatch.countDown();
        }
    }

    // True once the app has either reached a terminal state or been handed off to the cluster.
    private static boolean shouldDetach(State state) {
        return state != null && (state.isFinal() || State.SUBMITTED.equals(state));
    }

    /** Blocks the caller until the launcher has been detached. */
    @Override
    public void waitCompletion() throws InterruptedException {
        completionLatch.await();
    }

    @Override
    public void onError(Throwable error) {
        errorHandler.accept(error);
    }
}
10 changes: 10 additions & 0 deletions server/src/main/java/com/exacaster/lighter/backend/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.exacaster.lighter.backend;

/**
 * Shared Spark configuration constant values used by the backend implementations.
 */
public final class Constants {

    /** Value for {@code spark.submit.deployMode} when the driver runs inside the cluster. */
    public static final String DEPLOY_MODE_CLUSTER = "cluster";

    /** Value for {@code spark.submit.deployMode} when the driver runs on the submitting host. */
    public static final String DEPLOY_MODE_CLIENT = "client";

    /** Value for {@code spark.master} when submitting to a YARN cluster. */
    public static final String MASTER_YARN = "yarn";

    // Constant holder; never instantiated.
    private Constants() {
    }
}
21 changes: 14 additions & 7 deletions server/src/main/java/com/exacaster/lighter/backend/SparkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,33 @@ public class SparkApp {

private final Map<String, String> configDefaults;
private final Map<String, String> backendConfiguration;
private final Consumer<Throwable> errorHandler;
private final Application application;
private final SparkListener listener;

public SparkApp(Application application, Map<String, String> configDefaults,
Map<String, String> backendConfiguration, Consumer<Throwable> errorHandler) {

public SparkApp(Application application,
Map<String, String> configDefaults,
Map<String, String> backendConfiguration,
SparkListener listener) {
this.application = application;
this.configDefaults = configDefaults;
this.backendConfiguration = backendConfiguration;
this.errorHandler = errorHandler;
this.listener = listener;
}

public SparkApp(Application application,
Map<String, String> configDefaults,
Map<String, String> backendConfiguration,
Consumer<Throwable> errorHandler) {
this(application, configDefaults, backendConfiguration, new ClusterSparkListener(errorHandler));
}

public Waitable launch() {
try {
var launcher = buildLauncher();
var listener = new SparkListener(errorHandler);
launcher.startApplication(listener);
return listener;
} catch (IOException | IllegalArgumentException e) {
this.errorHandler.accept(e);
this.listener.onError(e);
}

return EmptyWaitable.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,14 @@
package com.exacaster.lighter.backend;

import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.concurrency.Waitable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.Listener;
import org.apache.spark.launcher.SparkAppHandle.State;
import org.slf4j.Logger;

public class SparkListener implements Listener, Waitable {

private static final Logger LOG = getLogger(SparkListener.class);
private final Consumer<Throwable> errorHandler;
private final CountDownLatch latch;

public SparkListener(Consumer<Throwable> errorHandler) {
this.errorHandler = errorHandler;
this.latch = new CountDownLatch(1);
}
public interface SparkListener extends Listener, Waitable {

@Override
public void stateChanged(SparkAppHandle handle) {
var state = handle.getState();
LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), state);
handle.getError().ifPresent((error) -> {
LOG.warn("State changed with error: {} ", error.getMessage());
if (State.FAILED.equals(state)) {
errorHandler.accept(error);
}
});
// Disconnect when final or submitted.
// In case app fails after detach, status will be retrieved by ApplicationStatusHandler.
if (state != null && (state.isFinal() || State.SUBMITTED.equals(state))) {
handle.disconnect();
latch.countDown();
}
}

@Override
public void infoChanged(SparkAppHandle handle) {
// TODO: ?
}
void onError(Throwable error);

@Override
public void waitCompletion() throws InterruptedException {
latch.await();
default void infoChanged(SparkAppHandle handle) {
// default: do nothing
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.exacaster.lighter.backend.kubernetes;

import static com.exacaster.lighter.backend.Constants.DEPLOY_MODE_CLUSTER;
import static java.util.Optional.ofNullable;
import static org.apache.spark.launcher.SparkLauncher.DEPLOY_MODE;
import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER;
import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.application.Application;
Expand Down Expand Up @@ -57,10 +60,10 @@ Map<String, String> getBackendConfiguration(Application application) {
var host = uri.getHost();
var props = new HashMap<String, String>();
props.putAll(Map.of(
"spark.submit.deployMode", "cluster",
DEPLOY_MODE, DEPLOY_MODE_CLUSTER,
"spark.kubernetes.namespace", properties.getNamespace(),
"spark.kubernetes.authenticate.driver.serviceAccountName", properties.getServiceAccount(),
"spark.master", properties.getMaster(),
SPARK_MASTER, properties.getMaster(),
"spark.kubernetes.driver.label." + SPARK_APP_TAG_LABEL, application.getId(),
"spark.kubernetes.executor.label." + SPARK_APP_TAG_LABEL, application.getId(),
"spark.kubernetes.driverEnv.PY_GATEWAY_PORT", String.valueOf(conf.getPyGatewayPort()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.exacaster.lighter.backend.kubernetes;

import com.exacaster.lighter.configuration.AppConfiguration;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
Expand All @@ -12,7 +12,7 @@ public class KubernetesConfigurationFactory {

@Singleton
public KubernetesBackend backend(KubernetesProperties properties, AppConfiguration conf) {
var client = new DefaultKubernetesClient();
var client = new KubernetesClientBuilder().build();
return new KubernetesBackend(properties, conf, client);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.exacaster.lighter.backend.local;

import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.backend.SparkListener;
import com.exacaster.lighter.backend.local.logger.LogCollectingHandler;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkAppHandle.State;

/**
 * {@link SparkListener} backing a single Spark application launched in local mode.
 * Captures the application's log output through a JUL handler and exposes the
 * launcher state translated into Lighter's {@link ApplicationState}.
 */
public class LocalApp implements SparkListener {
    private static final org.slf4j.Logger LOG = getLogger(LocalApp.class);

    private final LogCollectingHandler logHandler;
    private final String loggerName;
    private final Consumer<Throwable> errorHandler;
    private SparkAppHandle sparkHandle;

    /**
     * @param application  the Lighter application this listener tracks; its id is used
     *                     to build a dedicated JUL logger name
     * @param errorHandler callback invoked when the launched application fails
     */
    public LocalApp(Application application, Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
        this.loggerName = "spark_app_" + application.getId();
        // Keep only the most recent 500 log entries for this application.
        this.logHandler = new LogCollectingHandler(500);
        Logger.getLogger(loggerName).addHandler(logHandler);
    }

    /**
     * Maps the launcher state to Lighter's application state.
     * Empty until the first {@link #stateChanged(SparkAppHandle)} callback arrives.
     */
    public Optional<ApplicationState> getState() {
        if (sparkHandle == null) {
            return Optional.empty();
        }
        switch (sparkHandle.getState()) {
            case FINISHED:
                return Optional.of(ApplicationState.SUCCESS);
            case FAILED:
                return Optional.of(ApplicationState.ERROR);
            case KILLED:
                return Optional.of(ApplicationState.KILLED);
            case LOST:
                return Optional.of(ApplicationState.DEAD);
            case CONNECTED:
            case SUBMITTED:
            case RUNNING:
                return Optional.of(ApplicationState.BUSY);
            case UNKNOWN:
                return Optional.of(ApplicationState.NOT_STARTED);
            default:
                return Optional.empty();
        }
    }

    /** Returns the log lines collected so far for this application. */
    public String getLog() {
        return logHandler.getLogs();
    }

    /**
     * Kills the running application and detaches the log handler.
     * Must only be called after the app has started (handle present).
     * NOTE(review): the handler is only removed here, not on normal completion —
     * confirm the owner cleans up finished apps via this method.
     */
    public void kill() {
        Objects.requireNonNull(sparkHandle);
        sparkHandle.kill();
        Logger.getLogger(loggerName).removeHandler(logHandler);
    }

    @Override
    public void onError(Throwable error) {
        errorHandler.accept(error);
    }

    @Override
    public void waitCompletion() throws InterruptedException {
        // Local application is not detached, until it is completed
        // do not block application processing for local applications.
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        this.sparkHandle = handle;
        var newState = handle.getState();
        LOG.info("State change. AppId: {}, State: {}", handle.getAppId(), newState);
        handle.getError().ifPresent(error -> {
            LOG.warn("State changed with error: {} ", error.getMessage());
            if (State.FAILED.equals(newState)) {
                onError(error);
            }
        });
    }

    /** Name of the JUL logger dedicated to this application's output. */
    public String getLoggerName() {
        return loggerName;
    }

}
Loading

0 comments on commit 6288a79

Please sign in to comment.