From b9188f9f798827d9422a570a94bf5af575ebf4f2 Mon Sep 17 00:00:00 2001 From: "yennan.liu" Date: Tue, 27 Apr 2021 10:34:46 +0800 Subject: [PATCH] update streamSocketEventToHDFSV5.scala --- .../yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala index bb3e20c..7f6db60 100644 --- a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala +++ b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala @@ -19,6 +19,8 @@ object streamSocketEventToHDFSV5 extends App { @transient lazy val logger: Logger = Logger.getLogger(getClass.getName) + logger.info("process stream start") + val spark = SparkSession.builder() .master("local[3]") .appName(this.getClass.getName) @@ -52,10 +54,12 @@ object streamSocketEventToHDFSV5 extends App { eventDF.printSchema() val query = eventDF.writeStream - .format("console") + .format("json") .option("checkpointLocation", "chk-point-dir") .trigger(Trigger.ProcessingTime("10 second")) + .option("path", "streamSocketEventToHDFSV5") .start() + logger.info("process stream end") query.awaitTermination() }