Serializing a Spark ML Pipeline and Scoring with MLeap
This tutorial shows you how to use MLeap and Bundle.ML components to export a trained Spark ML Pipeline, then use MLeap to transform new data without any dependency on Spark.
We will construct an ML Pipeline composed of a Vector Assembler, a Binarizer, PCA, and a Random Forest Model for handwritten-digit classification on the MNIST dataset.
The code for this tutorial is split into two parts to demonstrate MLeap's independence from Spark:
- Spark ML Pipeline Code: Vanilla/out-of-the-box Spark code to train the ML Pipeline, which we serialize to Bundle.ML
- MLeap Code: Load the serialized Bundle into MLeap and transform LeapFrames
Before we begin, let's cover some terms:
- Estimator: A learning algorithm that trains/fits against a data frame and produces a Model (see the short sketch after this list)
- Model: In Spark, a model is the code and metadata needed to score against an already-trained algorithm
- Transformer: Anything that transforms a data frame; it does not have to be trained by an estimator (e.g. a Binarizer)
- LeapFrame: A data frame structure used for storing your data and the associated schema
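To make the estimator/model distinction concrete, here is a minimal sketch (the toy data frame and its column names are just for illustration, and a SparkSession named spark is assumed):

// StringIndexer is an Estimator: fit() learns the label-to-index mapping
// from the data, and the result is a StringIndexerModel.
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

val toyDf = spark.createDataFrame(Seq((0, "cat"), (1, "dog"), (2, "cat"))).toDF("id", "animal")

val indexerModel: StringIndexerModel = new StringIndexer().
  setInputCol("animal").
  setOutputCol("animal_index").
  fit(toyDf)

// A Model is a Transformer, so it can score any data frame with the right schema.
val indexed = indexerModel.transform(toyDf)

By contrast, a Binarizer transforms a data frame directly and never needs to be fit.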
// Note that we are taking advantage of the com.databricks:spark-csv package to load the data
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, IndexToString, Binarizer}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.PCA

val datasetPath = "./mleap-demo/data/mnist/mnist_train.csv"
val dataset = spark.sqlContext.read.format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load(datasetPath)

val testDatasetPath = "./mleap-demo/data/mnist/mnist_test.csv"
val test = spark.sqlContext.read.format("com.databricks.spark.csv").
  option("header", "true").
  option("inferSchema", "true").
  load(testDatasetPath)
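Optionally, sanity-check what was loaded: each MNIST CSV should contain a label column plus 784 pixel columns named x0 through x783.

// Optional check on the loaded schema and row counts.
dataset.printSchema()
println(s"train rows: ${dataset.count()}, test rows: ${test.count()}")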
// Define the dependent and independent features
val predictionCol = "label"
val labels = Seq("0","1","2","3","4","5","6","7","8","9")
val pixelFeatures = (0 until 784).map(x => s"x$x").toArray

val vectorAssembler = new VectorAssembler().
  setInputCols(pixelFeatures).
  setOutputCol("features")

val stringIndexer = new StringIndexer().
  setInputCol(predictionCol).
  setOutputCol("label_index").
  fit(dataset)

val binarizer = new Binarizer().
  setInputCol(vectorAssembler.getOutputCol).
  setThreshold(127.5).
  setOutputCol("binarized_features")

val pca = new PCA().
  setInputCol(binarizer.getOutputCol).
  setOutputCol("pcaFeatures").
  setK(10)

val featurePipeline = new Pipeline().setStages(Array(vectorAssembler, stringIndexer, binarizer, pca))

// Transform the raw data with the feature pipeline and persist it
val featureModel = featurePipeline.fit(dataset)
val datasetWithFeatures = featureModel.transform(dataset)

// Select only the columns needed for training and persist them
val datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol)
val datasetPcaFeaturesOnlyPersisted = datasetPcaFeaturesOnly.persist()
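A quick look at the persisted training data confirms it holds exactly the two columns selected above (label_index and pcaFeatures):

// Optional: inspect a few rows of the persisted training data.
datasetPcaFeaturesOnlyPersisted.show(5, truncate = false)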
We could make the random forest model part of the same pipeline; however, an existing bug (SPARK-16845) prevents us from doing that.
// You can optionally experiment with CrossValidator and MulticlassClassificationEvaluator to determine optimal
// settings for the random forest
val rf = new RandomForestClassifier().
setFeaturesCol(pca.getOutputCol).
setLabelCol(stringIndexer.getOutputCol).
setPredictionCol("prediction").
setProbabilityCol("probability").
setRawPredictionCol("raw_prediction")
val rfModel = rf.fit(datasetPcaFeaturesOnlyPersisted)
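To get a rough read on the trained forest, you can reuse the MulticlassClassificationEvaluator imported earlier. A minimal sketch, assuming Spark 2.0+ (where the "accuracy" metric is available); note this measures training-set accuracy, which is optimistic, so for an honest estimate run the held-out test set through the same feature pipeline first:

// Rough sanity check: accuracy on the (already persisted) training data.
val evaluator = new MulticlassClassificationEvaluator().
  setLabelCol(stringIndexer.getOutputCol).
  setPredictionCol("prediction").
  setMetricName("accuracy")
val trainAccuracy = evaluator.evaluate(rfModel.transform(datasetPcaFeaturesOnlyPersisted))
println(s"Training accuracy: $trainAccuracy")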
import org.apache.spark.ml.mleap.SparkUtil

// Combine the fitted feature pipeline and the trained random forest into one PipelineModel
val pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._

val modelFile = BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip")
pipeline.write.
  overwrite.
  name("simple-pipeline").
  save(modelFile)
modelFile.close()
The goal of this step is to show how to deserialize a bundle
and use it to score LeapFrames without any Spark dependencies.
import ml.combust.bundle.BundleFile
import ml.combust.mleap.MleapSupport._

// Load the Spark pipeline we saved in the previous section
val bundle = BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip").load().get
Load the sample LeapFrame from the mleap-demo git repo (data/mnist.json)
import ml.combust.mleap.runtime.serialization.FrameReader

val s = scala.io.Source.fromFile("./mleap-demo/data/mnist.json").mkString
val bytes = s.getBytes("UTF-8")
val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes).get
// transform the dataframe using our pipeline
val mleapPipeline = bundle.root
val frame2 = mleapPipeline.transform(frame).get
val data = frame2.dataset
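To read the predictions back out of the scored LeapFrame, look up the prediction column's index in the schema and pull it from each row. A minimal sketch, assuming the runtime's StructType exposes an indexOf lookup:

// Print the predicted digit (a double) for each scored row.
val predictionIndex = frame2.schema.indexOf("prediction").get
data.foreach(row => println(row.getDouble(predictionIndex)))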
What next? You can find more examples and notebooks in the mleap-demo repository.