创建 DataFrame¶
Note
可以从 RDD、从文件、从 RDBMS 创建 DataFrame 。
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("create dataframe")
.config('spark.executor.memory', '4g')
.config('spark.driver.memory', '8g')
.config("spark.executor.cores", 2)
.config('spark.cores.max', 4)
.getOrCreate())
从 RDD 创建¶
# 创建 RDD
rdd = spark.sparkContext.parallelize([('Alice', 18), ('Bob', 20)])
# 指明列
df1 = spark.createDataFrame(rdd, ['name', 'age'])
df1.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
| Bob| 20|
+-----+---+
# 使用 toDF 方法达到同样的效果
rdd.toDF(['name', 'age']).show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
| Bob| 20|
+-----+---+
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 定义 schema
schema = StructType([StructField("name", StringType()),
StructField("age", IntegerType())])
# 指明 schema
df2 = spark.createDataFrame(rdd, schema=schema)
df2.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
读取文件创建¶
# 指明 schema,且遇到脏数据跳过加载
df = (spark.read.format("csv")
.schema(schema)
.option("header", True)
.option("mode", "dropMalformed")
.load("../data/sample.csv"))
df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
| Bob| 20|
| John| 35|
| Lily| 6|
+-----+---+
也可从 Parquet/ORC 文件创建 DataFrame,因为它们是列存文件,因此无需手动指定 schema 。
# 读取 parquet 文件
df3 = spark.read.format("parquet").load("/road_to_parquet_file")
# 读取 orc 文件
df4 = spark.read.format("orc").load("/road_to_orc_file")
从 RDBMS 创建¶
RDBMS 即关系型数据库管理系统,从 RDBMS 创建 DataFrame 的代码示例如下:
df = (spark.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://hostname:port/mysql")
.option("user", "your_user_name")
.option("password", "your_password")
.option("numPartitions", 20)
.option("dbtable", "table_name")
.load())