My error looks like this:
2022-04-29 14:56:01,700 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1) (2b3a6732eb50271cd37b8b843563233d) switched from RUNNING to FAILED on 127.0.0.1:63027-bc5b32 @ localhost (dataPort=63029).
java.io.IOException: Could not perform checkpoint 9 for operator Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:972) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:131) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 9 for operator Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[id, name, age]) -> Sink: Sink(table=[kudu.default_database.test_zc_1], fields=[id, name, age]) (1/1)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:956) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
... 14 more
Caused by: org.apache.flink.util.SerializedThrowable: Error while sending value.
Row error for primary key=[-128, 0, 0, 1], tablet=null, server=null, status=Corruption: Unknown operation type: 12 (error 0)
Row error for primary key=[-128, 0, 0, 1], tablet=null, server=null, status=Corruption: Unknown operation type: 12 (error 0)
at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:164) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.flushAndCheckErrors(KuduWriter.java:115) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.table.sink.KuduSink.snapshotState(KuduSink.java:112) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:697) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:618) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:583) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:309) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1013) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:997) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:956) ~[flink-dist_2.11-1.12.7.jar:1.12.7]
... 14 more
And my code looks like this:
// Two local socket sources; each line is expected to look like "id,name,age".
val text = env.socketTextStream("127.0.0.1", 9999)
val text2 = env.socketTextStream("127.0.0.1", 9998)

// Parse each line from port 9999 into a TestPo and echo it for debugging.
val po = text.map { line =>
  val fields = line.split(",")
  val record = TestPo(fields(0).toInt, fields(1), fields(2).toInt)
  println(s"=1===> $record")
  record
}

// Same parsing for the lines arriving on port 9998.
val po2 = text2.map { line =>
  val fields = line.split(",")
  val record = TestPo(fields(0).toInt, fields(1), fields(2).toInt)
  println(s"=2===> $record")
  record
}

// Create the table environment (streaming mode, default parallelism 1).
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, setting)
tableEnv.getConfig.getConfiguration
  .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key, 1)

// Declare the Kudu-backed sink table; `id` is both the primary key and the hash column.
val createSinkTable =
  s"""
     |create table test_zc_1
     |(id INT,
     |name STRING,
     |age INT
     |) WITH
     |(
     |'connector'='kudu',
     |'kudu.masters'='${kudu_master}',
     |'kudu.hash-columns'='id',
     |'kudu.primary-key-columns'='id',
     |'kudu.table'='test_zc_1'
     |)
     |""".stripMargin
tableEnv.executeSql(createSinkTable)

// Register the Kudu catalog and make it the current catalog.
val catalog = new KuduCatalog(kudu_master)
tableEnv.registerCatalog("kudu", catalog)
tableEnv.useCatalog("kudu")

// Register the two streams as temporary views.
val poTable = tableEnv.fromDataStream(po)
val poTable2 = tableEnv.fromDataStream(po2)
tableEnv.createTemporaryView("test_po", poTable)
tableEnv.createTemporaryView("test_po2", poTable2)

// NOTE(review): the failing operator in the log is this left join feeding the Kudu sink
// (leftInputSpec/rightInputSpec = NoUniqueKey). Presumably a late right-side match makes the
// join retract the earlier row, and the sink then sends an operation the Kudu server rejects
// ("Unknown operation type: 12") — TODO confirm against the connector and Kudu server versions.
tableEnv.executeSql("insert into test_zc_1 select a.id ,a.name, b.age from test_po a left join test_po2 b on a.id=b.id ")
第一条数据,先在9999上输入,或者在9998上输入都可以。
第二条数据,如果在9998上输入就没问题;但是先在9999上输入,再去9998上输入就会出现如上的错误。
请帮忙看看,万分感谢。
flink版本1.12.7 或1.14.4都会有如上问题。