Table-Computing (TC for short) is a distributed, lightweight, high-performance, low-latency stream processing and data analysis framework. In our experience it achieves millisecond latency and runs 10+ times faster than Flink for complicated use cases.

Overview

Table-Computing

Welcome to the Table-Computing GitHub.

Table-Computing (TC for short) is a distributed, lightweight, high-performance, low-latency stream processing and data analysis framework. In our experience, TC achieves millisecond latency and runs 10+ times faster than Flink for complicated use cases. For the same streaming task, TC reduced our computing resource usage by a factor of 10 or more.

Example

Compute the top 100 commodities by sales volume over the last hour, refreshed every half hour:

MysqlDimensionTable mysqlDimensionTable = new MysqlDimensionTable("jdbc:mysql://localhost:3306/e-commerce",
        "commodity",
        "userName",
        "password",
        Duration.ofHours(1),
        new ColumnTypeBuilder()
        .column("id", Type.INT)
        .column("name", Type.VARCHAR)
        .column("price", Type.INT)
        .build(),
        "id"
        );

Map<String, Type> columnTypeMap = new ColumnTypeBuilder()
        .column("__time__", Type.BIGINT)
        .column("id", Type.BIGINT)
        .column("commodity_id", Type.INT)
        .column("count", Type.INT)
        .build();

KafkaStreamTable kafkaStreamTable = new KafkaStreamTable(bootstrapServers,
        "consumerGroupId",
        topic,
        0,
        columnTypeMap);
kafkaStreamTable.start();

StreamProcessing sp = new StreamProcessing();
String[] hashBy = new String[]{"commodity_id"};
Rehash rehashForSlideWindow = sp.rehash("uniqueNameForSlideWindow", hashBy);
String[] returnedColumns = new String[]{"commodity_id",
        "sales_volume",
        "saleroom",
        "window_start"};
SlideWindow slideWindow = new SlideWindow(Duration.ofHours(1),
        Duration.ofMinutes(30),
        hashBy,
        "__time__",
        new AggTimeWindowFunction() {
            @Override
            public Comparable[] agg(List<Comparable> partitionByColumns, List<Row> rows, long windowStart, long windowEnd) {
                return new Comparable[]{
                        partitionByColumns.get(0),
                        AggregationUtil.sumInt(rows, "count"),
                        AggregationUtil.sumInt(rows, "total_price"),
                        windowStart
                };
            }
        }, returnedColumns);
slideWindow.setWatermark(Duration.ofSeconds(2));

hashBy = new String[]{"window_start"};
Rehash rehashForSessionWindow = sp.rehash("uniqueNameForSessionWindow", hashBy);
SessionWindow sessionWindow = new SessionWindow(Duration.ofSeconds(1),
        hashBy,
        "window_start",
        new TimeWindowFunction() {
            @Override
            public List<Comparable[]> transform(List<Comparable> partitionByColumns, List<Row> rows, long windowStart, long windowEnd) {
                int[] top100 = WindowUtil.topN(rows, "sales_volume", 100);
                List<Comparable[]> ret = new ArrayList<>(100);
                for (int i = 0; i < top100.length; i++) {
                    ret.add(rows.get(top100[i]).getAll());
                }
                return ret;
            }
        }, returnedColumns);
sessionWindow.setWatermark(Duration.ofSeconds(3));

sp.compute(new Compute() {
    @Override
    public void compute(int myThreadIndex) throws InterruptedException {
        Table table = kafkaStreamTable.consume();
        table = table.select(new ScalarFunction() {
            @Override
            public Comparable[] returnOneRow(Row row) {
                TableIndex tableIndex = mysqlDimensionTable.curTable();
                // Use tableIndex.getRow rather than mysqlDimensionTable.curTable().getRow. If you call
                // mysqlDimensionTable.curTable() twice, the second call may return a newly reloaded
                // dimension table that is inconsistent with the one returned by the first call.
                Row commodity = tableIndex.getRow(row.getInteger("commodity_id"));
                return new Comparable[]{
                        commodity.getString("name"),
                        commodity.getInteger("price"),
                        row.getInteger("count") * commodity.getInteger("price")
                };
            }
        }, true, "commodity_name", "commodity_price", "total_price");
        List<Table> tables = rehashForSlideWindow.rehash(table, myThreadIndex);
        table = slideWindow.slide(tables);
        tables = rehashForSessionWindow.rehash(table, myThreadIndex);
        table = sessionWindow.session(tables);
        if (table.size() > 0) {
            table.print();
            // You can finish the streaming task gracefully once a termination condition is satisfied
            Thread.currentThread().interrupt();
        }
    }
});
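
To see which windows a single record contributes to under the 1-hour window with a 30-minute slide used above, the following standalone sketch lists the window start times that cover a given timestamp. It assumes window starts are aligned to multiples of the slide since the epoch. That alignment, and the class and method names, are illustrative assumptions and not Table-Computing's actual implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Table-Computing.
class SlidingWindowSketch {
    // Returns the start times (in ms) of every window [start, start + size) that contains ts,
    // assuming window starts are aligned to multiples of the slide.
    static List<Long> windowStarts(long ts, Duration size, Duration slide) {
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start that is not after ts.
        long lastStart = Math.floorDiv(ts, slideMs) * slideMs;
        // Walk backwards while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = Duration.ofMinutes(100).toMillis(); // 1 h 40 min after the epoch
        // Prints [5400000, 3600000]: the windows starting at 1 h 30 min and 1 h 00 min.
        System.out.println(windowStarts(ts, Duration.ofHours(1), Duration.ofMinutes(30)));
    }
}
```

With a window twice as long as the slide, every record falls into exactly two windows, which is why each sale is counted in two consecutive half-hourly rankings.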

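Deploy your Table-Computing task in distributed mode by starting one process per node, with -Dself set to that node's address and -Dall set to the full node list: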

java -Dself=localhost:8888 -Dall=localhost:8888,localhost:9999 -jar my_task.jar -cp table-computing-1.0.0.jar;my_other_dependencies...

java -Dself=localhost:9999 -Dall=localhost:8888,localhost:9999 -jar my_task.jar -cp table-computing-1.0.0.jar;my_other_dependencies...
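
The -Dself and -Dall flags above are ordinary JVM system properties. As a sanity check before launching, a task could verify that its own address appears in the node list. The sketch below is an assumption about how one might do that in user code; it says nothing about how Table-Computing consumes these properties internally, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Table-Computing.
class ClusterArgsSketch {
    // Splits the comma-separated -Dall value into host:port entries.
    static List<String> parseAll(String all) {
        return Arrays.asList(all.trim().split("\\s*,\\s*"));
    }

    // Returns the position of self within all, or throws if self is not listed.
    static int selfIndex(String self, String all) {
        int index = parseAll(all).indexOf(self.trim());
        if (index < 0) {
            throw new IllegalArgumentException("-Dself=" + self + " is not listed in -Dall=" + all);
        }
        return index;
    }

    public static void main(String[] args) {
        // Defaults mirror the two-node example; override them with -Dself and -Dall.
        String self = System.getProperty("self", "localhost:8888");
        String all = System.getProperty("all", "localhost:8888,localhost:9999");
        System.out.println("node " + selfIndex(self, all) + " of " + parseAll(all).size());
    }
}
```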

Optimize:

  1. Measure the throughput of a single thread by running with a concurrency of 1, then divide the upstream data volume by that throughput to get the number of StreamProcessing threads. Do not set the thread count too high: thread contention wastes resources and may even cause OOM, because there is not enough CPU time left to release unused memory.
  2. Choose the -Xmx parameter carefully. Table data is stored entirely in off-heap memory to improve performance, so an -Xmx that is too large delays memory release, which may cause OOM, while an -Xmx that is too small triggers GC too frequently and reduces throughput.
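
The thread-count rule in item 1 is a simple division rounded up. A minimal sketch follows; the rates in main are made-up numbers and the upper bound maxThreads is an assumption added here to reflect the advice not to oversize the pool. None of these names come from Table-Computing.

```java
// Hypothetical helper for illustration only; not part of Table-Computing.
class ThreadCountSketch {
    // threads = ceil(upstream rate / single-thread throughput), clamped to [1, maxThreads].
    static int threadCount(long upstreamRowsPerSec, long singleThreadRowsPerSec, int maxThreads) {
        if (singleThreadRowsPerSec <= 0 || maxThreads < 1) {
            throw new IllegalArgumentException("throughput and maxThreads must be positive");
        }
        long needed = (upstreamRowsPerSec + singleThreadRowsPerSec - 1) / singleThreadRowsPerSec;
        return (int) Math.max(1, Math.min(needed, maxThreads));
    }

    public static void main(String[] args) {
        // Example numbers only: 250,000 rows/s upstream, 40,000 rows/s measured on one thread.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(threadCount(250_000, 40_000, cores));
    }
}
```

With these example rates the unclamped result is 7 threads; on a machine with fewer cores the cap applies instead.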

Notice:

  1. When data does not arrive continuously, AbstractStreamTable returns an empty table after sleeping for 100 ms (the default) to trigger computation. Otherwise watermark data, window data, and data rehashed or rebalanced to other servers or threads would never be computed.
  2. A thread that reads a dimension table blocks until the dimension table has finished loading.
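
The idea behind item 1 can be illustrated with a plain queue: if nothing arrives within the idle interval, the consumer still returns an empty batch, so downstream steps run and can flush windows whose watermark has passed. This is a standalone sketch using java.util.concurrent, with a timed poll standing in for the sleep. It is not AbstractStreamTable's actual code, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a stream source; not part of Table-Computing.
class IdleConsumeSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final long idleMillis;

    IdleConsumeSketch(long idleMillis) {
        this.idleMillis = idleMillis;
    }

    void offer(String row) {
        queue.add(row);
    }

    // Waits up to idleMillis for the first row, then drains whatever else is queued.
    // Returns an empty batch when nothing arrived, so the caller can still trigger computation.
    List<String> consume() {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        IdleConsumeSketch source = new IdleConsumeSketch(100);
        System.out.println(source.consume().size()); // no data yet: empty batch after about 100 ms
        source.offer("row-1");
        source.offer("row-2");
        System.out.println(source.consume());
    }
}
```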

Copyright and License

Table-Computing is provided under the Apache-2.0 license.

Owner

Alibaba (Alibaba Open Source)