Spark 分布式计算原理

Spark 是当今最流行的分布式计算框架之一。本文将深入探讨 Spark 的核心架构、RDD 原理、DAG 调度、内存管理以及性能优化策略。

Spark 架构概览

Spark 采用主从架构,由以下几个核心组件组成:

Driver 程序

Driver 是 Spark 应用程序的控制中心,负责:

  • 创建 SparkContext,与集群管理器通信
  • 构建 DAG(有向无环图)
  • 将 DAG 拆分为 Stage
  • 调度 Task 到 Executor 执行
  • 监控 Task 执行状态

Executor(执行器)

Executor 是运行在 Worker 节点上的进程,负责:

  • 运行分配的 Task
  • 存储 RDD 的数据分区
  • 向 Driver 报告执行状态
  • 提供内存管理

集群管理器

Spark 支持多种集群管理器:

  • Standalone:Spark 自带的独立集群管理器
  • YARN:Hadoop 的资源管理器
  • Mesos:通用的集群管理器
  • Kubernetes:容器编排平台

执行流程

1. 用户提交 Spark 应用
2. Driver 启动,创建 SparkContext
3. Driver 向集群管理器申请资源
4. 集群管理器启动 Executor
5. Executor 向 Driver 注册
6. Driver 构建并调度 Task
7. Executor 执行 Task 并返回结果

RDD(弹性分布式数据集)

RDD(Resilient Distributed Dataset)是 Spark 最核心的抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 的五大特性

  1. 分区列表:数据被分割成多个分区,每个分区是一个数据集的片段
  2. 计算函数:每个分区都有一个计算函数
  3. 依赖关系:RDD 保存对父 RDD 的依赖关系
  4. 分区器:可选的分区策略(如 HashPartitioner、RangePartitioner)
  5. 最佳位置:可选的数据本地性偏好

RDD 的创建方式

// 1. 从集合创建
val rdd1 = sc.parallelize(1 to 10)

// 2. 从外部存储创建
val rdd2 = sc.textFile("hdfs://path/to/file")

// 3. 从已有 RDD 转换
val rdd3 = rdd1.map(_ * 2)

RDD 的操作类型

Transformation(转换操作)

Transformation 是懒执行的,不会立即触发计算,只是记录 DAG:

// map:对每个元素应用函数
val rdd = sc.parallelize(1 to 5).map(_ * 2)

// filter:过滤元素
val filtered = rdd.filter(_ > 5)

// flatMap:将每个元素映射为多个元素
val words = sc.parallelize(List("hello world", "spark rdd"))
  .flatMap(_.split(" "))

// reduceByKey:按 Key 聚合
val pairs = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
val counts = pairs.reduceByKey(_ + _)

// join:连接两个 RDD
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(List(("a", 3), ("b", 4)))
val joined = rdd1.join(rdd2)

Action(动作操作)

Action 会触发实际的计算:

// collect:将所有元素返回到 Driver
val result = rdd.collect()

// count:返回元素个数
val count = rdd.count()

// reduce:聚合元素
val sum = rdd.reduce(_ + _)

// take:返回前 N 个元素
val first10 = rdd.take(10)

// foreach:对每个元素执行操作
rdd.foreach(println)

// saveAsTextFile:保存到文件系统
rdd.saveAsTextFile("hdfs://path/to/output")

RDD 的依赖关系

窄依赖(Narrow Dependency)

父 RDD 的每个分区最多被子 RDD 的一个分区使用:

RDD A: [Partition 1] [Partition 2] [Partition 3]
           ↓         ↓         ↓
RDD B: [Partition 1] [Partition 2] [Partition 3]

窄依赖的算子:map、filter、union、flatMap 等

宽依赖(Wide Dependency)

父 RDD 的每个分区被子 RDD 的多个分区使用(Shuffle):

RDD A: [Partition 1] [Partition 2] [Partition 3]
           ↓    ↓    ↓    ↓    ↓    ↓
RDD B: [Partition 1] [Partition 2] [Partition 3]

宽依赖的算子:groupByKey、reduceByKey、join、sortByKey 等

RDD 的容错机制

RDD 通过 Lineage(血统)实现容错:

  • 记录依赖关系:RDD 记录了它如何从父 RDD 计算而来
  • 重新计算:当分区丢失时,根据依赖关系重新计算
  • 检查点:对于长 Lineage,可以设置检查点避免重新计算
// 设置检查点
sc.setCheckpointDir("hdfs://path/to/checkpoint")

// 对 RDD 进行检查点
rdd.checkpoint()

DAG 调度器

DAG 调度器负责将 RDD 的依赖关系转换为可执行的 Stage。

Stage 的划分

Stage 划分的基本规则:

  • 根据宽依赖划分 Stage
  • 每个 Stage 内部只包含窄依赖
  • Stage 之间通过 Shuffle 边界连接
Stage 1: map -> filter -> map
    ↓ (Shuffle)
Stage 2: groupByKey -> map
    ↓ (Shuffle)
Stage 3: reduceByKey

Task 调度

Task 调度器负责将 Stage 中的 Task 分配到 Executor:

  1. TaskSet 提交:DAG 调度器提交 TaskSet 给 Task 调度器
  2. 本地性调度:优先调度到数据所在的节点
  3. 推测执行:对于慢的 Task,启动备份 Task
  4. 结果收集:收集 Task 执行结果

数据本地性

Spark 优先将 Task 调度到数据所在的节点,减少网络传输:

  • PROCESS_LOCAL:数据和计算在同一个进程
  • NODE_LOCAL:数据和计算在同一个节点
  • RACK_LOCAL:数据和计算在同一个机架
  • ANY:数据可能在任意位置

内存管理

Spark 的内存管理是其高性能的关键之一。

堆内内存和堆外内存

堆内内存(Heap Memory)

  • 由 JVM 管理
  • 受 GC 影响
  • 存储对象和序列化数据

堆外内存(Off-Heap Memory)

  • 由 Spark 直接管理
  • 不受 GC 影响
  • 存储二进制数据
  • 通过 spark.memory.offHeap.enabled 启用

内存区域划分

graph TD
    Total[总内存]
    Exec[Execution Memory 执行内存]
    Stor[Storage Memory 存储内存]
    SJ[Shuffle Join]
    SS[Shuffle Sort]
    SA[Shuffle Aggregation]
    Cache[Cache]
    BV[Broadcast Variables]

    Total --> Exec
    Total --> Stor
    Exec --> SJ
    Exec --> SS
    Exec --> SA
    Stor --> Cache
    Stor --> BV

内存配置

# 执行内存占比
spark.memory.fraction 0.6

# 存储内存占比
spark.memory.storageFraction 0.5

# 堆外内存大小
spark.memory.offHeap.size 1g

# 堆外内存启用
spark.memory.offHeap.enabled true

内存溢出处理

1. 增加执行内存

spark.executor.memory 4g
spark.memory.fraction 0.8

2. 减少 Shuffle 数据量

// 在 Shuffle 前进行聚合
val reduced = rdd.map(x => (x._1, (x._2, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

3. 使用广播变量

// 广播小表
val broadcastVar = sc.broadcast(smallData)
val result = largeData.map(x => (x, broadcastVar.value))

Shuffle 机制

Shuffle 是 Spark 中最昂贵的操作,涉及跨节点的数据重分布。

Shuffle 过程

Map Phase:
  1. Partitioning:根据分区器将数据分配到不同分区
  2. Sorting:对每个分区的数据进行排序
  3. Spilling:内存不足时溢写到磁盘

Reduce Phase:
  1. Fetching:从各个节点拉取数据
  2. Merging:合并拉取的数据
  3. Aggregation:执行聚合操作

Shuffle 实现方式

HashShuffle

  • Spark 1.2 之前的默认方式
  • 为每个 Reduce Task 创建一个文件
  • 文件数量:Map Task × Reduce Task
  • 优点:不需要排序
  • 缺点:文件数量过多

SortShuffle

  • Spark 1.2 之后的默认方式
  • 为每个 Map Task 创建一个数据文件和一个索引文件
  • 文件数量:2 × Map Task
  • 优点:文件数量少,支持排序
  • 缺点:需要排序

Tungsten SortShuffle

  • 使用堆外内存和二进制格式
  • 优化的排序算法
  • 减少序列化和反序列化开销

Shuffle 优化

# 启用 Tungsten Shuffle
spark.sql.shuffle.partitions 200

# 增加 Shuffle 缓冲区大小
spark.shuffle.file.buffer 64k

# 增加 Shuffle Sort 缓冲区大小
spark.shuffle.sort.initialBufferFactor 0.2

# 启用 Shuffle Spill 压缩
spark.shuffle.compress true

# 启用 Shuffle Spill 合并
spark.shuffle.spill.numElementsForceSpillThreshold 52428800

性能优化

1. 并行度优化

// 设置并行度
val rdd = sc.parallelize(1 to 1000000, 100)

// 重新分区
val repartitioned = rdd.repartition(100)

// 增加分区数
val increased = rdd.coalesce(200, shuffle = true)

2. 数据序列化

# 使用 Kryo 序列化
spark.serializer org.apache.spark.serializer.KryoSerializer

# 注册 Kryo 类
spark.kryo.registrationRequired true

# Kryo 缓冲区大小
spark.kryoserializer.buffer.max 512m

3. 广播变量

// 广播大变量
val lookupTable = sc.broadcast(Map(1 -> "a", 2 -> "b"))

// 在 Task 中使用
val result = rdd.map(x => lookupTable.value.getOrElse(x, "unknown"))

4. 累加器

// 创建累加器
val counter = sc.longAccumulator("counter")

// 在 Task 中使用
rdd.foreach(x => counter.add(1))

// 获取结果
println(counter.value)

5. 持久化策略

// MEMORY_ONLY:只存储在内存
rdd.persist(StorageLevel.MEMORY_ONLY)

// MEMORY_AND_DISK:内存不足时溢写到磁盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// DISK_ONLY:只存储在磁盘
rdd.persist(StorageLevel.DISK_ONLY)

// MEMORY_ONLY_SER:序列化后存储在内存
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

// OFF_HEAP:存储在堆外内存
rdd.persist(StorageLevel.OFF_HEAP)

6. 避免创建过多 RDD

// 避免:创建多个 RDD
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.filter(_.length > 10)
val rdd3 = rdd2.map(_.toUpperCase)
val rdd4 = rdd3.count()

// 推荐:链式调用
val count = sc.textFile("data.txt")
  .filter(_.length > 10)
  .map(_.toUpperCase)
  .count()

7. 使用 DataFrame/Dataset

// 使用 DataFrame(Catalyst 优化)
val df = spark.read.json("data.json")
val result = df.filter($"age" > 18)
  .groupBy("department")
  .count()

// 使用 Dataset(类型安全)
case class Person(name: String, age: Int)
val ds = spark.read.json("data.json").as[Person]
val filtered = ds.filter(_.age > 18)

Spark SQL 优化

Catalyst 优化器

Catalyst 是 Spark SQL 的查询优化器,提供以下优化:

  • 逻辑优化:谓词下推、列剪裁、常量折叠
  • 物理优化:选择最优的物理执行计划
  • 代码生成:生成高效的 Java 字节码

谓词下推(Predicate Pushdown)

// 自动谓词下推
val df = spark.read.parquet("data.parquet")
val result = df.filter($"age" > 18)
  .select("name", "email")

// 等价于
// SELECT name, email FROM data WHERE age > 18

列剪裁(Column Pruning)

// 只读取需要的列
val df = spark.read.parquet("data.parquet")
val result = df.select("name", "email")

// 只读取 name 和 email 列,不读取其他列

Join 优化

Broadcast Hash Join

// 小表广播
val smallDF = spark.read.parquet("small.parquet")
val largeDF = spark.read.parquet("large.parquet")

// 使用提示强制广播
val result = largeDF.join(broadcast(smallDF), "key")

Sort Merge Join

// 大表连接
val df1 = spark.read.parquet("large1.parquet")
val df2 = spark.read.parquet("large2.parquet")

// 自动选择 Sort Merge Join
val result = df1.join(df2, "key")

实战案例

案例1:词频统计

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    // 读取文件
    val textFile = sc.textFile("hdfs://path/to/input")

    // 词频统计
    val wordCounts = textFile
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // 保存结果
    wordCounts.saveAsTextFile("hdfs://path/to/output")

    sc.stop()
  }
}

案例2:用户行为分析

import org.apache.spark.sql.SparkSession

object UserBehaviorAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UserBehaviorAnalysis")
      .getOrCreate()

    import spark.implicits._

    // 读取用户行为数据
    val behaviorDF = spark.read.json("user_behavior.json")

    // 计算每日活跃用户数
    val dailyActiveUsers = behaviorDF
      .groupBy($"date")
      .agg(countDistinct("user_id").as("active_users"))

    // 计算用户留存率
    val retentionRate = behaviorDF
      .filter($"date" === "2024-03-27")
      .join(
        behaviorDF.filter($"date" === "2024-03-28"),
        "user_id"
      )
      .agg(count("user_id") / countDistinct("user_id"))

    // 计算用户行为转化漏斗
    val funnel = behaviorDF
      .groupBy("user_id")
      .agg(
        sum(when($"action" === "view", 1).otherwise(0)).as("views"),
        sum(when($"action" === "click", 1).otherwise(0)).as("clicks"),
        sum(when($"action" === "purchase", 1).otherwise(0)).as("purchases")
      )
      .agg(
        sum("views").as("total_views"),
        sum("clicks").as("total_clicks"),
        sum("purchases").as("total_purchases")
      )

    // 保存结果
    dailyActiveUsers.write.parquet("output/daily_active_users")
    retentionRate.write.parquet("output/retention_rate")
    funnel.write.parquet("output/funnel")

    spark.stop()
  }
}

案例3:实时流处理

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object RealTimeProcessing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RealTimeProcessing")
      .getOrCreate()

    import spark.implicits._

    // 读取实时数据流
    val streamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .select(from_json($"value", eventSchema).as("event"))
      .select("event.*")

    // 实时统计
    val countByType = streamDF
      .groupBy($"event_type")
      .count()

    // 输出结果
    val query = countByType.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}

最佳实践

  • 合理设置并行度:根据集群资源和数据量设置合适的并行度
  • 避免数据倾斜:通过加盐、扩容等方式解决数据倾斜问题
  • 使用广播变量:对于小表使用广播变量减少 Shuffle
  • 合理持久化:对需要多次使用的 RDD 进行持久化
  • 监控和调优:使用 Spark UI 监控作业执行情况
  • 使用 DataFrame/Dataset:优先使用 DataFrame/Dataset 获得更好的性能
  • 资源隔离:使用动态资源分配和资源队列

总结

Spark 作为强大的分布式计算框架,其核心优势在于内存计算和 DAG 调度。通过深入理解 RDD 原理、DAG 调度、内存管理和 Shuffle 机制,我们可以编写出高效的 Spark 应用程序。

记住:

  • RDD 是 Spark 的核心抽象,理解其五大特性至关重要
  • DAG 调度器负责将作业划分为可执行的 Stage
  • 内存管理是 Spark 高性能的关键
  • Shuffle 是最昂贵的操作,需要重点优化
  • 使用 DataFrame/Dataset 可以获得更好的性能