Common DataFrame Operators

Note

The commonly used DataFrame operators can be grouped into exploration operators, cleaning operators, transformation operators, analysis operators, and persistence operators.

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("df operators")
         .config('spark.executor.memory', '3g')
         .config('spark.driver.memory', '6g')
         .config("spark.executor.cores", 2)
         .config('spark.cores.max', 4)
         .getOrCreate())
import pyspark.sql.functions as F

# Create an employees table and cache it
lst = [(1, "John", 18, "Male"), 
       (2, "Lily", 25, "Female"), 
       (3, "Raymond", 30, "Male"),
       (4, None, 19, "Female"),
       (2, "Lily", 25, "Female")]
employeesDF = spark.createDataFrame(lst, ["id", "name", "age", "gender"])
employeesDF.cache()
DataFrame[id: bigint, name: string, age: bigint, gender: string]
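Note that cache() is lazy: it only marks the DataFrame for caching, and nothing is materialized until the first action runs. A minimal sketch that forces materialization (count() is just one convenient action):

# Trigger an action so the cached data is actually materialized
employeesDF.count()
5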

Exploration Operators

# Print the schema
employeesDF.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
# The schema attribute
employeesDF.schema
StructType(List(StructField(id,LongType,true),StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))
# The columns attribute
employeesDF.columns
['id', 'name', 'age', 'gender']
# First row
employeesDF.first()
Row(id=1, name='John', age=18, gender='Male')
# First few rows
employeesDF.take(2)
[Row(id=1, name='John', age=18, gender='Male'),
 Row(id=2, name='Lily', age=25, gender='Female')]
# All rows
employeesDF.collect()
[Row(id=1, name='John', age=18, gender='Male'),
 Row(id=2, name='Lily', age=25, gender='Female'),
 Row(id=3, name='Raymond', age=30, gender='Male'),
 Row(id=4, name=None, age=19, gender='Female'),
 Row(id=2, name='Lily', age=25, gender='Female')]
# Print the data, 20 rows by default
employeesDF.show()
+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  4|   null| 19|Female|
|  2|   Lily| 25|Female|
+---+-------+---+------+
# Look at the distribution of a column
employeesDF.describe(["age"]).show()
+-------+-----------------+
|summary|              age|
+-------+-----------------+
|  count|                5|
|   mean|             23.4|
| stddev|4.929503017546495|
|    min|               18|
|    max|               30|
+-------+-----------------+
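describe() reports count/mean/stddev/min/max. Newer Spark versions (2.3+) also provide summary(), which can additionally report percentiles; a minimal sketch (the statistic names below are the standard summary() options):

# summary() extends describe() with quantiles
employeesDF.select("age").summary("count", "mean", "25%", "50%", "75%").show()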

Cleaning Operators

# Drop a column
employeesDF.drop("gender").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   John| 18|
|  2|   Lily| 25|
|  3|Raymond| 30|
|  4|   null| 19|
|  2|   Lily| 25|
+---+-------+---+
# Deduplicate
employeesDF.distinct().show()
+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  4|   null| 19|Female|
+---+-------+---+------+
# Deduplicate on specific columns
employeesDF.dropDuplicates(['gender']).show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
|  2|Lily| 25|Female|
|  1|John| 18|  Male|
+---+----+---+------+
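dropDuplicates keeps one arbitrary row per key, so which record survives is not guaranteed. It also accepts several columns at once; a minimal sketch:

# Deduplicate on a combination of columns
employeesDF.dropDuplicates(["name", "age"]).show()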
# Drop records containing null values
employeesDF.na.drop().show()
+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  2|   Lily| 25|Female|
+---+-------+---+------+
# Fill null values
employeesDF.na.fill("?").show()
+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  4|      ?| 19|Female|
|  2|   Lily| 25|Female|
+---+-------+---+------+
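Because the fill value above is a string, na.fill("?") only touches string columns. Passing a dict fills different columns with different values; a minimal sketch:

# Per-column fill: the string column gets "?", the numeric age column gets 0
employeesDF.na.fill({"name": "?", "age": 0}).show()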

Transformation Operators

# Select columns
employeesDF.select("name", "gender").show()
+-------+------+
|   name|gender|
+-------+------+
|   John|  Male|
|   Lily|Female|
|Raymond|  Male|
|   null|Female|
|   Lily|Female|
+-------+------+
# Select columns, with support for SQL expressions
employeesDF.selectExpr("id", "name", "concat(id, '_', name) as id_name").show()
+---+-------+---------+
| id|   name|  id_name|
+---+-------+---------+
|  1|   John|   1_John|
|  2|   Lily|   2_Lily|
|  3|Raymond|3_Raymond|
|  4|   null|     null|
|  2|   Lily|   2_Lily|
+---+-------+---------+
# where is just an alias for filter
df = employeesDF.where(F.col("id") <= 2)
df.show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
|  1|John| 18|  Male|
|  2|Lily| 25|Female|
|  2|Lily| 25|Female|
+---+----+---+------+
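Besides Column expressions, where/filter also accepts a SQL expression string; a minimal sketch of the equivalent filter:

# The same predicate written as a SQL expression
employeesDF.filter("id <= 2").show()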
# Rename a column
employeesDF.withColumnRenamed("gender", "sex").show()
+---+-------+---+------+
| id|   name|age|   sex|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  4|   null| 19|Female|
|  2|   Lily| 25|Female|
+---+-------+---+------+
# Create a new column
employeesDF.withColumn("new age", F.col("age") + 1).show()
+---+-------+---+------+-------+
| id|   name|age|gender|new age|
+---+-------+---+------+-------+
|  1|   John| 18|  Male|     19|
|  2|   Lily| 25|Female|     26|
|  3|Raymond| 30|  Male|     31|
|  4|   null| 19|Female|     20|
|  2|   Lily| 25|Female|     26|
+---+-------+---+------+-------+
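withColumn accepts any Column expression, including conditionals; a hedged sketch using F.when (the age_band column name is purely illustrative):

# Derive a categorical column from a condition
employeesDF.withColumn("age_band", F.when(F.col("age") >= 25, "25+").otherwise("<25")).show()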
# Union DataFrames with the same schema
employeesDF.union(df).show()
+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  3|Raymond| 30|  Male|
|  4|   null| 19|Female|
|  2|   Lily| 25|Female|
|  1|   John| 18|  Male|
|  2|   Lily| 25|Female|
|  2|   Lily| 25|Female|
+---+-------+---+------+
# Sample; arguments are: with replacement, fraction, random seed
employeesDF.sample(True, 0.5, 42).show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
|  1|John| 18|  Male|
+---+----+---+------+
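For train/test style splits, randomSplit is the usual companion to sample; a minimal sketch (the arguments are the split weights and a seed, and each weight gives an approximate fraction):

# Randomly split into two DataFrames by approximate weights
trainDF, testDF = employeesDF.randomSplit([0.8, 0.2], seed=42)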

Analysis Operators

It is no exaggeration to say that the exploration, cleaning, and transformation above are all preparation for data analysis.
In most data applications, analysis is the most critical step, and often the very purpose of the application itself.

# Salary table
salaries = spark.createDataFrame([(1, 26000), (2, 30000), (3, 25000), (4, 20000)], 
                                 ["id", "salary"])
# Inner join with another table
fullInfo = salaries.join(employeesDF, "id", "inner")
fullInfo.show()
+---+------+-------+---+------+
| id|salary|   name|age|gender|
+---+------+-------+---+------+
|  1| 26000|   John| 18|  Male|
|  2| 30000|   Lily| 25|Female|
|  2| 30000|   Lily| 25|Female|
|  3| 25000|Raymond| 30|  Male|
|  4| 20000|   null| 19|Female|
+---+------+-------+---+------+
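An inner join drops ids that appear in only one table. To keep every employee even when no salary record exists, a left join can be used; a minimal sketch (on this small dataset the result happens to match the inner join, since every id has a salary):

# Keep all employees; missing salaries would show up as null
employeesDF.join(salaries, "id", "left").show()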
# Aggregate with groupBy
# agg computes statistics and produces new columns
aggResult = (fullInfo.groupBy("gender")
             .agg(F.sum("salary").alias("sum_salary"), 
                  F.avg("salary").alias("avg_salary")))
aggResult.show()
+------+----------+------------------+
|gender|sum_salary|        avg_salary|
+------+----------+------------------+
|  Male|     51000|           25500.0|
|Female|     80000|26666.666666666668|
+------+----------+------------------+
# orderBy is equivalent to sort
aggResult.sort("sum_salary", ascending=True).show()
+------+----------+------------------+
|gender|sum_salary|        avg_salary|
+------+----------+------------------+
|  Male|     51000|           25500.0|
|Female|     80000|26666.666666666668|
+------+----------+------------------+
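For descending order, either pass ascending=False or wrap the column with F.desc; a minimal sketch:

# Equivalent to aggResult.sort("sum_salary", ascending=False)
aggResult.orderBy(F.desc("sum_salary")).show()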

Persistence Operators

The write API, which mirrors the read API

import os

path = "../data/agg.csv"
if not os.path.exists(path):
    aggResult.write.format("csv").option("header", True).save(path)
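save() fails if the target path already exists, which is why the os.path.exists guard is used above. An alternative is to set a write mode, and the saved result can be read back with the matching read API; a hedged sketch:

# Overwrite the target path instead of guarding with os.path.exists
aggResult.write.format("csv").option("header", True).mode("overwrite").save(path)

# Read the result back; inferSchema restores the numeric columns
spark.read.format("csv").option("header", True).option("inferSchema", True).load(path).show()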