Training Your First Classifier with Spark and Scala by Jeremy Miller

Are you looking to start your machine learning journey?

When working with big data, you often have to use Apache Spark. Since Spark is written in Scala, you will notice much better performance by using it from Scala. Want some guidance?

Check out this great article written by data scientist Jeremy Miller to help you out.

 

Many people begin their machine learning journey using Python and Sklearn. If you want to work with big data, you have to use Apache Spark. It is possible to work with Spark in Python using PySpark, but since Spark is written in Scala, you will see much better performance by using Scala. There are numerous tutorials out there about how to get up and running with Spark on your computer, so I won't go into that; I will only suggest that two quick ways to get started are to use a Docker image or the Community Edition of Databricks. Let's get started!

I prefer to use the spark-shell and start it with the color option enabled:

spark-shell --conf spark.driver.extraJavaOptions="-Dscala.color"

These imports will help with file navigation while inside the spark-shell:

import sys.process._
import scala.language.postfixOps

"ls" !  // this is how you run the command ls while within the spark-shell

Next come all of our imports. It may look like there is some repetition; this is because some of the functionality in the Spark RDD API has not yet been ported over to the newer Spark DataFrame API:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator}
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.Pipeline
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)

If you are not using the spark-shell, you may need the following additional imports (the spark-shell automatically creates a SparkContext as "sc" and a SparkSession as "spark"):

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()

Now we can load in the data. I'm using the Harvard EdX dataset as an example. NB: I'm not going to do much feature engineering because I want to focus on the mechanics of training the model in Spark. In the end I will have a classifier that predicts whether a student passes a course based on data accumulated throughout the entire course, so it had better have a good score! Creating a genuinely useful model would require featurizing the data so that it is trained only on what you would know at the time you wanted to make the prediction. With that clarified, here we go:

val data = spark.read.option("header","true").
    option("inferSchema","true").
    format("csv").
    load("path_to_data/mooc.csv")

A Spark model needs exactly two columns: “label” and “features”. To get there will take a few steps. First we will identify our label using the select method while also keeping only relevant columns (see the caveat above about feature engineering):

val df = (data.select(data("certified").as("label"),
          $"registered", $"viewed", $"explored",
          $"final_cc_cname_DI", $"gender", $"nevents",
          $"ndays_act", $"nplay_video", $"nchapters", $"nforum_posts"))

Putting the entire method call in a set of parentheses allows you to break up the lines arbitrarily without Spark freaking out.

Next we will do some one-hot encoding on our categorical features. This takes a few steps. First we have to use the StringIndexer to convert the strings to integers. Then we have to use the OneHotEncoderEstimator to do the encoding.

// string indexing
val indexer1 = new StringIndexer().
    setInputCol("final_cc_cname_DI").
    setOutputCol("countryIndex").
    setHandleInvalid("keep")
val indexed1 = indexer1.fit(df).transform(df)

val indexer2 = new StringIndexer().
    setInputCol("gender").
    setOutputCol("genderIndex").
    setHandleInvalid("keep")
val indexed2 = indexer2.fit(indexed1).transform(indexed1)

// one hot encoding
val encoder = new OneHotEncoderEstimator().
    setInputCols(Array("countryIndex", "genderIndex")).
    setOutputCols(Array("countryVec", "genderVec"))
val encoded = encoder.fit(indexed2).transform(indexed2)

Next we check for null values. In this dataset I was able to find the number of null values through some relatively simple code, though depending on the data it may be more complicated:

val nanEvents = encoded.groupBy("nevents").count().orderBy($"count".desc)
// collect the counts to the driver so they print locally and in order
nanEvents.collect().foreach(println)
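
If you would rather check every column at once, a quick generic sketch (not specific to this dataset) counts the nulls in each column:

import org.apache.spark.sql.functions.{col, count, when}

// count(when(cond, value)) only counts rows where the condition holds,
// so this yields one null count per column
val nullCounts = encoded.select(encoded.columns.map(c =>
  count(when(col(c).isNull, c)).as(c)): _*)
nullCounts.show()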

After checking the columns, I decided to impute the null values of the following columns using the median value of that column: nevents, ndays_act, nplay_video, nchapters. I did this like so:

// define medians
val neventsMedianArray = encoded.stat.approxQuantile("nevents", Array(0.5), 0)
val neventsMedian = neventsMedianArray(0)

val ndays_actMedianArray = encoded.stat.approxQuantile("ndays_act", Array(0.5), 0)
val ndays_actMedian = ndays_actMedianArray(0)

val nplay_videoMedianArray = encoded.stat.approxQuantile("nplay_video", Array(0.5), 0)
val nplay_videoMedian = nplay_videoMedianArray(0)

val nchaptersMedianArray = encoded.stat.approxQuantile("nchapters", Array(0.5), 0)
val nchaptersMedian = nchaptersMedianArray(0)

// replace nulls with medians
val filled = encoded.na.fill(Map(
  "nevents" -> neventsMedian,
  "ndays_act" -> ndays_actMedian,
  "nplay_video" -> nplay_videoMedian,
  "nchapters" -> nchaptersMedian))

Then we use the VectorAssembler object to construct our “features” column. Remember, Spark models need exactly two columns: “label” and “features”.

// Set the input columns as the features we want to use
val assembler = (new VectorAssembler().setInputCols(Array(
  "viewed", "explored", "nevents", "ndays_act", "nplay_video",
  "nchapters", "nforum_posts", "countryVec", "genderVec")).
  setOutputCol("features"))

// Transform the DataFrame
val output = assembler.transform(filled).select($"label", $"features")

Now we split the data into training and test sets.

// Split the data by creating an array of the training and test data
val Array(training, test) = output.select("label", "features").
                            randomSplit(Array(0.7, 0.3), seed = 12345)

The data is set up! Now we can create a model object (I'm using a Random Forest Classifier), define a parameter grid (I kept it simple and only varied the number of trees), create a Cross Validator object (this is where we set the scoring metric used to train the model), and fit the model.

WARNING: This code will take some time to run! If you have a particularly old / underpowered computer, beware.

// create the model
val rf = new RandomForestClassifier()

// create the param grid
val paramGrid = new ParamGridBuilder().
  addGrid(rf.numTrees, Array(20, 50, 100)).
  build()

// create cross val object, define scoring metric
val cv = new CrossValidator().
  setEstimator(rf).
  setEvaluator(new MulticlassClassificationEvaluator().setMetricName("weightedRecall")).
  setEstimatorParamMaps(paramGrid).
  setNumFolds(3).
  setParallelism(2)

// You can then treat this object as the model and use fit on it
val model = cv.fit(training)

Now we have a trained, cross validated model! You can explore the attributes and methods of the model by typing “model.” and then pressing tab on your keyboard (note the period after the word model). I encourage you to spend some time here to get a sense of what this model object is and what it can do.
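
For example, here is a minimal sketch of a few things worth inspecting on the fitted cross validator (assuming the Random Forest estimator defined above):

import org.apache.spark.ml.classification.RandomForestClassificationModel

// average evaluator metric (weighted recall here) for each entry in the parameter grid
model.getEstimatorParamMaps.zip(model.avgMetrics).foreach(println)

// the single best model found during cross validation
val bestRf = model.bestModel.asInstanceOf[RandomForestClassificationModel]
println(s"Trees in the best model: ${bestRf.trees.length}")
println(s"Feature importances: ${bestRf.featureImportances}")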

It's time for some model evaluation. This is a little more difficult because the evaluation functionality still mostly resides in Spark's RDD API, which requires some different syntax. Let's begin by getting predictions on our test data and storing them.

val results = model.transform(test).select("features", "label", "prediction")

We will then convert these results to an RDD.

val predictionAndLabels = results.
    select($"prediction", $"label").
    as[(Double, Double)].
    rdd

Then we can create our metrics objects and print out the confusion matrix.

// Instantiate new metrics objects
val bMetrics = new BinaryClassificationMetrics(predictionAndLabels)
val mMetrics = new MulticlassMetrics(predictionAndLabels)
val labels = mMetrics.labels

// Print out the confusion matrix
println("Confusion matrix:")
println(mMetrics.confusionMatrix)

Now we have some results! You can use the numbers in the confusion matrix to calculate your various metrics. Spark will do this for us and print them out, but the syntax is bulky:

// Precision by label
labels.foreach { l =>
  println(s"Precision($l) = " + mMetrics.precision(l))
}

// Recall by label
labels.foreach { l =>
  println(s"Recall($l) = " + mMetrics.recall(l))
}

// False positive rate by label
labels.foreach { l =>
  println(s"FPR($l) = " + mMetrics.falsePositiveRate(l))
}

// F-measure by label
labels.foreach { l =>
  println(s"F1-Score($l) = " + mMetrics.fMeasure(l))
}
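
As a sanity check, you can also compute a metric by hand from the confusion matrix. Here is a minimal sketch, assuming binary labels 0.0 and 1.0 with actual classes in the rows and predicted classes in the columns:

// pull the raw counts out of the confusion matrix
val cm = mMetrics.confusionMatrix
val tp = cm(1, 1)  // actual 1, predicted 1
val fp = cm(0, 1)  // actual 0, predicted 1
val fn = cm(1, 0)  // actual 1, predicted 0

println(s"Precision(1.0) by hand = ${tp / (tp + fp)}")
println(s"Recall(1.0) by hand    = ${tp / (tp + fn)}")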

We can also calculate more sophisticated metrics such as AUC and AUPRC:

// Precision by threshold
val precision = bMetrics.precisionByThreshold
precision.foreach { case (t, p) =>
  println(s"Threshold: $t, Precision: $p")
}

// Recall by threshold
val recall = bMetrics.recallByThreshold
recall.foreach { case (t, r) =>
  println(s"Threshold: $t, Recall: $r")
}

// Precision-Recall Curve
val PRC = bMetrics.pr

// F-measure by threshold
val f1Score = bMetrics.fMeasureByThreshold
f1Score.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 1")
}

val beta = 0.5
val fScore = bMetrics.fMeasureByThreshold(beta)
fScore.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 0.5")
}

// AUPRC
val auPRC = bMetrics.areaUnderPR
println("Area under precision-recall curve = " + auPRC)

// Compute thresholds used in ROC and PR curves
val thresholds = precision.map(_._1)

// ROC Curve
val roc = bMetrics.roc

// AUROC
val auROC = bMetrics.areaUnderROC
println("Area under ROC = " + auROC)
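
One caveat: because predictionAndLabels contains hard 0/1 predictions rather than scores, the threshold-based curves above are quite coarse. If you would rather compute them from the predicted probability of the positive class, a minimal sketch (assuming the model's default "probability" output column) looks like this:

import org.apache.spark.ml.linalg.Vector

// use the probability of class 1.0 as the score, and cast the label to a double
val scoreAndLabels = model.transform(test).
  select($"probability", $"label".cast("double").as("label")).
  rdd.
  map(row => (row.getAs[Vector]("probability")(1), row.getAs[Double]("label")))

val probMetrics = new BinaryClassificationMetrics(scoreAndLabels)
println("Area under ROC (from probabilities) = " + probMetrics.areaUnderROC)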

Et voilà! We have trained and evaluated our classifier. I hope you see that using Apache Spark for machine learning is only a bit more complicated than using libraries such as Sklearn or H2O. This extra effort pays off by allowing you to work with big data. I encourage you to play around with the different models available in the Spark ML library. Thanks for reading.
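
As a parting example, here is a minimal sketch of swapping a logistic regression into the same cross-validation setup (the regParam values are just placeholders to experiment with):

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

// vary the regularization strength instead of the number of trees
val lrParamGrid = new ParamGridBuilder().
  addGrid(lr.regParam, Array(0.01, 0.1)).
  build()

// the evaluator, number of folds, and training data stay exactly the same
val lrCv = new CrossValidator().
  setEstimator(lr).
  setEvaluator(new MulticlassClassificationEvaluator().setMetricName("weightedRecall")).
  setEstimatorParamMaps(lrParamGrid).
  setNumFolds(3)

val lrModel = lrCv.fit(training)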

 

This article was written by Jeremy Miller and originally posted on towardsdatascience.com