Flink/Spark Connectors for Apache Doris (Incubating)

Overview

Apache Doris (incubating) Connectors


This repository contains connectors for Apache Doris (incubating).

Flink Doris Connector

For more information about compilation and usage, please visit the Flink Doris Connector documentation.

Spark Doris Connector

For more information about compilation and usage, please visit the Spark Doris Connector documentation.

License

Apache License, Version 2.0

Report issues or submit pull requests

If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.

Contact Us

Contact us through the following mailing list.

Name: [email protected]
Scope: Development-related discussions (Subscribe / Unsubscribe / Archives)


Comments
  • [Bug] cannot write data to Doris 0.12


    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    Doris version: 0.12; spark-doris-connector-3.1_2.12 version: 1.0.1

    What's Wrong?

    When I use the spark-doris-connector to write a DataFrame to Doris 0.12, an error occurs ("Connect to doris http://xx:8030/api/backends?is_alive=true failed."), as shown in the screenshot attached to the issue.

    I guess it's because Doris 0.12 has no such interface ("api/backends"), but the official website documentation says the connector supports 0.12+. (screenshot attached)

    Hope to get a reply, thank you.

    What You Expected?

    Support Doris 0.12+, or update the official website documentation.

    How to Reproduce?

    Use the official example to reproduce.

    Anything Else?

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by lijun816 4
  • [improvement] Add an option to set the partition size of the final write stage


    Proposed changes

    Add an option to set the partition size of the final write stage

    1. We can increase the parallelism of the computation while reducing the parallelism of writes to Doris, which lowers write and compaction pressure.
    2. After the Spark RDD is filtered, each partition holds only a few records while the number of partitions stays large, so writes become frequent and resources are wasted. (A usage sketch follows below.)
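
    A minimal sketch of the idea at the application level, assuming the usual DataFrame write path of the Spark Doris Connector; it is not the option this PR adds, and the source path, table, FE address, and credentials are placeholders:

    ```scala
    // Sketch: shrink the number of partitions just before the Doris sink so fewer
    // concurrent writers hit the cluster, independent of upstream compute parallelism.
    import org.apache.spark.sql.SparkSession

    object CoalescedDorisWrite {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-write-demo").getOrCreate()

        val filtered = spark.read.parquet("/path/to/source")   // highly parallel compute/filter stage
          .filter("event_type = 'click'")

        filtered
          .coalesce(8)                                          // small partition count for the final write stage
          .write
          .format("doris")
          .option("doris.table.identifier", "example_db.example_table")
          .option("doris.fenodes", "fe_host:8030")
          .option("user", "user")
          .option("password", "password")
          .save()
      }
    }
    ```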

    Problem Summary:

    Describe the overview of changes.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    (Before/after screenshots are attached to the original PR.)

    opened by lexluo09 3
  • [bug] fix stream dataframe writing to doris json parse exception


    Proposed changes

    Issue Number: close #47

    Problem Summary:

    Streaming DataFrames can now be written to Doris (a usage sketch follows below).
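
    As context for what this PR enables, here is a minimal sketch of writing a streaming DataFrame to Doris, assuming the connector's streaming sink accepts the same "doris" format and options as the batch writer; host, table, and credentials are placeholders:

    ```scala
    // Sketch: structured streaming write into Doris via the "doris" format.
    // The "rate" source is Spark's built-in test source emitting (timestamp, value) rows.
    import org.apache.spark.sql.SparkSession

    object StreamingDorisWrite {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-streaming-demo").getOrCreate()

        val events = spark.readStream
          .format("rate")
          .option("rowsPerSecond", "10")
          .load()

        val query = events.writeStream
          .format("doris")
          .option("doris.table.identifier", "example_db.example_table")
          .option("doris.fenodes", "fe_host:8030")
          .option("user", "user")
          .option("password", "password")
          .option("checkpointLocation", "/tmp/doris-checkpoint")
          .start()

        query.awaitTermination()
      }
    }
    ```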

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know) No
    2. Has unit tests been added: (Yes/No/No Need) No Need
    3. Has document been added or modified: (Yes/No/No Need) No Need
    4. Does it need to update dependencies: (Yes/No) No
    5. Are there any changes that cannot be rolled back: (Yes/No) No

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by chovy-3012 3
  • [feature] Support Spark 3.2 compilation


    Proposed changes

    Issue Number: close #23

    Problem Summary:

    Support compiling and testing against Spark 3.2.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by cxzl25 2
  • Docs: Change http to https


    Proposed changes

    Issue Number: close #xxx

    Problem Summary:

    A minor PR: change links in the README from http to https.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by pan3793 2
  • [improvement] Randomly get a new BE address when writing one batch at a time


    Proposed changes

    Reduce the load pressure on the Coordinator BE.

    1. Currently the BE address stays the same until a write fails; rotating it improves cluster load balancing.
    2. The Coordinator BE receives the data and distributes it to the other data nodes, so it handles more network traffic than the other nodes.

    Problem Summary:

    Describe the overview of changes.

    1. The collection of BE nodes is cached and refreshed periodically.
    2. A new BE address is picked at random from the cache for each batch write (see the sketch after this list).
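
    A minimal sketch of the behaviour described above, not the connector's actual implementation; the refresh interval and the backend-fetching function are assumptions for illustration:

    ```scala
    // Sketch: cache the alive BE list, refresh it when stale, and pick a random entry per batch
    // so stream-load traffic spreads across Coordinator BEs instead of pinning one node.
    import java.util.concurrent.atomic.AtomicReference
    import scala.util.Random

    class BackendSelector(fetchBackends: () => Seq[String],
                          refreshIntervalMs: Long = 60000L) {
      // (cached backend list, timestamp of the last refresh)
      private val cache = new AtomicReference[(Seq[String], Long)]((Seq.empty, 0L))

      /** Return a random BE address, refreshing the cached list when it is empty or stale. */
      def nextBackend(): String = {
        val (backends, fetchedAt) = cache.get()
        val now = System.currentTimeMillis()
        val current =
          if (backends.isEmpty || now - fetchedAt > refreshIntervalMs) {
            val refreshed = fetchBackends()
            cache.set((refreshed, now))
            refreshed
          } else backends
        current(Random.nextInt(current.size))
      }
    }
    ```

    Usage would look like `new BackendSelector(() => restClient.aliveBackends()).nextBackend()`, where `restClient.aliveBackends()` stands in for whatever call returns the alive BE addresses.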

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    opened by lexluo09 1
  • [Bug] spark doris connector read table error: Doris FE's response cannot map to schema.


    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    • connector : org.apache.doris:spark-doris-connector-3.1_2.12:1.0.1
    • doris: 1.1 preview2
    • spark: 3.1.2

    What's Wrong?

    Read a table

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
     .appName('Spark Doris Demo Nick') \
     .config('org.apache.doris:spark-doris-connector-3.1_2.12:1.0.1') \
     .getOrCreate()
    spark
    
    dorisSparkDF = spark.read.format("doris")\
        .option("doris.table.identifier", "db.token_info")\
        .option("doris.fenodes", "xxx:8031")\
        .option("user", "xxx")\
        .option("password", "xxx").load()
    dorisSparkDF.show(5)
    

    Then I get an error:

    22/06/23 07:47:03 ERROR SchemaUtils: Doris FE's response cannot map to schema. res: {"keysType":"UNIQUE_KEYS","properties":[{"name":"chain","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_slug","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_address","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_symbol","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"decimals","aggregation_type":"REPLACE","comment":"","type":"INT"},{"name":"type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"protocol_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"manual_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"erc20_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"coin_gecko_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"logo","aggregation_type":"REPLACE","comment":"","type":"STRING"}],"status":200}
    org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "keysType" (Class org.apache.doris.spark.rest.models.Schema), not marked as ignorable
     at [Source: java.io.StringReader@74af102e; line: 1, column: 14] (through reference chain: org.apache.doris.spark.rest.models.Schema["keysType"])
    	at org.codehaus.jackson.map.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:53)
    	at org.codehaus.jackson.map.deser.StdDeserializationContext.unknownFieldException(StdDeserializationContext.java:267)
    	at org.codehaus.jackson.map.deser.std.StdDeserializer.reportUnknownProperty(StdDeserializer.java:673)
    	at org.codehaus.jackson.map.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:659)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.handleUnknownProperty(BeanDeserializer.java:1365)
    	at org.codehaus.jackson.map.deser.BeanDeserializer._handleUnknown(BeanDeserializer.java:725)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:703)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:580)
    	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732)
    	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
    	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:295)
    	at org.apache.doris.spark.rest.RestService.getSchema(RestService.java:279)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchemaFromFe(SchemaUtils.scala:51)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchema(SchemaUtils.scala:41)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema$lzycompute(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.schema(DorisRelation.scala:52)
    	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:449)
    	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    	at scala.Option.getOrElse(Option.scala:189)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:750)
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    Input In [3], in <cell line: 1>()
    ----> 1 dorisSparkDF = spark.read.format("doris")\
          2     .option("doris.table.identifier", "xxx.token_info")\
          3     .option("doris.fenodes", "xxxx:8031")\
          4     .option("user", "xxxx")\
          5     .option("password", "xxxxx").load()
          6 dorisSparkDF.show(5)
    
    File /usr/lib/spark/python/pyspark/sql/readwriter.py:210, in DataFrameReader.load(self, path, format, schema, **options)
        208     return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
        209 else:
    --> 210     return self._df(self._jreader.load())
    
    File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
       1298 command = proto.CALL_COMMAND_NAME +\
       1299     self.command_header +\
       1300     args_command +\
       1301     proto.END_COMMAND_PART
       1303 answer = self.gateway_client.send_command(command)
    -> 1304 return_value = get_return_value(
       1305     answer, self.gateway_client, self.target_id, self.name)
       1307 for temp_arg in temp_args:
       1308     temp_arg._detach()
    
    File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
        109 def deco(*a, **kw):
        110     try:
    --> 111         return f(*a, **kw)
        112     except py4j.protocol.Py4JJavaError as e:
        113         converted = convert_exception(e.java_exception)
    
    File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
        324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
        325 if answer[1] == REFERENCE_TYPE:
    --> 326     raise Py4JJavaError(
        327         "An error occurred while calling {0}{1}{2}.\n".
        328         format(target_id, ".", name), value)
        329 else:
        330     raise Py4JError(
        331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
        332         format(target_id, ".", name, value))
    
    Py4JJavaError: An error occurred while calling o72.load.
    : org.apache.doris.spark.exception.DorisException: Doris FE's response cannot map to schema. res: {"keysType":"UNIQUE_KEYS","properties":[{"name":"chain","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_slug","aggregation_type":"","comment":"","type":"STRING"},{"name":"token_address","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_symbol","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"decimals","aggregation_type":"REPLACE","comment":"","type":"INT"},{"name":"type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"token_type","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"protocol_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"manual_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"erc20_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"coin_gecko_slug","aggregation_type":"REPLACE","comment":"","type":"STRING"},{"name":"logo","aggregation_type":"REPLACE","comment":"","type":"STRING"}],"status":200}
    	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:303)
    	at org.apache.doris.spark.rest.RestService.getSchema(RestService.java:279)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchemaFromFe(SchemaUtils.scala:51)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchema(SchemaUtils.scala:41)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema$lzycompute(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.schema(DorisRelation.scala:52)
    	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:449)
    	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    	at scala.Option.getOrElse(Option.scala:189)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: org.codehaus.jackson.map.exc.UnrecognizedPropertyException: Unrecognized field "keysType" (Class org.apache.doris.spark.rest.models.Schema), not marked as ignorable
     at [Source: java.io.StringReader@74af102e; line: 1, column: 14] (through reference chain: org.apache.doris.spark.rest.models.Schema["keysType"])
    	at org.codehaus.jackson.map.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:53)
    	at org.codehaus.jackson.map.deser.StdDeserializationContext.unknownFieldException(StdDeserializationContext.java:267)
    	at org.codehaus.jackson.map.deser.std.StdDeserializer.reportUnknownProperty(StdDeserializer.java:673)
    	at org.codehaus.jackson.map.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:659)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.handleUnknownProperty(BeanDeserializer.java:1365)
    	at org.codehaus.jackson.map.deser.BeanDeserializer._handleUnknown(BeanDeserializer.java:725)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:703)
    	at org.codehaus.jackson.map.deser.BeanDeserializer.deserialize(BeanDeserializer.java:580)
    	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732)
    	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1863)
    	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:295)
    	... 23 more
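
    For context, the trace shows Jackson 1.x rejecting the keysType field that a newer FE includes in its schema response, because the connector's Schema model does not know it. A hypothetical sketch of one way to make such parsing tolerant of unknown fields; whether this is how the maintainers fixed it is not stated here:

    ```scala
    // Hypothetical sketch: configure the (Jackson 1.x) ObjectMapper so unknown fields such as
    // "keysType" are ignored instead of raising UnrecognizedPropertyException.
    import org.codehaus.jackson.map.{DeserializationConfig, ObjectMapper}

    object TolerantSchemaParser {
      def parse[T](json: String, clazz: Class[T]): T = {
        val mapper = new ObjectMapper()
        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.readValue(json, clazz)
      }
    }
    ```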
    

    What You Expected?

    There should be no errors

    How to Reproduce?

    No response

    Anything Else?

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by myfjdthink 1
  • [Bug] CREATE TEMPORARY VIEW USING doris got error


    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Version

    Doris 1.0; Spark-Doris-Connector master; Spark 3.2.1

    What's Wrong?

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
      "table.identifier"="db_01.datax",
      "fenodes"="10.0.105.243:8030",
      "user"="doris",
      "password"="Doris"
    );
    
    [Code: 0, SQL State: ]  Error operating EXECUTE_STATEMENT: org.apache.doris.spark.exception.DorisException: Doris FE's response cannot map to schema. res: "Access denied for default_cluster:[email protected]"
    	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:303)
    	at org.apache.doris.spark.rest.RestService.getSchema(RestService.java:279)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchemaFromFe(SchemaUtils.scala:53)
    	at org.apache.doris.spark.sql.SchemaUtils$.discoverSchema(SchemaUtils.scala:43)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema$lzycompute(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.lazySchema(DorisRelation.scala:48)
    	at org.apache.doris.spark.sql.DorisRelation.schema(DorisRelation.scala:52)
    	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:440)
    	at org.apache.spark.sql.execution.datasources.CreateTempViewUsing.run(ddl.scala:98)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:100)
    	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:159)
    	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:94)
    	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:127)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `int` from String "Access denied for default_cluster:[email protected]": not a valid `int` value
     at [Source: (String)""Access denied for default_cluster:[email protected]""; line: 1, column: 1]
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseIntPrimitive(StdDeserializer.java:762)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:288)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1495)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:207)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3516)
    	at org.apache.doris.spark.rest.RestService.parseSchema(RestService.java:295)
    	... 48 more
    
    

    What You Expected?

    I don't know what's wrong.

    How to Reproduce?

    No response

    Anything Else?

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by lordk911 1
  • [Bug-Fix][Spark-Doris-Connector] resolve the problem of writing Chinese garbled characters


    Fixes garbled Chinese characters when writing data to Doris (a sketch of a possible approach follows).
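
    A hypothetical sketch of the kind of change involved, assuming the garbling comes from the Stream Load HTTP body being encoded with the platform default charset rather than UTF-8 (the PR itself does not spell out the mechanism):

    ```scala
    // Hypothetical sketch: build the Stream Load request body with an explicit UTF-8 charset so
    // multi-byte (e.g. Chinese) characters survive the HTTP round trip. Not necessarily the actual PR.
    import java.nio.charset.StandardCharsets
    import org.apache.http.client.methods.HttpPut
    import org.apache.http.entity.StringEntity

    object Utf8StreamLoadBody {
      def attachBody(put: HttpPut, rows: String): HttpPut = {
        put.setEntity(new StringEntity(rows, StandardCharsets.UTF_8)) // explicit charset, not the platform default
        put
      }
    }
    ```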

    Proposed changes

    Issue Number: close #xxx

    Problem Summary:

    Describe the overview of changes.

    Checklist(Required)

    1. Does it affect the original behavior: (No)
    2. Has unit tests been added: (No)
    3. Has document been added or modified: (No)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    bug 
    opened by hf200012 1
  • [Bug-Fix][Spark-Doris-Connector] fix Spark connector unsupported STRING type


    Proposed changes

    Issue Number: close 7990

    Problem Summary:

    Fix the Spark connector's missing support for the Doris STRING type (a sketch of the mapping follows below).
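
    A hypothetical sketch of the kind of mapping involved, assuming the connector converts Doris column type names to Spark SQL types along these lines; the names and the exact set of cases are illustrative, not the connector's actual code:

    ```scala
    // Hypothetical sketch: map Doris column type names to Spark SQL DataTypes,
    // including the STRING type this PR adds support for.
    import org.apache.spark.sql.types._

    object DorisTypeMapping {
      def toSparkType(dorisType: String): DataType = dorisType.toUpperCase match {
        case "TINYINT"           => ByteType
        case "SMALLINT"          => ShortType
        case "INT"               => IntegerType
        case "BIGINT"            => LongType
        case "FLOAT"             => FloatType
        case "DOUBLE"            => DoubleType
        case "CHAR" | "VARCHAR"  => StringType
        case "STRING"            => StringType    // the previously unsupported type
        case other               => throw new IllegalArgumentException(s"Unsupported Doris type: $other")
      }
    }
    ```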

    Checklist(Required)

    1. Does it affect the original behavior: (No)
    2. Has unit tests been added: (Yes)
    3. Has document been added or modified: (No Need)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by Wahno 1
  • [fix] cache loader cannot be serialized


    Proposed changes

    Since CacheLoader does not implement the Serializable interface, the anonymous inner class used to construct the LoadingCache cannot be serialized. The inner class BackendCacheLoader extends CacheLoader and implements Serializable, which solves the problem (a sketch of the pattern follows).
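
    A minimal sketch of the pattern described above, assuming a Guava LoadingCache of backend addresses; the key and value types and the load() body are placeholders:

    ```scala
    // Sketch: a named, Serializable CacheLoader subclass replaces the anonymous inner class
    // so the object holding the LoadingCache can itself be serialized (e.g. shipped to executors).
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    import java.util.concurrent.TimeUnit

    class BackendCacheLoader(fetchBackends: () => java.util.List[String])
      extends CacheLoader[String, java.util.List[String]] with Serializable {
      override def load(key: String): java.util.List[String] = fetchBackends()
    }

    object BackendCache {
      def build(fetchBackends: () => java.util.List[String]): LoadingCache[String, java.util.List[String]] =
        CacheBuilder.newBuilder()
          .expireAfterWrite(1, TimeUnit.MINUTES)
          .build[String, java.util.List[String]](new BackendCacheLoader(fetchBackends))
    }
    ```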

    Issue Number: close #xxx

    Problem Summary:

    Describe the overview of changes.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know)
    2. Has unit tests been added: (Yes/No/No Need)
    3. Has document been added or modified: (Yes/No/No Need)
    4. Does it need to update dependencies: (Yes/No)
    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by gnehil 0
  • [Doc] Update doris build image


    Proposed changes

    In README.md, the build image apache/incubator-doris is outdated; it should be apache/doris.

    Problem Summary:

    Describe the overview of changes. See the proposed changes

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know) No
    2. Has unit tests been added: (Yes/No/No Need) Document update, unit-test is not required.
    3. Has document been added or modified: (Yes/No/No Need) Yes, updated README.md
    4. Does it need to update dependencies: (Yes/No) No
    5. Are there any changes that cannot be rolled back: (Yes/No) No

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by chncaesar 0
  • Spark 3.3.0 support


    Proposed changes

    1. Support Spark 3.3.0. Removed log4j 1.x and switched to Spark's Logging trait, which uses log4j 2.x in Spark 3.3.0. For older Spark versions this change does not break compatibility. Code changes are in ScalaValueReader.scala.

    2. Close the BufferedReader in DorisStreamLoad. When reading the Doris BE REST API's response, the BufferedReader should be closed in DorisStreamLoad's loadBatch function (see the sketch after this list).

    3. Rename spark.minor.version to spark.major.version. In pom.xml, the property spark.minor.version actually holds the Spark major version.

    4. Make the source jar include the Scala code changes via the scala-maven-plugin in pom.xml.
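
    For item 2, a minimal sketch of the resource-handling pattern described; the stream source and the response parsing are placeholders, not the connector's actual loadBatch code:

    ```scala
    // Sketch for item 2: always close the BufferedReader over the BE response,
    // even when reading throws, by using try/finally.
    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets

    object ResponseReading {
      def readResponse(in: InputStream): String = {
        val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
        try {
          val sb = new StringBuilder
          var line = reader.readLine()
          while (line != null) {
            sb.append(line)
            line = reader.readLine()
          }
          sb.toString()
        } finally {
          reader.close() // previously leaked when an exception was thrown mid-read
        }
      }
    }
    ```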

    Issue Number: close #xxx

    Problem Summary:

    This pr upgrades the code to support Spark 3.3.0, as well as other minor changes.

    Checklist(Required)

    1. Does it affect the original behavior: (Yes/No/I Don't know) No

    2. Has unit tests been added: (Yes/No/No Need) No unit test was added, but it was tested manually in the spark-sql CLI.

    3. Has document been added or modified: (Yes/No/No Need)

    4. Does it need to update dependencies: (Yes/No)

    5. Are there any changes that cannot be rolled back: (Yes/No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    Test results:

    Versions:

    • spark-3.3.0-bin-hadoop3
    • JDK 1.8

    1. Truncate the Doris table from the CLI. (screenshot)

    2. Create a Spark view and insert data into the Doris table. Start the spark-sql CLI in local mode and execute:

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
      "table.identifier"="zjc_1.table_hash",
      "fenodes"="localhost:8030",
      "user"="zjc",
      "password"="******"
    );
    insert into spark_doris select 5,15.0;
    
    3. Check the data in Doris. (screenshot)

    4. Select the data in spark-sql: select * from spark_doris; (screenshot)

    How to build spark-doris-connector for Spark 3.3.0

    Run the command: sh build.sh --spark 3.3.0 --scala 2.12

    opened by chncaesar 1
  • [Feature] support spark catalog


    Search before asking

    • [ ] I had searched in the issues and found no similar issues.

    Description

    support spark catalog @hf200012

    Use case

    No response

    Related issues

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by melin 0
  • Bump jackson-databind from 2.13.3 to 2.13.4.1 in /spark-doris-connector


    Bumps jackson-databind from 2.13.3 to 2.13.4.1.

    Commits

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
    • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
    • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
    • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

    You can disable automated security fix PRs for this repo from the Security Alerts page.

    dependencies 
    opened by dependabot[bot] 0
  • [Enhancement] can provide a json format read data type  like flink-connector


    Search before asking

    • [X] I had searched in the issues and found no similar issues.

    Description

    Source data that includes Chinese text may cause a problem such as: "Reason: actual column number is less than schema column number. actual number: 10, column separator: [ ], line delimiter: [ ]". Could the Spark connector, like the Flink connector, read and write data in JSON format to avoid this problem? For JSON-format import, the Flink connector uses 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'.

    Solution

    No response

    Are you willing to submit PR?

    • [ ] Yes I am willing to submit a PR!

    Code of Conduct

    opened by gitfortian 0
  • [Enhancement] Add param rdd_max-partitions for sink


    Proposed changes

    Issue Number: close #50

    Problem Summary:

    Add param rdd_max-partitions for sink

    Checklist(Required)

    1. Does it affect the original behavior: (No)
    2. Has unit tests been added: (No)
    3. Has document been added or modified: (No)
    4. Does it need to update dependencies: (No)
    5. Are there any changes that cannot be rolled back: (No)

    Further comments

    If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

    opened by Wahno 0
Releases: 3.2_2.12-1.1.0