
KC4Streams: Kafka Commons for Streams

A collection of common utilities and reusable classes for Kafka Streams applications.

Description

KC4Streams (which stands for Kafka Commons for Streams) is a simple Java library that provides utility classes and standard implementations for most of the Kafka Streams pluggable interfaces.

Built With

KC4Streams is built with the following dependencies:

  • Java 17

  • Kafka Streams (>=3.1.x)

Getting Started

Add the following dependency to your project:

Maven
<dependency>
    <groupId>io.streamthoughts</groupId>
    <artifactId>kc4streams</artifactId>
    <version>${kc4streams.version}</version>
</dependency>
Gradle
implementation group: 'io.streamthoughts', name: 'kc4streams', version: '1.0.0'

Usage

Error handling

KafkaStreams allows you to register handler classes to specify how an exception should be handled.

Here are the three interfaces that you can implement and configure:

  • ProductionExceptionHandler: Specifies how exceptions thrown while attempting to produce a record to Kafka should be handled.

  • DeserializationExceptionHandler: Specifies how exceptions thrown while attempting to deserialize an input record should be handled.

  • StreamsUncaughtExceptionHandler: Specifies how uncaught exceptions thrown while processing a record should be handled.

For example, here is how you can set a custom deserialization exception handler:

clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
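You can also provide your own implementation of these interfaces. As an illustration only (this class is not part of KC4Streams or Kafka Streams), here is a minimal sketch of a custom DeserializationExceptionHandler that logs the failure and skips the record:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAndSkipExceptionHandler implements DeserializationExceptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LogAndSkipExceptionHandler.class);

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Log the coordinates of the corrupted record, then skip it.
        LOG.warn("Failed to deserialize record from topic={}, partition={}, offset={}",
                record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No extra configuration needed for this example.
    }
}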

By default, Kafka Streams only provides a few built-in implementations for those interfaces, which may not be sufficient for production usage.

Kafka Streams and the DLQ (Dead Letter Queue)

In addition to the built-in exception handlers that Kafka Streams provides, KC4Streams implements custom handlers that let you send records in error to dedicated Kafka topics acting as DLQs.

Here is how you can configure them:

// Handling production exceptions
clientProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DLQProductionExceptionHandler.class.getName());

// Handling deserialization exceptions
clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DLQDeserializationExceptionHandler.class.getName());

// Handling stream uncaught exceptions
var client = new KafkaStreams(buildTopology(), new StreamsConfig(clientProps));
client.setUncaughtExceptionHandler(new DLQStreamUncaughtExceptionHandler(clientProps));

All the exception handlers can be configured with some default properties:

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.default.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.default.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.default.return-fail-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.default.return-continue-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.default.headers. | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |
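For example, these defaults could be set directly through the Streams client properties, using the keys from the table above (the values below, including the custom source header, are purely illustrative):

// Continue processing by default, but fail on serialization errors
clientProps.put("exception.handler.dlq.default.reponse", "CONTINUE");
clientProps.put("exception.handler.dlq.default.return-fail-on-errors", "org.apache.kafka.common.errors.SerializationException");
// Add a custom 'source' header to every record sent to the DLQ
clientProps.put("exception.handler.dlq.default.headers.source", "my-streaming-application");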

Setting the default DLQ name

By default, the DLQ handlers send records in error to a dedicated topic named:

  • <[source|sink]-topic-name>.<suffix><application-id> (e.g., input-topic-1.error.my-streaming-application)

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.default.topic-suffix | Specifies the suffix to be used for naming the DLQ (optional). | string | error. |
| exception.handler.dlq.default.topic-name | Specifies the name of the DLQ to be used (optional). | string | - |
| exception.handler.dlq.default.topic-per-application-id | Specifies whether the Kafka Streams application-id should be used for naming the DLQ. | boolean | true |
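As a sketch, the naming strategy could then be adjusted through the same client properties (the suffix value below is an assumption, not a library default):

// Use a custom suffix and do not append the application.id to the DLQ topic name
clientProps.put("exception.handler.dlq.default.topic-suffix", "dlq");
clientProps.put("exception.handler.dlq.default.topic-per-application-id", "false");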

In addition, you can implement a custom DLQTopicNameExtractor class, as follows:

class CustomDLQTopicNameExtractor implements DLQTopicNameExtractor {
    public String extract(final byte[] key, final byte[] value, final FailedRecordContext recordContext) {
        return recordContext.topic() + "-DLQ";
    }
}

Then, you can configure that custom DLQTopicNameExtractor as follows:

clientProps.put(DLQExceptionHandlerConfig.DLQ_DEFAULT_TOPIC_NAME_EXTRACTOR_CONFIG, CustomDLQTopicNameExtractor.class.getName());
Handling Production Exceptions

The DLQProductionExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.production.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.production.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.production.return-fail-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.production.return-continue-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.production.headers. | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |
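For example, a production-scoped configuration might look as follows (the exception class listed is just an illustration):

clientProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DLQProductionExceptionHandler.class.getName());
// Keep processing when a record is rejected because it is too large, fail otherwise
clientProps.put("exception.handler.dlq.production.return-continue-errors", "org.apache.kafka.common.errors.RecordTooLargeException");
clientProps.put("exception.handler.dlq.production.reponse", "FAIL");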

Handling Deserialization Exceptions

The DLQDeserializationExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.deserialization.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.deserialization.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.deserialization.return-fail-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.deserialization.return-continue-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.deserialization.headers. | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Handling Stream Uncaught Exceptions

The DLQStreamUncaughtExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.streams.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.streams.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.streams.return-fail-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.streams.continue.errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.streams.headers. | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Handling Processing Exceptions

All the exception handlers discussed above internally use a singleton instance of the DLQRecordCollector class to send records to dedicated DLQs, along with contextual information about the errors.

The DLQRecordCollector accepts the following config properties for specifying, for example, whether DLQ topics should be automatically created:

Configuration

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| exception.handler.dlq.global.producer. | Specifies the Producer's config properties to override. | - | - |
| exception.handler.dlq.global.admin. | Specifies the AdminClient's config properties to override. | - | - |
| exception.handler.dlq.topics.auto-create-enabled | Specifies whether missing DLQ topics should be automatically created. | string | true |
| exception.handler.dlq.topics.num-partitions | Specifies the number of partitions to be used for DLQ topics. | integer | -1 |
| exception.handler.dlq.topics.replication-factors | Specifies the replication factor to be used for DLQ topics. | short | -1 |

Usage
// Create KafkaStreams client configuration
Map<String, Object> streamsConfigs = new HashMap<>();

// Initialize the DLQRecordCollector singleton.
DLQRecordCollector.getOrCreate(streamsConfigs);

// Create a Kafka Stream Topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> stream = streamsBuilder.stream(INPUT_TOPIC);
stream.mapValues((key, value) -> {
    Long output = null;
    try {
        output = Long.parseLong(value);
    } catch (Exception e) {
        // Sends the corrupted-record to a DLQ
        DLQRecordCollector.get().send(
                INPUT_TOPIC + "-DLQ",
                key,
                value,
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                Failed.withProcessingError((String) streamsConfigs.get(StreamsConfig.APPLICATION_ID_CONFIG), e)
        );
    }
    return output;
});

Recording the failure reason using message headers

Each message sent to a DLQ is enriched with headers containing information about the reason for the message’s rejection.

Here’s the list of headers:

| Header | Description |
| --- | --- |
| __streams.errors.topic | The topic of the record in error. |
| __streams.errors.partition | The partition of the record in error. |
| __streams.errors.offset | The offset of the record in error (empty for production errors). |
| __streams.errors.timestamp | The epoch-timestamp of the error. |
| __streams.errors.stage | The stage of the error (DESERIALIZATION, PRODUCTION, PROCESSING, or STREAMS). |
| __streams.errors.exception.message | The exception message. |
| __streams.errors.exception.class.name | The exception class name. |
| __streams.errors.exception.stacktrace | The exception stacktrace. |
| __streams.errors.application.id | The stream application id. |
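For example, a plain Kafka consumer subscribed to a DLQ topic could read those headers back to inspect why each record was rejected (the consumer setup is assumed here; Header comes from org.apache.kafka.common.header):

// 'consumer' is an already-configured KafkaConsumer<byte[], byte[]> subscribed to the DLQ topic
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<byte[], byte[]> record : records) {
    Header stage = record.headers().lastHeader("__streams.errors.stage");
    Header cause = record.headers().lastHeader("__streams.errors.exception.message");
    System.out.printf("DLQ record: stage=%s, cause=%s%n",
            stage != null ? new String(stage.value(), StandardCharsets.UTF_8) : "n/a",
            cause != null ? new String(cause.value(), StandardCharsets.UTF_8) : "n/a");
}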

SafeDeserializer & SafeSerde

Another solution for dealing with deserialization exceptions is to return a sentinel value (e.g., null, "N/A", -1) when a corrupted record (a.k.a. poison pill) is handled by a Kafka Deserializer.

The SafeDeserializer can be used to wrap an existing Deserializer to catch any Exception that may be thrown when attempting to deserialize a record and return a configured (or default) value.

Creating a SafeDeserializer
SafeDeserializer<GenericRecord> deserializer = new SafeDeserializer<>(
    new GenericAvroSerde().deserializer(), // the delegate deserializer
    (GenericRecord) null                   // the sentinel object to return when an exception is caught
);
Configuring a SafeDeserializer
SafeDeserializer<Double> deserializer = new SafeDeserializer<>(
    Serdes.Double().deserializer(), // the delegating deserializer
    Double.class    		        // the value type
);

Map<String, Object> configs = new HashMap<>();
configs.put(SafeDeserializerConfig.SAFE_DESERIALIZER_DEFAULT_VALUE_CONFIG, 0.0);
deserializer.configure(configs, false);

In addition, you can use the SafeSerdes utility class, which allows wrapping an existing Serde or Deserializer.

Behind the scenes, SafeSerdes uses a SafeDeserializer to wrap the existing Deserializer.

Serde<Double> doubleSerde = SafeSerdes.Double();
// or
SafeSerdes.serdeFrom(Serdes.String(), 0.0);
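For instance, such a safe Serde can then be plugged into a topology like any other Serde (the topic name below is illustrative):

// Deserialization failures on the value yield the fallback value 0.0 instead of failing the stream task
KStream<String, Double> stream = streamsBuilder.stream(
    "prices",
    Consumed.with(Serdes.String(), SafeSerdes.serdeFrom(Serdes.Double(), 0.0))
);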

RocksDB

How to tune internal RocksDB state stores?

Kafka Streams relies on RocksDB, an embedded key-value store, to provide persistent storage. Depending on the throughput of your application, you may want to tune the internal RocksDB instances. Kafka Streams allows you to customize the RocksDB settings for a given store by implementing the interface org.apache.kafka.streams.state.RocksDBConfigSetter.

The custom implementation must then be configured using:

streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
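For reference, a hand-rolled implementation of this interface could look like the following minimal sketch (this is not the KC4Streams implementation):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Example: limit the number and size of memtables for every store.
        options.setMaxWriteBufferNumber(2);
        options.setWriteBufferSize(8 * 1024L * 1024L);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Release any RocksDB objects allocated in setConfig (none in this example).
    }
}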

KC4Streams provides a built-in io.streamthoughts.kc4streams.rocksdb.StreamsRocksDBConfigSetter that allows you to override some default RocksDB options, enable statistics and logging for performance debugging, and share memory usage across RocksDB instances.

Configuration

| Property | Description | Type | Default |
| --- | --- | --- | --- |
| rocksdb.stats.enable | Enable RocksDB statistics. | boolean | - |
| rocksdb.stats.dump.period.sec | Specifies the RocksDB statistics dump period in seconds. | integer | - |
| rocksdb.log.dir | Specifies the RocksDB log directory. | string | - |
| rocksdb.log.level | Specifies the RocksDB log level (see org.rocksdb.InfoLogLevel). | string | - |
| rocksdb.log.max.file.size | Specifies the RocksDB maximum log file size. | integer | - |
| rocksdb.max.write.buffer.number | Specifies the maximum number of memtables built up in memory before they are flushed to SST files. | integer | - |
| rocksdb.write.buffer.size | Specifies the size of a single memtable. | long | - |
| rocksdb.memory.managed | Enable automatic memory management across all RocksDB instances. | boolean | false |
| rocksdb.memory.write.buffer.ratio | Specifies the ratio of total cache memory reserved for the write buffer manager. Only used when rocksdb.memory.managed is set to true. | double | 0.5 |
| rocksdb.memory.high.prio.pool.ratio | Specifies the ratio of cache memory reserved for high-priority blocks (e.g., index, filter, and compression blocks). | double | 0.1 |
| rocksdb.memory.strict.capacity.limit | Create a block cache with a strict capacity limit (i.e., inserts fail when the cache is full). Only used when rocksdb.memory.managed is set to true or rocksdb.block.cache.size is set. | boolean | false |
| rocksdb.block.cache.size | Specifies the total size to be used for caching uncompressed data blocks. | long | - |
| rocksdb.compaction.style | Specifies the compaction style. | string | - |
| rocksdb.compression.type | Specifies the compression type. | string | - |
| rocksdb.files.open | Specifies the maximum number of open files that can be used per RocksDB instance. | long | - |
| rocksdb.max.background.jobs | Specifies the maximum number of concurrent background jobs (both flushes and compactions combined). | integer | - |

Example
var streamsConfig = new HashMap<String, Object>();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, StreamsRocksDBConfigSetter.class);
streamsConfig.put(RocksDBConfig.ROCKSDB_MEMORY_MANAGED_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_STATS_ENABLE_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_LOG_DIR_CONFIG, "/tmp/rocksdb-logs");
Note
Please read the official documentation for more information: RocksDB Tuning Guide

StateListener

KafkaStreams allows you to register a StateRestoreListener for listening to various states of the restoration process of a StateStore.

You can set the LoggingStateRestoreListener implementation for logging the restoration process.
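For example (assuming the listener class is imported from the KC4Streams package):

// Log the start, progress, and completion of every state store restoration
KafkaStreams kafkaStreams = new KafkaStreams(buildTopology(), new StreamsConfig(clientProps));
kafkaStreams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
kafkaStreams.start();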

Contribute to KC4Streams

We’re an active open source software community. We welcome and value contributions from everyone. Any feedback, bug reports and PRs are greatly appreciated!

Talk to us

To talk with our community about development-related topics:

  • Open an issue on GitHub for questions, improvement suggestions or anything related to the use of KC4Streams.

Issue Tracker

We use GitHub to track all code related issues: https://github.com/streamthoughts/kc4streams/issues.

Development

To build this project (using the Maven Wrapper):

./mvnw clean package

Licence

Copyright 2022 StreamThoughts.

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
