In this tutorial, we will learn how to use Apache Kafka in Spring Boot applications. We will see how to create a Kafka Producer, Topics, and a Consumer, and how to exchange different data formats (String and JSON) between the Producer and Consumer using a Kafka broker.
Let's begin with what Kafka is.
What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Apache Kafka Core Concepts
We will discuss the following Apache Kafka core concepts:
1. Kafka Cluster
2. Kafka Broker
3. Kafka Producer
4. Kafka Consumer
5. Kafka Topic
6. Kafka Partitions
7. Kafka Offsets
8. Kafka Consumer Group
1. Kafka Cluster
2. Kafka Broker
3. Kafka Producer
4. Kafka Consumer
5. Kafka Topic
- Topic is like a table in a database or folder in a file system.
- Topic is identified by a name.
- You can have any number of topics.
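To make the idea concrete, here is a minimal in-memory sketch. It is not how Kafka is implemented: it assumes a simplified topic with a single partition, and the class name InMemoryTopic and its methods are hypothetical, introduced only for illustration. It shows a topic identified by a name that keeps records in arrival order, with each record addressed by its position (its offset).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a single-partition topic (illustration only).
class InMemoryTopic {
    private final String name;                              // a topic is identified by its name
    private final List<String> records = new ArrayList<>(); // records kept in arrival order

    InMemoryTopic(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    // Appends a record and returns its offset (its position in the log).
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Reads every record from the given offset to the end of the log.
    List<String> readFrom(long offset) {
        return List.copyOf(records.subList((int) offset, records.size()));
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic("javaguides");
        topic.append("first");
        long offset = topic.append("second");
        System.out.println(topic.name() + " offset=" + offset + " -> " + topic.readFrom(offset));
    }
}
```

A real topic is stored on brokers and can be split into several partitions, each with its own sequence of offsets; the sketch collapses all of that into one list.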
7. Kafka Offsets
Spring Boot Kafka Producer and Consumer Example
In this section of the tutorial, we will learn how to create a Kafka Producer and Consumer in a Spring Boot project.
The Spring team provides the Spring for Apache Kafka dependency for developing Kafka-based messaging solutions.
1. Install and Setup Apache Kafka
2. Extract Kafka zip in the local file system
3. Start ZooKeeper
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start Kafka Broker
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
2. Create and Setup Spring Boot Project in IntelliJ
Add dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3. Configure Kafka Producer and Consumer in an application.properties File
In the application.properties file, add the Kafka broker address as well as the Consumer and Producer configuration.
Open the application.properties file and add the following content to it:
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
Let's understand the Kafka properties provided by Spring Boot that are used above:
spring.kafka.consumer.group-id - specifies a unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.auto-offset-reset - specifies what to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw an exception to the consumer if no previous offset is found for the consumer’s group
- anything else: throw an exception to the consumer.
spring.kafka.consumer.key-deserializer - specifies the deserializer class for keys.
spring.kafka.consumer.value-deserializer - specifies the deserializer class for values.
spring.kafka.producer.key-serializer - specifies the serializer class for keys.
spring.kafka.producer.value-serializer - specifies the serializer class for values.
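The auto-offset-reset rules listed above can be sketched as a small decision function. This is a simplified illustration, not the Kafka client's code: the class OffsetResetSketch, the method startingOffset, and its parameters are hypothetical names, and the sketch assumes that "latest" means the position just after the last record currently in the partition.

```java
import java.util.OptionalLong;

// Simplified sketch of how a consumer picks its starting offset (illustration only).
class OffsetResetSketch {

    // committed: the consumer group's committed offset, if it has one
    // earliest:  the first offset still available in the partition
    // latest:    the offset the next written record will receive
    static long startingOffset(OptionalLong committed, long earliest, long latest, String policy) {
        // A committed offset that is still within the available range is used as-is.
        if (committed.isPresent()
                && committed.getAsLong() >= earliest
                && committed.getAsLong() <= latest) {
            return committed.getAsLong();
        }
        // Otherwise the auto-offset-reset policy decides.
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default: // "none" or anything else
                throw new IllegalStateException("No valid offset and auto-offset-reset=" + policy);
        }
    }

    public static void main(String[] args) {
        // A new consumer group (no committed offset) on a partition holding offsets 0..41.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "earliest"));
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "latest"));
        // A group that already committed offset 10 resumes there regardless of the policy.
        System.out.println(startingOffset(OptionalLong.of(10), 0, 42, "latest"));
    }
}
```

With earliest, as configured in this tutorial, a consumer group that starts for the first time reads the messages already stored in the topic; with latest it would only see messages produced after it starts.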
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "javaguides" in this example.
Let's create a KafkaTopicConfig file and add the following content:
package net.javaguides.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic(){
return TopicBuilder.name("javaguides")
.build();
}
}
5. Create Kafka Producer
The producer writes our messages to the topic.
KafkaTemplate
Spring Boot provides auto-configuration for Spring's KafkaTemplate, so you can autowire it directly in your own beans.
For example:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message){
LOGGER.info(String.format("Message sent -> %s", message));
kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
}
}
Create a utils package and within this package create AppConstants with the following content:
package net.javaguides.springbootkafka.utils;
public class AppConstants {
public static final String TOPIC_NAME = "javaguides";
public static final String GROUP_ID = "group_id";
}
The KafkaProducer class uses KafkaTemplate to send messages to the configured topic.
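Note that KafkaTemplate.send is asynchronous: it returns a future rather than waiting for the broker. In Spring for Apache Kafka 3.x that future is a CompletableFuture (2.x versions returned a ListenableFuture), so a callback can report whether the send succeeded. The sketch below shows the callback pattern only. To stay runnable without a broker it uses a hypothetical stand-in method, fakeSend, in place of kafkaTemplate.send, and the class name SendCallbackSketch is likewise an assumption.

```java
import java.util.concurrent.CompletableFuture;

// Self-contained sketch: fakeSend is a hypothetical stand-in for kafkaTemplate.send(...),
// so the callback pattern can be run without a broker.
class SendCallbackSketch {

    // Pretends to publish a message; fails for blank messages to show the error path.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (message == null || message.isBlank()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty message"));
        }
        return CompletableFuture.completedFuture(topic + ":" + message);
    }

    // Attaches a callback that turns the outcome into a log-style line once the send completes.
    static CompletableFuture<String> sendAndReport(String topic, String message) {
        return fakeSend(topic, message).handle((result, error) ->
                error == null
                        ? "Message sent -> " + result
                        : "Send failed -> " + error.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport("javaguides", "hello").join());
        System.out.println(sendAndReport("javaguides", " ").join());
    }
}
```

With a real KafkaTemplate on 3.x, the same idea would apply by attaching whenComplete or handle to the future returned by send; the logging in sendMessage above only records that a send was requested, not that it succeeded.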
6. Create REST API to Send Message
Create a controller package and, within it, create a KafkaProducerController class with the following content:
package net.javaguides.springbootkafka.controller;
import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message){
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to kafka topic");
}
}
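Once the application is running, the endpoint can be called from a browser or any HTTP client. As a rough sketch, the following uses the JDK's built-in java.net.http.HttpClient (Java 11 or later). It assumes the application listens on localhost at Spring Boot's default port 8080; the class name PublishClient and the helper buildUri are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /api/v1/kafka/publish endpoint of KafkaProducerController.
class PublishClient {

    // Builds the request URI, URL-encoding the message so spaces and symbols survive.
    static URI buildUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/api/v1/kafka/publish?message=" + encoded);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the application runs locally on Spring Boot's default port 8080.
        URI uri = buildUri("http://localhost:8080", "hello world");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        try {
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Reached when the application is not running or not reachable.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

The message is passed as the message query parameter, matching the @RequestParam("message") argument of the publish method.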
See Topic Messages via Command Line:
bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092
If your topic has a different name, replace it in the command. In our case, "javaguides" is the topic name.
7. Create Kafka Consumer
The Kafka Consumer is the service responsible for reading messages from the topic and processing them according to your business logic.
To set it up, enter the following:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafKaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
}
Here, the consume(String message) method subscribes to the "javaguides" topic and simply writes every message to the application log. In a real application, you can handle messages the way your business requires.
KafkaListener endpoint:
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
8. Demo
Check out the tutorial below to learn how to exchange JSON messages between Producer and Consumer using Apache Kafka in a Spring Boot application.
Spring Boot Kafka JsonSerializer and JsonDeserializer Example
Conclusion
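In this tutorial, we have learned how to create a Kafka topic, producer, and consumer in a Spring Boot application and how to exchange String messages between them through a Kafka broker.
Video Course on YouTube - Spring Boot Apache Kafka Tutorial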
YouTube Complete playlist at https://youtube.com/playlist?list=PLGRDMO4rOGcNLwoack4ZiTyewUcF6y6BU