Spark 是当今最流行的分布式计算框架之一。本文将深入探讨 Spark 的核心架构、RDD 原理、DAG 调度、内存管理以及性能优化策略。
Spark 架构概览
Spark 采用主从架构,由以下几个核心组件组成:
Driver 程序
Driver 是 Spark 应用程序的控制中心,负责:
- 创建 SparkContext,与集群管理器通信
- 构建 DAG(有向无环图)
- 将 DAG 拆分为 Stage
- 调度 Task 到 Executor 执行
- 监控 Task 执行状态
Executor(执行器)
Executor 是运行在 Worker 节点上的进程,负责:
- 运行分配的 Task
- 存储 RDD 的数据分区
- 向 Driver 报告执行状态
- 提供内存管理
集群管理器
Spark 支持多种集群管理器:
- Standalone:Spark 自带的独立集群管理器
- YARN:Hadoop 的资源管理器
- Mesos:通用的集群管理器
- Kubernetes:容器编排平台
执行流程
1. 用户提交 Spark 应用
2. Driver 启动,创建 SparkContext
3. Driver 向集群管理器申请资源
4. 集群管理器启动 Executor
5. Executor 向 Driver 注册
6. Driver 构建并调度 Task
7. Executor 执行 Task 并返回结果
RDD(弹性分布式数据集)
RDD(Resilient Distributed Dataset)是 Spark 最核心的抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD 的五大特性
- 分区列表:数据被分割成多个分区,每个分区是一个数据集的片段
- 计算函数:每个分区都有一个计算函数
- 依赖关系:RDD 保存对父 RDD 的依赖关系
- 分区器:可选的分区策略(如 HashPartitioner、RangePartitioner)
- 最佳位置:可选的数据本地性偏好
RDD 的创建方式
// 1. 从集合创建
val rdd1 = sc.parallelize(1 to 10)
// 2. 从外部存储创建
val rdd2 = sc.textFile("hdfs://path/to/file")
// 3. 从已有 RDD 转换
val rdd3 = rdd1.map(_ * 2)
RDD 的操作类型
Transformation(转换操作)
Transformation 是懒执行的,不会立即触发计算,只是记录 DAG:
// map:对每个元素应用函数
val rdd = sc.parallelize(1 to 5).map(_ * 2)
// filter:过滤元素
val filtered = rdd.filter(_ > 5)
// flatMap:将每个元素映射为多个元素
val words = sc.parallelize(List("hello world", "spark rdd"))
.flatMap(_.split(" "))
// reduceByKey:按 Key 聚合
val pairs = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
val counts = pairs.reduceByKey(_ + _)
// join:连接两个 RDD
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(List(("a", 3), ("b", 4)))
val joined = rdd1.join(rdd2)
Action(动作操作)
Action 会触发实际的计算:
// collect:将所有元素返回到 Driver
val result = rdd.collect()
// count:返回元素个数
val count = rdd.count()
// reduce:聚合元素
val sum = rdd.reduce(_ + _)
// take:返回前 N 个元素
val first10 = rdd.take(10)
// foreach:对每个元素执行操作
rdd.foreach(println)
// saveAsTextFile:保存到文件系统
rdd.saveAsTextFile("hdfs://path/to/output")
RDD 的依赖关系
窄依赖(Narrow Dependency)
父 RDD 的每个分区最多被子 RDD 的一个分区使用:
RDD A: [Partition 1] [Partition 2] [Partition 3]
↓ ↓ ↓
RDD B: [Partition 1] [Partition 2] [Partition 3]
窄依赖的算子:map、filter、union、flatMap 等
宽依赖(Wide Dependency)
父 RDD 的每个分区被子 RDD 的多个分区使用(Shuffle):
RDD A: [Partition 1] [Partition 2] [Partition 3]
↓ ↓ ↓ ↓ ↓ ↓
RDD B: [Partition 1] [Partition 2] [Partition 3]
宽依赖的算子:groupByKey、reduceByKey、join、sortByKey 等
RDD 的容错机制
RDD 通过 Lineage(血统)实现容错:
- 记录依赖关系:RDD 记录了它如何从父 RDD 计算而来
- 重新计算:当分区丢失时,根据依赖关系重新计算
- 检查点:对于长 Lineage,可以设置检查点避免重新计算
// 设置检查点
sc.setCheckpointDir("hdfs://path/to/checkpoint")
// 对 RDD 进行检查点
rdd.checkpoint()
DAG 调度器
DAG 调度器负责将 RDD 的依赖关系转换为可执行的 Stage。
Stage 的划分
Stage 划分的基本规则:
- 根据宽依赖划分 Stage
- 每个 Stage 内部只包含窄依赖
- Stage 之间通过 Shuffle 边界连接
Stage 1: map -> filter -> map
↓ (Shuffle)
Stage 2: groupByKey -> map
↓ (Shuffle)
Stage 3: reduceByKey
Task 调度
Task 调度器负责将 Stage 中的 Task 分配到 Executor:
- TaskSet 提交:DAG 调度器提交 TaskSet 给 Task 调度器
- 本地性调度:优先调度到数据所在的节点
- 推测执行:对于慢的 Task,启动备份 Task
- 结果收集:收集 Task 执行结果
数据本地性
Spark 优先将 Task 调度到数据所在的节点,减少网络传输:
- PROCESS_LOCAL:数据和计算在同一个进程
- NODE_LOCAL:数据和计算在同一个节点
- RACK_LOCAL:数据和计算在同一个机架
- ANY:数据可能在任意位置
内存管理
Spark 的内存管理是其高性能的关键之一。
堆内内存和堆外内存
堆内内存(Heap Memory)
- 由 JVM 管理
- 受 GC 影响
- 存储对象和序列化数据
堆外内存(Off-Heap Memory)
- 由 Spark 直接管理
- 不受 GC 影响
- 存储二进制数据
- 通过
spark.memory.offHeap.enabled启用
内存区域划分
graph TD
Total[总内存]
Exec[Execution Memory 执行内存]
Stor[Storage Memory 存储内存]
SJ[Shuffle Join]
SS[Shuffle Sort]
SA[Shuffle Aggregation]
Cache[Cache]
BV[Broadcast Variables]
Total --> Exec
Total --> Stor
Exec --> SJ
Exec --> SS
Exec --> SA
Stor --> Cache
Stor --> BV
内存配置
# 执行内存占比
spark.memory.fraction 0.6
# 存储内存占比
spark.memory.storageFraction 0.5
# 堆外内存大小
spark.memory.offHeap.size 1g
# 堆外内存启用
spark.memory.offHeap.enabled true
内存溢出处理
1. 增加执行内存
spark.executor.memory 4g
spark.memory.fraction 0.8
2. 减少 Shuffle 数据量
// 在 Shuffle 前进行聚合
val reduced = rdd.map(x => (x._1, (x._2, 1)))
.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
3. 使用广播变量
// 广播小表
val broadcastVar = sc.broadcast(smallData)
val result = largeData.map(x => (x, broadcastVar.value))
Shuffle 机制
Shuffle 是 Spark 中最昂贵的操作,涉及跨节点的数据重分布。
Shuffle 过程
Map Phase:
1. Partitioning:根据分区器将数据分配到不同分区
2. Sorting:对每个分区的数据进行排序
3. Spilling:内存不足时溢写到磁盘
Reduce Phase:
1. Fetching:从各个节点拉取数据
2. Merging:合并拉取的数据
3. Aggregation:执行聚合操作
Shuffle 实现方式
HashShuffle
- Spark 1.2 之前的默认方式
- 为每个 Reduce Task 创建一个文件
- 文件数量:Map Task × Reduce Task
- 优点:不需要排序
- 缺点:文件数量过多
SortShuffle
- Spark 1.2 之后的默认方式
- 为每个 Map Task 创建一个数据文件和一个索引文件
- 文件数量:2 × Map Task
- 优点:文件数量少,支持排序
- 缺点:需要排序
Tungsten SortShuffle
- 使用堆外内存和二进制格式
- 优化的排序算法
- 减少序列化和反序列化开销
Shuffle 优化
# 启用 Tungsten Shuffle
spark.sql.shuffle.partitions 200
# 增加 Shuffle 缓冲区大小
spark.shuffle.file.buffer 64k
# 增加 Shuffle Sort 缓冲区大小
spark.shuffle.sort.initialBufferFactor 0.2
# 启用 Shuffle Spill 压缩
spark.shuffle.compress true
# 启用 Shuffle Spill 合并
spark.shuffle.spill.numElementsForceSpillThreshold 52428800
性能优化
1. 并行度优化
// 设置并行度
val rdd = sc.parallelize(1 to 1000000, 100)
// 重新分区
val repartitioned = rdd.repartition(100)
// 增加分区数
val increased = rdd.coalesce(200, shuffle = true)
2. 数据序列化
# 使用 Kryo 序列化
spark.serializer org.apache.spark.serializer.KryoSerializer
# 注册 Kryo 类
spark.kryo.registrationRequired true
# Kryo 缓冲区大小
spark.kryoserializer.buffer.max 512m
3. 广播变量
// 广播大变量
val lookupTable = sc.broadcast(Map(1 -> "a", 2 -> "b"))
// 在 Task 中使用
val result = rdd.map(x => lookupTable.value.getOrElse(x, "unknown"))
4. 累加器
// 创建累加器
val counter = sc.longAccumulator("counter")
// 在 Task 中使用
rdd.foreach(x => counter.add(1))
// 获取结果
println(counter.value)
5. 持久化策略
// MEMORY_ONLY:只存储在内存
rdd.persist(StorageLevel.MEMORY_ONLY)
// MEMORY_AND_DISK:内存不足时溢写到磁盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)
// DISK_ONLY:只存储在磁盘
rdd.persist(StorageLevel.DISK_ONLY)
// MEMORY_ONLY_SER:序列化后存储在内存
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
// OFF_HEAP:存储在堆外内存
rdd.persist(StorageLevel.OFF_HEAP)
6. 避免创建过多 RDD
// 避免:创建多个 RDD
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.filter(_.length > 10)
val rdd3 = rdd2.map(_.toUpperCase)
val rdd4 = rdd3.count()
// 推荐:链式调用
val count = sc.textFile("data.txt")
.filter(_.length > 10)
.map(_.toUpperCase)
.count()
7. 使用 DataFrame/Dataset
// 使用 DataFrame(Catalyst 优化)
val df = spark.read.json("data.json")
val result = df.filter($"age" > 18)
.groupBy("department")
.count()
// 使用 Dataset(类型安全)
case class Person(name: String, age: Int)
val ds = spark.read.json("data.json").as[Person]
val filtered = ds.filter(_.age > 18)
Spark SQL 优化
Catalyst 优化器
Catalyst 是 Spark SQL 的查询优化器,提供以下优化:
- 逻辑优化:谓词下推、列剪裁、常量折叠
- 物理优化:选择最优的物理执行计划
- 代码生成:生成高效的 Java 字节码
谓词下推(Predicate Pushdown)
// 自动谓词下推
val df = spark.read.parquet("data.parquet")
val result = df.filter($"age" > 18)
.select("name", "email")
// 等价于
// SELECT name, email FROM data WHERE age > 18
列剪裁(Column Pruning)
// 只读取需要的列
val df = spark.read.parquet("data.parquet")
val result = df.select("name", "email")
// 只读取 name 和 email 列,不读取其他列
Join 优化
Broadcast Hash Join
// 小表广播
val smallDF = spark.read.parquet("small.parquet")
val largeDF = spark.read.parquet("large.parquet")
// 使用提示强制广播
val result = largeDF.join(broadcast(smallDF), "key")
Sort Merge Join
// 大表连接
val df1 = spark.read.parquet("large1.parquet")
val df2 = spark.read.parquet("large2.parquet")
// 自动选择 Sort Merge Join
val result = df1.join(df2, "key")
实战案例
案例1:词频统计
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]")
val sc = new SparkContext(conf)
// 读取文件
val textFile = sc.textFile("hdfs://path/to/input")
// 词频统计
val wordCounts = textFile
.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
// 保存结果
wordCounts.saveAsTextFile("hdfs://path/to/output")
sc.stop()
}
}
案例2:用户行为分析
import org.apache.spark.sql.SparkSession
object UserBehaviorAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("UserBehaviorAnalysis")
.getOrCreate()
import spark.implicits._
// 读取用户行为数据
val behaviorDF = spark.read.json("user_behavior.json")
// 计算每日活跃用户数
val dailyActiveUsers = behaviorDF
.groupBy($"date")
.agg(countDistinct("user_id").as("active_users"))
// 计算用户留存率
val retentionRate = behaviorDF
.filter($"date" === "2024-03-27")
.join(
behaviorDF.filter($"date" === "2024-03-28"),
"user_id"
)
.agg(count("user_id") / countDistinct("user_id"))
// 计算用户行为转化漏斗
val funnel = behaviorDF
.groupBy("user_id")
.agg(
sum(when($"action" === "view", 1).otherwise(0)).as("views"),
sum(when($"action" === "click", 1).otherwise(0)).as("clicks"),
sum(when($"action" === "purchase", 1).otherwise(0)).as("purchases")
)
.agg(
sum("views").as("total_views"),
sum("clicks").as("total_clicks"),
sum("purchases").as("total_purchases")
)
// 保存结果
dailyActiveUsers.write.parquet("output/daily_active_users")
retentionRate.write.parquet("output/retention_rate")
funnel.write.parquet("output/funnel")
spark.stop()
}
}
案例3:实时流处理
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object RealTimeProcessing {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("RealTimeProcessing")
.getOrCreate()
import spark.implicits._
// 读取实时数据流
val streamDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.select(from_json($"value", eventSchema).as("event"))
.select("event.*")
// 实时统计
val countByType = streamDF
.groupBy($"event_type")
.count()
// 输出结果
val query = countByType.writeStream
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
}
}
最佳实践
- 合理设置并行度:根据集群资源和数据量设置合适的并行度
- 避免数据倾斜:通过加盐、扩容等方式解决数据倾斜问题
- 使用广播变量:对于小表使用广播变量减少 Shuffle
- 合理持久化:对需要多次使用的 RDD 进行持久化
- 监控和调优:使用 Spark UI 监控作业执行情况
- 使用 DataFrame/Dataset:优先使用 DataFrame/Dataset 获得更好的性能
- 资源隔离:使用动态资源分配和资源队列
总结
Spark 作为强大的分布式计算框架,其核心优势在于内存计算和 DAG 调度。通过深入理解 RDD 原理、DAG 调度、内存管理和 Shuffle 机制,我们可以编写出高效的 Spark 应用程序。
记住:
- RDD 是 Spark 的核心抽象,理解其五大特性至关重要
- DAG 调度器负责将作业划分为可执行的 Stage
- 内存管理是 Spark 高性能的关键
- Shuffle 是最昂贵的操作,需要重点优化
- 使用 DataFrame/Dataset 可以获得更好的性能