Commit e98fcfa

update

yennanliu committed Aug 10, 2023
1 parent cce6f0c commit e98fcfa
Showing 4 changed files with 65 additions and 20 deletions.
3 changes: 2 additions & 1 deletion spark-poc/sparkJob/build.sbt
@@ -24,7 +24,8 @@ libraryDependencies ++= Seq(
// write to s3
"org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion,

"com.typesafe" % "config" % "1.3.1"
"com.typesafe" % "config" % "1.3.1",
"com.github.scopt" %% "scopt" % "3.6.0"
)

conflictManager := ConflictManager.latestRevision
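
Note: the dependency change above adds scopt, a small combinator library for parsing command-line options into an immutable config object; the parser stub introduced in SparkConfig.scala below is where it would plausibly be wired in (a hedged usage sketch follows that diff).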
13 changes: 13 additions & 0 deletions spark-poc/sparkJob/src/main/scala/dev/config/SparkConfig.scala
@@ -0,0 +1,13 @@
package dev.config

import org.ini4j.spi.OptionsParser

object SparkConfig {

// def parser(config: String): OptionsParser[T] = {
// new OptionsParser[T](config){
// //head(config)
// }
// }

}
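
Note: the stub above imports `org.ini4j.spi.OptionsParser`, which is unrelated to the scopt dependency added in build.sbt; scopt's entry point is `scopt.OptionParser`. A minimal sketch of what the commented-out parser might become with scopt 3.x — the `JobArgs` case class and its fields are illustrative assumptions, not part of the commit:

```scala
package dev.config

// Hypothetical argument holder; field names are assumptions for illustration.
case class JobArgs(input: String = "", master: String = "local[*]")

object SparkConfig {

  // scopt 3.x parser: parse() returns Some(JobArgs) on success, None on bad input.
  private val parser = new scopt.OptionParser[JobArgs]("sparkJob") {
    head("sparkJob")
    opt[String]("input")
      .action((value, cfg) => cfg.copy(input = value))
      .text("input data path")
    opt[String]("master")
      .action((value, cfg) => cfg.copy(master = value))
      .text("Spark master URL, e.g. local[*]")
  }

  def parse(args: Array[String]): Option[JobArgs] =
    parser.parse(args, JobArgs())
}
```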
39 changes: 26 additions & 13 deletions spark-poc/sparkJob/src/main/scala/driver/SparkDriver.scala
@@ -6,12 +6,12 @@ import org.apache.spark.sql.SparkSession

object SparkDriver {

val appName: String = "defaultSparkApp"
// val appName: String = "defaultSparkApp"
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

def main(args: Array[String]): Unit = {

}
//
// def main(args: Array[String]): Unit = {
//
// }

// attr
val arguments: String = "someArgument"
@@ -23,14 +23,27 @@
new SparkContext(conf)
}

def getSparkSession(appName: String, executionType: String = "cluster", cores: Int = 3): SparkSession = {
// TODO : setup for local, cluster... mode
SparkSession
.builder()
.appName(appName)
//.enableHiveSupport()
.master("local[*]")
.getOrCreate()
def getSparkSession(appName: String, executionType: String = "cluster", cores: Int = 3, enableHive: Boolean = false): SparkSession = {

var _executionType = executionType
if(!executionType.equals("cluster")){
_executionType = "local[*]"
}

if (enableHive){
SparkSession
.builder()
.appName(appName)
.enableHiveSupport()
.master(_executionType)
.getOrCreate()
}else{
SparkSession
.builder()
.appName(appName)
.master(_executionType)
.getOrCreate()
}

}
}
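
Two things worth noting about the new `getSparkSession`: the `cores` parameter is never used, and when `executionType` is "cluster" the literal string "cluster" is passed to `.master(...)`, which is not a valid master URL ("cluster" is a deploy mode; master URLs look like `local[*]`, `yarn`, or `spark://host:7077` and are usually supplied by `spark-submit`). A possible consolidation that also removes the duplicated builder chain — a sketch, not the committed code:

```scala
import org.apache.spark.sql.SparkSession

def getSparkSession(appName: String,
                    executionType: String = "cluster",
                    enableHive: Boolean = false): SparkSession = {
  val builder = SparkSession.builder().appName(appName)

  // Only force a master for local runs; in cluster mode leave it to
  // spark-submit (--master yarn, spark://host:7077, ...).
  if (!executionType.equals("cluster")) builder.master("local[*]")

  // Builder methods mutate and return the same builder, so branching
  // on Hive support needs no duplication of the chain above.
  (if (enableHive) builder.enableHiveSupport() else builder).getOrCreate()
}
```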
30 changes: 24 additions & 6 deletions spark-poc/sparkJob/src/main/scala/spark/myJob1.scala
@@ -1,30 +1,48 @@
package spark

import config.AppConfig
import driver.SparkDriver
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object myJob1 {

def main(args: Array[String]): Unit = {

val logger = LoggerFactory.getLogger(this.getClass)
//logger.info(AppConfig.dump)

val conf = new SparkConf()
.setAppName("myJob1")
.setMaster("local[*]")

//val s_driver = SparkDriver.getSparkSession("myJob1");
val s_context = SparkDriver.getSparkContext("myJob1", conf);
val sc = SparkDriver.getSparkContext("myJob1", conf);
val spark = SparkDriver

//println("s_driver = " + s_driver)
println("s_context = " + s_context)
println("sc = " + sc)
println("spark = " + spark)

val rdd1 = s_context.makeRDD(1 to 4, 2)
val rdd1 = sc.makeRDD(1 to 4, 2)

println(rdd1.aggregate(0)(_ + _, _ + _)) // 10

println(rdd1.aggregate(1)(_ + _, _ + _)) // 13

println("================")

val rdd2 = sc.makeRDD(1 to 4, 4)

println(rdd2.aggregate(1)(_ + _, _ + _)) // 15

println("================")

// compare : fold

println(rdd1.fold(0)(_ + _)) // 10

println(rdd1.fold(1)(_ + _)) // 13

println(">>> SparkApp1 end ")
}

}
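The printed values follow from how `aggregate` and `fold` treat the zero value: it is folded into every partition by the sequence op and then once more by the combine op on the driver, so it is counted (numPartitions + 1) times. A short check, assuming the same `sc` as in the job above:

```scala
val rdd = sc.makeRDD(1 to 4, 2)            // two partitions: [1, 2] and [3, 4]
// per-partition sums: 1 + (1 + 2) = 4 and 1 + (3 + 4) = 8
// driver combine:     1 + 4 + 8 = 13
assert(rdd.aggregate(1)(_ + _, _ + _) == 13)

// with 4 partitions the zero is counted 4 + 1 times: 10 + 5 = 15
assert(sc.makeRDD(1 to 4, 4).aggregate(1)(_ + _, _ + _) == 15)

// fold applies the same rule with a single op, hence 10 + 2 + 1 = 13
assert(rdd.fold(1)(_ + _) == 13)
```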
