Kryptonite - An SMT for Kafka Connect

Kryptonite is a turn-key ready transformation (SMT) for Apache Kafka® Connect to do field-level 🔒 encryption/decryption 🔓 of records. It is an UNOFFICIAL community project.

Overview

Kryptonite is a turn-key ready transformation (SMT) for Apache Kafka® to do field-level encryption/decryption of records, with or without schema, in data integration scenarios based on Kafka Connect. It uses authenticated encryption with associated data (AEAD) and in particular applies AES in GCM mode.

tl;dr

Data Records without Schema

The following fictional data record value without schema - represented in JSON-encoded format - is used to illustrate a simple encrypt/decrypt scenario:

{
  "id": "1234567890",
  "myString": "some foo bla text",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": ["str_1","str_2","...","str_N"],
  "mySubDoc2": {"k1":9,"k2":8,"k3":7}
}

Encryption of selected fields

Let's assume the fields "myString", "myArray1" and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can then be configured as follows:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":\"0bpRAigAvP9fTTFw43goyg==\"}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the secret key that is referenced by its id via the cipher_data_key_identifier parameter. Currently, the secret keys to work with have to be configured using the parameter cipher_data_keys. Obviously, the configured key materials have to be treated with utmost secrecy, since leaking any of the secret keys renders the encryption useless. The recommended way of doing this for now is to indirectly reference secret key materials by externalizing them into a separate properties file. Read a few details about this here.
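
The demo key above is simply a Base64-encoded 128-bit AES key. If you need to create your own key material, a few lines of plain JDK code are enough. The following is merely a convenience sketch (class name made up for illustration) and not part of Kryptonite itself:

import javax.crypto.KeyGenerator;
import java.util.Base64;

public class GenerateKeyMaterial {
    public static void main(String[] args) throws Exception {
        // generate a random 128-bit AES key (same size as the demo key shown above)
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        byte[] rawKey = keyGen.generateKey().getEncoded();
        // the printed Base64 string is what goes into the "material" field of cipher_data_keys
        System.out.println(Base64.getEncoder().encodeToString(rawKey));
    }
}

Any other source of cryptographically secure random key bytes, Base64-encoded, should work just as well.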

Since the configuration parameter field_mode is set to 'OBJECT', complex field types are processed as a whole instead of element-wise.

Below is an exemplary JSON-encoded record after the encryption:

{
  "id": "1234567890",
  "myString": "M007MIScg8F0A/cAddWbayvUPObjxuGFxisu5MUckDhBss6fo3gMWSsR4xOLPEfs4toSDDCxa7E=",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": "UuEKnrv91bLImQvKqXTET7RTP93XeLfNRhzJaXVc6OGA4E+mbvGFs/q6WEFCAFy9wklJE5EPXJ+P85nTBCiVrTkU+TR+kUWB9zNplmOL70sENwwwsWux",
  "mySubDoc2": "fLAnBod5U8eS+LVNEm3vDJ1m32/HM170ASgJLKdPF78qDxcsiWj+zOkvZBsk2g44ZWHiSDy3JrI1btmUQhJc4OTnmqIPB1qAADqKhJztvyfcffOfM+y0ISsNk4+V6k0XHBdaT1tJXqLTsyoQfWmSZsnwpM4WARo5/cQWdAwwsWux"
}

NOTE: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated but unencrypted(!) meta-data. If you want to learn a few more details, look here.

Decryption of selected fields

Provided that the secret key material used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data like so:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "DECRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":\"0bpRAigAvP9fTTFw43goyg==\"}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the secret key that is referenced by its id and that was used to encrypt the original data. Obviously, this can only work if the referenced key id is configured with the correct key material in the cipher_data_keys parameter.

Below is an exemplary JSON-encoded record after the decryption, which is equal to the original record:

{
  "id": "1234567890",
  "myString": "some foo bla text",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": ["str_1","str_2","...","str_N"],
  "mySubDoc2": {"k1":9,"k2":8,"k3":7}
}
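
Because AES in GCM mode is an authenticated cipher, decrypting with the wrong key material does not silently yield garbage but fails outright. The following minimal sketch uses plain javax.crypto (it is not Kryptonite's internal code) to illustrate this behavior:

import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class WrongKeyDemo {
    public static void main(String[] args) throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey rightKey = keyGen.generateKey();
        SecretKey wrongKey = keyGen.generateKey();

        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);

        Cipher enc = Cipher.getInstance("AES/GCM/NoPadding");
        enc.init(Cipher.ENCRYPT_MODE, rightKey, new GCMParameterSpec(128, iv));
        byte[] ciphertext = enc.doFinal("some foo bla text".getBytes(StandardCharsets.UTF_8));

        Cipher dec = Cipher.getInstance("AES/GCM/NoPadding");
        dec.init(Cipher.DECRYPT_MODE, wrongKey, new GCMParameterSpec(128, iv));
        try {
            dec.doFinal(ciphertext);
        } catch (AEADBadTagException e) {
            // GCM's authentication tag check fails, so no corrupted plaintext is ever returned
            System.out.println("decryption rejected: " + e);
        }
    }
}

This is the authentication part of AEAD at work: tag verification rejects any ciphertext that was not produced with the expected key.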

Data Records with Schema

The following example is based on an Avro value record and used to illustrate a simple encrypt/decrypt scenario for data records with schema. The schema could be defined as:

{
    "type": "record", "fields": [
        { "name": "id", "type": "string" },
        { "name": "myString", "type": "string" },
        { "name": "myInt", "type": "int" },
        { "name": "myBoolean", "type": "boolean" },
        { "name": "mySubDoc1", "type": "record",
            "fields": [
                { "name": "myString", "type": "string" }
            ]
        },
        { "name": "myArray1", "type": { "type": "array", "items": "string"}},
        { "name": "mySubDoc2", "type": { "type": "map", "values": "int"}}
    ]
}

The data of one such fictional record - represented by its Struct.toString() output - might look like this:

Struct{
  id=1234567890,
  myString=some foo bla text,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=[str_1, str_2, ..., str_N],
  mySubDoc2={k1=9, k2=8, k3=7}
}

Encryption of selected fields

Let's assume the fields "myString", "myArray1" and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can then be configured as follows:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":\"0bpRAigAvP9fTTFw43goyg==\"}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the secret key that is referenced by its id via the cipher_data_key_identifier parameter. Currently, all available secret keys have to be directly configured using the parameter cipher_data_keys. Obviously, the configured key materials have to be treated with utmost secrecy, since leaking any of the secret keys renders the encryption useless. The recommended way of doing this for now is to indirectly reference secret key materials by externalizing them into a separate properties file. Read a few details about this here.

Since the configuration parameter field_mode is set to 'OBJECT', complex field types are processed as a whole instead of element-wise.

Below is an exemplary Struct.toString() output of the record after the encryption:

Struct{
  id=1234567890,
  myString=MwpKn9k5V4prVVGvAZdm6iOp8GnVUR7zyT+Ljb+bhcrFaGEx9xSNOpbZaJZ4YeBsJAj7DDCxa7E=,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=Ujlij/mbI48akEIZ08q363zOfV+OMJ+ZFewZEMBiaCnk7NuZZH+mfw6HGobtRzvxeavRhTL3lKI1jYPz0CYl7PqS7DJOJtJ1ccKDa5FLAgP0BQwwsWux,
  mySubDoc2=fJxvxo1LX1ceg2/Ba4+vq2NlgyJNiWGZhjWh6rkHQzuG+C7I8lNW8ECLxqJkNhuYuMMlZjK51gAZfID4HEWcMPz026HexzurptZdgkM1fqJMTMIryDKVlAicXc8phZ7gELZCepQWE0XKmQg0UBXr924V46x9I9QwaWUAdgwwsWux
}

NOTE 1: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated (but unencrypted!) meta-data about the field in question. If you want to learn a few more details, look here.

NOTE 2: To support this, the original schema of the data record is automatically redacted such that any encrypted fields can be stored as strings, even though the original data types of the fields in question were different.
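
For illustration only, this kind of schema redaction can be sketched with Kafka Connect's SchemaBuilder API. The snippet below is a simplified stand-in for what the SMT does internally, not its actual implementation:

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaRedactionSketch {
    public static void main(String[] args) {
        Schema original = SchemaBuilder.struct()
                .field("id", Schema.STRING_SCHEMA)
                .field("myInt", Schema.INT32_SCHEMA)
                .field("myArray1", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
                .build();

        // fields listed in field_config end up with a string schema after encryption,
        // all other fields keep their original schema
        SchemaBuilder redacted = SchemaBuilder.struct();
        for (Field f : original.fields()) {
            boolean toEncrypt = f.name().equals("myArray1");
            redacted.field(f.name(), toEncrypt ? Schema.OPTIONAL_STRING_SCHEMA : f.schema());
        }
        Schema target = redacted.build();
        System.out.println(target.fields());
    }
}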

Decryption of selected fields

Provided that the secret key material used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data like so:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "DECRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":\"0bpRAigAvP9fTTFw43goyg==\"}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\",\"schema\": {\"type\": \"STRING\"}},{\"name\":\"myArray1\",\"schema\": {\"type\": \"ARRAY\",\"valueSchema\": {\"type\": \"STRING\"}}},{\"name\":\"mySubDoc2\",\"schema\": { \"type\": \"MAP\", \"keySchema\": { \"type\": \"STRING\" }, \"valueSchema\": { \"type\": \"INT32\"}}}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

Take notice of the extended field_config parameter settings. For decryption of schema-aware data, the SMT configuration expects the original schema information to be explicitly specified for each field that should be decrypted. This allows the SMT to redact the encrypted record's schema into a compatible decrypted record's schema upfront, so that the resulting plaintext field values can be stored in accordance with their original data types.

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the secret key that is referenced by its id and that was used to encrypt the original data. Obviously, this can only work if the referenced key id is configured with the correct key material in the cipher_data_keys parameter.

Below is the decrypted data - represented by its Struct.toString() output - which is equal to the original record:

Struct{
  id=1234567890,
  myString=some foo bla text,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=[str_1, str_2, ..., str_N],
  mySubDoc2={k1=9, k2=8, k3=7}
}

Configuration Parameters

Name: cipher_data_key_identifier
Description: secret key identifier to be used as the default data encryption key for all fields which don't refer to a field-specific secret key identifier
Type: string | Valid values: non-empty string | Importance: high

Name: cipher_data_keys
Description: JSON array with data key objects specifying the key identifiers and Base64-encoded key bytes used for encryption / decryption
Type: password | Valid values: JSON array holding at least one valid data key config object, e.g. [{"identifier":"my-key-id-1234-abcd","material":"dmVyeS1zZWNyZXQta2V5JA=="}] | Importance: high

Name: cipher_mode
Description: defines whether the data should get encrypted or decrypted
Type: string | Valid values: ENCRYPT or DECRYPT | Importance: high

Name: field_config
Description: JSON array with field config objects specifying which fields, together with their settings, should get either encrypted or decrypted (nested field names are expected to be separated by '.' per default, or by a custom 'path_delimiter' config)
Type: string | Valid values: JSON array holding at least one valid field config object, e.g. [{"name": "my-field-abc"},{"name": "my-nested.field-xyz"}] | Importance: high

Name: field_mode
Description: defines how to process complex field types (maps, lists, structs), either as full objects or element-wise
Type: string | Default: ELEMENT | Valid values: ELEMENT or OBJECT | Importance: medium

Name: cipher_algorithm
Description: cipher algorithm used for data encryption (currently supports only one AEAD cipher: AES/GCM/NoPadding)
Type: string | Default: AES/GCM/NoPadding | Valid values: AES/GCM/NoPadding | Importance: low

Name: cipher_text_encoding
Description: defines the encoding of the resulting ciphertext bytes (currently only supports 'base64')
Type: string | Default: base64 | Valid values: base64 | Importance: low

Name: path_delimiter
Description: path delimiter used as field name separator when referring to nested fields in the input record
Type: string | Default: . | Valid values: non-empty string | Importance: low

Externalize configuration parameters

The problem with directly specifying configuration parameters which contain sensitive data, such as secret key materials, is that they are exposed via Kafka Connect's REST API. For Connect clusters that are shared among teams, this means the configured secret key materials would leak, which is of course unacceptable. The way to deal with this for now is to indirectly reference such configuration parameters from external property files.

Below is a quick example of what such a configuration looks like:

1. Before you can make use of configuration parameters from external sources, you have to customize your Kafka Connect worker configuration by adding the following two settings:

connect.config.providers=file
connect.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

2. Then you create the external properties file, e.g. classified.properties, which contains the secret key materials. This file needs to be available on all Kafka Connect workers on which you want to run Kryptonite. Let's pretend the file is located at path /secrets/kryptonite/classified.properties on your worker nodes:

cipher_data_keys=[{"identifier":"my-demo-secret-key-123","material":"0bpRAigAvP9fTTFw43goyg=="}]

3. Finally, you simply reference this file and the corresponding property key therein from your SMT configuration like so:
{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "${file:/secrets/kryptonite/classified.properties:cipher_data_keys}",
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

In case you want to learn more about configuration parameter externalization, there is e.g. this nice blog post from the Debezium team showing how to externalize username and password settings using a docker-compose example.

Build, installation / deployment

You can either build this project from sources via Maven or download a pre-built, self-contained package of Kryptonite: kafka-connect-transform-kryptonite-0.1.0.jar.

In order to deploy it, you simply put the jar into a plugin path that is configured to be scanned by your Kafka Connect worker nodes (i.e. a directory listed in the worker's plugin.path setting).

After that, configure Kryptonite as a transformation for any of your source / sink connectors, sit back and relax! Happy 'binge watching' plenty of ciphertexts ;-)

Cipher algorithm specifics

Kryptonite currently provides a single cipher algorithm, namely AES in GCM mode. It offers so-called authenticated encryption with associated data (AEAD). This basically means that besides the ciphertext, an encrypted field additionally contains unencrypted but authenticated meta-data. In order to keep the storage overhead per encrypted field to a minimum, the SMT implementation currently only incorporates a version identifier for Kryptonite itself (k1) together with a short identifier representing the algorithm (01 for AES/GCM/NoPadding) which was used to encrypt the field in question. Future versions may support additional algorithms or might benefit from further meta-data, which is why the meta-data handling should be expected to change.
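
To make the AEAD idea more concrete, here is a minimal sketch based on plain javax.crypto. It only illustrates the general mechanism - it is neither Kryptonite's actual code nor its wire format, and the associated data string below is made up:

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Base64;

public class AeadSketch {
    public static void main(String[] args) throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey key = keyGen.generateKey();

        // associated data is authenticated but NOT encrypted - conceptually comparable to
        // Kryptonite's version/algorithm identifiers travelling alongside the ciphertext
        byte[] associatedData = "k1/01".getBytes(StandardCharsets.UTF_8);
        byte[] plaintext = "some foo bla text".getBytes(StandardCharsets.UTF_8);

        // a fresh random IV for every encryption is the reason why one and the same
        // plaintext never produces the same ciphertext twice
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);

        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        cipher.updateAAD(associatedData);
        byte[] ciphertext = cipher.doFinal(plaintext);

        System.out.println(Base64.getEncoder().encodeToString(ciphertext));
    }
}

Tampering with either the ciphertext or the associated data causes the tag verification during decryption to fail.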

By design, every application of Kryptonite on a specific record field results in different ciphertexts for one and the same plaintext. In general this is not only desirable but very important to make attacks harder. However, in the context of Kafka Connect records this has an unfavorable consequence for source connectors: applying the SMT to a source record's key would result in a 'partition mix-up', because records with the same original plaintext key would end up in different topic partitions. In other words, do NOT(!) use Kryptonite on source record keys at the moment. There are plans to do away with this restriction and extend Kryptonite with a deterministic mode. This could then safely support the encryption of record keys while at the same time keeping topic partitioning and record ordering intact.

Donate

If you like this project and want to support its further development and maintenance, we are happy about your PayPal donation.

License Information

This project is licensed under the Apache License, Version 2.0.

Copyright (c) 2021. Hans-Peter Grahsl ([email protected])

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Releases
  • v0.3.0 (Dec 8, 2022)

    1) maintenance:

    • updates for all major dependencies across modules
    • renaming of selected packages
    • add/adapt logging
    • commonly used classes are pre-registered for kryo serialization

    2) features:

    • preliminary ksqlDB UDF support 🚀

    3) documentation:

    • adapt readme structure due to splitting the documentation into multiple files residing in the respective modules
    • write basic documentation for stream processing support based on ksqlDB UDFs
    • typos and other little corrections
    • update download links to new versions of the pre-built jars
  • v0.2.0 (Sep 19, 2022)

    1) maintenance:

    • restructure / rename mvn module and folders and bump version
    • various dependency updates
    • further code repackaging and class renaming

    2) bug fixes:

    • fix wrong config type for kms_config (string -> password)

    3) features:

    • change key source config and handling for remote / cloud KMS
    • add prefetch support to KeyMaterialResolver and AzureKeyVault
    • tink keyset is used for implicit key rotation support
    • no need for key id in cipherfield smt decrypt config (part of payload meta data)
    • azure key vault secrets based on tink keyset json spec
    • major overhaul for google tink integration
    • rewrite config based on tink keysets in JSON format
    • add tink's AES GCM besides the 'native' JCE impl
    • add deterministic mode using tink's AEAD AES SIV primitive
    • improve functional test of CipherField SMT

    4) documentation:

    • typos and other little corrections
    • mostly doc updates and a few minor changes
    • adapt documentation to reflect changes w.r.t. tink integration
  • v0.1.1 (Feb 22, 2022)

    Patch release of Kryptonite

    NOTE: this is the last release before a larger code-base refactoring and major improvements including breaking changes

    1) bugs fixed:

    • null handling of struct type values with element mode for schema-aware records
    • respects optional settings in accordance with original schema during schema redaction
    • adapt element mode processing in schemaless record handler

    2) initial poc support for externalizing keys

    • add / adapt config settings for externalizing secret keys
    • integrate with azure key vault for retrieving keys as secrets from the cloud during SMT configuration

    3) documentation updates

  • v0.1.0 (Mar 1, 2021)

    Initial release of Kryptonite

    • it's a turn-key ready transformation (SMT) for Apache Kafka® Connect
    • allows for configurable field-level 🔒 encryption/decryption 🔓 of connect records
    • supports schemaless and schema-aware connect records