Skip to content

Spring Boot + Kafka Integration

Spring Kafka provides high-level abstractions for integrating Apache Kafka with Spring Boot applications, simplifying producer and consumer configuration.

Spring Kafka Dependency

Add Spring Kafka to your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Producer Configuration (Frontend)

application.yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, PurchaseOrderDTO> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

PurchaseOrderProducer.java

@Component
public class PurchaseOrderProducer {
    private final KafkaTemplate<String, PurchaseOrderDTO> kafkaTemplate;

    public void sendOrder(PurchaseOrderDTO order) {
        kafkaTemplate.send("purchase-orders", order.getOrderId(), order);
    }
}

Consumer Configuration (Backend)

application.yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-processing-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

@KafkaListener Annotation

@Component
public class PurchaseOrderConsumer {
    private final OrderCommandService orderCommandService;

    @KafkaListener(topics = "purchase-orders", groupId = "order-processing-group")
    public void consumeOrder(PurchaseOrderDTO order) {
        log.info("Received order: {}", order.getOrderId());
        orderCommandService.createOrder(order);
    }
}

Error Handling

  • Retry Logic: Configure retry attempts for failed message processing
  • Dead Letter Topics: Send failed messages to a separate topic for manual review
  • Error Handlers: Implement custom error handling with SeekToCurrentErrorHandler

Key Takeaways

  • Spring Kafka simplifies Kafka integration with Spring Boot
  • Configuration via YAML and Java config classes
  • @KafkaListener provides declarative consumer implementation
  • Serializers/deserializers handle message format conversion