news 2026/5/5 7:08:25

RDD API 学习

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RDD API 学习

📊 RDD vs DataFrame 对比

特性RDDDataFrame
API 风格函数式(Scala/Java)声明式(SQL)
性能较慢更快(Catalyst 优化)
类型安全编译时运行时
内存管理手动(JVM)自动(Tungsten)
适用场景复杂 ETL/算法结构化查询

学习目标:理解 Spark 底层原理,处理复杂数据转换。

🔧 RDD 基础操作

1.1 从 DataFrame 转换为 RDD

scala

// 将 DataFrame 转为 RDD val rdd = df.rdd // 查看 RDD 类型 println(rdd.getClass) // 查看分区数 println(s"RDD 分区数: ${rdd.getNumPartitions}") // 查看第一个元素 rdd.first()

1.2 创建 RDD 的几种方式

scala

// 方式1:从集合创建 val dataRDD = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9,10)) dataRDD.collect().foreach(print)// 方式2:从文件创建 val textRDD = spark.sparkContext.textFile("/opt/data/user_behavior.csv") println(s"文件 RDD 行数: ${textRDD.count()}")

1.3 RDD 基本操作:Transformation

scala

// map:逐行转换 val userRDD = textRDD.map(line => { val arr = line.split(",") (arr(0).toLong, arr(3)) // (user_id, behavior_type) }) userRDD.take(10).foreach(println)// filter:过滤 val pvRDD = userRDD.filter(_._2 == "pv") println(s"PV 数量: ${pvRDD.count()}")// distinct:去重 val distinctUsers = userRDD.map(_._1).distinct() println(s"去重用户数: ${distinctUsers.count()}")

🔧 RDD 聚合操作

2.1 reduceByKey:按 Key 聚合

scala

// 统计每个用户的行为次数 val userBehaviorCount = userRDD.map(x => (x._1, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) println("行为次数 Top 10 用户:") userBehaviorCount.take(10).foreach(println)

2.2 groupByKey vs reduceByKey

scala

// groupByKey(不推荐,性能差) val grouped = userRDD.groupByKey() println("groupByKey 结果示例:") grouped.mapValues(_.size).take(5).foreach(println) // reduceByKey(推荐,预聚合) val reduced = userRDD.mapValues(_ => 1).reduceByKey(_ + _) println("reduceByKey 结果示例:") reduced.take(5).foreach(println)

1. groupByKey

  • 先把所有数据 shuffle 拉过去,再分组、再计算

  • 数据全部在网络传输

  • 中间不做任何合并

  • 容易OOM、卡死、倾斜

2. reduceByKey

  • 先在本地预聚合(Map 端聚合),再 shuffle

  • 网络传输数据量大大减少

  • 速度快、不占内存、不倾斜

2.3 aggregateByKey:自定义聚合

scala

// 统计每个用户的行为类型分布 val behaviorAgg = userRDD.map(x => (x._1, Seq[String](x._2))) .aggregateByKey(Seq.empty[String])( (seq, value) => seq ++ Seq(value), // seqOp: 分区内合并 (seq1, seq2) => seq1 ++ seq2 // combOp: 分区间合并 ) behaviorAgg.take(5).foreach(println)


🔧 RDD 高级操作

3.1 join:关联两个 RDD

scala

// 创建用户维度 RDD val userDimRDD = spark.sparkContext.parallelize(Seq( (3987L, "活跃用户"), (8527L, "新用户"), (124L, "高活用户"), (7450L, "付费用户") ))// 行为 RDD val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))// 执行 Join val joinedRDD = behaviorRDD.join(userDimRDD) joinedRDD.foreach(println)

// 遇到的报错

val joinedRDD = behaviorRDD.join(userDimRDD)
<console>:24: error: value join is not a member of Array[(Long, String)]
val joinedRDD = behaviorRDD.join(userDimRDD)

之前val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))

take操作使得behaviorRDD变成普通数组,普通数组没有join方法

更改

val behaviorRDD = spark.sparkContext.parallelize( userRDD.take(100) )

先take取数据再转成rdd

3.2 mapPartitions:分区级别操作

scala

// 每个分区内计算 val partitionCounts = rdd.mapPartitions(iter => { var count = 0 while (iter.hasNext) { count += 1 iter.next() } Iterator(count) }).collect() println(s"每个分区的数据量: ${partitionCounts.mkString(", ")}")

3.3 持久化 RDD

scala

// 缓存 RDD val cachedRDD = userRDD.cache() println(s"缓存后触发计算: ${cachedRDD.count()}") // 查看存储级别 println(cachedRDD.getStorageLevel) // 查看缓存状态 spark.sparkContext.getExecutorMemoryStatus.foreach(println)


📊 RDD 操作分类总结

类型操作说明
Transformationmap,filter,flatMap懒执行,返回新 RDD
聚合reduceByKey,groupByKey按 Key 聚合
排序sortBy,sortByKey全局排序
关联join,leftOuterJoin两个 RDD 关联
分区repartition,coalesce调整分区数
Actioncount,collect,take触发计算
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/5 7:06:49

Claude配置编辑器:可视化定制AI助手行为,提升工作效率

1. 项目概述&#xff1a;一个专为Claude设计的配置编辑器最近在折腾AI助手Claude的时候&#xff0c;发现一个挺有意思的开源项目&#xff0c;叫claude-settings-editor。简单来说&#xff0c;它就是一个能让你更方便地管理和编辑Claude配置文件的工具。如果你用过Claude&#x…

作者头像 李华
网站建设 2026/5/5 7:04:28

Copr命令行工具实战:从RPM打包到自动化构建发布

1. 项目概述与核心价值 最近在折腾一些RPM包的构建&#xff0c;发现了一个挺有意思的项目——sureclaw-ai/copr。这名字乍一看&#xff0c;可能很多朋友会联想到Fedora社区那个大名鼎鼎的Copr构建服务。没错&#xff0c;这个项目正是那个服务的命令行客户端工具。但如果你以为…

作者头像 李华
网站建设 2026/5/5 7:01:16

LVDS技术解析:差分信号如何有效抑制EMI干扰

1. LVDS技术概述与EMI挑战在当代电子系统中&#xff0c;电磁干扰(EMI)已成为制约系统性能提升的关键瓶颈。随着数据速率突破Gbps量级&#xff0c;传统单端信号传输方式暴露出的辐射问题日益显著。我曾参与过一款医疗影像设备的研发&#xff0c;当系统时钟频率超过100MHz时&…

作者头像 李华
网站建设 2026/5/5 7:01:14

大语言模型偏见问题:评估与缓解技术实践

1. 大语言模型偏见问题的现状与挑战最近在调试一个开源大语言模型时&#xff0c;遇到一个典型案例&#xff1a;当输入"护士"这个职业词时&#xff0c;模型生成的描述中78%使用了"她"作为代词&#xff1b;而输入"工程师"时&#xff0c;83%的案例使…

作者头像 李华