RSparkling > The best of R + H2O + Spark

What do you get from R + H2O + Spark?

R is great for statistical computing, graphics, and small-scale data preparation. H2O is a distributed machine learning platform designed for scale and speed, and Spark handles very fast data processing at very large scale. Combining all three gives you data science, machine learning, and data processing in one stack.

rsparkling: The rsparkling R package is an extension package for sparklyr that creates an R front-end for the Sparkling Water Spark package from H2O. This provides an interface to H2O’s high-performance, distributed machine learning algorithms on Spark, using R.

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. In Spark 2.2.0, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, aggregation etc. (similar to R data frames, dplyr) but on large datasets. SparkR also supports distributed machine learning using MLlib.

H2O is an in-memory platform for distributed, scalable machine learning. H2O uses familiar interfaces like R, Python, Scala, Java, JSON and the Flow notebook/web interface, and works seamlessly with big data technologies like Hadoop and Spark.

Apache Spark is a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.

Sparkling Water integrates H2O’s fast, scalable machine learning engine with Spark. With Sparkling Water you can publish Spark data structures (RDDs, DataFrames, Datasets) as H2O frames and vice versa, use a DSL that feeds Spark data structures into H2O’s algorithms, build ML applications that combine the Spark and H2O APIs, and use Sparkling Water directly from PySpark through its Python interface.

Installation Packages:

Quick Start Script:

library(sparklyr)
options(rsparkling.sparklingwater.version = "2.1.14")
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
# Load rsparkling after setting the options above
library(rsparkling)
sc <- spark_connect(master = "local", version = "2.1.0")
h2o_context(sc, strict_version_check = FALSE)

Important Settings for your environment:

  • master = "local" > To start a local Spark cluster
  • master = "yarn-client" > To start a cluster managed by YARN
  • To get a list of supported Sparkling Water versions: h2o_release_table()
  • When you call spark_connect() you will see new tabs appear
    • The “Spark” tab is used to launch “SparkUI”
    • The “Log” tab is used to collect Spark logs
  • If there is any issue between sparklyr and your Spark version, pass the exact version as above; otherwise you don’t need to pass version.
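
The version pairing in the last bullet can be checked before connecting. The sketch below is in Python purely for illustration. It assumes the convention followed by the Sparkling Water 2.x releases used in this post, where the first two version components match the Spark line (2.1.14 pairs with Spark 2.1.x). The helper names are hypothetical and are not part of rsparkling or sparklyr.

```python
def major_minor(version):
    """Return the first two components of a dotted version string as ints."""
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def is_compatible(sparkling_water_version, spark_version):
    """True when both versions share the same major.minor line (assumed convention)."""
    return major_minor(sparkling_water_version) == major_minor(spark_version)


print(is_compatible("2.1.14", "2.1.0"))  # True
print(is_compatible("2.1.14", "2.2.0"))  # False
```

If the check fails, pick a matching release from h2o_release_table() rather than relying on strict_version_check = FALSE.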

Startup Script with config parameters to set executor settings:

These are the settings you will use to get your rsparkling/Spark session up and running in RStudio:

options(rsparkling.sparklingwater.version = "2.1.14")
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
config <- spark_config()
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
# Requests 3 executors, i.e. a 3-node H2O cluster (applies on a cluster manager such as YARN)
config$spark.executor.instances <- 3
sc <- spark_connect(master = "local", config = config, version = "2.1.0")
h2o_context(sc, strict_version_check = FALSE)
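
As a rough sizing check for the executor settings above, it helps to multiply them out before asking the cluster for resources. This is a minimal Python sketch with hypothetical helper names. It only handles the "G" and "M" suffixes and ignores any per-executor memory overhead the cluster manager adds on top.

```python
def parse_memory_gb(value):
    """Convert a Spark-style memory string such as "4G" or "512M" to gigabytes."""
    units = {"G": 1.0, "M": 1.0 / 1024}
    number, unit = value[:-1], value[-1].upper()
    return float(number) * units[unit]


def cluster_footprint(instances, cores, memory):
    """Total cores and executor memory (GB) requested across all executors."""
    return instances * cores, instances * parse_memory_gb(memory)


total_cores, total_memory_gb = cluster_footprint(instances=3, cores=4, memory="4G")
print(total_cores, total_memory_gb)  # 12 12.0
```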

Accessing SparkUI:

You can access the Spark UI by clicking the SparkUI button in the Spark tab.

Accessing H2O FLOW UI:

To open the H2O Flow UI in your browser, you just need to run the following command:

h2o_flow(sc, strict_version_check = FALSE)

Building H2O GLM model using rsparkling + sparklyr + H2O:

In this example we ingest the well-known mtcars (“CARS & MPG”) dataset and build a GLM (Generalized Linear Model) to predict miles per gallon from the car’s weight and number of cylinders:

options(rsparkling.sparklingwater.location = "/tmp/sparkling-water-assembly_2.11-2.1.7-all.jar")
sc <- spark_connect(master = "local", version = "2.1.0")
# Copy the built-in mtcars data frame into Spark, replacing any earlier copy
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
# Convert the Spark DataFrame into an H2O frame
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)
# Fit a GLM that predicts mpg from weight (wt) and cylinder count (cyl)
mtcars_glm <- h2o.glm(x = c("wt", "cyl"), y = "mpg", training_frame = mtcars_h2o, lambda_search = TRUE)

That’s all, enjoy!!

Launching H2O cluster on different port in pysparkling

In this example we will launch an H2O machine learning cluster using the pysparkling package. You can visit my GitHub and this article to learn more about the code execution explained here.

First you need to install pysparkling in a Python 2.7 setup as below:

> pip install -U h2o_pysparkling_2.1

Now we can launch the pysparkling shell as below:

~/tools/sw2/sparkling-water-2.1.14 $ bin/pysparkling

Python script to launch the H2O cluster in pysparkling:

## Importing Libraries
from pysparkling import *
import h2o

## Setting H2O Conf Object
h2oConf = H2OConf(sc)

## Setting H2O Conf for different port
h2oConf.set_client_port_base(54300)
h2oConf.set_node_base_port(54300)

## Get H2O Conf Object to see the configuration
h2oConf

## Launching H2O Cluster
hc = H2OContext.getOrCreate(spark, h2oConf)

## Getting H2O Cluster status
h2o.cluster_status()

Now if you check the Sparkling Water configuration, you will see that H2O is set to run on the given IP and port 54300 as configured:

Sparkling Water configuration:
  backend cluster mode : internal
  workers              : None
  cloudName            : Not set yet, it will be set automatically before starting H2OContext.
  flatfile             : true
  clientBasePort       : 54300
  nodeBasePort         : 54300
  cloudTimeout         : 60000
  h2oNodeLog           : INFO
  h2oClientLog         : WARN
  nthreads             : -1
  drddMulFactor        : 10
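
If you want to check the ports from a script instead of reading the dump by eye, the printed text is easy to parse. This is a minimal sketch that assumes the "key : value" layout shown above. The function name is hypothetical and it works on the printed text only, not on the H2OConf object itself.

```python
def parse_conf_dump(text):
    """Turn "key : value" lines from the printed configuration into a dict."""
    settings = {}
    for line in text.splitlines():
        key, sep, value = line.partition(":")
        # Skip the title line and anything without both a key and a value
        if sep and key.strip() and value.strip():
            settings[key.strip()] = value.strip()
    return settings


dump = """Sparkling Water configuration:
  backend cluster mode : internal
  clientBasePort       : 54300
  nodeBasePort         : 54300
"""

conf = parse_conf_dump(dump)
print(conf["clientBasePort"], conf["nodeBasePort"])  # 54300 54300
```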

That’s it, enjoy!!

Reading nested parquet file in Scala and exporting to CSV

Recently we were working on a problem where a compressed Parquet file had lots of nested tables, some of which had array-type columns, and our objective was to read it and save it as CSV.

We wrote a script in Scala which does the following:

  • Handles nested, compressed Parquet content
  • Looks for columns of type “Array” and removes them

Here is the script:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    // Back-tick each field name so names with special characters stay valid
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      // Nested struct: recurse with the current path as prefix
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
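
To see what the recursion produces without starting Spark, here is a small Python sketch of the same idea. It is an illustration only: the schema is modelled with plain dicts (struct), lists (array) and strings (leaf types), which is an assumption made for this example and not how Spark represents a StructType.

```python
def flatten_schema(schema, prefix=None):
    """Return (column_path, alias) pairs for every non-array leaf field."""
    columns = []
    for name, dtype in schema.items():
        path = "`%s`" % name if prefix is None else "%s.`%s`" % (prefix, name)
        if isinstance(dtype, dict):
            # Nested struct: recurse with the current path as prefix
            columns.extend(flatten_schema(dtype, path))
        elif isinstance(dtype, list):
            # Array column: dropped, as in the Scala script
            continue
        else:
            alias = path.replace(".", "_").replace("`", "_")
            columns.append((path, alias))
    return columns


schema = {
    "id": "long",
    "owner": {"name": "string", "tags": ["string"]},
}
for path, alias in flatten_schema(schema):
    print(path, "->", alias)
```

The array column `tags` is dropped, and the nested `owner.name` gets an alias with the dots and back-ticks replaced by underscores, so it is safe to use as a CSV header.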

Here are all the steps you need to take to read the compressed Parquet content and then export it to disk as CSV.

val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"

    f.dataType match {
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns
      case _: ArrayType => Array.empty[Column]
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}

// Exiting paste mode, now interpreting.

flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]

scala> val df = spark.read.parquet("/user/avkash/test.parquet")

scala> df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")

If you want to see the full working scripts with output, you can visit either of the following links, depending on your Spark version:

  • Here is the full working demo in Spark 2.1.0
  • Here is the full working demo in Spark 1.6.x

We got some help from the StackOverflow discussion here. Michal K and Michal M helped me write the above solution.

That’s it, enjoy!!

Binomial classification example in Scala and GLM with H2O

Here is a sample binomial classification problem using the H2O GLM algorithm on the Credit Card data set, written in Scala.

This sample was created using Spark 2.1.0 with Sparkling Water 2.1.4.

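import java.io.File
import org.apache.spark.h2o._
import org.apache.spark.SparkFiles
// Sparkling Water support helpers (package assumed to be water.support, as in the 2.1.x examples)
import water.support.{H2OFrameSupport, SparkContextSupport, ModelMetricsSupport}
import water.support.SparkContextSupport.addFiles
import water.Key
import _root_.hex.glm.GLMModel
import _root_.hex.ModelMetricsBinomial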

val hc = H2OContext.getOrCreate(sc)
import hc._
import hc.implicits._

addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_clients.csv")
val creditCardData = new H2OFrame(new File(SparkFiles.get("credit_card_clients.csv")))

val ratios = Array[Double](0.8)
val keys = Array[String]("train.hex", "valid.hex")
val frs = H2OFrameSupport.split(creditCardData, keys, ratios)
val (train, valid) = (frs(0), frs(1))

def buildGLMModel(train: Frame, valid: Frame, response: String)
 (implicit h2oContext: H2OContext): GLMModel = {
 import _root_.hex.glm.GLMModel.GLMParameters.Family
 import _root_.hex.glm.GLM
 import _root_.hex.glm.GLMModel.GLMParameters
 val glmParams = new GLMParameters(Family.binomial)
 glmParams._train = train
 glmParams._valid = valid
 glmParams._response_column = response
 glmParams._alpha = Array[Double](0.5)
 val glm = new GLM(glmParams, Key.make("glmModel.hex"))
 // Train the model and block until it is ready
 glm.trainModel().get()
}

val glmModel = buildGLMModel(train, valid, 'default_payment_next_month)(hc)

// Collect model metrics and evaluate model quality
val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, train)
val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](glmModel, valid)

// Prediction
addFiles(sc, "/Users/avkashchauhan/learn/deepwater/credit_card_predict.csv")
val creditPredictData = new H2OFrame(new File(SparkFiles.get("credit_card_predict.csv")))

val predictionFrame = glmModel.score(creditPredictData)
val predictionResults = asRDD[DoubleHolder](predictionFrame)
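
A note on the ratios and keys used for the split in the script above: one ratio (0.8) produces two frames, because the last frame receives whatever is left over. The Python sketch below illustrates that convention on a plain list. It is not H2O's implementation, and the function name is hypothetical.

```python
def split_rows(rows, ratios):
    """Split rows into len(ratios) + 1 consecutive parts; the last part gets the remainder."""
    if sum(ratios) >= 1.0:
        raise ValueError("ratios must sum to less than 1")
    parts, start = [], 0
    for ratio in ratios:
        end = start + int(round(len(rows) * ratio))
        parts.append(rows[start:end])
        start = end
    parts.append(rows[start:])  # remainder, e.g. the validation set
    return parts


train, valid = split_rows(list(range(10)), [0.8])
print(len(train), len(valid))  # 8 2
```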

That’s it, enjoy!!

Tips for building H2O and Deep Water source code

Get source code:

Building source code without tests:

Build the source code without tests (For both H2O-3 and DeepWater source)

$ ./gradlew build -x test

Build the Java developer version of source code  without tests (For both H2O-3 and DeepWater source)

$ ./gradlew build -x test

H2O tests use various small and large files, which you can download separately depending on the disk space available on your working machine. If you decide to download the large data sets, they will take up a good amount of space on your disk.

To download all the large test data files:

$ ./gradlew syncBigdataLaptop

To download all the small test data files:

$ ./gradlew syncSmalldata

Using with IntelliJ:

Pull the source code, import it as a project, and use Gradle as the build system. Once the project is loaded, if you just want to do a test run, select the following:

h2o-app > src > main > java > water > H2OApp


That’s it, enjoy!!

Running python and pysparkling with Zeppelin and YARN on Hadoop

Apache Zeppelin provides cell-based notebooks (similar to Jupyter) for working with various applications, e.g. Spark, Python, Hive, HBase etc., through its interpreters.
With H2O and Sparkling Water you can use Zeppelin on a Hadoop cluster with YARN, and then use Python or pysparkling to submit jobs.
Here are the steps for using pysparkling with YARN on a Hadoop cluster.
1. Get the latest build of Sparkling Water from here
2. Download and unzip the Sparkling Water version compatible with your Spark version onto one of the edge nodes in your Hadoop cluster.
3. Set the following environment variables to the right paths before running services:
export MASTER="yarn-client" # To submit to the YARN cluster
export SPARK_HOME="path_to_the_directory_where_spark_unzipped"
export HADOOP_CONF_DIR="path_to_the_hadoop_installation"
export SPARK_SUBMIT_OPTIONS="--packages ai.h2o:sparkling-water-examples_2.11:2.1.0"
export PYTHONPATH="_path_to_where_python_installed"
# The path to the Sparkling Water egg file below needs to be updated
export SPARKLING_EGG=$(ls -t /sparkling-water-2.1.0/py/build/dist/h2o_pysparkling*.egg | head -1)
Please check that the version values above reflect the following:
  • 2.11 -> refers to the Scala version.
  • 2.1.0 -> refers to the Sparkling Water version, which tracks the Spark version (2.1.x).
4. Set “spark.executor.memory 4g” in Zeppelin, either in the configuration file or in the Zeppelin UI, if Error 143 is seen while starting the Zeppelin server.
Note: To configure it in the Zeppelin UI, go to the dropdown next to the user at the top right corner, select Interpreters, and in the Spark section either edit or add the configuration.
5. Start the Zeppelin server using the command below. This starts Zeppelin in a YARN container.
bin/ -Pspark-2.1
6. In the Zeppelin notebook, create a new note with the line below, pointing it at the egg file. This adds the pysparkling dependency and classes.
sc.addPyFile("_path_to_the egg_file_on_disk/h2o_pysparkling_2.1-2.1.99999-py2.7.egg")
7. Now you can start calling the pysparkling APIs as below:
sc.addPyFile("_path_to_the egg_file_on_disk/h2o_pysparkling_2.1-2.1.99999-py2.7.egg")
from pysparkling import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import h2o
hc = H2OContext.getOrCreate(sc)
8. To use Sparkling Water from Scala, you do not need to add the dependency explicitly in the Zeppelin note. A sample script would look like this:

import org.apache.spark.h2o._
val rdd = sc.parallelize(1 to 1000, 100).map( v => IntHolder(Some(v)))
val h2oContext = H2OContext.getOrCreate(sc)
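
Since the value passed to --packages in step 3 encodes both versions, it can be handy to pull them apart when checking your setup. This is a small Python sketch with a hypothetical helper name. It assumes the usual group:artifact_scalaVersion:version layout of the coordinate shown in step 3.

```python
def parse_coordinate(coordinate):
    """Split "group:artifact_scalaVersion:version" into its parts."""
    group, artifact, version = coordinate.split(":")
    name, _, scala_version = artifact.rpartition("_")
    return {"group": group, "artifact": name, "scala": scala_version, "version": version}


info = parse_coordinate("ai.h2o:sparkling-water-examples_2.11:2.1.0")
print(info["scala"], info["version"])  # 2.11 2.1.0
```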
That’s all, enjoy!!

Using Kryo library with Sparkling Water

Start Sparkling Water from the Spark shell:

$ bin/sparkling-shell

Start Sparkling Water from the Spark shell and add a 3rd party jar:

$ bin/sparkling-shell --jars kryo-4.0.0.jar
Note: Make sure the jar file is accessible from this path

Verify that the jar is added:

scala> sc.listJars
res1: Seq[String] = ArrayBuffer(spark://

Access Spark web interface:

Verify on the Spark UI that the environment is set and the jobs are listed.

Note: There you can look for jars, paths, configuration etc. to verify things.

To list the functions/methods available on an object, type its name at the Scala console, append a . and then press TAB:

scala> sc.[TAB]
Note: The above gives you the list of methods supported by SparkContext (sc)

Getting Spark Configuration:

scala> spark.conf.getAll
 res2: Map[String,String] = Map(spark.serializer -> org.apache.spark.serializer.KryoSerializer, ->, spark.driver.port -> 53817, 
 hive.metastore.warehouse.dir -> /Volumes/OSxexT/tools/sparkling-water-2.0.0/spark-warehouse,
 spark.repl.class.uri -> spark://, 
 spark.jars -> file:/Volumes/OSxexT/tools/sparkling-water-2.0.0/kryo-4.0.0.jar, 
 spark.repl.class.outputDir -> /private/var/folders/x7/331tvwcd6p17jj9zdmhnkpyc0000gn/T/avkashchauhan/spark/work/spark-551682b2-b75b-44c1-b57e-b5741b29f367/repl-1986c79c-aa93-4f3d-a7ad-3c83ad796d67, -> MyApp, 
 spark.driver.memory -> 3G,
 spark.logConf -> true -> driver, 
 spark.kryo.registrationRequired -> true, 
 spark.driver.extraJavaOptions -> " -XX:MaxPermSize=384m", 

Referencing the 3rd party jar, i.e. kryo (which we added with --jars):

scala> import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.Kryo
 scala> val k:Kryo = new Kryo()
 k: com.esotericsoftware.kryo.Kryo = com.esotericsoftware.kryo.Kryo@4626f584
 scala> k.[TAB]

Now using SparkSession object:

scala> import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession
 scala> val spark = SparkSession.builder()
 .config("spark.kryo.registrationRequired", "true")
 .config("spark.logConf", "true")
 .getOrCreate()
 16/10/21 14:56:04 WARN SparkSession$Builder: Use an existing SparkSession, some configuration may not take effect.
 spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2fee69a1

If you want to run a quick test from the source, you can do the following:

$ $SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.h2o.HamOrSpamDemo --jars kryo-4.0.0.jar ./assembly/build/libs/sparkling-water-assembly_2.11-2.0.0-all.jar

Note: Above, a built-in example class is run as an application with Sparkling Water, and a 3rd party jar is passed along with it.
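
One thing to watch with spark-submit is argument order: everything after the application jar is handed to the application itself, so options such as --jars go before it. If you build the command from a script, a small helper keeps the order right. This is a Python sketch with a hypothetical function name. It only assembles the argument list and does not run anything.

```python
import shlex


def spark_submit_command(main_class, app_jar, extra_jars=(), app_args=()):
    """Assemble spark-submit arguments with options placed before the application jar."""
    cmd = ["spark-submit", "--class", main_class]
    if extra_jars:
        # --jars takes a comma-separated list
        cmd += ["--jars", ",".join(extra_jars)]
    cmd.append(app_jar)
    cmd += list(app_args)  # anything after the jar goes to the application
    return cmd


cmd = spark_submit_command(
    "org.apache.spark.examples.h2o.HamOrSpamDemo",
    "./assembly/build/libs/sparkling-water-assembly_2.11-2.0.0-all.jar",
    extra_jars=["kryo-4.0.0.jar"],
)
print(" ".join(shlex.quote(part) for part in cmd))
```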