diff --git a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala index bb3e20c..7f6db60 100644 --- a/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala +++ b/spark/src/main/scala/com/yen/streamSocketToHDFS/streamSocketEventToHDFSV5.scala @@ -19,6 +19,8 @@ object streamSocketEventToHDFSV5 extends App { @transient lazy val logger: Logger = Logger.getLogger(getClass.getName) + logger.info("process stream start") + val spark = SparkSession.builder() .master("local[3]") .appName(this.getClass.getName) @@ -52,10 +54,12 @@ object streamSocketEventToHDFSV5 extends App { eventDF.printSchema() val query = eventDF.writeStream - .format("console") + .format("json") .option("checkpointLocation", "chk-point-dir") .trigger(Trigger.ProcessingTime("10 second")) + .option("path", "streamSocketEventToHDFSV5") .start() + logger.info("process stream end") query.awaitTermination() }