An estimator is an abstraction of a learning algorithm that fits a model on a dataset.
Note
|
That was a very machine-learning way to explain an estimator, wasn't it? The more time I spend with the Pipeline API, the more often I use the terms and phrases of this space. Sorry. |
Technically, an Estimator produces a Model (i.e. a Transformer) for a given DataFrame and parameters (as a ParamMap). It fits a model to the input DataFrame and ParamMap to produce a Transformer (a Model) that can calculate predictions for any DataFrame-based input dataset.
It is basically a function that maps a DataFrame onto a Model through the fit method, i.e. it takes a DataFrame and produces a Transformer as a Model.
estimator: DataFrame =[fit]=> Model
Estimators are instances of the org.apache.spark.ml.Estimator abstract class that comes with the fit method (with the return type M being a Model):
fit(dataset: DataFrame): M
An Estimator is a PipelineStage (so it can be a part of a Pipeline).
Note
|
A Pipeline treats an Estimator specially and executes its fit method rather than transform (as it does for the other Transformer objects in a pipeline). Consult the Pipeline document.
|
As an example, you could use the LinearRegression estimator to train a LinearRegressionModel.
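For instance, a minimal sketch (assuming a training DataFrame called training with a text column and a Double label column) that uses the LinearRegression estimator as the final stage of a Pipeline could look as follows:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{ HashingTF, RegexTokenizer }
import org.apache.spark.ml.regression.LinearRegression
// Two Transformers followed by an Estimator
val tok = new RegexTokenizer().setInputCol("text").setOutputCol("tokens")
val hashTF = new HashingTF().setInputCol("tokens").setOutputCol("features")
val lr = new LinearRegression()
val pipeline = new Pipeline().setStages(Array(tok, hashTF, lr))
// fit calls transform on the Transformers and fit on the Estimator (lr)
// and returns a PipelineModel (a Transformer)
// val model = pipeline.fit(training)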
Some of the direct specialized implementations of the Estimator abstract class are as follows:
org.apache.spark.ml.feature.StringIndexer is an Estimator that produces a StringIndexerModel.
val df = ('a' to 'j').map(_.toString)
  .zip(0 to 9)
  .map(_.swap)
  .toDF("id", "label")
import org.apache.spark.ml.feature.StringIndexer
val strIdx = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("index")
scala> println(strIdx.explainParams)
handleInvalid: how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later (default: error)
inputCol: input column name (current: label)
outputCol: output column name (default: strIdx_ded89298e014__output, current: index)
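To change how invalid entries are handled at transform time, you could use setHandleInvalid, e.g. (a sketch):
// Skip (filter out) rows whose label was not seen during fit
val strIdxSkip = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("index")
  .setHandleInvalid("skip")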
val model = strIdx.fit(df)
val indexed = model.transform(df)
scala> indexed.show
+---+-----+-----+
| id|label|index|
+---+-----+-----+
| 0| a| 3.0|
| 1| b| 5.0|
| 2| c| 7.0|
| 3| d| 9.0|
| 4| e| 0.0|
| 5| f| 2.0|
| 6| g| 6.0|
| 7| h| 8.0|
| 8| i| 4.0|
| 9| j| 1.0|
+---+-----+-----+
The KMeans class is an implementation of the K-means clustering algorithm in machine learning, with support for k-means|| (aka k-means parallel) in Spark MLlib.
Roughly, k-means is an unsupervised, iterative algorithm that groups input data into a predefined number of k clusters. Each cluster has a centroid, which is the cluster center. The algorithm assigns each vector to the cluster whose centroid (mean) is nearest, recomputes the centroids, and repeats these steps until convergence or until a specified maximum number of iterations is reached.
Note
|
The K-Means implementation uses Lloyd's algorithm. |
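For illustration only, here is a rough, non-distributed sketch of Lloyd's iteration over points represented as Array[Double] (this is not how Spark implements it; Spark's variant is distributed and uses k-means|| initialization by default):
// Illustrative sketch of Lloyd's algorithm (not Spark code)
def lloyd(points: Seq[Array[Double]], k: Int, maxIter: Int): Seq[Array[Double]] = {
  def dist(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
  def mean(ps: Seq[Array[Double]]): Array[Double] =
    ps.transpose.map(_.sum / ps.size).toArray
  var centroids = points.take(k) // naive initialization, just for the sketch
  for (_ <- 1 to maxIter) {
    // assignment step: group every point by its nearest centroid
    val clusters = points.groupBy(p => centroids.minBy(c => dist(p, c)))
    // update step: move every centroid to the mean of its cluster
    centroids = centroids.map(c => clusters.get(c).map(mean).getOrElse(c))
  }
  centroids
}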
It is an Estimator that produces a KMeansModel.
Tip
|
Do import org.apache.spark.ml.clustering.KMeans to work with the KMeans algorithm.
|
KMeans defaults to use the following values:
- Number of clusters or centroids (k): 2
- Maximum number of iterations (maxIter): 20
- Initialization algorithm (initMode): k-means||
- Number of steps for the k-means|| (initSteps): 5
- Convergence tolerance (tol): 1e-4
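Any of these defaults can be changed with the corresponding setters, e.g. (a sketch):
import org.apache.spark.ml.clustering.KMeans
val kmeansCustom = new KMeans()
  .setK(3)        // three clusters instead of the default 2
  .setMaxIter(50)
  .setTol(1e-6)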
import org.apache.spark.ml.clustering._
val kmeans = new KMeans()
scala> println(kmeans.explainParams)
featuresCol: features column name (default: features)
initMode: initialization algorithm (default: k-means||)
initSteps: number of steps for k-means|| (default: 5)
k: number of clusters to create (default: 2)
maxIter: maximum number of iterations (>= 0) (default: 20)
predictionCol: prediction column name (default: prediction)
seed: random seed (default: -1689246527)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-4)
KMeans assumes that featuresCol is of type VectorUDT and appends predictionCol of type IntegerType.
Internally, the fit method "unwraps" the feature vector in the featuresCol column of the input DataFrame and creates an RDD[Vector]. It then hands the call over to the MLlib variant of KMeans in org.apache.spark.mllib.clustering.KMeans. The result is copied to a KMeansModel with a calculated KMeansSummary.
Each item (row) in a data set is described by a numeric vector of attributes called features. In the case of documents, a single feature (a dimension of the vector) represents a word (token) whose value is a metric that defines the importance of that word or term in the document.
Tip
|
Enable INFO logging level for the org.apache.spark.mllib.clustering.KMeans logger to see what happens inside.
Refer to Logging. |
You can represent a text corpus (a collection of documents) using the vector space model. In this representation, the vectors have a dimension for every distinct word in the corpus. It is quite natural for the vectors to have many zero values, as not every word appears in every document. We use sparse vectors, an optimized memory representation that skips zero values.
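For example, a sparse vector stores only the non-zero entries (a sketch with a made-up 10-word vocabulary in which only the words at indices 2 and 5 occur):
import org.apache.spark.mllib.linalg.Vectors
val doc = Vectors.sparse(10, Seq((2, 1.0), (5, 3.0)))
// doc: org.apache.spark.mllib.linalg.Vector = (10,[2,5],[1.0,3.0])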
This example shows how to use k-means to classify emails as spam or not.
// NOTE Don't copy and paste the final case class with the other lines
// It won't work with paste mode in spark-shell
final case class Email(id: Int, text: String)
val emails = Seq(
  "This is an email from your lovely wife. Your mom says...",
  "SPAM SPAM spam",
  "Hello, We'd like to offer you")
  .zipWithIndex.map(_.swap)
  .toDF("id", "text").as[Email]
// Prepare data for k-means
// Pass emails through a "pipeline" of transformers
import org.apache.spark.ml.feature._
val tok = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("tokens")
  .setPattern("\\W+")
val hashTF = new HashingTF()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setNumFeatures(20)
val preprocess = (tok.transform _).andThen(hashTF.transform)
val features = preprocess(emails.toDF)
scala> features.select('text, 'features).show(false)
+--------------------------------------------------------+------------------------------------------------------------+
|text |features |
+--------------------------------------------------------+------------------------------------------------------------+
|This is an email from your lovely wife. Your mom says...|(20,[0,3,6,8,10,11,17,19],[1.0,2.0,1.0,1.0,2.0,1.0,2.0,1.0])|
|SPAM SPAM spam |(20,[13],[3.0]) |
|Hello, We'd like to offer you |(20,[0,2,7,10,11,19],[2.0,1.0,1.0,1.0,1.0,1.0]) |
+--------------------------------------------------------+------------------------------------------------------------+
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans
scala> val kmModel = kmeans.fit(features.toDF)
16/04/08 15:57:37 WARN KMeans: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
16/04/08 15:57:37 INFO KMeans: Initialization with k-means|| took 0.219 seconds.
16/04/08 15:57:37 INFO KMeans: Run 0 finished in 1 iterations
16/04/08 15:57:37 INFO KMeans: Iterations took 0.030 seconds.
16/04/08 15:57:37 INFO KMeans: KMeans converged in 1 iterations.
16/04/08 15:57:37 INFO KMeans: The cost for the best run is 5.000000000000002.
16/04/08 15:57:37 WARN KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
kmModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_7a13a617ce0b
scala> kmModel.clusterCenters.map(_.toSparse)
res36: Array[org.apache.spark.mllib.linalg.SparseVector] = Array((20,[13],[3.0]), (20,[0,2,3,6,7,8,10,11,17,19],[1.5,0.5,1.0,0.5,0.5,0.5,1.5,1.0,1.0,1.0]))
val email = Seq("hello mom").toDF("text")
val result = kmModel.transform(preprocess(email))
scala> result.show(false)
+---------+------------+---------------------+----------+
|text |tokens |features |prediction|
+---------+------------+---------------------+----------+
|hello mom|[hello, mom]|(20,[2,19],[1.0,1.0])|1 |
+---------+------------+---------------------+----------+
A Predictor is a specialization of Estimator for a PredictionModel with its own abstract train method.
train(dataset: DataFrame): M
The train method is supposed to ease dealing with schema validation and copying parameters to a trained PredictionModel. It also sets the parent of the model to itself.
A Predictor is basically a function that maps a DataFrame onto a PredictionModel.
predictor: DataFrame =[train]=> PredictionModel
It implements the abstract fit(dataset: DataFrame) method of the Estimator abstract class that validates and transforms the schema of a dataset (using a custom transformSchema of PipelineStage), and then calls the abstract train method.
Validation and transformation of the schema (using transformSchema) makes sure that:
- the features column exists and is of the correct type (defaults to Vector).
- the label column exists and is of Double type.
As the last step, it adds the prediction column of Double type.
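A minimal training DataFrame that satisfies these schema requirements could look as follows (a sketch with made-up data):
import org.apache.spark.mllib.linalg.Vectors
val training = Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))).toDF("label", "features")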
The following is a list of Predictor examples for different learning algorithms:
LinearRegression is an example of a Predictor (indirectly through the specialized Regressor private abstract class), and hence an Estimator, that represents the linear regression algorithm in Machine Learning.
LinearRegression belongs to the org.apache.spark.ml.regression package.
Tip
|
Read the scaladoc of LinearRegression. |
It expects org.apache.spark.mllib.linalg.Vector as the input type of the features column in a dataset and produces a LinearRegressionModel.
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
The acceptable parameters:
scala> println(lr.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (default: )
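Any of these can be overridden with the corresponding setters, e.g. (a sketch):
val lrCustom = new LinearRegression()
  .setMaxIter(50)
  .setRegParam(0.1)
  .setElasticNetParam(0.5)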
train(dataset: DataFrame): LinearRegressionModel
The (protected) train method of LinearRegression expects a dataset DataFrame with two columns:
- label of type DoubleType.
- features of type Vector.
It returns a LinearRegressionModel.
It first counts the number of elements in the vector in the features column (usually called features). The column has to be of mllib.linalg.Vector type (and can easily be prepared using the HashingTF transformer).
val spam = Seq(
  (0, "Hi Jacek. Wanna more SPAM? Best!"),
  (1, "This is SPAM. This is SPAM")).toDF("id", "email")
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
val spamTokens = regexTok.setInputCol("email").transform(spam)
scala> spamTokens.show(false)
+---+--------------------------------+---------------------------------------+
|id |email |regexTok_646b6bcc4548__output |
+---+--------------------------------+---------------------------------------+
|0 |Hi Jacek. Wanna more SPAM? Best!|[hi, jacek., wanna, more, spam?, best!]|
|1 |This is SPAM. This is SPAM |[this, is, spam., this, is, spam] |
+---+--------------------------------+---------------------------------------+
import org.apache.spark.ml.feature.HashingTF
val hashTF = new HashingTF()
  .setInputCol(regexTok.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(5000)
val spamHashed = hashTF.transform(spamTokens)
scala> spamHashed.select("email", "features").show(false)
+--------------------------------+----------------------------------------------------------------+
|email |features |
+--------------------------------+----------------------------------------------------------------+
|Hi Jacek. Wanna more SPAM? Best!|(5000,[2525,2943,3093,3166,3329,3980],[1.0,1.0,1.0,1.0,1.0,1.0])|
|This is SPAM. This is SPAM |(5000,[1713,3149,3370,4070],[1.0,1.0,2.0,2.0]) |
+--------------------------------+----------------------------------------------------------------+
// Create labeled datasets for spam (1)
import org.apache.spark.sql.functions.lit
val spamLabeled = spamHashed.withColumn("label", lit(1d))
scala> spamLabeled.show
+---+--------------------+-----------------------------+--------------------+-----+
| id| email|regexTok_646b6bcc4548__output| features|label|
+---+--------------------+-----------------------------+--------------------+-----+
| 0|Hi Jacek. Wanna m...| [hi, jacek., wann...|(5000,[2525,2943,...| 1.0|
| 1|This is SPAM. Thi...| [this, is, spam.,...|(5000,[1713,3149,...| 1.0|
+---+--------------------+-----------------------------+--------------------+-----+
val regular = Seq(
  (2, "Hi Jacek. I hope this email finds you well. Spark up!"),
  (3, "Welcome to Apache Spark project")).toDF("id", "email")
val regularTokens = regexTok.setInputCol("email").transform(regular)
val regularHashed = hashTF.transform(regularTokens)
// Create labeled datasets for non-spam regular emails (0)
val regularLabeled = regularHashed.withColumn("label", lit(0d))
val training = regularLabeled.union(spamLabeled).cache
scala> training.show
+---+--------------------+-----------------------------+--------------------+-----+
| id| email|regexTok_646b6bcc4548__output| features|label|
+---+--------------------+-----------------------------+--------------------+-----+
| 2|Hi Jacek. I hope ...| [hi, jacek., i, h...|(5000,[72,105,942...| 0.0|
| 3|Welcome to Apache...| [welcome, to, apa...|(5000,[2894,3365,...| 0.0|
| 0|Hi Jacek. Wanna m...| [hi, jacek., wann...|(5000,[2525,2943,...| 1.0|
| 1|This is SPAM. Thi...| [this, is, spam.,...|(5000,[1713,3149,...| 1.0|
+---+--------------------+-----------------------------+--------------------+-----+
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
// the following calls train by the Predictor contract (see above)
val lrModel = lr.fit(training)
// Let's predict whether an email is spam or not
val email = Seq("Hi Jacek. you doing well? Bye!").toDF("email")
val emailTokens = regexTok.setInputCol("email").transform(email)
val emailHashed = hashTF.transform(emailTokens)
scala> lrModel.transform(emailHashed).select("prediction").show
+-----------------+
| prediction|
+-----------------+
|0.563603440350882|
+-----------------+
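Since linear regression produces a continuous score rather than a class, one way to turn the prediction into a spam decision is to threshold it, say at 0.5 (a sketch; the threshold is arbitrary):
import org.apache.spark.sql.functions.when
val decision = lrModel.transform(emailHashed)
  .withColumn("spam", when($"prediction" > 0.5, true).otherwise(false))
decision.select("email", "prediction", "spam").show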
RandomForestRegressor is a concrete Predictor for the Random Forest learning algorithm. It trains a RandomForestRegressionModel (a subtype of PredictionModel) using a DataFrame with a features column of Vector type.
Caution
|
FIXME |
import org.apache.spark.mllib.linalg.Vectors
val features = Vectors.sparse(10, Seq((2, 0.2), (4, 0.4), (6, 0.6)))
val data = (0.0 to 4.0 by 1).map(d => (d, features)).toDF("label", "features")
// data.as[LabeledPoint]
scala> data.show(false)
+-----+--------------------------+
|label|features |
+-----+--------------------------+
|0.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|1.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|2.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|3.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|4.0 |(10,[2,4,6],[0.2,0.4,0.6])|
+-----+--------------------------+
import org.apache.spark.ml.regression.{ RandomForestRegressor, RandomForestRegressionModel }
val rfr = new RandomForestRegressor
val model: RandomForestRegressionModel = rfr.fit(data)
scala> model.trees.foreach(println)
DecisionTreeRegressionModel (uid=dtr_247e77e2f8e0) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_61f8eacb2b61) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_63fc5bde051c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_64d4e42de85f) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_693626422894) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_927f8a0bc35e) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_82da39f6e4e1) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_cb94c2e75bd1) of depth 0 with 1 nodes
DecisionTreeRegressionModel (uid=dtr_29e3362adfb2) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_d6d896abcc75) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_aacb22a9143d) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_18d07dadb5b9) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_f0615c28637c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4619362d02fc) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d39502f828f4) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_896f3a4272ad) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_891323c29838) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_d658fe871e99) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d91227b13d41) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4a7976921f4b) of depth 2 with 5 nodes
scala> model.treeWeights
res12: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
scala> model.featureImportances
res13: org.apache.spark.mllib.linalg.Vector = (1,[0],[1.0])
The following example uses the LinearRegression estimator.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val data = (0.0 to 9.0 by 1)  // create a collection of Doubles
  .map(n => (n, n))           // make it pairs
  .map { case (label, features) =>
    LabeledPoint(label, Vectors.dense(features)) } // create labeled points of dense vectors
  .toDF // make it a DataFrame
scala> data.show
+-----+--------+
|label|features|
+-----+--------+
| 0.0| [0.0]|
| 1.0| [1.0]|
| 2.0| [2.0]|
| 3.0| [3.0]|
| 4.0| [4.0]|
| 5.0| [5.0]|
| 6.0| [6.0]|
| 7.0| [7.0]|
| 8.0| [8.0]|
| 9.0| [9.0]|
+-----+--------+
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
val model = lr.fit(data)
scala> model.intercept
res1: Double = 0.0
scala> model.coefficients
res2: org.apache.spark.mllib.linalg.Vector = [1.0]
// make predictions
scala> val predictions = model.transform(data)
predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 more field]
scala> predictions.show
+-----+--------+----------+
|label|features|prediction|
+-----+--------+----------+
| 0.0| [0.0]| 0.0|
| 1.0| [1.0]| 1.0|
| 2.0| [2.0]| 2.0|
| 3.0| [3.0]| 3.0|
| 4.0| [4.0]| 4.0|
| 5.0| [5.0]| 5.0|
| 6.0| [6.0]| 6.0|
| 7.0| [7.0]| 7.0|
| 8.0| [8.0]| 8.0|
| 9.0| [9.0]| 9.0|
+-----+--------+----------+
import org.apache.spark.ml.evaluation.RegressionEvaluator
// rmse is the default metric
// We're explicit here for learning purposes
val regEval = new RegressionEvaluator().setMetricName("rmse")
val rmse = regEval.evaluate(predictions)
scala> println(s"Root Mean Squared Error: $rmse")
Root Mean Squared Error: 0.0
import org.apache.spark.mllib.linalg.DenseVector
// NOTE Follow along to learn spark.ml-way (not RDD-way)
predictions.rdd.map { r =>
  (r(0).asInstanceOf[Double], r(1).asInstanceOf[DenseVector](0).toDouble, r(2).asInstanceOf[Double]) }
  .toDF("label", "feature0", "prediction").show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
| 0.0| 0.0| 0.0|
| 1.0| 1.0| 1.0|
| 2.0| 2.0| 2.0|
| 3.0| 3.0| 3.0|
| 4.0| 4.0| 4.0|
| 5.0| 5.0| 5.0|
| 6.0| 6.0| 6.0|
| 7.0| 7.0| 7.0|
| 8.0| 8.0| 8.0|
| 9.0| 9.0| 9.0|
+-----+--------+----------+
// Let's make it easier on the eyes using a Scala case class
scala> :pa
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
case class Prediction(label: Double, feature0: Double, prediction: Double)
object Prediction {
def apply(r: Row) = new Prediction(
label = r(0).asInstanceOf[Double],
feature0 = r(1).asInstanceOf[DenseVector](0).toDouble,
prediction = r(2).asInstanceOf[Double])
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
defined class Prediction
defined object Prediction
scala> predictions.rdd.map(Prediction.apply).toDF.show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
| 0.0| 0.0| 0.0|
| 1.0| 1.0| 1.0|
| 2.0| 2.0| 2.0|
| 3.0| 3.0| 3.0|
| 4.0| 4.0| 4.0|
| 5.0| 5.0| 5.0|
| 6.0| 6.0| 6.0|
| 7.0| 7.0| 7.0|
| 8.0| 8.0| 8.0|
| 9.0| 9.0| 9.0|
+-----+--------+----------+