news 2026/6/17 14:54:47

Spark 行动算子(Action)全面解析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spark 行动算子(Action)全面解析

Spark 行动算子(Action)全面解析

摘要:本文系统梳理 Spark 中的行动算子,涵盖触发机制、常用 API 分类、执行原理与实际使用场景,帮助你真正理解 Action 背后发生了什么。


一、为什么要讲行动算子?

在 Spark 的编程模型中,算子分为两大类:

类型特点是否触发计算
转换算子(Transformation)惰性求值,返回新 RDD❌ 不触发
行动算子(Action)触发 DAG 执行,返回结果或写出数据✅ 触发

核心理解:每一次调用 Action,Spark 就会向 Driver 提交一个Job,Driver 将 DAG 切分成若干Stage,Stage 内部拆分为Task并分发到各 Executor 执行。

调用 Action └─► 提交 Job └─► DAG Scheduler 切分 Stage └─► Task Scheduler 分发 Task └─► Executor 执行 → 结果返回 Driver

二、行动算子分类总览

Action 算子 ├── 聚合类 collect / count / countByKey / countByValue ├── 取值类 first / take / takeOrdered / takeSample ├── 归约类 reduce / fold / aggregate ├── 遍历类 foreach / foreachPartition ├── 保存类 saveAsTextFile / saveAsObjectFile / saveAsSequenceFile └── 统计类 max / min / sum / mean / variance / stdev

三、常用行动算子详解

3.1 collect()

将 RDD 中所有数据收集到 Driver 端,返回Array

valrdd=sc.parallelize(List(1,2,3,4,5))valresult=rdd.collect()// result: Array[Int] = Array(1, 2, 3, 4, 5)

⚠️注意:生产环境中慎用!数据量过大会导致 Driver OOM。仅适合数据量可控的场景(如测试、调试)。


3.2 count()

返回 RDD 中元素的总个数。

valrdd=sc.parallelize(List("a","b","c","a"))println(rdd.count())// 4

3.3 countByKey()

仅适用于PairRDD (K, V),统计每个 Key 出现的次数,返回Map[K, Long]

valrdd=sc.parallelize(List(("a",1),("b",2),("a",3)))valresult=rdd.countByKey()// result: Map(a -> 2, b -> 1)

3.4 countByValue()

统计 RDD 中每个元素出现的次数,返回Map[T, Long]

valrdd=sc.parallelize(List("apple","banana","apple","orange"))valresult=rdd.countByValue()// result: Map(apple -> 2, banana -> 1, orange -> 1)

3.5 first()

返回 RDD 中的第一个元素,等价于take(1)(0)

valrdd=sc.parallelize(List(10,20,30))println(rdd.first())// 10

3.6 take(n)

返回 RDD 前 n 个元素组成的数组,不保证顺序(按分区顺序扫描)。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.take(3)// Array(5, 3, 1)

3.7 takeOrdered(n)

返回 RDD 中最小的 n 个元素(升序),可自定义排序规则。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.takeOrdered(3)// Array(1, 2, 3)rdd.takeOrdered(3)(Ordering[Int].reverse)// Array(5, 4, 3) 降序

take的区别:takeOrdered会在每个 Partition 局部排序后再归并,效率优于sortBy + take


3.8 reduce(func)

通过一个二元函数对 RDD 所有元素进行归约,要求函数满足交换律和结合律

valrdd=sc.parallelize(List(1,2,3,4,5))valsum=rdd.reduce((a,b)=>a+b)// 15valmax=rdd.reduce((a,b)=>if(a>b)aelseb)// 5

执行过程:先在每个 Partition 内部归约,再将各 Partition 结果汇总到 Driver 做最终归约。


3.9 aggregate(zeroValue)(seqOp, combOp)

aggregate是最通用的归约算子,允许返回值类型与输入类型不同

  • seqOp:分区内的聚合函数(U, T) => U
  • combOp:分区间的合并函数(U, U) => U
// 同时计算总和与元素个数,从而得到平均值valrdd=sc.parallelize(List(1,2,3,4,5),2)val(sum,count)=rdd.aggregate((0,0))((acc,num)=>(acc._1+num,acc._2+1),// seqOp(a,b)=>(a._1+b._1,a._2+b._2)// combOp)valavg=sum.toDouble/count// 3.0

3.10 foreach(func)

对 RDD 每个元素执行函数,在 Executor 端执行,不返回值。常用于写入外部系统。

rdd.foreach(x=>println(x))// 输出在 Executor 端,Driver 不可见

3.11 foreachPartition(func)

分区为单位执行函数,每个分区调用一次。适合需要建立连接的场景(如数据库写入),避免每条数据都创建连接。

rdd.foreachPartition{iter=>valconn=createDBConnection()// 每个分区只建一次连接iter.foreach{record=>conn.write(record)}conn.close()}

最佳实践:凡是涉及外部资源(数据库、消息队列、缓存),优先用foreachPartition而非foreach,显著降低连接开销。


3.12 saveAsTextFile(path)

将 RDD 保存为文本文件,每个元素调用toString写为一行。分区数决定输出文件数。

rdd.saveAsTextFile("hdfs://namenode/output/result")// 输出: /output/result/part-00000, part-00001, ...

四、行动算子执行原理深入

4.1 宽依赖与 Stage 划分

rdd1 ──map──► rdd2 ──filter──► rdd3 ──reduceByKey──► rdd4 ──collect() ▲ Shuffle 边界 ◄── Stage 0 ──┤──── Stage 1 ───►
  • mapfilter是窄依赖,同属一个 Stage
  • reduceByKey触发 Shuffle,产生 Stage 边界
  • 调用collect()触发整个 Job 执行

4.2 Action 与 Job 的关系

valrdd=sc.textFile("data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)rdd.count()// Job 1rdd.collect()// Job 2,DAG 重新执行(除非 cache)

每次 Action 都是一个独立 Job。如果 RDD 会被多次使用,务必调用cache()persist()避免重复计算。


五、常见误区与最佳实践

❌ 误区 1:在循环中多次调用 Action

// 错误:每次循环都触发一个 Jobfor(i<-1to10){println(rdd.count())// 触发 10 个 Job!}// 正确:缓存后复用valcached=rdd.cache()valtotal=cached.count()

❌ 误区 2:用 collect() 处理大数据集

// 危险:数据全量拉到 Driverrdd.collect().foreach(process)// 安全:在 Executor 端处理rdd.foreach(process)// 或写出到存储rdd.saveAsTextFile(outputPath)

✅ 最佳实践总结

场景推荐算子
调试/验证少量数据take(n)first()
统计元素数量count()
全局聚合(同类型)reduce()
全局聚合(跨类型)aggregate()
写入外部存储foreachPartition()
落地到 HDFSsaveAsTextFile()
RDD 多次复用cache(),再 Action

六、总结

行动算子是 Spark 程序的"触发器",理解它的核心在于:

  1. 惰性求值:Transformation 只是构建 DAG,Action 才真正触发计算
  2. Job 粒度:每个 Action 对应一个 Job,合理减少 Action 调用次数
  3. 数据位置collect/take将数据拉回 Driver,foreach/save在 Executor 端处理
  4. 缓存策略:多次复用同一 RDD 时,配合cache()避免重算

掌握行动算子的选择与优化,是写出高性能 Spark 程序的基础。


如有问题欢迎在评论区交流,也欢迎关注后续 Spark 系列文章 🚀

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/17 14:53:19

RK Android15 以太网静态IP重启丢失的解决方案

前言 在Rockchip平台的Android 15开发中,许多开发者会遇到一个棘手的问题:通过Settings界面为以太网设置静态IP后,设备重启或网络重连后,之前配置的静态IP地址会丢失,系统恢复为DHCP自动获取模式。 问题根源:Android版本演进带来的路径变更 Google在Android的发展过程…

作者头像 李华
网站建设 2026/6/14 3:37:58

从策略开通到全网治理,某头部航空企业这样升级安全策略管理

对于航空运输行业而言&#xff0c;网络边界复杂、业务连续性要求高&#xff0c;安全策略管理能力直接关系到业务稳定与安全运营。近期&#xff0c;联软科技为国内某头部航空企业落地安全策略管理项目。作为行业内具有重要影响力的大型航司&#xff0c;该客户网络覆盖生产、测试…

作者头像 李华
网站建设 2026/6/14 3:37:57

为什么只有C/C++还在用头文件?其他语言都淘汰了?

很多初学编程的朋友都会有一个疑惑&#xff0c;为什么学C、C的时候&#xff0c;总要多写一堆后缀为.h的头文件&#xff0c;写代码步骤繁琐又容易出错&#xff1f;但日常主流的Java、Python&#xff0c;还有Go、TS这些新语言&#xff0c;全都彻底抛弃了这种设计。 头文件的本质&…

作者头像 李华
网站建设 2026/6/14 3:37:56

SAP SmartForms打印二维码大小不一?一个空格补全技巧帮你搞定

SAP SmartForms打印二维码尺寸标准化实战指南在SAP物料标签和发货单打印场景中&#xff0c;开发人员经常需要处理一个看似简单却令人头疼的技术细节——当使用SMARTFORMS配合QECODE2005生成二维码时&#xff0c;由于内容长度波动导致的打印尺寸不一致问题。想象一下&#xff0c…

作者头像 李华
网站建设 2026/6/14 3:38:00

吃透Claude Code动态工作流,用法、场景与实战技巧,告别AI任务失效问题

熟悉Claude Code的使用者都清楚&#xff0c;这款AI编程工具在日常编码、代码查询、简单调试等基础任务中表现十分出色。但很多人在处理大规模项目重构、批量数据处理、多维度验证排查、复杂根因分析等高阶任务时&#xff0c;总会遇到各类问题&#xff0c;比如任务做一半提前终止…

作者头像 李华
网站建设 2026/6/14 3:37:56

FigmaCN:让中文设计师无障碍使用全球顶尖设计工具

FigmaCN&#xff1a;让中文设计师无障碍使用全球顶尖设计工具 【免费下载链接】figmaCN 中文 Figma 插件&#xff0c;设计师人工翻译校验 项目地址: https://gitcode.com/gh_mirrors/fi/figmaCN 对于许多中文设计师来说&#xff0c;Figma作为全球领先的设计协作平台&…

作者头像 李华