Apache Flink

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.

Learn more about Flink at https://flink.apache.org/

Features

  • A streaming-first runtime that supports both batch processing and data streaming programs

  • Elegant and fluent APIs in Java and Scala

  • A runtime that supports very high throughput and low event latency at the same time

  • Support for event time and out-of-order processing in the DataStream API, based on the Dataflow Model

  • Flexible windowing (time, count, sessions, custom triggers) across different time semantics (event time, processing time)

  • Fault-tolerance with exactly-once processing guarantees

  • Natural back-pressure in streaming programs

  • Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming)

  • Built-in support for iterative programs (BSP) in the DataSet (batch) API

  • Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms

  • Compatibility layers for Apache Hadoop MapReduce

  • Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem

Streaming Example

case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .sum("count")

windowCounts.print()
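
For readers trying the example end to end, here is a minimal sketch of the surrounding setup (the import paths follow the Scala DataStream API; the host and port values are placeholders, e.g. a socket opened with nc -lk 9999):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// obtain the streaming environment used as env above
val env = StreamExecutionEnvironment.getExecutionEnvironment

// placeholder connection values for the socket source
val host = "localhost"
val port = 9999

// ... the windowed word count from above goes here ...

// nothing runs until execute() is called
env.execute("Socket Window WordCount")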

Batch Example

case class WordWithCount(word: String, count: Long)

val text = env.readTextFile(path)

val counts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .groupBy("word")
  .sum("count")

counts.writeAsCsv(outputPath)
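
The batch example likewise needs an ExecutionEnvironment from the DataSet API; a minimal sketch of the setup (the file locations are placeholders):

import org.apache.flink.api.scala._

// obtain the batch (DataSet API) environment used as env above
val env = ExecutionEnvironment.getExecutionEnvironment

// placeholder input and output locations
val path = "file:///tmp/input.txt"
val outputPath = "file:///tmp/word-counts.csv"

// ... the word count from above goes here ...

// writeAsCsv is lazy; the job only runs when execute() is called
env.execute("Batch WordCount")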

Building Apache Flink from Source

Prerequisites for building Flink:

  • Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
  • Git
  • Maven (we recommend version 3.2.5 and require at least 3.1.1)
  • Java 8 or 11 (Java 9 or 10 may work)

git clone https://github.com/apache/flink.git
cd flink
mvn clean package -DskipTests # this will take up to 10 minutes

Flink is now installed in build-target.

NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.1.1 creates the libraries properly. To build unit tests with Java 8, use Java 8u51 or above to prevent failures in unit tests that use the PowerMock runner.

Developing Flink

The Flink committers use IntelliJ IDEA to develop the Flink codebase. We recommend IntelliJ IDEA for developing projects that involve Scala code.

Minimal requirements for an IDE are:

  • Support for Java and Scala (also mixed projects)
  • Support for Maven with Java and Scala

IntelliJ IDEA

The IntelliJ IDE supports Maven out of the box and offers a plugin for Scala development.

Check out our Setting up IntelliJ guide for details.

Eclipse Scala IDE

NOTE: From our experience, this setup does not work with Flink due to deficiencies of the old Eclipse version bundled with Scala IDE 3.0.3 or due to version incompatibilities with the bundled Scala version in Scala IDE 4.4.1.

We recommend using IntelliJ instead (see above).

Support

Don’t hesitate to ask!

Contact the developers and community on the mailing lists if you need any help.

Open an issue if you find a bug in Flink.

Documentation

The documentation of Apache Flink is located on the website: https://flink.apache.org or in the docs/ directory of the source code.

Fork and Contribute

This is an active open-source project. We are always open to people who want to use the system or contribute to it. Contact us if you are looking for implementation tasks that fit your skills. The contribution guide on the Flink website describes how to contribute to Apache Flink.

About

Apache Flink is an open source project of The Apache Software Foundation (ASF). The Apache Flink project originated from the Stratosphere research project.

Comments
  • [FLINK-2030][FLINK-2274][core][utils]Histograms for Discrete and Continuous valued data

    This implements online histograms for both categorical and continuous data. For continuous data, we emulate a continuous probability distribution that supports finding the cumulative sum up to a particular value and finding the value at a specific cumulative probability (quantiles). For categorical fields, we emulate a probability mass function that supports finding the probability associated with every class. The continuous histogram follows this paper: http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
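
    For intuition, a compact sketch of the bin-merging update described in the referenced streaming-histogram paper (plain Scala, independent of the PR's actual classes; maxBins is a hypothetical parameter):

    // Keep at most maxBins (centroid, count) bins; when the budget is
    // exceeded, merge the two closest adjacent bins into their weighted mean.
    final case class Bin(centroid: Double, count: Long)

    def addPoint(bins: Vector[Bin], p: Double, maxBins: Int): Vector[Bin] = {
      val withPoint = (bins :+ Bin(p, 1L)).sortBy(_.centroid)
      if (withPoint.size <= maxBins) withPoint
      else {
        // index of the adjacent pair with the smallest gap
        val i = withPoint.indices.dropRight(1)
          .minBy(j => withPoint(j + 1).centroid - withPoint(j).centroid)
        val (a, b) = (withPoint(i), withPoint(i + 1))
        val total = a.count + b.count
        val merged = Bin((a.centroid * a.count + b.centroid * b.count) / total, total)
        withPoint.patch(i, Seq(merged), 2)
      }
    }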

    Note: This is a sub-task of https://issues.apache.org/jira/browse/FLINK-1727 which already has a PR pending review at https://github.com/apache/flink/pull/710.

    Edit: This adds methods to evaluate statistics on data sets of vectors, like column-wise statistics. These include minimum, maximum, mean, variance, entropy, Gini impurity, etc. [FLINK-2379]
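
    As a rough illustration of such column-wise statistics (plain Scala over in-memory vectors, not the PR's DataSet-based API):

    // Column-wise mean and variance for a collection of equal-length vectors.
    def columnMeanVariance(vectors: Seq[Array[Double]]): Seq[(Double, Double)] = {
      val n = vectors.size.toDouble
      (0 until vectors.head.length).map { d =>
        val col = vectors.map(_(d))
        val mean = col.sum / n
        val variance = col.map(x => (x - mean) * (x - mean)).sum / n
        (mean, variance)
      }
    }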

    Edit: Splitting the PR to move the Statistics part to another PR. #1032

    component=API/DataSet 
    opened by sachingoel0101 111
  • [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

    What is the purpose of the change

    The goal here is to add RecoverableWriter support for Google Storage (GS), to allow writing to GS buckets from Flink's StreamingFileSink. To do this, we must implement and register a FileSystem implementation for GS and then provide a RecoverableWriter implementation via createRecoverableWriter.

    Fortunately, Google supplies a Hadoop FileSystem implementation via GoogleHadoopFileSystem, which can already be used with Flink. So we can wrap this with Flink's HadoopFileSystem to implement the core file-system functionality.

    At a high level, to implement a recoverable writer, one must provide a RecoverableWriter implementation that does the following:

    • Creates an output data stream from a URI (e.g. gs://bucket/foo/bar), i.e. an implementation of FSDataOutputStream
    • Allows writing to the output data stream in multiple chunks
    • Allows persisting the state of the output data stream while the stream is open
    • Allows recovering the persisted state of the output data stream, enabling writes to resume from that point
    • Supports atomic commit of the final file once the stream is closed

    This implementation accomplishes this for GS by writing files to a temporary location as data is written to the output stream, and then combining the various temporary files together upon commit (and deleting the temporary files). Each temporary file is written to GS using the resumable upload API. The recoverable writer state (GSRecoverableWriterState) keeps track of which temporary files, in what order, should be combined together to form the final file. (The recoverable writer state also keeps track of the final file location, the number of bytes written so far, and whether the output stream has been closed.)
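
    A rough sketch of the state described above (hypothetical Scala names, not the actual GSRecoverableWriterState):

    // Hypothetical sketch: on commit, the temporary blobs are composed, in
    // order, into the final blob and then deleted.
    case class BlobId(bucket: String, name: String)

    case class RecoverableWriteState(
      finalBlob: BlobId,           // where the committed file will live
      tempBlobs: Vector[BlobId],   // temporary objects, in compose order
      bytesWritten: Long,          // bytes written to the stream so far
      closed: Boolean              // whether the output stream has been closed
    )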

    We considered but rejected the idea of using Google's resumable upload support to support the entire process of writing a temporary file, i.e. a design in which there would be exactly one temporary file uploaded for every final file written. We rejected this approach for two reasons:

    • Doing that would have required us to depend on Java serialization to persist the WriteChannel associated with the resumable upload
    • There is a nonconfigurable two-week limit on the duration of a single resumable upload, and we didn't want to have to live with that constraint

    Instead, our approach (potentially) writes multiple temporary files associated with each recoverable write; each time persist is called, any ongoing resumable upload is closed, causing a temporary file to be committed, and a new resumable upload is started if/when more bytes are written. We thus avoid having to persist WriteChannels and we avoid the two-week limit for a recoverable write. Note that each individual temporary file must be written within two weeks, which means that checkpoints need to be taken at least that frequently, but that doesn't seem like a problematic limitation in practice.

    When a recoverable write is cleaned up, either on commit or after a failure, all of the temporary files associated with that recoverable write are deleted. The naming scheme for the temporary files associated with a recoverable write is such that we can be sure to delete all temporary files -- even orphaned ones that might result from restarting from an earlier save/checkpoint.

    To simplify accessing the Google Storage API and to make it mockable for unit testing, this code includes a BlobStorage abstraction. This is implemented against GS in GSBlobStorage and against an in-memory store in MockBlobStorage.

    Brief change log

    Main changes are:

    • fa060e9 Add flink-gs-fs-hadoop project: Add new project for GS file system and recoverable writer with FileSystemFactory wireup.
    • 4b8c0d5 Add BlobStorage abstraction: Add interfaces to abstract away direct access to the Google Storage API, both to simplify that access and to make it mockable.
    • f8bf558 Implement BlobStorage for Google Storage: Add GSBlobStorage, an implementation of BlobStorage against the Google Storage API.
    • f2399cd Add utility functions: Add some utility functions used by the recoverable writer. Includes unit tests.
    • 0f20b24 Implement BlobStorage for unit testing: Add MockBlobStorage, an implementation of BlobStorage against an in-memory store.
    • 1aac728 Implement recoverable writer: Implements RecoverableWriter against the BlobStorage abstraction. Includes unit tests.

    Verifying this change

    This change added tests and can be verified as follows:

    • Added unit tests for utility functions
    • Added unit tests for the recoverable writer against a mock in-memory BlobStorage implementation (MockBlobStorage), to validate various success and failure recoverable-write scenarios.

    Note that there are currently no unit tests that validate that GSBlobStorage (the concrete implementation of BlobStorage against the GS API) properly invokes the underlying API. This API is difficult to mock, as many return values are classes that can't be created or extended outside the package. Unit tests would be much easier here if we were to use something like Mockito, but that is discouraged in the coding guidelines so I'm looking for some guidance here.

    Also, I haven't implemented the FileSystemBehaviorTestSuite, since it seems to be testing the underlying FileSystem behavior which is provided directly by Flink's HadoopFileSystem wrapped around Google's GoogleHadoopFileSystem, and not really by any code in this PR. But if this should be added, I can do that -- just let me know.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes / no) Yes, this adds dependencies on google-cloud-storage and gcs-connector in the flink-gs-fs-hadoop project. These dependencies require a newer version of guava than is present in flink-fs-hadoop-shaded, so this project pulls in a newer version of guava and shades it.
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) No
    • The serializers: (yes / no / don't know) Yes, adds a new serializer (GsRecoverableWriterStateSerializer) to serialize persisted state of a recoverable write.
    • The runtime per-record code paths (performance sensitive): (yes / no / don't know) No
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) No, except that we'll need to document how to properly deploy and use this new file system.
    • The S3 file system connector: (yes / no / don't know) No

    Documentation

    • Does this pull request introduce a new feature? (yes / no) Yes.
    • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) We will need some sort of GS documentation like this documentation for S3. I intend to provide that in a follow-up commit once any changes settle down as part of the code review.
    review=description? component=Connectors/FileSystem 
    opened by galenwarren 101
  • [FLINK-377] [FLINK-671] Generic Interface / PAPI

    This PR contains the new Generic Language Interface and the Python API built on top of it.

    This version hasn't been tested on a cluster yet; this will be done over the weekend. I'm making the PR already so that the reviewing portion starts earlier (since only minor changes will be necessary to make it work).

    I will mark several parts where I would specifically like some input.

    Relevant issues: Ideally, [FLINK-1040] will be merged before this one is, as it removes roughly 600 lines of very much hated code in the PlanBinder.

    A while ago the distributed cache was acting up, not maintaining files across subsequent operations. I will verify whether this issue still exists while testing. That would not strictly be a blocking issue; as it stands I could work around it (with the caveat that a few files will remain in the tmp folder).

    component=API/Python 
    opened by zentol 100
  • [FLINK-21675][table-planner-blink] Allow Predicate Pushdown with Watermark Assigner Between Filter and Scan

    What is the purpose of the change

    The purpose of the change is to fix https://issues.apache.org/jira/browse/FLINK-21675, allowing predicates to be pushed down even though a LogicalWatermarkAssigner is present between the LogicalFilter and the LogicalTableScan.

    This means that, for example, the following table plan:

    LogicalProject(name=[$0], event_time=[$1])
    +- LogicalFilter(condition=[AND(=(LOWER($0), _UTF-16LE'foo'), IS NOT NULL($0))])
       +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[-($1, 5000:INTERVAL SECOND)])
          +- LogicalTableScan(table=[[default_catalog, default_database, WithWatermark]])
    

    When predicate pushdown is enabled, and given that the source supports pushdown of LOWER, this can be rewritten to:

    Calc(select=[name, event_time], where=[IS NOT NULL(name)])
    +- WatermarkAssigner(rowtime=[event_time], watermark=[-(event_time, 5000:INTERVAL SECOND)])
       +- TableSourceScan(table=[[default_catalog, default_database, WithWatermark, filter=[equals(lower(name), 'foo')]]], fields=[name, event_time])
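
    For context, a table and query of roughly the following shape would produce the original plan above. This is a hypothetical sketch (assuming a TableEnvironment in scope as tableEnv; the connector option is a placeholder and must point to a source that supports filter pushdown for the rewrite to apply):

    // Hypothetical table and query matching the plan shown above.
    tableEnv.executeSql(
      """CREATE TABLE WithWatermark (
        |  name STRING,
        |  event_time TIMESTAMP(3),
        |  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = '...'
        |)""".stripMargin)

    val result = tableEnv.sqlQuery(
      "SELECT name, event_time FROM WithWatermark WHERE LOWER(name) = 'foo'")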
    

    Brief change log

    • Implement PushFilterIntoTableSourceScanAcrossWatermarkRule
    • Refactor PushFilterIntoTableSourceScanRule to inherit a common base class PushFilterIntoSourceScanRuleBase
    • Add new rule to FlinkStreamRuleSets under FILTER_TABLESCAN_PUSHDOWN_RULES and LOGICAL_RULES
    • Implement tests

    Verifying this change

    This change added tests and can be verified as follows:

    • Added testPushdownAcrossWatermarkFullPredicateMatch for testing full match of predicates (all predicates in query) and with SupportsWatermarkPushdown support
    • Added testPushdownAcrossWatermarkPartialPredicateMatch for testing that a partial match generates the correct plan (LogicalFilter with the remaining predicates to be filtered).

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
    • The serializers: (yes / no / don't know)
    • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
    • The S3 file system connector: (yes / no / don't know)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    review=description? component=TableSQL/Planner 
    opened by YuvalItzchakov 92
  • [FLINK-18202][PB format] New Format of protobuf

    What is the purpose of the change

    Protobuf is a structured data format introduced by Google. Compared to JSON, protobuf is more efficient in space and computation. Nowadays, many companies use protobuf instead of JSON as the data format in Kafka and stream processing.

    So, we will introduce a new format which can deserialize/serialize protobuf data at high speed. Users can use this format in SQL or the Table API.

    Verifying this change

    create table source (
        ....... column list
    ) with (
        'connector' = 'kafka',
        'format' = 'protobuf',
        'protobuf.message-class-name' = '<message class name>'
    )

    create table sink (
        ....... column list
    ) with (
        'connector' = 'kafka',
        'format' = 'protobuf',
        'protobuf.message-class-name' = '<message class name>'
    )

    PbRowDeserializationSchema deserializationSchema = new PbRowDeserializationSchema(
        rowType,                         // RowType of schema
        InternalTypeInfo.of(rowType),    // TypeInformation<RowData> of schema
        SimpleProtoTest.class.getName(), // message class name
        false,                           // ignoreParseErrors
        false                            // readDefaultValues
    );

    PbRowSerializationSchema serializationSchema = new PbRowSerializationSchema(
        rowType,                         // RowType of schema
        SimpleProtoTest.class.getName()  // message class name
    );

    Tests

    Added many unit tests covering deserialization/serialization for different protobuf data types and structures.

    Benchmark

    Performance test for a protobuf object containing 200+ fields. The table below shows the time consumed to process 10M rows.

    Implementation                    | Deserialize Speed | Serialize Speed
    --------------------------------- | ----------------- | ---------------
    json                              | 110s              | 120s
    DynamicMessage and Descriptor API | 152s              | 206s
    Codegen (this PR)                 | 42s               | 33s

    Does this pull request potentially affect one of the following parts:

    • New dependencies: Add protobuf dependency com.google.protobuf:protobuf-java:3.12.2
    • Public API: Add new format in Flink SQL
    • The serializers: Add new PbRowDeserializationSchema and PbRowSerializationSchema
    • The runtime per-record code paths (performance sensitive): yes

    Documentation

    Connector params:

    1. protobuf.message-class-name: Required option to specify the full name of the protobuf message class. The protobuf class must be on the classpath on both the client and the task side.
    2. protobuf.read-default-values: Optional flag to read default values instead of null when a field is missing during deserialization; defaults to false. If the proto syntax is proto3, this value is forced to true, because proto3's standard is to use default values.
    3. protobuf.ignore-parse-errors: Optional flag to skip rows with parse errors instead of failing; false by default.
    4. protobuf.write-null-string-literal: When serializing to protobuf data, this optional config specifies the string literal to use in protobuf arrays/maps in case of null values. By default the empty string is used.

    Notice

    default values

    If the protobuf syntax is proto2, the generated protobuf class has validity bit flags to indicate whether a field is set or not, and we can use the pbObject.hasXXX() method to check whether a field is set. So with proto2 syntax, the decoded Flink row may contain null values, and users can set protobuf.read-default-values to control how null values are handled. But if the syntax is proto3, the generated protobuf class has neither hasXXX() methods nor validity bit flags, so there is no way to tell whether a field was set if it equals the default value. For example, if pbObj.getDim1() returns 0, there is no way to tell whether dim1 was set to 0 or simply not set at all. So if the message class uses proto3 syntax, the decoded Flink row will not contain any null values.

    Protobuf also does not permit null in map keys/values or in array elements, so we need to generate default values for them.

    row value                          | pb value
    ---------------------------------- | ----------------------------------------------------------
    map<string, string>(<"a", null>)   | map<string, string>(("a", ""))
    map<string, string>(<null, "a">)   | map<string, string>(("", "a"))
    map<int, int>(null, 1)             | map<int, int>(0, 1)
    map<int, int>(1, null)             | map<int, int>(1, 0)
    map<long, long>(null, 1)           | map<long, long>(0, 1)
    map<long, long>(1, null)           | map<long, long>(1, 0)
    map<bool, bool>(null, true)        | map<bool, bool>(false, true)
    map<bool, bool>(true, null)        | map<bool, bool>(true, false)
    map<string, float>("key", null)    | map<string, float>("key", 0)
    map<string, double>("key", null)   | map<string, double>("key", 0)
    map<string, enum>("key", null)     | map<string, enum>("key", first_enum_element)
    map<string, binary>("key", null)   | map<string, binary>("key", ByteString.EMPTY)
    map<string, MESSAGE>("key", null)  | map<string, MESSAGE>("key", MESSAGE.getDefaultInstance())
    array<string>(null)                | array("")
    array<int>(null)                   | array(0)
    array<long>(null)                  | array(0)
    array<bool>(null)                  | array(false)
    array<float>(null)                 | array(0)
    array<double>(null)                | array(0)
    array<enum>(null)                  | array(first_enum_element)
    array<binary>(null)                | array(ByteString.EMPTY)
    array<message>(null)               | array(MESSAGE.getDefaultInstance())

    OneOf field

    During serialization, there is no guarantee that only one field of a one-of group in the Flink row is non-null. So in serialization, we set each field in the order of the Flink schema, and a field in a higher position will override a field in a lower position within the same one-of group.

    Enum type

    Protobuf enum values are converted to Strings and vice versa, using the name of the enum value as defined in the protobuf definition.

    review=description? component=Formats component=TableSQL/Ecosystem 
    opened by maosuhan 86
  • [FLINK-9697] Provide connector for modern Kafka

    What is the purpose of the change

    This pull request provides a connector for Kafka 2.0.0.

    Brief change log

    • Provide connector for Kafka 2.0.0

    Verifying this change

    This change is already covered by existing tests*.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
    • The serializers: (yes / no / don't know)
    • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
    • The S3 file system connector: (yes / no / don't know)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    component= 
    opened by yanghua 77
  • [FLINK-1745] Add exact k-nearest-neighbours algorithm to machine learning library

    I added a quadtree data structure for the kNN algorithm. @chiwanpark originally made a pull request for a kNN algorithm, and we coordinated so that I could incorporate a tree structure. The quadtree scales very well with the number of training + test points, but scales poorly with the dimension (even the R-tree scales poorly with the dimension). I added a flag that automatically determines whether or not to use the quadtree. My implementation needed to use the Euclidean or SquaredEuclidean distance, since I needed a specific notion of the distance between a test point and a box in the quadtree. I added another test, KNNQuadTreeSuite, in addition to Chiwan Park's KNNITSuite, since C. Park's parameters will automatically choose the brute-force non-quadtree method.

    For more details on the quadtree + how I used it for the KNN query, please see another branch I created that has a README.md: https://github.com/danielblazevski/flink/tree/FLINK-1745-devel/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn

    component=Library/MachineLearning 
    opened by danielblazevski 76
  • FLINK-2168 Add HBaseTableSource

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide. In addition to going through the list, please provide a meaningful description of your changes.

    • [ ] General

      • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      • The pull request addresses only one issue
      • Each commit in the PR has a meaningful commit message (including the JIRA id)
    • [ ] Documentation

      • Documentation has been added for new functionality
      • Old documentation affected by the pull request has been updated
      • JavaDoc for public methods has been added
    • [ ] Tests & Build

      • Functionality added by the pull request is covered by tests
      • mvn clean verify has been executed successfully locally or a Travis build has passed

    @fhueske Trying to create the first version of this PR. I have made the necessary changes to support HBaseTableSource by creating a HBaseTableInputFormat, but a lot of code is duplicated with TableInputFormat; I have not unified them for now. I tried compiling this code on my Linux box, and the @Override that I added in HBaseTableSource (overriding the BatchTableSource API) shows up as a compilation issue, but my IntelliJ IDE is fine and does not complain. Please provide your feedback so that I can rebase the next PR with suitable fixes. Thanks.

    component=TableSQL/API 
    opened by ramkrish86 74
  • [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

    What is the purpose of the change

    Adding a PubSub connector with support for Checkpointing

    Verifying this change

    This change added tests and can be verified as follows:

    • Added unit tests.

    • Added integration tests to flink-end-to-end which runs against docker.

    • An example has been added in flink-examples which runs against the actual Google PubSub service. This has been manually verified.

    • Is there a need for integration tests? We feel like there is and have added them.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): Yes, the Google Cloud SDK for PubSub, but because it is a connector this does not add any dependencies to Flink itself.
    • The public API, i.e., is any changed class annotated with @Public(Evolving): No
    • The serializers: No
    • The runtime per-record code paths (performance sensitive): Nothing has been changed; only a connector has been added.
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: No
    • The S3 file system connector: No

    Documentation

    • Does this pull request introduce a new feature? Yes
    • If yes, how is the feature documented? JavaDocs; added an example in flink-examples, and we updated the website docs.

    component=Connectors/GoogleCloudPubSub 
    opened by Xeli 69
  • [FLINK-19667] Add AWS Glue Schema Registry integration

    What is the purpose of the change

    The AWS Glue Schema Registry is a new feature of AWS Glue that allows you to centrally discover, control, and evolve data stream schemas. This request adds a new format that integrates Apache Flink with the AWS Glue Schema Registry.

    Brief change log

    • Added flink-avro-glue-schema-registry module under flink-formats
    • Added end-to-end test named flink-glue-schema-registry-test for the new module

    Verifying this change

    This change added tests and can be verified as follows:

    • Added integration tests for end-to-end deployment

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
    • The serializers: (yes)
    • The runtime per-record code paths (performance sensitive): (don't know)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
    • The S3 file system connector: (no)

    Documentation

    • Does this pull request introduce a new feature? (yes)
    • If yes, how is the feature documented? (JavaDocs)
    review=description? component=Formats 
    opened by mohitpali 62
  • [FLINK-22054][k8s] Using a shared watcher for ConfigMap watching

    What is the purpose of the change

    Introduce a shared watcher interface for watching all ConfigMaps, and provide an implementation based on the SharedIndexInformer.

    Brief change log

    Using a shared watcher for all ConfigMap watching.

    Verifying this change

    This change added tests and can be verified as follows:

    • Added an IT test that validates that KubernetesConfigMapSharedInformer works as expected.
    • Replaced the watchConfigMaps unit test with one based on the shared watcher.
    • Removed the tests for KubernetesTooOldResourceVersionException while watching ConfigMaps.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (no)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
    • The serializers: (no)
    • The runtime per-record code paths (performance sensitive): (no)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
    • The S3 file system connector: (no)

    Documentation

    • Does this pull request introduce a new feature? (no)
    • If yes, how is the feature documented? (not applicable)
    review=description? component=Runtime/Coordination component=Deployment/Kubernetes 
    opened by yittg 57
  • [hotfix][docs] Maven project generation command for Windows in Fraud Detection with the DataStream API

    In the Fraud Detection with the DataStream API tutorial, the Maven project generation command is given only for Linux; in this pull request I added the project generation command for Windows.
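
    The difference is essentially the line-continuation character (^ in cmd instead of \). As a rough sketch of the Windows form of the walkthrough's archetype command (the parameter values follow the tutorial's Linux example; the archetype version is a placeholder):

    mvn archetype:generate ^
        -DarchetypeGroupId=org.apache.flink ^
        -DarchetypeArtifactId=flink-walkthrough-datastream-java ^
        -DarchetypeVersion=<flink-version> ^
        -DgroupId=frauddetection ^
        -DartifactId=frauddetection ^
        -Dversion=0.1 ^
        -Dpackage=spendreport ^
        -DinteractiveMode=false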

    Documentation

    • Does this pull request introduce a new feature? (yes / no) No
    opened by Ngugisenior 1
  • [FLINK-30536][runtime] Remove CountingOutput from per-record code path for most operators

    What is the purpose of the change

    This PR reduces the Flink runtime overhead by reducing the call stack depth needed to produce a record. This is achieved by adding a Counter in the ChainingOutput instead of adding a CountingOutput that wraps around ChainingOutput.
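
    As a rough illustration of the idea (toy Scala stand-ins, not the actual Flink classes):

    // Minimal stand-in types, only to illustrate the call-stack difference.
    trait Counter { def inc(): Unit }
    trait Output[T] { def collect(record: T): Unit }

    // Before: a dedicated wrapper adds one extra collect() frame per record.
    class CountingOutput[T](inner: Output[T], counter: Counter) extends Output[T] {
      override def collect(record: T): Unit = { counter.inc(); inner.collect(record) }
    }

    // After: the chaining output owns the counter itself, so the wrapper and
    // its call frame disappear from the per-record path.
    class ChainingOutput[T](pushToOperator: T => Unit, counter: Counter) extends Output[T] {
      override def collect(record: T): Unit = { counter.inc(); pushToOperator(record) }
    }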

    Brief change log

    • Updated AbstractStreamOperator so that it no longer adds a CountingOutput that wraps around the given output.
    • Updated AbstractStreamOperatorV2 so that it no longer adds a CountingOutput that wraps around the given output.
    • Updated OperatorChain and ChainingOutput so that ChainingOutput would create a Counter that maintains the number of records emitted from the operator on the operator chain.

    Verifying this change

    env.fromSequence(1, 500000000L)
            .map(x -> x)
            .map(x -> x)
            .map(x -> x)
            .map(x -> x)
            .map(x -> x)
            .addSink(new DiscardingSink<>());
    

    Here are the benchmark results obtained by running the above program with parallelism=1 and object re-use enabled. The results are averaged across 5 runs for each setup.

    • Prior to the proposed change, the average execution time is 44.15 sec with std=1.4 sec.
    • After the proposed change, the average execution time is 42.15 sec with std=0.76 sec.
    • The proposed change increases throughput by 4.7%.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): no
    • The public API, i.e., is any changed class annotated with @Public(Evolving): no
    • The serializers: no
    • The runtime per-record code paths (performance sensitive): yes
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
    • The S3 file system connector: no

    Documentation

    • Does this pull request introduce a new feature? no
    • If yes, how is the feature documented? not applicable
    component= 
    opened by lindong28 1
  • [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change

    What is the purpose of the change

    Introduce TableChange to represent ALTER TABLE MODIFY syntax.

    Brief change log

    • Collect table changes during the SQL-to-operation conversion
    • Add the modify changes in the TableChange

    Verifying this change

    This change is already covered by existing tests, such as SqlToOperationConverterTest.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
    • The serializers: (yes / no / don't know)
    • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
    • The S3 file system connector: (yes / no / don't know)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    component= 
    opened by fsk119 1
  • [FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop

    What is the purpose of the change

    Improve Flink performance by reducing the average depth of the call stack needed to produce a record.

    Brief change log

    Updated SourceOperator#emitNext() to push records to the given DataOutput in a while loop. The loop will break when any of the following conditions are met:

    • The underlying source reader returns DataInputStatus#MORE_AVAILABLE
    • The mailbox used by the StreamTask has at least one mail.

    For the example program provided below, the following 4 function calls will be removed from the call stack needed to produce a record for most records.

    • StreamTask#processInput()
    • StreamOneInputProcessor#processInput()
    • StreamTaskSourceInput#emitNext()
    • SourceOperator#convertToInternalStatus()

    Verifying this change

    env.fromSequence(1, 1000000000L).map(x -> x).addSink(new DiscardingSink<>());
    

    Here are the benchmark results obtained by running the above program with parallelism=1 and object re-use enabled. The results are averaged across 5 runs for each setup.

    • Prior to the proposed change, the average execution time is 46.1 sec with std=5.1 sec.
    • After the proposed change, the average execution time is 33.3 sec with std=0.9 sec.
    • The proposed change increases throughput by 38.4%.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): no
    • The public API, i.e., is any changed class annotated with @Public(Evolving): no
    • The serializers: no
    • The runtime per-record code paths (performance sensitive): yes
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
    • The S3 file system connector: no

    Documentation

    • Does this pull request introduce a new feature? no
    • If yes, how is the feature documented? not applicable
    component=Runtime/Task 
    opened by lindong28 2
  • Update intro_to_table_api.md

    What is the purpose of the change

    (For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

    Brief change log

    (for example:)

    • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
    • Deployments RPC transmits only the blob storage reference
    • TaskManagers retrieve the TaskInfo from the blob cache

    Verifying this change

    Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

    (Please pick either of the following options)

    This change is a trivial rework / code cleanup without any test coverage.

    (or)

    This change is already covered by existing tests, such as (please describe tests).

    (or)

    This change added tests and can be verified as follows:

    (example:)

    • Added integration tests for end-to-end deployment with large payloads (100MB)
    • Extended integration test for recovery after master (JobManager) failure
    • Added test that validates that TaskInfo is transferred only once across recoveries
    • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
    • The serializers: (yes / no / don't know)
    • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
    • The S3 file system connector: (yes / no / don't know)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    opened by qq774724635 1