Scoring H2O MOJO models with Spark UDF and Scala

With H2O, the best case is that your machine learning models can be exported as Java code, so you can use them for scoring on any platform that supports Java. H2O algorithms generate POJO and MOJO models, which do not require the H2O runtime to score, and that is great for any enterprise. You can learn more about H2O POJO and MOJO models here.

Here is the Spark Scala code which shows how to score an H2O MOJO model by loading it from disk and then passing a RowData object as the row to the H2O EasyPredictModelWrapper class:

import _root_.hex.genmodel.GenModel
import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.easy.prediction

// Load Mojo
val mojo = MojoModel.load("/Users/avkashchauhan/learn/customers/mojo_bin/gbm_model.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

// Get MOJO details (the column names known to the model)
val features = mojo.getNames.toBuffer

// Creating the row
val r = new RowData
r.put("AGE", "68")
r.put("RACE", "2")
r.put("DCAPS", "2")
r.put("VOL", "0")
r.put("GLEASON", "6")

// Performing the Prediction
val prediction = easyModel.predictBinomial(r).classProbabilities

In the code above, the MOJO model is stored on the local file system as a zip file (gbm_model.zip in the path shown) and is loaded from disk by MojoModel.load. The full execution of the above code is available here.
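
Since a RowData object is filled by putting column names and values one at a time, it can help to compare a row's keys with the model's column names before scoring. The following is a minimal sketch in plain Python, not the H2O API: the function name check_row and the hard-coded feature list are hypothetical, and in the Scala code the list would come from mojo.getNames (which may contain more columns than the five used here).

```python
# Plain-Python sketch (not the H2O API): compare a row's column names
# with the feature names a model expects.

def check_row(row, feature_names):
    """Return (unknown, missing) column names for a row given as a dict."""
    expected = set(feature_names)
    unknown = sorted(set(row) - expected)   # keys the model does not know
    missing = sorted(expected - set(row))   # features the row does not supply
    return unknown, missing

# Hypothetical feature list; in the Scala code it would come from mojo.getNames.
features = ["AGE", "RACE", "DCAPS", "VOL", "GLEASON"]

# Same values as the RowData example above, passed as strings there too.
row = {"AGE": "68", "RACE": "2", "DCAPS": "2", "VOL": "0", "GLEASON": "6"}

unknown, missing = check_row(row, features)
print("unknown:", unknown, "missing:", missing)
```

An empty "unknown" list means every key in the row matches a column name in the list, which catches typos such as "GLEESON" early.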

The following simple Java code shows how you could write a Java application that performs the same scoring with an H2O MOJO model:

import java.io.*;
import hex.genmodel.easy.RowData;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.prediction.*;
import hex.genmodel.MojoModel;
import java.util.Arrays;

public class main {
  public static void main(String[] args) throws Exception {
    EasyPredictModelWrapper model = new EasyPredictModelWrapper(MojoModel.load("gbm_prostate_model.zip"));

    hex.genmodel.GenModel mojo = MojoModel.load("gbm_prostate_model.zip");

    System.out.println("isSupervised : " + mojo.isSupervised());
    System.out.println("Columns Names : " + Arrays.toString(mojo.getNames()));
    System.out.println("Number of columns : " + mojo.getNumCols());
    System.out.println("Response ID : " + mojo.getResponseIdx());
    System.out.println("Response Name : " + mojo.getResponseName());

    for (int i = 0; i < mojo.getNumCols(); i++) {
      String[] domainValues = mojo.getDomainValues(i);
      System.out.println(Arrays.toString(domainValues));
    }

    RowData row = new RowData();
    row.put("AGE", "68");
    row.put("RACE", "2");
    row.put("DCAPS", "2");
    row.put("VOL", "0");
    row.put("GLEASON", "6");

    BinomialModelPrediction p = model.predictBinomial(row);
    System.out.println("Has penetrated the prostatic capsule (1=yes; 0=no): " + p.label);
    System.out.print("Class probabilities: ");
    for (int i = 0; i < p.classProbabilities.length; i++) {
      if (i > 0) {
        System.out.print(",");
      }
      System.out.print(p.classProbabilities[i]);
    }
    System.out.println("");
  }
}

That's it, enjoy!!

Flatten complex nested parquet files on Hadoop with Herringbone

Herringbone

Herringbone is a suite of tools for working with parquet files on HDFS, and with Impala and Hive: https://github.com/stripe/herringbone

Please visit my GitHub and this specific page for more details.

Installation:

Note: Herringbone needs a Hadoop environment, so you must be working on a Hadoop machine.

Prerequisite: Thrift

  • Thrift 0.9.1 (it MUST be 0.9.1, as 0.9.3 and 0.10.0 give errors while packaging)
  • Get thrift 0.9.1 Link

Prerequisite: Impala

  • First set up the Cloudera repo on your machine.
  • Install Impala:
    • Install Impala: $ sudo apt-get install impala
    • Install Impala server: $ sudo apt-get install impala-server
    • Install Impala state store: $ sudo apt-get install impala-state-store
    • Install Impala shell: $ sudo apt-get install impala-shell
    • Verify Impala: $ impala-shell
impala-shell
Starting Impala Shell without Kerberos authentication
Connected to mr-0xd7-precise1.0xdata.loc:21000
Server version: impalad version 2.6.0-cdh5.8.4 RELEASE (build 207450616f75adbe082a4c2e1145a2384da83fa6)
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.

(Shell build version: Impala Shell v1.4.0-cdh4-INTERNAL (08fa346) built on Mon Jul 14 15:52:52 PDT 2014)

Building : Herringbone source

Here is the log of a successful herringbone “mvn package” run for your review:

[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] Herringbone Impala
[INFO] Herringbone Main
[INFO] Herringbone
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone Impala 0.0.2
[INFO] ------------------------------------------------------------------------
..
..
..
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone 0.0.1
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Herringbone Impala ................................. SUCCESS [ 2.930 s]
[INFO] Herringbone Main ................................... SUCCESS [ 13.012 s]
[INFO] Herringbone ........................................ SUCCESS [ 0.000 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.079 s
[INFO] Finished at: 2017-10-06T11:27:20-07:00
[INFO] Final Memory: 90M/1963M
[INFO] ------------------------------------------------------------------------

Using Herringbone

Note: You must have the files on Hadoop, not on the local file system.

Verify the file on Hadoop:

  • ~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet
  • -rw-r--r-- 3 avkash avkash 1463376 2017-09-13 16:56 /user/avkash/file-test1.parquet
  • ~/herringbone$ bin/herringbone flatten -i /user/avkash/file-test1.parquet
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/avkash/herringbone/herringbone-main/target/herringbone-0.0.1-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/10/06 12:06:44 INFO client.RMProxy: Connecting to ResourceManager at mr-0xd1-precise1.0xdata.loc/172.16.2.211:8032
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
17/10/06 12:06:45 INFO input.FileInputFormat: Total input paths to process : 1
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
1 initial splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
1 merged splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: number of splits:1
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1499294366934_0707
17/10/06 12:06:45 INFO impl.YarnClientImpl: Submitted application application_1499294366934_0707
17/10/06 12:06:46 INFO mapreduce.Job: The url to track the job: http://mr-0xd1-precise1.0xdata.loc:8088/proxy/application_1499294366934_0707/
17/10/06 12:06:46 INFO mapreduce.Job: Running job: job_1499294366934_0707
17/10/06 12:06:52 INFO mapreduce.Job: Job job_1499294366934_0707 running in uber mode : false
17/10/06 12:06:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/06 12:07:22 INFO mapreduce.Job:  map 100% reduce 0%

Now verify the file:

~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet-flat

Found 2 items
-rw-r--r--   3 avkash avkash          0 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/_SUCCESS
-rw-r--r--   3 avkash avkash    2901311 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/part-m-00000.parquet
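
If you want to check the output from a script instead of by eye, the listing text can be parsed. This is only a sketch in plain Python: the function name parse_listing is made up, it assumes the eight-column file lines shown above, and it would not handle paths that contain spaces. It looks for the _SUCCESS marker that a completed MapReduce job leaves in its output directory and collects the parquet part files.

```python
# Sketch: inspect the text printed by `hadoop fs -ls` for a job output directory.

def parse_listing(text):
    """Return {path: size_in_bytes} for each file line in the listing."""
    files = {}
    for line in text.splitlines():
        parts = line.split()
        # File lines have 8 fields:
        # permissions, replication, owner, group, size, date, time, path
        if len(parts) == 8 and parts[0].startswith("-"):
            files[parts[7]] = int(parts[4])
    return files

# The listing copied from the command output above.
listing = """Found 2 items
-rw-r--r--   3 avkash avkash          0 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/_SUCCESS
-rw-r--r--   3 avkash avkash    2901311 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/part-m-00000.parquet
"""

files = parse_listing(listing)
has_success = any(path.endswith("/_SUCCESS") for path in files)
part_files = [path for path in files if path.endswith(".parquet")]
print(has_success, part_files)
```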

That's it, enjoy!!

H2O Word2Vec Tutorial with example in Scala

If you would like to know what word2vec is and why you should use it, there is plenty of material available to read.  You can learn more about the H2O implementation of Word2Vec here, along with its configuration and interpretation.

In this Scala example we will use the H2O Word2Vec algorithm to build a Word2Vec model from a given text (supplied as a text file or an Array).

The full Scala code of the following example is available at my GitHub.

Let's start the H2O cluster first:

import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)

Now we will import the libraries required to get our job done:

import scala.io.Source
import _root_.hex.word2vec.{Word2Vec, Word2VecModel}
import _root_.hex.word2vec.Word2VecModel.Word2VecParameters
import water.fvec.Vec

Now we will create a list of stop words, which are not useful for text mining and will be removed from the word source:

val STOP_WORDS = Set("ourselves", "hers", "between", "yourself", "but", "again", "there", "about", 
    "once", "during", "out", "very", "having", "with", "they", "own", "an", "be", "some", "for", "do", 
    "its", "yours", "such", "into", "of", "most", "itself", "other", "off", "is", "s", "am", "or", "who", "as", 
     "from", "him", "each", "the", "themselves", "until", "below", "are", "we", "these", "your", "his", "through", "don", "nor", "me", "were", "her", 
    "more", "himself", "this", "down", "should", "our", "their", "while", "above", "both", "up", 
    "to", "ours", "had", "she", "all", "no", "when", "at", "any", "before", "them", "same", "and", "been", "have", "in", "will", "on", "does", "yourselves", "then", "that", "because", "what", "over", "why", "so", "can", 
    "did", "not", "now", "under", "he", "you", "herself", "has", "just", "where", "too", "only", "myself", "which", "those", "i", "after", "few", "whom", "t", "being", "if", "theirs", "my", "against", "a", "by", "doing", 
    "it", "how", "further", "was", "here", "than")

Now let's ingest the text data. We want to run the Word2Vec algorithm on it to vectorize the data first, and then run machine learning experiments on the result.

I have downloaded a free story, “The Adventures of Sherlock Holmes”, from the Internet and am using it as my source.

val filename = "/Users/avkashchauhan/Downloads/TheAdventuresOfSherlockHolmes.txt"
val lines = Source.fromFile(filename).getLines.toArray
val sparkframe = sc.parallelize(lines)

Now let's define the tokenize function, which will convert our input text to tokens:

def tokenize(line: String) = {
  // Split on non-word characters such as punctuation, as opposed to splitting on just " "
  line.split("""\W+""")
    .map(_.toLowerCase)
    // Remove the stop words defined above, then append a null after the last word of the line
    .filterNot(word => STOP_WORDS.contains(word)) :+ null
}
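
To see what this function produces for one line of text, here is a rough Python equivalent. It is a sketch, not a translation guaranteed to match the Scala version token for token: the stop word set is a small subset chosen for illustration, None stands in for the trailing null, and empty strings left over from the split are dropped.

```python
import re

# A small subset of the stop words above, for illustration only.
STOP_WORDS = {"the", "of", "a", "and", "to", "in", "was"}

def tokenize(line):
    """Lowercase, split on non-word characters, drop stop words, append None."""
    words = [w.lower() for w in re.split(r"\W+", line)]
    # re.split can yield empty strings at the edges; drop them with the stop words
    kept = [w for w in words if w and w not in STOP_WORDS]
    return kept + [None]  # None plays the role of the trailing null in the Scala version

tokens = tokenize("The Adventures of Sherlock Holmes!")
print(tokens)
```

Every input line ends with a None, so the flattened token stream keeps a marker between one line and the next.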

Now we will call the tokenize function to create a list of labeled words:

val allLabelledWords = sparkframe.flatMap(d => tokenize(d))

Note: You can also use your own tokenize function or one from a library; you just need to map the function over the data, as done above with flatMap.

Now let's convert the collection of labeled words into an H2O frame:

val h2oFrame = h2oContext.asH2OFrame(allLabelledWords)

Now it is time to use the H2O Word2Vec algorithm, configuring its parameters first:

val w2vParams = new Word2VecParameters
w2vParams._train = h2oFrame._key
w2vParams._epochs = 500
w2vParams._min_word_freq = 0
w2vParams._init_learning_rate = 0.05f
w2vParams._window_size = 20
w2vParams._vec_size = 20
w2vParams._sent_sample_rate = 0.0001f

Now we will perform the real action, building the model:

val w2v = new Word2Vec(w2vParams).trainModel().get()

Now we can apply the model to perform some actions on it:

Let's start the first test by finding synonyms with this word2vec model. We call the findSynonyms method with a word and a count; the result is the top ‘count’ synonyms with their distance values:

w2v.findSynonyms("love", 3)
w2v.findSynonyms("help", 2)
w2v.findSynonyms("hate", 1)
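
To give an intuition for what a synonym lookup does, here is a small Python sketch. It assumes cosine similarity as the closeness measure, which is a common choice for word embeddings; this post does not show how H2O scores synonyms internally, so treat the scoring as an assumption. The two-dimensional vectors are made up, whereas a trained model would supply vectors of size _vec_size.

```python
import math

def cosine(u, v):
    """Cosine similarity between two vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm

def find_synonyms(word, vectors, count):
    """Return the `count` words whose vectors are closest to the vector of `word`."""
    target = vectors[word]
    scored = [(other, cosine(target, vec))
              for other, vec in vectors.items() if other != word]
    scored.sort(key=lambda pair: pair[1], reverse=True)  # most similar first
    return scored[:count]

# Made-up 2-dimensional vectors, for illustration only.
vectors = {
    "love": [1.0, 0.0],
    "like": [0.9, 0.1],
    "hate": [-1.0, 0.0],
    "help": [0.0, 1.0],
}

top = find_synonyms("love", vectors, 2)
print(top)
```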

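Let's transform words using the w2v model. The transform() function takes an H2O Vec as its first parameter, so the vector needs to be extracted from the H2O frame h2oFrame. The second parameter is the aggregate method: the call below passes AggregateMethod.NONE, so no aggregation is applied; pass AggregateMethod.AVERAGE instead if you want the word vectors averaged: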

val newSparkFrame = w2v.transform(h2oFrame.vec(0), Word2VecModel.AggregateMethod.NONE).toTwoDimTable()
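
As an illustration of what averaging word vectors means, the Python sketch below averages the vectors of each sentence, treating None as the end-of-sentence marker in the same way the tokenize function appends a null after each line. It is a conceptual sketch with made-up vectors, not H2O's implementation; in particular, skipping words that have no vector is an assumption.

```python
def average_by_sentence(tokens, vectors):
    """Average the word vectors of each sentence; None marks the end of a sentence."""
    results, current = [], []
    # Appending one extra None flushes a final sentence that lacks a marker.
    for tok in list(tokens) + [None]:
        if tok is None:
            if current:
                dim = len(current[0])
                results.append([sum(v[i] for v in current) / len(current)
                                for i in range(dim)])
            current = []
        elif tok in vectors:  # words without a vector are skipped (assumption)
            current.append(vectors[tok])
    return results

# Made-up 2-dimensional vectors, for illustration only.
vectors = {"sherlock": [1.0, 3.0], "holmes": [3.0, 5.0], "watson": [0.0, 2.0]}
tokens = ["sherlock", "holmes", None, "watson", None]

averaged = average_by_sentence(tokens, vectors)
print(averaged)
```

With no aggregation you get one vector per word; with averaging you get one vector per sentence, which is usually the more convenient input for a downstream model.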

That's it, enjoy!!

 

Reading nested parquet file in Scala and exporting to CSV

Recently we were working on a problem where a compressed parquet file had lots of nested tables, some of which had columns of array type, and our objective was to read it and save it as CSV.

We wrote a script in Scala which does the following:

  • Handles nested, compressed parquet content
  • Looks for columns of type “Array” and removes them

Here is the script:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
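
To make the recursion and the resulting column names easier to follow, here is the same logic sketched in plain Python, without Spark. The schema encoding (a list of name/type pairs, with a nested list for a struct and the string "array" for an array) is made up for this sketch; only the path building and the alias rule mirror the Scala function.

```python
import re

def flatten_schema(schema, prefix=None):
    """Return (column_path, alias) pairs for every non-array leaf field."""
    columns = []
    for name, dtype in schema:
        # Quote each name with backticks and join nested names with dots.
        path = f"`{name}`" if prefix is None else f"{prefix}.`{name}`"
        if isinstance(dtype, list):      # struct: recurse, using the path as prefix
            columns.extend(flatten_schema(dtype, path))
        elif dtype == "array":           # array columns are skipped
            continue
        else:                            # leaf: alias replaces '.' and '`' with '_'
            columns.append((path, re.sub(r"[.`]", "_", path)))
    return columns

# Hypothetical nested schema, for illustration only.
schema = [
    ("id", "long"),
    ("address", [("city", "string"), ("zip", "string")]),
    ("tags", "array"),
]

flat = flatten_schema(schema)
for path, alias in flat:
    print(path, "->", alias)
```

Note that because the backticks are replaced as well as the dots, the aliases carry extra underscores: the nested field city under address becomes _address___city_.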

Here are all the steps you need to take to read the compressed parquet content and then export it to disk as CSV:

val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala >

val df = spark.read.parquet("/user/avkash/test.parquet")

df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")

If you want to see the full working scripts with output, you can visit either of the following links, depending on your Spark version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x.

We got some help from the StackOverflow discussion here. Michal K and Michal M helped me write the above solution.

That's it, enjoy!!

Stacked Ensemble Model in Scala using H2O GBM and Deep Learning Models

In this full Scala sample we will use the H2O Stacked Ensembles algorithm. Building a stacked ensemble is a two-step process: first, models of various types are built with cross-validation, keeping the cross-validation predictions of each model; next, the stacked ensemble model is built using all of those cross-validated predictions. You can learn more about Stacked Ensembles here.

In this example we will build GBM and Deep Learning models first, and then build the Stacked Ensemble model from those two models.
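
The idea behind stacking can be sketched in a few lines of plain Python on a toy regression problem. This is not H2O code: the two base models (a mean predictor and a straight-line fit), the modulo fold assignment and the least-squares combiner are all stand-ins chosen to keep the sketch short. What it shows is the role of the cross-validated predictions: each row is predicted by a model that never saw it, and the combiner is trained on those predictions.

```python
def fit_mean(xs, ys):
    """Base model 1: always predict the mean of the training targets."""
    m = sum(ys) / len(ys)
    return lambda x: m

def fit_line(xs, ys):
    """Base model 2: simple least-squares line through the training data."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    slope = (sum((x - mx) * (y - my) for x, y in zip(xs, ys))
             / sum((x - mx) ** 2 for x in xs))
    return lambda x: my + slope * (x - mx)

def out_of_fold(fit, xs, ys, k):
    """Cross-validated predictions: row i is predicted by a model trained without it."""
    preds = [None] * len(xs)
    for fold in range(k):
        train = [i for i in range(len(xs)) if i % k != fold]  # modulo fold assignment
        model = fit([xs[i] for i in train], [ys[i] for i in train])
        for i in range(len(xs)):
            if i % k == fold:
                preds[i] = model(xs[i])
    return preds

def fit_weights(a, b, ys):
    """Least-squares weights (no intercept) for ys ~ wa*a + wb*b, via 2x2 normal equations."""
    saa = sum(x * x for x in a)
    sbb = sum(x * x for x in b)
    sab = sum(x * y for x, y in zip(a, b))
    say = sum(x * y for x, y in zip(a, ys))
    sby = sum(x * y for x, y in zip(b, ys))
    det = saa * sbb - sab * sab
    return (say * sbb - sby * sab) / det, (sby * saa - say * sab) / det

# Toy data that lies exactly on a line.
xs = [float(i) for i in range(10)]
ys = [2.0 * x + 1.0 for x in xs]

oof_mean = out_of_fold(fit_mean, xs, ys, k=5)
oof_line = out_of_fold(fit_line, xs, ys, k=5)
w_mean, w_line = fit_weights(oof_mean, oof_line, ys)
print(w_mean, w_line)
```

On this data the combiner puts essentially all of its weight on the line model, because that model's cross-validated predictions already match the targets.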

First, let's import key classes specific to H2O:

import org.apache.spark.h2o._
import water.Key
import java.io.File

Now we will create the H2O context so we can call key H2O functions for data ingest and Deep Learning algorithms:

val h2oContext = H2OContext.getOrCreate(sc)
import h2oContext._
import h2oContext.implicits._

Let's import data from the local file system as an H2O frame:

val prostateData = new H2OFrame(new File("/Users/avkashchauhan/src/github.com/h2oai/sparkling-water/examples/smalldata/prostate.csv"))

We will be using GBM and Deep Learning algorithms in this Stacked Ensemble, so let's first build the Deep Learning model:

import _root_.hex.deeplearning.DeepLearning
import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters

val dlParams = new DeepLearningParameters()
dlParams._epochs = 100
dlParams._train = prostateData
dlParams._response_column = 'CAPSULE
dlParams._variable_importances = true
dlParams._nfolds = 5
dlParams._seed = 1111
dlParams._keep_cross_validation_predictions = true;
val dl = new DeepLearning(dlParams, Key.make("dlProstateModel.hex"))
val dlModel = dl.trainModel.get

Now let's build the GBM model:

import _root_.hex.tree.gbm.GBM
import _root_.hex.tree.gbm.GBMModel.GBMParameters

val gbmParams = new GBMParameters()
gbmParams._train = prostateData
gbmParams._response_column = 'CAPSULE
gbmParams._nfolds = 5
gbmParams._seed = 1111
gbmParams._keep_cross_validation_predictions = true;
val gbm = new GBM(gbmParams,Key.make("gbmRegModel.hex"))
val gbmModel = gbm.trainModel().get()

Now we build the Stacked Ensemble model. First we need the classes required for Stacked Ensembles, as below:

import _root_.hex.Model
import _root_.hex.StackedEnsembleModel
import _root_.hex.ensemble.StackedEnsemble

Now we will define Stacked Ensembles parameters as below:

val stackedEnsembleParameters = new StackedEnsembleModel.StackedEnsembleParameters()
stackedEnsembleParameters._train = prostateData._key
stackedEnsembleParameters._response_column = 'CAPSULE

Now we need to pass all the different algorithms we would want to use in the Stacked Ensemble by passing their keys as below:

type T_MODEL_KEY = Key[Model[_, _ <: Model.Parameters, _ <:Model.Output]]

// Option 1
stackedEnsembleParameters._base_models = Array(gbmModel._key.asInstanceOf[T_MODEL_KEY], dlModel._key.asInstanceOf[T_MODEL_KEY])
// Option 2
stackedEnsembleParameters._base_models = Array(gbmModel, dlModel).map(model => model._key.asInstanceOf[T_MODEL_KEY])

// Note: Both options pass the same model keys (gbmModel and dlModel were trained above); use either one

Finally, we define the stacked ensemble job as below:

val stackedEnsembleJob = new StackedEnsemble(stackedEnsembleParameters)

And as the last step, let's build the stacked ensemble model:

val stackedEnsembleModel = stackedEnsembleJob.trainModel().get();

Now we can take a look at our Stacked Ensemble model as below:

stackedEnsembleModel

That's it, enjoy!!

Helpful content: https://github.com/h2oai/h2o-3/blob/a554bffabda6770386a31d47e05f00543d7b9ac3/h2o-algos/src/test/java/hex/ensemble/StackedEnsembleTest.java

 

Logistic Regression with H2O Deep Learning in Scala

Here is sample code which shows how to use the feed-forward-network-based Deep Learning algorithm from H2O to perform a logistic regression.

First, let's import key classes specific to H2O:

import org.apache.spark.h2o._
import water.Key
import java.io.File

Now we will create the H2O context so we can call key H2O functions for data ingest and Deep Learning algorithms:

val h2oContext = H2OContext.getOrCreate(sc)
import h2oContext._
import h2oContext.implicits._

Let's import data from the local file system as an H2O frame:

val prostateData = new H2OFrame(new File("/Users/avkashchauhan/src/github.com/h2oai/sparkling-water/examples/smalldata/prostate.csv"))

Now let's import the Deep Learning classes:

import _root_.hex.deeplearning.DeepLearning
import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters

Now we will define all key parameters specific to the H2O Deep Learning algorithm:

val dlParams = new DeepLearningParameters()
dlParams._epochs = 100
dlParams._train = prostateData
dlParams._response_column = 'CAPSULE
dlParams._variable_importances = true
dlParams._nfolds = 5
dlParams._seed = 1111
dlParams._keep_cross_validation_predictions = true;

Now we will create the Deep Learning Algorithm key first and then start the deep learning algorithm in blocking mode:

val dl = new DeepLearning(dlParams, Key.make("dlProstateModel.hex"))
val dlModel = dl.trainModel.get()

Let's learn more about our model:

dlModel

Now we can perform the prediction by passing an H2O frame. Here I am simply passing the original data frame; however, you can load your test data and pass it as an H2O frame to perform the prediction:

val predictionH2OFrame = dlModel.score(prostateData)('predict)
val predictionsFromModel = asRDD[DoubleHolder](predictionH2OFrame).collect.map(_.result.getOrElse(Double.NaN))

That's it, enjoy!!

 

 

H2O AutoML examples in Python and Scala

AutoML is included in H2O version 3.14.0.1 and above. You can learn more about AutoML in the H2O blog here.

H2O’s AutoML can be used for automating a large part of the machine learning workflow, which includes automatic training and tuning of many models within a user-specified time limit. The user can also use a performance metric-based stopping criterion for the AutoML process rather than a specific time constraint. Stacked Ensembles will be automatically trained on the collection of individual models to produce a highly predictive ensemble model which, in most cases, will be the top-performing model in the AutoML Leaderboard.
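
The time-limited search and the leaderboard can be pictured with a toy loop in plain Python. This is a heavily simplified sketch and not how H2O AutoML is implemented: the candidate names, training times and scores are made up, there is no tuning or stacking, and a fake clock replaces real time so the result is repeatable.

```python
class FakeClock:
    """A stand-in clock that only moves when a 'training' function advances it."""
    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

def run_automl(candidates, max_runtime_secs, clock):
    """Train candidates until the time budget is spent; return them ranked by score."""
    start = clock()
    leaderboard = []
    for name, train in candidates:
        if clock() - start >= max_runtime_secs:
            break  # budget exhausted: do not start another model
        leaderboard.append((name, train()))
    leaderboard.sort(key=lambda entry: entry[1], reverse=True)  # higher score is better here
    return leaderboard

clock = FakeClock()

def make_trainer(seconds, score):
    """Build a fake training function that 'takes' `seconds` and returns `score`."""
    def train():
        clock.now += seconds
        return score
    return train

# Hypothetical candidates with made-up training times and scores.
candidates = [
    ("gbm", make_trainer(20, 0.78)),
    ("glm", make_trainer(20, 0.74)),
    ("deep_learning", make_trainer(30, 0.80)),
    ("drf", make_trainer(10, 0.77)),
]

board = run_automl(candidates, max_runtime_secs=60, clock=clock)
leader = board[0][0]
print(leader, board)
```

The last candidate is never trained because the budget has already been used up by the time its turn comes, which is why a longer time limit can change which model ends up on top.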

Here is the full working Python code taken from here:

import h2o
from h2o.automl import H2OAutoML

h2o.init()
df = h2o.import_file("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate.csv")
train, test = df.split_frame(ratios=[.9])
# Identify predictors and response
x = train.columns
y = "CAPSULE"
x.remove(y)

# For binary classification, response should be a factor
train[y] = train[y].asfactor()
test[y] = test[y].asfactor()

# Run AutoML for 60 seconds
aml = H2OAutoML(max_runtime_secs = 60)
aml.train(x = x, y = y, training_frame = train, leaderboard_frame = test)

# View the AutoML Leaderboard
aml.leaderboard
aml.leader

# To generate predictions on a test set, use the H2OAutoML object, or the leader model object directly, as below:
preds = aml.predict(test)
# or
preds = aml.leader.predict(test)

Here is the full working Scala code:

import ai.h2o.automl.AutoML;
import ai.h2o.automl.AutoMLBuildSpec
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(sc)
import h2oContext._
import java.io.File
import h2oContext.implicits._
import water.Key
val prostateData = new H2OFrame(new File("/Users/avkashchauhan/src/github.com/h2oai/sparkling-water/examples/smalldata/prostate.csv"))
val autoMLBuildSpec = new AutoMLBuildSpec()
autoMLBuildSpec.input_spec.training_frame = prostateData
autoMLBuildSpec.input_spec.response_column = "CAPSULE";
autoMLBuildSpec.build_control.loss = "AUTO"
autoMLBuildSpec.build_control.stopping_criteria.set_max_runtime_secs(5)
import java.util.Date;
val aml = AutoML.makeAutoML(Key.make(), new Date(), autoMLBuildSpec)
AutoML.startAutoML(aml)
// Note: In some cases the above call is non-blocking.
// The following alternative blocks the next command until the AutoML run has finished:
AutoML.startAutoML(autoMLBuildSpec).get()  // This is a forced blocking call
aml.leader
aml.leaderboard

If you want to see the full code execution, visit here.

That's it, enjoy!!