Functions

Note

Besides the DataFrame operators, being familiar with Spark SQL's common functions is also essential for application development.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (SparkSession
         .builder
         .appName("functions")
         .config('spark.executor.memory', '2g')
         .config('spark.driver.memory', '4g')
         .config("spark.executor.cores", 2)
         .config('spark.cores.max', 4)
         .getOrCreate())
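
The effective settings can be inspected on the running session afterwards; a quick sketch:

# check one of the values configured above (returns the string '2g')
print(spark.sparkContext.getConf().get("spark.executor.memory"))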

isin

df = spark.createDataFrame([("Bob", 5), ("Alice", 2)], ("name", "age"))
df.show()
+-----+---+
| name|age|
+-----+---+
|  Bob|  5|
|Alice|  2|
+-----+---+
# column method, used here as a filter condition
df[df.name.isin(["Bob", "Mike"])].show()
+----+---+
|name|age|
+----+---+
| Bob|  5|
+----+---+
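
The same column method can be negated with ~ to keep only the rows not in the list; a quick sketch on the same df:

# keep rows whose name is NOT in the list
df[~df.name.isin(["Bob", "Mike"])].show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
+-----+---+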

when

# rows where age equals 2 become 3, all other rows become 4
df.select(F.when(df["age"] == 2, 3).otherwise(4).alias("new age")).show()
+-------+
|new age|
+-------+
|      4|
|      3|
+-------+
# without otherwise, the remaining values become None (shown as null)
df.select(F.when(df.age == 2, df.age + 1).alias("new age")).show()
+-------+
|new age|
+-------+
|   null|
|      3|
+-------+
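
Several when clauses can also be chained before the final otherwise; a small sketch on the same df (the labels are just for illustration):

# 2 -> "toddler", 5 -> "child", anything else -> "other"
df.select(F.when(df.age == 2, "toddler")
           .when(df.age == 5, "child")
           .otherwise("other")
           .alias("age group")).show()
+---------+
|age group|
+---------+
|    child|
|  toddler|
+---------+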

substring

# first two characters of the name: 2 characters starting from position 1
df.select(F.substring(df["name"], 1, 2).alias("sub name")).show()
+--------+
|sub name|
+--------+
|      Bo|
|      Al|
+--------+
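
substring also accepts a negative start position, which counts from the end of the string; a quick sketch:

# last two characters of the name
df.select(F.substring(df["name"], -2, 2).alias("last two")).show()
+--------+
|last two|
+--------+
|      ob|
|      ce|
+--------+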

concat

df = spark.createDataFrame([('abcd', 123)], ['s', 'd'])
df.show()
+----+---+
|   s|  d|
+----+---+
|abcd|123|
+----+---+
# concatenate the two columns into one string
df.select(F.concat(df.s, df.d).alias('s')).show()
+-------+
|      s|
+-------+
|abcd123|
+-------+
# arrays can be concatenated as well; if any input is None, the result is also None
arr_df = spark.createDataFrame([([1, 2], [3, 4], [5]), 
                                ([1, 2], None, [3])], 
                               ['a', 'b', 'c'])
arr_df.select(F.concat(arr_df.a, arr_df.b, arr_df.c).alias('arr')).show()
+---------------+
|            arr|
+---------------+
|[1, 2, 3, 4, 5]|
|           null|
+---------------+

concat_ws

# similar to concat, it joins multiple columns into one string with a caller-specified separator
df.select(F.concat_ws('-', df.s, df.d).alias('s')).show()
+--------+
|       s|
+--------+
|abcd-123|
+--------+
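
Unlike concat, concat_ws skips null values instead of returning null; a small sketch (null_str_df is just an illustrative DataFrame):

null_str_df = spark.createDataFrame([('abcd', '123'), ('efg', None)], ['s', 'd'])
# the null in column d is simply dropped, no separator is added for it
null_str_df.select(F.concat_ws('-', null_str_df.s, null_str_df.d).alias('s')).show()
+--------+
|       s|
+--------+
|abcd-123|
|     efg|
+--------+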

collect_set

Aggregate function: returns a set of objects with duplicate elements eliminated.

age_df = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
age_df.agg(F.collect_set("age").alias("age set")).show()
+-------+
|age set|
+-------+
| [5, 2]|
+-------+
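
In practice collect_set is usually applied per group with groupBy; a minimal sketch (collect_list would keep the duplicates):

grouped_df = spark.createDataFrame([("Bob", 5), ("Bob", 5), ("Alice", 2)], ("name", "age"))
# Bob -> [5], Alice -> [2]; element order within each set is not guaranteed
grouped_df.groupBy("name").agg(F.collect_set("age").alias("ages")).show()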

coalesce

Returns the first column that is not null.

null_df = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
null_df.show()
+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+
# use the value of a if it is not null, otherwise fall back to b
null_df.select(F.coalesce(null_df["a"], null_df["b"])).show()
+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+
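
coalesce can also take a literal as the last argument, which makes it handy for filling in a default value; a quick sketch:

# fall back to 0 when both a and b are null
null_df.select(F.coalesce(null_df["a"], null_df["b"], F.lit(0)).alias("a or b or 0")).show()
+-----------+
|a or b or 0|
+-----------+
|          0|
|          1|
|          2|
+-----------+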

regexp_replace

Replace all substrings of the specified string value that match regexp with rep.

df = spark.createDataFrame([('100-200',)], ['str'])

# replace each run of digits with --
df.select(F.regexp_replace('str', r'(\d+)', '--').alias('num to dash')).show()
+-----------+
|num to dash|
+-----------+
|      -----|
+-----------+
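
The replacement string follows Java regex semantics, so captured groups can be referenced with $1, $2, and so on; a small sketch that swaps the two numbers:

# swap the two numbers using group references
df.select(F.regexp_replace('str', r'(\d+)-(\d+)', '$2-$1').alias('swapped')).show()
+-------+
|swapped|
+-------+
|200-100|
+-------+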

regexp_extract

Extract a specific group matched by a Java regex, from the specified string column. If the regex did not match, or the specified group did not match, an empty string is returned.

"""
第三个参数:
0表示显示与之匹配的整个字符串
1表示显示第 1 个括号里面的字段
2表示显示第 2 个括号里面的字段
"""
df.select(F.regexp_extract('str', r'(\d+)-(\d+)', 1).alias('d')).show()
+---+
|  d|
+---+
|100|
+---+
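
As described above, group 0 is the whole match, and a pattern that does not match yields an empty string; a quick sketch:

df.select(F.regexp_extract('str', r'(\d+)-(\d+)', 0).alias('whole'),
          F.regexp_extract('str', r'(\d+)-(\d+)', 2).alias('second'),
          F.regexp_extract('str', r'([a-z]+)', 1).alias('no match')).show()
+-------+------+--------+
|  whole|second|no match|
+-------+------+--------+
|100-200|   200|        |
+-------+------+--------+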

udf

Creates a user defined function (UDF).

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# define a UDF from a lambda (assumes a non-null input; len(None) would raise)
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
    # return the uppercase version
    if s is not None:
        return s.upper()
    
@udf(returnType=IntegerType())
def add_one(x):
    # add 1 to the value
    if x is not None:
        return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
# UDFs are used just like built-in functions
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+
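
A UDF can also be registered by name and then used in SQL expressions; a minimal sketch (the name squared is just for illustration):

# register the UDF so it can be called from selectExpr / spark.sql
spark.udf.register("squared", lambda x: x * x if x is not None else None, IntegerType())
df.selectExpr("squared(age) as age_squared").show()
+-----------+
|age_squared|
+-----------+
|        441|
+-----------+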