Dagger is an easy-to-use, configuration over code, cloud-native framework built on top of Apache Flink for stateful processing of real-time streaming data.

Overview

Dagger


Dagger or Data Aggregator is an easy-to-use, configuration over code, cloud-native framework built on top of Apache Flink for stateful processing of real-time streaming data. With Dagger, you don't need to write custom applications or manage resources to process data in real-time. Instead, you can write SQL queries to process and analyze streaming data.
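
For illustration, a Dagger job is essentially a Flink SQL query over a named input stream. A minimal sketch of a one-minute windowed aggregation, assuming an input stream registered as data_streams_0 with a customer_id field and a rowtime event-time attribute (stream and field names here are illustrative):

SELECT
  customer_id,
  COUNT(1) AS booking_count,
  TUMBLE_END(rowtime, INTERVAL '60' SECOND) AS window_timestamp
FROM
  data_streams_0
GROUP BY
  customer_id,
  TUMBLE(rowtime, INTERVAL '60' SECOND)

Dagger takes care of running such a query on Flink and writing the results to the configured sink.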

Key Features

Discover why you should use Dagger

  • Processing: Dagger can transform, aggregate, join and enrich Protobuf data in real-time.
  • Scale: Dagger scales in an instant, both vertically and horizontally, for a high-performance streaming sink and zero data drops.
  • Extensibility: Add your own sink to Dagger with a clearly defined interface, or choose from the ones already provided.
  • Pluggability: Add custom business logic in the form of plugins (UDFs, Transformers, Preprocessors and Post Processors) independent of the core logic.
  • Metrics: Always know what’s going on with your deployment with built-in monitoring of throughput, response times, errors and more.

What problems does Dagger solve?

To know more, follow the detailed documentation.

Usage

Explore the following resources to get started with Dagger:

  • Guides provide guidance on creating Daggers with different sinks.
  • Concepts describes all the important Dagger concepts.
  • Advance contains details regarding advanced features of Dagger.
  • Reference contains details about configurations, metrics and other aspects of Dagger.
  • Contribute contains resources for anyone who wants to contribute to Dagger.
  • Usecase describes example use cases which can be solved via Dagger.

Running locally

Make sure you have Java 8 and a local Kafka 2.4+ setup installed on your local machine.

# Clone the repo
$ git clone https://github.com/odpf/dagger.git  

# Build the jar
$ ./gradlew clean build 

# Configure env variables
$ cat dagger-core/env/local.properties

# Run a Dagger
$ ./gradlew dagger-core:runFlink

Note: Sample configuration for running a basic dagger can be found here. For detailed configurations, refer here.
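
The query in such a basic configuration can be as simple as a passthrough select over the configured input stream (the stream name below is illustrative):

SELECT * FROM data_streams_0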

Find more detailed steps on local setup here.

Running on cluster

Refer here for details regarding Dagger deployment.

Running tests

# Running unit tests
$ ./gradlew clean test

# Run code quality checks
$ ./gradlew checkstyleMain checkstyleTest

# Cleaning the build
$ ./gradlew clean

Contribute

Development of Dagger happens in the open on GitHub, and we are grateful to the community for contributing bug fixes and improvements. Read below to learn how you can take part in improving Dagger.

Read our contributing guide to learn about our development process, how to propose bug fixes and improvements, and how to build and test your changes to Dagger.

To help you get your feet wet and get you familiar with our contribution process, we have a list of good first issues that contain bugs which have a relatively limited scope. This is a great place to get started.

Credits

This project exists thanks to all the contributors.

License

Dagger is Apache 2.0 licensed.

Comments
  • runFlink fails on fresh checkout repo

    runFlink fails on fresh checkout repo

    Description: The failure occurred when I checked out the repo and ran the basic command in the guide. It seems com.tests.TestMessage is somehow not registered in Stencil. I'm on macOS 12.5.1, with Java 8 and Kafka installed. Stacktrace:

    yunfanzhong | dagger $ ./gradlew dagger-core:runFlink
    
    > Task :dagger-core:runFlink FAILED
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.7/58f588119ffd1702c77ccab6acb54bfb41bed8bd/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-simple/1.7.25/8dacf9514f0c707cbbcdd6fd699e8940d42fb54e/slf4j-simple-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.10/b3eeae7d1765f988a1f45ea81517191315c69c9e/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    09:23:11,421 INFO  io.odpf.dagger.core.config.ConfigurationProviderFactory       - Arguments are:
    com.tests.TestMessage
    io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
            at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
            at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
            at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
            at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
            at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
            at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
            at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)
    Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
            at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:43)
    Caused by: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
            at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
            at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
            at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
            at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
            at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
            at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
            at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
            at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)
    
    FAILURE: Build failed with an exception.
    
    * What went wrong:
    Execution failed for task ':dagger-core:runFlink'.
    > Process 'command '/usr/local/Cellar/openjdk@8/1.8.0+345/libexec/openjdk.jdk/Contents/Home/bin/java'' finished with non-zero exit value 1
    
    * Try:
    Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
    
    * Get more help at https://help.gradle.org
    
    Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
    Use '--warning-mode all' to show the individual deprecation warnings.
    See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings
    

    However, all tests pass when I run ./gradlew clean test. I noticed that com.tests.TestMessage is referenced in many tests. Not sure why the behavior is different for the dagger-core:runFlink target.

    To Reproduce: Check out the repo and run ./gradlew dagger-core:runFlink

    Expected behavior: Not sure what the expected behavior is; maybe the dagger process will keep running until shutdown?

    question 
    opened by yz-chime 6
  • feat: Enable UDF access to SQLTransformers

    feat: Enable UDF access to SQLTransformers

    Currently, in the Dagger Flink SQL query we can access all the Java- and Python-based UDF functions, but the same UDFs are not accessible in the post-processor (SQLTransformer).

    The Flink StreamTableEnvironment instance is used to register the UDFs in the registerFunctions() call in the StreamManager.java class. Since the same instance is not used to create Flink tables in the SQLTransformer.java class, the UDFs are not accessible there.
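
    For illustration, the problem shows up when a post-processor SQL references a UDF that was registered on the main table environment; a hypothetical sketch (table, field and UDF names below are illustrative, not taken from the codebase):

    SELECT
      SomeRegisteredUdf(customer_id) AS enriched_customer_id,
      event_timestamp
    FROM
      data_stream

    Because the SQLTransformer builds its tables on a different StreamTableEnvironment instance, the lookup of SomeRegisteredUdf fails.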

    We can solve this with one of the two approaches below.

    Approach-1: We can introduce a DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instances; these can be used throughout the application. This context object gets initialized only once, in the driver class KafkaProtoSQLProcessor.java.

    The DaggerContext instance is made available to SQLTransformer through the constructors (KafkaProtoSQLProcessor -> StreamManager -> PostProcessorFactory -> ParentPostProcessor -> TransformProcessor). With this DaggerContext we can register the Flink table in SQLTransformer.java and gain access to the UDFs which were registered earlier.

    Approach-2: In the SQLTransformer.java class we can create a new StreamManager instance and call the registerFunctions method for each SQLTransformer configuration. With this approach, if the user configures n SQLTransformers, UDF registration is invoked n times and n sets of objects are initialized.

    Here we have followed Approach-1.

    opened by Shreyansh228 2
  • feat: docker compose for dagger quickstart

    feat: docker compose for dagger quickstart

    fixes #207

    Acceptance Criteria:

    GIVEN: Jack is a new user who wants to try out Dagger locally quickly, AND Jack doesn’t want to do any complex installation of dependencies like Kafka or Apache Flink locally.
    WHEN: Jack runs the Docker Compose file by executing docker compose up in the directory containing the docker compose file.
    THEN:
      • The Dagger jar runs in a container.
      • A local Kafka cluster should be set up with just one broker container.
      • A Kafka producer container should be spawned which continuously produces messages to some source Kafka topic.
      • A bare minimum dagger job with just a 30 sec aggregate SQL query should get submitted to the local Flink cluster with both source and sink as Kafka.
      • The dagger should continue to consume messages from the source topic and publish messages to the sink topic until Jack explicitly executes docker compose down.

    GIVEN: Jack is a new user who wants to try out Dagger locally quickly, AND Jack doesn’t want to do any complex installation of dependencies like Kafka or Apache Flink locally.
    WHEN: Jack runs the Docker Compose file by executing docker compose down in the directory containing the docker compose file.
    THEN: All the spawned containers (Dagger, Kafka, etc.) should be shut down gracefully.

    opened by sumitaich1998 2
  • feat: Conglomerate and update dagger dependencies

    feat: Conglomerate and update dagger dependencies

    Summary: As part of the many ongoing features in Dagger, such as parquet and python-udf, there will be additions to the dependencies. Hence, the intention is to combine all such planned dependencies and add them to the respective module build.gradle files of Dagger, so that the dependencies jar (of the format dagger-core-x.x.x-dependencies.jar) generated by the gradle task ./gradlew dependenciesJar has all the dependencies packaged together.

    Proposed solution: This will reduce migration effort later on when all the feature branches get merged. Only the main dagger image will need to change.

    Additional context: N/A

    opened by Meghajit 2
  • Dagger test case are failing in EsResponseHandlerTest while setting it up locally ./gradlew clean build

    Dagger test case are failing in EsResponseHandlerTest while setting it up locally ./gradlew clean build

    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInputForPrimitiveData FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureExceptionallyWhenPathDoesNotExists FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleParseExceptionAndReturnInput FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTypeIsFalse FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInput FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleIOExceptionAndReturnInput FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleExceptionAndReturnInput FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldCompleteResultFutureWithInputAsObjectIfTypeIsNotPassedAndRetainResponseTypeIsTrue FAILED
    io.odpf.dagger.core.processors.external.es.EsResponseHandlerTest > shouldHandleResponseParsingIOExceptionAndReturnInput FAILED

    opened by shubhangi-1 2
  • feat: Add JsonUpdate and JsonQuery scalar udfs

    feat: Add JsonUpdate and JsonQuery scalar udfs

    As of the current Flink version (1.14.3), there is no built-in way to update or add a particular value in an input JSON object using JsonPath, or to query a child object from a parent JSON using JsonPath. These UDFs will help us handle such use cases in Flink SQL.
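
    For illustration, such scalar UDFs could be invoked from Flink SQL roughly as below; the signatures are assumed here for the sake of example and are not confirmed by this issue:

    SELECT
      JsonQuery('{"k1": {"a": "b"}, "k2": "v2"}', '$.k1') AS child_json,
      JsonUpdate('{"k1": "v1", "k2": "v2"}', '$.k2', 'replaced_value') AS updated_json
    FROM
      data_streams_0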

    enhancement 
    opened by Shreyansh228 1
  • Use of python UDF with external third-party dependencies is not supported in Flink SQL-query.

    Use of python UDF with external third-party dependencies is not supported in Flink SQL-query.

    Description: If a Flink Python UDF that has third-party Python dependencies is used in a Flink SQL query, the job fails with “ModuleNotFoundError: No module named xxxxxxx”.

    To Reproduce
    Step 1: Mention the third-party dependencies in requirements.txt, e.g. jsonpath-ng==1.5.3

    Step 2: Write the Python UDF and save it with the function name, i.e. json_update_jpath.py, e.g.

    from jsonpath_ng import parse
    import json
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    
    @udf(result_type=DataTypes.STRING())
    def json_update_jpath(json_event, obj, jpath):
        json_data = json.loads(json_event)
        jpath_expr = parse(jpath)
        jpath_expr.update(json_data, obj)
        json_out_string = json.dumps(json_data)
        return json_out_string
    

    Step 3: Mention these Python UDF configurations:

    {
    	"PYTHON_FILES": "gs://*****/python-udf/master/json_update_jpath.py",
    	"PYTHON_REQUIREMENTS": "gs://*****/python-udf/master/requirements.txt",
    	"PYTHON_ARCHIVES": "gs://*****/python-udf/master/data.zip#data",
    	"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000",
    	"PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000",
    	"PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000"
    }
    

    Step 4: Run the Dagger job and use the UDF in the Flink SQL query:

    SELECT
      json_update_jpath(
        '{"k1":"v1","k2":"v2","k3":"v3"}',
        'replaced_value',
        '$.k2'
      ) as json_updated_event
    FROM
      `data_streams_0` 
    

    After following these steps, the exception below occurs in the Job Manager: Cannot instantiate user-defined function 'json_update_jpath' ... ModuleNotFoundError: No module named 'jsonpath_ng'.

    Detailed error

    org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716)
    	at io.odpf.dagger.core.StreamManager.registerOutputStream(StreamManager.java:185)
    	at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:33)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:104)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.flink.table.api.ValidationException: Cannot instantiate user-defined function 'json_update_jpath'.
    	at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:219)
    	at org.apache.flink.table.catalog.FunctionCatalog.getFunctionDefinition(FunctionCatalog.java:659)
    	at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:562)
    	at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$3(FunctionCatalog.java:624)
    	at java.util.Optional.orElseGet(Optional.java:267)
    	at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:624)
    	at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:362)
    	at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:97)
    	at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
    	... 18 more
    Caused by: java.lang.IllegalStateException: Instantiating python function 'python_udfs.json_update_jpath.json_update_jpath' failed.
    	at org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:48)
    	at org.apache.flink.table.functions.UserDefinedFunctionHelper.instantiateFunction(UserDefinedFunctionHelper.java:206)
    	... 33 more
    Caused by: java.lang.reflect.InvocationTargetException
    	at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.flink.table.functions.python.utils.PythonFunctionUtils.getPythonFunction(PythonFunctionUtils.java:45)
    	... 34 more
    Caused by: org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
      File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2410, in _call_proxy
        return_value = getattr(self.pool[obj_id], method)(*params)
      File "/opt/flink/opt/python/pyflink.zip/pyflink/java_gateway.py", line 169, in getPythonFunction
        udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
      File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
      File "<frozen importlib._bootstrap>", line 991, in _find_and_load
      File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 843, in exec_module
      File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
      File "/tmp/pyflink/207a6c73-f7ec-4d01-b2ba-ef3a82c18034/26bad213-4e33-4d22-9900-731ffba2fdb1/python_udfs/python_udfs/json_update_jpath.py", line 1, in <module>
        from jsonpath_ng import parse
    ModuleNotFoundError: No module named 'jsonpath_ng'
    
    	at org.apache.flink.api.python.shaded.py4j.Protocol.getReturnValue(Protocol.java:476)
    	at org.apache.flink.api.python.shaded.py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    	at com.sun.proxy.$Proxy260.getPythonFunction(Unknown Source)
    	at org.apache.flink.client.python.PythonFunctionFactoryImpl.getPythonFunction(PythonFunctionFactoryImpl.java:47)
    	at org.apache.flink.client.python.PythonFunctionFactory.getPythonFunction(PythonFunctionFactory.java:131)
    	... 38 more
    2022-10-19 12:31:27,580 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner - Could not execute application: 
    org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
    	at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37) ~[?:?]
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_322]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_322]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_322]
    	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:104) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_322]
    	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
    Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'json_update_jpath'.
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164) ~[flink-table_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) ~[flink-table_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215) ~[flink-table_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) ~[flink-table_2.12-1.14.3.jar:1.14.3]
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:716) ~[flink-table_2.12-1.14.3.jar:1.14.3]
    	at io.odpf.dagger.core.StreamManager.registerOutputStream(StreamManager.java:185) ~[?:?]
    	at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:33) ~[?:?]
    	... 12 more
    

    Expected behavior: The expected output of the SQL query is supposed to be {"k1":"v1","k2":"replaced_value","k3":"v3"}

    Analysis: We went through the logs of the Task Manager and Job Manager during application startup. The Python third-party dependencies mentioned in the requirements.txt file are properly installed in the Task Manager container, but not in the Job Manager. If you manually install the third-party dependency in the Job Manager container and restart the job, the job starts properly and the UDF produces the expected data.

    We are working on a use case where Python UDFs with third-party dependencies are needed. Please connect with us for more details.

    opened by Shreyansh228 1
  • feat: Enable UDF access to SQLTransformers

    feat: Enable UDF access to SQLTransformers

    Currently, in the Dagger Flink SQL query we can access all the Java- and Python-based UDF functions, but the same UDFs are not accessible in the post-processor (SQLTransformer).

    The Flink StreamTableEnvironment instance is used to register the UDFs in the registerFunctions() call in the StreamManager.java class. Since the same instance is not used to create Flink tables in the SQLTransformer.java class, the UDFs are not accessible there.

    We can solve this with one of the two approaches below.

    Approach-1: We can introduce a DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instances; these can be used throughout the application. This context object gets initialized only once, in the driver class KafkaProtoSQLProcessor.java.

    We can expose the DaggerContext object via a static method on the Transformer.java interface. With this DaggerContext we can register the Flink table in SQLTransformer.java and gain access to the UDFs which were registered earlier.

    Approach-2: In the SQLTransformer.java class we can create a new StreamManager instance and call the registerFunctions method for each SQLTransformer configuration. With this approach, if the user configures n SQLTransformers, UDF registration is invoked n times and n sets of objects are initialized.

    Here we have followed Approach-1.

    enhancement 
    opened by Shreyansh228 1
  • feat: Capability in dagger to consume from ACL enabled kafka clusters

    feat: Capability in dagger to consume from ACL enabled kafka clusters

    We are working on a centralized Kafka cluster that would contain data from other entities (Gojek/Gofin, etc.). For compliance reasons, the cluster needs to be ACL-enabled, and there is a need to run daggers that consume from this ACL-enabled cluster. We are planning to use security_protocol: SASL_PLAINTEXT and sasl_mechanism: SCRAM-SHA-512/SCRAM-SHA-256 (we decided to use SCRAM for the ease of adding new users). Apart from these configs, we would also need a JAAS config on the consumer side containing the user credentials. We would mount the JAAS config onto the particular Flink cluster that consumes from the ACL-enabled Kafka clusters.

    opened by mayankrai09 1
  • docs: add documentation for BQ sink in Dagger

    docs: add documentation for BQ sink in Dagger

    Summary

    Documentation for BQ sink in Dagger

    Proposed solution

    This will help users in setting up the sink as well as in debugging issues.

    Additional context

    1. Add config document in dagger

    2. Add new sink document

    3. Update this sink information across the documentation

    opened by Meghajit 1
  • feat: exclude httpclient dependency from depot in minimalJar

    feat: exclude httpclient dependency from depot in minimalJar

    Summary: Currently, for the Parquet Dagger Source, depot has been added to the minimalJar. Depot comes with its own transitive dependency on org.apache.httpcomponents:httpclient version 4.5.13. However, the dagger-functions module uses org.elasticsearch.client:elasticsearch-rest-client:6.6.1, which has a transitive dependency on org.apache.httpcomponents:httpclient version 4.5.6. This confuses the classloader in the Flink Job Manager at runtime when the job is submitted, since two different modules use two versions of the same dependency, and a LinkageError is thrown.

    Proposed solution: Exclude the org.apache.httpcomponents:httpclient dependency from depot in the build.gradle of the dagger-core module.

    Additional context: The resulting error is shown in the screenshot attached to the original issue (Screenshot 2022-07-19 at 3 27 50 PM).

    opened by Meghajit 1
  • Gradle script does not run on M1 chipsets

    Gradle script does not run on M1 chipsets

    Description: The mentioned version of protoc does not have aarch64 binaries, which fails the gradle build.

    To Reproduce: Run ./gradlew clean dagger-common:generateTestProto

    Expected behavior: Proto files are generated.

    Behavior

    Execution failed for task ':dagger-common:generateTestProto'.
    > Could not resolve all files for configuration ':dagger-common:protobufToolsLocator_protoc'.
       > Could not find protoc-3.1.0-osx-aarch_64.exe (com.google.protobuf:protoc:3.1.0).
         Searched in the following locations:
             https://repo.maven.apache.org/maven2/com/google/protobuf/protoc/3.1.0/protoc-3.1.0-osx-aarch_64.exe
    
    

    Additional context

    opened by stym06 4
  • doc: improve quickstart documentation and related setup

    doc: improve quickstart documentation and related setup

    Summary: As of now, there are some issues in the local setup of Dagger:

    1. local.properties assumes there is a TestMessage proto present. However, this is not the case, which causes the dagger not to start properly.
    2. The other test protos which come built in with Dagger are in the test module and are not usable in local.properties.
    3. The watermark delay is set to 0 in local.properties, which might cause issues in some cases, such as when events come in late due to issues with Kafka. Also, this watermark delay needs to be much higher when Parquet daggers are being set up.
    4. With the multiple ways people can create daggers (with a Parquet source, a Kafka source, with preprocessors, etc.), there should ideally be specific examples of how to do each.

    Proposed solution

    These are some suggestions based on the feedback received from internal discussions as well as from users:

    1. Creating a Dockerfile/Docker Compose with all dependencies bundled (local Kafka cluster, sample protos, etc.). Users just need to run it and check the logs for output. No manual setup is required except Docker.

    2. Organizing the documentation explaining how to do different things with Dagger in a single place. For example:

       • Create a Dagger with Kafka as a source

       • Create a Dagger with Parquet Files as a source

       • Create a Dagger using preprocessors

    Currently, the reference is not streamlined and is organised in multiple sections.

    What it will achieve

    The following benefits are expected:

    1. Faster onboarding of users: people can use the examples to try out different features of daggers without help from the committers/maintainers.
    2. Wider adoption: faster ways to try out different features with minimal setup mean people can quickly check whether the product satisfies their needs. This will help create a faster feedback loop.

    Additional context None

    opened by Meghajit 0
  • feat: implement IndexOrderedSplitAssigner

    feat: implement IndexOrderedSplitAssigner

    As part of this issue, we want to add support for configuring a split assigner which can assign splits to source readers in an almost-deterministic order as decided by the user.

    The order will be decided by the order of file paths specified as part of the StreamConfig SOURCE_PARQUET_FILE_PATHS.

    opened by Meghajit 1
  • feat: Parquet DataSource should provide ability to read multiple GCS buckets for creating multiple streams

    feat: Parquet DataSource should provide ability to read multiple GCS buckets for creating multiple streams

    As part of this issue, we want to add support for handling multiple streams for the Parquet Data Source. That is, users should be able to specify multiple GCS URLs, and Dagger should create a Parquet data source, and hence a data stream, for each of these GCS URLs.

    This is needed so that the user can do joins and other operations across multiple streams with a Parquet DataSource, similar to KafkaSource.

    opened by Meghajit 1
Releases (v0.6.3)
  • v0.6.3(Nov 25, 2022)

  • v0.6.2(Nov 16, 2022)

  • v0.6.1(Nov 8, 2022)

  • v0.6.0(Nov 7, 2022)

    What's Changed

    • feat: Capability in dagger to consume from ACL enabled kafka clusters by @mayankrai09 in https://github.com/odpf/dagger/pull/195
    • feat: Enable UDF access to SQLTransformers by @Shreyansh228 in https://github.com/odpf/dagger/pull/202
    • feat: docker compose for dagger quickstart by @sumitaich1998 in https://github.com/odpf/dagger/pull/206

    Full Changelog: https://github.com/odpf/dagger/compare/0.5.1...v0.6.0

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • 0.5.1(Sep 27, 2022)

    What's Changed

    • docs: add quickstart documentation by @Meghajit in https://github.com/odpf/dagger/pull/193
    • feat: Add statsdreporter to bq sink by @lavkesh in https://github.com/odpf/dagger/pull/196
    • chore: bump up version to 0.5.1 by @Meghajit in https://github.com/odpf/dagger/pull/198

    Full Changelog: https://github.com/odpf/dagger/compare/v0.5.0...0.5.1

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • v0.5.0(Sep 20, 2022)

    What's Changed

    • docs: update documentation to account for parquet source by @Meghajit in https://github.com/odpf/dagger/pull/190
    • feat: Bigquery sink using depot (#154) by @lavkesh in https://github.com/odpf/dagger/pull/185
    • chore: version bump by @lavkesh in https://github.com/odpf/dagger/pull/194

    Full Changelog: https://github.com/odpf/dagger/compare/v0.4.1...v0.5.0

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • v0.4.1(Jul 26, 2022)

    What's Changed

    • fix: handle composite map types by @harikrishnakanchi in https://github.com/odpf/dagger/pull/182 and #183
    • chore: bump up version to 0.4.1 by @Meghajit in https://github.com/odpf/dagger/pull/184

    Full Changelog: https://github.com/odpf/dagger/compare/v0.4.0...v0.4.1

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • v0.4.0(Jul 22, 2022)

    What's Changed

    This release includes support for configuring Parquet Files as a data source.

    • feat: Add support for ParquetSource in Dagger capable of parsing primitive data types by @Meghajit in https://github.com/odpf/dagger/pull/121
    • refactor: rename packages and files in dagger-common serde by @Meghajit in https://github.com/odpf/dagger/pull/138
    • feat: Add ability to read and process complex/nested data types from a parquet file in Parquet Data Source: enums, repeated enums, message, repeated message, repeated primitives by @Meghajit in https://github.com/odpf/dagger/pull/140
    • feat: Handle complex/nested data types from a parquet file in Parquet Data Source: Struct, Repeated Struct, Maps and Timestamp of type SimpleGroup by @Meghajit in https://github.com/odpf/dagger/pull/148
    • feat: add time range filter for dagger parquet source by @prakharmathur82 in https://github.com/odpf/dagger/pull/147
    • feat: handle invalid parquet data source configs by @Meghajit in https://github.com/odpf/dagger/pull/152
    • Add exception handling for empty SOURCE_DETAILS config by @Meghajit in https://github.com/odpf/dagger/pull/162
    • Feat: ability to read from GCS buckets by @MayurGubrele in https://github.com/odpf/dagger/pull/156
    • doc: Add documentation+FAQs for Parquet DataSource by @Meghajit in https://github.com/odpf/dagger/pull/153
    • feat: implement state persistence in parquet source by @Meghajit in https://github.com/odpf/dagger/pull/163
    • feat: add parquet source metrics by @Meghajit in https://github.com/odpf/dagger/pull/174
    • feat: exclude dependencies from depot in minimalJar by @Meghajit in https://github.com/odpf/dagger/pull/180
    • chore: version bump to 0.4.0 by @Meghajit in https://github.com/odpf/dagger/pull/181

    Full Changelog: https://github.com/odpf/dagger/compare/v0.3.0...v0.4.0

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • v0.3.0(Jul 12, 2022)

    What's Changed

    • doc: rfc for python udf by @gauravsinghania in https://github.com/odpf/dagger/pull/129
    • feat:add support python udf (scalar, table and aggregate) by @jesrypandawa in https://github.com/odpf/dagger/pull/172
    • fix: fix docs for udf contribution by @jesrypandawa in https://github.com/odpf/dagger/pull/175

    Full Changelog: https://github.com/odpf/dagger/compare/v0.2.9...v0.3.0

    Source code(tar.gz)
    Source code(zip)
    dagger-py-functions.zip(1.58 KB)
  • v0.2.9(Jun 16, 2022)

    What's Changed

    • feat: add dependencies in build.gradle for various features by @Meghajit in https://github.com/odpf/dagger/pull/167

    Full Changelog: https://github.com/odpf/dagger/compare/v0.2.8...v0.2.9

    Source code(tar.gz)
    Source code(zip)
  • v0.2.9-rc1(Jun 13, 2022)

  • v0.2.8(May 13, 2022)

  • v0.2.7(May 6, 2022)

  • v0.2.6(Apr 7, 2022)

  • v0.2.5(Feb 24, 2022)

  • v0.2.4(Feb 23, 2022)

  • v0.2.3(Feb 23, 2022)

  • v0.2.2(Feb 23, 2022)

  • v0.2.1(Feb 18, 2022)

    Hotfixes for the upgrade. Fixes the following:

    1. Handles the Timestamp cast issue for Longbow
    2. Adds support for casting primitive data when serialising data for Kafka
    Source code(tar.gz)
    Source code(zip)
  • v0.2.0(Feb 3, 2022)

    • Flink version bump from 1.9 to 1.14.3.
    • JSON Datatype support.
    • refactor UDFs with new Flink contracts.
    • CI and integration test fixes for the upgrade.
    • updates in watermark preserver logic.
    • stencil upgrade.
    Source code(tar.gz)
    Source code(zip)
  • v0.1.3(Dec 15, 2021)

  • v0.1.2(Oct 1, 2021)

  • v0.1.1(Aug 4, 2021)

  • v0.1.0(Jul 5, 2021)

Owner
Open DataOps Foundation
Building and promoting a free and open-source modern data platform.