Netflix's Distributed Data Pipeline

Overview

Suro: Netflix's Data Pipeline

Suro is a data pipeline service for collecting, aggregating, and dispatching large volumes of application events, including log data. It has the following features:

  • It is distributed and can be horizontally scaled.
  • It supports streaming data flow, a large number of connections, and high throughput.
  • It allows dynamically dispatching events to different locations with flexible dispatching rules (see the routing map sketch below).
  • It has a simple and flexible architecture that lets users add additional data destinations.
  • It fits well into the NetflixOSS ecosystem.
  • It is a best-effort data pipeline with support for flexible retries and store-and-forward to minimize message loss.
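
To give a flavor of those dispatching rules, here is a minimal routing map in the style of the examples on the Suro wiki. The routing key ("app_log") and sink names are placeholders, and the exact schema should be verified against the wiki:

{
    "app_log": {
        "where": [
            { "sink": "default" },
            { "sink": "s3" }
        ]
    }
}

Messages sent with routing key "app_log" would be dispatched to both the default sink and an "s3" sink defined in sink.json.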

Learn more about Suro on the Suro Wiki and the Netflix TechBlog post where Suro was introduced.


Build

Suro is built via Gradle (www.gradle.org). To build from the command line:

./gradlew build

See the build.gradle file for other Gradle targets, like distTar, distZip, installApp, and runServer.

Running the server

You can run the server locally by just running ./gradlew runServer.

For more advanced usage, you may wish to run ./gradlew installApp and then:

cd suro-server
java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json

To enable basic logging, you can download slf4j-simple-1.7.7.jar, copy it into suro-server, and then run:

cd suro-server
java -cp "build/install/suro-server/lib/*:slf4j-simple-1.7.7.jar" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json

Support

We will use the Google Group, Suro Users, to discuss issues: https://groups.google.com/forum/#!forum/suro-users

Comments
  • Feature/marathon support

    Adds Marathon CLI port support.

    Command : "cd suro-* && bin/suro-server -p conf/suro.properties -m conf/routingmap.json -s conf/sink.json -P $PORT"

    [screenshot: suro-marathon-screen]

    opened by thinker0 9
  • major changes on KafkaSink

    • Clean up a lot of legacy props in the constructor.
    • Use the kafka.etc map for all optional Kafka props (see the sketch below).
    • Support a new boolean flag, normalizeRoutingKey, so that we can get rid of PlatformKafkaSink.
    • Support a partitioner config, with StickyPartitioner opted in.
    • Fix test code.
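
    A hypothetical sink config illustrating the proposed shape. Only normalizeRoutingKey and the kafka.etc map are named in the PR description; the surrounding field names and values are guesses for illustration:

    "kafka": {
        "type": "kafka",
        "normalizeRoutingKey": true,
        "etc": {
            "bootstrap.servers": "localhost:9092",
            "acks": "1"
        }
    }
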
    opened by stevenzwu 8
  • Configuration-based Module Load

    Added the ability to load modules from configuration (the -x option on the command line), removing the compile-time dependency. The suro-server project can now specify modules as runtime dependencies; a usage sketch follows.
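
    Assuming -x takes a module class name (the exact argument format is not shown in the PR, and com.example.MyExtraModule is a placeholder):

    cd suro-server
    java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json -x com.example.MyExtraModule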

    opened by adamschmidt 8
  • Use synchronous queue when sink does not want to drop message to deal with back pressure

    ElasticSearchSink uses MessageQueue4Sink for message handoff between the input and the sink. Its writeTo() method is implemented simply as enqueuing the message to MessageQueue4Sink, which drops the message if the queue is full. Therefore, the queue for ElasticSearchSink is typically configured as a large file-based queue to minimize the possibility of dropping messages. This complicates some Suro operations and monitoring, because the server now has local storage and hence becomes stateful.

    When the input to ElasticSearchSink is KafkaConsumer, where there is already a file queue offered by Kafka, having another queue in the system makes things complicated and is unnecessary. In this case, if the sink is too busy, it should just naturally slow down the input (KafkaConsumer), which causes lag in the consumer without having to deal with back pressure locally.

    Therefore, a synchronous queue seems to be a good fit here: it blocks the input when the sink is busy and shifts the back pressure upstream.

    With this update, users can configure a synchronous queue by specifying a queue capacity of 0, as sketched below.
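
    A minimal sketch of opting in; the capacity of 0 is the point of the change, while the sink type and surrounding field names are assumptions modeled on typical Suro sink configs:

    "esSink": {
        "type": "elasticsearch",
        "queue4Sink": {
            "type": "memory",
            "capacity": 0
        }
    }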

    opened by allenxwang 6
  • Fix issues that ElasticSearchSink may retry indefinitely on exception

    ElasticSearchSink automatically re-enqueues messages if a batch fails due to an exception. It was discovered that a NullPointerException might be thrown, causing indefinite retries of the batch. Also, the sink by default sleeps 60 seconds before re-enqueuing, which causes huge delays during retry cycles.

    The pull request addresses the following issues:

    • The automatic re-enqueue behavior is made configurable and is turned off by default (see the sketch below).
    • Retries are performed by RestClient automatically, and the number of retries is explicitly configured. Previously, retries were not properly enabled because POST is used and RestClient does not automatically retry a POST upon ReadTimeout.
    • The sleep behavior is turned off by default and must be explicitly configured to enable it.
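
    A purely hypothetical sketch of what the resulting knobs could look like; these property names are invented for illustration and are not taken from the PR:

    "esSink": {
        "type": "elasticsearch",
        "reEnqueueOnException": false,
        "retryCount": 3,
        "sleepBeforeReEnqueueMs": 0
    }
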
    opened by allenxwang 5
  • schedule sink map update in Rx io scheduler, because KafkaProducer.close...

    Schedule the sink map update on Rx's io scheduler, because KafkaProducer.close() can potentially block. See:

    https://issues.apache.org/jira/browse/KAFKA-1660
    https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53739782

    However, this is still not perfect: if close() blocks waiting on the sender thread, it hasn't closed the metrics yet, so we can potentially end up with bad metrics.

    This is the thread dump where the poller thread for persisted props got locked up:

    "pollingConfigurationSource" daemon prio=10 tid=0x0000000003547800 nid=0xfe0 in Object.wait() [0x00007f335f6c8000]
       java.lang.Thread.State: WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x00000002994dae30> (a org.apache.kafka.common.utils.KafkaThread)
            at java.lang.Thread.join(Thread.java:1281)
            - locked <0x00000002994dae30> (a org.apache.kafka.common.utils.KafkaThread)
            at java.lang.Thread.join(Thread.java:1355)
            at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:422)
            at com.netflix.suro.sink.kafka.KafkaSink.close(KafkaSink.java:340)
            at com.netflix.suro.sink.SinkManager.closeSink(SinkManager.java:78)
            at com.netflix.suro.sink.SinkManager.set(SinkManager.java:118)
            at com.netflix.chukwa.suro.SuroSinkConfigurator.buildSink(SuroSinkConfigurator.java:66)
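
    A sketch of the idea in RxJava 1.x terms (the sinkSwap runnable stands in for the actual SinkManager.set call; this is not the PR's code):

    import rx.Scheduler;
    import rx.functions.Action0;
    import rx.schedulers.Schedulers;

    public final class AsyncSinkSwap {
        // Run a potentially blocking sink-map swap off the caller's thread,
        // so a blocking KafkaProducer.close() no longer stalls the
        // configuration poller thread.
        public static void swapAsync(final Runnable sinkSwap) {
            final Scheduler.Worker worker = Schedulers.io().createWorker();
            worker.schedule(new Action0() {
                @Override
                public void call() {
                    try {
                        sinkSwap.run(); // e.g. sinkManager.set(newSinkMap)
                    } finally {
                        worker.unsubscribe();
                    }
                }
            });
        }
    }
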
    opened by stevenzwu 4
  • Notice failed using S3FileSink

    Today the below error cropped up on our Suro instances after running for about two weeks.

    INFO com.netflix.suro.sink.remotefile.RemoteFileSink - upload duration: 2386 ms for /tmp/s3file/P20141029T202649ip1723184136ed362e14915de3c0bb26.done Len: 43195131 bytes
    ERROR com.netflix.suro.sink.remotefile.RemoteFileSink - Exception while uploading: Notice failed
    java.lang.RuntimeException: Notice failed
            at com.netflix.suro.sink.remotefile.S3FileSink.notify(S3FileSink.java:161)
            at com.netflix.suro.sink.remotefile.RemoteFileSink$3.run(RemoteFileSink.java:190)

    It appears this error occurs when the notice queue fills up. The notice queue is configured with a capacity of 100K entries. Please see the S3 sink config below. This issue also happened about two weeks ago. Does this error mean the notice queue has filled up? Does the sink configuration below look correct, or is there something in it that may be causing this issue?

    "S3Sink": {

        "type": "s3",
        "localFileSink": {
            "type": "local",
            "outputDir": "<outputDir>",
            "bucket": "fileSink2"
        },
        "notice": {
            "type":"queue",
            "length":"10000"
        },
        "bucket": "<bucket>",
        "s3Endpoint": "s3.amazonaws.com",
        "prefixFormatter": {
            "type": "source-date",
            "prefix": "<prefix>",
            "dateFormat" : "yyyyMMdd"
        }
    }
    
    opened by bquisling 4
  • Spam in the Wiki

    Hi, I just noticed that you have some spam pages in your GitHub project wiki. Here are some links:

    • https://github.com/Netflix/suro/wiki/call-center-service-pemanas-air-Phn-021-36069559
    • https://github.com/Netflix/suro/wiki/call-center-service-solahart--Call:-021-36069559,
    • https://github.com/Netflix/suro/wiki/call-center-service-wika-082111266245
    • https://github.com/Netflix/suro/wiki/Call-Center-Service-Wika-Air-Panas-Telp:021-36069559
    opened by yogendra 0
  • Suro with S3 sink

    Currently facing an issue with Suro communicating with S3:

    ERROR [com.netflix.suro.input.thrift.MessageSetProcessor] Failed to process message routingKey: logs, payload byte size: 324: null
    java.lang.NullPointerException
            at com.netflix.suro.routing.MessageRouter.process(MessageRouter.java:84)
            at com.netflix.suro.input.thrift.MessageSetProcessor.processMessageSet(MessageSetProcessor.java:235)
            at com.netflix.suro.input.thrift.MessageSetProcessor.access$300(MessageSetProcessor.java:57)
            at com.netflix.suro.input.thrift.MessageSetProcessor$1.run(MessageSetProcessor.java:210)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

    Any idea why?

    opened by ghost 0
  • Zookeeper dependencies are missing in suro-server install

    When connecting to a Kafka source, I've had to add the ZooKeeper jar to build/install/suro-server/lib to get rid of:

    Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
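
    One possible fix is to declare ZooKeeper as a runtime dependency in suro-server's build.gradle; the version below is illustrative and should match whatever your Kafka client expects:

    dependencies {
        // illustrative version; align with the Kafka client's ZooKeeper dependency
        runtime 'org.apache.zookeeper:zookeeper:3.4.6'
    }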

    opened by cheleb 0
  • Added support for an AvroFileWriter

    Hi,

    I did this some time ago and thought you might be interested in having it in Suro's master branch. It's a basic Avro file writer that you can use when configuring your sink. The schema must be provided as part of the configuration.

    For instance:

      "item-local-sink": {
            "type": "local",
            "maxFileSize": "1048576000",
            "rotationPeriod": "PT1m",
            "outputDir": "/data/surodata/local/item",
            "writer": {
                "type": "avro",
                "schema": "{\"type\":\"record\",\"name\":\"Item\",\"namespace\":\"my.app.namespace\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"description\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"option\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"type\",\"type
    \":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},{\"name\":\"price\",\"type\":[\"null\",\"float\"],\"default\":null}]}"
            }
       }
    

    This is pretty basic but it's been really useful in our case.
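
    If it helps reviewers try it out, files produced this way can be read back with stock Avro tooling; a small sketch (not part of the PR):

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    import java.io.File;

    public final class ReadAvro {
        public static void main(String[] args) throws Exception {
            // The reader picks up the schema from the Avro file header.
            try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                    new File(args[0]), new GenericDatumReader<GenericRecord>())) {
                while (reader.hasNext()) {
                    System.out.println(reader.next());
                }
            }
        }
    }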

    opened by Crystark 2
  • routing map parser fails silently on malformed json

    I'm working on an app based on Suro 0.2.10. If I add malformed JSON to a routing map config file, Suro will not route messages to any of the sinks, and I'm unable to see any errors or exceptions in my logs; Suro appears to boot just fine. I'd expect an error message saying that Suro was unable to parse the routing map.
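
    As a stopgap, the file can be validated before deployment. A minimal fail-fast sketch using Jackson (this is not Suro's own parsing path):

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.File;

    public final class RoutingMapCheck {
        public static void main(String[] args) throws Exception {
            // readTree throws on malformed JSON, so this fails loudly
            // instead of silently dropping routes.
            new ObjectMapper().readTree(new File(args[0]));
            System.out.println("routing map parsed OK: " + args[0]);
        }
    }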

    opened by ActualAdam 0
Owner
Netflix, Inc. (Netflix Open Source Platform)
Distributed and fault-tolerant realtime computation: stream processing, continuous computation, distributed RPC, and more

IMPORTANT NOTE!!! Storm has Moved to Apache. The official Storm git repository is now hosted by Apache, and is mirrored on github here: https://github

Nathan Marz 8.9k Dec 26, 2022
The official home of the Presto distributed SQL query engine for big data

Presto Presto is a distributed SQL query engine for big data. See the User Manual for deployment instructions and end user documentation. Requirements

Presto 14.3k Jan 5, 2023
Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala and more.

Apache Zeppelin Documentation: User Guide Mailing Lists: User and Dev mailing list Continuous Integration: Contributing: Contribution Guide Issue Trac

The Apache Software Foundation 5.9k Jan 8, 2023
Apache Heron (Incubating) is a realtime, distributed, fault-tolerant stream processing engine from Twitter

Heron is a realtime analytics platform developed by Twitter. It has a wide array of architectural improvements over its predecessor. Heron in Apache

The Apache Software Foundation 3.6k Dec 28, 2022
OpenRefine is a free, open source power tool for working with messy data and improving it

OpenRefine OpenRefine is a Java-based power tool that allows you to load data, understand it, clean it up, reconcile it, and augment it with data comi

OpenRefine 9.2k Jan 1, 2023
Hadoop library for large-scale data processing, now an Apache Incubator project

Apache DataFu Follow @apachedatafu Apache DataFu is a collection of libraries for working with large-scale data in Hadoop. The project was inspired by

LinkedIn's Attic 589 Apr 1, 2022
SAMOA (Scalable Advanced Massive Online Analysis) is an open-source platform for mining big data streams.

SAMOA: Scalable Advanced Massive Online Analysis. This repository is discontinued. The development of SAMOA has moved over to the Apache Software Foun

Yahoo Archive 424 Dec 28, 2022
A platform for visualization and real-time monitoring of data workflows

Status This project is no longer maintained. Ambrose Twitter Ambrose is a platform for visualization and real-time monitoring of MapReduce data workfl

Twitter 1.2k Dec 31, 2022
Program finds average number of words in each comment given a large data set by use of hadoop's map reduce to work in parallel efficiently.

Finding average number of words in all the comments in a data set ?? Mapper Function In the mapper function we first tokenize entire data and then fin

Aleezeh Usman 3 Aug 23, 2021
Access paged data as a "stream" with async loading while maintaining order

DataStream What? DataStream is a simple piece of code to access paged data and interface it as if it's a single "list". It only keeps track of queued

Thomas 1 Jan 19, 2022
A distributed data integration framework that simplifies common aspects of big data integration such as data ingestion, replication, organization and lifecycle management for both streaming and batch data ecosystems.

Apache Gobblin Apache Gobblin is a highly scalable data management solution for structured and byte-oriented data in heterogeneous data ecosystems. Ca

The Apache Software Foundation 2.1k Jan 4, 2023
Pipeline for Visualization of Streaming Data

Seminararbeit zum Thema Visualisierung von Datenströmen Diese Arbeit entstand als Seminararbeit im Rahmen der Veranstaltung Event Processing an der Ho

Domenic Cassisi 1 Feb 13, 2022
A lightweight command processing pipeline ❍ ⇢ ❍ ⇢ ❍ for your Java awesome app.

PipelinR PipelinR is a lightweight command processing pipeline ❍ ⇢ ❍ ⇢ ❍ for your awesome Java app. PipelinR has been battle-proven on production, as

Eduards Sizovs 288 Jan 8, 2023
Project on End to End CI/CD pipeline for java based application using Git,Github,Jenkins,Maven,Sonarqube,Nexus,Slack,Docker and Kuberenets with ECR as private docker registry and Zero Downtime Deployment

Project on End to End CI/CD pipeline for java based application using Git,Github,Jenkins,Maven,Sonarqube,Nexus,Slack,Docker and Kuberenets with ECR as private docker registry and Zero Downtime Deployment.

NITHIN JOHN GEORGE 10 Nov 22, 2022
BlockChain Pipeline using Jenkins for DevOps

BlockChain Pipeline for Jenkins This project is inspired by the work of Redback and Microsoft teams for developing the process using VSTS. I've chosen

Brantley·Williams 11 Jun 8, 2022
Java implementation of Condensation - a zero-trust distributed database that ensures data ownership and data security

Java implementation of Condensation About Condensation enables to build modern applications while ensuring data ownership and security. It's a one sto

CondensationDB 43 Oct 19, 2022
SeaTunnel is a distributed, high-performance data integration platform for the synchronization and transformation of massive data (offline & real-time).

SeaTunnel SeaTunnel was formerly named Waterdrop , and renamed SeaTunnel since October 12, 2021. SeaTunnel is a very easy-to-use ultra-high-performanc

The Apache Software Foundation 4.4k Jan 2, 2023