How to chain multiple MapReduce jobs in Hadoop?

When running MapReduce jobs, it is common for the overall workflow to consist of several MapReduce steps, where the output of one reduce phase is used as the input for the next map phase.

Map1 -> Reduce1 -> Map2 -> Reduce2 -> Map3…

While searching for a way to chain my MapReduce jobs, I stumbled upon several useful approaches. Here are some of them:

Using the Map/Reduce JobClient.runJob() API to chain jobs:

You can easily chain jobs together in this fashion by writing multiple driver methods, one for each job. Call the first driver method, which uses JobClient.runJob() to run the job and wait for it to complete. When that job has completed, call the next driver method, which creates a new JobConf object referring to different instances of Mapper and Reducer, and so on. The first job in the chain should write its output to a path which is then used as the input path for the second job. This process can be repeated for as many jobs as are necessary to arrive at a complete solution to the problem.

Method 1:

  • First create the JobConf object “job1” for the first job and set all its parameters, with “input” as the input directory and “temp” as the output directory. Execute this job with JobClient.runJob(job1).
  • Immediately below it, create the JobConf object “job2” for the second job and set all its parameters, with “temp” as the input directory and “output” as the output directory. Finally, execute the second job with JobClient.runJob(job2).
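Put together, a Method 1 driver might look roughly like the following sketch, using the old org.apache.hadoop.mapred API. The Mapper and Reducer classes (MyMapper1, MyReducer1, etc.) and the key/value types are hypothetical placeholders, and the code needs a Hadoop cluster (or local mode) to actually run:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ChainDriver {
    public static void main(String[] args) throws Exception {
        // Job 1: reads "input", writes to the intermediate path "temp".
        JobConf job1 = new JobConf(ChainDriver.class);
        job1.setJobName("job1");
        job1.setMapperClass(MyMapper1.class);      // hypothetical classes
        job1.setReducerClass(MyReducer1.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job1, new Path("input"));
        FileOutputFormat.setOutputPath(job1, new Path("temp"));
        JobClient.runJob(job1);                    // blocks until job 1 completes

        // Job 2: reads job 1's output from "temp", writes the final "output".
        JobConf job2 = new JobConf(ChainDriver.class);
        job2.setJobName("job2");
        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job2, new Path("temp"));
        FileOutputFormat.setOutputPath(job2, new Path("output"));
        JobClient.runJob(job2);                    // blocks until job 2 completes
    }
}
```

Because JobClient.runJob() is synchronous, no extra coordination is needed: job2 simply cannot start until job1 has finished writing “temp”.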

Method 2:

  • Create two JobConf objects and set all the parameters in them just like in Method 1, except that you don’t call JobClient.runJob() on them.
  • Then create two Job objects, passing the JobConf objects as parameters: Job job1 = new Job(jobconf1); Job job2 = new Job(jobconf2);
  • Using the JobControl object, specify the job dependencies and then run the jobs: JobControl jbcntrl = new JobControl("jbcntrl"); jbcntrl.addJob(job1); jbcntrl.addJob(job2); job2.addDependingJob(job1); jbcntrl.run();
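Expanded slightly, the JobControl approach might look like the sketch below, assuming the same two JobConf objects from Method 1. This is not a tested driver and requires a Hadoop cluster to run; note that JobControl is a Runnable, so a common pattern is to run it in its own thread and poll until all jobs finish:

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class JobControlDriver {
    public static void runWorkflow(JobConf jobconf1, JobConf jobconf2) throws Exception {
        Job job1 = new Job(jobconf1);
        Job job2 = new Job(jobconf2);
        job2.addDependingJob(job1);      // job2 will not start until job1 succeeds

        JobControl jbcntrl = new JobControl("jbcntrl");
        jbcntrl.addJob(job1);
        jbcntrl.addJob(job2);

        // Run the controller in its own thread and poll for completion.
        Thread runner = new Thread(jbcntrl);
        runner.start();
        while (!jbcntrl.allFinished()) {
            Thread.sleep(500);
        }
        jbcntrl.stop();
    }
}
```

The advantage over Method 1 is that JobControl handles the dependency graph for you, so independent jobs in the same workflow can run concurrently.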

Using Oozie, the Hadoop Workflow Service, described below:

3.1.5 Fork and Join Control Nodes

A fork node splits one path of execution into multiple concurrent paths of execution. A join node waits until every concurrent execution path of a previous fork node arrives at it. Fork and join nodes must be used in pairs. The join node assumes that the concurrent execution paths are children of the same fork node.

The name attribute in the fork node is the name of the workflow fork node. The to attribute in the transition elements of the fork node indicates the name of the workflow node that will be part of the concurrent execution. The name attribute in the join node is the name of the workflow join node. The to attribute in the transition element of the join node indicates the name of the workflow node that will be executed after all concurrent execution paths of the corresponding fork arrive at the join node.


<hadoop-workflow name="sample-wf">
    ...
    <fork name="forking">
        <transition to="firstparalleljob"/>
        <transition to="secondparalleljob"/>
    </fork>

    <hadoop name="firstparalleljob">
        ...
        <transition name="OK" to="joining"/>
        <transition name="ERROR" to="fail"/>
    </hadoop>

    <hadoop name="secondparalleljob">
        ...
        <transition name="OK" to="joining"/>
        <transition name="ERROR" to="fail"/>
    </hadoop>

    <join name="joining">
        <transition to="nextaction"/>
    </join>
    ...
</hadoop-workflow>

Using the Cascading library (GPL):

Cascading is a data processing API, process planner, and process scheduler used for defining and executing complex, scale-free, and fault-tolerant data processing workflows on an Apache Hadoop cluster, all without having to ‘think’ in MapReduce.

Using the Apache Mahout Recommender Job sample:

The Apache Mahout project has a sample job which chains together multiple MapReduce jobs. You can find the sample here:

Using a simple Java library named “Hadoop-orchestration” on GitHub:

This library enables execution of multiple Hadoop jobs as a workflow. The job configuration and the workflow definition of inter-job dependencies are specified in a JSON file. Everything is externally configurable, and no change to an existing MapReduce implementation is required to make it part of a workflow. Details can be found here. Source code and a jar are available on GitHub.


How to wipe out the DFS in Hadoop?

If you format only the Namenode, it will remove the metadata stored by the Namenode; however, all the temporary storage and Datanode blocks will still be there. To remove the temporary storage and all the Datanode blocks, you need to delete the main Hadoop storage directory from every node. This directory is defined by the hadoop.tmp.dir property in your core-site.xml.
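For reference, a minimal core-site.xml entry for this property might look like the following (the path shown is only an example; use whatever your cluster is actually configured with):

```xml
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/var/lib/hadoop/tmp</value>
  </property>
</configuration>
```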

First, you need to stop all the Hadoop processes on your Namenode. This can be done by running the default stop-all script, which will also stop DFS:

  • On Linux – bin/stop-all.sh
  • On Windows – C:\apps\dist\bin\StopHadoop.cmd

Now you need to delete all files in your main Hadoop storage directory. The storage directory is defined by the hadoop.tmp.dir property in the core-site.xml file. Be sure to perform this action on every machine in your cluster, i.e. Namenodes, JobTrackers, Datanodes, etc.:

  • On Linux: hadoop dfs -rmr /
  • On Windows:
    • hadoop fs -rmr / (at the Hadoop command shell)
    • #rmr / (at the interactive JavaScript shell)

Lastly, you need to reformat the Namenode as below:

  • hadoop namenode -format

Finally, you start your cluster again by running the default start-all script, which will start up DFS again:

  • On Linux: bin/start-all.sh
  • On Windows: C:\apps\dist\bin\StartHadoop.cmd
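Assuming a Linux cluster with the default scripts, the whole wipe-and-restart sequence can be sketched as follows. The hadoop.tmp.dir value shown (/var/lib/hadoop/tmp) is only an example, and the rm -rf step (a local-filesystem alternative to the hadoop dfs -rmr approach above) is destructive, so verify the configured path on your cluster first:

```sh
# 1. Stop all Hadoop daemons (this also stops DFS)
bin/stop-all.sh

# 2. On EVERY node, remove the main Hadoop storage directory,
#    i.e. the value of hadoop.tmp.dir (example path shown)
rm -rf /var/lib/hadoop/tmp

# 3. Reformat the Namenode
bin/hadoop namenode -format

# 4. Start the cluster (and DFS) again
bin/start-all.sh
```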

Video Resources on Machine Learning from Big Data Workshop NIPS2011

Big Learning Workshop: Algorithms, Systems, and Tools for Learning at Scale at NIPS 2011

Invited Talk: Machine Learning and Hadoop by Josh Wills

Abstract: We’ll review common use cases for machine learning and advanced analytics found in our customer base at Cloudera and ways in which Apache Hadoop supports these use cases. We’ll then discuss upcoming developments for Apache Hadoop that will enable new classes of applications to be supported by the system.

Tutorial: Vowpal Wabbit by John Langford

Abstract: We present a system and a set of techniques for learning linear predictors with convex losses on terascale datasets, with trillions of features (the number of features here refers to the number of non-zero entries in the data matrix), billions of training examples, and millions of parameters, in an hour using a cluster of 1000 machines. One of the core techniques used is a new communication infrastructure, often referred to as AllReduce, implemented for compatibility with MapReduce clusters. The communication infrastructure appears broadly reusable for many other tasks.

Tutorial: Group Sparse Hidden Markov Models

Sparse Representation and Low-rank Approximation Workshop at NIPS 2011

Invited Talk: Group Sparse Hidden Markov Models by Jen-Tzung Chien, National Cheng Kung University, Taiwan

Invited Talk: A Common GPU n-Dimensional Array for Python and C by Arnaud Bergeron

Abstract: Currently there are multiple incompatible array/matrix/n-dimensional base object implementations for GPUs. This hinders the sharing of GPU code and causes duplicate development work. This paper proposes and presents a first version of a common GPU n-dimensional array (tensor) named GpuNdArray that works with both CUDA and OpenCL. It will be usable from Python, C, and possibly other languages.

Invited Talk: A Topic Model for Melodic Sequences by Athina Spiliopoulou

Athina is a PhD student in the Machine Learning group of the Institute for Adaptive and Neural Computation at the School of Informatics, University of Edinburgh. She works with Amos Storkey on machine learning methods for music, with a specific interest in unsupervised learning of musical structure from melodic sequences.