Spark MLlib¶
Note
We will use a small project, predicting Airbnb rental prices in San Francisco, to learn the basics of Spark MLlib.
Loading the Data¶
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("design ml").getOrCreate()
filePath = "../data/sf-airbnb/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
"number_of_reviews", "price").show(5)
+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed| room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
| Western Addition|Entire home/apt| 1.0| 1.0| 180.0|170.0|
| Bernal Heights|Entire home/apt| 2.0| 1.0| 111.0|235.0|
| Haight Ashbury| Private room| 1.0| 4.0| 17.0| 65.0|
| Haight Ashbury| Private room| 1.0| 4.0| 8.0| 65.0|
| Western Addition|Entire home/apt| 2.0| 1.5| 27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows
# Create the training and test sets
trainDF, testDF = airbnbDF.randomSplit([0.8, 0.2], seed=42)
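A quick sanity check on the split, a minimal sketch using the DataFrames above; the counts should land near an 80/20 ratio:
# Row counts for the two splits
print(trainDF.count(), testDF.count())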
Feature Engineering¶
VectorAssembler takes a list of input columns and creates a new DataFrame with an additional column that combines the values of those input columns into a single vector:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"],
outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
| 1.0| [1.0]|200.0|
| 1.0| [1.0]|130.0|
| 1.0| [1.0]| 95.0|
| 1.0| [1.0]|250.0|
| 3.0| [3.0]|250.0|
| 1.0| [1.0]|115.0|
| 1.0| [1.0]|105.0|
| 1.0| [1.0]| 86.0|
| 1.0| [1.0]|100.0|
| 2.0| [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows
Building and Training a Model¶
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"The formula for the linear regression line is price = {m}*bedrooms + {b}")
The formula for the linear regression line is price = 123.68*bedrooms + 47.51
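To score the test set with this setup, we would have to apply the assembler and then the model by hand, two separate steps; a minimal sketch using the objects defined above:
# Transform the test set with the same assembler, then apply the fitted model
vecTestDF = vecAssembler.transform(testDF)
lrModel.transform(vecTestDF).select("bedrooms", "price", "prediction").show(3)
The Pipeline in the next section bundles both steps into a single object.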
Using a Pipeline¶
A Pipeline packages these stages together so the whole workflow is easy to reuse.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
preDF = pipelineModel.transform(testDF)
preDF.select("bedrooms", "features", "price", "prediction").show(10)
+--------+--------+------+------------------+
|bedrooms|features| price| prediction|
+--------+--------+------+------------------+
| 1.0| [1.0]| 85.0|171.18598011578285|
| 1.0| [1.0]| 45.0|171.18598011578285|
| 1.0| [1.0]| 70.0|171.18598011578285|
| 1.0| [1.0]| 128.0|171.18598011578285|
| 1.0| [1.0]| 159.0|171.18598011578285|
| 2.0| [2.0]| 250.0|294.86172649777757|
| 1.0| [1.0]| 99.0|171.18598011578285|
| 1.0| [1.0]| 95.0|171.18598011578285|
| 1.0| [1.0]| 100.0|171.18598011578285|
| 1.0| [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows
One-Hot Encoding¶
from pyspark.ml.feature import OneHotEncoder, StringIndexer
categoricalCols = [field for (field, dataType) in trainDF.dtypes
if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
# First, use StringIndexer to map each string to a numeric index
stringIndexer = StringIndexer(inputCols=categoricalCols,
outputCols=indexOutputCols,
handleInvalid="skip")
# Then use OneHotEncoder to turn each index into a one-hot vector
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
outputCols=oheOutputCols)
numericalCols = [field for (field, dataType) in trainDF.dtypes
if dataType == "double" and field != "price"]
vecAssembler = VectorAssembler(inputCols=oheOutputCols + numericalCols,
outputCol="features")
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)
+--------------------+-----+------------------+
| features|price| prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0| 55.24365707389188|
|(98,[0,3,6,22,43,...| 45.0|23.357685914717877|
|(98,[0,3,6,22,43,...| 70.0|28.474464479034395|
|(98,[0,3,6,12,42,...|128.0| -91.6079079594947|
|(98,[0,3,6,12,43,...|159.0| 95.05688229945372|
+--------------------+-----+------------------+
only showing top 5 rows
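To see which index each category was assigned, you can inspect the fitted StringIndexerModel inside the pipeline; a minimal sketch, assuming Spark 3.0+ where multi-column StringIndexer and labelsArray are available:
# Stage 0 of the fitted pipeline is the StringIndexerModel;
# labelsArray lists the learned categories per input column
indexerModel = pipelineModel.stages[0]
for col, labels in zip(categoricalCols, indexerModel.labelsArray):
    print(col, labels[:3])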
Model Evaluation¶
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
labelCol="price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
RMSE is 220.6
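Is 220.6 good? One way to calibrate is against a naive baseline that always predicts the training-set mean price; a minimal sketch:
from pyspark.sql import functions as F
# Baseline model: predict the average training price for every listing
avgPrice = trainDF.select(F.avg("price")).first()[0]
baselineDF = testDF.withColumn("prediction", F.lit(avgPrice))
print(f"Baseline RMSE is {regressionEvaluator.evaluate(baselineDF):.1f}")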
Another metric is \(R^{2}\):
\[R^{2} = 1 - \frac{SS_{res}}{SS_{tot}}\]
where \(SS_{tot}\) is the total sum of squares you would get if you always predicted \(\bar{y}\):
\[SS_{tot} = \sum_{i=1}^{n}(y_{i} - \bar{y})^{2}\]
and \(SS_{res}\) is the sum of squared residuals from your model's predictions:
\[SS_{res} = \sum_{i=1}^{n}(y_{i} - \hat{y}_{i})^{2}\]
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
R2 is 0.16043316698848087
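As a sanity check, the same value can be computed directly from the definition above; a minimal sketch over predDF:
from pyspark.sql import functions as F
# Compute SS_tot and SS_res by hand and rebuild R^2
ybar = predDF.select(F.avg("price")).first()[0]
ss = predDF.select(
    F.sum(F.pow(F.col("price") - ybar, 2)).alias("ss_tot"),
    F.sum(F.pow(F.col("price") - F.col("prediction"), 2)).alias("ss_res")).first()
print(f"R2 is {1 - ss['ss_res'] / ss['ss_tot']}")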
Saving and Loading Models¶
# Save the fitted pipeline
pipelinePath = "../data/tmp/lr-pipeline-model"
pipelineModel.write().overwrite().save(pipelinePath)
from pyspark.ml import PipelineModel
# Load it back
savedPipelineModel = PipelineModel.load(pipelinePath)
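The loaded model can serve predictions immediately, with no refitting needed; for example:
# The reloaded pipeline behaves exactly like the original
savedPipelineModel.transform(testDF).select("price", "prediction").show(3)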
Tree Models¶
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price")
# Decision trees handle categorical indices directly, so one-hot encoding is unnecessary
vecAssembler = VectorAssembler(inputCols=indexOutputCols + numericalCols,
outputCol="features")
# crucial for performing distributed training
# requires maxBins >= the number of values in each categorical feature
dt.setMaxBins(40)
DecisionTreeRegressor_bb58b45f8dac
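To pick a safe value for maxBins, check the cardinality of each categorical column; a minimal sketch, reusing categoricalCols from above:
from pyspark.sql import functions as F
# maxBins must be at least the largest categorical cardinality
for c in categoricalCols:
    print(c, trainDF.select(F.countDistinct(c)).first()[0])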
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainDF)
# extract the if-then-else rules learned by the decision tree
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString[:1000])
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_bb58b45f8dac, depth=5, numNodes=47, numFeatures=33
If (feature 12 <= 2.5)
If (feature 12 <= 1.5)
If (feature 5 in {1.0,2.0})
If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
Predict: 104.23992784125075
Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
Predict: 250.7111111111111
Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
Predict: 151.94179894179894
Else (feat
import pandas as pd
# extract the feature importance scores
featureImp = pd.DataFrame(list(zip(vecAssembler.getInputCols(),
dtModel.featureImportances)),
columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)[:10]
|    | feature                     | importance |
|----|-----------------------------|------------|
| 12 | bedrooms                    | 0.283406   |
| 1  | cancellation_policyIndex    | 0.167893   |
| 2  | instant_bookableIndex       | 0.140081   |
| 4  | property_typeIndex          | 0.128179   |
| 15 | number_of_reviews           | 0.126233   |
| 3  | neighbourhood_cleansedIndex | 0.056200   |
| 9  | longitude                   | 0.038810   |
| 14 | minimum_nights              | 0.029473   |
| 13 | beds                        | 0.015218   |
| 5  | room_typeIndex              | 0.010905   |
Random Forest¶
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
k-Fold Cross-Validation¶
from pyspark.ml.tuning import ParamGridBuilder
# Build a random forest pipeline
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])
# Define the hyperparameter search space
paramGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2, 4, 6])
.addGrid(rf.numTrees, [10, 100])
.build())
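This grid contains 3 × 2 = 6 hyperparameter combinations, so the 3-fold cross-validator below will train 18 models, plus one final refit of the best configuration on the full training set.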
evaluator = RegressionEvaluator(labelCol="price",
predictionCol="prediction",
metricName="rmse")
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3, seed=42)
cvModel = cv.fit(trainDF)
# Inspect the cross-validation results
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
[({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
291.18226409247836),
({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
286.7714750274078),
({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
287.6963245160818),
({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
279.99270572360797),
({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 6,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
294.34810870889305),
({Param(parent='RandomForestRegressor_7881b42f1736', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 6,
Param(parent='RandomForestRegressor_7881b42f1736', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
275.3986270472998)]
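The lowest average RMSE above comes from maxDepth=6 with numTrees=100. A minimal sketch pulling out the best model, which CrossValidator has already refit on all of trainDF, and scoring it on the held-out test set with the RMSE evaluator defined earlier:
bestModel = cvModel.bestModel  # a PipelineModel here, since the estimator was a pipeline
predDF = bestModel.transform(testDF)
print(f"Test RMSE is {evaluator.evaluate(predDF):.1f}")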
Optimizing the Pipeline¶
Even though each of the models trained by the cross-validator is technically independent, spark.ml trains them sequentially rather than in parallel unless you raise the parallelism parameter.
# Speed up training by fitting up to 4 models in parallel
cvModel = cv.setParallelism(4).fit(trainDF)
cv = CrossValidator(estimator=rf,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
parallelism=4,
seed=42)
# Putting the cross-validator inside the pipeline means the StringIndexer
# won't be re-evaluated for every fold, which is a bit faster
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)
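One caveat with this arrangement: the StringIndexer is now fit once on the full training set rather than once per fold, so a small amount of information can leak across folds. For a preprocessing stage like indexing, this is usually an acceptable trade for the speedup.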