DSAI4205 动手教程:用 Pandas、Dask、Spark RDD 与 SparkSQL 完成大数据分析入门

上一篇 DSAI4205 笔记偏“全景导览”,解释了大数据分析的系统主线。这一篇换成动手教程:根据 Tutorial 1-4、solution PDF 和配套数据,把 Pandas、Dask、Spark RDD、SparkSQL 串成一条从单机数据分析到分布式数据处理的实践路线。

读完这篇,目标不是记住所有 API,而是建立一个非常重要的迁移关系:

Pandas DataFrameDask DataFrameSpark RDDSpark DataFrame / SparkSQL\text{Pandas DataFrame} \rightarrow \text{Dask DataFrame} \rightarrow \text{Spark RDD} \rightarrow \text{Spark DataFrame / SparkSQL}

原始资料与数据:

1. 这组教程到底在训练什么

这四个 tutorial 表面上分别讲不同工具,但真正训练的是同一件事:同样的数据分析逻辑,如何随着数据规模变大而迁移。

在小数据上,我们用 Pandas。数据在内存里,API 简洁,适合做探索性分析。

当数据接近或超过单机内存,或者希望利用多核并行时,可以用 Dask。Dask 尽量保留 NumPy/Pandas 风格,但把计算拆成 partitions 和 task graph。

当进入真正的分布式处理时,Spark 提供 RDD 和 DataFrame 两套抽象。RDD 更底层,训练你理解 map、filter、flatMap、reduceByKey、join 这些分布式操作;SparkSQL 更高层,让你用 DataFrame/SQL 风格表达结构化数据分析。

可以把四个 tutorial 的关系理解成:

阶段 工具 核心对象 适合解决的问题
Tutorial 1 Pandas DataFrame 单机表格数据清洗、筛选、合并、聚合
Tutorial 2 Dask Dask Array / Dask DataFrame 大数组、大表格、延迟计算、并行执行
Tutorial 3 Spark RDD 分布式函数式计算、word count、key-value 聚合
Tutorial 4 SparkSQL Spark DataFrame 分布式结构化数据分析、SQL-like 查询

2. 数据文件先认清

这组资料里有几类数据。

data.tsv 是电影相关数据,字段包括:

1
userID, age, gender, movieID, name, year, genre1, genre2, genre3

它适合练习 Pandas 的读取、筛选、列处理和 genre 合并。

ratings.csv 包含用户评分:

1
userID, movieID, rating

它适合和 data.tsv 做 merge,再做 groupby 计算平均评分。

mtcars.csv 是经典汽车数据,字段包括:

1
mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb

它适合 SparkSQL 练习读取 CSV、选择列、过滤、排序、聚合和 schema 检查。

data.zip 解压后是一组航班数据 CSV,包含 1990.csv1999.csv。它适合 Dask DataFrame 练习,因为文件多、数据量更大,天然需要 partition-based processing。

3. Tutorial 1:Python 基础是为了读懂数据操作

Tutorial 1 前半部分复习 Python 基础:if statement、字符串、for loop、list、dictionary、list comprehension。

这些内容看似基础,但后面非常常用。例如 list comprehension:

1
2
nums = [1, 2, 3, 4, 5]
squares = [x ** 2 for x in nums]

它的含义是:对 iterable 里的每个元素应用一个表达式,生成新 list。

加上条件:

1
large_values = [x for x in nums if x > 3]

后面无论是 Pandas 的 apply,还是 Spark 的 map,思想都很像:把一个函数应用到一批数据上。

所以 Python 基础里最值得掌握的不是语法细节,而是这三种思维:

  1. 条件判断:决定保留什么数据。
  2. 循环/遍历:逐个处理数据。
  3. 函数映射:把 transformation 应用到每个元素。

4. Pandas:表格数据分析的基本工作流

Pandas 的核心对象有两个:

Series 是一维带标签数组,可以理解为 DataFrame 的一列。

DataFrame 是二维表格,由多列 Series 组成。

创建 DataFrame 的例子:

1
2
3
4
5
6
7
import pandas as pd

programme_offer = pd.DataFrame({
"course_codes": ["DSAI4205", "DSAI5201", "DSAI5202"],
"course_names": ["Big Data Analytics", "AI Practice", "Emerging Topics"],
"tools": ["Python, Pandas, Spark", "Python, TensorFlow", "Python, Dask"]
})

读取文件时,data.tsv 要注意分隔符是 tab:

1
df = pd.read_csv("data.tsv", sep="\t")

读入后先做三件事:

1
2
3
df.head()
df.info()
df.describe()

head() 看前几行,info() 看列名、缺失值和类型,describe() 看数值统计。这三个动作相当于数据分析的“开箱检查”。

5. Pandas 列选择:单中括号和双中括号

Tutorial 1 里有一个很重要的小问题:

1
programme_offer["course_names"]

1
programme_offer[["course_names"]]

有什么区别?

单中括号选一列时,返回的是 Series。双中括号选一组列时,返回的是 DataFrame,即使里面只有一列。

这个区别会影响后续操作。如果你只需要一列做向量计算,用 Series 更直接;如果你希望结果仍然保持二维表格结构,用 DataFrame 更安全。

1
2
type(programme_offer["course_names"])    # pandas.Series
type(programme_offer[["course_names"]]) # pandas.DataFrame

6. Pandas 条件筛选、排序和新增列

电影数据中常见问题是:筛选 1990 年之后、女性用户观看过的电影,并按年份降序排列。

1
2
filtered = df[(df["year"] > 1990) & (df["gender"] == "F")]
filtered = filtered.sort_values("year", ascending=False)

注意 Pandas 里多个条件要用 &,并且每个条件都要加括号。不能直接写 Python 的 and,因为 Pandas 的条件是一整列 boolean Series。

新增列也很常见。比如把三列 genre 合并为一列:

1
2
3
4
5
6
def combine_genre(row):
genres = [row["genre1"], row["genre2"], row["genre3"]]
genres = [g for g in genres if pd.notna(g) and g != ""]
return "|".join(genres)

df["genre"] = df.apply(combine_genre, axis=1)

这里 axis=1 表示按行 apply。每次传入的是一行,所以可以同时读取 genre1genre2genre3

7. Pandas merge:把电影信息和评分连起来

data.tsv 里有电影和用户信息,ratings.csv 里有评分。想分析评分,就需要把它们按共同字段合并。

1
2
3
4
5
6
7
ratings = pd.read_csv("ratings.csv")
merged = pd.merge(
left=df,
right=ratings,
on=["userID", "movieID"],
how="inner"
)

how="inner" 表示只保留两边都匹配上的记录。

常见 join 类型:

join 类型 含义
inner 只保留左右两边都匹配的行
left 保留左表全部行,右表匹配不到就填 NaN
right 保留右表全部行,左表匹配不到就填 NaN
outer 左右两边全部保留,匹配不到就填 NaN

实际项目里,merge 后一定要检查行数有没有异常变化:

1
2
3
df.shape
ratings.shape
merged.shape

如果 merge 后行数突然暴涨,可能是 key 不唯一导致 many-to-many join。

8. Pandas groupby:从记录变成统计结果

groupby 的核心模式是:

1
dataframe.groupby(keys).aggregate_function()

比如计算每部电影的平均评分:

1
2
3
4
5
6
7
movie_rating = (
merged
.groupby(["movieID", "name"])["rating"]
.mean()
.reset_index()
.sort_values("rating", ascending=False)
)

这里发生了四步:

  1. movieIDname 分组。
  2. 取每组里的 rating
  3. 计算平均值。
  4. reset_index() 把 groupby 后的 index 还原为普通列。

这个模式非常重要。后面 Dask DataFrame 和 SparkSQL 里的 groupby,本质都是同一个逻辑。

9. Dask:为什么 Pandas 不够时要换工具

Pandas 很好用,但它通常把数据放进单机内存。如果数据太大,或者你想利用多核并行,Pandas 就不够了。

Dask 的关键思想是:

  1. 把大数据切成 chunks 或 partitions。
  2. 每个 partition 可以看成一个较小的 NumPy array 或 Pandas DataFrame。
  3. 操作不会立刻执行,而是先构建 task graph。
  4. 调用 compute() 时,Dask 才真正执行。

这就是 lazy evaluation。

1
2
3
4
import dask.array as da

arr = da.ones(20, chunks=10)
result = arr.sum()

此时 result 还不是最终数值,而是一个延迟计算对象。

1
result.compute()

调用 compute() 后才真正计算。

10. Dask Array:chunk size 是性能关键

Tutorial 2 里用 Dask Array 创建大数组:

1
2
3
4
5
6
7
8
dask_arr = da.random.normal(
20,
0.1,
size=(20000, 20000),
chunks=(2000, 2000)
)

dask_arr_mean = dask_arr.mean(axis=0).compute()

这里 chunks=(2000, 2000) 表示把大矩阵切成很多 2000×20002000\times 2000 的小块。

chunk size 不能太小,也不能太大。

如果 chunk 太小,每个 task 做的实际计算很少,调度开销会很大。

如果 chunk 太大,每个 worker 需要一次加载太多数据,可能内存不够,甚至 spill 到磁盘,速度会明显下降。

所以 Dask 里一个核心经验是:

chunk size=enough work per task+fits in memory\text{chunk size} = \text{enough work per task} + \text{fits in memory}

教程中的 TODO 比较了 chunks=(10000,10000)chunks=(30,30)。前者 task 少但单块大,后者 task 很多但调度开销高。这个实验比单纯背概念更重要,因为它能让你直观看到 chunk size 对性能的影响。

11. Dask DataFrame:把 Pandas 操作迁移到多文件数据

data.zip 里有 1990 到 1999 年的航班 CSV。解压后可以用 Dask 一次读取多个文件:

1
2
3
import dask.dataframe as dd

ddf = dd.read_csv("data/*.csv")

Dask DataFrame 看起来像 Pandas,但背后是很多 Pandas DataFrame partition。

常见操作:

1
non_cancelled = ddf[~ddf["Cancelled"]]

统计未取消航班数量:

1
len(non_cancelled.compute())

按机场统计未取消航班:

1
non_cancelled.groupby("Origin").Origin.count().compute()

计算每个机场平均出发延误:

1
ddf.groupby("Origin").DepDelay.mean().compute()

找平均延误最高的星期:

1
ddf.groupby("DayOfWeek").DepDelay.mean().idxmax().compute()

这些代码和 Pandas 很像,但要注意:Dask 里很多结果都需要 compute() 才会真正执行。

12. Dask 的 compute、persist 和 map_partitions

如果你写:

1
2
mean_delay = non_cancelled["DepDelay"].mean()
std_delay = non_cancelled["DepDelay"].std()

这两个变量还只是延迟计算。分别调用两次:

1
2
mean_delay.compute()
std_delay.compute()

可能会重复读取和过滤数据。更好的方式是一次性触发多个计算:

1
2
3
import dask

mean_result, std_result = dask.compute(mean_delay, std_delay)

如果某个中间结果会反复使用,可以 persist()

1
2
ddf_jfk = non_cancelled[non_cancelled["Origin"] == "JFK"]
ddf_jfk = ddf_jfk.persist()

persist() 会把中间结果保留在内存里,避免后续重复执行读取和筛选。

对于 Dask DataFrame 没有直接支持的自定义逻辑,可以用 map_partitions()。比如把距离从 miles 转成 kilometers:

1
2
3
4
5
6
7
8
9
10
11
12
import pandas as pd

def converter(series, multiplier=1.60934):
return series * multiplier

meta = pd.Series(name="Distance", dtype="float64")

distance_km = ddf.Distance.map_partitions(
converter,
multiplier=1.60934,
meta=meta
)

map_partitions() 的含义是:把函数应用到每个内部 Pandas partition 上。meta 用来告诉 Dask 输出的数据结构和类型,帮助它构建 task graph。

13. Spark:先理解 RDD

Spark 的入口通常是 SparkSession

1
2
3
4
5
6
7
8
9
10
from pyspark.sql import SparkSession

ss = (
SparkSession.builder
.master("local[1]")
.appName("DSAI4205")
.getOrCreate()
)

spark = ss.sparkContext

这里 .master("local[1]") 表示在本机用 1 个 worker thread 跑。真实集群里 master 会指向 cluster manager。

RDD 是 Spark 的底层分布式数据抽象。创建 RDD 有两种常见方式:

1
rdd = spark.parallelize([1, 2, 3])

或者从文件读取:

1
text_rdd = spark.textFile("wordCount.txt")

RDD 操作分两类:

Transformation 返回新的 RDD,比如 mapfilterflatMapreduceByKey

Action 触发执行并返回结果,比如 collectcounttake

这和 Dask 的 lazy evaluation 很像:不是每一步都立刻执行,而是遇到 action 才真正计算。

14. Spark RDD 的 map、filter、flatMap

map() 对每个元素做一对一转换:

1
2
rdd = spark.parallelize([1, 2, 3])
rdd.map(lambda x: x + 1).collect()

filter() 保留满足条件的元素:

1
rdd.filter(lambda x: x > 1).collect()

flatMap() 是一对多转换,然后把结果压平。word count 里非常常用:

1
2
3
lines = spark.parallelize(["I like Spark", "I like Python"])
words = lines.flatMap(lambda line: line.split())
words.collect()

如果用 map(lambda line: line.split()),结果会是 list of lists;用 flatMap,结果是一个平铺后的 word stream。

15. Word Count:Spark 的经典模式

word count 是 Spark RDD 的入门模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
from operator import add

text_rdd = spark.textFile("wordCountEx.txt")

counts = (
text_rdd
.flatMap(lambda line: line.split())
.map(lambda word: (word.lower(), 1))
.reduceByKey(add)
.sortBy(lambda x: x[1], ascending=False)
)

counts.collect()

这段代码对应四步:

  1. flatMap:把每行文本拆成词。
  2. map:把每个词变成 (word, 1)
  3. reduceByKey:按 word 聚合,把 1 加起来。
  4. sortBy:按 count 降序排序。

这是分布式计算里最重要的模式之一:

recordskey-value pairsgroup by keyaggregate\text{records} \rightarrow \text{key-value pairs} \rightarrow \text{group by key} \rightarrow \text{aggregate}

16. Key-value RDD:join 和 reduceByKey

RDD 里很多高级操作都围绕 key-value pair。

1
kv_rdd = spark.parallelize([(3, 4), (3, 1), (5, 6), (5, 2)])

按 key 求和:

1
2
3
from operator import add

kv_rdd.reduceByKey(add).collect()

对 value 做变换:

1
kv_rdd.mapValues(lambda v: v ** 2).collect()

两个 key-value RDD 可以 join:

1
2
3
4
left = spark.parallelize([(1, 2), (3, 4), (3, 6)])
right = spark.parallelize([(3, 7)])

left.join(right).collect()

inner join 只保留两边都有的 key。leftOuterJoin() 会保留左边所有 key,如果右边没有对应值,会出现 None。Tutorial 3 的 solution 里把 None 替换成 "error"

1
2
3
4
5
6
7
8
9
result.map(
lambda x: (
x[0],
(
"error" if x[1][0] is None else x[1][0],
"error" if x[1][1] is None else x[1][1]
)
)
).collect()

这个例子很适合理解 Spark 里的 tuple 结构:x[0] 是 key,x[1] 是 join 后的 value tuple。

17. DNA k-mer 练习:把序列变成分布式计数问题

Tutorial 3 的 DNA 练习很有价值,因为它把字符串序列分析转成了 Spark RDD pipeline。

给定 DNA 字符串:

1
dna_str = "gcctaagccta"

如果 k=5,连续 pattern 是:

1
gccta, cctaa, ctaag, taagc, aagcc, agcct, gccta

可以写函数:

1
2
def generate_pattern(line, k):
return [(line[i:i+k], 1) for i in range(len(line) - k + 1)]

然后用 Spark 统计:

1
2
3
4
5
6
7
8
k = 5

seq_rdd = (
dna_rdd
.flatMap(lambda line: generate_pattern(line, k))
.reduceByKey(add)
.sortBy(lambda x: x[1], ascending=False)
)

如果要找出现超过一次的 palindrome pattern:

1
2
3
4
5
palindromes = (
seq_rdd
.filter(lambda x: x[1] > 1)
.filter(lambda x: x[0] == x[0][::-1])
)

这个练习的重点不是 DNA 本身,而是把复杂对象拆成 key-value,再用分布式聚合解决。

18. SparkSQL:从 RDD 走向 DataFrame

RDD 很灵活,但写起来偏底层。SparkSQL 的 DataFrame API 更接近 Pandas 和 SQL,也更适合结构化数据分析。

读取 mtcars.csv

1
2
3
4
5
6
7
8
9
10
mtcars = ss.read.csv(
path="mtcars.csv",
sep=",",
encoding="UTF-8",
comment=None,
header=True,
inferSchema=True
)

mtcars.show(5, truncate=False)

inferSchema=True 会让 Spark 自动推断列类型。如果不推断,很多数值列可能会被当成 string,后面做比较和聚合会出问题。

从 Python list 创建 DataFrame:

1
2
my_list = [["a", 1], ["b", 2]]
df = ss.createDataFrame(my_list, ["letter", "number"])

从 Pandas DataFrame 转 Spark DataFrame:

1
2
3
4
5
6
7
8
import pandas as pd

pdf = pd.DataFrame({
"name": ["Alice", "Bob"],
"score": [90, 85]
})

df_spark = ss.createDataFrame(pdf)

19. Spark DataFrame 常用操作

查看列名:

1
mtcars.columns

重命名列:

1
df0 = mtcars.withColumnRenamed("_c0", "model")

选择列:

1
mtcars.select("mpg", "cyl").show(5)

过滤:

1
df1 = df0.filter(df0.mpg > 18)

排序:

1
df0.sort("model").show(5)

删除列:

1
df0.drop("mpg").show(5)

看 schema:

1
df0.printSchema()

聚合:

1
2
mtcars.agg({"mpg": "max"}).show()
mtcars.agg({"mpg": "mean"}).show()

这些操作对应 Pandas 里熟悉的 select columnsfilter rowssort_valuesdropgroupby/agg。区别是 Spark DataFrame 的执行是分布式的,而且很多操作也是 lazy 的。

20. withColumn、col、when:SparkSQL 的表达式思维

Spark DataFrame 里经常用 withColumn() 新增或替换列。

例如计算山地比赛平均时间:

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.sql.functions import col

hills2000_df = ss.read.csv(
"hills2000.txt",
header=True,
inferSchema=True
)

hills2000_df = hills2000_df.withColumn(
"avg_time",
(col("time") + col("timef")) / 2
)

再计算平均速度:

1
2
3
4
hills2000_df = hills2000_df.withColumn(
"avg_speed",
col("dist") / col("avg_time")
)

如果要根据难度分级,可以用 when()

1
2
3
4
5
6
7
8
from pyspark.sql.functions import when

hills_union_diff_level = hills_union_diff.withColumn(
"difficulty",
when(col("diff_score") < 200, "Easy")
.when(col("diff_score") < 350, "Moderate")
.otherwise("Difficult")
)

这里的关键是:Spark DataFrame 不是直接对 Python 变量逐行循环,而是构造 column expression。col("time") 表示一整列,withColumn() 表示基于表达式生成新列。

21. DataFrame 与 RDD 可以互相转换

SparkSQL 里可以把 DataFrame 转成 RDD:

1
rdd = mtcars.rdd

每一行会变成 Row 对象。反过来,也可以从 RDD 创建 DataFrame:

1
2
3
4
5
from pyspark.sql import Row

data = [("ctaag", 103), ("gccta", 96)]
rdd = ss.sparkContext.parallelize(data)
df = rdd.map(lambda x: Row(sequence=x[0], count=x[1])).toDF()

什么时候用 RDD,什么时候用 DataFrame?

如果数据是非结构化的、逻辑很自定义,比如 DNA k-mer 或 word count,RDD 很自然。

如果数据是表格,有列名、类型、过滤、聚合、join,那么 DataFrame/SparkSQL 更合适,也更容易被 Spark 优化。

22. 把四个 Tutorial 串成一套学习路线

如果你要真正掌握这组资料,建议按下面顺序练:

第一轮:Pandas

  1. pd.read_csv("data.tsv", sep="\t") 读取电影数据。
  2. 用条件筛选找出 1990 年之后女性用户相关记录。
  3. apply(axis=1) 合并 genre。
  4. 读取 ratings.csv 并和电影数据 merge。
  5. 用 groupby 计算每部电影平均评分。

第二轮:Dask

  1. 创建 Dask Array,观察 chunks。
  2. 修改 chunk size,比较计算表现。
  3. 解压 data.zip,用 dd.read_csv("data/*.csv") 读取多文件。
  4. 计算未取消航班、机场延误、星期延误。
  5. 比较 compute()dask.compute()persist()
  6. map_partitions() 写自定义转换。

第三轮:Spark RDD

  1. 创建 SparkSession 和 SparkContext。
  2. parallelize() 创建 RDD。
  3. 练习 mapfilterflatMap
  4. 完成 word count。
  5. 完成 key-value RDD 的 reduceByKey 和 join。
  6. 完成 DNA k-mer count 和 palindrome filter。

第四轮:SparkSQL

  1. 读取 mtcars.csv
  2. 练习 selectfiltersortdropagg
  3. withColumncol 创建新列。
  4. when 做分级标签。
  5. 理解 DataFrame 和 RDD 的互相转换。

23. 最重要的迁移关系

这组 tutorial 的价值在于,你会发现很多操作其实是同一种思想的不同实现。

数据分析意图 Pandas Dask Spark RDD SparkSQL
读取数据 pd.read_csv dd.read_csv textFile ss.read.csv
按行转换 apply(axis=1) map_partitions map / flatMap withColumn
过滤 boolean mask boolean mask filter filter
分组聚合 groupby().mean() groupby().mean().compute() map + reduceByKey groupBy().agg()
合并 merge limited merge/join join join
触发执行 立即执行 compute() collect() / action action,如 show()

理解这张表,比记住单个 API 更重要。因为以后换工具时,你会知道自己真正要做的是哪类数据操作。

24. 常见坑

Pandas 的 and / or 不能直接用于 Series 条件。

要用 &|,并给每个条件加括号:

1
df[(df["year"] > 1990) & (df["gender"] == "F")]

Pandas 单中括号和双中括号返回类型不同。

1
2
df["name"]    # Series
df[["name"]] # DataFrame

Dask 看到结果对象不代表已经计算。

Dask 很多操作只是构建 task graph,需要 compute()

Dask chunk 太小或太大都不好。

太小会有调度开销,太大可能内存压力大。

Spark 的 collect() 不要乱用在大数据上。

collect() 会把分布式结果拉回 driver,本地小练习可以用,真实大数据场景容易爆内存。

Spark DataFrame 里的 col() 是列表达式,不是普通 Python 值。

写 SparkSQL 时,要习惯 expression-based programming。

25. 总结

这组资料最值得学到的不是“某个函数怎么写”,而是从单机到分布式的数据分析抽象。

Pandas 让你熟悉表格数据分析的基本语法:读、筛、改、合、聚合。

Dask 让你理解 partitions、chunks、lazy evaluation、task graph,以及为什么计算需要延迟和调度。

Spark RDD 让你理解最基础的分布式函数式计算:map、flatMap、filter、reduceByKey、join。

SparkSQL 让你回到更接近业务分析的 DataFrame/SQL 风格,同时保留分布式执行能力。

如果你未来要做数据挖掘、搜索推荐、RAG 数据管线或大模型应用工程,这条路线很实用:小数据先用 Pandas 验证逻辑,数据变大后用 Dask 或 Spark 扩展,结构化分析用 SparkSQL,非结构化或 key-value 任务用 RDD 思维处理。

DSAI4205 动手教程:用 Pandas、Dask、Spark RDD 与 SparkSQL 完成大数据分析入门

https://richardf123.github.io/2026/06/24/dsai4205-big-data-hands-on-tutorial/

作者

RichardF

发布于

2026-06-24

更新于

2026-06-24

许可协议