diff --git a/spark-poc/sparkJob/build.sbt b/spark-poc/sparkJob/build.sbt
index cc7d9b3..35c0a8c 100644
--- a/spark-poc/sparkJob/build.sbt
+++ b/spark-poc/sparkJob/build.sbt
@@ -24,7 +24,8 @@ libraryDependencies ++= Seq(
 
   // write to s3
   "org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion,
-  "com.typesafe" % "config" % "1.3.1"
+  "com.typesafe" % "config" % "1.3.1",
+  "com.github.scopt" %% "scopt" % "3.6.0"
 )
 
 conflictManager := ConflictManager.latestRevision
diff --git a/spark-poc/sparkJob/src/main/scala/dev/config/SparkConfig.scala b/spark-poc/sparkJob/src/main/scala/dev/config/SparkConfig.scala
new file mode 100644
index 0000000..d9e4633
--- /dev/null
+++ b/spark-poc/sparkJob/src/main/scala/dev/config/SparkConfig.scala
@@ -0,0 +1,13 @@
+package dev.config
+
+import org.ini4j.spi.OptionsParser
+
+object SparkConfig {
+
+//  def parser(config: String): OptionsParser[T] = {
+//    new OptionsParser[T](config){
+//      //head(config)
+//    }
+//  }
+
+}
diff --git a/spark-poc/sparkJob/src/main/scala/driver/SparkDriver.scala b/spark-poc/sparkJob/src/main/scala/driver/SparkDriver.scala
index 29e057f..4da2e1d 100644
--- a/spark-poc/sparkJob/src/main/scala/driver/SparkDriver.scala
+++ b/spark-poc/sparkJob/src/main/scala/driver/SparkDriver.scala
@@ -6,12 +6,12 @@ import org.apache.spark.sql.SparkSession
 object SparkDriver {
 
-  val appName: String = "defaultSparkApp"
+//  val appName: String = "defaultSparkApp"
 
   @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
-
-  def main(args: Array[String]): Unit = {
-
-  }
+//
+//  def main(args: Array[String]): Unit = {
+//
+//  }
 
   // attr
   val arguments: String = "someArgument"
@@ -23,14 +23,27 @@ object SparkDriver {
     new SparkContext(conf)
   }
 
-  def getSparkSession(appName: String, executionType: String = "cluster", cores: Int = 3): SparkSession = {
-    // TODO : setup for local, cluster... mode
-    SparkSession
-      .builder()
-      .appName(appName)
-      //.enableHiveSupport()
-      .master("local[*]")
-      .getOrCreate()
+  def getSparkSession(appName: String, executionType: String = "cluster", cores: Int = 3, enableHive: Boolean = false): SparkSession = {
+
+    var _executionType = executionType
+    if(!executionType.equals("cluster")){
+      _executionType = "local[*]"
+    }
+
+    if (enableHive){
+      SparkSession
+        .builder()
+        .appName(appName)
+        .enableHiveSupport()
+        .master(_executionType)
+        .getOrCreate()
+    }else{
+      SparkSession
+        .builder()
+        .appName(appName)
+        .master(_executionType)
+        .getOrCreate()
+    }
   }
 
 }
diff --git a/spark-poc/sparkJob/src/main/scala/spark/myJob1.scala b/spark-poc/sparkJob/src/main/scala/spark/myJob1.scala
index 70d906e..244c671 100644
--- a/spark-poc/sparkJob/src/main/scala/spark/myJob1.scala
+++ b/spark-poc/sparkJob/src/main/scala/spark/myJob1.scala
@@ -1,30 +1,48 @@
 package spark
 
+import config.AppConfig
 import driver.SparkDriver
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
 
 object myJob1 {
 
   def main(args: Array[String]): Unit = {
 
+    val logger = LoggerFactory.getLogger(this.getClass)
+    //logger.info(AppConfig.dump)
+
     val conf = new SparkConf()
       .setAppName("myJob1")
       .setMaster("local[*]")
 
-    //val s_driver = SparkDriver.getSparkSession("myJob1");
-    val s_context = SparkDriver.getSparkContext("myJob1", conf);
+    val sc = SparkDriver.getSparkContext("myJob1", conf);
     val spark = SparkDriver
-
-    //println("s_driver = " + s_driver)
-    println("s_context = " + s_context)
+    println("sc = " + sc)
     println("spark = " + spark)
 
-    val rdd1 = s_context.makeRDD(1 to 4, 2)
+    val rdd1 = sc.makeRDD(1 to 4, 2)
 
     println(rdd1.aggregate(0)(_ + _, _ + _)) // 10
 
     println(rdd1.aggregate(1)(_ + _, _ + _)) // 13
+
+    println("================")
+
+    val rdd2 = sc.makeRDD(1 to 4, 4)
+
+    println(rdd2.aggregate(1)(_ + _, _ + _)) // 15
+
+    println("================")
+
+    // compare : fold
+
+    println(rdd1.fold(0)(_ + _)) // 10
+
+    println(rdd1.fold(1)(_ + _)) // 13
+
+    println(">>> SparkApp1 end ")
   }
 }
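
Note (not part of the patch): a minimal usage sketch of the reworked SparkDriver.getSparkSession. Any executionType other than "cluster" is mapped to the local[*] master, enableHive toggles enableHiveSupport(), and the cores parameter is not used yet. SessionExample is a hypothetical object, shown only to illustrate the new signature.

// Hypothetical usage sketch for the patched SparkDriver.getSparkSession.
import driver.SparkDriver

object SessionExample {
  def main(args: Array[String]): Unit = {
    // executionType defaults to "cluster"; passing anything else forces local[*].
    val spark = SparkDriver.getSparkSession("sessionExample", executionType = "local")
    println("Spark version: " + spark.version)
    spark.stop()
  }
}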
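
Note (not part of the patch): the expected values annotated in myJob1.scala (// 10, // 13, // 15) follow from aggregate and fold applying the zero value once per partition and once more when the partition results are merged, so for addition the result is sum + (numPartitions + 1) * zeroValue. A small sketch of that arithmetic; expectedSum is a hypothetical helper, not anything in the repo.

// Sketch (hypothetical helper): reproduce the expected aggregate/fold outputs
// for an additive op, where the zero value is applied once per partition and
// once more when the partition results are combined.
object AggregateSemantics {
  def expectedSum(data: Seq[Int], numPartitions: Int, zeroValue: Int): Int =
    data.sum + (numPartitions + 1) * zeroValue

  def main(args: Array[String]): Unit = {
    println(expectedSum(1 to 4, numPartitions = 2, zeroValue = 0)) // 10 -> rdd1.aggregate(0)
    println(expectedSum(1 to 4, numPartitions = 2, zeroValue = 1)) // 13 -> rdd1.aggregate(1)
    println(expectedSum(1 to 4, numPartitions = 4, zeroValue = 1)) // 15 -> rdd2.aggregate(1)
    println(expectedSum(1 to 4, numPartitions = 2, zeroValue = 1)) // 13 -> rdd1.fold(1)
  }
}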