RDD Cache¶
Note
默认情况下,每次遇到 Action 算子,Spark 都会从头开始计算。
在一个 RDD 会做多个 Action 运算的情况下,缓存这个 RDD 是一个理想的选择。
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("cache").setMaster("local")
sc = SparkContext(conf=conf)
# 依旧是第一节的 Word Count
lineRDD = sc.textFile("../data/wikiOfSpark.txt")
wordRDD = lineRDD.flatMap(lambda line: line.split(" "))
cleanWordRDD = wordRDD.filter(lambda word: word != "")
kvRDD = cleanWordRDD.map(lambda word: (word, 1))
wordCount = kvRDD.reduceByKey(lambda x, y: x + y)
# 缓存
wordCount.cache()
# 触发缓存
wordCount.count()
1345
import time
start = time.time()
# 多次Action
for i in range(100):
wordCount.take(20)
print(time.time() - start)
2.3890490531921387
# 释放缓存
wordCount.unpersist()
PythonRDD[6] at RDD at PythonRDD.scala:53