Flink Table Store is a unified streaming and batch store for building dynamic tables on Apache Flink

Overview

Flink Table Store

Flink Table Store is a unified streaming and batch store for building dynamic tables on Apache Flink.

Flink Table Store is developed under the umbrella of Apache Flink.

Building the Project

Run the mvn clean package command.

Then you will find a JAR file that contains your application, plus any libraries that you may have added as dependencies to the application: target/<artifact-id>-<version>.jar.

Contributing

You can learn more about how to contribute on the Apache Flink website. For code contributions, please read carefully the Contributing Code section for an overview of ongoing community work.

License

The code in this repository is licensed under the Apache Software License 2.0.

Comments
  • [FLINK-27626] Introduce AggregationMergeFunction

    We can introduce more powerful merge strategies, such as merges that support pre-aggregation. The demand comes from https://summer-ospp.ac.cn/#/org/prodetail/225120149. This is an early version that only supports applying the aggregate function to all columns. Follow-up work will improve the function as needed and allow different MergeFunctions to be used flexibly on different columns.
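
    A minimal sketch of the idea, assuming a simplified row model of long columns; the class and method names below are illustrative only and do not reflect the project's actual MergeFunction interface:

    import java.util.Arrays;

    // Hypothetical pre-aggregating merge: rows with the same key are folded
    // into a single row by summing every column.
    public class SumAggregateMergeFunction {

        private long[] accumulator;

        /** Resets the accumulator before merging a new key. */
        public void reset(int columnCount) {
            accumulator = new long[columnCount];
        }

        /** Adds one value row; every column is summed into the accumulator. */
        public void add(long[] valueColumns) {
            for (int i = 0; i < valueColumns.length; i++) {
                accumulator[i] += valueColumns[i];
            }
        }

        /** Returns the pre-aggregated row for the current key. */
        public long[] getValue() {
            return Arrays.copyOf(accumulator, accumulator.length);
        }
    }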

    opened by yegetables 12
  • [FLINK-29252] Support create table-store table with 'connector'='table-store'

    Sink to table-store:

    SET 'execution.checkpointing.interval' = '10 s';
    CREATE TEMPORARY TABLE word_table (
        word STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.word.length' = '1'
    );
    CREATE TABLE word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    ) WITH(
      'connector' = 'table-store',
      'catalog-name' = 'test-catalog',
      'default-database' = 'test-db',  -- should this be renamed to 'catalog-database'?
      'catalog-table' = 'test-tb',
      'warehouse'='file:/tmp/table_store'
    );
    INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word; 
    

    Source from table-store:

    SET 'execution.checkpointing.interval' = '10 s';
    CREATE TABLE word_count (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    ) WITH(
      'connector' = 'table-store',
      'catalog-name' = 'test-catalog',
      'default-database' = 'test-db',
      'catalog-table' = 'test-tb',
      'warehouse'='file:/tmp/table_store'
    );
    CREATE TEMPORARY TABLE word_table (
        word STRING
    ) WITH (
        'connector' = 'print'
    );
    INSERT INTO word_table SELECT word FROM word_count;
    
    opened by MOBIN-F 9
  • [FLINK-27502] Add file Suffix to data files

    What is the purpose of the change

    Add a file suffix to data files; closes FLINK-27502.

    Verifying this change

    org.apache.flink.table.store.format.FileFormatSuffixTest

    opened by hililiwei 6
  • [FLINK-26217] Introduce manifest.merge-min-count in commit

    Changes in this PR:

    • Introduce the manifest.merge-min-count option in FileStoreOptions.java
    • Update the merge function in ManifestFileMeta.java to take that option into account (see the sketch below)
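
    A minimal sketch of the intended behavior with hypothetical names (the real logic lives in the manifest merge path): small manifest files are only merged once their number reaches the configured minimum, so not every commit pays the merge cost.

    import java.util.List;

    // Hypothetical illustration of 'manifest.merge-min-count'.
    public class ManifestMergeExample {

        /** Returns true if the accumulated manifest files should be merged now. */
        static boolean shouldMerge(List<String> manifestFiles, int mergeMinCount) {
            return manifestFiles.size() >= mergeMinCount;
        }

        public static void main(String[] args) {
            // With a (made-up) merge-min-count of 30, two small manifest files are
            // simply recorded as-is; merging is deferred until enough accumulate.
            System.out.println(shouldMerge(List.of("manifest-0", "manifest-1"), 30)); // false
        }
    }
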
    opened by shenzhu 6
  • [FLINK-29983] Fix Table Store with Hive3 lacking hive-standalone-metastore dependency

    For Hive3, org.apache.hadoop.hive.metastore.api has been moved to org.apache.hive:hive-standalone-metastore. We should shade this package as well to avoid a ClassNotFoundException.

    opened by LadyForest 5
  • [FLINK-29823] Support to get schema of table snapshot

    Currently we can only query the latest snapshot data from the table store. We need to support reading the data of a specified snapshot, so that we can read historical versions or compare data across versions as needed. This PR aims to support getting the schema of a table at a given snapshot id, as follows (10 is the snapshot id): SHOW CREATE TABLE word_count$snapshot$10

    One can get the columns of table word_count at snapshot 10, and then build a query on that snapshot.
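
    A rough sketch of exercising this from the Flink Table API, assuming a table store catalog named my_catalog has already been registered and word_count lives in its current database; the snapshot suffix is the syntax proposed in this PR:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SnapshotSchemaExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Assumes a catalog 'my_catalog' backed by the table store warehouse was created earlier.
            tEnv.executeSql("USE CATALOG my_catalog");

            // Show the schema of 'word_count' as of snapshot 10; backticks quote the '$' characters.
            tEnv.executeSql("SHOW CREATE TABLE `word_count$snapshot$10`").print();
        }
    }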

    opened by zjureel 5
  • [FLINK-27307] Flink table store support append-only ingestion without primary keys.

    This PR is trying to give the table store the ability to accept append-only ingestion without any defined primary keys.

    The previous table store abstraction is built on top of primary keys, so in theory the whole read & write path needs to be reconsidered or refactored, so that we can abstract a correct API that works for both primary-key storage and immutable logs (without primary keys).

    The current version is a draft PR. (Actually, I was not very familiar with the flink-table-store project before, so I'm implementing this append-only abstraction to understand the API & implementation better.)

    There are TODO issues that I haven't thought through in this PR (I will address them in the next update):

    1. The append-only table's file-level statistics are quite different from the primary-key table's. For example, a primary-key table generates a collection of SstFileMeta when calling writer#prepareCommit(), which then completes the first stage of Flink's two-phase commit. The SstFileMeta includes statistics for both key fields and value fields, while an append-only table has no key fields (its statistics should include per-column max/min, count, etc.). So in theory we need to abstract a common file-level statistics data structure for both kinds of table (a sketch follows this list);

    2. A different manifest design for the two kinds of tables.

    3. What the read API abstraction for the two kinds of tables should be. I don't have a clear proposal for it yet; will try to update this PR for this.
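
    A minimal sketch of what a key-agnostic, per-column statistics holder could look like; the names are hypothetical and simplified to long columns, not the project's actual SstFileMeta/FieldStats classes:

    import java.util.Arrays;

    // Hypothetical per-column statistics for an append-only data file: no key
    // fields, just min/max/null-count per column plus the row count.
    public class ColumnStatsExample {

        long rowCount;
        long[] minValues;   // per-column minimum
        long[] maxValues;   // per-column maximum
        long[] nullCounts;  // per-column null count

        ColumnStatsExample(int columnCount) {
            minValues = new long[columnCount];
            maxValues = new long[columnCount];
            nullCounts = new long[columnCount];
            Arrays.fill(minValues, Long.MAX_VALUE);
            Arrays.fill(maxValues, Long.MIN_VALUE);
        }

        /** Folds one row into the statistics; a null column is counted, not compared. */
        void update(Long[] row) {
            rowCount++;
            for (int i = 0; i < row.length; i++) {
                if (row[i] == null) {
                    nullCounts[i]++;
                } else {
                    minValues[i] = Math.min(minValues[i], row[i]);
                    maxValues[i] = Math.max(maxValues[i], row[i]);
                }
            }
        }
    }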

    opened by openinx 5
  • [FLINK-27843] Schema evolution for data file meta

    Currently, the table store uses the latest schema id to read data file meta. When the schema evolves, this causes errors. For example:

    1. the schema of the underlying data is [1->a, 2->b, 3->c, 4->d] with schema id 0, where 1/2/3/4 are field ids and a/b/c/d are field names
    2. after schema evolution, the schema id is 1 and the new schema is [1->a, 3->c, 5->f, 6->b, 7->g]

    When the table store reads the field stats from data file meta, it should map schema 1 to schema 0 according to their field ids.

    This PR reads and parses the data according to the schema id stored in the meta file, and creates an index mapping between the table schema and the meta schema, so that the table store can read the correct file meta data through its latest schema.
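
    A minimal sketch of the index-mapping idea, treating each schema as just an ordered list of field ids (the real SchemaEvolutionUtil works on full schema objects; names here are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class SchemaEvolutionExample {

        // For each field of the current table schema, find the position of the same
        // field id in the data file's (older) schema, or -1 if it did not exist yet.
        static int[] createIndexMapping(int[] tableFieldIds, int[] dataFieldIds) {
            Map<Integer, Integer> dataIndexById = new HashMap<>();
            for (int i = 0; i < dataFieldIds.length; i++) {
                dataIndexById.put(dataFieldIds[i], i);
            }
            int[] mapping = new int[tableFieldIds.length];
            for (int i = 0; i < tableFieldIds.length; i++) {
                mapping[i] = dataIndexById.getOrDefault(tableFieldIds[i], -1);
            }
            return mapping;
        }

        public static void main(String[] args) {
            int[] dataSchema = {1, 2, 3, 4};      // schema 0: fields a, b, c, d
            int[] tableSchema = {1, 3, 5, 6, 7};  // schema 1: fields a, c, f, b, g
            // Prints [0, 2, -1, -1, -1]: 'a' and 'c' map to old positions 0 and 2,
            // fields with new ids have no stats in this file.
            System.out.println(Arrays.toString(createIndexMapping(tableSchema, dataSchema)));
        }
    }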

    The main codes are as follows:

    1. Added SchemaFieldTypeExtractor to extract key fields for ChangelogValueCountFileStoreTable and ChangelogWithKeyFileStoreTable
    2. Added SchemaEvolutionUtil to create index mapping from table schema to meta file schema
    3. Updated FieldStatsArraySerializer to read field stats with given index mapping

    The main tests include:

    1. Added SchemaEvolutionUtilTest to create index mapping between two schemas.
    2. Added FieldStatsArraySerializerTest to read meta from table schema
    3. Added AppendOnlyTableFileMetaFilterTest, ChangelogValueCountFileMetaFilterTest and ChangelogWithKeyFileMetaFilterTest to filter old field, new field, partition field and primary key in data file meta in table scan.
    opened by zjureel 4
  • [FLINK-29964] Support Spark/Hive with OSS

    Currently, we rely on Flink's plugin mechanism to initialize the FileSystem, which does not apply to other engines like Spark/Hive. This PR resolves the following:

    • Invoke FileSystem#initialize to configure the target filesystem (including jar dependencies and conf parameters); see the sketch after this list
    • For Spark, convert case insensitive conf to case sensitive conf (special handling for oss)
    • For Hive, allow AK/endpoint to be configured via the SET command
    • Tested with Spark 3.3 & Hive 3.1; found all sorts of class conflicts, so decided to shade oss to ease use. Therefore, introduce a submodule flink-table-store-filesystem to shade fs-oss/fs-s3 (the latter in the next PR)
    • Add readme
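
    A rough sketch of the explicit initialization call on the Flink side; the fs.oss.* keys and the endpoint/credential values are placeholders shown here only as an assumption:

    import java.net.URI;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    public class OssFileSystemInitExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Placeholder endpoint and credentials; real values would come from
            // the engine's configuration (Spark conf, Hive SET command, ...).
            conf.setString("fs.oss.endpoint", "<oss-endpoint>");
            conf.setString("fs.oss.accessKeyId", "<access-key-id>");
            conf.setString("fs.oss.accessKeySecret", "<access-key-secret>");

            // Explicitly initialize the filesystem factories with this configuration
            // instead of relying on Flink's plugin mechanism.
            FileSystem.initialize(conf, null);

            FileSystem fs = FileSystem.get(new URI("oss://my-bucket/warehouse"));
            System.out.println(fs.getClass().getName());
        }
    }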

    Regarding the E2E test, I'm unsure whether we can put AK info in directly, since the oss server is hard to mock (maybe GitHub CI can pass some secret values as env variables, and then we can use a credential class instead of AK text). For the next PR (which supports s3), there will be an E2E test using a MinIO docker container to act as an s3 server.

    opened by LadyForest 4
  • [FLINK-29636] Add micro benchmarks module

    In this PR, we add a micro benchmarks module to flink table store. The module contains sets of micro benchmarks designed to run on a single machine to help flink table store developers assess the performance implications of their changes. We add two benchmarks in this PR:

    1. Merge tree reader benchmark: to measure the scan operation of MergeTreeReader, we first write 50*50000 records with MergeTreeWriter and scan all the data with MergeTreeReader, then collect the latency of each scan.
    2. Merge tree writer benchmark: to measure the write operation of MergeTreeWriter, we collect the throughput of MergeTreeWriter.write with compaction (a JMH-style sketch follows).
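
    A minimal JMH-shaped sketch of the writer benchmark, with a sorted in-memory map standing in for MergeTreeWriter (the module's actual benchmark code drives the real writer):

    import java.util.TreeMap;
    import java.util.concurrent.TimeUnit;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Thread)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public class MergeTreeWriterBenchmark {

        private final TreeMap<Long, Long> buffer = new TreeMap<>();
        private long key;

        @Benchmark
        public void write() {
            // Stand-in for MergeTreeWriter.write: insert one keyed record.
            buffer.put(key++ % 50_000, key);
        }
    }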

    We rename flink-table-store-benchmark to flink-table-store-cluster-benchmark and add a new module named flink-table-store-micro-benchmarks.

    opened by zjureel 4
  • [FLINK-27336] Avoid merging when there is only one record

    Currently, even if there is just one record for a key, MergeFunction is still used to merge it. This is not necessary; the record can be output directly.
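
    A minimal sketch of the fast path, assuming a simplified merge function that folds a run of records for one key (names are illustrative, not the actual classes):

    import java.util.List;
    import java.util.function.BinaryOperator;

    public class SingleRecordMergeExample {

        static <T> T mergeRecords(List<T> recordsForKey, BinaryOperator<T> mergeFunction) {
            // Fast path: a single record needs no merging and is output as-is.
            if (recordsForKey.size() == 1) {
                return recordsForKey.get(0);
            }
            // Otherwise fold all records with the merge function as before.
            T merged = recordsForKey.get(0);
            for (int i = 1; i < recordsForKey.size(); i++) {
                merged = mergeFunction.apply(merged, recordsForKey.get(i));
            }
            return merged;
        }

        public static void main(String[] args) {
            System.out.println(mergeRecords(List.of(5), Integer::sum));       // 5, merge skipped
            System.out.println(mergeRecords(List.of(1, 2, 3), Integer::sum)); // 6
        }
    }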

    The brief change log

    • Update the merge in SortBufferMemTable and SortMergeReader to output directly if there is only one record, and use MergeFunction to merge only when there are multiple records.
    opened by SteNicholas 4
  • [FLINK-27958] Compare batch maxKey to reduce comparisons in SortMergeReader

    Currently the SortMergeReader compares and sorts the readers after reading one batch from them to ensure the sequence is correct. The readers are created from the SortedRun list, and their key ranges may be disjoint. We can compare the batch minKey and maxKey of each file in the SortedRun list and divide the readers into multiple regions. When there is only one reader in a region, it can read data directly without comparing and sorting.
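
    A simplified sketch of the region-splitting idea, treating each reader as just a [minKey, maxKey] interval; the class names are illustrative, not the PR's actual classes:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class KeyRangeRegionExample {

        record KeyRange(long minKey, long maxKey) {}

        // Group readers into regions whose key ranges do not overlap; a region
        // with a single reader needs no sort-merge at all.
        static List<List<KeyRange>> splitIntoRegions(List<KeyRange> readers) {
            List<KeyRange> sorted = new ArrayList<>(readers);
            sorted.sort(Comparator.comparingLong(KeyRange::minKey));

            List<List<KeyRange>> regions = new ArrayList<>();
            List<KeyRange> current = new ArrayList<>();
            long currentMax = Long.MIN_VALUE;
            for (KeyRange r : sorted) {
                if (!current.isEmpty() && r.minKey() > currentMax) {
                    regions.add(current);  // previous ranges ended before this one starts
                    current = new ArrayList<>();
                }
                current.add(r);
                currentMax = Math.max(currentMax, r.maxKey());
            }
            if (!current.isEmpty()) {
                regions.add(current);
            }
            return regions;
        }

        public static void main(String[] args) {
            List<KeyRange> readers = List.of(
                    new KeyRange(0, 10), new KeyRange(5, 12),  // overlapping -> one region
                    new KeyRange(20, 30));                     // disjoint -> its own region
            System.out.println(splitIntoRegions(readers).size()); // 2
        }
    }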

    So the main changes are as follows:

    1. Add a SortedRegionDataRecordReader class which creates a reader with minKey and maxKey from each file in a SortedRun
    2. Add a RecordReaderSubRegion class which holds a SortedRegionDataRecordReader list; it is created from one SortedRun
    3. Add a RecordReaderRegionManager to divide the RecordReaderSubRegion list into multiple RecordReaderRegions; each RecordReaderRegion manages its own RecordReaderSubRegion list, and the key ranges of different RecordReaderRegions are disjoint
    4. Create a SortMergeReader for each RecordReaderRegion to reduce the comparisons across RecordReaderRegions. If a RecordReaderRegion has only one reader, use that reader directly

    Test cases RecordReaderRegionTest and RecordReaderRegionManagerTest are added to test the new classes; the SortMergeReader and related classes are tested in MergeTreeTest.

    opened by zjureel 5
  • [FLINK-27103] Don't store redundant primary key fields

    ChangelogWithKeyFileStoreTable currently stores the primary key redundantly in the file; it could instead use the primary key fields already present in the original value fields to avoid the redundant storage.
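
    A minimal sketch of the idea, assuming a row is just an Object[] and the primary key is identified by column positions (illustrative names, not the actual SinkRecord/KeyValueSerializer API):

    import java.util.Arrays;

    public class KeyProjectionExample {

        /** Re-derives the primary key from the value row instead of storing it twice. */
        static Object[] projectKey(Object[] valueRow, int[] primaryKeyIndexes) {
            Object[] key = new Object[primaryKeyIndexes.length];
            for (int i = 0; i < primaryKeyIndexes.length; i++) {
                key[i] = valueRow[primaryKeyIndexes[i]];
            }
            return key;
        }

        public static void main(String[] args) {
            Object[] row = {"hello", 42L};  // (word, cnt)
            int[] pkIndexes = {0};          // 'word' is the primary key
            // Only the value row is stored; the key is projected out when needed.
            System.out.println(Arrays.toString(projectKey(row, pkIndexes))); // [hello]
        }
    }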

    The brief change log

    • The primaryKey of SinkRecord is set to null for ChangelogWithKeyFileStoreTable and KeyValueSerializer reads the primary key from the value.
    opened by SteNicholas 0