Skip to content

Commit

Permalink
Mark application as ERROR if it is not found in YARN (#147)
Browse files Browse the repository at this point in the history
* Add missing master property to launcher
Mark application as ERROR if it is not found in YARN

* Pull error handling for state fetch failures up
  • Loading branch information
Minutis authored Oct 14, 2022
1 parent 6288a79 commit 20be6a3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import static org.apache.spark.launcher.SparkLauncher.SPARK_MASTER;
import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationInfo;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.*;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.backend.SparkApp;
import com.exacaster.lighter.configuration.AppConfiguration;
Expand All @@ -22,6 +20,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import java.util.function.Consumer;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand All @@ -47,26 +46,26 @@ public YarnBackend(YarnProperties yarnProperties, YarnClient client, AppConfigur
@Override
public Optional<ApplicationInfo> getInfo(Application application) {
return getYarnApplicationId(application)
.map(id -> new ApplicationInfo(getState(id), id));
.flatMap(id -> getState(id).map(state -> new ApplicationInfo(state, id)));
}

private ApplicationState getState(String id) {
private Optional<ApplicationState> getState(String id) {
try {
var yarnApplication = client.getApplicationReport(fromString(id));
switch (yarnApplication.getFinalApplicationStatus()) {
case UNDEFINED:
return ApplicationState.BUSY;
return Optional.of(ApplicationState.BUSY);
case SUCCEEDED:
return ApplicationState.SUCCESS;
return Optional.of(ApplicationState.SUCCESS);
case FAILED:
return ApplicationState.ERROR;
return Optional.of(ApplicationState.ERROR);
case KILLED:
return ApplicationState.KILLED;
return Optional.of(ApplicationState.KILLED);
}
} catch (YarnException | IOException e) {
LOG.error("Unexpected error for appId: {}", id, e);
}
throw new IllegalStateException("Unexpected state for appId: " + id);
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
@Factory
@Requires(beans = YarnProperties.class)
public class YarnConfigurationFactory {

@Singleton
public YarnBackend backend(YarnProperties yarnProperties, AppConfiguration conf,
@Property(name = "hadoop.conf.dir") String hadoopConfDir) throws IOException {
Expand Down

0 comments on commit 20be6a3

Please sign in to comment.