Scoring H2O MOJO models with Spark UDF and Scala

One of the best things about H2O machine learning is that models can be exported as Java code, so you can use them for scoring on any platform that supports Java. H2O algorithms generate POJO and MOJO models which do not require the H2O runtime to score, which is great for any enterprise. You can learn more about H2O POJO and MOJO models here.

Here is the Spark Scala code which shows how to score an H2O MOJO model by loading it from disk and then passing a RowData object as the row to the H2O EasyPredictModelWrapper class:

import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.easy.prediction
import _root_.hex.genmodel.MojoModel

// Load Mojo
val mojo = MojoModel.load("/Users/avkashchauhan/learn/customers/mojo_bin/gbm_model.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

// Get Mojo Details
val features = mojo.getNames.toBuffer

// Creating the row
val r = new RowData
r.put("AGE", "68")
r.put("RACE", "2")
r.put("DCAPS", "2")
r.put("VOL", "0")
r.put("GLEASON", "6")

// Performing the Prediction
val prediction = easyModel.predictBinomial(r).classProbabilities

Above, the MOJO model is stored on the local file system (as gbm_model.zip in the Scala snippet; the Java example below loads it as gbm_prostate_model.zip) and read from disk at scoring time. The full execution of the above code is available here.
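
Though the post title mentions a Spark UDF, the snippet above scores a single RowData. Here is a minimal sketch of wrapping the same MOJO scoring in a Spark UDF so a whole DataFrame can be scored; it assumes the MOJO file is readable from every executor (for example on shared storage) and that df is a hypothetical DataFrame whose feature columns are strings:

import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import org.apache.spark.sql.functions.{col, udf}

// Load the MOJO lazily, once per executor JVM, instead of once per row
object MojoScorer {
  @transient lazy val model =
    new EasyPredictModelWrapper(MojoModel.load("/Users/avkashchauhan/learn/customers/mojo_bin/gbm_model.zip"))
}

// UDF that builds a RowData from the feature columns and returns P(class = 1)
val scoreMojo = udf { (age: String, race: String, dcaps: String, vol: String, gleason: String) =>
  val row = new RowData
  row.put("AGE", age)
  row.put("RACE", race)
  row.put("DCAPS", dcaps)
  row.put("VOL", vol)
  row.put("GLEASON", gleason)
  MojoScorer.model.predictBinomial(row).classProbabilities(1)
}

val scored = df.withColumn("p1", scoreMojo(col("AGE"), col("RACE"), col("DCAPS"), col("VOL"), col("GLEASON")))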

Following is simple Java code which shows how you could write a standalone Java application to perform scoring with the same H2O MOJO model:

import java.io.*;
import hex.genmodel.easy.RowData;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.prediction.*;
import hex.genmodel.MojoModel;
import java.util.Arrays;

public class Main {
  public static void main(String[] args) throws Exception {
    // Load the MOJO once and reuse it for both model metadata and scoring
    MojoModel mojo = MojoModel.load("gbm_prostate_model.zip");
    EasyPredictModelWrapper model = new EasyPredictModelWrapper(mojo);

    System.out.println("isSupervised : " + mojo.isSupervised());
    System.out.println("Columns Names : " + Arrays.toString(mojo.getNames()));
    System.out.println("Number of columns : " + mojo.getNumCols());
    System.out.println("Response ID : " + mojo.getResponseIdx());
    System.out.println("Response Name : " + mojo.getResponseName());

    for (int i = 0; i < mojo.getNumCols(); i++) {
      String[] domainValues = mojo.getDomainValues(i);
      System.out.println(Arrays.toString(domainValues));
    }

    RowData row = new RowData();
    row.put("AGE", "68");
    row.put("RACE", "2");
    row.put("DCAPS", "2");
    row.put("VOL", "0");
    row.put("GLEASON", "6");

    BinomialModelPrediction p = model.predictBinomial(row);
    System.out.println("Has penetrated the prostatic capsule (1=yes; 0=no): " + p.label);
    System.out.print("Class probabilities: ");
    for (int i = 0; i < p.classProbabilities.length; i++) {
      if (i > 0) {
        System.out.print(",");
      }
      System.out.print(p.classProbabilities[i]);
    }
    System.out.println("");
  }
}
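
To compile and run this class you need the h2o-genmodel library (shipped with H2O as h2o-genmodel.jar) on your classpath, along with the MOJO zip file in the working directory.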

That's it, enjoy!!


Launching H2O cluster on a different port in pysparkling

In this example we will launch an H2O machine learning cluster using the pysparkling package. You can visit my GitHub repository and this article to learn more about the code execution shown here.

First you would need to install pysparkling in your Python 2.7 setup as below:

> pip install -U h2o_pysparkling_2.1

Now we can launch the pysparkling shell. First point SPARK_HOME to your Spark installation:

export SPARK_HOME=/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.6

Launch pysparkling shell:

~/tools/sw2/sparkling-water-2.1.14 $ bin/pysparkling

Python script to launch the H2O cluster in pysparkling:

## Importing Libraries
from pysparkling import *
import h2o

## Setting H2O Conf Object
h2oConf = H2OConf(sc)
h2oConf

## Setting H2O Conf for a different port
h2oConf.set_client_port_base(54300)
h2oConf.set_node_base_port(54300)

## Get H2O Conf Object to see the configuration
h2oConf

## Launching H2O Cluster
hc = H2OContext.getOrCreate(spark, h2oConf)

## Getting H2O Cluster status
h2o.cluster_status()

Now if you verify the Sparkling Water configuration, you will see that H2O is running on the given IP and port 54300, as configured:

Sparkling Water configuration:
  backend cluster mode : internal
  workers              : None
  cloudName            : Not set yet, it will be set automatically before starting H2OContext.
  flatfile             : true
  clientBasePort       : 54300
  nodeBasePort         : 54300
  cloudTimeout         : 60000
  h2oNodeLog           : INFO
  h2oClientLog         : WARN
  nthreads             : -1
  drddMulFactor        : 10

That's it, enjoy!!

Reading nested parquet file in Scala and exporting to CSV

Recently we were working on a problem where a compressed Parquet file had lots of nested tables, some of which had columns of array type, and our objective was to read it and save it to CSV.

We wrote a script in Scala which does the following:

  • Handles nested Parquet compressed content
  • Looks for columns of array type and removes those columns

Here is the script:

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // Back-quote each path segment so column names containing dots still resolve
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array columns entirely
      case _: ArrayType => Array.empty[Column]
      // Leaf column: alias `a`.`b` as a_b
      case _ => Array(col(colPath).alias(colPath.replaceAll("`", "").replaceAll("\\.", "_")))
    }
  })
}
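
To make the behavior concrete, here is a toy illustration (the schema and field names are hypothetical):

import org.apache.spark.sql.types._

// Hypothetical nested schema: a flat id, a nested struct, and an array column
val toySchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StructType(Seq(
    StructField("first", StringType),
    StructField("last", StringType)))),
  StructField("tags", ArrayType(StringType))
))

// Prints columns aliased id, name_first, name_last; the array column tags is skipped
flattenSchema(toySchema).foreach(println)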

Here are all the steps you would need to take while reading the Parquet compressed content and then exporting it to disk as CSV.

val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // Back-quote each path segment so column names containing dots still resolve
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array columns entirely
      case _: ArrayType => Array.empty[Column]
      // Leaf column: alias `a`.`b` as a_b
      case _ => Array(col(colPath).alias(colPath.replaceAll("`", "").replaceAll("\\.", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala>

val df = spark.read.parquet("/user/avkash/test.parquet")

df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")
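
Note that in Spark 2.x the CSV writer is built in, so the external spark-csv package is optional; an equivalent export (writing a directory of part files) would be:

df.select(flattenSchema(df.schema): _*)
  .write
  .option("header", "true")
  .csv("/Users/avkashchauhan/Downloads/saveit")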

If you want to see the full working scripts with output, you can visit either of the following links based on your Spark version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x.

We got some help from the StackOverflow discussion here. Michal K and Michal M helped me to write the above solution.

That's it, enjoy!!

Spark with H2O using rsparkling and sparklyr in R

You must have installed:

  • sparklyr
  • rsparkling


Here is the working script:

> library(sparklyr)
> options(rsparkling.sparklingwater.version = "2.1.6")
> Sys.setenv(SPARK_HOME = "/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.6")
> library(rsparkling)
> spark_disconnect(sc)
> sc <- spark_connect(master = "local", version = "2.1.0")

Testing the Spark context:

sc
$master
[1] "local[8]"

$method
[1] "shell"

$app_name
[1] "sparklyr"

$config
$config$sparklyr.cores.local
[1] 8

$config$spark.sql.shuffle.partitions.local
[1] 8

$config$spark.env.SPARK_LOCAL_IP.local
[1] "127.0.0.1"

$config$sparklyr.csv.embedded
[1] "^1.*"

$config$`sparklyr.shell.driver-class-path`
[1] ""

attr(,"config")
[1] "default"
attr(,"file")
[1] "/Library/Frameworks/R.framework/Versions/3.4/Resources/library/sparklyr/conf/config-template.yml"

$spark_home
[1] "/Volumes/OSxexT/tools/spark-2.1.0-bin-hadoop2.6"

$backend
A connection with
description "->localhost:53374"
class "sockconn"
mode "wb"
text "binary"
opened "opened"
can read "yes"
can write "yes"

$monitor
A connection with
description "->localhost:8880"
class "sockconn"
mode "rb"
text "binary"
opened "opened"
can read "yes"
can write "yes"

$output_file
[1] "/var/folders/x7/331tvwcd6p17jj9zdmhnkpyc0000gn/T//RtmpIIVL8I/file6ba7b454325_spark.log"

$spark_context
<jobj[5]>
class org.apache.spark.SparkContext
org.apache.spark.SparkContext@159ba51c

$java_context
<jobj[6]>
class org.apache.spark.api.java.JavaSparkContext
org.apache.spark.api.java.JavaSparkContext@6b114a2d

$hive_context
<jobj[9]>
class org.apache.spark.sql.SparkSession
org.apache.spark.sql.SparkSession@2cd7fdf8

attr(,"class")
[1] "spark_connection" "spark_shell_connection" "DBIConnection"
> h2o_context(sc, st)
Error in is.H2OFrame(x) : object 'st' not found
> h2o_context(sc, strict_version_check = FALSE)
<jobj[15]>
class org.apache.spark.h2o.H2OContext

Sparkling Water Context:
* H2O name: sparkling-water-avkashchauhan_1672148412
* cluster size: 1
* list of used nodes:
(executorId, host, port)
------------------------
(driver,127.0.0.1,54321)
------------------------

Open H2O Flow in browser: http://127.0.0.1:54321 (CMD + click in Mac OSX)

You can use the following command to launch H2O Flow:

h2o_flow(sc, strict_version_check = FALSE)

That's it, enjoy!!


Binomial classification example in Scala and GBM with H2O

Here is a sample for a binomial classification problem using the H2O GBM algorithm with the credit card clients data set, in Scala.

This sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

import org.apache.spark.h2o._
import water.support.SparkContextSupport.addFiles
import org.apache.spark.SparkFiles
import java.io.File
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.Key
import _root_.hex.tree.gbm.GBMModel
import _root_.hex.ModelMetricsBinomial


val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_clients.csv")
val creditCardData = new H2OFrame(new File(SparkFiles.get("credit_card_clients.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(creditCardData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildGBMModel(train: Frame, valid: Frame, response: String)
 (implicit h2oContext: H2OContext): GBMModel = {
 import _root_.hex.tree.gbm.GBM
 import _root_.hex.tree.gbm.GBMModel.GBMParameters
 val gbmParams = new GBMParameters()
 gbmParams._train = train
 gbmParams._valid = valid
 // Make sure the response column is categorical so GBM builds a binomial classifier
 gbmParams._response_column = response
 gbmParams._ntrees = 50
 val gbm = new GBM(gbmParams, Key.make("gbmModel.hex"))
 gbm.trainModel().get()
}

val gbmModel = buildGBMModel(train, valid, 'default_payment_next_month)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](gbmModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](gbmModel, valid)
println(trainMetrics.rmse)
println(validMetrics.rmse)
println(trainMetrics.mse)
println(validMetrics.mse)
println(trainMetrics.r2)
println(validMetrics.r2)
println(trainMetrics.auc)
println(validMetrics.auc)

// Prediction
addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_predict.csv")
val creditPredictData = new H2OFrame(new File(SparkFiles.get("credit_card_predict.csv")))

val predictionFrame = gbmModel.score(creditPredictData)
val predictionResults = asRDD[DoubleHolder](predictionFrame).collect.map(_.result.getOrElse(Double.NaN))

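As a bridge to the MOJO scoring post above, the trained model can also be exported as a MOJO. A minimal sketch, assuming Sparkling Water's ModelSerializationSupport helper and a hypothetical destination path:

import java.net.URI
import water.support.ModelSerializationSupport

// Export the trained GBM as a MOJO zip that EasyPredictModelWrapper can load
ModelSerializationSupport.exportMOJOModel(gbmModel, new URI("file:///tmp/gbm_model.zip"))
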
That's it, enjoy!!

Binomial classification example in Scala and GLM with H2O

Here is a sample for a binomial classification problem using the H2O GLM algorithm with the credit card clients data set, in Scala.

This sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

import org.apache.spark.h2o._
import water.support.SparkContextSupport.addFiles
import org.apache.spark.SparkFiles
import java.io.File
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.Key
import _root_.hex.glm.GLMModel
import _root_.hex.ModelMetricsBinomial


val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_clients.csv")
val creditCardData = new H2OFrame(new File(SparkFiles.get("credit_card_clients.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(creditCardData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildGLMModel(train: Frame, valid: Frame, response: String)
 (implicit h2oContext: H2OContext): GLMModel = {
 import _root_.hex.glm.GLMModel.GLMParameters.Family
 import _root_.hex.glm.GLM
 import _root_.hex.glm.GLMModel.GLMParameters
 val glmParams = new GLMParameters(Family.binomial)
 glmParams._train = train
 glmParams._valid = valid
 glmParams._response_column = response
 glmParams._alpha = Array[Double](0.5)
 val glm = new GLM(glmParams, Key.make("glmModel.hex"))
 glm.trainModel().get()
}

val glmModel = buildGLMModel(train, valid, 'default_payment_next_month)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, valid)
println(trainMetrics.rmse)
println(validMetrics.rmse)
println(trainMetrics.mse)
println(validMetrics.mse)
println(trainMetrics.r2)
println(validMetrics.r2)
println(trainMetrics.auc)
println(validMetrics.auc)

// Prediction
addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_predict.csv")
val creditPredictData = new H2OFrame(new File(SparkFiles.get("credit_card_predict.csv")))

val predictionFrame = glmModel.score(creditPredictData)
val predictionResults = asRDD[DoubleHolder](predictionFrame).collect.map(_.result.getOrElse(Double.NaN))
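
If you would rather inspect the predictions as a Spark DataFrame (binomial scoring output carries the columns predict, p0 and p1), here is a minimal sketch; it assumes the spark-shell SparkSession spark is in scope to supply the SQLContext:

// Convert the H2O prediction frame into a Spark DataFrame and peek at it
val predictionsDF = hc.asDataFrame(predictionFrame)(spark.sqlContext)
predictionsDF.show(5)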


That's it, enjoy!!

Multinomial classification example in Scala and Deep Learning with H2O

Here is a sample for a multinomial classification problem using the H2O Deep Learning algorithm with the iris data set, in Scala.

This sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

import org.apache.spark.h2o._
import water.support.SparkContextSupport.addFiles
import org.apache.spark.SparkFiles
import java.io.File
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.Key
import _root_.hex.deeplearning.DeepLearningModel
import _root_.hex.ModelMetricsMultinomial


val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/smalldata/iris/iris.csv")
val irisData = new H2OFrame(new File(SparkFiles.get("iris.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(irisData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildDLModel(train: Frame, valid: Frame, response: String,
 epochs: Int = 10, l1: Double = 0.001, l2: Double = 0.0,
 hidden: Array[Int] = Array[Int](200, 200))
 (implicit h2oContext: H2OContext): DeepLearningModel = {
 import h2oContext.implicits._
 // Build a model
 import _root_.hex.deeplearning.DeepLearning
 import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters
 val dlParams = new DeepLearningParameters()
 dlParams._train = train
 dlParams._valid = valid
 dlParams._response_column = response
 dlParams._epochs = epochs
 dlParams._l1 = l1
 dlParams._l2 = l2
 dlParams._hidden = hidden
 // Create a job
 val dl = new DeepLearning(dlParams, Key.make("dlModel.hex"))
 dl.trainModel.get
}


// Note: the response column in the iris data set is named C5, so we pass:
val dlModel = buildDLModel(train, valid, 'C5)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsMultinomial](dlModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsMultinomial](dlModel, valid)
println(trainMetrics.rmse)
println(validMetrics.rmse)
println(trainMetrics.mse)
println(validMetrics.mse)
println(trainMetrics.r2)
println(validMetrics.r2)
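
Beyond the scalar metrics, multinomial metrics also expose a confusion matrix; a small sketch, assuming the cm() accessor on ModelMetricsMultinomial in this H2O release:

// Print actual-vs-predicted class counts for the validation split
println(validMetrics.cm().table().toString)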

That's it, enjoy!!