news 2026/6/10 0:24:20

大数据Spark(八十):Action行动算子fold和aggregate使用案例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
大数据Spark(八十):Action行动算子fold和aggregate使用案例

文章目录

Action行动算子fold和aggregate使用案例

一、fold使用案例

二、aggregate使用案例


Action行动算子fold和aggregate使用案例

一、fold使用案例

fold用于对RDD中的元素进行聚合操作,最终返回一个结果。类似reduce算子,但与reduce不同的是其可以对每个分区中的数据提供一个初始值,让分区中的数据与该初始值进行聚合,最终该初始值还会与各个分区的结果再次聚合。

fold的函数签名如下:

def fold(zeroValue: T)(op: (T, T) => T): T
  • zeroValue:聚合操作的初始值,类型为 T。
  • op:用于合并元素的二元操作函数。

fold的工作原理:在每个分区内,fold 使用初始值 zeroValue 和二元操作函数 op,将该分区内的所有元素进行聚合。在所有分区内的聚合完成后,fold 将各分区的结果与初始值 zeroValue 一起,使用相同的二元操作函数 op 进行全局聚合,得到最终结果。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("FoldTest"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","b","c","d","e","f"), 3); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception { ArrayList<String> list = new ArrayList<>(); while (iter.hasNext()) { String next = iter.next(); list.add("rdd partition index: " + index + " current value: " + next); } return list.iterator(); } },true).foreach(x-> System.out.println(x)); /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello~hello~a~b~hello~c~d~hello~e~f */ String str = rdd.fold("hello", new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + "~" + v2; } }); System.out.println(str); sc.stop();

Scala代码:

val conf = new SparkConf().setMaster("local").setAppName("FoldTest") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3) rdd.mapPartitionsWithIndex((index, iter) => { val list = new ListBuffer[String]() while (iter.hasNext) { list.append(s"rdd partition index: $index ,current value: ${iter.next()}") } list.iterator }).foreach(println) /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * map端聚合: * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello~hello~a~b~hello~c~d~hello~e~f */ val result: String = rdd.fold("hello")((v1, v2) => { v1 + "~" + v2 }) println(result) sc.stop()

二、aggregate使用案例

aggregate用于对RDD中的元素进行聚合操作,最终返回一个结果。与 fold 和 reduce 等算子不同,aggregate 允许用户分别定义分区内和分区间的聚合函数,提供了更大的灵活性。

aggregate函数签名如下:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  • zeroValue:聚合操作的初始值,类型为 U。
  • seqOp:分区内的聚合函数,用于将分区内的元素与累加器进行合并,类型为 (U, T) => U。
  • combOp:分区间的聚合函数,用于将不同分区的累加器结果进行合并,类型为 (U, U) => U

aggregate工作原理:在每个分区内,使用初始值 zeroValue 和函数 seqOp,将该分区内的所有元素进行聚合。在所有分区内的聚合完成后,使用初始值 zeroValue 和函数 combOp,将各分区的结果进行全局聚合,得到最终结果。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("AggregateTest"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","b","c","d","e","f"), 3); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception { ArrayList<String> list = new ArrayList<>(); while (iter.hasNext()) { String next = iter.next(); list.add("rdd partition index: " + index + " current value: " + next); } return list.iterator(); } },true).foreach(x-> System.out.println(x)); /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * * map端聚合: * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello@hello~a~b@hello~c~d@hello~e~f */ String result = rdd.aggregate("hello", new Function2<String, String, String>() { @Override public String call(String s1, String s2) throws Exception { return s1 + "~" + s2; } }, new Function2<String, String, String>() { @Override public String call(String s1, String s2) throws Exception { return s1 + "@" + s2; } }); System.out.println(result); sc.stop();

Scala代码:

val conf = new SparkConf().setMaster("local").setAppName("FoldTest") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3) rdd.mapPartitionsWithIndex((index, iter) => { val list = new ListBuffer[String]() while (iter.hasNext) { list.append(s"rdd partition index: $index ,current value: ${iter.next()}") } list.iterator }).foreach(println) val result: String = rdd.aggregate("hello")( (v1, v2) => { v1 + "~" + v2 }, (v1, v2) => { v1 + "@" + v2 } ) println(result) sc.stop()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/8 19:46:44

军储库区空间结构透视与人车作业态势一体化感知技术方案

军储库区空间结构透视与人车作业态势一体化感知技术方案摘要军需仓储库区作为高价值物资集中存储与高频作业的重要基础设施&#xff0c;对作业安全性、过程合规性及事后可追溯性具有极高要求。然而&#xff0c;传统基于二维视频画面的监控方式&#xff0c;难以真实反映人员与车…

作者头像 李华
网站建设 2026/6/8 20:09:20

透明物体渲染的步骤(大白话生动版:你以为是“画一下”,其实是“排队、算账、再排队”)

透明物体渲染这件事,表面看起来很简单:不就是玻璃、水、烟、火、UI、半透明布料嘛。 但做过的人都知道:透明一多,画面就开始“玄学”—— 玻璃前后顺序一乱,就像贴纸叠错了; 粒子一多,GPU 直接变“暖手宝”; 深度一开一关,边缘不是穿帮就是发灰; 折射、反射、雾、后处…

作者头像 李华
网站建设 2026/6/8 20:09:36

提示工程的容器编排技巧:优化提示响应时间的5个方法

提示工程的容器编排技巧&#xff1a;优化提示响应时间的5个方法 一、引入与连接 引人入胜的开场 想象一下&#xff0c;你是一家电商公司的客服主管&#xff0c;每天要处理成千上万条客户的咨询信息。客户们都希望能在最短的时间内得到准确的回复&#xff0c;就像你在饥饿时渴望…

作者头像 李华
网站建设 2026/6/8 19:09:04

2026-02-08 全国各地响应最快的 BT Tracker 服务器(联通版)

数据来源&#xff1a;https://bt.me88.top 序号Tracker 服务器地域网络响应(毫秒)1http://60.249.37.20:6969/announce广东肇庆联通282http://211.75.205.187:6969/announce广东肇庆联通293http://211.75.205.188:80/announce广西柳州联通334http://180.114.103.80:6969/annou…

作者头像 李华
网站建设 2026/6/9 22:23:38

基于Spark的豆瓣读书分析大屏可视化(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码

基于Spark的豆瓣读书分析大屏可视化(设计源文件万字报告讲解)&#xff08;支持资料、图片参考_相关定制&#xff09;_文章底部可以扫码数据采集&#xff1a;豆瓣读书网站爬虫(requests、lxml、…) 数据存储&#xff1a;将爬取的数据保存为csv文件&#xff0c;保存到本地或上传到…

作者头像 李华
网站建设 2026/6/9 1:57:38

CentOS7高效部署WebRTC信令服务器:从选型到性能调优实战

背景痛点&#xff1a;CentOS7部署WebRTC信令的“拦路虎” 在实时音视频应用开发中&#xff0c;WebRTC负责端到端的媒体传输&#xff0c;而信令服务器则是整个通信的“交通指挥中心”&#xff0c;负责协商建立连接。然而&#xff0c;在经典的CentOS 7服务器上部署一个高性能、稳…

作者头像 李华