KafkaSparkPoc

Kafka-Spark streaming POC project

Intro

  • Projects
    • Spark : Spark application code
    • Kafka : Kafka application code
  • Programming languages
    • Scala, Java
  • Frameworks
    • Spark, Kafka
  • Build tool
    • SBT
  • IDE
    • IntelliJ IDEA

Scope (Kafka - Spark)

- Kafka -> Spark
- Kafka -> Spark -> Kafka
- Kafka -> Kafka -> Spark
- Kafka -> Spark -> HDFS
- Spark -> Kafka

Scope (Kafka)

Kafka

Scope (Spark)

Spark
  • Transformation
    • value
      • map :
        • applies the function to one data point at a time
      • mapPartitions :
        • applies the function to all data points in the SAME partition at once, may cause OOM
        • good to use when memory is large -> better efficiency
      • mapPartitionsWithIndex
      • flatMap
        • similar to map, but each input element maps to a sequence of output elements, which are then flattened ("merged") into the result
      • glom
        • turns each partition into an array, forming an RDD of type RDD[Array[T]]
      • groupBy
        • groups elements by the given function, putting values with the same key into the same iterator
      • filter
      • sample
      • distinct
      • coalesce
      • repartition
      • sortBy
    • key-value
      • partitionBy
      • reduceByKey
        • aggregates by key; has a map-side pre-combine step before the shuffle, return type : RDD[K, V]
        • reduceByKey1
      • groupByKey
        • groups by key and shuffles directly, with no pre-combine step
        • reduceByKey is generally preferable to groupByKey, but business requirements still need to be considered (see the sketch after this list)
      • aggregateByKey
      • foldByKey
        • general ordering : aggregateByKey -> foldByKey -> reduceByKey
        • foldByKey1
      • combineByKey
      • sortByKey
      • join
      • cogroup
  • Action
    • reduce(func)
      • uses func to aggregate records within each partition first, then aggregates the partial results across partitions
      • there is a "pre-process" (partition-level) step
    • collect
      • each worker aggregates its results and sends them back to the driver
      • DON'T use it in a prod env (the driver can run out of memory on large results)
    • count
    • first
    • take(n)
    • takeOrdered(n)
    • aggregate
    • fold(num)(func)
    • saveAsTextFile
    • saveAsSequenceFile
    • saveAsObjectFile
    • countByKey
    • foreach(func)
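
A minimal sketch of the reduceByKey vs groupByKey point above (illustration only, not project code; uses a tiny local pair RDD):

import org.apache.spark.{SparkConf, SparkContext}

// sketch only: compare reduceByKey and groupByKey on a small pair RDD
object ByKeyDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("byKeyDemo").setMaster("local[*]"))

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)))

  // reduceByKey combines values inside each partition BEFORE the shuffle,
  // so less data crosses the network
  val reduced = pairs.reduceByKey(_ + _)

  // groupByKey shuffles every (key, value) pair first, then groups them
  val grouped = pairs.groupByKey().mapValues(_.sum)

  println(reduced.collect().toList) // List((a,3), (b,2)) in some order
  println(grouped.collect().toList) // same result, more shuffle traffic

  sc.stop()
}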

Structure

├── Makefile      - kafka help commands
├── README.md
├── data          - sample data for app demo
├── doc           - collection of project docs
├── exampleCode   - external spark stream example code
├── kafka         - Kafka application source code
├── kafkaJava     - Kafka application source code (Java)
├── mk.d
├── script        - helper scripts
├── spark         - Spark application source code

Build

# build spark project
cd spark
sbt clean assembly

# build kafka project
cd kafka
sbt clean assembly

Prerequisites

  • Install
    • Java JDK 1.8
    • Scala
    • Spark 2.X
    • sbt
    • Kafka
    • HDFS (optional)
# launch kafka
make run_kz

# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <new_topic>

Run Basic examples

  • Spark stream from Kafka with Schema and write the parsed result to local files (/output)
  • example.json
# start zookeeper, kafka
make run_kz
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic invoices4
# start producer  
kafka-console-producer --broker-list localhost:9092 --topic invoices4
# and paste some sample data below (sample.json) in the producer console, check the spark-streaming result at /output

# and run the spark-submit script
spark-submit \
 --class com.yen.dev.StreamFromKafkaWithSchema1 \
 target/scala-2.11/spark-app-assembly-0.0.1.jar
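
The class itself lives in the spark project; below is only a minimal sketch of the pattern it presumably follows (read Kafka, apply a JSON schema, write files). The schema fields, topic, and output path are illustrative assumptions, not the actual code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

// sketch only: schema fields, topic and paths are illustrative guesses
object StreamFromKafkaWithSchemaSketch extends App {
  val spark = SparkSession.builder()
    .appName("stream-from-kafka-sketch")
    .master("local[*]")
    .getOrCreate()

  // schema applied to the JSON value of each Kafka record
  val schema = new StructType()
    .add("InvoiceNumber", StringType)
    .add("TotalAmount", DoubleType)

  val raw = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "invoices4")
    .load()

  // Kafka values arrive as bytes: cast to string, then parse with the schema
  val parsed = raw
    .select(from_json(col("value").cast("string"), schema).as("data"))
    .select("data.*")

  parsed.writeStream
    .format("json")
    .option("path", "output")
    .option("checkpointLocation", "chk-point-dir")
    .start()
    .awaitTermination()
}
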
  • Spark stream from Kafka with Schema and write back to Kafka
# start zookeeper, kafka
make run_kz
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic invoices6
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic notifications
# start producer  
kafka-console-producer --broker-list localhost:9092 --topic invoices6
# start consumer
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic notifications 
# and run the spark-submit script
spark-submit \
 --class com.yen.dev.KafkaSinkDemo1 \
 target/scala-2.11/spark-app-assembly-0.0.1.jar
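
A minimal sketch of the Kafka-to-Kafka pattern this demo presumably uses (illustration only; the topic names and the pass-through transformation are assumptions):

import org.apache.spark.sql.SparkSession

// sketch only: read one Kafka topic and write the records to another
object KafkaSinkSketch extends App {
  val spark = SparkSession.builder()
    .appName("kafka-sink-sketch")
    .master("local[*]")
    .getOrCreate()

  val source = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "invoices6")
    .load()

  // the Kafka sink expects string/binary "key" and "value" columns
  source
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "notifications")
    .option("checkpointLocation", "chk-point-dir-sink")
    .start()
    .awaitTermination()
}
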
  • Spark stream from Kafka with Schema and write back to Kafka in avro format
  • example.json
# start zookeeper, kafka
make run_kz
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic invoices_avro
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic invoice_avro_output
# start producer  
kafka-console-producer --broker-list localhost:9092 --topic invoices_avro
# start consumer
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic invoice_avro_output 
# and run the spark-submit script
spark-submit \
 --class com.yen.dev.KafkaSinkDemo1 \
 target/scala-2.11/spark-app-assembly-0.0.1.jar
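
A minimal sketch of writing to Kafka in Avro format (illustration only; the struct layout and topics are assumptions, and the spark-avro module must be on the classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.to_avro // spark-avro, Spark 2.4 package layout
import org.apache.spark.sql.functions.{col, struct}

// sketch only: wrap each record in a struct and encode it as Avro bytes
object KafkaAvroSinkSketch extends App {
  val spark = SparkSession.builder()
    .appName("kafka-avro-sketch")
    .master("local[*]")
    .getOrCreate()

  val source = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "invoices_avro")
    .load()

  // Avro-encoded bytes become the Kafka message value
  source
    .select(to_avro(struct(col("value").cast("string").as("raw"))).as("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "invoice_avro_output")
    .option("checkpointLocation", "chk-point-dir-avro")
    .start()
    .awaitTermination()
}
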
  • Spark stream from Kafka with Schema and process with Tumbling Window for total buy and sell values
  • samples.txt
# start zookeeper, kafka
make run_kz
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic trades
# start producer  
kafka-console-producer --broker-list localhost:9092 --topic trades
# and run the spark-submit script
spark-submit \
 --class com.yen.dev.TumblingWindowDemo1 \
 target/scala-2.11/spark-app-assembly-0.0.1.jar
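
A minimal sketch of a tumbling-window aggregation for total buy and sell values (illustration only; the field names, 15-minute window size, and topic are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, from_json, sum, window}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}

// sketch only: aggregate buy/sell totals per fixed, non-overlapping window
object TumblingWindowSketch extends App {
  val spark = SparkSession.builder()
    .appName("tumbling-window-sketch")
    .master("local[*]")
    .getOrCreate()

  val schema = new StructType()
    .add("CreatedTime", TimestampType)
    .add("Type", StringType)   // "BUY" or "SELL"
    .add("Amount", DoubleType)

  val trades = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "trades")
    .load()
    .select(from_json(col("value").cast("string"), schema).as("t"))
    .select("t.*")

  // tumbling window: fixed-size buckets keyed on the event time column
  val totals = trades
    .groupBy(window(col("CreatedTime"), "15 minutes"))
    .agg(
      sum(expr("CASE WHEN Type = 'BUY'  THEN Amount ELSE 0 END")).as("TotalBuy"),
      sum(expr("CASE WHEN Type = 'SELL' THEN Amount ELSE 0 END")).as("TotalSell"))

  totals.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", "chk-point-dir-window")
    .start()
    .awaitTermination()
}
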
  • Spark stream event from socket to HDFS file system
# open a socket at port 9999
nc -lk 9999
# and run the spark-submit script
spark-submit \
 --class com.yen.streamToHDFS.streamSocketEventToHDFS \
 target/scala-2.11/spark-app-assembly-0.0.1.jar

# check the data
hdfs dfs -ls streamSocketEventToHDFS
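
A minimal sketch of the socket-to-file pattern this class presumably follows (illustration only; the host, port, and output path are assumptions):

import org.apache.spark.sql.SparkSession

// sketch only: read lines from a socket and append them as text files
object SocketToHdfsSketch extends App {
  val spark = SparkSession.builder()
    .appName("socket-to-hdfs-sketch")
    .master("local[*]")
    .getOrCreate()

  val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .load()

  // the file sink writes one text file per micro-batch under the output path
  // (an HDFS URI such as hdfs://<namenode>/... also works here)
  lines.writeStream
    .format("text")
    .option("path", "streamSocketEventToHDFS")
    .option("checkpointLocation", "chk-point-dir-socket")
    .start()
    .awaitTermination()
}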

Run examples


1. Digest Kafka stream and emit to Kafka

Event Source -----------> Kafka -----------> Spark Stream  -----------> Kafka 
                                topic = event_raw        topic = event_clean
# create topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic event_raw
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic event_clean

# start consumer
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic event_raw

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic event_clean
spark-submit \
 --class com.yen.DigestKafkaEmitKafka \
 target/scala-2.11/spark-app-assembly-0.0.1.jar
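
A minimal sketch of the digest-and-emit pipeline above (illustration only; the "cleaning" rule shown, dropping blank records, is a placeholder assumption):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length, trim}

// sketch only: consume event_raw, keep non-empty records, emit to event_clean
object DigestKafkaEmitKafkaSketch extends App {
  val spark = SparkSession.builder()
    .appName("digest-kafka-sketch")
    .master("local[*]")
    .getOrCreate()

  val raw = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "event_raw")
    .load()
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

  // keep only non-empty events, then emit them to the clean topic
  raw.filter(length(trim(col("value"))) > 0)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "event_clean")
    .option("checkpointLocation", "chk-point-dir-clean")
    .start()
    .awaitTermination()
}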

Ref
