ignite-kafka-streamer

Overview

Description: Demo project for the Kafka Ignite streamer, with Kafka as the source and an Ignite cache as the sink.

Step-1) Run both Zookeeper and Kafka using the docker-compose file provided in the root of the project (go to that directory and run the following command):

ignite-kafka-streamer-demo % docker compose up

Verify that both Zookeeper and Kafka are running by executing the following command:

ignite-kafka-streamer-demo % docker compose ps

NAME COMMAND SERVICE STATUS PORTS

ignite-kafka-streamer-demo-kafka-1 "/opt/bitnami/script…" kafka running 0.0.0.0:9092->9092/tcp

ignite-kafka-streamer-demo-zookeeper-1 "/opt/bitnami/script…" zookeeper running 0.0.0.0:2181->2181/tcp

Step-2) Create the topic and verify that it exists by running the following Docker commands.

Create the topic:

docker exec ignite-kafka-streamer-demo-kafka-1 kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4

You should see the following message: Created topic test-topic.

Verify the topic:

ignite-kafka-streamer-demo % docker exec -it ignite-kafka-streamer-demo-kafka-1 kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092

Topic: test-topic TopicId: jGDuAm3JS5es7l-tO1mwzQ PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824

Topic: test-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

Topic: test-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

Topic: test-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

Topic: test-topic	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
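If you would rather manage the topic from code than from the CLI, the same topic could be created with Kafka's AdminClient. This is only a minimal sketch under that assumption; the class name CreateDemoTopic is hypothetical and not part of this repo.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDemoTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, replication factor 1 -- matching the CLI command above.
            NewTopic topic = new NewTopic("test-topic", 4, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```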

Step-3) Bring up the Ignite server with the config provided under the cluster_config directory:

[23:05:35] Ignite node started OK (id=6d27bba6)

[23:05:35] Topology snapshot [ver=1, locNode=6d27bba6, servers=1, clients=0, state=ACTIVE, CPUs=10, offheap=6.4GB, heap=7.1GB]

[23:05:35] ^-- Baseline [id=0, size=1, online=1, offline=0]
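The XML config under cluster_config is not reproduced in this README. As a rough sketch only, an equivalent server node with a partitioned cache named mydemocache could also be started programmatically; the configuration below is an assumption for illustration, not a copy of the repo's config.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class DemoServerNode {
    public static void main(String[] args) {
        // Partitioned cache matching the name verified from Visor in Step-4.
        CacheConfiguration<String, String> cacheCfg =
            new CacheConfiguration<String, String>("mydemocache")
                .setCacheMode(CacheMode.PARTITIONED);

        IgniteConfiguration cfg = new IgniteConfiguration()
            .setCacheConfiguration(cacheCfg);

        // Starts a server node; the demo itself uses the XML config under cluster_config.
        Ignite ignite = Ignition.start(cfg);
        System.out.println("Server node started: " + ignite.cluster().localNode().id());
    }
}
```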

Step-4) Verify from Visor that the partitioned cache was created:

visor> cache

Time of the snapshot: 2022-01-31 23:22:08

+==============================================================================================================================================================+

| Name(@) | Mode | Nodes | Total entries (Heap / Off-heap) | Primary entries (Heap / Off-heap) | Hits | Misses | Reads | Writes |

+==============================================================================================================================================================+

| mydemocache(@c0) | PARTITIONED | 2 | 0 (0 / 0) | min: 0 (0 / 0) | min: 0 | min: 0 | min: 0 | min: 0 |

| | | | | avg: 0.00 (0.00 / 0.00) | avg: 0.00 | avg: 0.00 | avg: 0.00 | avg: 0.00 |

| | | | | max: 0 (0 / 0) | max: 0 | max: 0 | max: 0 | max: 0 |

+--------------------------------------------------------------------------------------------------------------------------------------------------------------+

Step-5) Make sure the Ignite Kafka streamer application started and joined the cluster as a member.

Run MyKafkaStreamer.java (a rough sketch of such a class is shown below).
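The actual streamer class lives in the repo; the sketch below only illustrates the general shape of such a class using Ignite's KafkaStreamer from the ignite-kafka module. The consumer properties are taken from the ConsumerConfig dump later in this README, while the config file name, thread count, and other details are assumptions, not the repo's code.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.kafka.KafkaStreamer;

public class MyKafkaStreamerSketch {
    public static void main(String[] args) {
        // Joins the cluster started in Step-3 as a second server node.
        // File name is an assumption; use the repo's actual config.
        Ignite ignite = Ignition.start("cluster_config/ignite-config.xml");

        // Consumer settings matching the ConsumerConfig values logged at startup.
        Properties consumerCfg = new Properties();
        consumerCfg.put("bootstrap.servers", "localhost:9092");
        consumerCfg.put("group.id", "gpdev1");
        consumerCfg.put("auto.offset.reset", "earliest");
        consumerCfg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerCfg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Data streamer targeting the cache verified in Step-4.
        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("mydemocache");
        streamer.allowOverwrite(true);

        KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(streamer);
        kafkaStreamer.setTopic(Collections.singletonList("test-topic"));
        kafkaStreamer.setThreads(4); // 4 threads would match the 4 group members seen in Step-6
        kafkaStreamer.setConsumerConfig(consumerCfg);
        kafkaStreamer.setSingleTupleExtractor(new MyStreamExtractor()); // see the Step-8 sketch
        kafkaStreamer.start();

        // kafkaStreamer.stop() and streamer.close() should be called on shutdown.
    }
}
```

Once the application starts, its log shows it joining as the second server in the topology: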

[23:06:02] Topology snapshot [ver=2, locNode=60fed73e, servers=2, clients=0, state=ACTIVE, CPUs=10, offheap=13.0GB, heap=14.0GB]

[23:06:02] ^-- Baseline [id=0, size=2, online=2, offline=0]

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000

The Ignite server (the one started in Step-3) recognized the additional node as well:

[23:05:35] Ignite node started OK (id=6d27bba6)

[23:05:35] Topology snapshot [ver=1, locNode=6d27bba6, servers=1, clients=0, state=ACTIVE, CPUs=10, offheap=6.4GB, heap=7.1GB]

[23:05:35] ^-- Baseline [id=0, size=1, online=1, offline=0]

....

....

[23:15:40] Joining node doesn't have stored group keys [node=5e52b3ae-91f4-40aa-a0b8-361a8d32a5fd]

[23:15:41] Topology snapshot [ver=4, locNode=6d27bba6, servers=2, clients=0, state=ACTIVE, CPUs=20, offheap=13.0GB, heap=14.0GB]

[23:15:41] ^-- Baseline [id=0, size=2, online=2, offline=0]

You should see the following Kafka consumer config in the startup log of MyKafkaStreamer:

.....

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-gpdev1-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = gpdev1 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = GSSAPI sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 45000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0

Step-6) The Ignite streamer app joined as a consumer with group id gpdev1 (logs from the Kafka console):

: Stabilized group gpdev1 generation 2 (__consumer_offsets-23) with 4 members (kafka.coordinator.group.GroupCoordinator)

ignite-kafka-streamer-demo-kafka-1 | [2022-02-01 04:35:55,636] INFO [GroupCoordinator 1]: Assignment received from leader consumer-gpdev1-3-f1d6b278-f016-4e99-a204-5db4c401d522 for group gpdev1 for generation 2. The group has 4 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

ignite-kafka-streamer-demo-kafka-1 | [2022-02-01 04:36:51,342] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group gpdev1 in Stable state. Created a new member id consumer-gpdev1-3-5de0d9d2-0997-4617-b4df-45ad40ba654b and request the

Step-7) Start a Kafka console producer for the topic "test-topic" and publish some messages as follows:

ignite-kafka-streamer-demo % docker exec -it ignite-kafka-streamer-demo-kafka-1 kafka-console-producer.sh --topic test-topic --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

demokey1:demovalue1
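The console producer is all the demo needs, but the same key/value pair could also be published from code. A minimal Java producer sketch under that assumption (the class name DemoProducer is hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key/value pair as typed into the console producer above.
            producer.send(new ProducerRecord<>("test-topic", "demokey1", "demovalue1"));
            producer.flush();
        }
    }
}
```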

Step-8) The Ignite Kafka streamer receives the message, and the MyStreamExtractor class reads the ConsumerRecord and prints the message as follows:

###############ConsumerRecord topic name:test-topic key :demokey1 and value :demovalue1
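MyStreamExtractor itself is part of the repo and is not shown here. A sketch of such an extractor, assuming it implements Ignite's StreamSingleTupleExtractor over Kafka ConsumerRecords (the actual class may differ), might look like this:

```java
import java.util.AbstractMap;
import java.util.Map;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyStreamExtractor implements StreamSingleTupleExtractor<ConsumerRecord, String, String> {
    @Override
    public Map.Entry<String, String> extract(ConsumerRecord msg) {
        // Print the record (producing output like the line above), then map it
        // to a cache key/value pair for the data streamer.
        System.out.println("###############ConsumerRecord topic name:" + msg.topic()
            + " key :" + msg.key() + " and value :" + msg.value());

        return new AbstractMap.SimpleEntry<>(msg.key().toString(), msg.value().toString());
    }
}
```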

Step-9) At this point the cache "mydemocache" should have entries for the key/value pairs published to the Kafka topic, so verify from the Visor tool:

visor> cache

Time of the snapshot: 2022-02-01 01:04:05

+==============================================================================================================================================================+

| Name(@) | Mode | Nodes | Total entries (Heap / Off-heap) | Primary entries (Heap / Off-heap) | Hits | Misses | Reads | Writes |

+==============================================================================================================================================================+

| mydemocache(@c0) | PARTITIONED | 2 | 1 (0 / 1) | min: 0 (0 / 0) | min: 0 | min: 0 | min: 0 | min: 0 |

| | | | | avg: 0.50 (0.00 / 0.50) | avg: 0.00 | avg: 0.00 | avg: 0.00 | avg: 0.00 |

| | | | | max: 1 (0 / 1) | max: 0 | max: 0 | max: 0 | max: 0 |

+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
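Besides Visor, the entry can also be read back from code. A minimal sketch using the Ignite thin client, assuming the server exposes the default thin-client port 10800 (the class name VerifyCacheEntry is hypothetical):

```java
import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class VerifyCacheEntry {
    public static void main(String[] args) {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");

        try (IgniteClient client = Ignition.startClient(cfg)) {
            ClientCache<String, String> cache = client.cache("mydemocache");
            // Should print "demovalue1" after Steps 7 and 8 have run.
            System.out.println(cache.get("demokey1"));
        }
    }
}
```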
