Distributed Stream and Batch Processing

Overview

What is Jet

Jet is an open-source, in-memory, distributed batch and stream processing engine. You can use it to process large volumes of real-time events or huge batches of static datasets. To give a sense of scale, a single node of Jet has been proven to aggregate 10 million events per second with latency under 10 milliseconds.

It provides a Java API to build stream and batch processing applications through the use of a dataflow programming model. After you deploy your application to a Jet cluster, Jet will automatically use all the computational resources on the cluster to run your application.

If you add more nodes to the cluster while your application is running, Jet automatically scales up your application to run on the new nodes. If you remove nodes from the cluster, Jet scales the application down seamlessly without losing the current computational state, providing exactly-once processing guarantees.

For example, you can represent the classical word count problem that reads some local files and outputs the frequency of each word to console using the following API:

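import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;

JetInstance jet = Jet.bootstrappedInstance();

// Split each line into lowercase words, drop empty tokens, count per word
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("/path/to/text-files"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .groupingKey(word -> word)
 .aggregate(counting())
 .writeTo(Sinks.logger());

jet.newJob(p).join();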

You can then deploy the application to the cluster:

bin/jet submit word-count.jar
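
To make the result of the word count pipeline concrete, here is a minimal single-JVM sketch of the same steps using only the Java standard library. It is not Jet code and does not distribute any work; the class name `WordCountSketch` and the sample input are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class WordCountSketch {
    // Mirrors the pipeline stages: flatMap -> filter -> groupingKey -> aggregate(counting()).
    static Map<String, Long> countWords(Stream<String> lines) {
        return lines
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical input standing in for the lines of the text files.
        Map<String, Long> counts =
                countWords(Stream.of("The quick fox", "the lazy dog, the end"));
        System.out.println(counts);
    }
}
```

The Jet pipeline computes the same word-to-count mapping, but spreads the work across all members of the cluster.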

Another application which aggregates millions of sensor readings per second with 10-millisecond resolution from Kafka looks like the following:

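import com.hazelcast.jet.kafka.KafkaSources;

import static com.hazelcast.jet.aggregate.AggregateOperations.averagingDouble;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;

Pipeline p = Pipeline.create();

// The Kafka source emits Map.Entry<String, Reading> items, so each stage unwraps the value
p.readFrom(KafkaSources.<String, Reading>kafka(kafkaProperties, "sensors"))
 .withTimestamps(event -> event.getValue().timestamp(), 10) // use event timestamp, allowed lag in ms
 .groupingKey(event -> event.getValue().sensorId())
 .window(sliding(1_000, 10)) // sliding window of 1s by 10ms
 .aggregate(averagingDouble(event -> event.getValue().temperature()))
 .writeTo(Sinks.logger());

jet.newJob(p).join();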
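
A sliding window of 1 second that slides by 10 milliseconds means every reading contributes to 100 overlapping windows. The sketch below is plain Java, not Jet code, and lists the windows a given timestamp falls into. It assumes windows are half-open intervals `[end - size, end)` whose ends are aligned to multiples of the slide step; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Returns the end timestamps of all windows [end - windowSize, end)
    // that contain the given event timestamp.
    static List<Long> windowEndsFor(long timestamp, long windowSize, long slideBy) {
        List<Long> ends = new ArrayList<>();
        // The first window end strictly after the timestamp, aligned to the slide step.
        long firstEnd = Math.floorDiv(timestamp, slideBy) * slideBy + slideBy;
        for (long end = firstEnd; end - windowSize <= timestamp; end += slideBy) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        List<Long> ends = windowEndsFor(1_234, 1_000, 10);
        System.out.println(ends.size() + " windows, first ends at "
                + ends.get(0) + ", last ends at " + ends.get(ends.size() - 1));
    }
}
```

For the timestamp 1234 this yields windows ending at 1240 through 2230, which is why a small slide step gives fine-grained output at the cost of more aggregation work per event.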

Jet comes with out-of-the-box support for many kinds of data sources and sinks, including:

  • Apache Kafka
  • Local Files (Text, Avro, JSON)
  • Apache Hadoop (Azure Data Lake, S3, GCS)
  • Apache Pulsar
  • Debezium
  • Elasticsearch
  • JDBC
  • JMS
  • InfluxDB
  • Hazelcast
  • Redis
  • MongoDB
  • Twitter

When Should You Use Jet

Jet is a good fit when you need to process large amounts of data in a distributed fashion. You can use it to build a variety of data-processing applications, such as:

  • Low-latency stateful stream processing. For example, detecting trends in 100 Hz sensor data from 100,000 devices and sending corrective feedback within 10 milliseconds.
  • High-throughput, large-state stream processing. For example, tracking GPS locations of millions of users, inferring their velocity vectors.
  • Batch processing of big data volumes. For example, analyzing a day's worth of stock trading data to update the risk exposure of a given portfolio.

Key Features

Predictable Latency Under Load

Jet uses a unique execution model with cooperative multithreading and can achieve extremely low latencies while processing millions of items per second on just a single node.

The engine is able to run anywhere from tens to thousands of jobs concurrently on a fixed number of threads.
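
The idea behind cooperative multithreading can be sketched in a few lines of plain Java. This is not Jet's implementation: the `Tasklet` interface and the round-robin loop below are assumptions chosen to show how many small units of work can share one thread, as long as each does a short, non-blocking slice of work and then yields.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class CooperativeSketch {
    /** Hypothetical unit of work: performs one small, non-blocking step. */
    interface Tasklet {
        /** @return true when the tasklet has finished all its work */
        boolean step();
    }

    // Runs all tasklets on the calling thread, round-robin, until each one is done.
    static void runAll(List<Tasklet> tasklets) {
        Deque<Tasklet> queue = new ArrayDeque<>(tasklets);
        while (!queue.isEmpty()) {
            Tasklet t = queue.poll();
            if (!t.step()) {
                queue.add(t); // not done yet: yield and go to the back of the queue
            }
        }
    }

    // Builds a tasklet that logs its name on every step and finishes after `steps` steps.
    static Tasklet counting(String name, int steps, List<String> log) {
        int[] remaining = {steps};
        return () -> {
            log.add(name);
            return --remaining[0] == 0;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        runAll(Arrays.asList(counting("a", 2, log), counting("b", 3, log)));
        System.out.println(log); // interleaved execution order of the two tasklets
    }
}
```

Because no tasklet blocks, the number of threads can stay fixed no matter how many tasklets are scheduled, which avoids the context-switching cost of one thread per task.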

Fault Tolerance With No Infrastructure

Jet stores computational state in a distributed, replicated in-memory store. It does not require a distributed file system or infrastructure such as ZooKeeper to provide high availability and fault tolerance.

Jet implements a version of the Chandy-Lamport algorithm to provide exactly-once processing in the face of failures. When interfacing with external transactional systems like databases, it can provide end-to-end processing guarantees using two-phase commit.

Advanced Event Processing

Event data can often arrive out of order and Jet has first-class support for dealing with this disorder. Jet implements a technique called distributed watermarks to treat disordered events as if they were arriving in order.
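
As a rough illustration of the watermark idea, the plain-Java sketch below tracks a watermark for a single stream as "highest timestamp seen so far minus the allowed lag" and treats events behind the watermark as late. It is an assumption-laden simplification, not Jet's implementation: the class name is hypothetical, and the distributed part (combining watermarks from many parallel streams) is left out.

```java
class WatermarkSketch {
    private final long allowedLag;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    // Current watermark: no event older than this is expected any more.
    long watermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - allowedLag;
    }

    // Observes an event; returns true if it is on time, false if it is behind the watermark.
    boolean observe(long timestamp) {
        if (timestamp < watermark()) {
            return false; // late event: arrived after the watermark passed it
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return true;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(10);
        long[] timestamps = {100, 95, 89, 120, 105};
        for (long ts : timestamps) {
            System.out.println(ts + " on time: " + wm.observe(ts)
                    + ", watermark: " + wm.watermark());
        }
    }
}
```

With an allowed lag of 10, the event at 95 is accepted even though it arrives after 100, while 89 is rejected because the watermark has already advanced to 90. Once the watermark passes the end of a window, that window's result can be emitted.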

How Do I Get Started

Follow the Get Started guide to start using Jet.

Download

You can download Jet from https://jet-start.sh.

Alternatively, you can use the latest Docker image:

docker run -p 5701:5701 hazelcast/hazelcast-jet

Use the following Maven coordinates to add Jet to your application:

<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>4.2</version>

Tutorials

See the tutorials for step-by-step guides on using Jet.

Reference

Jet supports a variety of transforms and operators.

Community

The Hazelcast Jet team actively answers questions on Stack Overflow and the Hazelcast Community Slack.

You are also encouraged to join the hazelcast-jet mailing list if you are interested in community discussions.

How Can I Contribute

Thanks for your interest in contributing! The easiest way is to just send a pull request. Have a look at the issues marked as good first issue for some guidance.

Building From Source

To build, use:

./mvnw clean package -DskipTests

Use Latest Snapshot Release

You can always use the latest snapshot release if you want to try the features currently under development.

Maven snippet:

<repositories>
    <repository>
        <id>snapshot-repository</id>
        <name>Maven2 Snapshot Repository</name>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.hazelcast.jet</groupId>
        <artifactId>hazelcast-jet</artifactId>
        <version>4.3-SNAPSHOT</version>
    </dependency>
</dependencies>

Trigger Phrases in the Pull Request Conversation

When you create a pull request (PR), it must pass a build-and-test procedure. Maintainers will be notified about your PR, and they can trigger the build using special comments. These are the phrases you may see used in the comments on your PR:

  • verify - run the default PR builder, equivalent to mvn clean install
  • run-nightly-tests - use the settings for the nightly build (mvn clean install -Pnightly). This includes slower tests in the run, which we don't normally run on every PR
  • run-windows - run the tests on a Windows machine (HighFive is not supported here)
  • run-cdc-debezium-tests - run all tests in the extensions/cdc-debezium module
  • run-cdc-mysql-tests - run all tests in the extensions/cdc-mysql module
  • run-cdc-postgres-tests - run all tests in the extensions/cdc-postgres module

Where not indicated, the builds run on a Linux machine with Oracle JDK 8.

License

Source code in this repository is covered by one of two licenses:

  1. Apache License 2.0
  2. Hazelcast Community License

The default license throughout the repository is Apache License 2.0 unless the header specifies another license. Please see the Licensing section for more information.

Credits

We owe (the good parts of) our CLI tool's user experience to picocli.

Copyright

Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.

Visit www.hazelcast.com for more info.

Comments
  • [005] Define behaviour and network tests for MySQL CDC

    Sorting out the behaviour of CDC sources when experiencing connectivity issues with the underlying databases.

    Checklist

    • [x] Tags Set
    • [x] Milestone Set
    cdc 
    opened by jbartok 47
  • Return result to caller

    New Jet feature that pushes a Jet job's results directly to the client that submitted it. The mechanism isn't strictly bound to the submitting client so another client could also subscribe to them.

    core Pipeline API 
    opened by jbartok 37
  • Backport fixes

    Fixes #3120

    Checklist:

    • [x] Labels and Milestone set
    • [ ] Added a line in hazelcast-jet-distribution/src/root/release_notes.txt (for any non-trivial fix/enhancement/feature)
    • [ ] New public APIs have @Nonnull/@Nullable annotations
    • [ ] New public APIs have @since tags in Javadoc
    security 
    opened by TomaszGaweda 28
  • Adding test data sources designed for benchmarking purposes

    Adding this streaming source:

    StreamSource<Long> longStream(long eventsPerSecond, long initialDelayMillis) 
    

    It is a high-throughput source with precisely controlled timing, to be used for benchmarking Jet's throughput and latency.

    Fixes #2244

    Checklist

    • [x] Tags Set
    • [x] Milestone Set
    • [x] New public APIs have @Nonnull/@Nullable annotations
    • [x] New public APIs have @since tags in Javadoc
    enhancement community 
    opened by guenter-hesse 23
  • Improve job lifecycle management

    • Cancel jobs by sending cancellation operations instead of relying on the operation cancellation api.
    • Job.join() returns only when the job is completed on the cluster.
    • Add COMPLETING status. While the job is being completed, its status is reported as COMPLETING.
    • Name jobs
    • Simplify method names in the Job interface
    • Change the job submission phase. Once the job resources are uploaded, the job record is created first. Then, the caller joins the job. This approach eliminates the race between submitting the job and querying it afterwards.
    enhancement core breaking-change 
    opened by metanet 23
  • Add mixins to all commands to configure address and cluster name

    Because the options are already used in other commands, -n/--cluster-name and -a/--addresses will be replaced by -t/--targets. The -t/--targets option can be provided before or after the command; the option given after the command takes precedence.

    Before:
    jet -n jet -a 127.0.0.1:5701 submit job.jar

    After:
    jet submit -t jet@127.0.0.1:5701 job.jar
    jet -t jet@127.0.0.1:5701 submit job.jar

    Checklist

    • [x] Tags Set
    • [x] Milestone Set
    • [x] Any breaking changes are documented

    Fixes #2045

    List of breaking changes:

    • -n/--cluster-name and -a/--addresses are removed from Jet Command Line, and replaced by -t/--targets
    enhancement cli 
    opened by caioguedes 22
  • NonCooperative processors - high idle CPU consumption

    Hello Jet Team, I have observed high CPU consumption when many nonCooperative processors are used in a DAG. The example below takes 40% CPU; with VERTEX_COUNT = 200 it takes 100% CPU on a dev laptop.

    INFO: [127.0.0.1]:5001 [dev] [0.6-SNAPSHOT] processors=4, physical.memory.total=15,9G, physical.memory.free=7,5G, swap.space.total=16,9G, swap.space.free=6,5G, heap.memory.used=137,4M, heap.memory.free=144,1M, heap.memory.total=281,5M, heap.memory.max=3,5G, heap.memory.used/total=48,82%, heap.memory.used/max=3,81%, minor.gc.count=12, minor.gc.time=93ms, major.gc.count=1, major.gc.time=40ms, load.process=10,10%, load.system=76,52%, load.systemAverage=n/a thread.count=436, thread.peakCount=439, cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0, executor.q.query.size=0, executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0, executor.q.operations.size=0, executor.q.priorityOperation.size=0, operations.completed.count=1177, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0, executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0, operations.pending.invocations.percentage=0,00%, operations.pending.invocations.count=273, proxy.count=0, clientEndpoint.count=0, connection.active.count=0, client.connection.count=0, connection.count=0

    See test below:

    import com.hazelcast.config.Config;
    import com.hazelcast.jet.config.JetConfig;
    import com.hazelcast.jet.core.*;
    import com.hazelcast.jet.core.processor.Processors;
    import com.hazelcast.jet.core.processor.SourceProcessors;
    import com.hazelcast.jet.pipeline.JournalInitialPosition;
    import org.junit.Test;
    
    public class NonCooperativePerformanceTest extends JetTestSupport {
        private static final int VERTEX_COUNT = 100;
        
        @Test
        public void test() {
            DAG dag = buildDAG();
            JetConfig jetConfig = new JetConfig();
            Config hazelcastConfig = new Config();
            hazelcastConfig.getMapEventJournalConfig("test*").setEnabled(true);
            jetConfig.setHazelcastConfig(hazelcastConfig);
            createJetMember(jetConfig).newJob(dag).join();
        }
    
        private DAG buildDAG() {
            DAG dag = new DAG();
            Vertex source = dag.newVertex("source", SourceProcessors.streamMapP("testmap", JournalInitialPosition.START_FROM_CURRENT, WatermarkGenerationParams.noWatermarks()));
            for (int i = 0; i < VERTEX_COUNT; i++) {
                // OK: Vertex v = dag.newVertex("vertex" + i, Processors.mapP(o -> o));
                Vertex v = dag.newVertex("ncvertex" + i, Processors.nonCooperativeP(Processors.mapP(o -> o)));
                dag.edge(Edge.from(source).to(v));
                source = v;
            }
            return dag;
        }
    }
    

    I have observed a similar behavior on the deployed application during idle period.

      PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
     5097 root      20   0 40.369g 3.854g  17344 S 869.4  3.5   1834:44 java -Djava.security.egd=file:/dev/./urandom -Dhazelcast.jmx=true -Dspring.profiles.active=...
    
    enhancement core 
    opened by lukasherman 21
  • Implement processed offset feedback in CDC sources

    Fixes #2841

    Checklist:

    • [x] Labels and Milestone set
    • [ ] Added a line in hazelcast-jet-distribution/src/root/release_notes.txt (for any non-trivial fix/enhancement/feature)
    • [x] New public APIs have @Nonnull/@Nullable annotations
    • [x] New public APIs have @since tags in Javadoc
    cdc 
    opened by jbartok 20
  • Inner join distinction

    As discussed on Gitter and mentioned in #1238, new users often don't know that hashJoin is not an inner join (as their intuition suggests). Adding innerHashJoin with pre-filtering will reduce filtering in user apps and make the API clearer for new users.

    Checklist

    • [x] Tags Set
    • [x] Milestone Set
    • [x] New public APIs have @Nonnull/@Nullable annotations
    • [x] New public APIs have @since tags in Javadoc
    • [x] No breaking changes

    Fixes #1238

    enhancement Pipeline API community 
    opened by TomaszGaweda 19
  • Fix MySql related intermittent network test failures (issue #2534)

    Issue is caused by some kind of cleanup problem, affecting the Ryuk resource reapers. This resource manager is not mandatory, tests already do manual container management, so will disable it. Hopefully the reduced complexity will get rid of the clean-up issues experienced.

    Fixes #2534

    test-failure cdc 
    opened by jbartok 18
  • BatchStage.sort()

    Introduces sorting feature for batch use cases of hazelcast jet. Usage: BatchStage.sort() for sorting based on natural ordering of items. BatchStage.sort(ComparatorEx<V> comparator) for sorting based on user-defined comparator.

    Breaking change: this changes the Edge and EdgeDef serialization format, breaking the backwards compatibility of a suspended job if it resumes on a newer Jet version.

    Checklist

    • [x] Tags Set
    • [x] Milestone Set
    • [x] New public APIs have @Nonnull/@Nullable annotations
    • [x] New public APIs have @since tags in Javadoc
    enhancement Pipeline API breaking-change-core 
    opened by MohamedMandouh 17
  • Vulnerabilities in parquet-jackson used by Jet

    Jet uses parquet-jackson version 1.12.3, which shades com.fasterxml.jackson.core:jackson-databind:2.13.2.2, which includes the following vulnerabilities:

    • CVE-2022-42003 - https://nvd.nist.gov/vuln/detail/CVE-2022-42003
    • CVE-2022-42004 - https://nvd.nist.gov/vuln/detail/CVE-2022-42004

    It's the same as https://github.com/hazelcast/hazelcast/issues/22407#issuecomment-1268404278

    security severity:high 
    opened by olukas 1
  • Vulnerabilities in snakeyaml used by Jet

    Jet elasticsearch uses snakeyaml version 1.33, which includes the following vulnerability:

    • CVE-2022-1471 - https://ossindex.sonatype.org/vulnerability/CVE-2022-1471

    The same CVE is in jmx_prometheus_javaagent-0.16.1.jar (shaded: org.yaml:snakeyaml:1.29).

    security severity:high 
    opened by olukas 1
  • fix(sec): upgrade org.apache.logging.log4j:log4j-core to 2.17.1

    What happened?

    There is 1 security vulnerability found in org.apache.logging.log4j:log4j-core 2.13.3

    What did I do?

    Upgrade org.apache.logging.log4j:log4j-core from 2.13.3 to 2.17.1 for vulnerability fix

    What did you expect to happen?

    Ideally, no insecure libs should be used.

    The specification of the pull request

    PR Specification from OSCS

    opened by 645775992 8
  • fix(sec): upgrade org.springframework:spring-context to 5.3.19

    What happened?

    There is 1 security vulnerability found in org.springframework:spring-context 4.3.26.RELEASE

    What did I do?

    Upgrade org.springframework:spring-context from 4.3.26.RELEASE to 5.3.19 for vulnerability fix

    What did you expect to happen?

    Ideally, no insecure libs should be used.

    The specification of the pull request

    PR Specification from OSCS

    opened by chncaption 8
  • fix(sec): upgrade org.apache.hadoop:hadoop-common to 3.3.3

    What happened?

    There is 1 security vulnerability found in org.apache.hadoop:hadoop-common 3.3.0

    What did I do?

    Upgrade org.apache.hadoop:hadoop-common from 3.3.0 to 3.3.3 for vulnerability fix

    What did you expect to happen?

    Ideally, no insecure libs should be used.

    The specification of the pull request

    PR Specification from OSCS

    opened by zhoumengyks 8
  • fix(sec): upgrade com.h2database:h2 to 2.1.210

    What happened?

    There is 1 security vulnerability found in com.h2database:h2 1.4.187

    What did I do?

    Upgrade com.h2database:h2 from 1.4.187 to 2.1.210 for vulnerability fix

    What did you expect to happen?

    Ideally, no insecure libs should be used.

    The specification of the pull request

    PR Specification from OSCS

    opened by Super-Sky 9
Releases(v4.5.4)
Owner
hazelcast
Hazelcast, the leading in-memory computing platform