1. Install and Set Up Apache Kafka
2. Extract the Kafka zip to the local file system
3. Start ZooKeeper
# Start the ZooKeeper service
# Note: Kafka releases running in KRaft mode do not need ZooKeeper; this step applies to ZooKeeper-based setups.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start Kafka Broker
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
2. Create and Set Up Spring Boot Project in IntelliJ
Add dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3. Configure Kafka Producer and Consumer in an application.properties File
Open the application.properties file and add the Kafka broker address along with the consumer and producer configuration needed to exchange JSON messages:
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
We are using the following producer property to convert a Java object into JSON:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.producer.key-serializer - Serializer class for keys.
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "javaguides" in this example.
Let's create a KafkaTopicConfig file and add the following content:

package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic() {
        return TopicBuilder.name("javaguides")
                .build();
    }
}
5. Create Simple POJO to Serialize / Deserialize
Let's create a User class so that we can send a User object to a Kafka topic and receive it back.
On the producer side, JsonSerializer serializes the User instance to a byte array, and Kafka stores this byte array in a partition of the target topic.
On the consumer side, JsonDeserializer receives the JSON from Kafka as a byte array, converts it to a User object, and returns it to the application.
package net.javaguides.springbootkafka.payload;

public class User {

    private int id;
    private String firstName;
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
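To make the byte-array round trip described above more concrete, here is a small JDK-only sketch. It is an illustration of the wire format, not how Spring Kafka does it: JsonSerializer and JsonDeserializer rely on Jackson, whereas this sketch hand-builds the JSON string. The class name JsonBytesSketch, its methods, and the sample values are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: shows a User-shaped JSON document travelling as UTF-8 bytes.
// Spring Kafka's JsonSerializer uses Jackson; this sketch builds the string by hand.
public class JsonBytesSketch {

    // Producer side: object fields -> JSON text -> byte array.
    // Note: no escaping is done here, so names containing quotes would break the JSON.
    public static byte[] toJsonBytes(int id, String firstName, String lastName) {
        String json = String.format(
                "{\"id\":%d,\"firstName\":\"%s\",\"lastName\":\"%s\"}",
                id, firstName, lastName);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side, first half: byte array -> JSON text.
    // A real deserializer would then map this text onto a User object.
    public static String fromBytes(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toJsonBytes(1, "Ada", "Lovelace"); // sample values, not from the tutorial
        System.out.println(payload.length + " bytes: " + fromBytes(payload));
    }
}
```

The bytes produced here are what Kafka would store in the topic partition; Kafka itself never interprets them as JSON.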
6. Create Kafka Producer to Produce JSON Message
Let's create a Kafka producer that sends JSON messages using Spring Kafka.
KafkaTemplate
Spring Boot provides auto-configuration for Spring's KafkaTemplate, so you can autowire it directly into your own beans.
package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    // Auto-configured by Spring Boot from the spring.kafka.producer.* properties
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User data) {
        LOGGER.info("Message sent -> {}", data);

        // Wrap the payload in a Message and set the destination topic as a header
        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
                .build();

        kafkaTemplate.send(message);
    }
}
The sendMessage() method sends a User object to a Kafka topic.
Note that we use a KafkaTemplate<String, User> because we are sending Java objects to the Kafka topic; they are automatically converted into a JSON byte[].
In this example, we create a Message using the MessageBuilder. It is important to set the topic to which the message is sent, here through the KafkaHeaders.TOPIC header.
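The producer (and later the consumer) imports AppConstants from the utils package, but the class itself is not listed in this tutorial. A minimal sketch of what it might look like follows. This is an assumption, not the original file: the topic name matches the "javaguides" topic created earlier, and the GROUP_ID value is assumed to match spring.kafka.consumer.group-id in application.properties.

```java
// Hypothetical reconstruction: the tutorial imports this class but does not list it.
// In the project, declare it in package net.javaguides.springbootkafka.utils.
public final class AppConstants {

    // Topic name used by KafkaTopicConfig in this tutorial.
    public static final String TOPIC_NAME = "javaguides";

    // Assumed value, chosen to match spring.kafka.consumer.group-id in application.properties.
    public static final String GROUP_ID = "group-id";

    private AppConstants() {
        // Constants holder; not meant to be instantiated.
    }
}
```

Both fields are compile-time constants, which is what allows them to be used as annotation attributes such as the topics and groupId values of @KafkaListener.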
7. Create REST API to Send JSON Object
Let's create a simple POST REST API that accepts User information as a JSON object.
Instead of sending a plain message string, the endpoint receives a complete User object as JSON in the request body, and the Kafka producer then writes that User object to the Kafka topic.
package net.javaguides.springbootkafka.controller;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import net.javaguides.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private final KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user) {
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}
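To exercise this endpoint, any HTTP client will do. As one possible sketch, the following uses the JDK's built-in java.net.http client (Java 11+). The class name PublishRequestSketch, the base URL http://localhost:8080 (Spring Boot's default port, assuming it has not been changed), and the sample user values are assumptions for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /api/v1/kafka/publish endpoint defined above.
public class PublishRequestSketch {

    // Builds a POST request carrying the User JSON in the request body.
    public static HttpRequest buildRequest(String baseUrl, String userJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/kafka/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(userJson))
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Sample payload; field names follow the User class, values are made up.
        String userJson = "{\"id\":1,\"firstName\":\"Ada\",\"lastName\":\"Lovelace\"}";

        // Assumes the Spring Boot application is running locally on port 8080.
        HttpRequest request = buildRequest("http://localhost:8080", userJson);

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running main requires the application and the Kafka broker to be up; the Content-Type header matters because @RequestBody relies on it to pick the JSON message converter.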
8. Create Kafka Consumer to Consume JSON Message
package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    // Invoked for each record on the topic; the JSON value is already deserialized into a User
    @KafkaListener(topics = AppConstants.TOPIC_NAME,
            groupId = AppConstants.GROUP_ID)
    public void consume(User data) {
        LOGGER.info("Message received -> {}", data);
    }
}
9. Demo
Observe the console logs:
Conclusion
Video Course on YouTube - Spring Boot Apache Kafka Tutorial
YouTube Complete playlist at https://youtube.com/playlist?list=PLGRDMO4rOGcNLwoack4ZiTyewUcF6y6BU