FLiP: StreamNative: Cloud-Native: Streaming Analytics Using Apache Flink SQL on Apache Pulsar

Overview

Streaming Analytics Using Flink SQL (architecture diagram)


Running on an NVIDIA Jetson Xavier NX (6-core CPU, GPU, 8 GB RAM)

Xavier

Compile Java

jetson_clocks

mvn clean compile assembly:single

Create Your Topic and Schema

StreamNative Cloud Schema

Run Python and Java


#!/bin/bash
# Continuously capture an image, classify it on the Jetson, and publish the
# latest result line to the Pulsar topic with the Java producer.

while :
do
        DATE=$(date +"%Y-%m-%d_%H%M")

        # Grab a frame from the camera and classify it with GoogLeNet,
        # saving the annotated image with a timestamped filename
        python3 -W ignore /home/nvidia/nvme/minifi-jetson-xavier/demo.py --camera /dev/video0 --network googlenet /home/nvidia/nvme/images/$DATE.jpg  2>/dev/null

        # Send the latest line of the inference log to the 'jetsoniot2' topic
        # on StreamNative Cloud, authenticating with OAuth2
        java -jar IoTProducer-1.0-jar-with-dependencies.jar --topic 'jetsoniot2' --serviceUrl pulsar+ssl://cluster.org.snio.cloud:6651 --audience urn:sn:pulsar:org:cluster --issuerUrl https://auth.streamnative.cloud --privateKey file:///home/nvidia/nvme/pulsar-demo/org-tspann.json --message "`tail -1 /home/nvidia/nvme/logs/demo1.log`"

        sleep 1
done


Create a New Topic

StreamNative Cloud Create New Topic

Create a New Subscription

StreamNative Cloud Create New Subscription

Browse Data

StreamNative Cloud Consumer

Created Schema

{
    "type": "record",
    "name": "IoTMessage",
    "namespace": "io.streamnative.examples.oauth2",
    "fields": [
        {
            "name": "camera",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "cpu",
            "type": "double"
        },
        {
            "name": "cputemp",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "cputempf",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "diskusage",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "filename",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "gputemp",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "gputempf",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "host",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "host_name",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "imageinput",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "ipaddress",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "macaddress",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "memory",
            "type": "double"
        },
        {
            "name": "networktime",
            "type": "double"
        },
        {
            "name": "runtime",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "systemtime",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "te",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "top1",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "top1pct",
            "type": "double"
        },
        {
            "name": "uuid",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}

Create a Flink SQL Table on Pulsar


CREATE TABLE jetsoniot3
(
  `id` STRING, uuid STRING, ir STRING,
  `end` STRING, lux STRING, gputemp STRING,
  cputemp STRING, `te` STRING, systemtime STRING, hum STRING,
  memory STRING, gas STRING, pressure STRING,
  `host` STRING, diskusage STRING, ipaddress STRING, macaddress STRING,
  gputempf STRING, host_name STRING, camera STRING, filename STRING,
  `runtime` STRING, cpu STRING, cputempf STRING, imageinput STRING,
  `networktime` STRING, top1 STRING, top1pct STRING,
  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
) WITH (
  'topic' = 'persistent://public/default/jetsoniot2',
  'value.format' = 'json',
  'scan.startup.mode' = 'earliest'
)
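
In StreamNative Cloud's hosted Flink SQL, the catalog supplies the Pulsar connector and cluster endpoints, so the table above only needs the topic and format. On a self-managed Flink cluster with the pulsar-flink connector you would also name the connector and its URLs in the WITH clause. A minimal sketch, assuming a Pulsar broker at the default local ports (the service-url and admin-url values are placeholders to adjust for your cluster):

CREATE TABLE jetsoniot3
(
  `id` STRING, uuid STRING, ir STRING,
  `end` STRING, lux STRING, gputemp STRING,
  cputemp STRING, `te` STRING, systemtime STRING, hum STRING,
  memory STRING, gas STRING, pressure STRING,
  `host` STRING, diskusage STRING, ipaddress STRING, macaddress STRING,
  gputempf STRING, host_name STRING, camera STRING, filename STRING,
  `runtime` STRING, cpu STRING, cputempf STRING, imageinput STRING,
  `networktime` STRING, top1 STRING, top1pct STRING,
  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/jetsoniot2',
  'value.format' = 'json',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'scan.startup.mode' = 'earliest'
)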

CREATE TABLE topitems (
  uuid  STRING,
  top1 STRING, top1pct STRING, 
  camera STRING,
  systemtime STRING,
  cputempf STRING,
  gputempf STRING,
  insert_time TIMESTAMP(3)
)
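
topitems can then be fed continuously from the raw readings. A minimal sketch, assuming the jetsoniot3 table defined above as the source, its publishTime as the insert_time, and that topitems resolves to a writable table in your catalog:

INSERT INTO topitems
SELECT uuid, top1, top1pct, camera, systemtime, cputempf, gputempf, publishTime
FROM jetsoniot3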

CREATE TABLE jetsoniotresults
(
  uuid STRING,
  camera STRING,
  ipaddress STRING,
  `networktime` STRING,
  top1pct DOUBLE,
  top1 STRING,
  cputemp STRING,
  gputemp STRING,
  gputempf STRING,
  cputempf STRING,
  `runtime` STRING,
  `host` STRING,
  `filename` STRING,
  imageinput STRING,
  host_name STRING,
  macaddress STRING,
  `te` STRING,
  `systemtime` STRING,
  cpu DOUBLE,
  `diskusage` STRING,
  `memory` DOUBLE,
  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
)
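
jetsoniotresults can be populated the same way, casting the string metrics to DOUBLE. A minimal sketch, again assuming jetsoniot3 as the source; it also assumes the sink accepts a value for the publishTime metadata column (if your connector cannot write that metadata, declare the column METADATA VIRTUAL and drop it from the SELECT):

INSERT INTO jetsoniotresults
SELECT uuid, camera, ipaddress, `networktime`,
       CAST(top1pct AS DOUBLE), top1,
       cputemp, gputemp, gputempf, cputempf,
       `runtime`, `host`, `filename`, imageinput,
       host_name, macaddress, `te`, `systemtime`,
       CAST(cpu AS DOUBLE), `diskusage`, CAST(memory AS DOUBLE),
       publishTime
FROM jetsoniot3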

Run Your Flink SQL

StreamNative Cloud Flink SQL

select cputempf, gputempf, diskusage, cpu, systemtime, uuid
from jetsoniot2
where CAST(cputempf AS DOUBLE) > 80


INSERT INTO jetsoniot2 VALUES
  (1, 100, 30.15, CURRENT_TIMESTAMP),
  (2, 200, 40, CURRENT_TIMESTAMP),
  (3, 300, 28000.56, CURRENT_TIMESTAMP),
  (4, 400, 42960.90, CURRENT_TIMESTAMP),
  (5, 500, 50000.1, CURRENT_TIMESTAMP),
  (6, 100, 688888888.7, CURRENT_TIMESTAMP),
  (7, 300, 20.99, CURRENT_TIMESTAMP),
  (8, 100, 6000, CURRENT_TIMESTAMP)
  
  
select camera,
       max(CAST(cputempf AS DOUBLE)) as maxcputempf,
       avg(CAST(cputempf AS DOUBLE)) as avgcputempf,
       min(CAST(cputempf AS DOUBLE)) as mincputempf
from jetsoniot2
group by camera


select camera,
       max(CAST(cputempf AS DOUBLE)) as maxcputempf
from jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */
group by camera

select *
from jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */

select camera,
       min(CAST(cputempf AS DOUBLE)) as mincputempf
from jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */
group by camera
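
Because the source table declares a watermark on publishTime, the same aggregates can also be computed per event-time window. A minimal sketch of a one-minute tumbling window, written against the jetsoniot3 table defined above:

select camera,
       TUMBLE_START(publishTime, INTERVAL '1' MINUTE) as window_start,
       max(CAST(cputempf AS DOUBLE)) as maxcputempf,
       avg(CAST(cputempf AS DOUBLE)) as avgcputempf
from jetsoniot3
group by camera, TUMBLE(publishTime, INTERVAL '1' MINUTE)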


Checks

Run jtop on the Jetson to watch CPU, GPU, memory, and temperature while the demo runs.

jtop

Resources (Ops, DevOps, Management, Administration, SQL, Compute, Deploy)
