Hadoop Job and Task Name Classification and Convention

Hadoop MapReduce jobs and tasks follow a preconfigured naming convention, so during job analysis or troubleshooting you can easily tell what to look for and where.

Here is some key information about the naming classification and convention for Hadoop jobs and mapper/reducer tasks:

Job Name convention:

  • job_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED}_JobID
    • First Part – The “job” keyword marks the name as a job
    • Second Part – Full date and time when the job tracker was started
    • Third Part – The job counter, counted since the job tracker started running

A task is a unit of job execution, and a job consists of mapper and reducer tasks. The full set of mapper and reducer tasks is created when a job is submitted, and the job tracker dispatches these tasks based on the number of mapper and reducer slots available in the Hadoop cluster. There are two kinds of tasks:

  1. Mapper

    1. There are 3 kinds of mappers
      1. Work mapper – These are the actual mapper tasks, each performing the same kind of work as the other mappers. The IDs for these mapper tasks start at 0 and end at “Total mappers – 1”.
      2. Setup Mapper – This task gets the very last mapper task ID.
      3. Cleanup Mapper – This is the task which cleans up the overall work. Its ID is one less than the last task ID, that is, “LAST – 1”. (See the example below to understand it clearly)
      4. Note: Setup and Cleanup mappers are not counted in the actual mapper count. Also, depending on the task count, it is possible to have more than 1 cleanup task.
  2. Reducer

    1. There is only 1 kind of reducer.

Task Name convention: 

  • For mapper
    • task_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED_JobID}_m_{6-Digit-Mapper-ID}_{mapper-instance}
  • For reducer
    • task_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED_JobID}_r_{6-Digit-Reducer-ID}_{reducer-instance}

Here is an Example:

  • Job ID
    • job_201307091604_1081
      • job – job
      • 201307091604 – The time when the Hadoop cluster was started
        • 2013/07/09 – Date
        • 16:04 (4:04 PM)
      • 1081 – Job counter (the JobID part of the name)
  • Mappers (Ex total 20 -> 000000 – 000019)
    • task_201307091604_1081_m_000000_0
      • First instance of mapper task (ID – 000)
    • task_201307091604_1081_m_000010_0
    • task_201307091604_1081_m_000010_1
    • task_201307091604_1081_m_000010_2
      • The above are 3 instances of the same mapper task (ID – 010)
    • task_201307091604_1081_m_000019_0
      • First instance of last Mapper task (ID –  019)
  • Reducers (Total 6)
    • task_201307091604_1081_r_000000_0
      • First Instance of first reducer task (ID – 000)
    • task_201307091604_1081_r_000005_0
      • First instance of 6th reducer task (ID – 005)
  • Besides the above, 2 more mapper tasks are added to every job:
    • Setup task
      • Even though it is the setup task, its task counter is the very last one
    • Cleanup task
      • This task ID will be “LAST – 1”
    • For example, if you have a total of 20 mappers, then the Setup task ID will be 21 and the Cleanup task ID will be 20.
      • 0 – 19 – total 20 mappers
      • 20 – cleanup task
      • 21 – setup task
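
The naming convention above can be picked apart mechanically when scanning logs. The following is only a minimal sketch, assuming task names look exactly like the examples in this post; the names `TaskName`, `parse_task_name` and `job_name` are made up for illustration and are not part of Hadoop.

```python
import re
from typing import NamedTuple

# Pattern for names shaped like task_201307091604_1081_m_000010_2
# (assumption: exactly the layout shown in the examples above).
TASK_NAME = re.compile(
    r"^task_(?P<started>\d{12})_(?P<job>\d+)_(?P<kind>[mr])"
    r"_(?P<task>\d{6})_(?P<instance>\d+)$"
)


class TaskName(NamedTuple):
    started: str   # date-time part, e.g. 201307091604
    job: str       # job counter, kept as text so padding is preserved
    kind: str      # "mapper" or "reducer"
    task: int      # 6-digit task ID as a number
    instance: int  # instance counter of that task


def parse_task_name(name: str) -> TaskName:
    """Split a task name into its parts, or raise ValueError."""
    match = TASK_NAME.match(name)
    if match is None:
        raise ValueError(f"not a task name: {name!r}")
    kind = "mapper" if match.group("kind") == "m" else "reducer"
    return TaskName(
        match.group("started"),
        match.group("job"),
        kind,
        int(match.group("task")),
        int(match.group("instance")),
    )


def job_name(task: TaskName) -> str:
    """Rebuild the job name that the task belongs to."""
    return f"job_{task.started}_{task.job}"
```

For instance, `parse_task_name("task_201307091604_1081_m_000010_2")` yields a mapper with task ID 10 and instance 2, and `job_name(...)` on that result gives back `job_201307091604_1081`.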

12 key steps to keep your Hadoop Cluster running strong and performing optimally

1. Hadoop deployment on 64bit OS:

  • A 32-bit OS has a 3GB limit on the Java heap size, so make sure the Hadoop Namenode/Datanode is running on a 64-bit OS.

2. Mapper and Reducer count setup:

This is a cluster-specific value that sets the total number of mapper and reducer slots per tasktracker.

  • mapred.tasktracker.map.tasks.maximum (conf/mapred-site.xml)
    • Value: N
    • Description: The maximum number of map task slots to run simultaneously
  • mapred.tasktracker.reduce.tasks.maximum (conf/mapred-site.xml)
    • Value: N
    • Description: The maximum number of reduce task slots to run simultaneously

If no value is set, the default is 2. A value of -1 specifies that the number of map/reduce task slots is based on the total amount of memory reserved for MapReduce by the sysadmin.
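
Hadoop site files store each setting as a `<property>` element with a `<name>` and a `<value>` inside `<configuration>`. As a rough sketch, the two slot settings could be rendered like this; the helper name `render_site_xml` and the value 4 are assumptions for illustration only, so pick values that suit your own tasktrackers.

```python
import xml.etree.ElementTree as ET


def render_site_xml(properties):
    """Render a dict of settings in the Hadoop *-site.xml layout."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # 4 map and 4 reduce slots are example values, not a recommendation.
    print(render_site_xml({
        "mapred.tasktracker.map.tasks.maximum": 4,
        "mapred.tasktracker.reduce.tasks.maximum": 4,
    }))
```

The output is a single-line XML string that can be pasted into conf/mapred-site.xml and reformatted as needed.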

To set this value, take the tasktracker’s CPU (with or without hyper-threading), disk, and memory into account, along with how CPU intensive your job is on a scale of 1-10. For example, if the tasktracker is a quad-core box with hyper-threading, it has 4 physical and 4 virtual cores, 8 CPUs in total. For a highly CPU-intensive job we can assign 4 mapper and 4 reducer tasks; for a far less CPU-intensive job we can have up to 40 mappers and 40 reducers. The mapper and reducer counts do not need to match, since it all depends on how the jobs are written. We could also have 6 mappers and 2 reducers, depending on how much work each mapper and reducer does, and the job-specific counters provide this information.

The number of mappers and reducers per tasktracker depends on the CPU utilization per task. You can also look at each task’s counters to see how long the CPU was utilized relative to the total map/reduce task time. If there are long waits, you may need to reduce the count; if everything finishes very quickly, that suggests there is room to add mapper or reducer slots per tasktracker.

Users must understand that a mapper count larger than the number of physical CPU cores results in CPU context switching, which may slow overall job completion. A configuration balanced against the available CPUs, on the other hand, may result in faster job completion.

3. Per Task JVM Memory Configuration:

This memory configuration is important to set based on the total RAM in each tasktracker.

  • mapred.child.java.opts (conf/mapred-site.xml)
    • Value: -Xmx{YOUR_Value}M
    • Description: Larger heap-size for child jvms of maps/reduces.

The value for the above parameter depends on the total number of mapper and reducer tasks per tasktracker, so you must know those two numbers before setting it. Here are a few ways to calculate proper values for this parameter:

  • Let’s consider there are 4 mappers and 4 reducers per tasktracker with 32GB total RAM in each machine
    • In this scenario there will be a total of 8 tasks running in any tasktracker
    • Let’s consider about 2-4 GB RAM is required for the tasktracker to perform other jobs, so there is about ~28GB RAM available for Hadoop tasks
    • Now we can divide 28/8 and get 3.5GB RAM per task
    • The value in this case will be -Xmx3500M
  • Let’s consider there are 8 mappers and 4 reducers per tasktracker with 32GB total RAM
    • In this scenario there will be a total of 12 tasks running in any tasktracker
    • Let’s consider about 2-4 GB RAM is required for the tasktracker to perform other jobs, so there is about ~28GB RAM available for Hadoop tasks
    • Now we can divide 28/12 and get about 2.33GB RAM per task
    • The value in this case will be -Xmx2300M
  • Let’s consider there are 12 mappers and 8 reducers per tasktracker with 128GB total RAM, and one specific node is also working as the secondary namenode
    • It is not suggested to keep the Secondary Namenode with a Datanode/TaskTracker, however in this example we will keep it here for the sake of calculation.
    • In this scenario there will be a total of 20 tasks running in any tasktracker
    • Let’s consider about 8GB RAM is required for the Secondary namenode to perform its jobs and 4GB for other jobs; keeping some additional headroom, there is about ~100GB RAM available for Hadoop tasks
    • Now we can divide 100/20 and get 5GB RAM per task
    • The value in this case will be around -Xmx5000M
  • Note:
    • HDP 1.2 has some new JVM-specific configuration which can be used for much more granular memory settings.
    • If the Hadoop cluster does not have machines with identical memory (i.e. a collection of machines with 32GB & 64GB RAM), then the user should use the lower memory configuration as the baseline.
    • It is always best to have ~20% memory left for other processes.
    • Do not overcommit the memory for total tasks, as it will surely cause JVM OOM errors.
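
The arithmetic in the examples above can be written down as a small helper. This is just a sketch of the same calculation, assuming 1GB is counted as 1000M (as the examples do) and rounding down to the nearest 100M; the function name `child_heap_option` and its parameters are made up for illustration.

```python
def child_heap_option(total_ram_gb: int, reserved_gb: int,
                      mappers: int, reducers: int) -> str:
    """Return a -Xmx value for mapred.child.java.opts.

    total_ram_gb: RAM in the tasktracker machine
    reserved_gb:  RAM kept back for the tasktracker and other processes
    """
    tasks = mappers + reducers
    if tasks <= 0:
        raise ValueError("need at least one mapper or reducer slot")
    available_mb = (total_ram_gb - reserved_gb) * 1000  # 1GB = 1000M here
    per_task_mb = available_mb // tasks
    per_task_mb -= per_task_mb % 100  # round down to a multiple of 100M
    return f"-Xmx{per_task_mb}M"


if __name__ == "__main__":
    print(child_heap_option(32, 4, 4, 4))     # 28GB over 8 tasks
    print(child_heap_option(32, 4, 8, 4))     # 28GB over 12 tasks
    print(child_heap_option(128, 28, 12, 8))  # ~100GB over 20 tasks
```

Rounding down rather than up is a deliberate choice here, in line with the note about not overcommitting memory.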

4. Setting mapper or reducer memory limit to unlimited:

Setting both mapred.job.{map|reduce}.memory.mb values to -1 (or to the maximum) lets mapreduce jobs use the maximum amount of memory available.

  • mapred.job.map.memory.mb
    • Value: -1
    • Description: This property’s value sets the virtual memory size of a single map task for the job.
  • mapred.job.reduce.memory.mb
    • Value: -1
    • Description: This property’s value sets the virtual memory size of a single reduce task for the job.

 

5. Setting No limit (or Maximum) for total number of tasks per job:

Capping this value puts constraints on mapreduce job completion and performance. It is best to set it to -1 so a job can use the maximum available.

  • mapred.jobtracker.maxtasks.per.job
    • Value: -1
    • Description: Set this property’s value to any positive integer to set the maximum number of tasks for a single job. The default value of -1 indicates that there is no maximum.

6. Memory configuration for sorting data within processes:

This segment has two values, io.sort.factor and io.sort.mb. Based on experience, io.sort.mb should be 25-30% of the mapred.child.java.opts value.

  • io.sort.factor (conf/core-site.xml)
    • Value: 100
    • Description: More streams merged at once while sorting files.
  • io.sort.mb (conf/core-site.xml)
    • Value: NNN
    • Description: Higher memory-limit while sorting data.

So for example if mapred.child.java.opts is 2 GB, io.sort.mb can be 500MB or if mapred.child.java.opts is 3.5 GB then io.sort.mb can be 768MB.

Also, after running a few mapreduce jobs, analyzing the log messages will help you determine a better setting for io.sort.mb. Note that a low io.sort.mb makes the sort procedure take much longer, whereas a value that is too high may cause job failures.
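
The 25-30% rule of thumb is easy to turn into a starting range. The sketch below only restates that rule; the function name `io_sort_mb_range` is hypothetical, and the result is a first guess to refine from job logs, not a guaranteed-safe value.

```python
def io_sort_mb_range(child_heap_mb: int) -> tuple:
    """Return (low, high) for io.sort.mb as 25% and 30% of the child heap."""
    if child_heap_mb <= 0:
        raise ValueError("child heap size must be positive")
    return child_heap_mb * 25 // 100, child_heap_mb * 30 // 100


if __name__ == "__main__":
    low, high = io_sort_mb_range(2000)  # child heap of -Xmx2000M
    print(f"io.sort.mb between {low} and {high}")
```

With a 2000M child heap this gives 500 to 600, matching the 500MB figure used for the 2 GB case above.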

 

7. Reducer Parallel copies configuration:

A large number of parallel copies causes high memory utilization and can lead to Java heap errors, while a small number slows job completion. Keeping this value at an optimum helps mapreduce jobs complete faster.

  • mapred.reduce.parallel.copies (conf/mapred-site.xml)
    • Value: 20
    • Description: The default number of parallel transfers run by reduce during the copy (shuffle) phase. A higher number of parallel copies is run by reduces to fetch outputs from a very large number of maps.

This value is very much network specific. A larger value means higher network activity between tasktrackers. With more parallel reduce copies, reducers create many network connections, which can congest the network in a Hadoop cluster. A lower number helps keep network connectivity stable. Users should choose this number depending on their network strength. I think the recommended value can be between 12-18 in a gigabit network.

 

8. Setting Reducer Input limit to maximum:

Sometimes setting a lower limit to reducer input size may cause job failures. It is best to set the reducer input limit to maximum.

conf/mapred-site.xml mapreduce.reduce.input.limit -1 The limit on the input size of the reduce. If the estimated input size of the reduce is greater than this value, job is failed. A value of -1 means that there is no limit set.

This value is based on disk size and available space in the tasktracker. So if there is a cluster in which each datanode has variation in configured disk space, setting a specific value may cause job failures. Setting this value to -1 helps reducers to work based on available space.

 

9. Setting Map input split size:

During mapreduce job execution, map tasks are created per split. Setting the minimum split size to 0 lets the jobtracker decide the split size based on the data source.

  • mapred.min.split.size
    • Value: 0
    • Description: The minimum size chunk that map input should be split into. File formats with minimum split sizes take priority over this setting.

10. Setting HDFS block size:

  • Currently I have seen various Hadoop clusters running great with variety of HDFS block sizes.
  • A user can set dfs.block.size in hdfs-site.xml between 64MB and 1GB or more.

11. Setting user priority to “High” in the Hadoop cluster:

  • In Hadoop clusters, jobs are scheduled based on user priority if certain types of job scheduler are configured
  • If a hadoop user is lower in priority, the mapper and reducer tasks will have to wait longer to get task slots in the tasktracker. This could ultimately make mapreduce jobs take longer.
    • In some cases a timeout could occur and the mapreduce job may fail
  • If a job scheduler is configured, submitting a job as a user with high job scheduling priority will result in faster job completion in a Hadoop cluster.

 

12. Secondary Namenode or Highly Available Namenode Configuration:

  • Having a secondary namenode or Highly Available namenode helps the Hadoop cluster to be always/highly available.
  • However, I have seen some cases where the secondary namenode or HA namenode is running on a datanode, which could impact the cluster performance.
  • Keeping the Secondary Namenode or Highly Available Namenode separate from the Datanode/TaskTracker keeps dedicated resources available for the tasks assigned to the tasktracker.

 

What to do when compiling Hadoop branch 1.2.x returns java.io.IOException: Cannot run program "autoreconf"

Compiling the Hadoop branch 1.2.x code on OSX returned the exception below:

create-native-configure:

BUILD FAILED
/Users/avkash/work/hadoop/branch-1.2/build.xml:634: Execute failed: java.io.IOException: Cannot run program "autoreconf" (in directory "/Users/avkash/work/hadoop/branch-1.2/src/native"): error=2, No such file or directory
at java.lang.ProcessBuilder.processException(ProcessBuilder.java:478)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:457)
at java.lang.Runtime.exec(Runtime.java:593)
at org.apache.tools.ant.taskdefs.Execute$Java13CommandLauncher.exec(Execute.java:862)
at org.apache.tools.ant.taskdefs.Execute.launch(Execute.java:481)
at org.apache.tools.ant.taskdefs.Execute.execute(Execute.java:495)
at org.apache.tools.ant.taskdefs.ExecTask.runExecute(ExecTask.java:631)
at org.apache.tools.ant.taskdefs.ExecTask.runExec(ExecTask.java:672)
at org.apache.tools.ant.taskdefs.ExecTask.execute(ExecTask.java:498)
at org.apache.tools.ant.UnknownElement.execute(UnknownElement.java:291)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.tools.ant.dispatch.DispatchUtils.execute(DispatchUtils.java:106)
at org.apache.tools.ant.Task.perform(Task.java:348)
at org.apache.tools.ant.Target.execute(Target.java:390)
at org.apache.tools.ant.Target.performTasks(Target.java:411)
at org.apache.tools.ant.Project.executeSortedTargets(Project.java:1399)
at org.apache.tools.ant.Project.executeTarget(Project.java:1368)
at org.apache.tools.ant.helper.DefaultExecutor.executeTargets(DefaultExecutor.java:41)
at org.apache.tools.ant.Project.executeTargets(Project.java:1251)
at org.apache.tools.ant.Main.runBuild(Main.java:809)
at org.apache.tools.ant.Main.startAnt(Main.java:217)
at org.apache.tools.ant.launch.Launcher.run(Launcher.java:280)
at org.apache.tools.ant.launch.Launcher.main(Launcher.java:109)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:53)
at java.lang.ProcessImpl.start(ProcessImpl.java:91)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:452)
… 23 more

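Looking in src/native, there is no “autoreconf” shipped with the source (it is part of GNU autoconf), so the best option to solve this problem is to install it. On OSX, installing automake through brew pulls in autoconf as a dependency: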

└─[1] <git:(master✗✈)> brew install automake
==> Installing automake dependency: autoconf
==> Downloading http://ftpmirror.gnu.org/autoconf/autoconf-2.69.tar.gz
######################################################################## 100.0%
==> ./configure --prefix=/usr/local/Cellar/autoconf/2.69
==> make install
🍺 /usr/local/Cellar/autoconf/2.69: 69 files, 2.0M, built in 21 seconds
==> Installing automake
==> Downloading http://ftpmirror.gnu.org/automake/automake-1.14.tar.gz
######################################################################## 100.0%
==> ./configure --prefix=/usr/local/Cellar/automake/1.14
==> make install
🍺 /usr/local/Cellar/automake/1.14: 127 files, 2.5M, built in 7 seconds

That’s all!!

After that, just run the following command to make sure the same exception does not come back:

$ ant create-native-configure
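
To catch this kind of failure before starting a long build, you can check up front that the tools are on the PATH. The snippet below is a small sketch using Python’s standard `shutil.which`; the helper name `missing_tools` is made up, and the tool list simply reuses the names that appear in this post.

```python
import shutil


def missing_tools(tools):
    """Return the tools from the list that cannot be found on the PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools(["autoreconf", "autoconf", "automake"])
    if missing:
        print("Install these before building:", ", ".join(missing))
    else:
        print("All native build tools were found.")
```

An empty result means ant should at least be able to launch autoreconf; it does not guarantee that the native build itself succeeds.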

Customized bash command prompt with line separator and other goodies

I wanted a fancy-looking and very useful terminal window with a customized command prompt, so after some digging I built one for myself.

Here is what it has:

  • Line Separator including current time at the end of the terminal
  • History counter along with current command counter
  • Logged user @ Hostname
  • Current working folder $

Here is what I have done. First I created a file called .avkashbash_ps1 in my $HOME folder as below:

fill="-"
reset_style='\[\033[00m\]'
status_style=$reset_style'\[\033[0;32m\]' # green; use 0;37m for light gray
prompt_style=$reset_style
command_style=$reset_style'\[\033[0;32m\]' # green for typed commands
# Prompt variable (\[ and \] mark non-printing color codes so bash measures the prompt width correctly):
PS1="$status_style"'$fill[\T]\n'"$prompt_style"'${debian_chroot:+($debian_chroot)}\[\e[0;31m\e[47m\][\!:\#]\[\e[0m\]-\[\e[1;33m\][\u@\h]\[\e[0m\]\n[\w]\[\e[0;32m\]\$ '"$command_style "
# Rebuild the separator line
# (this function is invoked every time before the prompt is displayed):
function prompt_command {
# create a $fill of all screen width minus the time string and a space:
let fillsize=${COLUMNS}-11
fill=""
while [ "$fillsize" -gt "0" ]
do
fill="-${fill}" # prepend one dash per remaining column
let fillsize=${fillsize}-1
done
}
PROMPT_COMMAND=prompt_command

To make the setting permanent, just add the following code to .bash_profile first:

if [ -f "$HOME/.avkashbash_ps1" ]; then
. "$HOME/.avkashbash_ps1"
fi

And then run the following command to set it:

$ source ~/.bash_profile

Or, if you want it in interactive non-login shells (which read .bashrc rather than .bash_profile), add the following code to .bashrc instead:

if [ -f "$HOME/.avkashbash_ps1" ]; then
. "$HOME/.avkashbash_ps1"
fi

And then run the following command to set it:

$ source ~/.bashrc

That’s all.

Thanks to the guys here and here!!

Building Hadoop Source in OSX

Step 1. Select your desired Hadoop branch from the list below:

https://svn.apache.org/repos/asf/hadoop/common/branches/

Step 2. Use svn to check out and download the source from the branch, e.g.

$ svn co https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.0.5-alpha/ hadoop-2.0.5

Note: The above command will download the Hadoop Branch 2.0.5 Alpha source code to a folder named hadoop-2.0.5.

Step 3: Change your current folder to the hadoop-2.0.5 folder, which will be treated as the Hadoop source root folder.

Step 4: Now open pom.xml and verify the hadoop-main version as below to make sure this is the branch you are targeting:

<artifactId>hadoop-main</artifactId>
<version>2.0.5-alpha</version>
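
If you prefer to check this from a script instead of opening the file, the two elements can be read with Python’s standard XML parser. This is a rough sketch: the helper name `pom_coordinates` and the trimmed-down `SAMPLE_POM` are made up for illustration, and a real pom.xml contains far more than these few elements.

```python
import xml.etree.ElementTree as ET

# Maven POM files declare this default namespace on <project>.
POM_NS = {"m": "http://maven.apache.org/POM/4.0.0"}

# Shortened stand-in for the real pom.xml (illustration only).
SAMPLE_POM = """<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-main</artifactId>
  <version>2.0.5-alpha</version>
</project>"""


def pom_coordinates(pom_text):
    """Return (artifactId, version) declared directly under <project>."""
    root = ET.fromstring(pom_text)
    artifact = root.findtext("m:artifactId", namespaces=POM_NS)
    version = root.findtext("m:version", namespaces=POM_NS)
    return artifact, version


if __name__ == "__main__":
    print(pom_coordinates(SAMPLE_POM))
```

To run it against the checked-out source, read the pom.xml in the source root into a string and pass that string in place of `SAMPLE_POM`.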

Step 5: Now open the BUILDING.txt file and pay attention to the requirements as described below:

* JDK 1.6
* Maven 3.0
* Findbugs 1.3.9 (if running findbugs)
* ProtocolBuffer 2.4.1+ (for MapReduce and HDFS)
* CMake 2.6 or newer (if compiling native code)
* Internet connection for first build (to fetch all Maven and Hadoop dependencies)

Step 6: Make sure you have everything listed in step 5 and, if not, use the info below to install the required components:

  • Maven 3.0.4 works fine
  • For ProtocolBuffer, just download it from here, then build and install it:
    • $ ./configure
    • $ make
    • $ make install
  • For CMake you can use brew on OSX:
    • $ brew install cmake

Step 7: Now, from your Hadoop source root, run the following commands in order to compile the source and build the package:

  •  $ mvn -version
  •  $ mvn clean
  •  $ mvn install  -DskipTests
  •  $ mvn compile  -DskipTests
  •  $ mvn package  -DskipTests
  •  $ mvn package -Pdist -DskipTests -Dtar

Now you can go to the hadoop-2.0.5/hadoop-dist/target/hadoop-2.0.5-alpha/bin folder and run the Hadoop commands, e.g. hadoop, hdfs, mapred etc., as below:

~/work/hadoop-2.0.5/hadoop-dist/target/hadoop-2.0.5-alpha/bin$ ./hadoop version
Hadoop 2.0.5-alpha
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1511192
Compiled by hadoopworld on 2013-08-07T07:01Z
From source with checksum c8f4bd45ac25c31b815f311b32ef17
This command was run using ~/work/hadoop-2.0.5/hadoop-dist/target/hadoop-2.0.5-alpha/share/hadoop/common/hadoop-common-2.0.5-alpha.jar