Scala code to generate RDD and conver to H2O Data Frame

Launch Sparkling Water on Spark:

sparkling-water-1.6.8$ bin/sparkling-shell –num-executors 3 –executor-memory 20g –driver-memory 20g –conf spark.dynamicAllocation.enabled=false –master yarn –conf spark.scheduler.minRegisteredResourcesRatio=1 –conf spark.ext.h2enabled=false –conf

Environment Execution:

 Spark master (MASTER) : yarn
 Spark home (SPARK_HOME) : /opt/cloudera/parcels/CDH/lib/spark/
 H2O build version : (turing)
 Spark build version : 1.6.2
 WARNING: User-defined SPARK_HOME (/opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/spark) overrides detected (/opt/cloudera/parcels/CDH/lib/spark/).
 WARNING: Running spark-class from user-defined location.
 Setting default log level to "WARN".
 To adjust logging level use sc.setLogLevel(newLevel).
 Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 1.6.0
 Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
 Type in expressions to have them evaluated.
 Type :help for more information.
 Spark context available as sc (master = yarn-client, app id = application_1476815700342_0913).
 SQL context available as sqlContext.

Scala Script:

import org.apache.spark.h2o._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.sql.Row
 val hc = H2OContext.getOrCreate(sc)
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val h2oconf = new H2OConf(sc).setInternalClusterMode()
 val h2oContext = H2OContext.getOrCreate(sc, h2oconf)
 import h2oContext.implicits._
 val numCol = 15
 ## Reduce following number to 1000 for a smaller array count, 100000 will take 10+minutes to convert
 val arraydataInt = 1 to 100000 toArray
 val arraydata = => x.toDouble)
 val slideddata = arraydata.sliding(numCol).toSeq
 val rows = arraydata.sliding(numCol).map { x => Row(x: _*) }
 val datasetsize = arraydataInt.size
 val myrdd = sc.makeRDD(rows.toSeq, arraydata.size - numCol).persist()
 val schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10 value11 value12 value13 value14 label"
 val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, DoubleType, true)))
 val df = sqlContext.createDataFrame(myrdd, schema).cache()
 val splitsH = df.randomSplit(Array(0.8, 0.1))
 val trainsetH = splitsH(0).cache()
 val testsetH = splitsH(1).cache()
 println("now entering H2O stage")
 val trainsetH2O: H2OFrame = trainsetH



Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s