从代码反推Spark执行计划:两个实战案例拆解Job/Stage/Task生成逻辑
当你第一次接触Spark的Job、Stage、Task概念时,是否曾被各种抽象解释绕得云里雾里?本文将通过逆向思维,带你看代码→猜执行计划→验证UI的完整推理过程。不同于传统教学路径,我们将从两个典型PySpark代码片段出发,像侦探破案一样,一步步还原Spark底层的执行逻辑。
1. 逆向学习法:为什么从代码反推更有效?
大多数Spark教程会先抛出Job/Stage/Task的定义,再给出代码示例。这种"概念→实例"的教学路径往往让初学者陷入"看似明白,实则模糊"的状态。而逆向学习法的优势在于:
- 问题导向:从具体代码出发,带着明确问题去探索
- 验证闭环:推理结果可通过Spark UI直接验证
- 记忆深刻:自主推导的过程比被动接受更易形成长期记忆
提示:本文所有案例均在Spark 3.2+环境验证,建议读者边阅读边在本地pyspark shell中复现
2. 案例一:基础转换与行动操作
让我们从一个基础代码片段开始:
rdd = sc.parallelize([1,3,2,5,7,9,3,5,4], 3) rdd.distinct().collect() # [9, 3, 1, 4, 7, 5, 2] rdd.filter(lambda x:x % 2!=0).collect() # [1, 3, 5, 7, 9, 3, 5]2.1 第一步:识别行动算子(Action)
Spark的懒执行机制决定了只有遇到行动算子才会触发实际计算。在这段代码中:
collect()出现了两次- 每次
collect()都会触发一个独立的Job执行
因此可以确定:总Job数=2
2.2 第二步:分析Stage划分
Stage的划分取决于Shuffle操作的存在。让我们分别分析两个Job:
Job1: distinct().collect()
distinct()操作需要全局去重,必然引发Shuffle- Shuffle前后会形成Stage边界
collect()作为结果收集操作,本身也是一个Stage
Job2: filter().collect()
filter()是窄转换(narrow transformation),不涉及Shuffle- 整个流水线可以合并为一个Stage
因此得出:总Stage数=2(Job1)+1(Job2)=3
2.3 第三步:计算Task数量
Task数量由Stage内的分区数决定。初始RDD明确指定了3个分区:
- Job1:
- Stage0(distinct): 3个分区 → 3个Task
- Stage1(collect): distinct输出默认保持分区数 → 3个Task
- Job2:
- 单个Stage: filter保持分区不变 → 3个Task
汇总得:总Task数=6(Job1)+3(Job2)=9
2.4 Spark UI验证
在Spark UI的Jobs标签页,我们确实能看到:
- 两个独立的Job记录
- 第一个Job显示2个Stages,第二个Job显示1个Stage
- Tasks计数与我们的推理完全一致
3. 案例二:Join操作的执行计划
现在来看一个涉及Shuffle Join的复杂案例:
x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3), ("c", 5)], 3) r = x.join(y) r.collect() # [('a', (1, 3)), ('a', (1, 2))]3.1 Job数量分析
这段代码中:
- 只有一个行动算子
collect() - 因此只产生1个Job
3.2 Stage划分逻辑
Join操作是典型的宽转换(wide transformation):
- Stage0:执行join前的准备工作
- 需要对两个RDD按key重新分区
- 这是一个Shuffle边界
- Stage1:实际执行join后的collect操作
因此总Stage数为2,其中:
- Stage0包含join前的所有操作
- Stage1负责最终结果收集
3.3 Task数量计算
这里的分区情况更复杂:
- RDD x:未指定分区数,默认=2(执行器数量)
- RDD y:明确指定3个分区
- join操作会产生分区继承:
Stage0:
- 需要处理x(2分区)和y(3分区)的Shuffle读写
- 实际Task数=2(x)+3(y)=5
Stage1:
- join结果的分区数默认继承父RDD的最大值=3
- 但collect操作会合并所有分区数据到Driver
- 实际Task数=3
注意:不同Spark版本对join后分区数的处理可能略有差异
3.4 UI验证要点
在Spark UI中重点关注:
- DAG可视化图中的Stage划分
- 输入数据大小与Shuffle数据量的对应关系
- 每个Stage的Task执行时间分布
4. 高级技巧:优化执行计划的方法
理解了基本原理后,我们可以主动优化代码:
4.1 减少Shuffle操作
# 非优化写法(两次Shuffle) rdd.distinct().groupByKey() # 优化写法(一次Shuffle) rdd.groupByKey().mapValues(list(set))4.2 合理设置分区数
# 在Shuffle后调整分区数 rdd.join(otherRdd).coalesce(10)4.3 缓存中间结果
# 避免重复计算 processed = rdd.filter(...).distinct().cache() res1 = processed.map(...) res2 = processed.reduceByKey(...)5. 调试实战:当结果不符合预期时
遇到执行计划与预期不符的情况,可以:
- 检查
explain()输出:rdd.join(otherRdd).explain() - 确认Shuffle操作是否真的发生
- 检查自定义分区器的实现
- 对比不同Spark版本的执行计划差异
在最近的一个数据处理项目中,我们发现同样的join操作在Spark 3.1和3.3中产生了不同的Stage划分,最终通过调整spark.sql.shuffle.partitions参数解决了性能问题。