Scoring H2O MOJO models with spark UDF and Scala

With H2O machine learning the best case is that your machine learning models can be exported as Java code so you can use them for scoring in any platform which supports Java. H2O algorithms generates POJO and MOJO models which does not require H2O runtime to score which is great for any enterprise. You can learn more about H2O POJO and MOJO models here.

Here is the Spark Scala code which shows how to score the H2O MOJO model by loading it from the disk and then using RowData object to pass as row to H2O easyPredict class:

import _root_.hex.genmodel.GenModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.easy.prediction
import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.RowData

// Load Mojo
val mojo = MojoModel.load("/Users/avkashchauhan/learn/customers/mojo_bin/")
val easyModel = new EasyPredictModelWrapper(mojo)

// Get Mojo Details
var features = mojo.getNames.toBuffer

// Creating the row
val r = new RowData
r.put("AGE", "68")
r.put("RACE", "2")
r.put("DCAPS", "2")
r.put("VOL", "0")
r.put("GLEASON", "6")

// Performing the Prediction
val prediction = easyModel.predictBinomial(r).classProbabilities

Above the MOJO model is stored into local file system as and it is loaded as resources inside the Scala code.  The full execution of above code is available here.

Following is the simple Java code which shows how you could use the same code to write a Java application to perform scoring based on H2O MOJO Model:

import hex.genmodel.easy.RowData;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.prediction.*;
import hex.genmodel.MojoModel;
import java.util.Arrays;

public class main {
  public static void main(String[] args) throws Exception {
    EasyPredictModelWrapper model = new EasyPredictModelWrapper(MojoModel.load(""));

    hex.genmodel.GenModel mojo = MojoModel.load("");

    System.out.println("isSupervised : " + mojo.isSupervised());
    System.out.println("Columns Names : " + Arrays.toString(mojo.getNames()));
    System.out.println("Number of columns : " + mojo.getNumCols());
    System.out.println("Response ID : " + mojo.getResponseIdx());
    System.out.println("Response Name : " + mojo.getResponseName());

    for (int i = 0; i < mojo.getNumCols(); i++) {
      String[] domainValues = mojo.getDomainValues(i);

    RowData row = new RowData();
    row.put("AGE", "68");
    row.put("RACE", "2");
    row.put("DCAPS", "2");
    row.put("VOL", "0");
    row.put("GLEASON", "6");

    BinomialModelPrediction p = model.predictBinomial(row);
    System.out.println("Has penetrated the prostatic capsule (1=yes; 0=no): " + p.label);
    System.out.print("Class probabilities: ");
    for (int i = 0; i < p.classProbabilities.length; i++) {
      if (i > 0) {

Thats it, enjoy!!


Launching H2O cluster on different port in pysparkling

In this example we will launch H2O machine learning cluster using pysparkling package. You can visit my github and this article to learn more about the code execution explained in this article.

For you would need to  install pysparkling in python 2.7 setup as below:

> pip install -U h2o_pysparkling_2.1

Now we can launch the pysparkling Shell as below:


Launch pysparkling shell:

~/tools/sw2/sparkling-water-2.1.14 $ bin/pysparkling

Python Code Script Launch the H2O cluster in pysparkling:

## Importing Libraries
from pysparkling import *
import h2o

## Setting H2O Conf Object
h2oConf = H2OConf(sc)

## Setting H2O Conf for different port

## Gett H2O Conf Object to see the configuration

## Launching H2O Cluster
hc = H2OContext.getOrCreate(spark, h2oConf)

## Getting H2O Cluster status

Now If you verify the Sparkling Water configuration you will see that the H2O is running on the given IP and port 54300 as configured:

Sparkling Water configuration:
  backend cluster mode : internal
  workers              : None
  cloudName            : Not set yet, it will be set automatically before starting H2OContext.
  flatfile             : true
  clientBasePort       : 54300
  nodeBasePort         : 54300
  cloudTimeout         : 60000
  h2oNodeLog           : INFO
  h2oClientLog         : WARN
  nthreads             : -1
  drddMulFactor        : 10

Thats it, enjoy!!

Flatten complex nested parquet files on Hadoop with Herringbone


Herringbone is a suite of tools for working with parquet files on hdfs, and with impala and hive.

Please visit my github and this specific page for more details.


Note: You must be using a Hadoop machine and herringbone needs Hadoop environmet.

Pre-requsite : Thrift

  • Thrift 0.9.1 (MUST have 0.9.1 as 0.9.3 and 0.10.0 will give error while packaging)
  • Get thrift 0.9.1 Link

Pre-requsite : Impala

  • First setup Cloudera repo in your machine:
  • Install Impala
    • Install impala : $ sudo apt-get install impala
    • Install impala Server : $ sudo apt-get install impala-server
    • Install impala stat-store : $ sudo apt-get install impala-state-store
    • Install impala shell : $ sudo apt-get install impala-shell
    • Verify : impala : $ impala-shell
Starting Impala Shell without Kerberos authentication
Connected to mr-0xd7-precise1.0xdata.loc:21000
Server version: impalad version 2.6.0-cdh5.8.4 RELEASE (build 207450616f75adbe082a4c2e1145a2384da83fa6)
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.

(Shell build version: Impala Shell v1.4.0-cdh4-INTERNAL (08fa346) built on Mon Jul 14 15:52:52 PDT 2014)

Building : Herringbone source

Here is the successful herringbone “mvn package” command log for your review:

[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] Herringbone Impala
[INFO] Herringbone Main
[INFO] Herringbone
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone Impala 0.0.2
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone 0.0.1
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] Herringbone Impala ................................. SUCCESS [ 2.930 s]
[INFO] Herringbone Main ................................... SUCCESS [ 13.012 s]
[INFO] Herringbone ........................................ SUCCESS [ 0.000 s]
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.079 s
[INFO] Finished at: 2017-10-06T11:27:20-07:00
[INFO] Final Memory: 90M/1963M
[INFO] ------------------------------------------------------------------------

Using Herringbone

Note: You must have fiels on Hadoop, not on local file system

Verify the file on Hadoop:

  • ~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet
  • -rw-r–r– 3 avkash avkash 1463376 2017-09-13 16:56 /user/avkash/file-test1.parquet
  • ~/herringbone$ bin/herringbone flatten -i /user/avkash/file-test1.parquet
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/avkash/herringbone/herringbone-main/target/herringbone-0.0.1-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
17/10/06 12:06:44 INFO client.RMProxy: Connecting to ResourceManager at mr-0xd1-precise1.0xdata.loc/
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
17/10/06 12:06:45 INFO input.FileInputFormat: Total input paths to process : 1
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
1 initial splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
1 merged splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: number of splits:1
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1499294366934_0707
17/10/06 12:06:45 INFO impl.YarnClientImpl: Submitted application application_1499294366934_0707
17/10/06 12:06:46 INFO mapreduce.Job: The url to track the job: http://mr-0xd1-precise1.0xdata.loc:8088/proxy/application_1499294366934_0707/
17/10/06 12:06:46 INFO mapreduce.Job: Running job: job_1499294366934_0707
17/10/06 12:06:52 INFO mapreduce.Job: Job job_1499294366934_0707 running in uber mode : false
17/10/06 12:06:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/06 12:07:22 INFO mapreduce.Job:  map 100% reduce 0%

Now verify the file:

~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet-flat

Found 2 items
-rw-r--r--   3 avkash avkash          0 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/_SUCCESS
-rw-r--r--   3 avkash avkash    2901311 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/part-m-00000.parquet

Thats it, enjoy!!

Reading nested parquet file in Scala and exporting to CSV

Recently we were working on a problem where the parquet compressed file had lots of nested tables and some of the tables had columns with array type and our objective was to read it and save it to CSV.

We wrote a script in Scala which does the following

  • Handles nested parquet compressed content
  • Look for columns as “Array” and then remove those columns

Here is a the script

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${}`" else s"${prefix}.`${}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))

Here are the all the steps you would need to take while reading the parquet compressed content and then exporting it to disk as CSV.

val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${}`" else s"${prefix}.`${}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala >

val df ="/user/avkash/test.parquet")*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")

If you want to see the full working scripts with output you can visit any of the following link based on your Spark Version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x.

We got some help from the StackOverflow discussion here. Michal K and Michal M helped me to write above solution.

Thats it, enjoy!!

H2O AutoML examples in python and Scala

AutoML is included into H2O version and above. You can learn more about AutoML in the H2O blog here.

H2O’s AutoML can be used for automating a large part of the machine learning workflow, which includes automatic training and tuning of many models within a user-specified time-limit. The user can also use a performance metric-based stopping criterion for the AutoML process rather than a specific time constraint. Stacked Ensembles will be automatically trained on the collection individual models to produce a highly predictive ensemble model which, in most cases, will be the top performing model in the AutoML Leaderboard.

Here is the full working python code taken from here:

import h2o
from h2o.automl import H2OAutoML

df = h2o.import_file("")
train, test = df.split_frame(ratios=[.9])
# Identify predictors and response
x = train.columns

# For binary classification, response should be a factor
train[y] = train[y].asfactor()
test[y] = test[y].asfactor()

# Run AutoML for 60 seconds
aml = H2OAutoML(max_runtime_secs = 60)
aml.train(x = x, y = y, training_frame = train, leaderboard_frame = test)

# View the AutoML Leaderboard

# To generate predictions on a test set, use `"H2OAutoML"` object, or on the leader model object directly as below:
preds = aml.predict(test)
# or
preds = aml.leader.predict(test)

Here is the full working Scala code:

import ai.h2o.automl.AutoML;
import ai.h2o.automl.AutoMLBuildSpec
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(sc)
import h2oContext._
import h2oContext.implicits._
import water.Key
val prostateData = new H2OFrame(new File("/Users/avkashchauhan/src/"))
val autoMLBuildSpec = new AutoMLBuildSpec()
autoMLBuildSpec.input_spec.training_frame = prostateData
autoMLBuildSpec.input_spec.response_column = "CAPSULE";
autoMLBuildSpec.build_control.loss = "AUTO"
import java.util.Date;
val aml = AutoML.makeAutoML(Key.make(), new Date(), autoMLBuildSpec)
// Note: In some cases the above call is non-blocking
// So using the following alternative function will block the next commmand, untill the exection of action command
AutoML.startAutoML(autoMLBuildSpec).get()  ## This is forced blocking call

IF you want to see the full code execution visit here.

Thats it, enjoy!!

Setting H2O FLOW directory path in Sparkling Water

Sometimes you may want to back up H2O FLOW files to some source code repo or to a backup location. For that reason you may want to change the default FLOW directory.

In H2O flag -flow_dir is used to set the local folder for FLOW files.

Note: You can always specify any H2O property by using system properties on Spark driver/executors.

So to change H2O FLOW directory to save you can append to your command line with the Sparkling Water commandline:

--conf spark.driver.extraJavaOptions="-Dai.h2o.flow_dir=/your/backup/location"


Thats it, thanks.

Using RESTful API to get POJO and MOJO models in H2O


CURL API for Listing Models:


CURL API for Listing specific POJO Model:


List Specific MOJO Model:


Here is an example:

curl -X GET "http://localhost:54323/3/Models"
curl -X GET "http://localhost:54323/3/Models/deeplearning_model" >> NAME_IT

curl -X GET "http://localhost:54323/3/Models/deeplearning_model" >>
curl -X GET "http://localhost:54323/3/Models/glm_model/mojo" >

Thats it, enjoy!!