Hadoop library for large-scale data processing, now an Apache Incubator project

Overview

Apache DataFu


Apache DataFu is a collection of libraries for working with large-scale data in Hadoop. The project was inspired by the need for stable, well-tested libraries for data mining and statistics.

It consists of two libraries:

  • Apache DataFu Pig: a collection of user-defined functions for Apache Pig
  • Apache DataFu Hourglass: an incremental processing framework for Apache Hadoop in MapReduce
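
As a quick taste of the Pig library, here is a minimal sketch (the jar name/version and the input path are assumptions) that computes a median with the Quantile UDF:

    REGISTER datafu-1.2.0.jar;

    DEFINE Median datafu.pig.stats.Quantile('0.5');

    -- input is assumed to hold one integer per line
    data = LOAD 'input' AS (val:int);

    -- Quantile expects its input bag to be sorted
    medians = FOREACH (GROUP data ALL) {
        sorted = ORDER data BY val;
        GENERATE Median(sorted.val);
    };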

DataFu is currently undergoing incubation with Apache. A mirror of the official git repository can be found on GitHub at https://github.com/apache/incubator-datafu.

For more information, please visit the website.

If you'd like to jump in and get started, check out the corresponding guides for each library.

Blog Posts

Presentations

Videos

Other Resources

An interesting example of using Quantile from DataFu can be found in the Hadoop Real-World Solutions Cookbook.

From Around the Web

Papers

Getting Help

Please visit the website.

Comments
  • Track pull requests from king821221/datafu@master to linkedin/datafu@master

    This pull request centrally tracks the following features:

    1. Create an initial version of entropy UDFs (issue: https://github.com/linkedin/datafu/issues/77)
    2. Experiment with and implement weighted sampling algorithms (issue: https://github.com/linkedin/datafu/issues/80)

    opened by king821221 11
  • AliasableEvalFunc subclass, how to test it?

    Hi, I've written a UDF in Groovy using AliasableEvalFunc.

    I want to test it at the unit level; I don't want to wrap the class in a Pig script. How can I push a tuple schema down to an AliasableEvalFunc subclass? I can't find an example in the tests.

    I've written some ugly code that seems to work:

    ```groovy
    ZonesGenerator initUdf() {
        def getAliasSchemaMap = {
            def aliasSchemaAsStr = 'bag1.field1=0, bag2::moreName.field1=1'
            aliasSchemaAsStr.split(',').inject([:]) { memo, keyVal ->
                // trim so the second key doesn't keep its leading space
                memo[keyVal.split('=').first().trim()] = Integer.valueOf(keyVal.split('=').last())
                return memo
            }
        }

        new ZonesGenerator() {
            public Map<String, Integer> getFieldAliases() {
                return getAliasSchemaMap()  // invoke the closure to build the alias map
            }
        }
    }
    ```

    opened by seregasheypak 10
  • Implement Entropy UDF

    In the real world there are occasions when we need to calculate the entropy of discrete random variables, for instance to calculate the mutual information between variables X and Y using its entropy-based formula (see http://en.wikipedia.org/wiki/Mutual_information#Relation_to_other_quantities). I would suggest implementing a UDF that calculates the entropy of the given input samples, following the definition at http://en.wikipedia.org/wiki/Entropy_%28information_theory%29.
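
    For reference, the definition being pointed to is the Shannon entropy of a discrete random variable; with logarithm base b it is

        H(X) = -\sum_{i} p(x_i) \log_b p(x_i)

    where p(x_i) is the empirical probability of observing the value x_i among the input samples.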

    In Apache Mahout there is an existing implementation of entropy using the Map/Reduce API; please refer to https://issues.apache.org/jira/browse/MAHOUT-747.

    UDF Spec:

    package datafu.pig.stats;

    public class Entropy extends AccumulatorEvalFunc<Double>

    The constructor accepts a string indicating the logarithm base: 2, e (Euler's number), or 10.

    Input: a bag of ordered tuples X = {x[0], x[1], ..., x[N - 1]}

    Output: the entropy value, of type double

    How to use:

    X = LOAD 'input' AS (val:int);
    EX = FOREACH (GROUP X ALL) {OX = ORDER X by val; GENERATE Entropy(OX);}
    
    opened by king821221 10
  • Implement and experiment with different weighted sampling algorithms

    This is the reference paper I used to learn about weighted sampling algorithms: http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf

    The present WeightedSample.java implements Algorithm D.

    We may try Algorithms A, A-Res, and A-ExpJ, since they can be used in a data stream and in a distributed environment. These algorithms could be implemented based on ReservoirSample.java (perhaps inheriting from this class?), since they also need a reservoir to store the selected items.
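
    For context, in the Efraimidis–Spirakis scheme from the paper above, A-Res draws an independent u_i ~ Uniform(0, 1) for each item of weight w_i and assigns it the key

        k_i = u_i^{1/w_i}

    keeping the n items with the largest keys in the reservoir; this is what makes the algorithm suitable for streams and for distributed execution.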

    opened by king821221 9
  • Allow overriding java source and target version via properties

    I've moved the hard-coded target version to a property, so the default behavior remains the same. However, users will have the ability to override the target version via a property on the command line, for example:

    ant clean jar -DtargetJavaVersion=1.7

    opened by jarcec 7
  • Quantile and StreamingQuantile don't work - 'can't work with argument null'

        <file ./topics.pig, line 31, column 62> Failed to generate logical plan. Nested exception: java.lang.RuntimeException: could not instantiate 'datafu.pig.stats.Quantile' with arguments 'null'

    When:

        quantiles = foreach (group token_counts all) generate FLATTEN(datafu.pig.stats.Quantile('0.10', '0.90')) as (low_ten, high_ten);

    opened by rjurney 6
  • datafu.pig.sampling.ReservoirSample

    Hi, when using ReservoirSample it seems like the sample is drawn from the full input instead of the grouped input.

    For example, say my input.txt is:

        a1,5
        a1,6
        a1,7
        a2,5
        a2,6
        a2,7

    I have the following program:

        DEFINE SRS datafu.pig.sampling.ReservoirSample('2');
        data = LOAD 'input.txt' USING PigStorage(',') AS (key: chararray, value: chararray);
        grouped = GROUP data BY key;
        sample2 = FOREACH grouped GENERATE FLATTEN(SRS(data));

    The output was:

        (a1,7)
        (a1,6)
        (a1,6)
        (a1,6)

    I would assume that it would sample within each group, meaning I would see 2 samples of a1 and 2 samples of a2. Why is that not the case?

    opened by basiamucha 5
  • Finish support for Pig 0.12

    Pig 0.12.0 has added several new keywords such as "Assert" and "In" (via PIG-3367 and PIG-3269). This causes issues for DataFu UDFs with the same names. I wasn't able to find any solution other than renaming the UDFs in question. I understand that this is a very intrusive change, so I've provided deprecated versions for backward compatibility; e.g. users on Pig 0.11.0 will still be able to use Assert and In as they do now.
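
    For instance, a script on Pig 0.12 could switch to the renamed variant. A minimal sketch (the relation and field names are made up for illustration):

        -- InUDF behaves the same as the old In filter func
        DEFINE MyIn datafu.pig.util.InUDF();

        colors = LOAD 'input' AS (color:chararray);
        filtered = FILTER colors BY MyIn(color, 'red', 'blue');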

    After applying this final set of changes I was able to run all unit tests against Pig 0.12.0 without any issues!

    opened by jarcec 5
  • Fixed StreamingQuantile.outputSchema so that it conforms to the real output

    The StreamingQuantile UDF generates intermediary quantiles but outputs only the requested quantiles. However, the outputSchema method does not reflect that, so the following Pig script breaks with an incompatible-schema error:

        define Quantiles datafu.pig.stats.StreamingQuantile('0.5', '0.9', '0.95', '1');
        A = foreach B generate flatten(Quantiles(B.C)) as (q1, q2, q3, q4);

    I changed the outputSchema so that only the requested quantiles are in the schema.

    opened by rafbarr 5
  • Add Lead function which is similar to Oracle's same name window function

    Lead is an analytic function like Oracle's LEAD. It provides access to more than one tuple of a bag at the same time without a self-join. Given a bag of tuples returned from a query, Lead provides access to the tuple at a given physical offset beyond the current position; with the default offset it effectively generates pairs of consecutive items in the bag.

    If you do not specify an offset, the default is 1. Null is returned when the offset goes beyond the end of the bag.

    Example 1:

       register ba-pig-0.1.jar
    
       define Lead datafu.pig.bags.Lead('2');
    
       -- INPUT: ({(1),(2),(3),(4)})
       data = LOAD 'input' AS (data: bag {T: tuple(v:INT)});
       describe data;
    
       -- OUTPUT:  ({((1),(2),(3)),((2),(3),(4)),((3),(4),),((4),,)})
       -- OUTPUT SCHEMA: data2: {lead_data: {(elem0: (v: int),elem1: (v: int),elem2: (v: int))}}
       data2 = FOREACH data GENERATE Lead(data);
       describe data2;
       DUMP data2;
    

    Example 2

       register  ba-pig-0.1.jar
    
       define Lead datafu.pig.bags.Lead();
    
       -- INPUT: ({(10,{(1),(2),(3)}),(20,{(4),(5),(6)}),(30,{(7),(8)}),(40,{(9),(10),(11)}),(50,{(12),(13),(14),(15)})})
       data = LOAD 'input' AS (data: bag {T: tuple(v1:INT,B: bag{T: tuple(v2:INT)})});
       --describe data;
    
   -- OUTPUT: ({((10,{(1),(2),(3)}),(20,{(4),(5),(6)})),((20,{(4),(5),(6)}),(30,{(7),(8)})),((30,{(7),(8)}),(40,{(9),(10),(11)})),((40,{(9),(10),(11)}),(50,{(12),(13),(14),(15)})),((50,{(12),(13),(14),(15)}),)})
       data2 = FOREACH data GENERATE Lead(data);
       --describe data2;
       DUMP data2;
    
    opened by coderplay 4
  • TotalTimeBetween UDF

    This UDF takes an input bag that is assumed to be ordered by an ISO 8601 timestamp field, and returns the total time elapsed (in seconds) across all tuples in the bag. The first field of each tuple in the DataBag is assumed to be the ISO 8601 timestamp.

    Example:

     DEFINE TotalTimeBetween datafu.pig.bags.TotalTimeBetween();
    
    -- input: 
    -- 2012-01-01T00:00:00\tbiuA8n98wn\thttp://www.google.com/
    -- 2012-01-01T00:00:00\tasd909m09j\thttp://www.google.com/
    -- 2012-01-01T00:01:00\tbiuA8n98wn\thttp://www.google.com/1
    -- 2012-01-01T00:01:05\tbiuA8n98wn\thttp://www.google.com/
    -- 2012-01-01T00:02:00\tasd909m09j\thttp://www.google.com/2
    data = LOAD 'input' AS (timestamp:chararray, visitor_id:chararray, url:chararray);
    by_visitor = GROUP data BY visitor_id;
    count_time = FOREACH by_visitor {
        ordered = ORDER data BY timestamp ASC;
        GENERATE group, TotalTimeBetween(ordered);
    };
    DUMP count_time;
    -- (biuA8n98wn,65)
    -- (asd909m09j,120)
    

    In truth, I wasn't really sure what to call this UDF or where to put it, so I'm open to suggestions.

    opened by msukmanowsky 4
  • Release : When can we expect the 1.3 release of DataFu ?

    Thanks for the fix to the SampleByKey issue. Please let us know when we can expect the release that contains this fix.

    Alternatively, if the build instructions are documented somewhere, I can apply the appropriate patch and make a temporary build until the official JAR is out.

    Thanks, Bala

    opened by BalachanderGS 1
  • Getting java.lang.NullPointerException in running PageRank

    I have about 35M links between 117K nodes and ran a PageRank job on a 3-node m2.2xlarge EMR cluster. Initially I got an out-of-memory error in the reduce phase, so I increased the JVM size; now I am getting the following error (it happens in one reduce task while the other 3 complete without any error):

        2015-01-04 03:44:40,349 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: topic_ranks: New For Each(false,false,false)[bag] - scope-42 Operator Key: scope-42): org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing [POUserFunc (Name: POUserFunc(datafu.pig.linkanalysis.PageRank)[bag] - scope-33 Operator Key: scope-33) children: null at []]: java.lang.NullPointerException
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:289)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:242)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:464)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:432)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:412)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:256)
            at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
            at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635)
            at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
            at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:415)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
            at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

    Any idea how to resolve this issue? I used Hadoop 2.4.0 and Pig 0.12.0.

    opened by keeyonghan 1
  • Adding a new UDF called Ndcg to support calculating Ndcg.

    A couple of different configuration options are available for calculating NDCG: you can specify positional values per rank range, use the standard logarithmic discounting function, or supply a custom function.
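
    For reference, the standard logarithmically discounted form mentioned above is

        DCG_p = \sum_{i=1}^{p} \frac{rel_i}{\log_2(i + 1)}, \qquad NDCG_p = \frac{DCG_p}{IDCG_p}

    where rel_i is the relevance at position i and IDCG_p is the DCG of the ideal ordering.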

    opened by jhartman 0
Releases (v1.2.0)
  • v1.2.0 (Dec 6, 2013)

    Additions:

    • Pair of UDFs for simple random sampling with replacement.
    • More dependencies now packaged in DataFu, so fewer JAR dependencies are required.
    • SetDifference UDF for computing set difference (e.g. A-B or A-B-C).
    • HyperLogLogPlusPlus UDF for efficient cardinality estimation.
  • v1.1.0 (Nov 7, 2013)

    This release adds compatibility with Pig 0.12 (courtesy of @jarcec).

    Additions:

    • Added SHA hash UDF.
    • InUDF and AssertUDF added for Pig 0.12 compatibility. These are the same as In and Assert.
    • SimpleRandomSample, which implements a scalable simple random sampling algorithm.

    Fixes:

    • Fixed the schema declarations of several UDFs for compatibility with Pig 0.12, which is now stricter with schemas.
  • v1.0.0 (Sep 4, 2013)

    This is not a backwards compatible release.

    Additions:

    • Added SampleByKey, which provides a way to sample tuples based on certain fields.
    • Added Coalesce, which returns the first non-null value from a list of arguments like SQL's COALESCE.
    • Added BagGroup, which performs an in-memory group operation on a bag.
    • Added ReservoirSample
    • Added In filter func, which behaves like SQL's IN
    • Added EmptyBagToNullFields, which enables multi-relation left joins using COGROUP
    • Sessionize now supports long values for timestamp, in addition to string representation of time.
    • BagConcat can now operate on a bag of bags, in addition to a tuple of bags
    • Created TransposeTupleToBag, which creates a bag of key-value pairs from a tuple
    • SessionCount now implements Accumulator interface
    • DistinctBy now implements Accumulator interface
    • Using PigUnit from Maven for testing, instead of a checked-in JAR
    • Added many more test cases to improve coverage
    • Improved documentation

    Changes:

    • Moved WeightedSample to datafu.pig.sampling
    • Using Pig 0.11.1 for testing.
    • Renamed package datafu.pig.numbers to datafu.pig.random
    • Renamed package datafu.pig.bag.sets to datafu.pig.sets
    • Renamed TimeCount to SessionCount, moved to datafu.pig.sessions
    • ASSERT renamed to Assert
    • MD5Base64 merged into the MD5 implementation; a constructor argument picks the method, with hex as the default

    Removals:

    • Removed ApplyQuantiles
    • Removed AliasBagFields, since this can now be achieved with a nested foreach

    Fixes:

    • Quantile now outputs schemas consistent with StreamingQuantile
    • Necessary fastutil classes now packaged in datafu JAR, so fastutil JAR not needed as dependency
    • Non-deterministic UDFs are now marked as such
  • v0.0.10 (Sep 4, 2013)

    Additions:

    • CountEach now implements Accumulator
    • Added AliasableEvalFunc, a base class that enables UDFs to access fields in a tuple by name instead of by position
    • Added BagLeftOuterJoin, which can perform a left join on two or more reasonably sized bags without a reduce

    Fixes:

    • StreamingQuantile schema fix
  • v0.0.9 (Sep 4, 2013)

    Additions:

    • WeightedSample can now take a seed

    Changes:

    • Test against Pig 0.11.0

    Fixes:

    • Null pointer fix for Enumerate's Accumulator implementation
Owner
LinkedIn's Attic
The OSS projects that LinkedIn no longer maintains live here.