Make fork options task-specific #6

Open · wants to merge 1 commit into base: master
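In brief, this change makes the fork options task-specific: each SparkSubmit task now reads fork, javaHome, connectInput, outputStrategy, javaOptions, baseDirectory, and envVars from its own scope (delegating to the shared sparkSubmit key by default) and builds its own runner, instead of relying on the project-wide runner task. A minimal build.sbt sketch of what this enables, assuming the plugin's autoImport is in scope; the values here are illustrative, not part of the diff:

    // Fork the SparkSubmit JVM and configure it, scoped to the task only
    fork in sparkSubmit := true
    javaOptions in sparkSubmit := Seq("-Xmx2G")
    envVars in sparkSubmit := Map("HADOOP_CONF_DIR" -> "/etc/hadoop/conf")

With fork in sparkSubmit left false, SparkSubmit runs in-process through sbt.Run, as the diff below shows.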
src/main/scala/sbtsparksubmit/SparkSubmitPlugin.scala (30 changes: 27 additions & 3 deletions)
@@ -22,6 +22,7 @@ object SparkSubmitPlugin extends AutoPlugin {
   lazy val sparkSubmitMaster = settingKey[(Seq[String], Seq[String]) => String]("(SparkArgs, AppArgs) => Default Spark Master")
   lazy val sparkSubmitPropertiesFile = settingKey[Option[String]]("The default configuration file used by Spark")
   lazy val sparkSubmitClasspath = taskKey[Seq[File]]("Classpath used in SparkSubmit. For example, this can include the HADOOP_CONF_DIR for yarn deployment.")
+  def sparkSubmit = defaultSparkSubmitKey
 
   class SparkSubmitSetting(name: String) {
     lazy val sparkSubmit = InputKey[Unit](name,
@@ -48,6 +49,13 @@
 
 
     lazy val defaultSettings = Seq(
+      fork in sparkSubmit := (fork in defaultSparkSubmitKey).value,
+      javaHome in sparkSubmit := (javaHome in defaultSparkSubmitKey).value,
+      connectInput in sparkSubmit := (connectInput in defaultSparkSubmitKey).value,
+      outputStrategy in sparkSubmit := (outputStrategy in defaultSparkSubmitKey).value,
+      javaOptions in sparkSubmit := (javaOptions in defaultSparkSubmitKey).value,
+      baseDirectory in sparkSubmit := (baseDirectory in defaultSparkSubmitKey).value,
+      envVars in sparkSubmit := (envVars in defaultSparkSubmitKey).value,
       sparkSubmit := {
         val jar = (sparkSubmitJar in sparkSubmit).value
 
@@ -80,8 +88,23 @@
           }
         }
 
+        val runner =
+          if((fork in sparkSubmit).value) {
+            val forkOptions = ForkOptions(
+              bootJars = Nil,
+              javaHome = (javaHome in sparkSubmit).value,
+              connectInput = (connectInput in sparkSubmit).value,
+              outputStrategy = (outputStrategy in sparkSubmit).value,
+              runJVMOptions = (javaOptions in sparkSubmit).value,
+              workingDirectory = Some((baseDirectory in sparkSubmit).value),
+              envVars = (envVars in sparkSubmit).value
+            )
+            new sbt.ForkRun(forkOptions)
+          } else {
+            new sbt.Run(scalaInstance.value, trapExit.value, taskTemporaryDirectory.value)
+          }
 
-        runner.value.run(
+        runner.run(
           "org.apache.spark.deploy.SparkSubmit",
           sparkSubmitClasspath.value,
           options,
@@ -121,7 +144,8 @@
     sparkSubmitClasspath := data((fullClasspath in Compile).value)
   )
 
-  def defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit")
+  lazy val defaultSparkSubmitSetting: SparkSubmitSetting = SparkSubmitSetting("sparkSubmit")
+  def defaultSparkSubmitKey = defaultSparkSubmitSetting.sparkSubmit
 
   override def trigger = allRequirements
 }
@@ -145,4 +169,4 @@ object SparkSubmitYARN extends AutoPlugin {
       data((fullClasspath in Compile).value)
     }
   )
-}
\ No newline at end of file
+}
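Because every SparkSubmitSetting delegates its fork options to defaultSparkSubmitKey in defaultSettings, a build can set one shared default and still override per task. A hypothetical sketch of that pattern; the task names and values are illustrative, while SparkSubmitSetting and its sparkSubmit InputKey are the ones from the diff:

    // Two independent SparkSubmit-style tasks
    lazy val submitLocal = SparkSubmitSetting("sparkLocal")
    lazy val submitYarn = SparkSubmitSetting("sparkYarn")

    // Shared default: all derived tasks fork unless they say otherwise
    fork in sparkSubmit := true

    // Per-task overrides, now possible because the settings are task-scoped
    fork in submitLocal.sparkSubmit := false                // quick in-process runs
    javaOptions in submitYarn.sparkSubmit := Seq("-Xmx4G")  // larger driver JVM

The plugin-supplied delegation only provides defaults, so a task-scoped setting in the build shadows the shared value when both are set.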