RSparkling: The best of R + H2O + Spark

What do you get from R + H2O + Spark?

R is great for statistical computing, graphics, and small-scale data preparation; H2O is an amazing distributed machine learning platform designed for scale and speed; and Spark is great for super-fast data processing at mega scale. Combine these three and you get the best of data science, machine learning, and data processing, all in one.

rsparkling: The rsparkling R package is an extension package for sparklyr that creates an R front-end for the Sparkling Water Spark package from H2O. This provides an interface to H2O’s high-performance, distributed machine learning algorithms on Spark, using R.

SparkR is an R package that provides a lightweight front-end to use Apache Spark from R. In Spark 2.2.0, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation (similar to R data frames and dplyr), but on large datasets. SparkR also supports distributed machine learning using MLlib.

H2O is an in-memory platform for distributed, scalable machine learning. H2O uses familiar interfaces like R, Python, Scala, Java, JSON and the Flow notebook/web interface, and works seamlessly with big data technologies like Hadoop and Spark.

Apache Spark is a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.

Sparkling Water integrates H2O’s fast, scalable machine learning engine with Spark. With Sparkling Water you can publish Spark data structures (RDDs, DataFrames, Datasets) as H2O frames and vice versa, use a DSL to feed Spark data structures into H2O’s algorithms, create ML applications that utilize both the Spark and H2O APIs, and use the Python interface to drive Sparkling Water directly from PySpark.
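
For example, moving data between the two engines from R is a one-liner in each direction with rsparkling’s converters (a minimal sketch; it assumes sc is an active spark_connect() connection and mtcars_tbl a sparklyr table, as in the examples below):

# Publish a Spark DataFrame as an H2O frame...
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl)

# ...and convert an H2O frame back into a Spark DataFrame
mtcars_spark <- as_spark_dataframe(sc, mtcars_h2o)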

Installation Packages:
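
The examples below assume the sparklyr, h2o, and rsparkling R packages are installed, e.g. from CRAN (a minimal sketch; pick package versions that match your Spark and Sparkling Water installations):

install.packages("sparklyr")
install.packages("h2o")
install.packages("rsparkling")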

Quick Start Script:

# Point to your local Spark installation
Sys.setenv(SPARK_HOME='/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.7')
# Pin the Sparkling Water version and the location of its assembly jar
options(rsparkling.sparklingwater.version = "2.1.14")
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
library(rsparkling)
library(sparklyr)
# Connect to a local Spark cluster
sc <- spark_connect(master = "local", version = "2.1.0")
sc
# Start the H2O context on top of Spark
h2o_context(sc, strict_version_check = FALSE)
library(h2o)
h2o.clusterInfo()
# Open the H2O Flow UI in the browser
h2o_flow(sc)
spark_disconnect(sc)

Important Settings for your environment:

  • master = “local” > To start a local Spark cluster
  • master = “yarn-client” > To start a cluster managed by YARN (see the sketch after this list)
  • To get a list of supported Sparkling Water versions, call h2o_release_table()
  • When you call spark_connect(), you will see new tabs appear in RStudio
    • The “Spark” tab is used to launch the Spark UI
    • The “Log” tab is used to collect the Spark logs
  • If there is any issue between sparklyr and your Spark version, pass the exact version to spark_connect() as shown above; otherwise you don't need to pass the version.
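
For example, here is how the release lookup and a YARN connection might look (a minimal sketch; the Spark version shown is the one used elsewhere in this post):

# List the Sparkling Water releases known to rsparkling
h2o_release_table()

# Connect to a YARN-managed cluster instead of local mode
sc <- spark_connect(master = "yarn-client", version = "2.1.0")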

Startup Script with config parameters to set executor settings:

These are the settings you can use to get your rsparkling/Spark session up and running in RStudio:

Sys.setenv(SPARK_HOME='/Users/avkashchauhan/tools/spark-2.1.0-bin-hadoop2.7')
options(rsparkling.sparklingwater.version = "2.1.14") 
options(rsparkling.sparklingwater.location = "/Users/avkashchauhan/tools/sw2/sparkling-water-2.1.14/assembly/build/libs/sparkling-water-assembly_2.11-2.1.14-all.jar")
library(rsparkling)
library(sparklyr)
config <- spark_config()
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
config$spark.executor.instances <- 3  # creates 3 executor instances
sc <- spark_connect(master = "local", config = config, version = '2.1.0')
sc
h2o_context(sc, strict_version_check = FALSE)
library(h2o)
h2o.clusterInfo()
spark_disconnect(sc)

Accessing SparkUI:

You can access the Spark UI by clicking the SparkUI button on the Spark tab, as shown below:

[Screenshot: the SparkUI button on the Spark tab in RStudio]
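
If you prefer to open the Spark UI from code instead of the RStudio tab, sparklyr also provides the spark_web() helper:

# Open the Spark web UI for the active connection in a browser
spark_web(sc)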

Accessing H2O FLOW UI:

Just run the following command to open the H2O Flow UI in your browser:

h2o_flow(sc)

[Screenshot: the H2O Flow notebook UI in the browser]

Building H2O GLM model using rsparkling + sparklyr + H2O:

In this example we ingest the famous “CARS & MPG” dataset (mtcars) and build a GLM (Generalized Linear Model) to predict miles-per-gallon from the given specification of car capabilities:

options(rsparkling.sparklingwater.location = "/tmp/sparkling-water-assembly_2.11-2.1.7-all.jar")
library(rsparkling)
library(sparklyr)
library(h2o)
sc <- spark_connect(master = "local", version = "2.1.0")
# Copy mtcars into Spark; overwrite = TRUE replaces the table if it already exists
mtcars_tbl <- copy_to(sc, mtcars, "mtcars", overwrite = TRUE)
# Publish the Spark DataFrame as an H2O frame
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl, strict_version_check = FALSE)
# Build the GLM to predict mpg from weight and number of cylinders
mtcars_glm <- h2o.glm(x = c("wt", "cyl"),
                      y = "mpg",
                      training_frame = mtcars_h2o,
                      lambda_search = TRUE)
mtcars_glm
spark_disconnect(sc)

That’s all, enjoy!!


Calculating AUC and GINI model metrics for logistic classification

For logistic classification problems we use the AUC metric to check model performance. Higher is better; as a rule of thumb, anything above 0.80 is considered good, and over 0.90 means the model is behaving great.

AUC is an abbreviation for Area Under the Curve. It is used in classification analysis to determine which of the models under consideration predicts the classes best. An example of its application is ROC curves, where true positive rates are plotted against false positive rates. You can learn more about AUC in this QUORA discussion.

We will also look at the GINI metric, which you can read about on the wiki. In this example we will learn how the AUC and GINI model metrics are calculated using True Positive Rate (TPR) and False Positive Rate (FPR) values from a given test dataset.
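
The two metrics are directly related: GINI = (2 × AUC) − 1, so, for example, an AUC of 0.85 corresponds to a GINI of 0.70. We will verify this relationship at the end of the walkthrough.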

You can get the full working Jupyter Notebook here from my GitHub.

Let's build a logistic classification model in H2O using the prostate dataset:

Preparation of H2O environment and dataset:

## Importing required libraries
import h2o
import sys
import pandas as pd
from h2o.estimators.gbm import H2OGradientBoostingEstimator

## Starting H2O machine learning cluster
h2o.init()

## Importing dataset
local_url = "https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv"
df = h2o.import_file(local_url)

## Defining features and response column
y = "CAPSULE"
feature_names = df.col_names
feature_names.remove(y)

## Setting the response column to categorical so the model treats this as a classification problem
df[y] = df[y].asfactor()

Now we will split the dataset into 3 sets: training, validation, and test:

## 80% train, 10% validation, remaining ~10% test
df_train, df_valid, df_test = df.split_frame(ratios=[0.8,0.1])
print(df_train.shape)
print(df_valid.shape)
print(df_test.shape)

Setting up the H2O GBM Estimator and building the GBM model:

prostate_gbm = H2OGradientBoostingEstimator(model_id = "prostate_gbm",
 ntrees=500,
 learn_rate=0.001,
 max_depth=10,
 score_each_iteration=True)

## Building H2O GBM Model:
prostate_gbm.train(x = feature_names, y = y, training_frame=df_train, validation_frame=df_valid)

## Understand the H2O GBM Model
prostate_gbm

Generating model performance with training, validation & test datasets:

train_performance = prostate_gbm.model_performance(df_train)
valid_performance = prostate_gbm.model_performance(df_valid)
test_performance = prostate_gbm.model_performance(df_test)

Let’s take a look at the AUC metrics provided by Model performance:

print(train_performance.auc())
print(valid_performance.auc())
print(test_performance.auc())
print(prostate_gbm.auc())

Let’s take a look at the GINI metrics provided by Model performance:

print(train_performance.gini())
print(valid_performance.gini())
print(test_performance.gini())
print(prostate_gbm.gini())

Let's generate predictions on the test dataset:

predictions = prostate_gbm.predict(df_test)
## Extract the predicted probability of class 1 ('p1') from the prediction frame:
predict_probability = predictions['p1']

Now we will import the required scikit-learn and matplotlib libraries to calculate the AUC manually:

from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

Let's get the actual response values from the test data frame:

actual = df_test[y].as_data_frame()
actual_list = actual['CAPSULE'].tolist()
print(actual_list)

Now let's get the predicted probabilities from the prediction frame:

predictions_temp = predict_probability.as_data_frame()
predictions_list = predictions_temp['p1'].tolist()
print(predictions_list)

Calculating False Positive Rate and True Positive Rate:

Let's calculate the TPR, FPR, and threshold metrics from the predictions and the original data frame:
– False Positive Rate (fpr)
– True Positive Rate (tpr)
– Threshold

fpr, tpr, thresholds = roc_curve(actual_list, predictions_list)
roc_auc = auc(fpr, tpr)
print(roc_auc)
print(test_performance.auc())

Note: Above you will see that our calculated AUC value is exactly the same as the one given by the model performance for the test dataset.

Let's plot the ROC curve using matplotlib:

plt.title('ROC (Receiver Operating Characteristic)')
plt.plot(fpr, tpr, 'b', label='AUC = %0.4f' % roc_auc)
plt.legend(loc='lower right')
plt.plot([0,1],[0,1],'r--')
plt.xlim([-0.1,1.2])
plt.ylim([-0.1,1.2])
plt.ylabel('True Positive Rate (TPR)')
plt.xlabel('False Positive Rate (FPR)')
plt.show()

[Screenshot: ROC curve plot, with the AUC value shown in the legend]

This is how the GINI metric is calculated from the AUC:

GINI = (2 * roc_auc) - 1
print(GINI)
print(test_performance.gini())

Note: Above you will see that our calculated GINI value is exactly the same as the one given by the model performance for the test dataset.

That's it, enjoy!!


Exploring & transforming H2O Data Frame in R and Python

Sometimes you may need to ingest a dataset to build models, and then your first task is to explore all the features and their types. Once that is done, you may want to change the feature types to the ones you want.

Here is the code snippet in Python:

df = h2o.import_file('https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate.csv')
df.types
{u'AGE': u'int', u'CAPSULE': u'int', u'DCAPS': u'int',
 u'DPROS': u'int', u'GLEASON': u'int', u'ID': u'int',
 u'PSA': u'real', u'RACE': u'int', u'VOL': u'real'}
If you would like to visualize all the features in graphical format, you can do the following:

import pylab as pl
df.as_data_frame().hist(figsize=(20,20))
pl.show()

The result looks like the below in a Jupyter notebook:

[Screenshot: grid of histograms, one per feature of the prostate dataset]

Note: If you have more than 50 features, you might have to trim your data frame to fewer features to get an effective visualization.

You can also use the following function to convert a list of columns to factor/categorical by passing an H2O data frame and a list of column names:
def convert_columns_as_factor(hdf, column_list):
    list_count = len(column_list)
    if list_count == 0:
        return "Error: You don't have a list of binary columns."
    if len(hdf.columns) == 0:
        return "Error: You don't have any columns in your data frame."
    local_column_list = hdf.columns
    for i in range(list_count):
        try:
            # .index() raises ValueError if the column is missing
            target_index = local_column_list.index(column_list[i])
            hdf[column_list[i]] = hdf[column_list[i]].asfactor()
            print('Column ' + column_list[i] + " is converted into factor/categorical.")
        except ValueError:
            print('Error: ' + str(column_list[i]) + " not found in the data frame.")

The following R script shows similar tasks: building a data frame with factor columns, pushing it into H2O, and setting factor levels:

N=100
set.seed(999)
color = sample(c("D","E","I","F","M"),size=N,replace=TRUE)
num = rnorm(N,mean = 12,sd = 21212)
sex = sample(c("male","female"),size=N,replace=TRUE)
sex = as.factor(sex)
color = as.factor(color)
data = sample(c(0,1),size = N,replace = T)
fdata = factor(data)
table(fdata)
dd = data.frame(color,sex,num,fdata)
# Push the R data frame into H2O
data = as.h2o(dd)
str(data)
# Rename the factor levels of the sex column from "female"/"male" to "F"/"M"
data$sex = h2o.setLevels(x = data$sex, levels = c("F","M"))
data
That's it, enjoy!!

Renaming H2O data frame column name in R

Following is a code snippet showing how you can rename a column of an H2O data frame in R:

> train.hex <- h2o.importFile("https://h2o-public-test-data.s3.amazonaws.com/smalldata/iris/iris_wheader.csv")
  |======================================================| 100%

> train.hex
  sepal_len sepal_wid petal_len petal_wid class
1 5.1 3.5 1.4 0.2 Iris-setosa
2 4.9 3.0 1.4 0.2 Iris-setosa
3 4.7 3.2 1.3 0.2 Iris-setosa
4 4.6 3.1 1.5 0.2 Iris-setosa
5 5.0 3.6 1.4 0.2 Iris-setosa
6 5.4 3.9 1.7 0.4 Iris-setosa
[150 rows x 5 columns] 

> h2o.names(train.hex)
[1] "sepal_len" "sepal_wid" "petal_len" "petal_wid" "class"    

> h2o.colnames(train.hex)
[1] "sepal_len" "sepal_wid" "petal_len" "petal_wid" "class" 

## Now use the column index (starting from 1) to change the column name, as shown below
## Changing "class" to "new_class"

> names(train.hex)[5] = c("new_class")

# Checking Result:
> h2o.colnames(train.hex)
[1] "sepal_len" "sepal_wid" "petal_len" "petal_wid" "new_class"
> h2o.names(train.hex)
[1] "sepal_len" "sepal_wid" "petal_len" "petal_wid" "new_class"  

## Now changing "sepal_len" to "sepal_len_new"
> names(train.hex)[1] = c("sepal_len_new")

> h2o.names(train.hex)
[1] "sepal_len_new" "sepal_wid" "petal_len" "petal_wid" "new_class"    

> h2o.colnames(train.hex)
[1] "sepal_len_new" "sepal_wid" "petal_len" "petal_wid" "new_class"    

> train.hex
  sepal_len_new sepal_wid petal_len petal_wid new_class
1 5.1 3.5 1.4 0.2 Iris-setosa
2 4.9 3.0 1.4 0.2 Iris-setosa
3 4.7 3.2 1.3 0.2 Iris-setosa
4 4.6 3.1 1.5 0.2 Iris-setosa
5 5.0 3.6 1.4 0.2 Iris-setosa
6 5.4 3.9 1.7 0.4 Iris-setosa
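
If you need to rename every column at once, you can also assign a full character vector of names (a minimal sketch, assuming the 5-column iris frame from above; the new names are illustrative):

> names(train.hex) <- c("sl", "sw", "pl", "pw", "species")
> h2o.colnames(train.hex)
[1] "sl" "sw" "pl" "pw" "species"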


That’s it, enjoy!!

Setting stopping criteria into H2O K-means

Sometimes you may be looking for a k-means stopping criterion based on the number of reassigned observations within a cluster.

The H2O K-means implementation has the following 2 stopping criteria:

  1. The outer loop for estimate_k – stops when the relative reduction of the sum-of-within-centroid-sum-of-squares is small enough
  2. Lloyd's iteration – stops when the relative fraction of reassigned points is small enough

In the H2O machine learning library you just need to set estimate_k to True and set max_iterations to a reasonably high number, e.g. 100.
With this combination, the algorithm will look for the best suitable K until it hits the maximum number of iterations. There are no other fine-tuning parameters available.

In R here is what you can do:

h2o.kmeans(x = predictors, k = 100, estimate_k = T, standardize = F,
           max_iterations = 100,  # upper bound on iterations, per the note above
           training_frame = train, validation_frame = valid, seed = 1234)

In Python here is what you can do:

iris_kmeans = H2OKMeansEstimator(k = 100, estimate_k = True, standardize = False,
                                 max_iterations = 100,  # per the note above
                                 seed = 1234)
iris_kmeans.train(x = predictors, training_frame = train, validation_frame = valid)

In Java/Scala, set the corresponding model parameters:

_estimate_k = true
_max_iterations = 100 // or a larger number

That’s it, enjoy!!

Calculating Standard Deviation using custom UDF and group by in H2O

Here is the full code to calculate the standard deviation using H2O's group-by method as well as using a custom UDF:

library(h2o)
h2o.init()
irisPath <- system.file("extdata", "iris_wheader.csv", package = "h2o")
iris.hex <- h2o.uploadFile(path = irisPath, destination_frame = "iris.hex")

# Calculating Standard Deviation using h2o group by
SdValue <- h2o.group_by(data = iris.hex, by = "class", sd("sepal_len"))

# Printing result
SdValue

# Alternative: defining a UDF for standard deviation
mySDUdf <- function(df) { sd(df[,1],na.rm = T) }

# Using h2o ddply with UDF
SdValue <- h2o.ddply(iris.hex, "class", mySDUdf)

# Printing result
SdValue

That's it, enjoy!!

Calculate mean using UDF in H2O

Here is the full code to write a UDF to calculate mean for a given data frame using H2O machine learning platform:


library(h2o)
h2o.init()
ausPath <- system.file("extdata", "australia.csv", package="h2o")
australia.hex <- h2o.uploadFile(path = ausPath)

# Writing the UDF
myMeanUDF = function(Fr) { mean(Fr[, 1]) }

# Applying UDF using ddply
MeanValue = h2o.ddply(australia.hex[, c("premax", "salmax", "Max_czcs")], c("premax", "salmax"), myMeanUDF)

# Printing Results
MeanValue
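
As in the standard deviation example above, the same aggregate can also be computed with h2o.group_by instead of a UDF (a sketch against the same frame):

# Mean of Max_czcs grouped by premax and salmax
MeanValue2 = h2o.group_by(data = australia.hex, by = c("premax", "salmax"), mean("Max_czcs"))
MeanValue2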

That's it, enjoy!!