RSparkling > The best of R + H2O + Spark

What do you get from R + H2O + Spark?

R is great for statistical computing, graphics, and small-scale data preparation. H2O is a distributed machine learning platform designed for scale and speed. Spark is great for fast data processing at very large scale. Combining all three gives you data science, machine learning, and data processing in one stack.

rsparkling: The rsparkling R package is an extension package for sparklyr that creates an R front-end for the Sparkling Water Spark package from H2O. This provides an interface to H2O’s high performance, distributed machine learning algorithms on Spark, using R.

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. In Spark 2.2.0, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, aggregation etc. (similar to R data frames, dplyr) but on large datasets. SparkR also supports distributed machine learning using MLlib.

H2O is an in-memory platform for distributed, scalable machine learning. H2O uses familiar interfaces like R, Python, Scala, Java, JSON and the Flow notebook/web interface, and works seamlessly with big data technologies like Hadoop and Spark.

Apache Spark is a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.

Sparkling Water integrates H2O’s fast scalable machine learning engine with Spark. With Sparkling Water you can publish Spark data structures (RDDs, DataFrames, Datasets) as H2O frames and vice versa, and use a DSL to feed Spark data structures to H2O’s algorithms. You can create ML applications that use both the Spark and H2O APIs, and a Python interface lets you use Sparkling Water directly from PySpark.

Installation Packages:

Quick Start Script:

Sys.setenv(SPARK_HOME='/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.7')
options(rsparkling.sparklingwater.version = "2.1.14") 
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
library(rsparkling)
library(sparklyr)
sc = spark_connect(master = "local", version = "2.1.0")
sc
h2o_context(sc, strict_version_check = FALSE)
library(h2o)
h2o.clusterInfo()
h2o_flow(sc)
spark_disconnect(sc)

Important Settings for your environment:

  • master = “local” > To start local spark cluster
  • master = “yarn-client” > To start a cluster managed by YARN
  • To get a list of supported Sparkling Water versions: h2o_release_table()
  • When you call spark_connect(), a new “tab” appears
    • Tab “Spark” is used to launch “SparkUI”
    • Tab “Log” is used to collect spark logs
  • If sparklyr has trouble with your Spark version, pass the exact version to spark_connect() as above; otherwise you don't need to pass version.

Startup Script with config parameters to set executor settings:

These are the settings you will use to get your rsparkling/Spark session up and running in RStudio:

Sys.setenv(SPARK_HOME='/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.7')
options(rsparkling.sparklingwater.version = "2.1.14") 
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
library(rsparkling)
library(sparklyr)
config <- spark_config()
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
config$spark.executor.instances <- 3  # this will create 3 node instances
sc <- spark_connect(master = "local", config = config, version = '2.1.0')
sc
h2o_context(sc, strict_version_check = FALSE)
library(h2o)
h2o.clusterInfo()
spark_disconnect(sc)

Accessing SparkUI:

You can open the Spark UI by clicking the SparkUI button on the Spark tab.

Accessing H2O FLOW UI:

To open the H2O Flow UI in your browser, run the following command:

h2o_flow(sc)


Building H2O GLM model using rsparkling + sparklyr + H2O:

In this example we ingest the well-known mtcars (“CARS & MPG”) dataset and build a GLM (Generalized Linear Model) to predict miles per gallon from each car's weight and cylinder count:

options(rsparkling.sparklingwater.location = "/tmp/sparkling-water-assembly_2.11-2.1.7-all.jar")
library(rsparkling)
library(sparklyr)
library(h2o)
sc <- spark_connect(master = "local", version = "2.1.0")
# Copy mtcars into Spark; overwrite = TRUE lets the script be re-run safely
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
# Convert the Spark DataFrame into an H2O frame
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)
# Fit a GLM predicting mpg from weight and cylinder count
mtcars_glm <- h2o.glm(x = c("wt", "cyl"),
                      y = "mpg",
                      training_frame = mtcars_h2o,
                      lambda_search = TRUE)
mtcars_glm
spark_disconnect(sc)

That’s all, enjoy!!

Flatten complex nested parquet files on Hadoop with Herringbone

Herringbone

Herringbone is a suite of tools for working with parquet files on HDFS, and with Impala and Hive. https://github.com/stripe/herringbone

Please visit my github and this specific page for more details.

Installation:

Note: Herringbone needs a Hadoop environment, so you must run it on a Hadoop machine.

Prerequisite: Thrift

  • Thrift 0.9.1 (you MUST use 0.9.1; 0.9.3 and 0.10.0 give errors while packaging)
  • Get thrift 0.9.1 Link

Prerequisite: Impala

  • First set up the Cloudera repo on your machine.
  • Install Impala
    • Install Impala: $ sudo apt-get install impala
    • Install Impala server: $ sudo apt-get install impala-server
    • Install Impala state store: $ sudo apt-get install impala-state-store
    • Install Impala shell: $ sudo apt-get install impala-shell
    • Verify Impala: $ impala-shell
Starting Impala Shell without Kerberos authentication
Connected to mr-0xd7-precise1.0xdata.loc:21000
Server version: impalad version 2.6.0-cdh5.8.4 RELEASE (build 207450616f75adbe082a4c2e1145a2384da83fa6)
Welcome to the Impala shell. Press TAB twice to see a list of available commands.

Copyright (c) 2012 Cloudera, Inc. All rights reserved.

(Shell build version: Impala Shell v1.4.0-cdh4-INTERNAL (08fa346) built on Mon Jul 14 15:52:52 PDT 2014)

Building : Herringbone source

Here is the successful herringbone “mvn package” command log for your review:

[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] Herringbone Impala
[INFO] Herringbone Main
[INFO] Herringbone
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone Impala 0.0.2
[INFO] ------------------------------------------------------------------------
..
..
..
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Herringbone 0.0.1
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Herringbone Impala ................................. SUCCESS [ 2.930 s]
[INFO] Herringbone Main ................................... SUCCESS [ 13.012 s]
[INFO] Herringbone ........................................ SUCCESS [ 0.000 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.079 s
[INFO] Finished at: 2017-10-06T11:27:20-07:00
[INFO] Final Memory: 90M/1963M
[INFO] ------------------------------------------------------------------------

Using Herringbone

Note: Your files must be on Hadoop (HDFS), not on the local file system.

Verify the file on Hadoop:

  • ~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet
  • -rw-r--r-- 3 avkash avkash 1463376 2017-09-13 16:56 /user/avkash/file-test1.parquet
  • ~/herringbone$ bin/herringbone flatten -i /user/avkash/file-test1.parquet
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/avkash/herringbone/herringbone-main/target/herringbone-0.0.1-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/10/06 12:06:44 INFO client.RMProxy: Connecting to ResourceManager at mr-0xd1-precise1.0xdata.loc/172.16.2.211:8032
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
17/10/06 12:06:45 INFO input.FileInputFormat: Total input paths to process : 1
17/10/06 12:06:45 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
1 initial splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
1 merged splits were generated.
  Max: 1.34M
  Min: 1.34M
  Avg: 1.34M
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: number of splits:1
17/10/06 12:06:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1499294366934_0707
17/10/06 12:06:45 INFO impl.YarnClientImpl: Submitted application application_1499294366934_0707
17/10/06 12:06:46 INFO mapreduce.Job: The url to track the job: http://mr-0xd1-precise1.0xdata.loc:8088/proxy/application_1499294366934_0707/
17/10/06 12:06:46 INFO mapreduce.Job: Running job: job_1499294366934_0707
17/10/06 12:06:52 INFO mapreduce.Job: Job job_1499294366934_0707 running in uber mode : false
17/10/06 12:06:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/06 12:07:22 INFO mapreduce.Job:  map 100% reduce 0%

Now verify the file:

~/herringbone$ hadoop fs -ls /user/avkash/file-test1.parquet-flat

Found 2 items
-rw-r--r--   3 avkash avkash          0 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/_SUCCESS
-rw-r--r--   3 avkash avkash    2901311 2017-10-06 12:07 /user/avkash/file-test1.parquet-flat/part-m-00000.parquet

That's it, enjoy!!

Reading nested parquet file in Scala and exporting to CSV

Recently we worked on a problem where a compressed parquet file had many nested tables, some of which had array-type columns. Our objective was to read the file and save it as CSV.

We wrote a script in Scala which does the following:

  • Handles nested, compressed parquet content
  • Looks for columns of type “Array” and removes them

Here is the script:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

Here are all the steps you need to read the compressed parquet content and export it to disk as CSV.

val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip user defined types like array or vectors
      case x if x.isInstanceOf[ArrayType] => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

val df = spark.read.parquet("/user/avkash/test.parquet")

df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")
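
To see which column names flattenSchema produces without starting Spark, the same recursion can be sketched over a toy schema type. This is only an illustration: Field, Struct, Arr and Leaf below are hypothetical stand-ins for Spark's StructField, StructType, ArrayType and the remaining data types, and the function returns (path, alias) pairs instead of Column objects.

```scala
object FlattenSketch {
  // Hypothetical stand-ins for Spark's schema types (not the Spark API).
  sealed trait DataType
  case object Leaf extends DataType // any non-nested, non-array type
  case object Arr extends DataType  // an array-typed column
  final case class Struct(fields: List[Field]) extends DataType
  final case class Field(name: String, dataType: DataType)

  // Returns (backtick-quoted column path, flattened alias) for every kept column.
  def flatten(schema: Struct, prefix: Option[String] = None): List[(String, String)] =
    schema.fields.flatMap { f =>
      val colPath = prefix.fold(s"`${f.name}`")(p => s"$p.`${f.name}`")
      f.dataType match {
        case st: Struct => flatten(st, Some(colPath)) // recurse into nested structs
        case Arr        => Nil                        // drop array columns
        case Leaf       => List(colPath -> colPath.replaceAll("[.`]", "_"))
      }
    }

  def main(args: Array[String]): Unit = {
    val schema = Struct(List(
      Field("id", Leaf),
      Field("tags", Arr),
      Field("address", Struct(List(Field("city", Leaf), Field("zip", Leaf))))
    ))
    flatten(schema).foreach { case (path, alias) => println(s"$path -> $alias") }
  }
}
```

Because the alias replaces backticks as well as dots, a nested column such as address.city ends up with leading, trailing and repeated underscores in its CSV header. If that matters, the alias can be cleaned up further before writing.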

If you want to see the full working scripts with output, you can visit either of the following links, depending on your Spark version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x.

We got some help from the StackOverflow discussion here. Michal K and Michal M helped me write the solution above.

That's it, enjoy!!

Generating ROC curve in SCALA from H2O binary classification models

You can use the following blog to build a binomial classification GLM model:
To collect model metrics for training, use the following:
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, train)
Now you can access the model's AUC (the _auc object) as below.
Note: the _auc object holds an array of thresholds and, for each threshold, the false-positive (fps) and true-positive (tps) counts
(use tab completion to list all members):
scala> trainMetrics._auc.
_auc   _gini      _n       _p     _tps      buildCM   defaultCM    defaultThreshold   forCriterion   frozenType   pr_auc   readExternal   reloadFromBytes   tn             tp      writeExternal   
_fps   _max_idx   _nBins   _ths   asBytes   clone     defaultErr   fn                 fp             maxF1        read     readJSON       threshold         toJsonString   write   writeJSON
In the above AUC object:
_fps  =  false positives
_tps  =  true positives
_ths  =  threshold values
_p    =  actual trues
_n    =  actual falses
Now you can use the individual ROC-specific values below to recreate the ROC curve:
trainMetrics._auc._fps
trainMetrics._auc._tps
trainMetrics._auc._ths
To print a whole array in the terminal for inspection, use the following:
val dd = trainMetrics._auc._fps
println(dd.mkString(" "))
You can access the number of actual trues (_p) and actual falses (_n) as below:
scala> trainMetrics._auc._n
res42: Double = 2979.0

scala> trainMetrics._auc._p
res43: Double = 1711.0
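
To recreate the ROC curve from these values, one option is to divide each entry of _tps by _p and each entry of _fps by _n, which gives one (false positive rate, true positive rate) point per threshold. The following is a minimal sketch in plain Scala. It assumes the arrays have been copied out of trainMetrics._auc and that each entry is the cumulative count at its threshold; the sample numbers and the helper names rocPoints and trapezoidAuc are made up for illustration and are not part of the H2O API.

```scala
object RocSketch {
  // Hypothetical counts standing in for trainMetrics._auc._tps / _fps / _p / _n.
  val tps: Array[Double] = Array(0.0, 2.0, 3.0, 4.0)
  val fps: Array[Double] = Array(0.0, 0.0, 1.0, 4.0)
  val p: Double = 4.0 // actual trues
  val n: Double = 4.0 // actual falses

  // One (FPR, TPR) point per threshold.
  def rocPoints(tps: Array[Double], fps: Array[Double], p: Double, n: Double): Array[(Double, Double)] =
    fps.zip(tps).map { case (fp, tp) => (fp / n, tp / p) }

  // Area under the curve by the trapezoid rule; points are ordered by FPR, then TPR.
  def trapezoidAuc(points: Array[(Double, Double)]): Double = {
    val sorted = points.sortWith((a, b) => a._1 < b._1 || (a._1 == b._1 && a._2 < b._2))
    sorted.zip(sorted.drop(1)).map { case ((x0, y0), (x1, y1)) => (x1 - x0) * (y0 + y1) / 2.0 }.sum
  }

  def main(args: Array[String]): Unit = {
    val points = rocPoints(tps, fps, p, n)
    points.foreach { case (fpr, tpr) => println(s"$fpr\t$tpr") }
    println(s"AUC (trapezoid) = ${trapezoidAuc(points)}")
  }
}
```

The printed (FPR, TPR) pairs can be pasted into any plotting tool. The trapezoid area is only a sanity check against the AUC value that the metrics object already reports.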
That's it, enjoy!!

Starter script for rsparkling (H2O on Spark with R)

The rsparkling R package is an extension package for sparklyr that creates an R front-end for the Sparkling Water Spark package from H2O. This provides an interface to H2O’s high performance, distributed machine learning algorithms on Spark, using R. Visit the GitHub project: https://github.com/h2oai/rsparkling

You must have the following package installed in your R environment:

You must have the latest Sparkling Water package downloaded and unzipped locally:

I am using the following packages in my environment:

  • Spark 2.1
  • Sparkling Water 2.1.8
  • sparklyr 0.4.4
  • rsparkling 0.2.0

Now here is the rsparkling script to create the cluster locally:

options(rsparkling.sparklingwater.location="/tmp/sparkling-water-assembly_2.11-2.1.8-all.jar")
Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client/")
library(sparklyr)
library(rsparkling)
config <- spark_config()
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
sc <- spark_connect(master = "local", config = config, version = '2.1.0')
print(sc)
h2o_context(sc, strict_version_check = FALSE)
h2o_flow(sc, strict_version_check = FALSE)
spark_disconnect(sc)

Now here is the rsparkling script to create a Spark cluster with YARN:

options(rsparkling.sparklingwater.location="/tmp/sparkling-water-assembly_2.11-2.1.8-all.jar")
Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client/")
library(sparklyr)
library(rsparkling)
config <- spark_config()
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
config$spark.executor.instances = 2
sc <- spark_connect(master = "yarn-client", config = config, version = '2.1.0')
print(sc)
h2o_context(sc, strict_version_check = FALSE)
h2o_flow(sc, strict_version_check = FALSE)
spark_disconnect(sc)

That's it, enjoy!!

Binomial classification example in Scala and GBM with H2O

Here is a sample binomial classification problem in Scala using the Credit Card data set. Note that the script below builds its model with H2O's GLM builder (buildGLMModel).

The sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

import org.apache.spark.h2o._
import water.support.SparkContextSupport.addFiles
import org.apache.spark.SparkFiles
import java.io.File
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.Key
import _root_.hex.glm.GLMModel
import _root_.hex.ModelMetricsBinomial


val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_clients.csv")
val creditCardData = new H2OFrame(new File(SparkFiles.get("credit_card_clients.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(creditCardData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildGLMModel(train: Frame, valid: Frame, response: String)
 (implicit h2oContext: H2OContext): GLMModel = {
 import _root_.hex.glm.GLMModel.GLMParameters.Family
 import _root_.hex.glm.GLM
 import _root_.hex.glm.GLMModel.GLMParameters
 val glmParams = new GLMParameters(Family.binomial)
 glmParams._train = train
 glmParams._valid = valid
 glmParams._response_column = response
 glmParams._alpha = Array[Double](0.5)
 val glm = new GLM(glmParams, Key.make("glmModel.hex"))
 glm.trainModel().get()
 //val glmModel = glm.trainModel().get()
}

val glmModel = buildGLMModel(train, valid, 'default_payment_next_month)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, valid)
println(trainMetrics.rmse)
println(validMetrics.rmse)
println(trainMetrics.mse)
println(validMetrics.mse)
println(trainMetrics.r2)
println(validMetrics.r2)
println(trainMetrics.auc)
println(validMetrics.auc)

// Prediction
addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_predict.csv")
val creditPredictData = new H2OFrame(new File(SparkFiles.get("credit_card_predict.csv")))

val predictionFrame = glmModel.score(creditPredictData)
val predictionResults = asRDD[DoubleHolder](predictionFrame).collect.map(_.result.getOrElse(Double.NaN))

That's it, enjoy!!

Binomial classification example in Scala and GLM with H2O

Here is a sample binomial classification problem in Scala, using the H2O GLM algorithm on the Credit Card data set.

The sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

import org.apache.spark.h2o._
import water.support.SparkContextSupport.addFiles
import org.apache.spark.SparkFiles
import java.io.File
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.Key
import _root_.hex.glm.GLMModel
import _root_.hex.ModelMetricsBinomial


val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_clients.csv")
val creditCardData = new H2OFrame(new File(SparkFiles.get("credit_card_clients.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(creditCardData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildGLMModel(train: Frame, valid: Frame, response: String)
 (implicit h2oContext: H2OContext): GLMModel = {
 import _root_.hex.glm.GLMModel.GLMParameters.Family
 import _root_.hex.glm.GLM
 import _root_.hex.glm.GLMModel.GLMParameters
 val glmParams = new GLMParameters(Family.binomial)
 glmParams._train = train
 glmParams._valid = valid
 glmParams._response_column = response
 glmParams._alpha = Array[Double](0.5)
 val glm = new GLM(glmParams, Key.make("glmModel.hex"))
 glm.trainModel().get()
 //val glmModel = glm.trainModel().get()
}

val glmModel = buildGLMModel(train, valid, 'default_payment_next_month)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, valid)
println(trainMetrics.rmse)
println(validMetrics.rmse)
println(trainMetrics.mse)
println(validMetrics.mse)
println(trainMetrics.r2)
println(validMetrics.r2)
println(trainMetrics.auc)
println(validMetrics.auc)

// Prediction
addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_predict.csv")
val creditPredictData = new H2OFrame(new File(SparkFiles.get("credit_card_predict.csv")))

val predictionFrame = glmModel.score(creditPredictData)
val predictionResults = asRDD[DoubleHolder](predictionFrame).collect.map(_.result.getOrElse(Double.NaN))
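
The last line maps each prediction to a plain Double, substituting NaN where a row has no value. A minimal sketch of that Option handling in plain Scala follows; PredictionHolder is a hypothetical stand-in for the DoubleHolder type used above, and the values are made up.

```scala
object PredictionSketch {
  // Hypothetical stand-in for DoubleHolder: the value may be missing.
  final case class PredictionHolder(result: Option[Double])

  // Missing predictions become NaN so the result is a plain Array[Double].
  def toDoubles(rows: Seq[PredictionHolder]): Array[Double] =
    rows.map(_.result.getOrElse(Double.NaN)).toArray

  def main(args: Array[String]): Unit = {
    val rows = Seq(PredictionHolder(Some(1.0)), PredictionHolder(None), PredictionHolder(Some(0.0)))
    val values = toDoubles(rows)
    // NaN never equals itself, so use isNaN to find missing predictions.
    val missing = values.count(_.isNaN)
    val present = values.filterNot(_.isNaN)
    println(s"missing = $missing, present = ${present.mkString(", ")}")
  }
}
```

Because NaN compares unequal to everything, including itself, downstream code should check isNaN rather than test for equality when counting or filtering missing predictions.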

 

That's it, enjoy!!