flink-connector-kudu, based on the Apache Bahir Kudu connector, supporting the Flink 1.11.x DynamicTableSource/Sink interfaces, Range partitioning, and more

Overview

Kudu Connector

  • A Kudu connector adapted from the Apache Bahir Kudu connector for internal company use. It supports Range partitioning, configurable Hash bucket counts, Flink 1.11.x dynamic table sources/sinks, and more; parts of these changes have been contributed back to the community.

Usage

  • Clone the code, change the Maven coordinates in pom.xml, and deploy the artifact to your company's private repository.

Using the Kudu Catalog

Creating a Catalog

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Pass the comma-separated Kudu master addresses
KuduCatalog catalog = new KuduCatalog("cdh01:7051,cdh02:7051,cdh03:7051");
StreamTableEnvironment tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
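Note that KuduTableTestUtils comes from the project's test sources. Outside tests, the table environment can be created directly; a sketch assuming the Blink planner in streaming mode (the Flink 1.11+ defaults):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);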

Catalog API

// Drop a table via the catalog (true = ignore if the table does not exist)
catalog.dropTable(new ObjectPath("default_database", "test_Replice_kudu"), true);
// Query and modify tables through the catalog
tableEnv.sqlQuery("select * from test");
tableEnv.executeSql("drop table test");
tableEnv.executeSql("insert into testRange values(1,'hsm')");

FlinkSQL

KuduTable Properties

  • Whether the legacy TableSourceFactory or the new KuduDynamicTableSource is used is determined by the connector.type key (Flink 1.10.x) or the connector key (Flink 1.11.x). A sketch using these properties follows this list.
kudu.table=the Kudu table to map
kudu.masters=the Kudu master addresses
kudu.hash-columns=the table's hash partition key columns, comma-separated
kudu.replicas=number of Kudu tablet replicas, default 3
kudu.hash-partition-nums=number of hash partition buckets, default 2 * replicas
kudu.range-partition-rule=range partition rule in the form rangeKey#leftValue,rightValue:rangeKey#leftValue1,rightValue1; rangeKey must be a primary key column
kudu.primary-key-columns=the Kudu table's primary key columns, comma-separated; they must be listed in order
kudu.lookup.cache.max-rows=maximum number of rows in the lookup (temporal table) cache; caching is disabled by default
kudu.lookup.cache.ttl=expiration time of the lookup cache
kudu.lookup.max-retries=number of retries when a temporal-table join fails, default 3
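A minimal sketch of a DDL exercising these properties, issued through the Table API. The table name testRange, its columns, and the range bounds are illustrative assumptions; assuming Kudu's usual inclusive-lower/exclusive-upper bounds, the rule below creates the partitions [0, 100) and [100, 200) on id.

tableEnv.executeSql(
    "CREATE TABLE testRange (" +
    "  id INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kudu'," +
    "  'kudu.masters' = 'cdh01:7051,cdh02:7051,cdh03:7051'," +
    "  'kudu.table' = 'testRange'," +
    "  'kudu.primary-key-columns' = 'id'," +
    "  'kudu.hash-columns' = 'id'," +
    "  'kudu.hash-partition-nums' = '6'," +
    "  'kudu.range-partition-rule' = 'id#0,100:id#100,200'" +
    ")");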

Flink 1.10.x

CREATE TABLE TestTableTableSourceFactory (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

Flink 1.11.x

CREATE TABLE TestTableKuduDynamicTableSource (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

DataStream Usage

  • For DataStream usage, see the official bahir-flink documentation; this is currently a less common scenario for data warehouse engineers.

Version History

1.1 Features

  • Added configuration of the number of hash partition buckets via kudu.hash-partition-nums
  • Added range partition rules, which can be combined with hash partitioning; configured via kudu.range-partition-rule, with rules of the form rangeKey#leftValue,rightValue:rangeKey#leftValue1,rightValue1
  • Added Kudu temporal (lookup) table support; the kudu.lookup.* options below control the cached data size and TTL (a lookup-join sketch follows the constants)
/**
 * Maximum number of rows in the lookup cache.
 */
public static final String KUDU_LOOKUP_CACHE_MAX_ROWS = "kudu.lookup.cache.max-rows";
/**
 * Expiration time of entries in the lookup cache.
 */
public static final String KUDU_LOOKUP_CACHE_TTL = "kudu.lookup.cache.ttl";
/**
 * Number of Kudu connection retries.
 */
public static final String KUDU_LOOKUP_MAX_RETRIES = "kudu.lookup.max-retries";
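A hedged end-to-end sketch of a lookup join driven by these options. The fact table orders with its processing-time attribute proc_time, and the value formats for the cache options, are assumptions for illustration.

// Dimension table backed by Kudu, with the lookup cache enabled
tableEnv.executeSql(
    "CREATE TABLE dim_user (" +
    "  id INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kudu'," +
    "  'kudu.masters' = 'cdh01:7051,cdh02:7051,cdh03:7051'," +
    "  'kudu.table' = 'dim_user'," +
    "  'kudu.primary-key-columns' = 'id'," +
    "  'kudu.lookup.cache.max-rows' = '10000'," +
    "  'kudu.lookup.cache.ttl' = '10min'," +
    "  'kudu.lookup.max-retries' = '3'" +
    ")");

// Enrich the stream by looking up dim_user rows at processing time
tableEnv.executeSql(
    "SELECT o.order_id, u.name " +
    "FROM orders AS o " +
    "JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u " +
    "ON o.user_id = u.id");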

Implementation

  • A custom KuduLookupFunction lets KuduTableSource implement the LookupableTableSource interface and return that function, providing temporal table support. The underlying cache is Caffeine rather than the Guava cache used by Flink's JDBC connector; Caffeine caches more efficiently and also reduces the load that heavy request volumes place on Kudu.
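A hedged sketch of that caching pattern, not the connector's exact code; the class and the scanKudu helper below are hypothetical stand-ins for the lookup function's internals.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class CachedKuduLookup {
    private final Cache<Object, List<Object>> cache;

    public CachedKuduLookup(long maxRows, Duration ttl) {
        // maxRows corresponds to kudu.lookup.cache.max-rows,
        // ttl to kudu.lookup.cache.ttl
        this.cache = Caffeine.newBuilder()
                .maximumSize(maxRows)
                .expireAfterWrite(ttl)
                .build();
    }

    public List<Object> lookup(Object key) {
        // Serve repeated keys from memory; fall through to Kudu on a miss
        return cache.get(key, this::scanKudu);
    }

    private List<Object> scanKudu(Object key) {
        // Hypothetical placeholder for a KuduScanner round trip
        return Collections.emptyList();
    }
}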

Roadmap

Current Issues

  1. Primary keys cannot be inferred automatically from SQL statements

The enhancements built on the Apache Bahir Kudu connector mainly serve internal business needs, and using this version of the connector surfaced a problem: primary keys cannot be inferred automatically from SQL, so results cannot be passed straight to the downstream table. Internally, our Tiangong engine works around this by running the query through the Flink Table API's sqlQuery method to obtain a Table, converting the Table to a retract stream (DataStream<Tuple2<Boolean, Row>>), and finally writing that stream to Kudu using the KuduSink and UpsertOperationMapper provided by the connector, as sketched below.
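A hedged sketch of that workaround; the class names follow the Bahir Kudu connector, but the exact packages and signatures may differ across versions, and source_table/target_table are assumptions.

Table result = tableEnv.sqlQuery("SELECT id, name FROM source_table");
// Retract stream: the Boolean flag marks insert/update (true) vs. delete (false)
DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(result, Row.class);

KuduWriterConfig writerConfig =
        KuduWriterConfig.Builder.setMasters("cdh01:7051,cdh02:7051,cdh03:7051").build();
retractStream.addSink(new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("target_table"),                  // assumed target table
        new UpsertOperationMapper(new String[]{"id", "name"}))); // maps the flag to upsert/delete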

Next Steps

  • Plan: provide a dynamic table source to solve this, reworking the pre-Flink-1.11.x KuduTableSource/KuduTableSink into implementations of the DynamicSource/Sink interfaces, thereby resolving the primary-key inference problem.

1.2 Features

  • Reworked the connector onto the DynamicSource/Sink interfaces introduced in Flink 1.11.x, solving the SQL primary-key inference problem and enabling stream/batch joins directly in SQL, without converting to DataStream for multi-table joins.
  • Built-in metrics reporting: the Kudu tables handled at the dynamic factory entry point are instrumented, giving better visibility into data written to Kudu.
Comments
  • Writing data fails with 'connector' = 'kudu'

    With the configuration below, no data can be written: 'connector' = 'kudu', 'kudu.masters' = 'xxx1,xxx2,xxx3', 'kudu.table' = 'impala::g2link_stream.kudu_g2park_inout_record', 'kudu.hash-columns' = 'park_code', 'kudu.primary-key-columns' = 'inout_id'

    The exception: Row error for primary key="1704794203843944449", tablet=null, server=32df83c9ff4a472cb5f0d0abaf1c3c56, status=Not found: key not found (error 0) Row error for primary key="1704794203843944449", tablet=null, server=32df83c9ff4a472cb5f0d0abaf1c3c56, status=Not found: key not found (error 0)

    at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:156) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.write(KuduWriter.java:97) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    at org.colloh.flink.kudu.connector.table.sink.KuduSink.invoke(KuduSink.java:93) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
    
    bug 
    opened by zhiyuan192shine 13
  • The number of replicas does not equal the number of servers

    Hi, I keep running into this issue. Do you know what might cause it?

    2021-08-30 21:23:51 java.io.IOException: Error while sending value. Row error for primary key="25", tablet=null, server=null, status=Runtime error: the number of replicas does not equal the number of servers Row error for primary key="28", tablet=null, server=null, status=Runtime error: the number of replicas does not equal the number of servers

    at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37)
    at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:164)
    at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkErrors(KuduWriter.java:154)
    at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.write(KuduWriter.java:108)
    at org.colloh.flink.kudu.connector.table.sink.KuduSink.invoke(KuduSink.java:97)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at StreamExecCalc$301.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:194)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
    

    I checked on my side: replicas = 3, servers = 0. I don't know why servers = 0.

    opened by yonghaozhang 6
  • Row error for primary key=[-128, 0, 0, 1], tablet=null, server=null, status=Corruption: Unknown operation type: 12 (error 0)

    My error looks like:

    
    2022-04-29 14:56:01,700 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1) (2b3a6732eb50271cd37b8b843563233d) switched from RUNNING to FAILED on 127.0.0.1:63027-bc5b32 @ localhost (dataPort=63029).
    java.io.IOException: Could not perform checkpoint 9 for operator Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1)#0.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:972) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:131) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 9 for operator Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1)#0. Failure reason: Checkpoint was declined.
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:956) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	... 14 more
    Caused by: org.apache.flink.util.SerializedThrowable: Error while sending value. 
     Row error for primary key=[-128, 0, 0, 1], tablet=null, server=null, status=Corruption: Unknown operation type: 12 (error 0)
    Row error for primary key=[-128, 0, 0, 1], tablet=null, server=null, status=Corruption: Unknown operation type: 12 (error 0)
    
    	at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    	at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:164) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    	at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.flushAndCheckErrors(KuduWriter.java:115) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    	at org.colloh.flink.kudu.connector.table.sink.KuduSink.snapshotState(KuduSink.java:112) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:956) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
    	... 14 more
    

    and my code looks like:

     val text = env.socketTextStream("127.0.0.1", 9999)
            val text2 = env.socketTextStream("127.0.0.1", 9998)
    
            val po = text.map(x => {
                val array = x.split(",")
                val po = TestPo(array(0).toInt, array(1), array(2).toInt)
                println("=1===> " + po)
                po
            })
    
            val po2 = text2.map(x => {
                val array = x.split(",")
                val po = TestPo(array(0).toInt, array(1), array(2).toInt)
                println("=2===> " + po)
                po
            })
    
    
        // create the table environment
            val setting = EnvironmentSettings.newInstance.inStreamingMode.build
            val tableEnv = StreamTableEnvironment.create(env, setting)
            tableEnv.getConfig.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key, 1)
         
    
    
            tableEnv.executeSql(
                s"""
                   |create table test_zc_1
                   |(id INT,
                   |name STRING,
                   |age INT
                   |) WITH
                   |(
                   |'connector'='kudu',
                   |'kudu.masters'='${kudu_master}',
                   |'kudu.hash-columns'='id',
                   |'kudu.primary-key-columns'='id',
                   |'kudu.table'='test_zc_1'
                   |)
                   |""".stripMargin)
            val catalog = new KuduCatalog(kudu_master)
            tableEnv.registerCatalog("kudu", catalog)
            tableEnv.useCatalog("kudu")
    
        // register two temporary views
            val poTable = tableEnv.fromDataStream(po)
            val poTable2 = tableEnv.fromDataStream(po2)
            tableEnv.createTemporaryView("test_po", poTable)
            tableEnv.createTemporaryView("test_po2", poTable2)
    
           tableEnv.executeSql("insert into test_zc_1 select a.id ,a.name, b.age from test_po a left join test_po2 b on a.id=b.id  ")
    
    
    
    

    For the first record, entering it on port 9999 or on 9998 both work. For the second record, entering it on 9998 is fine, but entering it on 9999 first and then on 9998 produces the error above. Please help take a look, many thanks. Both Flink 1.12.7 and 1.14.4 show this problem.

    opened by cherful 5
  • Source limit push down bug

    https://github.com/collabH/flink-connector-kudu/blob/8d6d9d2baaf7d167fe9f4ac2b94be9dfbc9b1dd5/src/main/java/org/colloh/flink/kudu/connector/table/source/KuduDynamicTableSource.java#L160 needs to be replaced with

    configBuilder = configBuilder.setRowLimit((int) limit);

    bug 
    opened by lintingbin2009 3
  • Does this flink-kudu-connector support checkpoints?

    Sink: Sink(table=[default_catalog.default_database.teacher4], fields=[id, name, age]) (1/1) (8ddb33f3da34221584f625438dc18a8c) switched from RUNNING to FAILED on container_1618471122700_0004_01_000002 @ spark1 (dataPort=36997).
    java.lang.Exception: Error while triggering checkpoint 1 for Source: TableSourceScan(table=[[default_catalog, default_database, stu4_kafka]], fields=[id, name, age]) -> DropUpdateBefore -> Sink: Sink(table=[default_catalog.default_database.teacher4], fields=[id, name, age]) (1/1)#0
    at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1176) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:840) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_221]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_221]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [Flink_test-1.0-SNAPSHOT.jar:?]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [Flink_test-1.0-SNAPSHOT.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [Flink_test-1.0-SNAPSHOT.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [Flink_test-1.0-SNAPSHOT.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [Flink_test-1.0-SNAPSHOT.jar:?]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [Flink_test-1.0-SNAPSHOT.jar:?]
    Caused by: java.lang.UnsupportedOperationException: triggerCheckpointAsync not supported by org.apache.flink.streaming.runtime.tasks.SourceStreamTask
    at org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable.triggerCheckpointAsync(AbstractInvokable.java:222) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1166) ~[Flink_test-1.0-SNAPSHOT.jar:?]
    ... 26 more

    opened by amazingSaltFish 3
  • Flink 1.13: joining a temporal table fails

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not determine a type inference for lookup function 'default_catalog.default_database.kudu_source'. Lookup functions support regular type inference. However, for convenience, the output class can simply be a Row or RowData class in which case the input and output types are derived from the table's schema with default conversion.

    The same job on Flink 1.12 does not produce this error.

    opened by opoioa 1
  • Unit tests fail on Windows?

    java.lang.RuntimeException: Error while locating kudu binary

    at org.apache.kudu.test.cluster.KuduBinaryLocator.findBinaryLocation(KuduBinaryLocator.java:114)
    at org.apache.kudu.test.cluster.KuduBinaryLocator.findBinary(KuduBinaryLocator.java:127)
    at org.apache.kudu.test.cluster.MiniKuduCluster.start(MiniKuduCluster.java:193)
    at org.apache.kudu.test.cluster.MiniKuduCluster.access$300(MiniKuduCluster.java:74)
    at org.apache.kudu.test.cluster.MiniKuduCluster$MiniKuduClusterBuilder.build(MiniKuduCluster.java:705)
    at org.apache.kudu.test.KuduTestHarness.before(KuduTestHarness.java:145)
    at org.colloh.flink.kudu.connector.KuduTestBase.beforeClass(KuduTestBase.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.lambda$invokeBeforeAllMethods$9(ClassTestDescriptor.java:376)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.invokeBeforeAllMethods(ClassTestDescriptor.java:375)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.before(ClassTestDescriptor.java:201)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.before(ClassTestDescriptor.java:77)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:132)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    
    opened by mixhuhu 1