diff --git a/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala b/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala index 2ddf7bc..f43fa7f 100644 --- a/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala +++ b/spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala @@ -15,6 +15,7 @@ package com.yen.dev import org.apache.log4j.Logger import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} object StreamFromSocket5 extends App { @@ -55,6 +56,7 @@ object StreamFromSocket5 extends App { val query = eventDF.writeStream .format("console") .option("checkpointLocation", "chk-point-dir") + .trigger(Trigger.ProcessingTime("10 second")) .start() query.awaitTermination()