Common DataFrame Operators¶
Note
The common DataFrame operators can be grouped into exploration operators, cleaning operators, transformation operators, analysis operators, and persistence operators.
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("df operators")
         .config('spark.executor.memory', '3g')
         .config('spark.driver.memory', '6g')
         .config("spark.executor.cores", 2)
         .config('spark.cores.max', 4)
         .getOrCreate())
import pyspark.sql.functions as F
# Create the employees table and cache it
lst = [(1, "John", 18, "Male"),
       (2, "Lily", 25, "Female"),
       (3, "Raymond", 30, "Male"),
       (4, None, 19, "Female"),
       (2, "Lily", 25, "Female")]
employeesDF = spark.createDataFrame(lst, ["id", "name", "age", "gender"])
employeesDF.cache()
DataFrame[id: bigint, name: string, age: bigint, gender: string]
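Note that cache() is lazy: it only marks the DataFrame for caching. A minimal sketch, assuming you want the cache materialized up front, is to trigger a cheap action right after:

# cache() takes effect only when an action runs; count() forces materialization
employeesDF.count()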
Exploration Operators¶
# Print the schema
employeesDF.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
# The schema attribute
employeesDF.schema
StructType(List(StructField(id,LongType,true),StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))
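Schema inference works here, but when the types must be pinned down explicitly, a StructType can be passed to createDataFrame instead. A minimal sketch matching the inferred schema above (employeesDF2 is just an illustrative name):

from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([StructField("id", LongType(), True),
                     StructField("name", StringType(), True),
                     StructField("age", LongType(), True),
                     StructField("gender", StringType(), True)])
# Same rows, but with the schema fixed up front instead of inferred
employeesDF2 = spark.createDataFrame(lst, schema)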
# The columns attribute
employeesDF.columns
['id', 'name', 'age', 'gender']
# A single row
employeesDF.first()
Row(id=1, name='John', age=18, gender='Male')
# Multiple rows
employeesDF.take(2)
[Row(id=1, name='John', age=18, gender='Male'),
 Row(id=2, name='Lily', age=25, gender='Female')]
# All rows
employeesDF.collect()
[Row(id=1, name='John', age=18, gender='Male'),
 Row(id=2, name='Lily', age=25, gender='Female'),
 Row(id=3, name='Raymond', age=30, gender='Male'),
 Row(id=4, name=None, age=19, gender='Female'),
 Row(id=2, name='Lily', age=25, gender='Female')]
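collect() pulls every row back to the driver, which can exhaust driver memory on large tables. When the rows only need to be streamed through locally, toLocalIterator() fetches one partition at a time instead; a sketch:

# Stream rows to the driver partition by partition rather than all at once
for row in employeesDF.toLocalIterator():
    print(row.id, row.name)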
# Print the data; 20 rows by default
employeesDF.show()
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 4| null| 19|Female|
| 2| Lily| 25|Female|
+---+-------+---+------+
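show() also takes a row count and a truncate flag, which helps when wide cell values get cut off at the default 20 characters:

# Print at most 3 rows and do not truncate long values
employeesDF.show(3, truncate=False)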
# Summary statistics for a column
employeesDF.describe(["age"]).show()
+-------+-----------------+
|summary| age|
+-------+-----------------+
| count| 5|
| mean| 23.4|
| stddev|4.929503017546495|
| min| 18|
| max| 30|
+-------+-----------------+
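On Spark 2.3 or later, summary() extends describe() with percentiles; a quick sketch:

# Adds 25%/50%/75% quartiles on top of count/mean/stddev/min/max
employeesDF.select("age").summary().show()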
Cleaning Operators¶
# Drop a column
employeesDF.drop("gender").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 1| John| 18|
| 2| Lily| 25|
| 3|Raymond| 30|
| 4| null| 19|
| 2| Lily| 25|
+---+-------+---+
# Deduplicate (all columns considered)
employeesDF.distinct().show()
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 4| null| 19|Female|
+---+-------+---+------+
# Deduplicate on selected columns
employeesDF.dropDuplicates(['gender']).show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
| 2|Lily| 25|Female|
| 1|John| 18| Male|
+---+----+---+------+
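Which of the duplicate rows survives per key is not specified; it depends on partition layout. Deduplicating on several columns works the same way, e.g. with this illustrative key:

# One surviving row per (name, gender) pair; which one is unspecified
employeesDF.dropDuplicates(["name", "gender"]).show()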
# Drop records containing null values
employeesDF.na.drop().show()
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 2| Lily| 25|Female|
+---+-------+---+------+
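na.drop() also accepts how ('any' or 'all') and a subset of columns to inspect; for example, dropping only rows whose name is null:

# Only the listed columns are checked for nulls
employeesDF.na.drop(subset=["name"]).show()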
# Fill null values
employeesDF.na.fill("?").show()
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 4| ?| 19|Female|
| 2| Lily| 25|Female|
+---+-------+---+------+
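Filling with a plain string only touches string columns. A dict fills each listed column with a type-appropriate default:

# Different defaults per column; unlisted columns are left untouched
employeesDF.na.fill({"name": "?", "age": 0}).show()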
Transformation Operators¶
# Select columns
employeesDF.select("name", "gender").show()
+-------+------+
| name|gender|
+-------+------+
| John| Male|
| Lily|Female|
|Raymond| Male|
| null|Female|
| Lily|Female|
+-------+------+
# Select columns, with SQL expression support
employeesDF.selectExpr("id", "name", "concat(id, '_', name) as id_name").show()
+---+-------+---------+
| id| name| id_name|
+---+-------+---------+
| 1| John| 1_John|
| 2| Lily| 2_Lily|
| 3|Raymond|3_Raymond|
| 4| null| null|
| 2| Lily| 2_Lily|
+---+-------+---------+
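The same expression can be built from column functions instead of a SQL string; like SQL's concat, F.concat yields null when any input is null, matching the output above. A sketch (the explicit cast avoids relying on implicit type coercion):

# Column-function equivalent of the selectExpr above
employeesDF.select(
    "id", "name",
    F.concat(F.col("id").cast("string"), F.lit("_"), F.col("name")).alias("id_name")
).show()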
# where is an alias of filter
df = employeesDF.where(F.col("id") <= 2)
df.show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
| 1|John| 18| Male|
| 2|Lily| 25|Female|
| 2|Lily| 25|Female|
+---+----+---+------+
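Filters can also be written as SQL strings, and multiple conditions combine with & and |; each comparison needs its own parentheses because of Python operator precedence:

# SQL-string form of the same filter
employeesDF.where("id <= 2").show()
# Combined conditions: parenthesize each comparison
employeesDF.where((F.col("id") <= 2) & (F.col("gender") == "Female")).show()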
# Rename a column
employeesDF.withColumnRenamed("gender", "sex").show()
+---+-------+---+------+
| id| name|age| sex|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 4| null| 19|Female|
| 2| Lily| 25|Female|
+---+-------+---+------+
# Derive a new column
employeesDF.withColumn("new age", F.col("age") + 1).show()
+---+-------+---+------+-------+
| id| name|age|gender|new age|
+---+-------+---+------+-------+
| 1| John| 18| Male| 19|
| 2| Lily| 25|Female| 26|
| 3|Raymond| 30| Male| 31|
| 4| null| 19|Female| 20|
| 2| Lily| 25|Female| 26|
+---+-------+---+------+-------+
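withColumn combines naturally with F.when for conditional logic; a sketch with an arbitrary, purely illustrative age cutoff:

# Label rows by an age threshold (the cutoff is an assumption for illustration)
employeesDF.withColumn(
    "age_group",
    F.when(F.col("age") < 25, "young").otherwise("senior")
).show()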
# Union DataFrames of the same schema
employeesDF.union(df).show()
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 3|Raymond| 30| Male|
| 4| null| 19|Female|
| 2| Lily| 25|Female|
| 1| John| 18| Male|
| 2| Lily| 25|Female|
| 2| Lily| 25|Female|
+---+-------+---+------+
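Be aware that union() pairs columns by position, not by name. When the schemas share names but the column order may differ, unionByName() (Spark 2.3+) is the safer choice:

# A column-reordered DataFrame would silently mismatch under union();
# unionByName() aligns columns by name instead of position
reordered = df.select("gender", "age", "name", "id")
employeesDF.unionByName(reordered).show()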
# Sampling; parameters: with replacement, fraction, random seed
employeesDF.sample(True, 0.5, 42).show()
+---+----+---+------+
| id|name|age|gender|
+---+----+---+------+
| 1|John| 18| Male|
+---+----+---+------+
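A close relative of sample() is randomSplit(), the usual way to carve out train/test subsets (trainDF and testDF are illustrative names):

# Roughly 80/20 split; the seed makes it reproducible
trainDF, testDF = employeesDF.randomSplit([0.8, 0.2], seed=42)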
Analysis Operators¶
It is no exaggeration to say that the exploration, cleaning, and transformation steps above all serve to prepare the data for analysis.
In most data applications, analysis is the most critical step, and often the core purpose of the application itself.
# Salary table
salaries = spark.createDataFrame([(1, 26000), (2, 30000), (3, 25000), (4, 20000)],
                                 ["id", "salary"])
# Inner join with the employees table
fullInfo = salaries.join(employeesDF, "id", "inner")
fullInfo.show()
+---+------+-------+---+------+
| id|salary| name|age|gender|
+---+------+-------+---+------+
| 1| 26000| John| 18| Male|
| 2| 30000| Lily| 25|Female|
| 2| 30000| Lily| 25|Female|
| 3| 25000|Raymond| 30| Male|
| 4| 20000| null| 19|Female|
+---+------+-------+---+------+
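Besides inner, join() supports the usual types (left, right, full/outer, left_semi, left_anti). A left join, for instance, would keep salary rows even when no employee matches:

# All salary rows survive; unmatched employee columns come back as null
salaries.join(employeesDF, "id", "left").show()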
# groupBy aggregation
# agg computes statistics into new columns
aggResult = (fullInfo.groupBy("gender")
             .agg(F.sum("salary").alias("sum_salary"),
                  F.avg("salary").alias("avg_salary")))
aggResult.show()
+------+----------+------------------+
|gender|sum_salary| avg_salary|
+------+----------+------------------+
| Male| 51000| 25500.0|
|Female| 80000|26666.666666666668|
+------+----------+------------------+
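agg() takes any number of aggregate expressions, so extra statistics are just extra columns; a sketch:

# Several aggregates computed in one pass over each group
(fullInfo.groupBy("gender")
 .agg(F.count("*").alias("headcount"),
      F.max("salary").alias("max_salary"))
 .show())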
# orderBy is equivalent to sort
aggResult.sort("sum_salary", ascending=True).show()
+------+----------+------------------+
|gender|sum_salary| avg_salary|
+------+----------+------------------+
| Male| 51000| 25500.0|
|Female| 80000|26666.666666666668|
+------+----------+------------------+
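For descending order, either pass ascending=False or sort on a column expression:

# Two equivalent spellings of a descending sort
aggResult.sort("sum_salary", ascending=False).show()
aggResult.orderBy(F.col("sum_salary").desc()).show()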
Persistence Operators¶
That is, the write API, which mirrors the read API.
import os

path = "../data/agg.csv"
if not os.path.exists(path):
    # Write as CSV with a header row; save() fails if the path already exists
    aggResult.write.format("csv").option("header", True).save(path)
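save() throws if the target path already exists, which is why the os.path.exists guard is there. mode() states that policy explicitly, and other formats follow the same pattern; the parquet path below is an illustrative assumption:

# Overwrite instead of guarding; Parquet preserves the schema on disk
(aggResult.write
 .mode("overwrite")
 .parquet("../data/agg.parquet"))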