Codebase for my talk at Spring I/O 2022 in Barcelona about Spring for Apache Kafka

Overview

Spring I/O Barcelona 2022 - Spring Kafka beyond the basics

Slides

Project modules and applications

| Application | Port | Avro | Topic(s) | Description |
|---|---|---|---|---|
| spring-kafka-producer | 8080 | YES | stock-quotes | Simple producer of random stock quotes using Spring Kafka & Apache Avro. |
| spring-kafka-consumer | 8082 | YES | stock-quotes | Simple consumer of stock quotes using Spring Kafka & Apache Avro. |
| stock-quote-kafka-streams-avro | 8083 | YES | stock-quotes | Simple Kafka Streams application using Spring Kafka & Apache Avro. |

| Module | Description |
|---|---|
| avro-model | Holds the Avro schema for the Stock Quote, including the avro-maven-plugin that generates Java code from the Avro schema. This module is used by the producer, the consumer, and the Kafka Streams application. |

Note: Confluent Schema Registry runs on port 8081 in Docker; see docker-compose.yml.

Version

  • Confluent Kafka 7.1.x
  • Confluent Schema Registry 7.1.x
  • Java 11
  • Spring Boot 2.6.x
  • Spring Kafka 2.8.5
  • Apache Avro 1.11

Components running in Docker

Build

./mvnw clean package

Run

without monitoring

docker-compose up -d

with monitoring

docker-compose -f docker-compose.yml -f docker-compose-monitoring.yml up -d

Poison pill scenario: Deserialization exceptions

Start the Kafka console producer from the command line

docker exec -it kafka bash
unset JMX_PORT
kafka-console-producer --bootstrap-server localhost:9092 --topic stock-quotes

The console is waiting for input. Now publish the poison pill:

💊

Result:

Kafka streams application:

  • Streams thread will stop
2022-05-23 14:44:00.014  INFO [streams-application,,] 20635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [streams-application-6095314e-a3d3-4027-8215-dc9270a3b341-StreamThread-1] Shutdown complete
Exception in thread "streams-application-6095314e-a3d3-4027-8215-dc9270a3b341-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:83)
	at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
	at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
	at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
	at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1000)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:914)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
	at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
	at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
	at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
	... 9 more
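
The exception message above points at the default.deserialization.exception.handler setting. As a minimal sketch of one possible fix (not necessarily the configuration used in this codebase), the Kafka Streams properties can name LogAndContinueExceptionHandler, which logs the bad record and keeps the stream thread alive. The class and method names StreamsPoisonPillConfig and withLogAndContinue are hypothetical; plain string keys are used so the snippet compiles without the Kafka libraries on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka Streams properties that skip undeserializable records.
final class StreamsPoisonPillConfig {

    // Property name taken from the StreamsException message in the log above.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Handler shipped with Kafka Streams that logs the failure and continues processing.
    static final String LOG_AND_CONTINUE =
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Returns a copy of the given properties with the handler switched to log-and-continue.
    static Properties withLogAndContinue(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }
}
```

In a Spring Boot application the same key can usually be supplied as spring.kafka.streams.properties.default.deserialization.exception.handler. Note that log-and-continue drops the poison pill; if the record should be kept for inspection, Spring Kafka's RecoveringDeserializationExceptionHandler is worth a look.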

Consumer application:

  • Will end up in an infinite loop of Deserialization exceptions
2022-05-23 14:44:22.864 ERROR [consumer-application,,] 20179 --- [eListener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.5.jar:2.8.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1790) ~[spring-kafka-2.8.5.jar:2.8.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1297) ~[spring-kafka-2.8.5.jar:2.8.5]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition stock-quotes-0 at offset 76. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1166) ~[kafka-clients-3.0.1.jar:na]
	at jdk.internal.reflect.GeneratedMethodAccessor89.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.19.jar:5.3.19]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.19.jar:5.3.19]
	at com.sun.proxy.$Proxy138.poll(Unknown Source) ~[na:na]
	at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89) ~[brave-instrumentation-kafka-clients-5.13.7.jar:na]
	at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83) ~[brave-instrumentation-kafka-clients-5.13.7.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar:2.8.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar:2.8.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar:2.8.5]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar:2.8.5]
	... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250) ~[kafka-schema-serializer-7.1.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322) ~[kafka-avro-serializer-7.1.0.jar:na]
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112) ~[kafka-avro-serializer-7.1.0.jar:na]
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-7.1.0.jar:na]
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-3.0.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.1.jar:na]
	... 23 common frames omitted
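
The IllegalStateException above suggests configuring an ErrorHandlingDeserializer. A minimal sketch of the consumer properties that do this for the value deserializer follows; it assumes the delegate is Confluent's KafkaAvroDeserializer (as the stack trace indicates) and uses plain string keys so it compiles without Spring Kafka on the classpath. ErrorHandlingDeserializerConfig and wrapValueDeserializer are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: wraps the value deserializer in Spring Kafka's ErrorHandlingDeserializer.
final class ErrorHandlingDeserializerConfig {

    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";

    // Returns a copy of the consumer properties in which deserialization failures of the
    // value are caught by the wrapper instead of being thrown from KafkaConsumer.poll().
    static Map<String, Object> wrapValueDeserializer(Map<String, Object> base) {
        Map<String, Object> props = new HashMap<>(base);
        // The consumer instantiates the wrapper ...
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);
        // ... and the wrapper delegates the real work to the Avro deserializer.
        props.put("spring.deserializer.value.delegate.class",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return props;
    }
}
```

The key deserializer can be wrapped the same way with key.deserializer and spring.deserializer.key.delegate.class. With the wrapper in place, the failure reaches the container's error handler as a DeserializationException, so the consumer can move past the poison pill instead of failing on the same offset forever.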

Magic byte?

The first byte of a message written by the Confluent Avro serializer is a magic byte marker that identifies it as an Avro serialized message. Without the magic byte marker the KafkaAvroDeserializer will refuse to read the message! Since we published a plain-text message with the console producer instead of an Avro serialized one, it has no magic byte.
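
To make the magic byte concrete, here is a small plain-Java sketch of the Confluent wire format: one magic byte with value 0, followed by a 4-byte big-endian schema id, followed by the Avro payload. The class ConfluentWireFormat and its methods are hypothetical illustrations, not part of the Confluent libraries, and the length check is a little stricter than a test of the first byte alone.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that inspects the framing the Confluent Avro serializer adds.
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4 bytes of schema id precede the Avro data.
    static final int HEADER_LENGTH = 5;

    // True when the payload is long enough to hold the header and starts with the magic byte.
    static boolean hasMagicByte(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_LENGTH
                && payload[0] == MAGIC_BYTE;
    }

    // Reads the schema id stored in bytes 1..4 (ByteBuffer is big-endian by default).
    static int schemaId(byte[] payload) {
        if (!hasMagicByte(payload)) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }
}
```

The UTF-8 encoding of the pill emoji starts with the byte 0xF0, so hasMagicByte returns false for it, which is the situation the deserializer reports as "Unknown magic byte!".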

After configuring the dead letter topic recoverer, you should see something like this in your log file:

2022-05-23 15:49:51.969 DEBUG [consumer-application,14f637ce05adcec2,5263c9f7713ba810] 50590 --- [ad | producer-1] o.s.k.l.DeadLetterPublishingRecoverer    : Successful dead-letter publication: stock-quotes-1@2270 to stock-quotes.DLT-1@0

Exception handling in consumer

Publish a record to Kafka that triggers an exception in the StockQuoteConsumer (see ShakyStockQuoteService):

curl -v -X POST http://localhost:8080/api/quotes -H 'Content-Type: application/json' -d '{ "symbol": "KABOOM", "exchange": "AMS", "tradeValue": "101.99", "currency": "EUR", "description": "Trigger exception!" }'

Or use the IntelliJ HTTP client with rest-api-requests.http.

More advanced configuration

Error handling based on exception type

If you want to handle exceptions differently depending on their type, take a look at Spring Kafka's CommonDelegatingErrorHandler.

See Spring Kafka documentation
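
The idea of delegating by exception type can be illustrated without Spring at all. The following plain-Java sketch picks a handler by walking the cause chain of the thrown exception and falls back to a default when nothing matches. ExceptionTypeRouter is a hypothetical class written for this README; it is not how CommonDelegatingErrorHandler is implemented and does not use its API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: chooses a handler of type H based on the exception type.
final class ExceptionTypeRouter<H> {

    // LinkedHashMap keeps registration order, so earlier registrations win on ties.
    private final Map<Class<? extends Throwable>, H> delegates = new LinkedHashMap<>();
    private final H fallback;

    ExceptionTypeRouter(H fallback) {
        this.fallback = fallback;
    }

    // Registers a handler for an exception type and its subclasses.
    ExceptionTypeRouter<H> register(Class<? extends Throwable> type, H handler) {
        delegates.put(type, handler);
        return this;
    }

    // Walks from the thrown exception down its causes and returns the first matching handler.
    H resolve(Throwable thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, H> entry : delegates.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    return entry.getValue();
                }
            }
        }
        return fallback;
    }
}
```

For example, a router built with new ExceptionTypeRouter<>("default").register(IllegalArgumentException.class, "validation") resolves a RuntimeException caused by an IllegalArgumentException to "validation" and an unrelated IllegalStateException to "default".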

Publish to a different dead letter topic based on exception type

If you want to send messages to different dead letter topics based on the exception, take a look at custom destination resolvers:

// Picks the dead letter topic from the cause of the failure; the partition is kept as-is.
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> CUSTOM_DESTINATION_RESOLVER = (record, exception) -> {

    // instanceof is false for null, so no separate null check on the cause is needed
    Throwable cause = exception.getCause();

    if (cause instanceof DeserializationException) {
        return new TopicPartition(record.topic() + ".deserialization.failures", record.partition());
    }

    if (cause instanceof MethodArgumentNotValidException) {
        return new TopicPartition(record.topic() + ".validation.failures", record.partition());
    }

    // Anything else ends up in a catch-all topic
    return new TopicPartition(record.topic() + ".other.failures", record.partition());
};

@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<?, ?> bytesKafkaTemplate, KafkaTemplate<?, ?> kafkaTemplate) {
    Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates = new LinkedHashMap<>();
    templates.put(byte[].class, bytesKafkaTemplate);
    templates.put(StockQuote.class, kafkaTemplate);
    return new DeadLetterPublishingRecoverer(templates, CUSTOM_DESTINATION_RESOLVER);
}

Shutdown

docker-compose down -v
docker-compose -f docker-compose.yml -f docker-compose-monitoring.yml down -v

Owner

Tim van Baarsen