flink-connector-redis

Overview


Why this project exists

Compared with other flink-connector-redis projects, most have stopped being maintained or are not very convenient to use. Since my company also needed one, I wrote this project. It is still under rapid development and will be updated continuously. If you need something that is not supported yet, feel free to open an issue.


Features

  • Table API Redis Sink
  • Table API Redis Source (direct queries, not dimension-table lookups)
  • Streaming Redis Sink
  • Streaming Redis Source
  • More Redis commands

Development environment

  • JDK 1.8
  • Flink 1.14.5
  • Jedis 4.2.3

Quick Start

Usage

<dependency>
    <groupId>io.github.future94</groupId>
    <artifactId>flink-connector-redis</artifactId>
    <version>1.0.SNAPSHOT</version>
</dependency>

Supported features

  • Redis standalone mode, master/slave mode (with read-write splitting), and cluster mode
  • Redis Source lookups for DIM-layer dimension tables
  • Custom Redis serializers (even complex values can be joined into dimension tables)
  • Custom conversion of Redis command results

Supported Redis commands

Currently only dimension-table (Source) lookups are implemented; other commands will follow.

Sink: not yet implemented
Source: get, hget

WITH options

Option Default Type Description
connector (none) String Must be set to the fixed value redis to use this connector
model single String Redis mode: single (standalone), master_slave (master/slave with read-write splitting), cluster
password (none) String Redis password
single.node (none) String Address of the standalone Redis node, format ip[:port]; the port defaults to 6379 if omitted
master.node (none) String Address of the master node in master/slave mode, format ip[:port]; the port defaults to 6379 if omitted
slave.nodes (none) String Addresses of the slave nodes in master/slave mode, format ip[:port][:weight][,ip:port:weight]; multiple slaves are separated by commas; the port defaults to 6379 and the weight defaults to 1 (all weights equal means round-robin); if a weight is specified, the port must be specified as well
cluster.nodes (none) String Addresses of the cluster nodes, format ip[:port][,ip:port]; the port defaults to 6379 if omitted
command (none) String The Redis command to run; must be one of the supported commands above
timeout 1000ms Integer Connection timeout
database 0 Integer Database to connect to in single mode
max.total 8 Integer Maximum number of connections
max.idle 8 Integer Maximum number of idle connections
min.idle 0 Integer Minimum number of idle connections
key.serializer string String Serializer for the Redis key; defaults to string and can be replaced via SPI (see below)
value.serializer string String Serializer for the Redis value; defaults to string and can be replaced via SPI (see below)
hash.key (none) String When the Hash data structure is used and the joined table does not carry the key, you can specify the key here so the join only needs the field, similar to the String structure's key => value
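
The examples later in this document all use single mode. As a quick illustration of the master/slave options above, here is a minimal sketch of a dimension-table definition with weighted slaves; the host addresses and password are placeholders:

// A sketch only: master/slave mode with two slaves, the second receiving twice the read traffic
String dim = "create table dim_table(name varchar, login_time time(3)) with ( " +
        "'connector'='redis', " +
        "'model'='master_slave', " +
        "'master.node'='192.168.10.14:6379', " +
        // slave.nodes format: ip:port:weight, comma separated
        "'slave.nodes'='192.168.10.15:6379:1,192.168.10.16:6379:2', " +
        "'password'='password', " +
        "'command'='get')";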

Custom serializers

Redis value encoding and decoding can be customized via SPI. This matters because in many cases the dimension data is not just a plain string: POJOs are often written in json, byte[], or other formats. With a custom serializer it becomes much easier to map such values onto the columns of the dimension table (strict positional matching is not required; see below).

How to customize

Specify your custom class in the resources/META-INF/services/org.apache.flink.connector.redis.table.internal.serializer.RedisSerializer file. The class must implement the org.apache.flink.connector.redis.table.internal.serializer.RedisSerializer interface.

public interface RedisSerializer<V> extends Serializable {

    /**
     * Identifier; this is the value you set in the key.serializer and value.serializer options
     */
    String identifier();

    /**
     * Serialize
     */
    byte[] serialize(BinaryStringData t) throws SerializationException;

    /**
     * Deserialize
     */
    V deserialize(byte[] bytes) throws SerializationException;

}

Custom command data conversion

The conversion of data returned by a Redis command into Flink Table API data can also be customized via SPI. This is useful when you have your own conversion logic, or when the implementation provided by this framework is not suitable or convenient for your case, so you can implement it yourself (PRs are still welcome).

How to customize

Specify your custom class in the resources/META-INF/services/org.apache.flink.connector.redis.table.internal.converter.source.RedisSourceConverter file. The class must implement the org.apache.flink.connector.redis.table.internal.converter.source.RedisSourceConverter interface; it can also extend the org.apache.flink.connector.redis.table.internal.converter.source.BaseRedisSourceConverter abstract class.

public interface RedisCommandToRowConverter {

    /**
     * The command type this converter supports
     */
    RedisCommandType support();

    /**
     * Convert the data
     * @param redisCommand          the Redis command execution context
     * @param columnNameList        the column names
     * @param columnDataTypeList    the column data types
     * @param readOptions           the read options
     * @param keys                  the join keys
     * @return the converted row
     * @throws Exception            if the conversion fails
     */
    Optional<GenericRowData> convert(final RedisCommand redisCommand, final List<String> columnNameList, final List<DataType> columnDataTypeList, final RedisReadOptions readOptions, final Object[] keys) throws Exception;
}
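
For orientation, below is a minimal sketch of a converter for the GET command, written against the interface shown above (imports omitted, as in the listings in this document). The RedisCommandType.GET constant and the redisCommand.get(...) call are assumptions about the internal API and may need to be adapted to the actual client:

public class MyGetConverter implements RedisCommandToRowConverter {

    @Override
    public RedisCommandType support() {
        // the enum constant name is an assumption; use the one matching 'command'='get'
        return RedisCommandType.GET;
    }

    @Override
    public Optional<GenericRowData> convert(final RedisCommand redisCommand,
                                            final List<String> columnNameList,
                                            final List<DataType> columnDataTypeList,
                                            final RedisReadOptions readOptions,
                                            final Object[] keys) throws Exception {
        // hypothetical read call -- replace with whatever RedisCommand actually exposes
        byte[] value = redisCommand.get(keys[0].toString().getBytes(StandardCharsets.UTF_8));
        if (value == null) {
            return Optional.empty();
        }
        // map key -> first column, raw value -> second column; any further columns stay null
        GenericRowData row = new GenericRowData(columnNameList.size());
        row.setField(0, StringData.fromString(keys[0].toString()));
        row.setField(1, StringData.fromString(new String(value, StandardCharsets.UTF_8)));
        return Optional.of(row);
    }
}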

Examples

Dimension table joins

GET

With the string serializer

This format is matched strictly by position (there are only two values, key and value): the first column of the dimension table is the key and the second is the value. If the table has more than two columns, the remaining columns are all null.

public class ExampleSingleTest {
    /**
     * <p>Lookup via the Redis GET command; the stored value is of String type
     *
     * <pre>Data layout in Redis:
     *          key =>  value
     *          5   =>  123456
     *          6   =>  1234567
     */
    @Test
    public void getString() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // name is the key passed to GET, login_time is the value returned by GET
        String dim = "create table dim_table(name varchar, login_time time(3) ) with ( " +
                "'connector'='redis', " +
                "'model'='single', " +
                "'single.node'='192.168.10.14:6379', " +
                "'password'='password', " +
                "'database'='5', " +
                "'command'='get')";
        String source = "create table source_table(username varchar, level varchar, proctime as procTime()) "
                + "with ('connector'='datagen',  'rows-per-second'='1', "
                + "'fields.username.kind'='sequence',  'fields.username.start'='5',  'fields.username.end'='6',"
                + "'fields.level.kind'='sequence',  'fields.level.start'='15',  'fields.level.end'='16'"
                + ")";
        tEnv.executeSql(source);
        tEnv.executeSql(dim);
        // join on the Redis key (d.name)
        String sql = " select s.username, s.level,  d.login_time from source_table s"
                + "  left join dim_table for system_time as of s.proctime as d "
                + " on d.name = s.username";
        Table table = tEnv.sqlQuery(sql);
        tEnv.toDataStream(table).print();
        env.execute("getString");
    }
}

Custom serializer, using JSON as an example.

This format is not matched by position:

  • If the deserialized POJO has the same fields as the dimension table, values are resolved by matching POJO field names to table column names; in this case the key is not available as a column.
  • If the POJO fields and the dimension-table columns differ, the first table column holds the key and the remaining columns are resolved by matching POJO field names to column names; any columns left over are all null.

Add the following entry (the fully qualified class name of your serializer) to the resources/META-INF/services/org.apache.flink.connector.redis.table.internal.serializer.RedisSerializer file:

org.apache.flink.connector.redis.table.internal.serializer.JsonListRedisSerializer

public class JsonRedisSerializer implements RedisSerializer<JsonTestDTO> {

    private static final String IDENTIFIER = "json";

    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    @Override
    public byte[] serialize(BinaryStringData t) throws SerializationException {
        return new Gson().toJson(t.toString()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public JsonTestDTO deserialize(byte[] bytes) throws SerializationException {
        return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), JsonTestDTO.class);
    }
}

public class JsonTestDTO {

    private String desc;

    private Integer login_time;

    private String title;
}

Usage

public class ExampleSingleTest {
    /**
     * <p>Lookup via the Redis GET command; the stored value is JSON
     *
     * <pre>Data layout in Redis:
     *             key  =>  value
     *             25   =>  {"login_time":123456,"title":"wei","desc":"lai"}
     */
    @Test
    public void getJson() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // name is the key; title, login_time and desc are resolved from the deserialized value
        String dim = "create table dim_table(" +
                "name varchar, title varchar, login_time time(3), desc varchar ) with ( " +
                "'connector'='redis', " +
                "'model'='single', " +
                "'single.node'='192.168.10.14:6379', " +
                "'password'='password', " +
                "'database'='5', " +
                "'command'='get', " +
                "'value.serializer'='json')";
        String source = "create table source_table(username varchar, level varchar, proctime as procTime()) "
                + "with ('connector'='datagen',  'rows-per-second'='1', "
                + "'fields.username.kind'='sequence',  'fields.username.start'='25',  'fields.username.end'='26',"
                + "'fields.level.kind'='sequence',  'fields.level.start'='35',  'fields.level.end'='36')";
        tEnv.executeSql(source);
        tEnv.executeSql(dim);
        // join on the Redis key (d.name)
        String sql = " select s.username, d.desc, s.level, d.login_time, d.title from source_table s"
                + "  left join dim_table for system_time as of s.proctime as d "
                + " on d.name = s.username";
        Table table = tEnv.sqlQuery(sql);
        tEnv.toDataStream(table).print();
        env.execute("getJson");
    }
}

HGET

With the string serializer

This format is matched strictly by position. The join can either specify two ON conditions, or set hash.key in the WITH options and join the field with a single ON condition. If the dimension data is spread across different hash keys, you must specify both ON conditions so each entry can be located; if everything lives in one hash, the key condition can be omitted.

  • If the dimension table has two columns, the first is the field and the second is the value.
  • If the dimension table has three columns, the first is the key, the second is the field and the third is the value.
  • With more than three columns, the extra columns are all null.

public class ExampleSingleTest {
    /**
     * <p>Lookup via the Redis HGET command; the stored value is of String type, the join has two ON conditions and matches on the hash KEY
     *
     * <pre>Data layout in Redis:
     *          key => field =>  value
     *          45  => 55    =>  wei
     */
    @Test
    public void hGetStringHasHashKey() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // three columns are declared here, so name is the key, level is the field and desc is the value
        // two columns would also work (level varchar, desc varchar): then level is the field and desc is the value
        String dim = "create table dim_table(name varchar, level varchar, desc varchar) with ( " +
                "'connector'='redis', " +
                "'model'='single', " +
                "'single.node'='192.168.10.14:6379', " +
                "'password'='password', " +
                "'database'='5', " +
                "'command'='hget')";
        String source = "create table source_table(username varchar, level varchar, proctime as procTime()) "
                + "with ('connector'='datagen',  'rows-per-second'='1', "
                + "'fields.username.kind'='sequence',  'fields.username.start'='45',  'fields.username.end'='46',"
                + "'fields.level.kind'='sequence',  'fields.level.start'='55',  'fields.level.end'='56'"
                + ")";
        tEnv.executeSql(source);
        tEnv.executeSql(dim);
        // two join conditions are specified here (d.name = s.username and d.level = s.level)
        // alternatively specify one condition (d.level = s.level) and set the hash key via the hash.key WITH option
        String sql = " select s.username, s.level, d.name, d.desc from source_table s"
                + "  left join dim_table for system_time as of s.proctime as d "
                + " on d.name = s.username and d.level = s.level";
        Table table = tEnv.sqlQuery(sql);
        tEnv.toDataStream(table).print();
        env.execute("hGetStringHasHashKey");
    }
}
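
As a variant of the example above, the same lookup can use the hash.key WITH option so that a single ON condition on the field is enough. A minimal sketch, reusing the placeholder host and the hash key 45 from the data above:

// A sketch only: pin the hash key via hash.key and join on the field alone
String dim = "create table dim_table(level varchar, desc varchar) with ( " +
        "'connector'='redis', " +
        "'model'='single', " +
        "'single.node'='192.168.10.14:6379', " +
        "'password'='password', " +
        "'database'='5', " +
        // all dimension entries live under the hash key 45 used in the data above
        "'hash.key'='45', " +
        "'command'='hget')";

// only one join condition on the field is needed now
String sql = " select s.username, s.level, d.desc from source_table s"
        + "  left join dim_table for system_time as of s.proctime as d "
        + " on d.level = s.level";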

Custom serializer, using JSON as an example.

This format is not matched by position. The key can be supplied either through the hash.key WITH option or through a second ON condition:

  • If the deserialized POJO has the same fields as the dimension table, values are resolved by matching POJO field names to table column names; in this case neither the key nor the field is available as a column.
  • If the POJO fields and the dimension-table columns differ:
    • If the dimension table has one more column than the POJO, the first table column holds the field and the rest are resolved by POJO field name (the key is not available); any columns left over are all null.
    • If the dimension table has two more columns than the POJO, the first table column holds the key, the second holds the field, and the rest are resolved by POJO field name; any columns left over are all null.
    • Any other case throws a RuntimeException.

public class ExampleSingleTest {

    /**
     * <p>Lookup via the Redis HGET command; the stored value is JSON, the join has two ON conditions and matches on the hash KEY
     *
     * <pre>Data layout in Redis:
     *             key => field =>  value
     *             65  => 75    =>  {"login_time":123456,"title":"wei","desc":"lai"}
     */
    @Test
    public void hGetJsonHasHashKey() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        String dim = "create table dim_table(name varchar, level varchar, title varchar, login_time time(3), desc varchar) with ( " +
                "'connector'='redis', " +
                "'model'='single', " +
                "'single.node'='192.168.10.14:6379', " +
                "'password'='password', " +
                "'database'='5', " +
                "'value.serializer'='json'," +
                "'command'='hget')";
        String source = "create table source_table(username varchar, level varchar, proctime as procTime()) "
                + "with ('connector'='datagen',  'rows-per-second'='1', "
                + "'fields.username.kind'='sequence',  'fields.username.start'='65',  'fields.username.end'='66',"
                + "'fields.level.kind'='sequence',  'fields.level.start'='75',  'fields.level.end'='76'"
                + ")";
        tEnv.executeSql(source);
        tEnv.executeSql(dim);
        String sql = " select s.username, s.level, d.name, d.login_time, d.level, d.desc, d.title from source_table s"
                + "  left join dim_table for system_time as of s.proctime as d "
                + " on d.name = s.username and d.level = s.level";
        Table table = tEnv.sqlQuery(sql);
        tEnv.toDataStream(table).print();
        env.execute("hGetStringHasHashKey");
    }
}