DSAI4205 动手教程:用 Pandas、Dask、Spark RDD 与 SparkSQL 完成大数据分析入门
上一篇 DSAI4205 笔记偏“全景导览”,解释了大数据分析的系统主线。这一篇换成动手教程:根据 Tutorial 1-4、solution PDF 和配套数据,把 Pandas、Dask、Spark RDD、SparkSQL 串成一条从单机数据分析到分布式数据处理的实践路线。
读完这篇,目标不是记住所有 API,而是建立一个非常重要的迁移关系:
原始资料与数据:
- Tutorial 1: Intro to Python and Pandas
- Tutorial 2: Intro to Dask Array and Dask DataFrame
- Tutorial 3: Intro to Spark
- Tutorial 4: Intro to SparkSQL
- T02 Solution
- T03 Solution
- T04 Solution
- data.tsv
- ratings.csv
- mtcars.csv
- data.zip
1. 这组教程到底在训练什么
这四个 tutorial 表面上分别讲不同工具,但真正训练的是同一件事:同样的数据分析逻辑,如何随着数据规模变大而迁移。
在小数据上,我们用 Pandas。数据在内存里,API 简洁,适合做探索性分析。
当数据接近或超过单机内存,或者希望利用多核并行时,可以用 Dask。Dask 尽量保留 NumPy/Pandas 风格,但把计算拆成 partitions 和 task graph。
当进入真正的分布式处理时,Spark 提供 RDD 和 DataFrame 两套抽象。RDD 更底层,训练你理解 map、filter、flatMap、reduceByKey、join 这些分布式操作;SparkSQL 更高层,让你用 DataFrame/SQL 风格表达结构化数据分析。
可以把四个 tutorial 的关系理解成:
| 阶段 | 工具 | 核心对象 | 适合解决的问题 |
|---|---|---|---|
| Tutorial 1 | Pandas | DataFrame | 单机表格数据清洗、筛选、合并、聚合 |
| Tutorial 2 | Dask | Dask Array / Dask DataFrame | 大数组、大表格、延迟计算、并行执行 |
| Tutorial 3 | Spark | RDD | 分布式函数式计算、word count、key-value 聚合 |
| Tutorial 4 | SparkSQL | Spark DataFrame | 分布式结构化数据分析、SQL-like 查询 |
2. 数据文件先认清
这组资料里有几类数据。
data.tsv 是电影相关数据,字段包括:
1 | userID, age, gender, movieID, name, year, genre1, genre2, genre3 |
它适合练习 Pandas 的读取、筛选、列处理和 genre 合并。
ratings.csv 包含用户评分:
1 | userID, movieID, rating |
它适合和 data.tsv 做 merge,再做 groupby 计算平均评分。
mtcars.csv 是经典汽车数据,字段包括:
1 | mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb |
它适合 SparkSQL 练习读取 CSV、选择列、过滤、排序、聚合和 schema 检查。
data.zip 解压后是一组航班数据 CSV,包含 1990.csv 到 1999.csv。它适合 Dask DataFrame 练习,因为文件多、数据量更大,天然需要 partition-based processing。
3. Tutorial 1:Python 基础是为了读懂数据操作
Tutorial 1 前半部分复习 Python 基础:if statement、字符串、for loop、list、dictionary、list comprehension。
这些内容看似基础,但后面非常常用。例如 list comprehension:
1 | nums = [1, 2, 3, 4, 5] |
它的含义是:对 iterable 里的每个元素应用一个表达式,生成新 list。
加上条件:
1 | large_values = [x for x in nums if x > 3] |
后面无论是 Pandas 的 apply,还是 Spark 的 map,思想都很像:把一个函数应用到一批数据上。
所以 Python 基础里最值得掌握的不是语法细节,而是这三种思维:
- 条件判断:决定保留什么数据。
- 循环/遍历:逐个处理数据。
- 函数映射:把 transformation 应用到每个元素。
4. Pandas:表格数据分析的基本工作流
Pandas 的核心对象有两个:
Series 是一维带标签数组,可以理解为 DataFrame 的一列。
DataFrame 是二维表格,由多列 Series 组成。
创建 DataFrame 的例子:
1 | import pandas as pd |
读取文件时,data.tsv 要注意分隔符是 tab:
1 | df = pd.read_csv("data.tsv", sep="\t") |
读入后先做三件事:
1 | df.head() |
head() 看前几行,info() 看列名、缺失值和类型,describe() 看数值统计。这三个动作相当于数据分析的“开箱检查”。
5. Pandas 列选择:单中括号和双中括号
Tutorial 1 里有一个很重要的小问题:
1 | programme_offer["course_names"] |
和
1 | programme_offer[["course_names"]] |
有什么区别?
单中括号选一列时,返回的是 Series。双中括号选一组列时,返回的是 DataFrame,即使里面只有一列。
这个区别会影响后续操作。如果你只需要一列做向量计算,用 Series 更直接;如果你希望结果仍然保持二维表格结构,用 DataFrame 更安全。
1 | type(programme_offer["course_names"]) # pandas.Series |
6. Pandas 条件筛选、排序和新增列
电影数据中常见问题是:筛选 1990 年之后、女性用户观看过的电影,并按年份降序排列。
1 | filtered = df[(df["year"] > 1990) & (df["gender"] == "F")] |
注意 Pandas 里多个条件要用 &,并且每个条件都要加括号。不能直接写 Python 的 and,因为 Pandas 的条件是一整列 boolean Series。
新增列也很常见。比如把三列 genre 合并为一列:
1 | def combine_genre(row): |
这里 axis=1 表示按行 apply。每次传入的是一行,所以可以同时读取 genre1、genre2、genre3。
7. Pandas merge:把电影信息和评分连起来
data.tsv 里有电影和用户信息,ratings.csv 里有评分。想分析评分,就需要把它们按共同字段合并。
1 | ratings = pd.read_csv("ratings.csv") |
how="inner" 表示只保留两边都匹配上的记录。
常见 join 类型:
| join 类型 | 含义 |
|---|---|
| inner | 只保留左右两边都匹配的行 |
| left | 保留左表全部行,右表匹配不到就填 NaN |
| right | 保留右表全部行,左表匹配不到就填 NaN |
| outer | 左右两边全部保留,匹配不到就填 NaN |
实际项目里,merge 后一定要检查行数有没有异常变化:
1 | df.shape |
如果 merge 后行数突然暴涨,可能是 key 不唯一导致 many-to-many join。
8. Pandas groupby:从记录变成统计结果
groupby 的核心模式是:
1 | dataframe.groupby(keys).aggregate_function() |
比如计算每部电影的平均评分:
1 | movie_rating = ( |
这里发生了四步:
- 按
movieID和name分组。 - 取每组里的
rating。 - 计算平均值。
- 用
reset_index()把 groupby 后的 index 还原为普通列。
这个模式非常重要。后面 Dask DataFrame 和 SparkSQL 里的 groupby,本质都是同一个逻辑。
9. Dask:为什么 Pandas 不够时要换工具
Pandas 很好用,但它通常把数据放进单机内存。如果数据太大,或者你想利用多核并行,Pandas 就不够了。
Dask 的关键思想是:
- 把大数据切成 chunks 或 partitions。
- 每个 partition 可以看成一个较小的 NumPy array 或 Pandas DataFrame。
- 操作不会立刻执行,而是先构建 task graph。
- 调用
compute()时,Dask 才真正执行。
这就是 lazy evaluation。
1 | import dask.array as da |
此时 result 还不是最终数值,而是一个延迟计算对象。
1 | result.compute() |
调用 compute() 后才真正计算。
10. Dask Array:chunk size 是性能关键
Tutorial 2 里用 Dask Array 创建大数组:
1 | dask_arr = da.random.normal( |
这里 chunks=(2000, 2000) 表示把大矩阵切成很多 的小块。
chunk size 不能太小,也不能太大。
如果 chunk 太小,每个 task 做的实际计算很少,调度开销会很大。
如果 chunk 太大,每个 worker 需要一次加载太多数据,可能内存不够,甚至 spill 到磁盘,速度会明显下降。
所以 Dask 里一个核心经验是:
教程中的 TODO 比较了 chunks=(10000,10000) 和 chunks=(30,30)。前者 task 少但单块大,后者 task 很多但调度开销高。这个实验比单纯背概念更重要,因为它能让你直观看到 chunk size 对性能的影响。
11. Dask DataFrame:把 Pandas 操作迁移到多文件数据
data.zip 里有 1990 到 1999 年的航班 CSV。解压后可以用 Dask 一次读取多个文件:
1 | import dask.dataframe as dd |
Dask DataFrame 看起来像 Pandas,但背后是很多 Pandas DataFrame partition。
常见操作:
1 | non_cancelled = ddf[~ddf["Cancelled"]] |
统计未取消航班数量:
1 | len(non_cancelled.compute()) |
按机场统计未取消航班:
1 | non_cancelled.groupby("Origin").Origin.count().compute() |
计算每个机场平均出发延误:
1 | ddf.groupby("Origin").DepDelay.mean().compute() |
找平均延误最高的星期:
1 | ddf.groupby("DayOfWeek").DepDelay.mean().idxmax().compute() |
这些代码和 Pandas 很像,但要注意:Dask 里很多结果都需要 compute() 才会真正执行。
12. Dask 的 compute、persist 和 map_partitions
如果你写:
1 | mean_delay = non_cancelled["DepDelay"].mean() |
这两个变量还只是延迟计算。分别调用两次:
1 | mean_delay.compute() |
可能会重复读取和过滤数据。更好的方式是一次性触发多个计算:
1 | import dask |
如果某个中间结果会反复使用,可以 persist():
1 | ddf_jfk = non_cancelled[non_cancelled["Origin"] == "JFK"] |
persist() 会把中间结果保留在内存里,避免后续重复执行读取和筛选。
对于 Dask DataFrame 没有直接支持的自定义逻辑,可以用 map_partitions()。比如把距离从 miles 转成 kilometers:
1 | import pandas as pd |
map_partitions() 的含义是:把函数应用到每个内部 Pandas partition 上。meta 用来告诉 Dask 输出的数据结构和类型,帮助它构建 task graph。
13. Spark:先理解 RDD
Spark 的入口通常是 SparkSession:
1 | from pyspark.sql import SparkSession |
这里 .master("local[1]") 表示在本机用 1 个 worker thread 跑。真实集群里 master 会指向 cluster manager。
RDD 是 Spark 的底层分布式数据抽象。创建 RDD 有两种常见方式:
1 | rdd = spark.parallelize([1, 2, 3]) |
或者从文件读取:
1 | text_rdd = spark.textFile("wordCount.txt") |
RDD 操作分两类:
Transformation 返回新的 RDD,比如 map、filter、flatMap、reduceByKey。
Action 触发执行并返回结果,比如 collect、count、take。
这和 Dask 的 lazy evaluation 很像:不是每一步都立刻执行,而是遇到 action 才真正计算。
14. Spark RDD 的 map、filter、flatMap
map() 对每个元素做一对一转换:
1 | rdd = spark.parallelize([1, 2, 3]) |
filter() 保留满足条件的元素:
1 | rdd.filter(lambda x: x > 1).collect() |
flatMap() 是一对多转换,然后把结果压平。word count 里非常常用:
1 | lines = spark.parallelize(["I like Spark", "I like Python"]) |
如果用 map(lambda line: line.split()),结果会是 list of lists;用 flatMap,结果是一个平铺后的 word stream。
15. Word Count:Spark 的经典模式
word count 是 Spark RDD 的入门模板:
1 | from operator import add |
这段代码对应四步:
flatMap:把每行文本拆成词。map:把每个词变成(word, 1)。reduceByKey:按 word 聚合,把 1 加起来。sortBy:按 count 降序排序。
这是分布式计算里最重要的模式之一:
16. Key-value RDD:join 和 reduceByKey
RDD 里很多高级操作都围绕 key-value pair。
1 | kv_rdd = spark.parallelize([(3, 4), (3, 1), (5, 6), (5, 2)]) |
按 key 求和:
1 | from operator import add |
对 value 做变换:
1 | kv_rdd.mapValues(lambda v: v ** 2).collect() |
两个 key-value RDD 可以 join:
1 | left = spark.parallelize([(1, 2), (3, 4), (3, 6)]) |
inner join 只保留两边都有的 key。leftOuterJoin() 会保留左边所有 key,如果右边没有对应值,会出现 None。Tutorial 3 的 solution 里把 None 替换成 "error":
1 | result.map( |
这个例子很适合理解 Spark 里的 tuple 结构:x[0] 是 key,x[1] 是 join 后的 value tuple。
17. DNA k-mer 练习:把序列变成分布式计数问题
Tutorial 3 的 DNA 练习很有价值,因为它把字符串序列分析转成了 Spark RDD pipeline。
给定 DNA 字符串:
1 | dna_str = "gcctaagccta" |
如果 k=5,连续 pattern 是:
1 | gccta, cctaa, ctaag, taagc, aagcc, agcct, gccta |
可以写函数:
1 | def generate_pattern(line, k): |
然后用 Spark 统计:
1 | k = 5 |
如果要找出现超过一次的 palindrome pattern:
1 | palindromes = ( |
这个练习的重点不是 DNA 本身,而是把复杂对象拆成 key-value,再用分布式聚合解决。
18. SparkSQL:从 RDD 走向 DataFrame
RDD 很灵活,但写起来偏底层。SparkSQL 的 DataFrame API 更接近 Pandas 和 SQL,也更适合结构化数据分析。
读取 mtcars.csv:
1 | mtcars = ss.read.csv( |
inferSchema=True 会让 Spark 自动推断列类型。如果不推断,很多数值列可能会被当成 string,后面做比较和聚合会出问题。
从 Python list 创建 DataFrame:
1 | my_list = [["a", 1], ["b", 2]] |
从 Pandas DataFrame 转 Spark DataFrame:
1 | import pandas as pd |
19. Spark DataFrame 常用操作
查看列名:
1 | mtcars.columns |
重命名列:
1 | df0 = mtcars.withColumnRenamed("_c0", "model") |
选择列:
1 | mtcars.select("mpg", "cyl").show(5) |
过滤:
1 | df1 = df0.filter(df0.mpg > 18) |
排序:
1 | df0.sort("model").show(5) |
删除列:
1 | df0.drop("mpg").show(5) |
看 schema:
1 | df0.printSchema() |
聚合:
1 | mtcars.agg({"mpg": "max"}).show() |
这些操作对应 Pandas 里熟悉的 select columns、filter rows、sort_values、drop、groupby/agg。区别是 Spark DataFrame 的执行是分布式的,而且很多操作也是 lazy 的。
20. withColumn、col、when:SparkSQL 的表达式思维
Spark DataFrame 里经常用 withColumn() 新增或替换列。
例如计算山地比赛平均时间:
1 | from pyspark.sql.functions import col |
再计算平均速度:
1 | hills2000_df = hills2000_df.withColumn( |
如果要根据难度分级,可以用 when():
1 | from pyspark.sql.functions import when |
这里的关键是:Spark DataFrame 不是直接对 Python 变量逐行循环,而是构造 column expression。col("time") 表示一整列,withColumn() 表示基于表达式生成新列。
21. DataFrame 与 RDD 可以互相转换
SparkSQL 里可以把 DataFrame 转成 RDD:
1 | rdd = mtcars.rdd |
每一行会变成 Row 对象。反过来,也可以从 RDD 创建 DataFrame:
1 | from pyspark.sql import Row |
什么时候用 RDD,什么时候用 DataFrame?
如果数据是非结构化的、逻辑很自定义,比如 DNA k-mer 或 word count,RDD 很自然。
如果数据是表格,有列名、类型、过滤、聚合、join,那么 DataFrame/SparkSQL 更合适,也更容易被 Spark 优化。
22. 把四个 Tutorial 串成一套学习路线
如果你要真正掌握这组资料,建议按下面顺序练:
第一轮:Pandas
- 用
pd.read_csv("data.tsv", sep="\t")读取电影数据。 - 用条件筛选找出 1990 年之后女性用户相关记录。
- 用
apply(axis=1)合并 genre。 - 读取
ratings.csv并和电影数据 merge。 - 用 groupby 计算每部电影平均评分。
第二轮:Dask
- 创建 Dask Array,观察 chunks。
- 修改 chunk size,比较计算表现。
- 解压
data.zip,用dd.read_csv("data/*.csv")读取多文件。 - 计算未取消航班、机场延误、星期延误。
- 比较
compute()、dask.compute()和persist()。 - 用
map_partitions()写自定义转换。
第三轮:Spark RDD
- 创建 SparkSession 和 SparkContext。
- 用
parallelize()创建 RDD。 - 练习
map、filter、flatMap。 - 完成 word count。
- 完成 key-value RDD 的
reduceByKey和 join。 - 完成 DNA k-mer count 和 palindrome filter。
第四轮:SparkSQL
- 读取
mtcars.csv。 - 练习
select、filter、sort、drop、agg。 - 用
withColumn和col创建新列。 - 用
when做分级标签。 - 理解 DataFrame 和 RDD 的互相转换。
23. 最重要的迁移关系
这组 tutorial 的价值在于,你会发现很多操作其实是同一种思想的不同实现。
| 数据分析意图 | Pandas | Dask | Spark RDD | SparkSQL |
|---|---|---|---|---|
| 读取数据 | pd.read_csv |
dd.read_csv |
textFile |
ss.read.csv |
| 按行转换 | apply(axis=1) |
map_partitions |
map / flatMap |
withColumn |
| 过滤 | boolean mask | boolean mask | filter |
filter |
| 分组聚合 | groupby().mean() |
groupby().mean().compute() |
map + reduceByKey |
groupBy().agg() |
| 合并 | merge |
limited merge/join | join |
join |
| 触发执行 | 立即执行 | compute() |
collect() / action |
action,如 show() |
理解这张表,比记住单个 API 更重要。因为以后换工具时,你会知道自己真正要做的是哪类数据操作。
24. 常见坑
Pandas 的 and / or 不能直接用于 Series 条件。
要用 & 和 |,并给每个条件加括号:
1 | df[(df["year"] > 1990) & (df["gender"] == "F")] |
Pandas 单中括号和双中括号返回类型不同。
1 | df["name"] # Series |
Dask 看到结果对象不代表已经计算。
Dask 很多操作只是构建 task graph,需要 compute()。
Dask chunk 太小或太大都不好。
太小会有调度开销,太大可能内存压力大。
Spark 的 collect() 不要乱用在大数据上。
collect() 会把分布式结果拉回 driver,本地小练习可以用,真实大数据场景容易爆内存。
Spark DataFrame 里的 col() 是列表达式,不是普通 Python 值。
写 SparkSQL 时,要习惯 expression-based programming。
25. 总结
这组资料最值得学到的不是“某个函数怎么写”,而是从单机到分布式的数据分析抽象。
Pandas 让你熟悉表格数据分析的基本语法:读、筛、改、合、聚合。
Dask 让你理解 partitions、chunks、lazy evaluation、task graph,以及为什么计算需要延迟和调度。
Spark RDD 让你理解最基础的分布式函数式计算:map、flatMap、filter、reduceByKey、join。
SparkSQL 让你回到更接近业务分析的 DataFrame/SQL 风格,同时保留分布式执行能力。
如果你未来要做数据挖掘、搜索推荐、RAG 数据管线或大模型应用工程,这条路线很实用:小数据先用 Pandas 验证逻辑,数据变大后用 Dask 或 Spark 扩展,结构化分析用 SparkSQL,非结构化或 key-value 任务用 RDD 思维处理。
DSAI4205 动手教程:用 Pandas、Dask、Spark RDD 与 SparkSQL 完成大数据分析入门
https://richardf123.github.io/2026/06/24/dsai4205-big-data-hands-on-tutorial/