Skip to content

Commit

Permalink
[LIVY-563] Propagate RSC configuration when creating sessions.
Browse files Browse the repository at this point in the history
Even though not all RSC configs apply to the remote driver, a few do,
so propagate all of them when starting a new session.

Includes new unit test.

Author: Marcelo Vanzin <[email protected]>

Closes #168 from vanzin/LIVY-563.
  • Loading branch information
Marcelo Vanzin committed Apr 11, 2019
1 parent d5b27dd commit 5abc043
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
2 changes: 1 addition & 1 deletion rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class RSCConf extends ClientConf<RSCConf> {
public static final String SPARK_CONF_PREFIX = "spark.";
public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__.";

private static final String RSC_CONF_PREFIX = "livy.rsc.";
public static final String RSC_CONF_PREFIX = "livy.rsc.";

public static enum Entry implements ConfEntry {
CLIENT_ID("client.auth.id", null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,15 @@ object InteractiveSession extends Logging {
mergeHiveSiteAndHiveDeps(sparkMajorVersion)
}

// Pick all the RSC-specific configs that have not been explicitly set otherwise, and
// put them in the resulting properties, so that the remote driver can use them.
livyConf.iterator().asScala.foreach { e =>
val (key, value) = (e.getKey(), e.getValue())
if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) {
builderProperties(key) = value
}
}

builderProperties
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ class InteractiveSessionSpec extends FunSpec
"dummy.jar"))
}


it("should set rsc jars through livy conf") {
val rscJars = Set(
"dummy.jar",
Expand Down Expand Up @@ -177,6 +176,19 @@ class InteractiveSessionSpec extends FunSpec
session.state should (be(SessionState.Starting) or be(SessionState.Idle))
}

it("should propagate RSC configuration properties") {
val livyConf = new LivyConf(false)
.set(LivyConf.REPL_JARS, "dummy.jar")
.set(RSCConf.Entry.SASL_QOP.key(), "foo")
.set(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key(), "TRACE")
.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")

val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf)
assert(properties(RSCConf.Entry.SASL_QOP.key()) === "foo")
assert(properties(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key()) === "TRACE")
}

withSession("should execute `1 + 2` == 3") { session =>
val pyResult = executeStatement("1 + 2", Some("pyspark"))
pyResult should equal (Extraction.decompose(Map(
Expand Down

0 comments on commit 5abc043

Please sign in to comment.