Spark MLlib

Note

We'll start with a small project, predicting Airbnb rental prices in San Francisco, to learn the basics of Spark MLlib.

Loading the Data

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("design ml").getOrCreate()
filePath = "../data/sf-airbnb/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows
# Create the training and test sets
trainDF, testDF = airbnbDF.randomSplit([0.8, 0.2], seed=42)

Feature Engineering

VectorAssembler takes a list of input columns and creates a new DataFrame with an additional column (here named features) that combines the values of those input columns into a single vector:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["bedrooms"], 
                               outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows

Creating and Training a Model

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"The formula for the linear regression line is price = {m}*bedrooms + {b}")
The formula for the linear regression line is price = 123.68*bedrooms + 47.51
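
As a quick check, the fitted LinearRegressionModel also exposes training-set metrics through its summary attribute (a small added example, not part of the original walkthrough):

# training-set fit metrics from the model summary
print(f"Training RMSE: {lrModel.summary.rootMeanSquaredError:.1f}")
print(f"Training R2: {lrModel.summary.r2:.3f}")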

Using a Pipeline

Pipelines make the same sequence of transformations easy to reuse:

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
preDF = pipelineModel.transform(testDF)
preDF.select("bedrooms", "features", "price", "prediction").show(10)
+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows

One-Hot Encoding

from pyspark.ml.feature import OneHotEncoder, StringIndexer

categoricalCols = [field for (field, dataType) in trainDF.dtypes 
                   if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

# First, use StringIndexer to convert strings to indices
stringIndexer = StringIndexer(inputCols=categoricalCols, 
                              outputCols=indexOutputCols, 
                              handleInvalid="skip")
# Then use OneHotEncoder to convert the indices to one-hot vectors
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                           outputCols=oheOutputCols)

numericalCols = [field for (field, dataType) in trainDF.dtypes 
                 if dataType == "double" and field != "price"]
vecAssembler = VectorAssembler(inputCols=oheOutputCols + numericalCols,
                               outputCol="features")
lr = LinearRegression(labelCol="price", featuresCol="features")

pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)
+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0| 55.24365707389188|
|(98,[0,3,6,22,43,...| 45.0|23.357685914717877|
|(98,[0,3,6,22,43,...| 70.0|28.474464479034395|
|(98,[0,3,6,12,42,...|128.0| -91.6079079594947|
|(98,[0,3,6,12,43,...|159.0| 95.05688229945372|
+--------------------+-----+------------------+
only showing top 5 rows
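
Note the negative prediction in the fourth row: a linear model on raw price is unconstrained and can go below zero. One common remedy, sketched below assuming prices are strictly positive (log_price, log_pred, lrLog, and pipelineLog are illustrative names, not from the original), is to train on log(price) and exponentiate the predictions:

from pyspark.sql.functions import col, exp, log

# fit on log(price); exponentiating the output guarantees positive predictions
logTrainDF = trainDF.withColumn("log_price", log(col("price")))
logTestDF = testDF.withColumn("log_price", log(col("price")))
lrLog = LinearRegression(labelCol="log_price", featuresCol="features",
                         predictionCol="log_pred")
pipelineLog = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lrLog])
predLogDF = (pipelineLog.fit(logTrainDF)
             .transform(logTestDF)
             .withColumn("prediction", exp(col("log_pred"))))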

Model Evaluation

from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", 
                                          labelCol="price", 
                                          metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
RMSE is 220.6
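
Is 220.6 good? One way to calibrate is to compare against a naive baseline that always predicts the training-set mean price (a minimal sketch; avgPrice and baselineDF are illustrative names):

from pyspark.sql.functions import avg, lit

# baseline: always predict the mean training price
avgPrice = trainDF.agg(avg("price")).first()[0]
baselineDF = testDF.withColumn("prediction", lit(avgPrice))
print(f"Baseline RMSE is {regressionEvaluator.evaluate(baselineDF):.1f}")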

Another common metric is \(R^{2}\), the coefficient of determination:

\[R^{2} = 1 - \frac{SS_{res}}{SS_{tot}}\]

where \(SS_{tot}\) is the total sum of squares if you always predict \(\bar{y}\):

\[SS_{tot} = \sum_{i=1}^{n}(y_{i} - \bar{y})^{2}\]

and \(SS_{res}\) is the residual sum of squares from your model predictions:

\[SS_{res} = \sum_{i=1}^{n}(y_{i} - \hat{y}_{i})^{2}\]

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
R2 is 0.16043316698848087
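
To connect the evaluator's output back to the formulas above, \(R^{2}\) can also be computed by hand from predDF (a minimal sketch; yBar, ssTot, and ssRes are illustrative names):

from pyspark.sql.functions import avg, col, sum as spark_sum

# compute SS_tot and SS_res exactly as in the formulas above
yBar = predDF.agg(avg("price")).first()[0]
ssTot = predDF.agg(spark_sum((col("price") - yBar) ** 2)).first()[0]
ssRes = predDF.agg(spark_sum((col("price") - col("prediction")) ** 2)).first()[0]
print(f"R2 computed by hand: {1 - ssRes / ssTot}")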

Saving and Loading Models

# Save the fitted pipeline
pipelinePath = "../data/tmp/lr-pipeline-model"
pipelineModel.write().overwrite().save(pipelinePath)
from pyspark.ml import PipelineModel

# Load it back
savedPipelineModel = PipelineModel.load(pipelinePath)
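
The loaded model can then be applied directly to new data:

savedPipelineModel.transform(testDF).select("price", "prediction").show(5)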

Tree Models

from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol="price")

# One-hot encoding is not needed for decision trees
vecAssembler = VectorAssembler(inputCols=indexOutputCols + numericalCols, 
                               outputCol="features")
# crucial for performing distributed training
# requires maxBins >= the number of values in each categorical feature
dt.setMaxBins(40)
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainDF)
# extract the if-then-else rules learned by the decision tree
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString[:1000])
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_bb58b45f8dac, depth=5, numNodes=47, numFeatures=33
  If (feature 12 <= 2.5)
   If (feature 12 <= 1.5)
    If (feature 5 in {1.0,2.0})
     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 104.23992784125075
      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 250.7111111111111
     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
       Predict: 151.94179894179894
      Else (feat
import pandas as pd

# extract the feature importance scores
featureImp = pd.DataFrame(list(zip(vecAssembler.getInputCols(), 
                                   dtModel.featureImportances)), 
                          columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)[:10]
                       feature  importance
12                    bedrooms    0.283406
1     cancellation_policyIndex    0.167893
2        instant_bookableIndex    0.140081
4           property_typeIndex    0.128179
15           number_of_reviews    0.126233
3  neighbourhood_cleansedIndex    0.056200
9                    longitude    0.038810
14              minimum_nights    0.029473
13                        beds    0.015218
5               room_typeIndex    0.010905

Random Forest

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)

k-Fold Cross-Validation

from pyspark.ml.tuning import ParamGridBuilder

# Build the random forest pipeline
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
# Hyperparameter search space: 3 maxDepth values x 2 numTrees values = 6 models
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.numTrees, [10, 100])
             .build())
evaluator = RegressionEvaluator(labelCol="price",
                                predictionCol="prediction",
                                metricName="rmse")
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=pipeline, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, seed=42)
cvModel = cv.fit(trainDF)
# Retrieve the cross-validation results
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
[({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  291.18226409247836),
 ({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  286.7714750274078),
 ({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  287.6963245160818),
 ({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  279.99270572360797),
 ({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 6,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  294.34810870889305),
 ({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 6,
   Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  275.3986270472998)]
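
The lowest average RMSE here comes from maxDepth=6 with numTrees=100. To pick it out programmatically (a small sketch; bestIdx is an illustrative name):

# rmse: lower is better, so take the argmin over the averaged fold metrics
bestIdx = min(range(len(cvModel.avgMetrics)), key=lambda i: cvModel.avgMetrics[i])
print(cvModel.getEstimatorParamMaps()[bestIdx])
# cvModel.bestModel is the pipeline refit on the full training set with those params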

Optimizing the Pipeline

Even though each of the models in the cross-validator is technically independent, spark.ml trains the collection of models sequentially rather than in parallel by default.

# Speed up training by evaluating up to 4 models in parallel
cvModel = cv.setParallelism(4).fit(trainDF)

Another optimization is to put the cross-validator inside the pipeline (rather than the pipeline inside the cross-validator), so that the StringIndexer is not re-evaluated for every model. This makes training a bit faster:

cv = CrossValidator(estimator=rf,
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)
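
As a final check, the tuned pipeline can be evaluated on the held-out test set, reusing the evaluator defined above (a short usage sketch):

predDF = pipelineModel.transform(testDF)
print(f"Test RMSE of the tuned pipeline: {evaluator.evaluate(predDF):.1f}")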