Setting up Kafka on Linux (Ubuntu 22.04) & Windows with IntelliJ IDEA

Marouane-Elgoumiri/kafka_initiation

Set up Kafka with IntelliJ IDEA

Java · Apache Kafka · Apache Maven · IntelliJ IDEA

I. Download the latest version of Kafka from the official Apache Kafka website.

II. Extract the archive, then start ZooKeeper and the Kafka server

Screenshot from 2024-04-29 10-25-40

Open the extracted folder:

   cd kafka_2.12-3.7.0

Linux (Debian / Ubuntu)

Start ZooKeeper:

  bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server (broker) in a second terminal, leaving ZooKeeper running:

  bin/kafka-server-start.sh config/server.properties

Windows 11

Start ZooKeeper

start bin\windows\zookeeper-server-start config\zookeeper.properties

Start Kafka server

   start bin\windows\kafka-server-start config\server.properties
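Before creating topics, it can help to confirm that both processes are actually accepting connections. The following is a minimal sketch, not part of the original project: `PortCheck` and `isListening` are hypothetical names, and it assumes the stock config files are unchanged, so ZooKeeper listens on port 2181 and the broker on port 9092. It only probes the TCP port; it does not prove the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Rough "is it up?" probe: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: default ports from config/zookeeper.properties and config/server.properties.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka     (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints `false`, check the terminal where that process was started for startup errors before moving on.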

Create Topic

   bin\windows\kafka-topics --create --topic quickstart-events --bootstrap-server localhost:9092
   bin\windows\kafka-topics --create --topic quickstart-events2 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

Create Producer

   start bin\windows\kafka-console-producer --topic quickstart-events --bootstrap-server localhost:9092

Create Consumer

   start bin\windows\kafka-console-consumer --topic quickstart-events --bootstrap-server localhost:9092

III. Create a Maven project:

Screenshot from 2024-04-29 10-27-58

Install the Kafka plugin from the Marketplace to create topics directly from the IDE:

Screenshot from 2024-04-29 11-59-51

Create Topics:

Screenshot from 2024-04-29 12-03-43

1. Add the necessary dependencies from the Apache Kafka docs (Clients & Streams) to pom.xml:

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>

2. Create Producer class:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerApp {
    public static void main(String[] args) {
        new ProducerApp().sendMessage();
    }

    public void sendMessage() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // group.id is a consumer-only setting, so it is not set on the producer.

        // try-with-resources closes the producer (flushing pending records) if the loop exits.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int i = 0;
            while (true) {
                // No key is given, so the record key is null; the value is the counter.
                ProducerRecord<String, String> message = new ProducerRecord<>("test-topic", Integer.toString(i));
                producer.send(message);
                System.out.println("message" + i);
                i++;
                Thread.sleep(1000); // send one message per second
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop sending.
            Thread.currentThread().interrupt();
        }
    }
}

3. Create Consumer class:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerApp {
    public static void main(String[] args) {
        consume();
    }

    public static void consume() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit is off and nothing commits manually, so every run re-reads the topic from the beginning.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Serializer settings belong to the producer, so none are set here.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        // Poll once per second on a single background thread and print every record received.
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record ->
                System.out.println("Key " + record.key() + " value " + record.value() + " topic:" + record.topic()));
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }
}
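The producer and the consumer read different sets of configuration keys: a producer needs serializers, while a consumer needs deserializers and a `group.id`. The sketch below makes that split explicit. It is an illustration rather than part of the original project: `KafkaClientConfig`, `producerProps`, and `consumerProps` are hypothetical names, and plain string keys are used so that the class compiles without the Kafka libraries.

```java
import java.util.Properties;

/** Builds client settings with plain string keys; no Kafka classes are needed here. */
final class KafkaClientConfig {

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Producer settings: serializers only, no group.id. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    /** Consumer settings: deserializers plus the consumer group. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print both configurations to compare the keys side by side.
        System.out.println(producerProps("localhost:9092"));
        System.out.println(consumerProps("localhost:9092", "test"));
    }
}
```

The returned `Properties` objects could be passed to `new KafkaProducer<>(...)` and `new KafkaConsumer<>(...)` respectively. Keeping the two builders apart makes it harder to put a serializer on a consumer or a `group.id` on a producer by mistake.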

4. Result:

Screenshot from 2024-04-29 11-28-05
