Hadoop Streaming with Perl Scripts

In this article, I am going to explain how to use Hadoop streaming with Perl scripts. First, let’s go over some of the theory behind Hadoop streaming.

Hadoop is written in Java, so the native language for writing MapReduce programs is Java. But Hadoop also provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.

Hadoop Streaming

This API is called Hadoop Streaming, and it uses Unix standard streams as the interface between Hadoop and your program. To understand it, let’s take a detailed look at how a normal Java MapReduce task is executed within a Tasktracker.

Diagram 1

As shown in Diagram-1, once the Tasktracker has been assigned a task and has prepared** to execute it, it creates an instance of TaskRunner to run the task.

**Preparation includes: copying the job JAR to the Tasktracker and un-jarring its contents into a locally created working directory, and copying any files needed from the distributed cache to the local disk.

The TaskRunner launches a new JVM to run each task. When the task finishes, a cleanup action (determined by the OutputCommitter) commits the task, i.e. its output is written to the final location.

Now, it’s interesting to look at where a streaming job differs from the above process.

When a streaming job is submitted, it runs special map and reduce tasks. Each streaming map or reduce task launches a separate process to run the user-specified executable and communicates with it using standard input and output streams (Diagram-2).

Diagram 2

During execution of the task, the Java process passes input key-value pairs to the external process, which runs them through the user-defined map or reduce function (a Perl program in our case) and passes the output key-value pairs back to the Java process. From the Tasktracker’s point of view, it is as if its child JVM ran the map or reduce code itself; in reality, the streaming process does the actual work while the JVM merely relays input and output between it and the Tasktracker (the parent process).

Therefore, you can write your map and reduce programs in any language that can read from standard input and write to standard output. In our example we will use one such language: Perl.
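To make this stdin/stdout contract concrete, here is a minimal sketch of an "identity" mapper in Perl (an illustration only, not part of the sales example that follows): Hadoop Streaming only requires that the executable read lines from standard input and write tab-separated key-value lines to standard output.

#!/usr/bin/perl
# Minimal identity mapper: re-emit each input line as a key<TAB>value pair.
use strict;
use warnings;

while (my $line = <STDIN>) {
    chomp $line;
    # Everything up to the first tab is the key; the rest is the value.
    my ($key, $value) = split(/\t/, $line, 2);
    $value = '' unless defined $value;
    print "$key\t$value\n";
}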

How to write & execute a Hadoop streaming job – Example

Programming languages like Perl and Python offer very powerful regular expressions for pattern matching and text processing. However, since the focus of this article is not Perl programming, I have removed all such gimmicks and kept the scripts very simple.
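As a taste of what such a gimmick might look like, a mapper could validate each record with a regular expression before emitting it; a hypothetical one-liner (not used in the scripts below):

# Emit the record only if the sale field looks like a decimal number.
print "$store\t$sale\n" if $sale =~ /^\d+(?:\.\d+)?$/;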

Sample sales data with tab-separated fields: Date, Time, Store, Product Category, Sale, Card Type (tabs are rendered as spaces here)
2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex
2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Diego Music 66.08 Cash
2012-01-01 09:00 Pittsburgh Pet Supplies 493.51 Discover
2012-01-01 09:00 Omaha Children's Clothing 235.63 MasterCard
2012-01-01 09:00 Stockton Men's Clothing 247.18 MasterCard
2012-01-01 09:00 Austin Cameras 379.6 Visa
2012-01-01 09:00 New York Consumer Electronics 296.8 Cash
2012-01-01 09:00 Corpus Christi Toys 25.38 Discover
2012-01-01 09:00 Fort Worth Toys 213.88 Visa
vi salesMapper.pl
#!/usr/bin/perl
# Mapper: emit the store (field 3) as the key and the sale amount (field 5) as the value.
while (<>) {
    chomp;
    my ($store, $sale) = (split(/\t/, $_))[2, 4];    # fields are tab-separated
    print "$store\t$sale\n";
}
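You can sanity-check the mapper on its own before wiring up the full pipeline. Assuming the sample data above is saved, tab-separated, as mydata/sales, the first three records should come back as store/sale pairs (tabs rendered as spaces here):

$ head -3 mydata/sales | perl mycode/salesMapper.pl
San Jose	214.05
Fort Worth	153.57
San Diego	66.08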
vi salesReducer.pl
#!/usr/bin/perl
# Reducer: sum the sale amounts per store and print one total per store.
my %hashTab;
while (<>) {
    chomp;
    my @data = split(/\t/, $_);
    next unless $#data == 1;           # skip malformed lines
    my ($store, $sale) = @data;
    $hashTab{$store} += $sale;         # accumulate the running total
}
foreach my $store (keys %hashTab) {
    print "$store\t$hashTab{$store}\n";
}

For ease of understanding, I have kept the above data structure and programs very simple.

Before running on the cluster, it’s always better to test your code on sample data. The best thing about streaming is that you can test the scripts simply with Unix pipes, no Hadoop required; the sort in the middle mimics the shuffle-and-sort phase Hadoop runs between map and reduce.

$ cat mydata/sales | perl mycode/salesMapper.pl | sort | perl mycode/salesReducer.pl
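For the ten sample records above, the pipeline should print one total per store, roughly as follows (the key order is not guaranteed, since the reducer iterates over a hash; note that Fort Worth’s two sales are summed):

Austin	379.6
Corpus Christi	25.38
Fort Worth	367.45
New York	296.8
Omaha	235.63
Pittsburgh	493.51
San Diego	66.08
San Jose	214.05
Stockton	247.18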

This plain Unix pipeline serves us well until we need the feature Hadoop actually provides: parallel processing of massive data sets.

So, let’s run it on the Hadoop cluster with the command below.

$ hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /user/hduser/sales \
-output /user/hduser/out/sales-out \
-mapper /home/hduser/mycode/salesMapper.pl \
-reducer /home/hduser/mycode/salesReducer.pl

In the above command, hadoop-streaming-1.2.1.jar is required because it is the tool that launches and manages the external streaming processes.
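One caveat: the command above references the scripts by absolute local paths, which assumes they are present at that path (and executable, e.g. via chmod +x) on every node. Hadoop Streaming’s -file option ships local files with the job so that each Tasktracker gets its own copy; a variant of the same command using it:

$ hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /user/hduser/sales \
-output /user/hduser/out/sales-out \
-file /home/hduser/mycode/salesMapper.pl -mapper salesMapper.pl \
-file /home/hduser/mycode/salesReducer.pl -reducer salesReducer.pl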

                                     *** *** ***
