**1. Estimator、Transformer、Parameter使用示例**
```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object MLInstance1 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 從(label,features)元組列表準備訓練數據
val training:DataFrame = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5)))
).toDF("label", "features")
training.show(5)
// +-----+--------------+
// |label| features|
// +-----+--------------+
// | 1.0| [0.0,1.1,0.1]|
// | 0.0|[2.0,1.0,-1.0]|
// | 0.0| [2.0,1.3,1.0]|
// | 1.0|[0.0,1.2,-0.5]|
// +-----+--------------+
// 創建一個邏輯回歸(LogisticRegression)實例。 該實例是一個 Estimator。
val lr:LogisticRegression = new LogisticRegression()
// 打印出參數,文檔和任何默認值
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// 我們可以使用 setter 方法設置參數
lr.setMaxIter(10).setRegParam(0.01)
// 訓練邏輯回歸(LogisticRegression)模型。 這里使用存儲在 lr 中的參數
val model1 = lr.fit(training)
// 由于 model1 是模型(即 Estimator 產生的 Transformer)
// 我們可以查看它在 fit()中使用的參數
// 這將打印參數(名稱:值)對,其中名稱是此參數的唯一 ID
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// 我們也可以使用 ParamMap 指定參數
// 支持多種用于指定參數的方法。
val paramMap = ParamMap(lr.maxIter -> 20)
// 指定 1 個參數,這將覆蓋原始的 maxIter
.put(lr.maxIter, 30)
// 指定多個參數
.put(lr.regParam -> 0.1, lr.threshold -> 0.55)
// 也可以結合使用 ParamMaps
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") //更改輸出列名稱
val paramMapCombined = paramMap ++ paramMap2
// 現在使用 paramMapCombined 參數學習一個新模型
// paramMapCombined 會覆蓋通過 lr.set *方法設置的所有參數
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// 準備測試數據
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features")
// 使用 Transformer.transform()方法對測試數據進行預測
// LogisticRegression.transform 將僅使用“feature”列
// 注意:model2.transform()輸出的是“ myProbability”列,而不是'probability'列,
// 因為我們之前已重命名了 lr.probabilityCol 參數。
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction:
Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
}
}
```
**2. Pipeline使用案例**
```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object PipelineInstance {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 從(id,text,label)元組列表準備測試文檔
val training:DataFrame = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0))).toDF("id", "text", "label")
training.show()
// +---+----------------+-----+
// | id| text|label|
// +---+----------------+-----+
// | 0| a b c d e spark| 1.0|
// | 1| b d| 0.0|
// | 2| spark f g h| 1.0|
// | 3|hadoop mapreduce| 0.0|
// +---+----------------+-----+
// 配置 ML 管道,該管道包括三個階段:令牌生成器,hashingTF 和 lr
val tokenizer:Tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF:HashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr:LogisticRegression = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline:Pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// 使管道訓練文檔
val model:PipelineModel = pipeline.fit(training)
// 現在,我們可以選擇將已擬合的管道保存到磁盤
model.write.overwrite().save("F:/ml/tmp/spark-logistic-regression-model")
// 我們也可以將此不合適的管道保存到磁盤
pipeline.write.overwrite().save("F:/ml/tmp/unfit-lr-model")
// 在生產中將其加載回
val sameModel:PipelineModel = PipelineModel.load("F:/ml/tmp/spark-logistic-regression-model")
// 準備沒有標簽(id,text)元組的測試文檔
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop"))).toDF("id", "text")
// 對測試文檔進行預測
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
}
}
```