Tool & ToolRunner – Simplifying the concept

Writing a mapper and a reducer is easy: just extend org.apache.hadoop.mapreduce.Mapper and org.apache.hadoop.mapreduce.Reducer respectively, and override the map and reduce methods to implement your logic.
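
To ground the driver examples below, here is a minimal sketch of what the WordCountMap and WordCountReduce classes used later might look like (each would live in its own .java file):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Emits (word, 1) for every word in the input split
public class WordCountMap extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}

// Sums up the counts emitted for each word
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}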

But when it comes to writing the driver program (the one containing the main method) for a MapReduce job, it is always preferable to use the ToolRunner class and the Tool interface. They are simple to implement, but slightly confusing to understand at first, because both involve a method named run.

In this article, we will walk through the control flow of this implementation. Before getting into the details, let's answer a question that leads us to the subject.

Without ToolRunner

Can we write a driver class without implementing these at all?

Yes. Let’s write a driver class called WordCount.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");

        job.setJarByClass(WordCount.class);
        job.setJobName("Word Count");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

The above code works fine. We create a Configuration object, conf, and pass it to the Job class. It picks up the default configuration from the .xml files in the ../hadoop/conf directory.
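
For instance, a freshly created Configuration object already carries those defaults. A small illustration (fs.default.name is the classic old-style property; what gets printed depends entirely on your site files):

Configuration conf = new Configuration();
// Loads core-default.xml and core-site.xml from the classpath,
// so site-specific values are already in place
System.out.println(conf.get("fs.default.name"));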

The Hadoop framework is highly configurable. It comes with a few helper classes that make it easier to run jobs from the command line. One such class is GenericOptionsParser. It interprets common Hadoop command-line options and sets them on the configuration object, using or overriding the default configuration on the fly. In other words, it offers the convenience of setting or modifying the configuration from the command line as well.

GenericOptionsParser

GenericOptionsParser is a utility that parses generic command-line arguments for the Hadoop framework. It recognizes several standard command-line arguments, called generic options.

The supported generic options are:

*     -conf <configuration file>      specify a configuration file
*     -D <property=value>             use value for given property
*     -fs <local|namenode:port>       specify a namenode
*     -jt <local|jobtracker:port>     specify a job tracker
*     -files <comma separated list of files>        specify files to be
                                copied to the map reduce cluster
*     -libjars <comma separated list of jars>       specify jar files to
                                include in the classpath
*     -archives <comma separated list of archives>  specify archives to be
                                unarchived on the compute machines

Arguments other than the generic ones can be obtained with getRemainingArgs(). For example, the input path and output path are non-generic options.
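
For example, assuming our job is packaged in a jar named wordcount.jar (a hypothetical name), a command like the following mixes generic and non-generic arguments:

hadoop jar wordcount.jar WordCount -D mapred.reduce.tasks=2 /user/input /user/output

Here -D mapred.reduce.tasks=2 is a generic option that lands on the configuration object, while the two paths are the remaining arguments returned by getRemainingArgs().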

Let's try using GenericOptionsParser in the above code snippet. Although calling GenericOptionsParser directly is not the style Hadoop encourages, we take this approach for the purpose of understanding.

// additionally requires org.apache.hadoop.util.GenericOptionsParser
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Parse the generic Hadoop options and apply them to conf
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        String[] otherArgs = parser.getRemainingArgs();

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setJobName("Word Count");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        // The non-generic arguments: input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
    }
}

In the above example, GenericOptionsParser takes two arguments: conf, the Configuration to modify, and args, the command-line arguments.

GenericOptionsParser parses only the generic Hadoop arguments; the array of remaining (non-generic) arguments is obtained with getRemainingArgs(). Note in particular how the input and output paths are pulled out of otherArgs rather than args.

This code achieves our purpose, but it is not the most mature style. Hadoop provides a more convenient way through the Tool interface and the ToolRunner class, which use GenericOptionsParser internally.

Implementing ToolRunner

With the above understanding, let's look at the recommended way to write the driver class, and then try to understand the flow.

// additionally requires org.apache.hadoop.conf.Configured,
// org.apache.hadoop.util.Tool and org.apache.hadoop.util.ToolRunner
public class WordCount extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // When implementing Tool, obtain the Configuration that
        // ToolRunner has already populated for us
        Configuration conf = this.getConf();

        Job job = new Job(conf, "Word Count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Execute the job and return its status
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Extending Configured (an implementation of the Configurable interface) lets the run() method obtain the Configuration through Configurable's getConf().
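
For reference, here is (abridged) what these two interfaces look like; Configurable lives in org.apache.hadoop.conf and Tool in org.apache.hadoop.util:

public interface Configurable {
    void setConf(Configuration conf);
    Configuration getConf();
}

public interface Tool extends Configurable {
    int run(String[] args) throws Exception;
}

So implementing Tool commits us to a run(String[]) method, and extending Configured gives us the setConf()/getConf() pair for free.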

In the above program there are two run() methods, and they are not the same: one sits inside main, the other outside it.

The second run() (outside main) is our implementation of the method declared by the Tool interface. It is not called directly in main(); instead, we hand the WordCount (driver class) object to ToolRunner, which calls it for us.

The first run(), inside main, is a static method of the ToolRunner class. ToolRunner invokes our implementation of Tool's run(String[] args) through the WordCount (driver class) object we pass to ToolRunner.run().

                                        ~~~ ~~~

The above code completes our driver class. But for the sake of understanding, let's open up the ToolRunner class and look at the underlying implementation of the ToolRunner.run() method.

public static int run(Configuration conf, Tool tool, String[] args)
        throws Exception {
    if (conf == null) {
        conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);

    // set the configuration back, so that Tool can configure itself
    tool.setConf(conf);

    // get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
}

It is clear from the above code that ToolRunner.run() does exactly what our GenericOptionsParser example did, just in a slightly different and standard way. Also pay attention to the last line: it calls Tool's run() method, which actually runs our job.
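
The practical payoff is that every generic option now works with our driver for free. For instance (again assuming the hypothetical wordcount.jar):

hadoop jar wordcount.jar WordCount -D mapred.reduce.tasks=4 /user/input /user/output

ToolRunner strips the -D option, sets it on the Configuration, hands that Configuration to WordCount through setConf(), and passes only the two paths on to run().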

Note: Tool and ToolRunner live in the org.apache.hadoop.util package.

I hope this post helps you gain a clear understanding of Tool and ToolRunner, rather than just using them blindly in your programs.

**********************Cheers!!
