Spring Boot + Kafka Integration
Spring Kafka provides high-level abstractions for integrating Apache Kafka with Spring Boot applications, simplifying producer and consumer configuration.
Spring Kafka Dependency
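Add Spring Kafka to your pom.xml. No version element is needed when the project inherits from spring-boot-starter-parent or imports the Spring Boot BOM, which manages the version:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>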
Producer Configuration (Frontend)
application.yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false
KafkaProducerConfig.java
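import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, PurchaseOrderDTO> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // This factory does not read spring.kafka.producer.properties, so repeat the YAML setting.
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}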
PurchaseOrderProducer.java
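@Component
public class PurchaseOrderProducer {
    private final KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate;

    // Constructor injection initializes the final field.
    public PurchaseOrderProducer(KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(PurchaseOrderDTO order) {
        // The order ID is the record key; the default partitioner maps equal keys to one partition.
        kafkaTemplate.send("purchase-orders", order.getOrderId(), order);
    }
}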
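The send call is asynchronous, so sendOrder returns before the broker acknowledges the record, and a failed send goes unnoticed. A minimal sketch of logging the outcome, assuming Spring Kafka 3.x, where send returns a CompletableFuture (2.x returns a ListenableFuture with a different callback API). The class name LoggingPurchaseOrderProducer and the SLF4J logger are illustrative additions, and PurchaseOrderDTO is assumed to be in the same package:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical variant of PurchaseOrderProducer that reports the send outcome.
@Component
public class LoggingPurchaseOrderProducer {
    private static final Logger log = LoggerFactory.getLogger(LoggingPurchaseOrderProducer.class);
    private final KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate;

    public LoggingPurchaseOrderProducer(KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(PurchaseOrderDTO order) {
        // Assumes Spring Kafka 3.x: send(...) returns CompletableFuture<SendResult<K, V>>.
        kafkaTemplate.send("purchase-orders", order.getOrderId(), order)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        // The record was not acknowledged; the caller decides whether to retry.
                        log.error("Failed to send order {}", order.getOrderId(), ex);
                    } else {
                        log.info("Sent order {} to partition {} at offset {}",
                                order.getOrderId(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
                });
    }
}
```

The callback runs on the producer's I/O thread, so it should stay short and avoid blocking work.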
Consumer Configuration (Backend)
application.yaml
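spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-processing-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        # The producer omits type headers, so the target type must be named here.
        # com.example.orders is a placeholder package; use the DTO's real one.
        spring.json.value.default.type: com.example.orders.PurchaseOrderDTO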
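The producer side pairs its YAML with a Java configuration class, and the consumer settings can be expressed the same way. The following is a sketch under assumptions rather than a required setup: the class name KafkaConsumerConfig and the package com.example.orders are hypothetical, and PurchaseOrderDTO is assumed to live in that package. Passing the target class to JsonDeserializer gives it a type to bind to when records carry no type headers, and naming one trusted package is narrower than "*".

```java
package com.example.orders; // hypothetical package, assumed to contain PurchaseOrderDTO

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, PurchaseOrderDTO> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");

        // Bind every record value to PurchaseOrderDTO instead of relying on type headers.
        JsonDeserializer<PurchaseOrderDTO> valueDeserializer =
                new JsonDeserializer<>(PurchaseOrderDTO.class);
        valueDeserializer.setUseTypeHeaders(false);
        valueDeserializer.addTrustedPackages("com.example.orders");

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), valueDeserializer);
    }

    // @KafkaListener looks up a bean named kafkaListenerContainerFactory by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PurchaseOrderDTO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PurchaseOrderDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

Because the deserializer instances are passed to the factory directly, the key-deserializer, value-deserializer, and spring.json.* entries in application.yaml are not consulted for this factory.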
@KafkaListener Annotation
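import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class PurchaseOrderConsumer {
    private static final Logger log = LoggerFactory.getLogger(PurchaseOrderConsumer.class);
    private final OrderCommandService orderCommandService;

    public PurchaseOrderConsumer(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @KafkaListener(topics = "purchase-orders", groupId = "order-processing-group")
    public void consumeOrder(PurchaseOrderDTO order) {
        log.info("Received order: {}", order.getOrderId());
        orderCommandService.createOrder(order);
    }
}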
Error Handling
- Retry Logic: Configure retry attempts for failed message processing
- Dead Letter Topics: Send failed messages to a separate topic for manual review
- Error Handlers: Implement custom error handling with DefaultErrorHandler (SeekToCurrentErrorHandler was deprecated in Spring Kafka 2.8 and removed in 3.0)
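The three items above can be combined in one bean. A sketch, assuming Spring Kafka 2.8 or later (where DefaultErrorHandler is available) and a KafkaTemplate bean in the consumer application that can serialize the failed record. The class name KafkaErrorHandlingConfig, the ".DLT" topic suffix, and the retry count and delay are illustrative choices, not values from this document:

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> template) {
        // Dead letter topic: once retries are exhausted, publish the record
        // to "<source topic>.DLT" using the same partition number.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                template,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

        // Retry logic: 2 retries, 1 second apart (3 delivery attempts in total).
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}
```

Recent Spring Boot versions apply a single CommonErrorHandler bean to the auto-configured listener container factory; with a hand-built factory, pass the handler to setCommonErrorHandler instead. The dead letter topic needs at least as many partitions as the source topic, since this resolver reuses the partition number.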
Key Takeaways
- Spring Kafka simplifies Kafka integration with Spring Boot
- Configuration via YAML and Java config classes
- @KafkaListener provides declarative consumer implementation
- Serializers/deserializers handle message format conversion