Skip to content

Commit

Permalink
get spark app name via getClass, update trigger time in streamSocketEventToHDFSV5.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
yennan.liu committed Apr 27, 2021
1 parent 4f736e0 commit a65ce77
Show file tree
Hide file tree
Showing 23 changed files with 24 additions and 22 deletions.
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object KafkaAvroSinkDemo1 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("Kafka Avro Sink Demo")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object KafkaAvroSourceDemo1 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("KafkaAvroSourceDemo1")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object KafkaSinkDemo1 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("KafkaSinkDemo1")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object MultiQueryDemo extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("MultiQueryDemo")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object ParseTxt2Json extends App{

val spark = SparkSession
.builder
.appName("SparkBatchTest")
.appName(this.getClass.getName)
.master("local[*]")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object SlidingWindowDemo extends App {

val spark = SparkSession.builder()
.master("local[3]")
.appName("SlidingWindowDemo")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/SparkBatchTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object SparkBatchTest extends App{

val spark = SparkSession
.builder
.appName("SparkBatchTest")
.appName(this.getClass.getName)
.master("local[*]")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object SparkBatchTest2 extends App {

val spark = SparkSession
.builder
.appName("SparkBatchAdaptorTest")
.appName(this.getClass.getName)
.master("local[*]")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/SreamFromFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object StreamFromFile extends App{

val sparkSession = SparkSession.builder
.master("local")
.appName("StreamFromFile")
.appName(this.getClass.getName)
.getOrCreate()

val schema = StructType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object StreamDetailFromKafka extends App{

val spark = SparkSession
.builder
.appName("StreamDetailFromKafka")
.appName(this.getClass.getName)
.master("local[*]")
.config("spark.sql.warehouse.dir", "/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromFile2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object StreamFromFile2 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromFile2")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.streaming.schemaInference", "true")
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromFile3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object StreamFromFile3 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromFile3")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.streaming.schemaInference", "true")
.getOrCreate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object StreamFromFileSendToKafka1 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromFileSendToKafka1")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.streaming.schemaInference", "true")
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromKafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object StreamFromKafka extends App{

val spark = SparkSession
.builder
.appName("StreamFromKafka")
.appName(this.getClass.getName)
.master("local[*]")
.config("spark.sql.warehouse.dir", "/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object StreamFromKafkaWithSchema1 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromKafkaWithSchema1")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object StreamFromSocket extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromSocket")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object StreamFromSocket2 extends App {

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromSocket2")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object StreamFromSocket3 extends App {

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromSocket3")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object StreamFromSocket4 extends App{

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromSocket")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object StreamFromSocket5 extends App {

val spark = SparkSession.builder()
.master("local[3]")
.appName("StreamFromSocket5")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 3)
.getOrCreate()
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/TransformDemo1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object TransformDemo1 extends App {
// get sparkConf
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("TransformDemo1")
.setAppName(this.getClass.getName)

// get StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object TumblingWindowDemo1 extends App {

val spark = SparkSession.builder()
.master("local[3]")
.appName("TumblingWindowDemo1")
.appName(this.getClass.getName)
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.shuffle.partitions", 2)
.getOrCreate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package com.yen.streamSocketToHDFS
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

object streamSocketEventToHDFSV5 extends App {
Expand Down Expand Up @@ -53,6 +54,7 @@ object streamSocketEventToHDFSV5 extends App {
val query = eventDF.writeStream
.format("console")
.option("checkpointLocation", "chk-point-dir")
.trigger(Trigger.ProcessingTime("10 second"))
.start()

query.awaitTermination()
Expand Down

0 comments on commit a65ce77

Please sign in to comment.