Learning Cloudera Impala book with a great **Buy One, Get One Free offer** from Packt Publishing

Here are some facts about Cloudera Impala as provided recently by Cloudera:

  • Impala is being widely adopted.
    Impala has been downloaded by people in more than 5,000 unique organizations, and Cloudera Enterprise customers from multiple industries and Fortune 100 companies have deployed it as supported infrastructure, integrated with their existing BI tools, in business-critical environments. Furthermore, MapR and Amazon Web Services (via its Elastic MapReduce service) recently announced support for Impala in their own platforms.
  • Impala has demonstrated production readiness.
    Impala has proved out its superior performance for interactive queries in a concurrent-workloads environment, while providing reliable resource sharing compared to batch-processing alternatives like Apache Hive.
  • Impala sports an ever-increasing list of enterprise features.
    Planned new enterprise features are materializing rapidly and according to plan, including the addition of fine-grained, role-based authorization, user-defined functions (UDFs), cost-based optimization, and pre-built analytic functions.

Buy my book Learning Cloudera Impala from #Packt Publishing with an amazing Buy One, Get One Free offer: bit.ly/1j26nPN

Shark vs. Impala, Hive and Amazon Redshift

Source: AMPlab (UC-Berkeley). Check out this blog post for more details.

Pivotal HD Hawq vs. Cloudera Impala and Apache Hive

Source: Pivotal. Check out this whitepaper for more details.

Cloudera Impala is open source, and you can grab the source from GitHub using the link below:

https://github.com/cloudera/impala

Learning Cloudera Impala – Book Availability

Learning Cloudera Impala:

 
This book is for those who really want to take advantage of their Hadoop cluster by processing extremely large amounts of raw data in Hadoop at real-time speed. Prior knowledge of Hadoop and some exposure to Hive and MapReduce is expected.
 
What you will learn from this book:

  • Understand the various ways of installing Impala in your Hadoop cluster
  • Use the Impala shell API to interact with Impala components
  • Utilize Impala Query Language and built-in functions to play with data
  • Administrate and fine-tune Impala for high availability
  • Identify and troubleshoot problems in a variety of ways
  • Get acquainted with various input data formats in Hadoop and how to use them with Impala
  • Comprehend how third party applications can connect with Impala to provide data visualization and various other enhancements


20TB Earth Science Dataset on AWS With NASA / NEX available for Public

AWS has been working with the NASA Earth Exchange (NEX) team to make it easier and more efficient for researchers to access and process earth science data. The goal is to make a number of important data sets accessible to a wider audience of full-time researchers, students, and citizen scientists. This important new project is called OpenNEX. Up until now, it has been logistically difficult for researchers to gain easy access to this data due to its dynamic nature and immense size (tens of terabytes). Limitations on download bandwidth, local storage, and on-premises processing power made in-house processing impractical.

Access Dataset: s3://nasanex/NEX-DCP30

Consult the detail page and the tech note to learn more about the provenance, format, structure, and attribution requirements.

NASA Earth Exchange (NEX):

The NASA Earth Exchange (NEX) Downscaled Climate Projections (NEX-DCP30) dataset is comprised of downscaled climate scenarios for the conterminous United States that are derived from the General Circulation Model (GCM) runs conducted under the Coupled Model Intercomparison Project Phase 5 (CMIP5) [Taylor et al. 2012] and across the four greenhouse gas emissions scenarios known as Representative Concentration Pathways (RCPs) [Meinshausen et al. 2011] developed for the Fifth Assessment Report of the Intergovernmental Panel on Climate Change (IPCC AR5). The dataset includes downscaled projections from 33 models, as well as ensemble statistics calculated for each RCP from all model runs available. The purpose of these datasets is to provide a set of high resolution, bias-corrected climate change projections that can be used to evaluate climate change impacts on processes that are sensitive to finer-scale climate gradients and the effects of local topography on climate conditions.

Each of the climate projections includes monthly averaged maximum temperature, minimum temperature, and precipitation for the periods from 1950 through 2005 (Retrospective Run) and from 2006 to 2099 (Prospective Run).

Website: NASA NEX

Summary

  • Short Name: NEX-DCP30
  • Version: 1
  • Format: netCDF4 classic
  • Spatial Coverage: CONUS
  • Temporal Coverage:
    • 1950 – 2005 historical or 2006 – 2099 RCP
  • Data Resolution:
    • Latitude Resolution: 30 arc second
    • Longitude Resolution: 30 arc second
    • Temporal Resolution: monthly
  • Data Size:
    • Total Dataset Size: 17 TB
    • Individual file size: 2 GB

Learn more about NEX – NASA Earth Exchange Downscaled Project

NEX Virtual Workshop: https://nex.nasa.gov/nex/projects/1328/

 

Hadoop MapReduce job failure with java.io.IOException: Task process exit with nonzero status of 137

While working with Amazon EMR, you might see an exception like the one below in a failed map or reduce task:

java.lang.Throwable: Child Error
at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 137. 
at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

The Root Cause:

The likely problem in this case is that the Hadoop mapper or reducer tasks are being killed by the Linux OS because memory is oversubscribed. Keep in mind that this is the Linux OOM killer at work, not a Java out-of-memory error. In general, such an issue occurs when a process is configured to use much more memory than the OS can provide; with memory oversubscribed, the OS has no option other than to kill the process.
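
As a quick sanity check on the status code itself: by shell convention, an exit status above 128 means the process was terminated by a signal, and the signal number is the status minus 128. The sketch below decodes 137 that way; the function name is only illustrative, and the signal name lookup assumes a Linux host.

```python
import signal

def describe_exit_status(status):
    """Explain a shell-style exit status; values above 128 mean 'killed by a signal'."""
    if status > 128:
        signum = status - 128
        try:
            name = signal.Signals(signum).name
        except ValueError:
            name = "unknown signal"
        return f"killed by signal {signum} ({name})"
    return f"exited normally with code {status}"

# 137 - 128 = 9, which is SIGKILL: the signal the Linux OOM killer sends.
print(describe_exit_status(137))
```

A task that had run out of Java heap would instead log a java.lang.OutOfMemoryError, so seeing status 137 points at the operating system rather than the JVM.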

With Amazon EMR, such a job failure can happen very easily if you have configured the mapred.child.java.opts setting far too high relative to the specific EMR instance type. This is because each EMR instance type has preconfigured settings for map and reduce tasks, and a misconfiguration can lead to this problem.

An Example:

For example, suppose the EMR instance type is m1.xlarge, which means 768 MB of memory is allocated for each map or reduce task in a Hadoop job, as below:

m1.xlarge

Parameter Value
HADOOP_JOBTRACKER_HEAPSIZE 6912
HADOOP_NAMENODE_HEAPSIZE 2304
HADOOP_TASKTRACKER_HEAPSIZE 384
HADOOP_DATANODE_HEAPSIZE 384
mapred.child.java.opts -Xmx768m
mapred.tasktracker.map.tasks.maximum 8
mapred.tasktracker.reduce.tasks.maximum 3

However, suppose the user sets mapred.child.java.opts in mapred-site.xml to a far higher value, e.g. 8GB, as below:

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx8192m</value>
</property>

The above configuration would cause the Linux OS to kill the mapper or reducer task because of the very high memory subscription. Linux will kill the same process even when it is configured to use 4GB, because that is still far over the configured limit.
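
To see how far over the limit such a setting goes, here is a rough back-of-the-envelope sketch. It multiplies the number of task slots from the m1.xlarge table above by the per-task heap. The physical RAM figure is an assumption (roughly 15 GB for this instance type); check the value for your own instance type.

```python
def worst_case_heap_mb(map_slots, reduce_slots, heap_mb_per_task):
    """Total heap the task JVMs may claim if every slot is busy at once."""
    return (map_slots + reduce_slots) * heap_mb_per_task

# Slot counts come from the m1.xlarge table above.
MAP_SLOTS, REDUCE_SLOTS = 8, 3
# Assumption: roughly 15 GB of physical RAM on the node.
NODE_RAM_MB = 15 * 1024

for heap_mb in (768, 4096, 8192):
    total = worst_case_heap_mb(MAP_SLOTS, REDUCE_SLOTS, heap_mb)
    verdict = "fits" if total <= NODE_RAM_MB else "oversubscribed"
    print(f"-Xmx{heap_mb}m -> {total} MB worst case ({verdict})")
```

With the default 768 MB the eleven slots stay within the node's memory, while both the 4GB and 8GB settings commit several times more memory than the machine has.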

The solution:

The solution to this problem is to let your job use the default MapReduce settings instead of setting them yourself during job submission. Using the default settings lets the jobtracker run tasks under whatever settings the Amazon EMR instance already has configured.

 

Hadoop Job and Task Name Classification and Convention

Hadoop MapReduce jobs and tasks follow a predefined naming convention, so during job analysis or troubleshooting you can easily understand what to look for and where.

Here is some key information about the naming classification and convention for Hadoop jobs and mapper/reducer tasks:

Job Name convention:

  • job_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED}_JobID
    • First Part – the “job” keyword is assigned to the job
    • Second Part – the full date and time when the job tracker was started
    • Third Part – the job counter since the job tracker started running
A task is a unit of job execution, and a job consists of mapper and reducer tasks. The total number of mapper and reducer tasks is determined when a job is submitted, and the job tracker dispatches these tasks based on the number of mapper and reducer slots available in the Hadoop cluster. There are two kinds of tasks:

  1. Mapper

    1. There are 3 kinds of mappers
      1. Work mapper – These are the actual mapper tasks, each performing the same kind of work as the other mappers. The IDs for these mapper tasks start at 0 and end at Total – 1.
      2. Setup Mapper – This task gets the very last mapper task ID.
      3. Cleanup Mapper – This is the task which cleans up the overall work. Its ID is one less than the setup task's ID. (See the example below to understand it clearly)
      4. Note: Setup and Cleanup mappers are not counted in the actual mapper count. Also, depending on the task count, it is possible to have more than 1 cleanup task.
  2. Reducer

    1. There is only 1 kind of reducer.

Task Name convention: 

  • For mapper
    • task_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED_JobID}_m_{6-Digit-Mapper-ID}_{mapper-instance}
  • For reducer
    • task_{DATE-TIME-WHEN-JOB-TRACKER-WAS-STARTED_JobID}_r_{6-Digit-Reducer-ID}_{reducer-instance}

Here is an Example:

  • Job ID
    • job_201307091604_1081
      • job – job
      • 201307091604 – The time when the Hadoop cluster was started
        • 2013/07/09 – Date
        • 16:04 (4:04 PM) – Time
      • 1081 – Job ID
  • Mappers (e.g. 20 total -> 000000 – 000019)
    • task_201307091604_1081_m_000000_0
      • First instance of the first mapper task (ID – 000)
    • task_201307091604_1081_m_000010_0
    • task_201307091604_1081_m_000010_1
    • task_201307091604_1081_m_000010_2
      • The above are 3 instances of the same mapper task (ID – 010)
    • task_201307091604_1081_m_000019_0
      • First instance of the last mapper task (ID – 019)
  • Reducers (Total 6)
    • task_201307091604_1081_r_000000_0
      • First Instance of first reducer task (ID – 000)
    • task_201307091604_1081_r_000005_0
      • First instance of 6th reducer task (ID – 005)
  • Besides the above, 2 more mapper tasks are added to every job:
    • Setup task
      • Even though it is the Setup task, its task counter is the very last
    • Cleanup task
      • This task ID will be “LAST – 1”
    • For example, if you have 20 mappers in total, then the Setup task ID will be 21 and the Cleanup task ID will be 20.
      • 0 – 19 – total 20 mappers
      • 20 – cleanup task
      • 21 – setup task
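
The naming layout above lends itself to mechanical parsing when you are sifting through logs. The following is a minimal sketch that assumes names follow exactly the layout shown in the example; the function name and the returned field names are my own and not part of Hadoop.

```python
import re
from datetime import datetime

# Layout assumed from the example above:
# task_{start time}_{job counter}_{m|r}_{6-digit task ID}_{instance}
TASK_NAME = re.compile(
    r"^task_(?P<started>\d{12})_(?P<job>\d+)_(?P<kind>[mr])"
    r"_(?P<task>\d{6})_(?P<instance>\d+)$"
)

def parse_task_name(name):
    """Split a task name into its parts; raise ValueError if it does not match."""
    m = TASK_NAME.match(name)
    if m is None:
        raise ValueError(f"not a task name in the expected layout: {name!r}")
    return {
        "started": datetime.strptime(m.group("started"), "%Y%m%d%H%M"),
        "job_id": f"job_{m.group('started')}_{m.group('job')}",
        "kind": "mapper" if m.group("kind") == "m" else "reducer",
        "task_id": int(m.group("task")),
        "instance": int(m.group("instance")),
    }

info = parse_task_name("task_201307091604_1081_m_000010_2")
print(info["job_id"], info["kind"], info["task_id"], info["instance"])
```

Grouping parsed names by job_id and task_id makes it easy to spot tasks that needed more than one instance, which is often where troubleshooting should start.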

12 key steps to keep your Hadoop Cluster running strong and performing optimally

1. Hadoop deployment on 64bit OS:

  • A 32-bit OS limits the Java heap size to about 3GB, so make sure that the Hadoop Namenode/Datanode is running on a 64-bit OS.

2. Mapper and Reducer count setup:

This is a cluster-specific value and reflects the total number of mappers and reducers per tasktracker.

conf/mapred-site.xml mapred.tasktracker.map.tasks.maximum N The maximum number of map task slots to run simultaneously
conf/mapred-site.xml mapred.tasktracker.reduce.tasks.maximum N The maximum number of reduce task slots to run simultaneously

 

 

 

If no value is set, the default is 2; a value of -1 specifies that the number of map/reduce task slots is based on the total amount of memory reserved for MapReduce by the sysadmin.

To set this value, you need to take the tasktracker's CPU (with or without hyper-threading), disk, and memory into account, along with how CPU intensive your job is on a scale of 1-10. For example, if the tasktracker is a quad-core box with hyper-threading, there will be 4 physical and 4 virtual cores, for a total of 8 CPUs. For a highly CPU-intensive job we can assign 4 mapper and 4 reducer tasks; however, for a far less CPU-intensive job, we can have up to 40 mappers and 40 reducers. The mapper and reducer counts do not need to be the same; it all depends on how the jobs are written. We could also have 6 mappers and 2 reducers, depending on how much work is done by each mapper and reducer, and to get this information we can look at job-specific counters. The number of mappers and reducers per tasktracker depends on CPU utilization per task. You can also look at each reduce task's counters to see how long the CPU was utilized relative to the total map/reduce task time. If there is a long wait, you may need to reduce the count; however, if everything finishes very fast, that suggests there is room to add mapper or reducer slots per tasktracker.

Users must understand that having a larger mapper count than physical CPU cores will result in CPU context switching, which may result in slower overall job completion. However, a configuration balanced against the available CPUs may result in faster job completion.

 

3. Per Task JVM Memory Configuration:

This memory setting is important to configure based on the total RAM in each tasktracker.

conf/mapred-site.xml mapred.child.java.opts -Xmx{YOUR_Value}M Larger heap-size for child jvms of maps/reduces.

 

The value for the above parameter depends on the total number of mapper and reducer tasks per tasktracker, so you must know these two parameters before setting it. Here are a few ways to calculate proper values for these parameters:

  • Let's consider 4 mappers and 4 reducers per tasktracker with 32GB total RAM in each machine
    • In this scenario there will be a total of 8 tasks running in any tasktracker
    • Let's assume about 2-4 GB RAM is required for the tasktracker to perform other jobs, so about ~28GB RAM is available for Hadoop tasks
    • Now we can divide 28/8 and get 3.5GB RAM per task
    • The value in this case will be -Xmx3500M
  • Let's consider 8 mappers and 4 reducers per tasktracker with 32GB total RAM
    • In this scenario there will be a total of 12 tasks running in any tasktracker
    • Let's assume about 2-4 GB RAM is required for the tasktracker to perform other jobs, so about ~28GB RAM is available for Hadoop tasks
    • Now we can divide 28/12 and get about 2.33GB RAM per task
    • The value in this case will be -Xmx2300M
  • Let's consider 12 mappers and 8 reducers per tasktracker with 128GB total RAM, where one specific node is also working as the secondary namenode
    • It is not recommended to keep the Secondary Namenode with a Datanode/TaskTracker; however, in this example we will keep it here for the sake of calculation.
    • In this scenario there will be a total of 20 tasks running in any tasktracker
    • Let's assume about 8 GB RAM is required for the Secondary namenode to perform its jobs and 4GB for other jobs, which leaves 116GB; rounding down conservatively, about ~100GB RAM is available for Hadoop tasks
    • Now we can divide 100/20 and get 5GB RAM per task
    • The value in this case will be around -Xmx5000M
  • Note:
    • HDP 1.2 has some new JVM-specific configuration options which can be used for much more granular memory settings.
    • If the machines in the Hadoop cluster do not have identical memory (i.e. a mix of machines with 32GB & 64GB RAM), then the user should use the lower memory configuration as the baseline.
    • It is always best to leave ~20% of memory for other processes.
    • Do not overcommit memory across the total tasks; it will almost certainly cause OOM errors.
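
The arithmetic in the scenarios above can be sketched as a small helper. This is only an illustration of the rule of thumb, not an official formula: the function name is made up, 1GB is treated as 1000MB to mirror the figures above, and the result is rounded down to the nearest 100MB.

```python
def child_heap_opt(total_ram_gb, reserved_gb, map_slots, reduce_slots):
    """Return a -Xmx value for mapred.child.java.opts from the rule of thumb above."""
    tasks = map_slots + reduce_slots
    # 1 GB is treated as 1000 MB here, matching the worked figures in the list.
    available_mb = (total_ram_gb - reserved_gb) * 1000
    per_task_mb = available_mb // tasks
    # Round down to the nearest 100 MB so the tasks never exceed the budget.
    rounded_mb = per_task_mb // 100 * 100
    return f"-Xmx{rounded_mb}M"

print(child_heap_opt(32, 4, 4, 4))     # first scenario: 28 GB shared by 8 tasks
print(child_heap_opt(32, 4, 8, 4))     # second scenario: 28 GB shared by 12 tasks
print(child_heap_opt(128, 28, 12, 8))  # third scenario: ~100 GB shared by 20 tasks
```

Rounding down rather than to the nearest value is a deliberate choice here, since overshooting the budget is what leads to the out-of-memory failures the note warns about.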

4. Setting mapper or reducer memory limit to unlimited:

Setting both mapred.job.{map|reduce}.memory.mb values to -1 (or to the maximum) lets mapreduce jobs use the maximum amount of memory available.

mapred.job.map.memory.mb -1 This property’s value sets the virtual memory size of a single map task for the job.
mapred.job.reduce.memory.mb -1 This property’s value sets the virtual memory size of a single reduce task for the job.

 

5. Setting No limit (or Maximum) for total number of tasks per job:

Setting this value to a specific limit puts constraints on mapreduce job completion and performance. It is best to set it to -1 so the job can use the maximum available.

mapred.jobtracker.maxtasks.per.job -1 Set this property’s value to any positive integer to set the maximum number of tasks for a single job. The default value of -1 indicates that there is no maximum.

6. Memory configuration for sorting data within processes:

There are two values in this segment: io.sort.factor and io.sort.mb. Based on experience, io.sort.mb should be roughly 25-30% of the mapred.child.java.opts value.

conf/core-site.xml io.sort.factor 100 More streams merged at once while sorting files.
conf/core-site.xml io.sort.mb NNN Higher memory-limit while sorting data.

So for example if mapred.child.java.opts is 2 GB, io.sort.mb can be 500MB or if mapred.child.java.opts is 3.5 GB then io.sort.mb can be 768MB.

Also, after running a few mapreduce jobs, analyzing the log messages will help you determine a better setting for io.sort.mb. Users must know that a low io.sort.mb will make the sort procedure take much longer, whereas a value that is too high may result in job failure.

 

7. Reducer Parallel copies configuration:

A large number of parallel copies causes high memory utilization and can cause Java heap errors. However, a small number causes slow job completion. Keeping this value at an optimum helps mapreduce jobs complete faster.

conf/mapred-site.xml mapred.reduce.parallel.copies 20 The default number of parallel transfers run by reduce during the copy(shuffle) phase.

Higher number of parallel copies run by reduces to fetch outputs from very large number of maps.

This value is very much network specific. A larger value means higher network activity between tasktrackers. With more parallel reduce copies, reducers create many network connections, which can congest the network in a Hadoop cluster. A lower number helps keep network connectivity stable. Users should choose this number depending on their network capacity. I think the recommended value can be between 12-18 in a gigabit network.

 

8. Setting Reducer Input limit to maximum:

Sometimes setting a lower limit to reducer input size may cause job failures. It is best to set the reducer input limit to maximum.

conf/mapred-site.xml mapreduce.reduce.input.limit -1 The limit on the input size of the reduce. If the estimated input size of the reduce is greater than this value, job is failed. A value of -1 means that there is no limit set.

This value is based on disk size and available space in the tasktracker. So if there is a cluster in which each datanode has variation in configured disk space, setting a specific value may cause job failures. Setting this value to -1 helps reducers to work based on available space.

 

9. Setting Map input split size:

During mapreduce job execution, map tasks are created per split. Setting the minimum split size to 0 lets the jobtracker decide the split size based on the data source.

mapred.min.split.size 0 The minimum size chunk that map input should be split into. File formats with minimum split sizes take priority over this setting.
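
Steps 4, 5, 8 and 9 all boil down to name/value pairs in the Hadoop configuration. As a convenience, the sketch below renders those pairs as property elements that could be pasted into a site file. The property names and values are the ones quoted in this post; which file each belongs in on your distribution is an assumption to verify (this post places the reduce input limit in conf/mapred-site.xml).

```python
import xml.etree.ElementTree as ET

# Property names and values as quoted in steps 4, 5, 8 and 9 above.
SETTINGS = {
    "mapred.job.map.memory.mb": "-1",
    "mapred.job.reduce.memory.mb": "-1",
    "mapred.jobtracker.maxtasks.per.job": "-1",
    "mapreduce.reduce.input.limit": "-1",
    "mapred.min.split.size": "0",
}

def render_properties(settings):
    """Build a <configuration> element with one <property> per setting."""
    root = ET.Element("configuration")
    for name, value in settings.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")

print(render_properties(SETTINGS))
```

Generating the fragment from a dictionary keeps the settings in one reviewable place, which helps when the same values must be rolled out to many tasktrackers.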

10. Setting HDFS block size:

  • I have seen various Hadoop clusters running well with a variety of HDFS block sizes.
  • A user can set dfs.block.size in hdfs-site.xml between 64MB and 1GB or more.

11. Setting user priority to “High” in Hadoop Cluster:

  • In Hadoop clusters, jobs are scheduled based on user priority if certain types of job scheduler are configured
  • If a hadoop user is lower in priority, the mapper and reducer tasks will have to wait longer to get task slots in the tasktracker. This could ultimately make mapreduce jobs take longer.
    • In some cases a timeout could occur and the mapreduce job may fail
  • If a job scheduler is configured, submitting a job as a user with high scheduling priority will result in faster job completion in a Hadoop cluster.

 

12. Secondary Namenode or Highly Available Namenode Configuration:

  • Having a secondary namenode (which checkpoints the namenode metadata) or a Highly Available namenode helps the Hadoop cluster recover quickly or stay highly available.
  • However, I have seen some cases where the secondary namenode or HA namenode runs on a datanode, which could impact cluster performance.
  • Keeping the Secondary Namenode or Highly Available Namenode separate from the Datanode/TaskTracker keeps dedicated resources available for the tasks assigned to the tasktracker.

 

Uploading file(s) to Amazon S3 using python + boto & s3afe.py script

Tools Used: python, boto, s3afe.py

Step 1: Make sure Python is installed, and then install the boto module:

  • $ sudo easy_install pip
  • $ sudo pip install boto

 

Step 2: Download s3afe.py from the link below:

 

Step 3: Set the execute permission (chmod) on s3afe.py and verify that it runs on your machine:

AvkashMBP2:~ userx$ /usr/local/bin/s3afe.py
Usage: s3afe.py -k [access key id] -s [secret access key] -b [bucketname] -n [keyname] -f [file]

S3afe stores a file on Amazon S3

Options:
--version show program's version number and exit
-h, --help show this help message and exit
-k AWSKEY, --aws_access_key_id=AWSKEY
-s AWSSECRET, --aws_secret_access_key=AWSSECRET
-f FILENAME, --filename=FILENAME
-b BUCKETNAME, --bucketname=BUCKETNAME
-n KEYNAME, --keyname=KEYNAME
-a ACL, --acl=ACL

 

Step 4: To upload a file from your local machine to Amazon S3 you will use the following parameters:

  -k AWSKEY, --aws_access_key_id=AWSKEY
  -s AWSSECRET, --aws_secret_access_key=AWSSECRET
  -f FILENAME, --filename=FILENAME  <-- Source File Name
  -b BUCKETNAME, --bucketname=BUCKETNAME
  -n KEYNAME, --keyname=KEYNAME  <-- New File Name after the upload
  -a ACL, --acl=ACL <-- Use 'private' here

 

Real Usage Example (From my mac):

$ /usr/local/bin/s3afe.py -f '/Users/userx/Downloads/file1.tgz' -n 'file.tgz' -b 'AMAZON_BUCKET_NAME' -a 'private' -k 'AMAZON_S3_ACCESS_KEY' -s 'AMAZON_S3_SECRET_KEY'

Note: After the file upload completes, you may get the error below. It appears to be benign, because the file does upload; I am not sure why this error occurs.

Error: Please check your Amazon credentials!
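
If you would rather drive the upload from a Python script than type the command by hand, a minimal sketch follows. It only assembles the argument list using the flags from the usage text above; the helper name and the default script path are assumptions for illustration, and the placeholder credentials must be replaced with your own.

```python
import shlex

def build_s3afe_command(filename, keyname, bucket, access_key, secret_key,
                        acl="private", script="/usr/local/bin/s3afe.py"):
    """Assemble the s3afe.py argument list from the flags shown in the usage text."""
    return [script, "-f", filename, "-n", keyname, "-b", bucket,
            "-a", acl, "-k", access_key, "-s", secret_key]

cmd = build_s3afe_command(
    "/Users/userx/Downloads/file1.tgz", "file.tgz",
    "AMAZON_BUCKET_NAME", "AMAZON_S3_ACCESS_KEY", "AMAZON_S3_SECRET_KEY",
)
# Print a shell-safe version of the command for review before running it.
print(" ".join(shlex.quote(part) for part in cmd))
# To actually run the upload, pass the list to subprocess.run(cmd, check=True).
```

Passing the arguments as a list avoids shell quoting problems, such as the curly quotes that word processors and blog editors tend to substitute for straight ones.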

S3afe.py Documentation: https://code.google.com/p/s3afe/