From a65ce77f904e888e30c2c763cb76117e9e5382c6 Mon Sep 17 00:00:00 2001 From: "yennan.liu" Date: Tue, 27 Apr 2021 10:30:01 +0800 Subject: [PATCH] get Spark app name via getClass; update trigger time in streamSocketEventToHDFSV5.scala --- spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala | 2 +- spark/src/main/scala/com/yen/dev/KafkaAvroSourceDemo1.scala | 2 +- spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala | 2 +- spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala | 2 +- spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala | 2 +- spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala | 2 +- spark/src/main/scala/com/yen/dev/SparkBatchTest.scala | 2 +- spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala | 2 +- spark/src/main/scala/com/yen/dev/SreamFromFile.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamDetailFromKafka.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromFile2.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromFile3.scala | 2 +- .../src/main/scala/com/yen/dev/StreamFromFileSendToKafka1.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromKafka.scala | 2 +- .../src/main/scala/com/yen/dev/StreamFromKafkaWithSchema1.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromSocket.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala | 2 +- spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala | 2 +- spark/src/main/scala/com/yen/dev/TransformDemo1.scala | 2 +- spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala | 2 +- .../com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala | 2 ++ 23 files changed, 24 insertions(+), 22 deletions(-) diff --git a/spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala b/spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala index 2567953..c782e8b 100644 --- a/spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala +++ 
b/spark/src/main/scala/com/yen/dev/KafkaAvroSinkDemo1.scala @@ -14,7 +14,7 @@ object KafkaAvroSinkDemo1 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("Kafka Avro Sink Demo") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/KafkaAvroSourceDemo1.scala b/spark/src/main/scala/com/yen/dev/KafkaAvroSourceDemo1.scala index 280bc7b..fd9dbdb 100644 --- a/spark/src/main/scala/com/yen/dev/KafkaAvroSourceDemo1.scala +++ b/spark/src/main/scala/com/yen/dev/KafkaAvroSourceDemo1.scala @@ -15,7 +15,7 @@ object KafkaAvroSourceDemo1 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("KafkaAvroSourceDemo1") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala b/spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala index cfe0f60..c3fa3a6 100644 --- a/spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala +++ b/spark/src/main/scala/com/yen/dev/KafkaSinkDemo1.scala @@ -14,7 +14,7 @@ object KafkaSinkDemo1 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("KafkaSinkDemo1") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala b/spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala index 701a6a5..106741e 100644 --- a/spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala +++ b/spark/src/main/scala/com/yen/dev/MultiQueryDemo.scala @@ -14,7 +14,7 @@ object MultiQueryDemo extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("MultiQueryDemo") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala 
b/spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala index d8172d7..f9b0d61 100644 --- a/spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala +++ b/spark/src/main/scala/com/yen/dev/ParseTxt2Json.scala @@ -13,7 +13,7 @@ object ParseTxt2Json extends App{ val spark = SparkSession .builder - .appName("SparkBatchTest") + .appName(this.getClass.getName) .master("local[*]") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala b/spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala index 6bc1d29..2254ed6 100644 --- a/spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala +++ b/spark/src/main/scala/com/yen/dev/SlidingWindowDemo.scala @@ -15,7 +15,7 @@ object SlidingWindowDemo extends App { val spark = SparkSession.builder() .master("local[3]") - .appName("SlidingWindowDemo") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 1) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/SparkBatchTest.scala b/spark/src/main/scala/com/yen/dev/SparkBatchTest.scala index 40e1f0a..b6a3233 100644 --- a/spark/src/main/scala/com/yen/dev/SparkBatchTest.scala +++ b/spark/src/main/scala/com/yen/dev/SparkBatchTest.scala @@ -14,7 +14,7 @@ object SparkBatchTest extends App{ val spark = SparkSession .builder - .appName("SparkBatchTest") + .appName(this.getClass.getName) .master("local[*]") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala b/spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala index c728203..25b36bb 100644 --- a/spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala +++ b/spark/src/main/scala/com/yen/dev/SparkBatchTest2.scala @@ -14,7 +14,7 @@ object SparkBatchTest2 extends App { val spark = SparkSession .builder - .appName("SparkBatchAdaptorTest") + .appName(this.getClass.getName) .master("local[*]") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/SreamFromFile.scala 
b/spark/src/main/scala/com/yen/dev/SreamFromFile.scala index 55eeea3..85c5123 100644 --- a/spark/src/main/scala/com/yen/dev/SreamFromFile.scala +++ b/spark/src/main/scala/com/yen/dev/SreamFromFile.scala @@ -13,7 +13,7 @@ object StreamFromFile extends App{ val sparkSession = SparkSession.builder .master("local") - .appName("StreamFromFile") + .appName(this.getClass.getName) .getOrCreate() val schema = StructType( diff --git a/spark/src/main/scala/com/yen/dev/StreamDetailFromKafka.scala b/spark/src/main/scala/com/yen/dev/StreamDetailFromKafka.scala index c9bd1df..8c1fc7a 100644 --- a/spark/src/main/scala/com/yen/dev/StreamDetailFromKafka.scala +++ b/spark/src/main/scala/com/yen/dev/StreamDetailFromKafka.scala @@ -16,7 +16,7 @@ object StreamDetailFromKafka extends App{ val spark = SparkSession .builder - .appName("StreamDetailFromKafka") + .appName(this.getClass.getName) .master("local[*]") .config("spark.sql.warehouse.dir", "/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows. 
.getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromFile2.scala b/spark/src/main/scala/com/yen/dev/StreamFromFile2.scala index a67ca7f..837e26e 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromFile2.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromFile2.scala @@ -13,7 +13,7 @@ object StreamFromFile2 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromFile2") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.streaming.schemaInference", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromFile3.scala b/spark/src/main/scala/com/yen/dev/StreamFromFile3.scala index 98000f4..4814967 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromFile3.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromFile3.scala @@ -14,7 +14,7 @@ object StreamFromFile3 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromFile3") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.streaming.schemaInference", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromFileSendToKafka1.scala b/spark/src/main/scala/com/yen/dev/StreamFromFileSendToKafka1.scala index f00a2aa..399d576 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromFileSendToKafka1.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromFileSendToKafka1.scala @@ -12,7 +12,7 @@ object StreamFromFileSendToKafka1 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromFileSendToKafka1") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.streaming.schemaInference", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromKafka.scala b/spark/src/main/scala/com/yen/dev/StreamFromKafka.scala index eff2cf7..c105a69 100644 --- 
a/spark/src/main/scala/com/yen/dev/StreamFromKafka.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromKafka.scala @@ -14,7 +14,7 @@ object StreamFromKafka extends App{ val spark = SparkSession .builder - .appName("StreamFromKafka") + .appName(this.getClass.getName) .master("local[*]") .config("spark.sql.warehouse.dir", "/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows. .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromKafkaWithSchema1.scala b/spark/src/main/scala/com/yen/dev/StreamFromKafkaWithSchema1.scala index cd5295f..dab9fa1 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromKafkaWithSchema1.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromKafkaWithSchema1.scala @@ -16,7 +16,7 @@ object StreamFromKafkaWithSchema1 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromKafkaWithSchema1") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket.scala index ea813f6..0137187 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket.scala @@ -17,7 +17,7 @@ object StreamFromSocket extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromSocket") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 3) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala index 893c5e8..36f1697 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket2.scala @@ -12,7 +12,7 @@ object StreamFromSocket2 extends App { val spark = SparkSession.builder() .master("local[3]") 
- .appName("StreamFromSocket2") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 3) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala index 0f10bd4..5d78f17 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket3.scala @@ -12,7 +12,7 @@ object StreamFromSocket3 extends App { val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromSocket3") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 3) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala index 0ceda93..dbf796b 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket4.scala @@ -18,7 +18,7 @@ object StreamFromSocket4 extends App{ val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromSocket") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 3) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala index f43fa7f..2bdb0a8 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala @@ -23,7 +23,7 @@ object StreamFromSocket5 extends App { val spark = SparkSession.builder() .master("local[3]") - .appName("StreamFromSocket5") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 3) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/dev/TransformDemo1.scala 
b/spark/src/main/scala/com/yen/dev/TransformDemo1.scala index 79c3424..1b1b967 100644 --- a/spark/src/main/scala/com/yen/dev/TransformDemo1.scala +++ b/spark/src/main/scala/com/yen/dev/TransformDemo1.scala @@ -13,7 +13,7 @@ object TransformDemo1 extends App { // get sparkConf val sparkConf: SparkConf = new SparkConf() .setMaster("local[*]") - .setAppName("TransformDemo1") + .setAppName(this.getClass.getName) // get StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) diff --git a/spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala b/spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala index df782c1..d355ac8 100644 --- a/spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala +++ b/spark/src/main/scala/com/yen/dev/TumblingWindowDemo1.scala @@ -15,7 +15,7 @@ object TumblingWindowDemo1 extends App { val spark = SparkSession.builder() .master("local[3]") - .appName("TumblingWindowDemo1") + .appName(this.getClass.getName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.shuffle.partitions", 2) .getOrCreate() diff --git a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala index d3b9cb3..bb3e20c 100644 --- a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala +++ b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala @@ -12,6 +12,7 @@ package com.yen.streamSocketToHDFS import org.apache.log4j.Logger import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} object streamSocketEventToHDFSV5 extends App { @@ -53,6 +54,7 @@ object streamSocketEventToHDFSV5 extends App { val query = eventDF.writeStream .format("console") .option("checkpointLocation", "chk-point-dir") + 
.trigger(Trigger.ProcessingTime("10 second")) .start() query.awaitTermination()