Skip to content

Commit

Permalink
add trigger processing time force spark process stream per 10 sec (St…
Browse files Browse the repository at this point in the history
…reamFromSocket5.scala
  • Loading branch information
yennan.liu committed Apr 27, 2021
1 parent ac20fdb commit 4f736e0
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions spark/src/main/scala/com/yen/dev/StreamFromSocket5.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package com.yen.dev
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

object StreamFromSocket5 extends App {
Expand Down Expand Up @@ -55,6 +56,7 @@ object StreamFromSocket5 extends App {
val query = eventDF.writeStream
.format("console")
.option("checkpointLocation", "chk-point-dir")
.trigger(Trigger.ProcessingTime("10 second"))
.start()

query.awaitTermination()
Expand Down

0 comments on commit 4f736e0

Please sign in to comment.