Flink Connector for Apache Doris (incubating)

Overview

This repository contains the Flink connector for Apache Doris (incubating).

Flink Doris Connector

For more information about compilation and usage, please visit the Flink Doris Connector documentation.

License

Apache License, Version 2.0

Report issues or submit a pull request

If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.

Contact Us

Contact us through the following mailing list.

Name: [email protected]
Scope: Development-related discussions
(Subscribe | Unsubscribe | Archives)

Links

Comments
  • json load by line

    Proposed changes

    1. For stream load in JSON format, support read_json_by_line instead of strip_outer_array (see the sketch below)
    2. Remove the non-empty check for dorisReadOptions
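
    A minimal sketch of what enabling this might look like from user code, assuming the connector's DorisExecutionOptions.builder().setStreamLoadProp(...) API and the standard Doris stream load properties:

    import java.util.Properties;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;

    public class JsonByLineExample {
        public static DorisExecutionOptions jsonByLineOptions() {
            Properties props = new Properties();
            props.setProperty("format", "json");
            // One JSON object per line, instead of one large JSON array
            // (strip_outer_array).
            props.setProperty("read_json_by_line", "true");
            return DorisExecutionOptions.builder()
                    .setStreamLoadProp(props)
                    .build();
        }
    }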

    Checklist(Required)

    1. Does it affect the original behavior: (No)
    2. Has unit tests been added: (Yes)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by madongz 7
  • [Bug] Stream load fails when there's no data

    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    1.1.0

    What's Wrong?

    Stream load fails if there's no data to flush.

    [Screenshot 2022-08-09 17:41:43]

    We could fix this by skipping the flush if there's no pending data, as shown in the sketch below.
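
    A minimal sketch of the proposed guard (the buffer field and load call are illustrative, not the connector's actual code):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class DorisBatchWriter {
        private final List<String> buffer = new ArrayList<>();

        void flush() throws IOException {
            // Skip the stream load entirely when nothing is buffered, so an
            // empty source does not trigger a failing load request.
            if (buffer.isEmpty()) {
                return;
            }
            doStreamLoad(buffer);
            buffer.clear();
        }

        private void doStreamLoad(List<String> rows) throws IOException {
            // Placeholder for the actual HTTP stream load request.
        }
    }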

    What You Expected?

    No failure when there's no input data.

    How to Reproduce?

    Submit a job with an empty data source and Doris sink.

    Anything Else?

    No response

    Are you willing to submit PR?

    • [X] Yes I am willing to submit a PR!

    Code of Conduct

    opened by link3280 5
  • [feature] Supports traversal of Doris FE nodes when searching for Doris BE

    Proposed changes

    Problem Summary:

    Currently, when the flink-doris-connector writes to Doris with multiple Doris FE nodes configured, it randomly picks one FE to serve the HTTP request that resolves the BE node address. In a production environment, when FE services are rolling-restarted because of machine problems or version upgrades, the temporary unavailability of that single FE can make the Flink job exit abnormally. To reduce the impact of cluster changes or occasional failures on streaming stability, this change looks up Doris BE nodes by sending the HTTP request to each configured Doris FE node in turn (see the sketch below).
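
    A minimal sketch of the failover idea (helper names are hypothetical, not the connector's actual API): try each configured FE in turn instead of failing on a single randomly chosen node.

    import java.io.IOException;
    import java.util.List;

    class FeFailover {
        /** Ask each FE in turn for a BE address; fail only if all FEs are down. */
        static String requestBeAddress(List<String> feNodes) throws IOException {
            IOException last = null;
            for (String fe : feNodes) {
                try {
                    return httpGetBackend(fe); // HTTP request to one FE
                } catch (IOException e) {
                    last = e; // this FE is unavailable, try the next one
                }
            }
            throw new IOException("All configured FE nodes are unavailable", last);
        }

        private static String httpGetBackend(String fe) throws IOException {
            // Placeholder for the actual HTTP request to the FE's REST API.
            throw new IOException("not implemented");
        }
    }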

    Checklist(Required)

    1. Does it affect the original behavior: (Yes)
    2. Has unit tests been added: (No Need)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    opened by bridgeDream 4
  • When doing a checkpoint, write cached data to Doris to prevent loss

    Proposed changes

    Issue Number: close

    Problem Summary:

    Flink writes to Doris in batches. If the user sets the batch size to 1000 rows, a flush happens only once 1000 rows have accumulated. Suppose a checkpoint succeeds after 500 rows have been buffered, and the Kafka offset is committed accordingly; then the next 500 rows arrive before another checkpoint, the Doris server fails, the flush throws an error, and the Flink task restarts. It resumes consuming Kafka from the offset committed at the last checkpoint, so the first 500 buffered rows are lost. Flushing the buffer as part of the checkpoint prevents this (see the sketch below).
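
    A minimal sketch of the idea using Flink's CheckpointedFunction (the buffering and load details are illustrative, not the connector's actual code):

    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.util.ArrayList;
    import java.util.List;

    class CheckpointFlushingDorisSink extends RichSinkFunction<String>
            implements CheckpointedFunction {

        private final List<String> buffer = new ArrayList<>();

        @Override
        public void invoke(String row, Context context) {
            buffer.add(row);
            if (buffer.size() >= 1000) {
                flush();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // Flush the buffer as part of the checkpoint, so rows buffered
            // before the checkpoint cannot be lost on restart.
            flush();
        }

        @Override
        public void initializeState(FunctionInitializationContext context) {
            // Nothing to restore: the buffer is always empty at a checkpoint.
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            // Placeholder for the actual stream load to Doris.
            buffer.clear();
        }
    }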

    Checklist(Required)

    1. Does it affect the original behavior: (No)
    2. Has unit tests been added: (No)
    3. Has document been added or modified: (No)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    good first issue 
    opened by liuyaolin 4
  • Feature/20220305 support flush when batch bytes reach threshold

    Proposed changes

    Problem Summary:

    Currently, the dorisSink flushes a batch based on "batchSize" and "batchIntervalMs", which behaves as expected in most cases. But when the source traffic is heavy or individual messages are very large, a single batch can grow very large; flushing it into Doris then blocks for a long time and hurts real-time performance, and the request to the BE may even time out and fail to write at all.

    For this reason, this change adds flushing the batch to Doris based on "maxBatchBytes". Changes (see the sketch after this list):

    • DorisExecutionOptions: add a configurable option maxBatchBytes
    • DorisDynamicOutputFormat: estimate the size batchBytes of the current batch and flush to Doris once it reaches the maxBatchBytes threshold.
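
    A minimal sketch of the byte-threshold trigger (class and field names are illustrative, not the connector's actual code):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    class ByteBoundedBatch {
        private final int batchSize;
        private final long maxBatchBytes;
        private final List<String> batch = new ArrayList<>();
        private long batchBytes = 0;

        ByteBoundedBatch(int batchSize, long maxBatchBytes) {
            this.batchSize = batchSize;
            this.maxBatchBytes = maxBatchBytes;
        }

        void add(String row) {
            batch.add(row);
            // Estimate the batch size from the serialized row length.
            batchBytes += row.getBytes(StandardCharsets.UTF_8).length;
            // Flush on whichever threshold is reached first: row count or bytes.
            if (batch.size() >= batchSize || batchBytes >= maxBatchBytes) {
                flush();
            }
        }

        private void flush() {
            // Placeholder for the stream load; reset both counters afterwards.
            batch.clear();
            batchBytes = 0;
        }
    }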

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (No Need)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    opened by bridgeDream 3
  • [feature] Supports traversal of Doris FE nodes when searching for Doris BE on branch flink-before-1.13

    Proposed changes

    Problem Summary:

    Currently, when the flink-doris-connector writes to Doris with multiple Doris FE nodes configured, it randomly picks one FE to serve the HTTP request that resolves the BE node address. In a production environment, when FE services are rolling-restarted because of machine problems or version upgrades, the temporary unavailability of that single FE can make the Flink job exit abnormally. To reduce the impact of cluster changes or occasional failures on streaming stability, this change looks up Doris BE nodes by sending the HTTP request to each configured Doris FE node in turn (the same approach as the corresponding issue above, applied to the flink-before-1.13 branch).

    Checklist(Required)

    1. Does it affect the original behavior: (Yes)
    2. Has unit tests been added: (No Need)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    opened by bridgeDream 3
  • Flink Doris Connector Release Note 1.2.0

    Feature

    1. Support reading and writing with Flink 1.15
    2. Optimize logging dependencies and remove log-related dependencies from the POM files

    Bugfix

    1. Fix the exception thrown when MaxRetries = 0

    Thanks

    Thanks to everyone who has contributed to this release:

    @hf200012 @JNSimba @link3280 @morningman @yeyudefeng

    release notes 
    opened by JNSimba 2
  • Release Note 1.1.0

    Feature

    1. Refactor DorisSource based on FLIP-27

    2. Using the two-phase commit of Stream Load in Doris 1.x, the Flink Doris Connector implements exactly-once semantics

    3. Support stream load in JSON format, with read_json_by_line supported instead of strip_outer_array

    Bugfix

    1. Fix column mapping between the Flink schema and the Doris schema

    2. Fix Flink date and timestamp types not being mapped

    3. Fix a decimal conversion bug in row types

    Note: Starting from this release, Flink versions prior to 1.14 are no longer supported; long-term support for subsequent Flink versions will be based on this release.

    Thanks

    Thanks to everyone who has contributed to this release:

    @aiwenmo @bridgeDream @cxzl25 @gj-zhang @hf200012 @JNSimba @liuyaolin @madongz @morningman @stalary @yangzhg @Yankee24

    release notes 
    opened by hf200012 2
  • [Bug] IllegalArgumentException: Row arity: 5, but serializer arity: 4

    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    flink version : flink-1.14.4-scala_2.12

    flink doris connector version : flink-doris-connector-1.14_2.12-1.0.3.jar

    doris version : doris-0.15.0

    What's Wrong?

    Submit a job using the Flink SQL client: an INSERT INTO ... SELECT statement.

    error logs:

    2022-04-22 22:27:56
    java.lang.IllegalArgumentException: Row arity: 5, but serializer arity: 4
    	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
    	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
    	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

    What You Expected?

    bug

    How to Reproduce?

    No response

    Anything Else?

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by hellozhaoxudong 2
  • [improvement] (before 1.13) Support setting max bytes per batch to avoid congestion

    Problem Summary:

    Currently, the dorisSink flushes a batch based on "batchSize" and "batchIntervalMs", which behaves as expected in most cases. But when the source traffic is heavy or individual messages are very large, a single batch can grow very large; flushing it into Doris then blocks for a long time and hurts real-time performance, and the request to the BE may even time out and fail to write at all.

    For this reason, this change adds flushing the batch to Doris based on "maxBatchBytes" (the same change as the corresponding issue above, applied to the before-1.13 branch). Changes:

    • DorisExecutionOptions: add a configurable option maxBatchBytes
    • DorisDynamicOutputFormat: estimate the size batchBytes of the current batch and flush to Doris once it reaches the maxBatchBytes threshold.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (No Need)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    opened by bridgeDream 2
  • [Feature] Support Flink 1.14

    Proposed changes

    Support Flink 1.14

    Problem Summary:

    Describe the overview of changes.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by JNSimba 2
  • [Feature] Can one job work for multiple tables?

    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Description

    It's wasteful for my system to dedicate a separate slot to every table, because my tables are not that big and do not change that fast.

    My temporary solution is to create a custom StreamLoad and DorisWriter, and it works for all listened tables.

    Use case

    One job can work for multiple tables.

    Related issues

    No response

    Are you willing to submit PR?

    • [X] Yes I am willing to submit a PR!

    Code of Conduct

    opened by 979734140 0
  • [Improvement] Dynamically refresh the BE node

    Proposed changes

    Issue Number: close #xxx

    Problem Summary:

    Requesting a fixed coordinator BE over a long period forces that BE to absorb heavy network traffic and scheduling work, which keeps it in a high-load state. The BE node list should be refreshed dynamically and the coordinator rotated (see the sketch below).
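
    A minimal sketch of the rotation idea (names are hypothetical, not the connector's actual API): refresh the BE list periodically and round-robin over it so no single coordinator BE is pinned.

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    class BackendRotator {
        private volatile List<String> backends;
        private final AtomicInteger cursor = new AtomicInteger();

        BackendRotator(List<String> initialBackends) {
            this.backends = initialBackends;
        }

        /** Called periodically with a fresh BE list fetched from the FE. */
        void refresh(List<String> latestBackends) {
            this.backends = latestBackends;
        }

        /** Round-robin over the current list so no single BE is pinned. */
        String nextBackend() {
            List<String> current = backends;
            int i = Math.floorMod(cursor.getAndIncrement(), current.size());
            return current.get(i);
        }
    }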

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by DongLiang-0 0
  • Release Note 1.3.0

    Feature

    1. Support Flink 1.16
    2. Support Doris catalog #60
    3. Add RowSerializer for DataStream #71
    4. Support 'sink.parallelism' config for Flink SQL #72 #77
    5. Support Flink table lookup join #61
    6. Add JsonDebeziumSchemaSerializer for schema change with DataStream #79

    Bug

    1. Fix enable-delete option bug #63
    2. Fix bug when abort fails #78

    Thanks

    Thanks to everyone who has contributed to this release: @caoliang-web @dinggege1024 @DongLiang-0 @geniusjoe @huyuanfeng2018 @JNSimba

    release notes 
    opened by JNSimba 0
  • Release Note 1.2.1

    Feature

    1. Support Doris catalog #60
    2. Add RowSerializer for DataStream #71
    3. Support 'sink.parallelism' config for Flink SQL #72 #77
    4. Support Flink table lookup join #61
    5. Add JsonDebeziumSchemaSerializer for schema change with DataStream #79

    Bug

    1. Fix enable-delete option bug #63
    2. Fix bug when abort fails #78

    Thanks

    Thanks to everyone who has contributed to this release: @caoliang-web @dinggege1024 @DongLiang-0 @geniusjoe @huyuanfeng2018 @JNSimba

    release notes 
    opened by JNSimba 1
  • Release Note 1.1.1

    Feature

    Add JsonDebeziumSchemaSerializer for schema change with datastream #64

    Thanks

    Thanks to everyone who has contributed to this release: @JNSimba

    release notes 
    opened by JNSimba 0
  • [Bug] Infinite 2PC commit retry

    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    1.15, with sink.max-retries = 1

    What's Wrong?

    The 2PC commit is retried indefinitely.

    
    2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 (019990f47bcb3edc3ef9a00232143186).
    2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 019990f47bcb3edc3ef9a00232143186.
    2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
    2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
    2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from CREATED to DEPLOYING.
    2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) [DEPLOYING].
    2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3d7d4ae1
    2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
    2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
    2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from DEPLOYING to INITIALIZING.
    2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
    2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
    2022-11-18 11:53:49,021 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
    2022-11-18 11:53:49,021 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
        "status": "Fail",
        "msg": "errCode = 2, detailMessage = transaction [49734] not found"
    }
    	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
    	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
    	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
    	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.lang.Thread.run(Thread.java:748)
    
    2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2).
    2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 0eedf3500794baa86ab2fdfcbefeb1c2.
    2022-11-18 11:53:50,025 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
    2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
    2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from CREATED to DEPLOYING.
    2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) [DEPLOYING].
    2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3c9058a4
    2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
    2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
    2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from DEPLOYING to INITIALIZING.
    2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
    2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
    2022-11-18 11:53:50,033 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
    2022-11-18 11:53:50,033 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
        "status": "Fail",
        "msg": "errCode = 2, detailMessage = transaction [49734] not found"
    }
    	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
    	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
    	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
    	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.lang.Thread.run(Thread.java:748)
    
    

    What You Expected?

    Do not retry the 2PC commit indefinitely when the transaction is not found on the Doris BE (see the sketch below).
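
    A minimal sketch of the desired behavior (the error classification is illustrative, not the connector's actual code): stop retrying after the configured limit, and treat a missing transaction as non-retriable.

    class CommitRetry {
        static void commitWithRetry(Runnable commit, int maxRetries) {
            for (int attempt = 0; ; attempt++) {
                try {
                    commit.run();
                    return;
                } catch (RuntimeException e) {
                    String msg = String.valueOf(e.getMessage());
                    // A transaction that no longer exists on the BE can never
                    // commit later, so retrying it forever only loops the job.
                    if (msg.contains("not found") || attempt >= maxRetries) {
                        throw e;
                    }
                }
            }
        }
    }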

    How to Reproduce?

    You can reproduce this by killing the Doris backend while sinking data through Flink.

    Anything Else?

    No response

    Are you willing to submit PR?

    • [X] Yes I am willing to submit a PR!

    Code of Conduct

    opened by rafael81 1
Releases (1.16-1.3.0)

Owner

The Apache Software Foundation