MapReduce in the Cloud

If you are looking to the cloud for MapReduce to process a large amount of data, I think this is what you are after:

  1. A collection of machines that are Hadoop/MapReduce ready and instantly available
  2. No need to build Hadoop (HDFS/MapReduce) instances from scratch: several IaaS services will give you hundreds of machines in the cloud, but building a Hadoop cluster on them yourself would be a nightmare
  3. The ability to just hook up your data and push MapReduce jobs immediately
  4. Being in the cloud, the power of thousands of machines available “instantly”, paying only for the CPU hours you actually consume

Here are a few options available now, each of which I tried before writing this:

Apache Hadoop on Windows Azure:
Microsoft also has Hadoop/MapReduce running on Windows Azure, but it is under a limited CTP; however, you can provide your information and request CTP access at the link below:

The Developer Preview for the Apache Hadoop-based Services for Windows Azure is available by invitation.

Amazon: Elastic MapReduce
Amazon Elastic MapReduce (Amazon EMR) is a web service that enables businesses, researchers, data analysts, and developers to easily and cost-effectively process vast amounts of data. It utilizes a hosted Hadoop framework running on the web-scale infrastructure of Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3).
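To make the moving parts concrete, here is a minimal sketch of what an EMR job flow ties together: a cluster size plus a list of Hadoop steps reading from and writing to S3. The bucket names, JAR path, instance types, and counts below are hypothetical placeholders, not a real API call.

```python
# Illustrative sketch only: this dict mirrors the shape of an EMR job flow
# (cluster sizing + Hadoop steps over S3 data). All names and values are
# made-up placeholders, not real AWS resources.
job_flow = {
    "Name": "wordcount-demo",                      # hypothetical job name
    "Instances": {
        "MasterInstanceType": "m1.small",          # assumed instance types
        "SlaveInstanceType": "m1.small",
        "InstanceCount": 4,                        # 1 master + 3 workers
    },
    "Steps": [
        {
            "Name": "wordcount",
            "Jar": "s3://my-bucket/wordcount.jar",  # hypothetical JAR in S3
            "Args": [
                "s3://my-bucket/input/",            # input lives in S3 ...
                "s3://my-bucket/output/",           # ... and so does output
            ],
        }
    ],
}

# EMR provisions the EC2 instances, runs each step's Hadoop job, then
# tears the cluster down; the only state you manage is the S3 data itself.
for step in job_flow["Steps"]:
    print(step["Name"], "->", step["Args"][-1])
```

The point of the sketch is that EMR collapses cluster setup to a declaration: you state how many machines and which jobs, and pay per instance-hour while the flow runs.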

Google BigQuery:
Besides the above, you can also try Google BigQuery, for which you will have to move your data into Google’s proprietary storage first and then run BigQuery on it. Remember that BigQuery is based on Dremel, which is similar to MapReduce but faster due to its column-based processing.
Google BigQuery is invitation only; however, you can certainly request access:
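The speed claim about column-based processing is easy to see with a toy sketch (plain Python, nothing BigQuery-specific): answering a single-column aggregate from a columnar layout touches a fraction of the fields that a row-by-row scan would.

```python
# Toy illustration (not BigQuery itself) of why a column-oriented layout,
# as in Dremel, can answer a one-column query while touching far less data
# than a row-oriented scan. The sample records are made up.
rows = [
    {"user": "a", "bytes": 120, "country": "PK"},
    {"user": "b", "bytes": 300, "country": "US"},
    {"user": "c", "bytes": 50,  "country": "US"},
]

# Row store: every field of every row is read to answer SUM(bytes).
row_fields_read = sum(len(r) for r in rows)

# Column store: only the "bytes" column is read.
bytes_column = [r["bytes"] for r in rows]   # the only column touched
col_fields_read = len(bytes_column)

print(sum(bytes_column), row_fields_read, col_fields_read)  # -> 470 9 3
```

With three columns the row scan reads 3x the fields; on wide analytics tables the gap is far larger, which is where Dremel-style engines win.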

Mortar Data:
Another option is to use Mortar Data, which uses Python and Pig intelligently to make jobs easy to write and results easy to visualize. I found it very interesting; please have a look: !/how_it_works
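As a taste of the Pig-plus-Python style that Mortar builds on, here is a sketch of the kind of Python UDF you would register from a Pig script. The function name and the Pig snippet in the comments are illustrative, and the @outputSchema decorator that real Pig/Jython UDFs carry is omitted so the sketch runs standalone.

```python
# A Python function of the kind you would register as a Pig UDF
# (real Pig/Jython UDFs add an @outputSchema decorator from the pig
# module; it is omitted here so this sketch runs standalone).
def normalize_word(word):
    """Lowercase a token and strip punctuation: a typical cleanup UDF."""
    return word.strip(".,;:!?").lower()

# In a Pig script this would be called per tuple, roughly like:
#   REGISTER 'udfs.py' USING jython AS my_udfs;
#   clean = FOREACH raw GENERATE my_udfs.normalize_word(word);
print(normalize_word("Hadoop!"))  # -> hadoop
```

The appeal is that the per-record logic stays in ordinary Python while Pig handles the distributed grouping and joining.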


Resource Allocation Model in MapReduce 2.0

What was available in the previous MapReduce:

  • Each node in the cluster was statically assigned the capability of running a predefined number of Map slots and a predefined number of Reduce slots.
  • The slots could not be shared between Maps and Reduces. This static allocation of slots was not optimal, since slot requirements vary during the MR job life cycle.
  • In general there is a demand for Map slots when the job starts, as opposed to a need for Reduce slots towards the end.
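The cost of that static split is easy to quantify with a toy model; the slot counts and task numbers below are made up for illustration.

```python
# Toy model (hypothetical numbers) of the old static-slot scheme: a node
# with 4 map slots and 4 reduce slots cannot lend idle reduce slots to a
# map-heavy phase, so half the node sits idle early in the job.
MAP_SLOTS, REDUCE_SLOTS = 4, 4

def utilization(map_tasks_waiting, reduce_tasks_waiting):
    """Fraction of the node's slots doing work under static partitioning."""
    used = (min(map_tasks_waiting, MAP_SLOTS)
            + min(reduce_tasks_waiting, REDUCE_SLOTS))
    return used / (MAP_SLOTS + REDUCE_SLOTS)

# Start of the job: plenty of maps queued, no reduces ready yet.
print(utilization(map_tasks_waiting=10, reduce_tasks_waiting=0))  # -> 0.5
# End of the job: the mirror image.
print(utilization(map_tasks_waiting=0, reduce_tasks_waiting=10))  # -> 0.5
```

Even with ten tasks waiting, the node never exceeds 50% utilization during either phase, which is exactly the inefficiency the next section's model removes.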

Key drawback of the previous MapReduce:

  • In a real cluster, where jobs are randomly submitted and each has its own Map/Reduce slots requirement, having an optimal utilization of the cluster was hard, if not impossible.

What is new in MapReduce 2.0:

  • The resource allocation model in Hadoop 0.23 addresses the key drawback described above by providing more flexible resource modeling.
  • Resources are requested in the form of containers, where each container has a number of non-static attributes.
  • At the time of writing this blog, the only supported attribute was memory (RAM). However, the model is generic, and there is an intention to add more attributes in future releases (e.g. CPU and network bandwidth).
  • In this new resource management model, only a minimum and a maximum for each attribute are defined, and Application Masters (AMs) can request containers with attribute values as multiples of these minimums.
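The multiples-of-the-minimum rule can be sketched as follows; the 512 MB minimum and 4096 MB maximum are hypothetical configuration values chosen for illustration, not Hadoop defaults.

```python
# Sketch of the normalization described above: an Application Master asks
# for some amount of memory, and the request is rounded up to a multiple
# of the configured minimum, capped at the configured maximum.
# MIN_MB / MAX_MB are hypothetical cluster configuration values.
MIN_MB, MAX_MB = 512, 4096

def normalize_container_memory(requested_mb):
    """Round a request up to a multiple of MIN_MB, clamped to [MIN_MB, MAX_MB]."""
    multiples = -(-max(requested_mb, MIN_MB) // MIN_MB)  # ceiling division
    return min(multiples * MIN_MB, MAX_MB)

print(normalize_container_memory(700))   # -> 1024 (2 x 512)
print(normalize_container_memory(100))   # -> 512  (floor at the minimum)
print(normalize_container_memory(9000))  # -> 4096 (capped at the maximum)
```

Because every granted container is a multiple of one unit, the scheduler can pack heterogeneous requests onto nodes without the rigid map/reduce slot split.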


A list of error messages while processing PIG commands with Hadoop MapReduce

A list of possible error messages encountered while processing Pig commands with Hadoop MapReduce is given below. This list is not comprehensive and will be modified to reflect the true error messages along with their error codes:

– ||'''Error Code''' ||'''Error Message''' ||'''How to Handle''' ||
– ||1000 ||Error during parsing ||
– ||1001 ||Unable to describe schema for alias <alias> ||
– ||1002 ||Unable to store alias <id> ||
– ||1003 ||Unable to find an operator for alias <alias> ||
– ||1004 ||No alias <alias> to <operation> ||
– ||1005 ||No plan for <alias> to <operation> ||
– ||1006 ||Could not find operator in plan ||
– ||1007 ||Found duplicates in schema. <list of duplicate column names> . Please alias the columns with unique names. ||
– ||1008 ||Expected a bag with a single element of type tuple but got a bag schema with multiple elements ||
– ||1009 ||Expected a bag with a single element of type tuple but got an element of type <type> ||
– ||1010 ||getAtomicGroupByType is used only when dealing with atomic <group/join> col ||
– ||1011 ||getTupleGroupBySchema is used only when dealing with <tuple/join> group col ||
– ||1012 ||Each <COGroup/join> input has to have the same number of inner plans ||
– ||1013 ||attributes can either be star (*) or a list of expressions, but not both. ||
– ||1014 ||Problem with input <operator> of User-defined function: <function> ||
– ||1015 ||Error determining fieldschema of constant: <constant> ||
– ||1016 ||Problems in merging user defined schema ||
– ||1017 ||Schema mismatch. A basic type on flattening cannot have more than one column. User defined schema: <schema> ||
– ||1018 ||Problem determining schema during load ||
– ||1019 ||Unable to merge schemas ||
– ||1020 ||Only a BAG or TUPLE can have schemas. Got <type> ||
– ||1021 ||Type mismatch. No useful type for merging. Field Schema: <field schema>. Other Field Schema: <other field schema> ||
– ||1022 ||Type mismatch. Field Schema: <field schema>. Other Field Schema: <other field schema> ||
– ||1023 ||Unable to create field schema ||
– ||1024 ||Found duplicate aliases: <alias> ||
– ||1025 ||Found more than one match: <list of aliases> ||
– ||1026 ||Attempt to fetch field: <field> from schema of size <size> ||
– ||1027 ||Cannot reconcile schemas with different sizes. This schema has size <size> other has size of <size> ||
– ||1028 ||Access to the tuple <alias> of the bag is disallowed. Only access to the elements of the tuple in the bag is allowed. ||
– ||1029 ||One of the schemas is null for merging schemas. Schema: <schema> Other schema: <schema> ||
– ||1030 ||Different schema sizes for merging schemas. Schema size: <size> Other schema size: <size> ||
– ||1031 ||Incompatible types for merging schemas. Field schema type: <type> Other field schema type: <type> ||
– ||1032 ||Incompatible inner schemas for merging schemas. Field schema: <schema> Other field schema: <schema> ||
– ||1033 ||Schema size mismatch for merging schemas. Other schema size greater than schema size. Schema: <schema>. Other schema: <schema> ||
– ||1034 ||TypeCastInserter invoked with an invalid operator class name: <operator class name> ||
– ||1035 ||Error getting LOProject’s input schema ||
– ||1036 ||Map key should be a basic type ||
– ||1037 ||Operand of Regex can be CharArray only ||
– ||1038 ||Operands of AND/OR can be boolean only ||
– ||1039 ||Incompatible types in <Addition/Subtraction/Division/Multiplication/Mod/GreaterThan/LesserThan/> operator. left hand side: <type> right hand size: type ||
– ||1040 ||Could not set <Add/Subtract/Multiply/Divide/Mod/UserFunc/BinCond> field schema ||
– ||1041 ||NEG can be used with numbers or Bytearray only ||
– ||1042 ||NOT can be used with boolean only ||
– ||1043 ||Unable to retrieve field schema of operator. ||
– ||1044 ||Unable to get list of overloaded methods. ||
– ||1045 ||Could not infer the matching function for <func spec> as multiple or none of them fit. Please use an explicit cast. ||
– ||1046 ||Multiple matching functions for <funcspec> with input schemas: ( <schema> , <schema>). Please use an explicit cast. ||
– ||1047 ||Condition in BinCond must be boolean ||
– ||1048 ||Two inputs of BinCond must have compatible schemas ||
– ||1049 ||Problem during evaluation of BinCond output type ||
– ||1050 ||Unsupported input type for BinCond: lhs = <type>; rhs = <type> ||
– ||1051 ||Cannot cast to bytearray ||
– ||1052 ||Cannot cast <type> [with schema <schema>] to <type> with schema <schema> ||
– ||1053 ||Cannot resolve load function to use for casting from <type> to <type> ||
– ||1054 ||Cannot merge schemas from inputs of UNION ||
– ||1055 ||Problem while reading schemas from inputs of <Union/SplitOutput/Distinct/Limit/Cross> ||
– ||1056 ||Problem while casting inputs of Union ||
– ||1057 ||’s inner plan can only have one output (leaf) ||
– ||1058 ||Split’s condition must evaluate to boolean. Found: <type> ||
– ||1059 ||Problem while reconciling output schema of <Sort/Filter/Split> ||
– ||1060 ||Cannot resolve <COGroup/Foreach/Fragment Replicate Join> output schema ||
– ||1061 ||Sorry, group by complex types will be supported soon ||
– ||1062 ||COGroup by incompatible types ||
– ||1063 ||Problem while reading field schema from input while inserting cast ||
– ||1064 ||Problem reading column <col> from schema: <schema> ||
– ||1065 ||Found more than one load function to use: <list of load functions> ||
– ||1066 ||Unable to open iterator for alias <alias> ||
– ||1067 ||Unable to explain alias <alias> ||
– ||1068 ||Using <Map/Bag> as key not supported. ||
– ||1069 ||Problem resolving class version numbers for class <class> ||
– ||1070 ||Could not resolve <class> using imports: <package import list> ||
– ||1071 ||Cannot convert a <type> to <a/an> <type> ||
– ||1072 ||Out of bounds access: Request for field number <number> exceeds tuple size of <size> ||
– ||1073 ||Cannot determine field schema for <object> ||
– ||1074 ||Problem with formatting. Could not convert <object> to <Integer/Long/Float/Double>. ||
– ||1075 ||Received a bytearray from the UDF. Cannot determine how to convert the bytearray to <int/float/long/double/string/tuple/bag/map> ||
– ||1076 ||Problem while reading field schema of cast operator. ||
– ||1077 ||Two operators that require a cast in between are not adjacent. ||
– ||1078 ||Schema size mismatch for casting. Input schema size: <size>. Target schema size: <size> ||
– ||1079 ||Undefined type checking logic for unary operator: <operator> ||
– ||1080 ||Did not find inputs for operator: <operator> ||
– ||1081 ||Cannot cast to <int/float/long/double/string/tuple/bag/map>. Expected bytearray but received: <type> ||
– ||1082 ||Cogroups with more than 127 inputs not supported. ||
– ||1083 ||setBatchOn() must be called first. ||
– ||1084 ||Invalid Query: Query is null or of size 0. ||
– ||1085 ||operator in <pushBefore/pushAfter> is null. Cannot <pushBefore/pushAfter> null operators. ||
– ||1086 ||First operator in <pushBefore/pushAfter> should have multiple <inputs/outputs>. Found first operator with <size> <inputs/outputs>. ||
– ||1087 ||The <inputNum/outputNum> <num> should be lesser than the number of <inputs/outputs> of the first operator. Found first operator with <size> <inputs/outputs>. ||
– ||1088 ||operator in <pushBefore/pushAfter> should have <at least> one <output/input>. Found <first/second> operator with <no/<size> > <outputs/inputs>. ||
– ||1089 ||Second operator in <pushBefore/pushAfter> should be the <successor/predecessor> of the First operator. ||
– ||1090 ||Second operator can have at most one <incoming/outgoing> edge from First operator. Found <num> edges. ||
– ||1091 ||First operator does not support multiple <outputs/inputs>. On completing the <pushBefore/pushAfter> operation First operator will end up with <num> edges ||
– ||1092 ||operator in swap is null. Cannot swap null operators. ||
– ||1093 ||Swap supports swap of operators with at most one <input/output>. Found <first/second> operator with <size> <inputs/outputs> ||
– ||1094 ||Attempt to insert between two nodes that were not connected. ||
– ||1095 ||Attempt to remove and reconnect for node with multiple <predecessors/successors>. ||
– ||1096 ||Attempt to remove and reconnect for node with <<size>/no> <predecessors/successors>. ||
– ||1097 ||Containing node cannot be null. ||
– ||1098 ||Node index cannot be negative. ||
– ||1099 ||Node to be replaced cannot be null. ||
– ||1100 ||Replacement node cannot be null. ||
– ||1101 ||Merge Join must have exactly two inputs. Found: <size> inputs ||
– ||1102 ||Data is not sorted on <left/right> side. Last two keys encountered were: <previous key>, <current key> ||
– ||1103 ||Merge join only supports Filter, Foreach and Load as its predecessor. Found : <operator> ||
– ||1104 ||Right input of merge-join must implement SamplableLoader interface. This loader doesn’t implement it. ||
– ||1105 ||Heap percentage / Conversion factor cannot be set to 0 ||
– ||1106 ||Merge join is possible only for simple column or ‘*’ join keys when using <funcspec> as the loader ||
– ||1107 ||Try to merge incompatible types (eg. numerical type vs non-numerical type) ||
– ||1108 ||Duplicated schema ||
– ||1109 ||Input ( <input alias> ) on which outer join is desired should have a valid schema ||
– ||1110 ||Unsupported query: You have an partition column (<colname>) inside a <regexp operator/function/cast/null check operator/bincond operator> in the filter condition. ||
– ||1111 ||Use of partition column/condition with non partition column/condition in filter expression is not supported. ||
– ||1112 ||Unsupported query: You have an partition column (<column name>) in a construction like: (pcond and …) or (pcond and …) where pcond is a condition on a partition column. ||
– ||1113 ||Unable to describe schema for nested expression <alias> ||
– ||1114 ||Unable to find schema for nested alias <nested alias> ||
– ||1115 ||Place holder for Howl related errors ||
– ||1116 ||Duplicate udf script (in scripting language) ||
– ||1117 ||Cannot merge schema ||
– ||1118 ||Cannot convert bytes load from BinStorage ||
– ||1119 ||Cannot find LoadCaster class ||
– ||1120 ||Cannot cast complex data ||
– ||1121 ||Python error ||
– ||1122||The arity of cogroup/group by columns do not match||
– ||1123||Cogroup/Group by * is only allowed if the input has a schema||
– ||1124||Mismatch merging expression field schema .. with user specified schema ..||
– ||1125||Error determining field schema from object in constant expression||
– ||1126||Schema having field with null alias cannot be merged using alias.||
– ||1127||Dereference index out of range in schema.||
– ||1128||Cannot find field dereference field in schema.||
– ||1129|| Referring to column(s) within a column of type .. is not allowed ||
– ||1130|| Datatype of i’th group/join column in j’th relation of statement is incompatible with corresponding column in other relations in the statement ||

– ||2000 ||Internal error. Mismatch in group by arities. Expected: <schema>. Found: <schema> ||
– ||2001 ||Unable to clone plan before compiling ||
– ||2002 ||The output file(s): <filename> already exists ||
– ||2003 ||Cannot read from the storage where the output <filename> will be stored ||
– ||2004 ||Internal error while trying to check if type casts are needed ||
– ||2005 ||Expected <class>, got <class> ||
– ||2006 ||TypeCastInserter invoked with an invalid operator class name: <class> ||
– ||2007 ||Unable to insert type casts into plan ||
– ||2008 ||cannot have more than one input. Found <n> inputs. ||
– ||2009 ||Can not move LOLimit up ||
– ||2010 ||LOFilter should have one input ||
– ||2011 ||Can not insert LOLimit clone ||
– ||2012 ||Can not remove LOLimit after <class> ||
– ||2013 ||Moving LOLimit in front of <class> is not implemented ||
– ||2014 ||Unable to optimize load-stream-store optimization ||
– ||2015 ||Invalid physical operators in the physical plan ||
– ||2016 ||Unable to obtain a temporary path. ||
– ||2017 ||Internal error creating job configuration. ||
– ||2018 ||Internal error. Unable to introduce the combiner for optimization. ||
– ||2019 ||Expected to find plan with single leaf. Found <n> leaves. ||
– ||2020 ||Expected to find plan with UDF leaf. Found <class> ||
– ||2021 ||Internal error. Unexpected operator project(*) in local rearrange inner plan. ||
– ||2022 ||Both map and reduce phases have been done. This is unexpected while compiling. ||
– ||2023 ||Received a multi input plan when expecting only a single input one. ||
– ||2024 ||Expected reduce to have single leaf. Found <n> leaves. ||
– ||2025 ||Expected leaf of reduce plan to always be POStore. Found <class> ||
– ||2026 ||No expression plan found in POSort. ||
– ||2027 ||Both map and reduce phases have been done. This is unexpected for a merge. ||
– ||2028 ||ForEach can only have one successor. Found <n> successors. ||
– ||2029 ||Error rewriting POJoinPackage. ||
– ||2030 ||Expected reduce plan leaf to have a single predecessor. Found <n> predecessors. ||
– ||2031 ||Found map reduce operator with POLocalRearrange as last oper but with no successor. ||
– ||2032 ||Expected map reduce operator to have a single successor. Found <n> successors. ||
– ||2033 ||Problems in rearranging map reduce operators in plan. ||
– ||2034 ||Error compiling operator <class> ||
– ||2035 ||Internal error. Could not compute key type of sort operator. ||
– ||2036 ||Unhandled key type <type> ||
– ||2037 ||Invalid ship specification. File doesn’t exist: <file> ||
– ||2038 ||Unable to rename <oldName> to <newName> ||
– ||2039 ||Unable to copy <src> to <dst> ||
– ||2040 ||Unknown exec type: <type> ||
– ||2041 ||No Plan to compile ||
– ||2042 ||Internal error. Unable to translate logical plan to physical plan. ||
– ||2043 ||Unexpected error during execution. ||
– ||2044 ||The type <type> cannot be collected as a Key type ||
– ||2045 ||Internal error. Not able to check if the leaf node is a store operator. ||
– ||2046 ||Unable to create FileInputHandler. ||
– ||2047 ||Internal error. Unable to introduce split operators. ||
– ||2048 ||Error while performing checks to introduce split operators. ||
– ||2049 ||Error while performing checks to optimize limit operator. ||
– ||2050 ||Internal error. Unable to optimize limit operator. ||
– ||2051 ||Did not find a predecessor for <Distinct/Filter/Limit/Negative/Null/Sort/Split/Split Output/Store/Stream>. ||
– ||2052 ||Internal error. Cannot retrieve operator from null or empty list. ||
– ||2053 ||Internal error. Did not find roots in the physical plan. ||
– ||2054 ||Internal error. Could not convert <object> to <Integer/Long/Float/Double/Tuple/Bag/Map> ||
– ||2055 ||Did not find exception name to create exception from string: <string> ||
– ||2056 ||Cannot create exception from empty string. ||Pig could not find an exception in the error messages from Hadoop, examine the [[#clientSideLog|client log]] to find more information. ||
– ||2057 ||Did not find fully qualified method name to reconstruct stack trace: <line> ||
– ||2058 ||Unable to set index on the newly created POLocalRearrange. ||
– ||2059 ||Problem with inserting cast operator for <regular expression/binary conditional/unary operator/user defined function/fragment replicate join/cogroup/project/<operator>> in plan. ||
– ||2060 ||Expected one leaf. Found <n> leaves. ||
– ||2061 ||Expected single group by element but found multiple elements. ||
– ||2062 ||Each COGroup input has to have the same number of inner plans. ||
– ||2063 ||Expected multiple group by element but found single element. ||
– ||2064 ||Unsupported root type in LOForEach: <operator> ||
– ||2065 ||Did not find roots of the inner plan. ||
– ||2066 ||Unsupported (root) operator in inner plan: <operator> ||
– ||2067 ||does not know how to handle type: <type> ||
– ||2068 ||Internal error. Improper use of method getColumn() in POProject ||
– ||2069 ||Error during map reduce compilation. Problem in accessing column from project operator. ||
– ||2070 ||Problem in accessing column from project operator. ||
– ||2071 ||Problem with setting up local rearrange’s plans. ||
– ||2072 ||Attempt to run a non-algebraic function as an algebraic function ||
– ||2073 ||Problem with replacing distinct operator with distinct built-in function. ||
– ||2074 ||Could not configure distinct’s algebraic functions in map reduce plan. ||
– ||2075 ||Could not set algebraic function type. ||
– ||2076 ||Unexpected Project-Distinct pair while trying to set up plans for use with combiner. ||
– ||2077 ||Problem with reconfiguring plan to add distinct built-in function. ||
– ||2078 ||Caught error from UDF: <class> [<message from UDF>] ||
– ||2079 ||Unexpected error while printing physical plan. ||
– ||2080 ||Foreach currently does not handle type <type> ||
– ||2081 ||Unable to setup the <load/store> function. ||
– ||2082 ||Did not expect result of type: <type> ||
– ||2083 ||Error while trying to get next result in POStream. ||
– ||2084 ||Error while running streaming binary. ||
– ||2085 ||Unexpected problem during optimization. Could not find LocalRearrange in combine plan. ||
– ||2086 ||Unexpected problem during optimization. Could not find all LocalRearrange operators. ||
– ||2087 ||Unexpected problem during optimization. Found index: <index> in multiple LocalRearrange operators. ||
– ||2088 ||Unable to get results for: <file specification> ||
– ||2089 ||Unable to flag project operator to use single tuple bag. ||
– ||2090 ||Received Error while processing the <combine/reduce> plan. ||
– ||2091 ||Packaging error while processing group. ||
– ||2092 ||No input paths specified in job. ||
– ||2093 ||Encountered error in package operator while processing group. ||
– ||2094 ||Unable to deserialize object ||
– ||2095 ||Did not get reduce key type from job configuration. ||
– ||2096 ||Unexpected class in SortPartitioner: <class name> ||
– ||2097 ||Failed to copy from: <src> to: <dst> ||
– ||2098 ||Invalid seek option: <options> ||
– ||2099 ||Problem in constructing slices. ||
– ||2100 ||does not exist. ||
– ||2101 ||should not be used for storing. ||
– ||2102 ||Cannot test a <type> for emptiness. ||
– ||2103 ||Problem while computing <max/min/sum> of <doubles/floats/ints/longs/strings>. ||
– ||2104 ||Error while determining schema of <BinStorage data/input>. ||
– ||2105 ||Error while converting <int/long/float/double/chararray/tuple/bag/map> to bytes ||
– ||2106 ||Error while computing <arity/count/concat/min/max/sum/size> in <class name> ||
– ||2107 ||DIFF expected two inputs but received <n> inputs. ||
– ||2108 ||Could not determine data type of field: <object> ||
– ||2109 ||TextLoader does not support conversion <from/to> <Bag/Tuple/Map/Integer/Long/Float/Double>. ||
– ||2110 ||Unable to deserialize optimizer rules. ||
– ||2111 ||Unable to create temporary directory: <path> ||
– ||2112 ||Unexpected data while reading tuple from binary file. ||
– ||2113 ||SingleTupleBag should never be serialized or deserialized. ||
– ||2114 ||Expected input to be chararray, but got <class name> ||
– ||2115 ||Internal error. Expected to throw exception from the backend. Did not find any exception to throw. ||
– ||2116 ||Unexpected error. Could not check for the existence of the file(s): <filename> ||
– ||2117 ||Unexpected error when launching map reduce job. ||
– ||2118 ||Unable to create input slice for: <filename> ||
– ||2119 ||Internal Error: Found multiple data types for map key ||
– ||2120 ||Internal Error: Unable to determine data type for map key ||
– ||2121 ||Error while calling finish method on UDFs. ||
– ||2122 ||Sum of probabilities should be one ||
– ||2123 ||Internal Error: Unable to discover required fields from the loads ||
– ||2124 ||Internal Error: Unexpected error creating field schema ||
– ||2125 ||Expected at most one predecessor of load ||
– ||2126 ||Predecessor of load should be store ||
– ||2127 ||Cloning of plan failed. ||
– ||2128 ||Failed to connect store with dependent load. ||
– ||2129 ||Internal Error. Unable to add store to the split plan for optimization. ||
– ||2130 ||Internal Error. Unable to merge split plans for optimization. ||
– ||2131 ||Internal Error. Unable to connect split plan for optimization. ||
– ||2132 ||Internal Error. Unable to replace store with split operator for optimization. ||
– ||2133 ||Internal Error. Unable to connect map plan with successors for optimization. ||
– ||2134 ||Internal Error. Unable to connect map plan with predecessors for optimization. ||
– ||2135 ||Received error from store function. ||
– ||2136 ||Internal Error. Unable to set multi-query index for optimization. ||
– ||2137 ||Internal Error. Unable to add demux to the plan as leaf for optimization. ||
– ||2138 ||Internal Error. Unable to connect package to local rearrange operator in pass-through combiner for optimization. ||
– ||2139 ||Invalid value type: <type>. Expected value type is DataBag. ||
– ||2140 ||Invalid package index: <index>. Should be in the range between 0 and <package array size>. ||
– ||2141 ||Internal Error. Cannot merge non-combiner with combiners for optimization. ||
– ||2142 ||ReadOnceBag should never be serialized. ||
– ||2143 ||Expected index value within POPackageLite is 0, but found ‘index’. ||
– ||2144 ||Problem while fixing project inputs during rewiring. ||
– ||2145 ||Problem while rebuilding schemas after transformation. ||
– ||2146 ||Internal Error. Inconsistency in key index found during optimization. ||
– ||2147 ||Error cloning POLocalRearrange for limit after sort. ||
– ||2148 ||Error cloning POPackageLite for limit after sort ||
– ||2149 ||Internal error while trying to check if filters can be pushed up. ||
– ||2150 ||Internal error. The push before input is not set. ||
– ||2151 ||Internal error while pushing filters up. ||
– ||2152 ||Internal error while trying to check if foreach with flatten can be pushed down. ||
– ||2153 ||Internal error. The mapping for the flattened columns is empty ||
– ||2154 ||Internal error. Schema of successor cannot be null for pushing down foreach with flatten. ||
– ||2155 ||Internal error while pushing foreach with flatten down. ||
– ||2156 ||Error while fixing projections. Projection map of node to be replaced is null. ||
– ||2157 ||Error while fixing projections. No mapping available in old predecessor to replace column. ||
– ||2158 ||Error during fixing projections. No mapping available in old predecessor for column to be replaced. ||
– ||2159 ||Error during fixing projections. Could not locate replacement column from the old predecessor. ||
– ||2160 ||Error during fixing projections. Projection map of new predecessor is null. ||
– ||2161 ||Error during fixing projections. No mapping available in new predecessor to replace column. ||
– ||2162 ||Error during fixing projections. Could not locate mapping for column <column> in new predecessor. ||
– ||2163 ||Error during fixing projections. Could not locate replacement column for column: <column> in the new predecessor. ||
– ||2164 ||Expected EOP as return status. Found: <returnStatus> ||
– ||2165 ||Problem in index construction. ||
– ||2166 ||Key type mismatch. Found key of type <type> on left side. But, found key of type <type> in index built for right side. ||
– ||2167 ||LocalRearrange used to extract keys from tuple isn’t configured correctly. ||
– ||2168 ||Expected physical plan with exactly one root and one leaf. ||
– ||2169 ||Physical operator preceding <right/left> predicate not found in compiled MR jobs. ||
– ||2170 ||Physical operator preceding both left and right predicate found to be same. This is not expected. ||
– ||2171 ||Expected one but found more then one root physical operator in physical plan. ||
– ||2172 ||Expected physical operator at root to be POLoad. Found : <PhysicalOperator> ||
– ||2173 ||One of the preceding compiled MR operator is null. This is not expected. ||
– ||2174 ||Internal exception. Could not create the sampler job. ||
– ||2175 ||Internal error. Could not retrieve file size for the sampler. ||
– ||2176 ||Error processing right input during merge join ||
– ||2177 ||Prune column optimization: Cannot retrieve operator from null or empty list ||
– ||2178 ||Prune column optimization: The matching node from the optimizor framework is null ||
– ||2179 ||Prune column optimization: Error while performing checks to prune columns. ||
– ||2180 ||Prune column optimization: Only LOForEach and LOSplit are expected ||
– ||2181 ||Prune column optimization: Unable to prune columns. ||
– ||2182 ||Prune column optimization: Only relational operator can be used in column prune optimization. ||
– ||2183 ||Prune column optimization: LOLoad must be the root logical operator. ||
– ||2184 ||Prune column optimization: Fields list inside RequiredFields is null. ||
– ||2185 ||Prune column optimization: Unable to prune columns. ||
– ||2186 ||Prune column optimization: Cannot locate node from successor ||
– ||2187 ||Column pruner: Cannot get predecessors ||
– ||2188 ||Column pruner: Cannot prune columns ||
– ||2189 ||Column pruner: Expect schema ||
– ||2190 ||PruneColumns: Cannot find predecessors for logical operator ||
– ||2191 ||PruneColumns: No input to prune ||
– ||2192 ||PruneColumns: Column to prune does not exist ||
– ||2193 ||PruneColumns: Foreach can only have 1 predecessor ||
– ||2194 ||PruneColumns: Expect schema ||
– ||2195 ||PruneColumns: Fail to visit foreach inner plan ||
– ||2196 ||RelationalOperator: Exception when traversing inner plan ||
– ||2197 ||RelationalOperator: Cannot drop column which require * ||
– ||2198 ||LOLoad: load only take 1 input ||
– ||2199 ||LOLoad: schema mismatch ||
– ||2200 ||PruneColumns: Error getting top level project ||
– ||2201 ||Could not validate schema alias ||
– ||2202 ||Error change distinct/sort to use secondary key optimizer ||
– ||2203 ||Sort on columns from different inputs ||
– ||2204 ||Error setting secondary key plan ||
– ||2205 ||Error visiting POForEach inner plan ||
– ||2206 ||Error visiting POSort inner plan ||
– ||2207 ||POForEach inner plan has more than 1 root ||
– ||2208 ||Exception visiting foreach inner plan ||
– ||2209 ||Internal error while processing any partition filter conditions in the filter after the load ||
– ||2210 ||Internal Error in logical optimizer. ||
– ||2211 ||Column pruner: Unable to prune columns. ||
– ||2212 ||Unable to prune plan. ||
– ||2213 ||Error visiting inner plan for ForEach. ||
– ||2214 ||Cannot find POLocalRearrange to set secondary plan. ||
– ||2215 ||See more than 1 successors in the nested plan. ||
– ||2216 ||Cannot get field schema ||
– ||2217 ||Problem setFieldSchema ||
– ||2218 ||Invalid resource schema: bag schema must have tuple as its field ||
– ||2219 ||Attempt to disconnect operators which are not connected ||
– ||2220 ||Plan in inconsistent state, connected in fromEdges but not toEdges ||
– ||2221 ||No more walkers to pop ||
– ||2222 ||Expected LogicalExpressionVisitor to visit expression node ||
– ||2223 ||Expected LogicalPlanVisitor to visit relational node ||
– ||2224 ||Found LogicalExpressionPlan with more than one root ||
– ||2225 ||Projection with nothing to reference ||
– ||2226 ||Cannot find reference for ProjectExpression ||
– ||2227 ||LogicalExpressionVisitor expects to visit expression plans ||
– ||2228 ||Could not find a related project Expression for Dereference ||
– ||2229 ||Couldn’t find matching uid for project expression ||
– ||2230 ||Cannot get column from project ||
– ||2231 ||Unable to set index on newly create POLocalRearrange ||
– ||2232 ||Cannot get schema ||
– ||2233 ||Cannot get predecessor ||
– ||2234 ||Cannot get group key schema ||
– ||2235 ||Expected an ArrayList of Expression Plans ||
– ||2236 ||User defined load function should implement the LoadFunc interface ||
– ||2237 ||Unsupported operator in inner plan ||
– ||2238 ||Expected list of expression plans ||
– ||2239 ||Structure of schema change ||
– ||2240 ||LogicalPlanVisitor can only visit logical plan ||
– ||2241 ||UID is not found in the schema ||
– ||2242 ||TypeCastInserter invoked with an invalid operator ||
– ||2243 ||Attempt to remove operator that is still connected to other operators ||
– ||2244 ||Hadoop does not return any error message ||
– ||2245 ||Cannot get schema from loadFunc ||
– ||2246 ||Error merging schema ||
– ||2247 ||Cannot determine skewed join schema ||
– ||2248 ||twoLevelAccessRequired==true is not supported with isSubNameMatch==true. ||
– ||2249 ||While using ‘collected’ on group; data must be loaded via loader implementing CollectableLoadFunc. ||
– ||2250 ||Blocking operators are not allowed before Collected Group. Consider dropping using ‘collected’. ||
– ||2251 ||Merge Cogroup work on two or more relations. To use map-side group-by on single relation, use ‘collected’ qualifier. ||
– ||2252 ||Base loader in Cogroup must implement CollectableLoadFunc. ||
– ||2253 ||Side loaders in cogroup must implement IndexableLoadFunc. ||
– ||2254 ||Currently merged cogroup is not supported after blocking operators. ||
– ||2255 ||POSkewedJoin operator has <n> inputs. It should have 2. ||
– ||2256 ||Cannot remove and reconnect node with multiple inputs/outputs ||
– ||2257 ||An unexpected exception caused the validation to stop ||
– ||2258 ||Bug:Two different load functions mapped to an LOCast op ||
– ||2259 ||Cannot instantiate class ||
– ||2260 ||in split only one of input/output schema is null ||
– ||2261 ||input and output schema size of split differ ||
– ||2262 ||uid mapped to two different load functions ||
– ||2263 ||expected only one predecessor ||
– ||2264 ||more than one project star as leaf in plan ||
– ||2265 ||Schema not expected for project-star ||
– ||2266 ||Expected single LOGenerate output in innerplan of foreach ||
– ||2267 ||reset on schema at pos greater than schema size ||
– ||2268 ||More than one input found for scalar expression ||
– ||2269 ||No input found for scalar expression ||

– ||2997 ||Encountered IOException. ||
– ||2998 ||Unexpected internal error. ||
– ||2999 ||Unhandled internal error. ||
– ||3000 ||IOException caught while compiling POMergeJoin ||
– ||4000 ||The output file(s): <filename> already exists ||
– ||4001 ||Cannot read from the storage where the output <filename> will be stored ||
– ||4002 ||Can’t read jar file: <name> ||
– ||4003 ||Unable to obtain a temporary path. ||
– ||4004 ||Invalid ship specification. File doesn’t exist: <file> ||
– ||4005 ||Unable to rename <oldName> to <newName> ||
– ||4006 ||Unable to copy <src> to <dst> ||
– ||4007 ||Missing <parameter> from hadoop configuration ||
– ||4008 ||Failed to create local hadoop file <file> ||
– ||4009 ||Failed to copy data to local hadoop file <file> ||
– ||6000 ||The output file(s): <filename> already exists ||
– ||6001 ||Cannot read from the storage where the output <filename> will be stored ||
– ||6002 ||Unable to obtain a temporary path. ||
– ||6003 ||Invalid cache specification. File doesn’t exist: <file> ||
– ||6004 ||Invalid ship specification. File doesn’t exist: <file> ||
– ||6005 ||Unable to rename <oldName> to <newName> ||
– ||6006 ||Unable to copy <src> to <dst> ||
– ||6007 ||Unable to check name <name> ||
– ||6008 ||Failed to obtain glob for <pattern> ||
– ||6009 ||Failed to create job client ||
– ||6010 ||Could not connect to HOD ||
– ||6011 ||Failed to run command <command> on server <server>; return code: <code>; error: <error message> ||
– ||6012 ||Unable to run command: <command> on server <server> ||
– ||6013 ||Unable to chmod <executable> . Thread interrupted. ||
– ||6014 ||Failed to save secondary output ‘<fileName>’ of task: <taskId> ||
– ||6015 ||During execution, encountered a Hadoop error. ||
– ||6016 ||Out of memory. ||
– ||6017 ||Execution failed, while processing ‘<fileNames>’ ||
– ||6018 ||Error while reading input ||
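When a failed Pig job surfaces only a numeric code, a small lookup table built from the list above saves a trip to the source tree. Below is an illustrative Python sketch covering a handful of the entries; the `describe` helper and the subset of codes chosen are mine, not part of Pig:

```python
# Illustrative sketch: resolve a few of the Pig error codes listed
# above to their messages. Extend the table as needed.
PIG_ERROR_MESSAGES = {
    2216: "Cannot get field schema",
    2244: "Hadoop does not return any error message",
    2998: "Unexpected internal error.",
    6015: "During execution, encountered a Hadoop error.",
    6016: "Out of memory.",
}

def describe(code):
    """Return the known message for a code, or a fallback string."""
    return PIG_ERROR_MESSAGES.get(code, "Unknown error code: %d" % code)
```

For example, `describe(6016)` resolves to the out-of-memory message without grepping the documentation.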

Processing already sorted data with Hadoop Map/Reduce jobs without performance overhead

While working with Map/Reduce jobs in Hadoop, it is quite possible that you already have “sorted data” stored in HDFS. As you may know, sorting happens not only after the map process in the map task but also during the merge process in the reduce task, so sorting already sorted data again is a significant performance overhead. In this situation you may want your Map/Reduce job not to sort the data.
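To see why re-sorting already ordered data is wasteful, consider what the reduce side actually needs: if each map task’s output is already in key order, a streaming k-way merge produces globally ordered output without ever performing a full sort. The Python sketch below uses `heapq.merge` as a stand-in for Hadoop’s merge step; the data is invented for illustration:

```python
import heapq

# Simulated outputs of three map tasks, each already sorted by key.
map_outputs = [
    [(1, "a"), (4, "d"), (7, "g")],
    [(2, "b"), (5, "e")],
    [(3, "c"), (6, "f")],
]

# Reduce-side merge: a streaming k-way merge preserves global key order
# without re-sorting any run. This step is cheap when inputs are already
# sorted -- which is exactly why a redundant full sort is pure overhead.
merged = list(heapq.merge(*map_outputs))
```

The merge visits each record once, instead of paying the comparison-sort cost a second time.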


Note: If you have tried changing map.sort.class to a no-op, you will have found that it does not work either.


So the questions are:

  • Is it possible to force Map/Reduce not to sort the data again (since it is already sorted) after the map phase?
  • Or, how can you run Map/Reduce jobs in a way that lets you control whether the results come out sorted or unsorted?

So if you do not need the result to be sorted, the following Hadoop patch would be a great place to start:

Note: Before using the above patch, I would suggest reading the following comments from Robert about it:

  • Combiners are not compatible with this patch. Is there a reason why we could not make combiners work with this, so long as they must follow the same assumption that they will not get sorted input? If the algorithms you are thinking about would never get any benefit from a combiner, could you also add the check in the client? I would much rather have the client blow up with an error instead of waiting for my map tasks to launch and then blow up 4+ times before I get the error.
  • In your test you never validate that the output is what you expected it to be. That may be hard as it may not be deterministic because there is no sorting, but it would be nice to have something verify that the code did work as expected. Not just that it did not crash.
  • mapred-default.xml: Please add the new property to mapred-default.xml, along with a brief explanation of what it does.
  • There is no documentation or examples. This is a new feature that could be very useful to lots of people, but if they never know it is there it will not be used. Could you include in your patch updates to the documentation about how to use this, and some useful examples, preferably simple. Perhaps an example computing CTR would be nice.
  • Performance. The entire reason for this change is to improve performance, but I have not seen any numbers showing a performance improvement. No numbers at all in fact. It would be great if you could include here some numbers along with the code you used for your benchmark and a description of your setup. I have spent time on different performance teams, and performance improvement efforts from a huge search engine to an OS on a cell phone and the one thing I have learned is that you have to go off of the numbers because well at least for me my intuition is often wrong and what I thought would make it faster slowed it down instead.
  • Trunk. This patch is specific to 0.20/1.0 line. Before this can get merged into the 0.20/1.0 lines we really need an equivalent patch for trunk, and possibly 0.21, 0.22, and 0.23. This is so there are no regressions. It may be a while off after you get the 1.0 patch cleaned up though.
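As a rough idea of the CTR example Robert asks for, here is a purely hypothetical Python sketch (the function name and data shape are mine, not code from the patch): with sorting disabled, a mapper can aggregate clicks and impressions per key in a hash map, since nothing downstream assumes key order.

```python
from collections import defaultdict

def map_ctr(records):
    """Map-side CTR sketch. `records` is an iterable of
    (ad_id, clicked) pairs, where clicked is 0 or 1. Aggregation uses
    a hash map, so no sorted order is ever required or produced."""
    clicks = defaultdict(int)
    impressions = defaultdict(int)
    for ad_id, clicked in records:
        impressions[ad_id] += 1
        if clicked:
            clicks[ad_id] += 1
    # Emit (ad_id, click-through rate); key order is unspecified.
    return {ad: clicks[ad] / impressions[ad] for ad in impressions}
```

Because the result is keyed by a dictionary rather than a sorted stream, this is exactly the class of job that gains nothing from the sort phase.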

Keyword: Hadoop, Map/Reduce, Jobs Performance, Hadoop Patch

How to troubleshoot MapReduce jobs in Hadoop

When writing MapReduce programs you are definitely going to hit problems, such as infinite loops, crashes in MapReduce, incomplete jobs, etc. Here are a few things that will help you isolate these problems:

Map/Reduce Log Files:

All MapReduce job activity is logged by default in Hadoop. By default, log files are stored in the logs/ subdirectory of the HADOOP_HOME main directory. The log file name format is HADOOP-username-service-hostname.log. The most recent data is in the .log file; older logs have their date appended to them.

Log File Format:


  • The username in the log filename refers to the user account under which Hadoop was started. On Windows, the Hadoop service may be started under a different user name than the one you are logged on with, so the username in the file name is not necessarily the same username you are using to run your programs.
  • The service name identifies which of the several Hadoop programs wrote the log. These are important for debugging a whole Hadoop installation:
    • Jobtracker
    • Namenode
    • Datanode
    • Secondarynamenode
    • Tasktracker
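Putting the naming convention above into code, here is a small helper (a sketch that assumes the HADOOP-username-service-hostname.log pattern described earlier; the function name is my own) that splits a log filename into its parts:

```python
def parse_hadoop_log_name(filename):
    """Split 'hadoop-<username>-<service>-<hostname>.log' into parts.

    Hostnames may themselves contain dashes, so split at most 3 times
    and let the remainder be the hostname."""
    stem = filename[:-len(".log")] if filename.endswith(".log") else filename
    _prefix, username, service, hostname = stem.split("-", 3)
    return {"username": username, "service": service, "hostname": hostname}
```

For example, a file named hadoop-hdfs-tasktracker-node-01.log parses to username hdfs, service tasktracker, hostname node-01.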

For the Map/Reduce process, the tasktracker logs provide details about the individual programs run on each data node. Any exceptions thrown by your MapReduce program will be logged in the tasktracker logs.

Subdirectory Userlogs:

Inside the HADOOP_HOME/logs folder you will also find a subdirectory named userlogs. In this directory you will find another subdirectory for every MapReduce task running in your Hadoop cluster. Each task records its stdout and stderr to two files in this subdirectory. If you are running a multi-node Hadoop cluster, the logs you find here are not centrally aggregated. To collect the relevant logs you would need to check each TaskNode’s logs/userlogs/ directory for its output and then assemble the full log history to understand what went wrong.
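On a single node, that per-task scan can be automated. The following Python sketch (the function name and the default "Exception" needle are my choices, not a Hadoop tool) walks a logs/userlogs/ directory and reports which task files mention an exception; on a multi-node cluster you would still have to run it on every TaskNode:

```python
import os

def find_failed_tasks(userlogs_dir, needle="Exception"):
    """Walk logs/userlogs/<task-dir>/... and return the paths of files
    whose contents mention `needle` (e.g. a Java stack trace)."""
    hits = []
    for root, _dirs, files in os.walk(userlogs_dir):
        for name in files:
            path = os.path.join(root, name)
            try:
                with open(path, errors="ignore") as f:
                    if needle in f.read():
                        hits.append(path)
            except OSError:
                pass  # unreadable file; skip it
    return sorted(hits)
```

Running it with the cluster's userlogs path quickly narrows a failing job down to the specific task attempt whose stderr holds the stack trace.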

Video Resources on Machine Learning from Big Data Workshop NIPS2011

Big Learning Workshop: Algorithms, Systems, and Tools for Learning at Scale at NIPS 2011

Invited Talk: Machine Learning and Hadoop by Josh Wills

Abstract: We’ll review common use cases for machine learning and advanced analytics found in our customer base at Cloudera and ways in which Apache Hadoop supports these use cases. We’ll then discuss upcoming developments for Apache Hadoop that will enable new classes of applications to be supported by the system.

Tutorial: Vowpal Wabbit by John Langford

Abstract: We present a system and a set of techniques for learning linear predictors with convex losses on terascale datasets, with trillions of features (the number of features here refers to the number of non-zero entries in the data matrix), billions of training examples, and millions of parameters in an hour using a cluster of 1000 machines. One of the core techniques used is a new communication infrastructure, often referred to as AllReduce, implemented for compatibility with MapReduce clusters. The communication infrastructure appears broadly reusable for many other tasks.

Tutorial: Group Sparse Hidden Markov Models

Sparse Representation and Low-rank Approximation Workshop at NIPS 2011

Invited Talk: Group Sparse Hidden Markov Models by Jen-Tzung Chien, National Cheng Kung University, Taiwan

Invited Talk: A Common GPU n-Dimensional Array for Python and C by Arnaud Bergeron

Abstract: Currently there are multiple incompatible array/matrix/n-dimensional base object implementations for GPUs. This hinders the sharing of GPU code and causes duplicate development work. This paper proposes and presents a first version of a common GPU n-dimensional array (tensor) named GpuNdArray that works with both CUDA and OpenCL. It will be usable from Python, C, and possibly other languages.

Invited Talk: A Topic Model for Melodic Sequences by Athina Spiliopoulou

Athina is a PhD student in the Machine Learning group of the Institute for Adaptive and Neural Computation at the School of Informatics, University of Edinburgh. She works with Amos Storkey on Machine Learning methods for music, with a specific interest in unsupervised learning of musical structure from melodic sequences.

Network related particulars with Apache Hadoop Cluster Performance

Network Characteristics:

The nodes in a Hadoop cluster are interconnected through the network. Typically, one or more of the following phases of MapReduce jobs transfers data over the network:

1. Writing data: This phase occurs when the initial data is either streamed or bulk-delivered to HDFS. Data blocks of the loaded files are replicated, transferring additional data over the network.

2. Workload execution: The MapReduce algorithm is run.

a. Map phase: In the map phase of the algorithm, almost no traffic is sent over the network. The network is used at the beginning of the map phase only if an HDFS locality miss occurs (the data block is not locally available and has to be requested from another data node).

b. Shuffle phase: This is the phase of workload execution in which traffic is sent over the network, to a degree that depends on the workload. Data is transferred over the network when the output of the mappers is shuffled to the reducers.

c. Reduce phase: In this phase, almost no traffic is sent over the network because the reducers have all the data they need from the shuffle phase.

d. Output replication: MapReduce output is stored as a file in HDFS. The network is used when the blocks of the result file have to be replicated by HDFS for redundancy.

3. Reading data: This phase occurs when the final data is read from HDFS for consumption by the end application, such as the website, indexing, or SQL database.
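As a back-of-the-envelope illustration of the phases above, per-phase network traffic can be estimated from a few job parameters. The simple model below is an assumption for illustration only; real shuffle volume depends on combiners, compression, and partitioning:

```python
def estimate_network_bytes(input_bytes, map_output_ratio,
                           locality_miss_rate, replication=3):
    """Rough per-phase network traffic model for one MapReduce job.

    - write: loading input into HDFS sends (replication - 1) extra copies
    - map: only HDFS locality misses pull blocks over the network
    - shuffle: worst case, all map output crosses the network
    - output_replication: the result file is replicated like any HDFS
      file (output size approximated here by the shuffle size)
    """
    shuffle = input_bytes * map_output_ratio
    return {
        "write": input_bytes * (replication - 1),
        "map": input_bytes * locality_miss_rate,
        "shuffle": shuffle,
        "output_replication": shuffle * (replication - 1),
    }

# Example: 1 TB input, map output half the input size, 10% locality misses.
traffic = estimate_network_bytes(10**12, map_output_ratio=0.5,
                                 locality_miss_rate=0.1)
```

Even this crude model makes the point of the section: the write and shuffle phases dominate network usage, while a well-placed map phase barely touches it.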

In addition, the network is crucial for the Hadoop control plane: the signaling and operations of HDFS and the MapReduce infrastructure.

Be sure to consider the benefits and costs of the choices available when designing a network: network architectures, network devices, resiliency, oversubscription ratios, etc. The following section discusses some of these parameters in more detail.

Impact of Network Characteristics on Job Completion Times

  • A functional and resilient network is a crucial part of a good Hadoop cluster.
  • However, an analysis of the relative importance of the factors shows that other factors in a cluster have a greater influence on the performance of the cluster than the network.
  • Nevertheless, you should consider some of the relevant network characteristics and their potential effects.

Network Latency

  • Variations in switch and router latency have been shown to have only limited impact on cluster performance.
  • From a network point of view, any latency-related optimization should start with a network wide analysis.
  • “Architecture first, and device next” is an effective strategy.
  • Architectures that deliver consistently lower latency at scale are better than architectures with higher overall latency but lower individual device latency.
  • The latency contribution to the workload is much higher at the application level, contributed by the application logic (Java Virtual Machine software stack, socket buffers, etc.), than the contribution from network latency.
  • In any case, slightly more or less network latency will not noticeably affect job completion times.

Data Node Network Speed

  • Data nodes must be provisioned with enough bandwidth for efficient job completion, keeping in mind the price-to-performance trade-off entailed in adding more bandwidth to nodes.
  • Recommendations for a cluster depend on workload characteristics.
  • Typical clusters are provisioned with one or two 1-Gbps uplinks per data node. Cluster management is made easier by choosing network architectures that are proven to be resilient and easy to manage and that can scale as your data grows.
  • The use of 10Gbps server access is largely dependent on the cost/performance trade-off.
  • The workload characteristics and the business requirement to complete jobs in the required time will drive the move to 10-Gbps server connectivity.
  • For example, as 10-Gbps Ethernet LAN-on-motherboard (LOM) connectors become more commonly available on servers, more clusters will likely be built with 10 Gigabit Ethernet data node uplinks.


Hadoop Performance: How do storage disk types in an individual node impact job performance?

As you may already know, a Hadoop cluster is network- and disk-I/O intensive. Recently I ran a test scenario in which I replaced the SATA hard disks with high-performance SSDs while keeping the rest of the cluster hardware the same. I ran the TeraSort benchmark to validate whether the high-performance SSDs would impact overall performance, and found that using SSDs instead of SATA improved the test performance by ~20%.

After that, I searched the internet for other tests done in a similar fashion to see what the best practices in this direction might be. I found the following recommendations from Intel on choosing an appropriate combination of disk throughput, in-memory caching, cluster deployment, and multi-CPU boxes:

  • We found SSDs to be very effective for both read and write operations.
  • In-memory caching resulted in better response times when the right amount of “HEAP CACHE” was set to achieve a higher cache-hit percentage.
  • The cluster environment served requests faster, although “CPU I/O WAIT” spikes were noticed.
  • Overall, most of the CPUs remained idle during the test.

In a test demonstrated by Intel of the impact of going from two to four disks in a node (doubling the I/O):

–           The job was completed in half the time compared with the previous I/O configuration

–           Increasing server cost by 10% increased sort performance by 100%.
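Those two data points imply a clear cost-per-job win, which a quick calculation makes explicit (the percentages are the ones quoted above; the formula is simply cost divided by job throughput, with an arbitrary baseline cost of 100):

```python
def cost_per_job(server_cost, jobs_per_hour):
    """Effective cost per completed job for a fixed workload."""
    return server_cost / jobs_per_hour

baseline = cost_per_job(server_cost=100.0, jobs_per_hour=1.0)
# +10% server cost, +100% sort performance (jobs finish in half the time)
upgraded = cost_per_job(server_cost=110.0, jobs_per_hour=2.0)
# upgraded works out to 55.0 vs the baseline 100.0: a 45% lower cost per job
```

So even though the upgraded node is more expensive in absolute terms, each job it completes costs noticeably less.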

If we consider the MapReduce local directory where mapped files are stored locally, adding multiple identical disks to this mount could improve performance. Replacing SATA with SSDs or PCIe-based flash cards can improve I/O for certain jobs. Performance increases vary by workload; strictly speaking, this increases the per-server cost while decreasing the cost per job/transaction.

SVN checkout list of Hadoop Core Components Source

Hadoop Core Components:

Other Hadoop Components: