广播变量 & 累加器

Note

在做应用开发时,总会有一些计算逻辑需要访问”全局变量“,需要在任意时刻对所有 Executors 都是可见的、共享的。
Spark 使用广播变量和累加器对这样的逻辑提供支持。

广播变量

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("broadcast").setMaster("local")
sc = SparkContext(conf=conf) 
# 第一节的 Word Count
lineRDD = sc.textFile("../data/wikiOfSpark.txt")
wordRDD = lineRDD.flatMap(lambda line: line.split(" "))
# 这里我们希望只统计给定的单词
words = ["Apache", "Spark"]
# 创建广播变量
bc = sc.broadcast(words)
# 在 filter 时使用广播变量
cleanWordRDD = wordRDD.filter(lambda word: word in bc.value)
print(cleanWordRDD.take(5))
['Apache', 'Spark', 'Apache', 'Spark', 'Spark']

累加器

顾名思义,作用是做全局计数。

# 创建累加器,0为初始值
ac = sc.accumulator(0)

def f(x: str) -> bool:
    """过滤空字符串,但这次遇到空字符串累加器+1"""
    if x == "":
        ac.add(1)
        return False
    else:
        return True

# 在 filter 时使用累加器
cleanWordRDD = wordRDD.filter(f)
print(cleanWordRDD.count())
print(ac.value)
2579
79