创建 DataFrame

Note

可以从 RDD、从文件、从 RDBMS 创建 DataFrame 。

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("create dataframe")
         .config('spark.executor.memory', '4g')
         .config('spark.driver.memory', '8g')
         .config("spark.executor.cores", 2)
         .config('spark.cores.max', 4)
         .getOrCreate())

从 RDD 创建

# 创建 RDD
rdd = spark.sparkContext.parallelize([('Alice', 18), ('Bob', 20)])
# 指明列
df1 = spark.createDataFrame(rdd, ['name', 'age'])
df1.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 20|
+-----+---+
# 使用 toDF 方法达到同样的效果
rdd.toDF(['name', 'age']).show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 20|
+-----+---+
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 定义 schema
schema = StructType([StructField("name", StringType()),
                     StructField("age", IntegerType())])
# 指明 schema
df2 = spark.createDataFrame(rdd, schema=schema)
df2.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

读取文件创建

# 指明 schema,且遇到脏数据跳过加载
df = (spark.read.format("csv")
      .schema(schema)
      .option("header", True)
      .option("mode", "dropMalformed")
      .load("../data/sample.csv"))
df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 20|
| John| 35|
| Lily|  6|
+-----+---+

也可从 Parquet/ORC 文件创建 DataFrame,因为它们是列存文件,因此无需手动指定 schema 。

# 读取 parquet 文件
df3 = spark.read.format("parquet").load("/road_to_parquet_file")
# 读取 orc 文件
df4 = spark.read.format("orc").load("/road_to_orc_file")

从 RDBMS 创建

RDBMS 即关系型数据库管理系统,从 RDBMS 创建 DataFrame 的代码示例如下:

df = (spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://hostname:port/mysql")
      .option("user", "your_user_name")
      .option("password", "your_password")
      .option("numPartitions", 20)
      .option("dbtable", "table_name")
      .load())