Functions¶
Note
Besides the DataFrame operators, being familiar with Spark SQL's common functions is also essential for application development.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = (SparkSession
         .builder
         .appName("functions")
         .config('spark.executor.memory', '2g')
         .config('spark.driver.memory', '4g')
         .config("spark.executor.cores", 2)
         .config('spark.cores.max', 4)
         .getOrCreate())
isin¶
df = spark.createDataFrame([("Bob", 5), ("Alice", 2)], ("name", "age"))
df.show()
+-----+---+
| name|age|
+-----+---+
| Bob| 5|
|Alice| 2|
+-----+---+
# A Column method, used here as a filter condition
df[df.name.isin(["Bob", "Mike"])].show()
+----+---+
|name|age|
+----+---+
| Bob| 5|
+----+---+
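isin can also be negated with ~ to keep the rows that do not match; a minimal sketch on the same DataFrame:
# Keep rows whose name is NOT in the list (only the Alice row remains)
df[~df.name.isin(["Bob", "Mike"])].show()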
when¶
# Rows where age equals 2 become 3; all other rows become 4
df.select(F.when(df["age"] == 2, 3).otherwise(4).alias("new age")).show()
+-------+
|new age|
+-------+
| 4|
| 3|
+-------+
# Without otherwise, unmatched rows become None
df.select(F.when(df.age == 2, df.age + 1).alias("new age")).show()
+-------+
|new age|
+-------+
| null|
| 3|
+-------+
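Several conditions can be chained before otherwise; a minimal sketch (the labels are made up for illustration):
# The first matching branch wins; unmatched rows fall through to the default
df.select(F.when(df.age == 2, "two")
           .when(df.age == 5, "five")
           .otherwise("other")
           .alias("age label")).show()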
substring¶
# First two characters of the name: 2 characters starting at position 1
df.select(F.substring(df["name"], 1, 2).alias("sub name")).show()
+--------+
|sub name|
+--------+
| Bo|
| Al|
+--------+
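The same slice can also be taken with the Column method substr, which is equivalent here:
# Column.substr(startPos, length) behaves like F.substring for this case
df.select(df.name.substr(1, 2).alias("sub name")).show()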
concat¶
df = spark.createDataFrame([('abcd', 123)], ['s', 'd'])
df.show()
+----+---+
| s| d|
+----+---+
|abcd|123|
+----+---+
# Concatenate the two columns into one
df.select(F.concat(df.s, df.d).alias('s')).show()
+-------+
| s|
+-------+
|abcd123|
+-------+
# Arrays can also be concatenated; if any input is None, the result is None
arr_df = spark.createDataFrame([([1, 2], [3, 4], [5]),
                                ([1, 2], None, [3])],
                               ['a', 'b', 'c'])
arr_df.select(F.concat(arr_df.a, arr_df.b, arr_df.c).alias('arr')).show()
+---------------+
| arr|
+---------------+
|[1, 2, 3, 4, 5]|
| null|
+---------------+
concat_ws¶
# Similar to concat, but joins multiple columns into one string with a custom separator
df.select(F.concat_ws('-', df.s, df.d).alias('s')).show()
+--------+
| s|
+--------+
|abcd-123|
+--------+
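Unlike concat, concat_ws skips null inputs instead of turning the whole result into null; a minimal sketch with a small two-row frame of my own:
ws_df = spark.createDataFrame([('abcd', '123'), ('xyz', None)], ['s', 'd'])
# concat yields null for the second row; concat_ws simply drops the null part
ws_df.select(F.concat(ws_df.s, ws_df.d).alias('concat'),
             F.concat_ws('-', ws_df.s, ws_df.d).alias('concat_ws')).show()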
collect_set¶
Aggregate function: returns a set of objects with duplicate elements eliminated.
age_df = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
age_df.agg(F.collect_set("age").alias("age set")).show()
+-------+
|age set|
+-------+
| [5, 2]|
+-------+
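For comparison, collect_list keeps duplicate values; a minimal sketch on the same age_df:
# Duplicates are preserved, e.g. [2, 5, 5] (element order is not guaranteed)
age_df.agg(F.collect_list("age").alias("age list")).show()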
coalesce¶
Returns the first column that is not null.
null_df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
null_df.show()
+----+----+
| a| b|
+----+----+
|null|null|
| 1|null|
|null| 2|
+----+----+
# Use the value of a if it is not null, otherwise fall back to b
null_df.select(F.coalesce(null_df["a"], null_df["b"])).show()
+--------------+
|coalesce(a, b)|
+--------------+
| null|
| 1|
| 2|
+--------------+
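A literal can be appended as the last argument so that rows where every column is null get a default value instead of null; a minimal sketch:
# Falls back to 0 when both a and b are null
null_df.select(F.coalesce(null_df["a"], null_df["b"], F.lit(0)).alias("a or b or 0")).show()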
regexp_replace¶
Replace all substrings of the specified string value that match regexp with rep.
df = spark.createDataFrame([('100-200',)], ['str'])
# Replace each run of digits with --
df.select(F.regexp_replace('str', r'(\d+)', '--').alias('num to dash')).show()
+-----------+
|num to dash|
+-----------+
| -----|
+-----------+
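Because the pattern is a Java regex, the replacement string can reference capture groups with $1, $2, ...; a minimal sketch that swaps the two numbers (the backreference syntax is an assumption based on Java's Matcher.replaceAll, not from the original text):
# Expected to produce 200-100
df.select(F.regexp_replace('str', r'(\d+)-(\d+)', '$2-$1').alias('swapped')).show()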
regexp_extract¶
Extract a specific group matched by a Java regex, from the specified string column. If the regex did not match, or the specified group did not match, an empty string is returned.
"""
第三个参数:
0表示显示与之匹配的整个字符串
1表示显示第 1 个括号里面的字段
2表示显示第 2 个括号里面的字段
"""
df.select(F.regexp_extract('str', r'(\d+)-(\d+)', 1).alias('d')).show()
+---+
| d|
+---+
|100|
+---+
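Group 0 returns the whole match and group 2 returns the second parenthesized field; a minimal sketch:
# Group 0 -> '100-200', group 2 -> '200'
df.select(F.regexp_extract('str', r'(\d+)-(\d+)', 0).alias('whole'),
          F.regexp_extract('str', r'(\d+)-(\d+)', 2).alias('second')).show()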
udf¶
Creates a user defined function (UDF).
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Defined from a lambda
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
    # Convert the string to uppercase
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    # Add one to the value
    if x is not None:
        return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
# Use them just like built-in functions
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+
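A UDF can also be registered for use in SQL queries via spark.udf.register; a minimal sketch (the names sql_slen and people are made up for illustration):
# Register the same length UDF under a SQL-visible name and query it through a temp view
spark.udf.register("sql_slen", lambda s: len(s), IntegerType())
df.createOrReplaceTempView("people")
spark.sql("SELECT sql_slen(name) AS slen FROM people").show()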