A Dataflow template that reads data from Kafka (with SSL support), transforms it, and writes the resulting records to BigQuery

Overview

Kafka to BigQuery Dataflow Template

This pipeline template reads data from Kafka (with SSL support), transforms the data, and writes the resulting records to BigQuery.

Getting Started

Requirements

  • Java 11
  • Maven
  • The Kafka topic(s) exist
  • Messages are valid JSON (see the producer sketch after this list)
  • The BigQuery output table exists.
  • The Kafka brokers are reachable from the Dataflow worker machines.
  • If SSL is enabled, upload your truststore and keystore to Cloud Storage (an upload sketch follows the Configurations section).
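
To sanity-check the "valid JSON" requirement, a message can be published with the standard Kafka console producer. This is only a sketch: the field names in the payload are hypothetical and must match your BigQuery table schema.

# Publish one sample JSON message to the input topic
# (use --broker-list instead of --bootstrap-server on Kafka < 2.5)
echo '{"id": 1, "name": "alice", "created_at": "2021-01-01T00:00:00Z"}' | \
  kafka-console-producer.sh --bootstrap-server "$KAFKA_BROKER" --topic "$INPUT_TOPIC"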

Configurations

  • Set Environment Variables
export PROJECT="$(gcloud config get-value project)" (Required)
export TEMPLATE_BUCKET_NAME=gs://<bucket-name> (Required)
export STAGING_LOCATION=${TEMPLATE_BUCKET_NAME}/staging (Required)
export TEMP_LOCATION=${TEMPLATE_BUCKET_NAME}/temp (Required)
export TEMPLATE_LOCATION=${TEMPLATE_BUCKET_NAME}/kafka-to-bq (Required)
export INPUT_TOPIC=<topic_name> (Required)
export OUTPUT_TABLE=<table_name> (Optional, default: "kafka-bq.sample")
export KAFKA_BROKER=<broker_host:port> (Optional, default: localhost:9092)
export ENABLE_SSL=<true/false> (Optional, default: false)
export REGION="$(gcloud config get-value compute/region)"; export REGION="${REGION:-us-central1}" (Required)
  • Set Environment Variables for SSL (only when SSL is enabled)
export KEYSTORE_PASS=<keystore_password> (Required)
export TRUSTSTORE_PASS=<truststore_password> (Required)
export KEYSTORE_PATH=<keystore_path> (Optional, default: "/tmp/kafka.keystore")
export TRUSTSTORE_PATH=<truststore_path> (Optional, default: "/tmp/kafka.truststore")
export KEYSTORE_OBJECT_NAME=<object_name> (Required)
export TRUSTSTORE_OBJECT_NAME=<object_name> (Required)
export SSL_BUCKET_NAME=<bucket_name> (Required)
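
With these variables set, the keystore and truststore can be uploaded to Cloud Storage along these lines (the local .jks paths are placeholders for wherever your files live):

# Upload the SSL artifacts so the Dataflow workers can fetch them
gsutil cp /path/to/kafka.keystore.jks gs://${SSL_BUCKET_NAME}/${KEYSTORE_OBJECT_NAME}
gsutil cp /path/to/kafka.truststore.jks gs://${SSL_BUCKET_NAME}/${TRUSTSTORE_OBJECT_NAME}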

Building Template

This template is built as a classic Dataflow template.

The template requires the following parameters:

  • project: Your GCP project ID.
  • inputTopic: Kafka topic to read messages from.
  • outputTable: BigQuery table to write the results to.
  • region: Name of your region.
  • bootstrapServer: Comma-separated list of bootstrap servers.
  • stagingLocation: Cloud Storage location for staging the pipeline files.
  • gcpTempLocation: Cloud Storage location for temporary files.
  • templateLocation: Cloud Storage location for the template file.

The template also accepts the following optional parameter:

  • isEnableSSL: Enable or disable the SSL connection to Kafka.

When SSL is enabled, the template additionally accepts the following optional parameters:

  • keystorePath: Path to your keystore file.

  • truststorePath: Path to your truststore file.

Build the template:

mvn compile exec:java \
     -Dexec.mainClass=com.example.template.KafkaToBigquery \
     -Dexec.args="--runner=DataflowRunner \
                  --project=$PROJECT \
                  --stagingLocation=$STAGING_LOCATION \
                  --gcpTempLocation=$TEMP_LOCATION \
                  --templateLocation=$TEMPLATE_LOCATION \
                  --inputTopic=$INPUT_TOPIC \
                  --outputTable=$OUTPUT_TABLE \
                  --bootstrapServer=$KAFKA_BROKER \
                  --region=$REGION"

Optionally, specify the window size (example: 10 minutes):

mvn compile exec:java \
     -Dexec.mainClass=com.example.template.KafkaToBigquery \
     -Dexec.args="--runner=DataflowRunner \
                  --project=$PROJECT \
                  --stagingLocation=$STAGING_LOCATION \
                  --gcpTempLocation=$TEMP_LOCATION \
                  --templateLocation=$TEMPLATE_LOCATION \
                  --inputTopic=$INPUT_TOPIC \
                  --outputTable=$OUTPUT_TABLE \
                  --bootstrapServer=$KAFKA_BROKER \
                  --windowSize=10 \
                  --region=$REGION"

To enable SSL when building the template, append the following flags inside -Dexec.args:

--isEnableSSL=$ENABLE_SSL \
--keystorePath=$KEYSTORE_PATH \
--truststorePath=$TRUSTSTORE_PATH \
--keystorePassword=$KEYSTORE_PASS \
--truststorePassword=$TRUSTSTORE_PASS \
--keystoreObjName=$KEYSTORE_OBJECT_NAME \
--truststoreObjName=$TRUSTSTORE_OBJECT_NAME \
--bucketName=$SSL_BUCKET_NAME

--keystorePath and --truststorePath are optional, as described above.
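
For reference, a complete build command with SSL enabled looks like this (simply the flags above merged into the base command):

mvn compile exec:java \
     -Dexec.mainClass=com.example.template.KafkaToBigquery \
     -Dexec.args="--runner=DataflowRunner \
                  --project=$PROJECT \
                  --stagingLocation=$STAGING_LOCATION \
                  --gcpTempLocation=$TEMP_LOCATION \
                  --templateLocation=$TEMPLATE_LOCATION \
                  --inputTopic=$INPUT_TOPIC \
                  --outputTable=$OUTPUT_TABLE \
                  --bootstrapServer=$KAFKA_BROKER \
                  --region=$REGION \
                  --isEnableSSL=$ENABLE_SSL \
                  --keystorePath=$KEYSTORE_PATH \
                  --truststorePath=$TRUSTSTORE_PATH \
                  --keystorePassword=$KEYSTORE_PASS \
                  --truststorePassword=$TRUSTSTORE_PASS \
                  --keystoreObjName=$KEYSTORE_OBJECT_NAME \
                  --truststoreObjName=$TRUSTSTORE_OBJECT_NAME \
                  --bucketName=$SSL_BUCKET_NAME"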

Executing Template

The template can be executed using the following gcloud command:

gcloud dataflow jobs run JOB_NAME --gcs-location ${TEMPLATE_LOCATION} --region ${REGION}
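
Template parameters can also be overridden at run time via --parameters, assuming the pipeline exposes them as runtime (ValueProvider) options; the job name and override value below are illustrative:

gcloud dataflow jobs run kafka-to-bq-run-1 \
    --gcs-location ${TEMPLATE_LOCATION} \
    --region ${REGION} \
    --parameters outputTable=<project>:<dataset>.<table>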

Next Version

The next version will include the following:

  • Docker Compose file for local development (Kafka, ZooKeeper, ...)
  • Dead-letter handling
  • More flexibility
  • More configurations
  • Script to generate the keystore & truststore (a manual keytool sketch follows this list)
  • Tests
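
Until that script lands, a keystore and truststore can be generated manually with the JDK's keytool. A minimal sketch, where the alias names, file names, and CA certificate path are all placeholders:

# Create a keystore holding a self-signed client key pair (local testing only)
keytool -genkeypair -alias kafka-client -keyalg RSA -validity 365 \
    -keystore kafka.keystore.jks -storepass "$KEYSTORE_PASS" \
    -dname "CN=kafka-client"

# Create a truststore containing the broker's CA certificate
keytool -importcert -alias kafka-ca -file ca-cert.pem \
    -keystore kafka.truststore.jks -storepass "$TRUSTSTORE_PASS" -noprompt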

Stay tuned!
