Spark
2025-09-28 11:11:21 0 举报
AI智能生成
spark
作者其他创作
大纲/内容
概述
Apache Spark 是专门为大规模数据处理而设计的快速通用的计算引擎
Spark 也被称为基于内存的分布式计算框架
是一种类似Hadoop MapReduce 的通用并行计算框架
中间输出结果可以缓存在内存
运行架构
概述
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准的 Master-Slave 结构
无论什么运行模式都会存在这些角色,只是在不同的运行模式下,这些角色的分布会有所不同
模式
单机
运行在一台机器上,称为本地(单机)模式
集群
使用 Spark 自带的资源调度系统,称为 Spark Standalone 模式
以 YARN 或 Mesos 作为底层资源调度系统以分布式的方式在集群中运行,称为
Spark On YARN 模式或 Spark On Mesos 模式
Spark On YARN 模式或 Spark On Mesos 模式
用户在提交任务给 Spark 时,可通过以下参数决定 Spark 的运行方式。而集群模式又根据 Driver 运行在哪被划分为
Client 模式和 Cluster 模式
Client 模式和 Cluster 模式
--master :决定了 Spark 任务提交给哪种模式处理,默认为 local
--deploy-mode :决定了 Driver 的运行方式(运行在哪里),可选值为 client 或 cluster,默认为 client
容器
K8s 的运行模式
...
云
....
运行流程
通用运行流程
资源申请粒度
粗粒度资源申请(Spark)
Spark 会在Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的
Task 执行完成后,才会释放这部分资源
Task 执行完成后,才会释放这部分资源
优点:在 Application 执行之前,所有的资源都申请完毕,每一个 Task 直接使用资源就可以了,不需要 Task 在
执行前自己去申请资源
执行前自己去申请资源
缺点:直到最后一个Task 执行完成才会释放资源,集群的资源无法被充分利用
细粒度资源申请(Hadoop MapReduce)
MapReduce 在 Application 执行之前不需要去提前申请资源,而是直接执行,
让 Job 中的每一个 Task 在执行前自己去申请资源,Task 执行完成就立刻释放资源
让 Job 中的每一个 Task 在执行前自己去申请资源,Task 执行完成就立刻释放资源
优点:集群的资源可以充分利用
缺点:Task自己去申请资源,Task启动变慢,Application也会变慢
Local 本地模式
所谓的 Local 模式,就是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等。
本地模式就是一个独立的进程,通过其内部的多个线程来模拟整个 Spark 运行时环境,每个线程代表一个 Worker
本地模式就是一个独立的进程,通过其内部的多个线程来模拟整个 Spark 运行时环境,每个线程代表一个 Worker
Local 模式在运行时又分为:
Local 和Local Cluster 。
Local :最简单的本地模式。
Local Cluster :本地伪分布式模式
Local 和Local Cluster 。
Local :最简单的本地模式。
Local Cluster :本地伪分布式模式
Standalone 独立模式
独立模式属于自己独立一套集群(Client/Master/Worker),是 Spark 原生的集群管理器,自带完整的服务,可单独部
署,无需依赖任何其他资源管理系统。使用 Standalone 可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管
理框架的时候才会使用
署,无需依赖任何其他资源管理系统。使用 Standalone 可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管
理框架的时候才会使用
缺点:资源不利于充分利用
Standalone Client
在 Standalone Client 模式下,Driver 在提交应用程序的当前节点运行
如果在 Master 上提交应用,那么 Driver 就在 Master;如果在 Worker 上提交应用,那么
Driver 就在 Worker
Driver 就在 Worker
Standalone Cluster
在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver 进程
YARN 模式
YARN 是 Hadoop 的资源调度框架,Spark 也可以基于 YARN 来计算(将 Spark 应用提交到 YARN 上运行)。Spark 客户
端直接连接 YARN,不需要额外构建 Spark 集群,国内使用较多。Spark 中的各个角色运行在 YARN 的容器内部,并组成
Spark 集群环境
端直接连接 YARN,不需要额外构建 Spark 集群,国内使用较多。Spark 中的各个角色运行在 YARN 的容器内部,并组成
Spark 集群环境
任何框架与 YARN 结合,都必须遵循 YARN 的开发模式。
在 Spark On Yarn 模式下,Executor 进程名称为 CoarseGrainedExecutorBackend。一个 CoarseGrainedExecutorBackend
有且仅有一个 Executor 对象, 负责将 Task 包装成 taskRunner,并从线程池中抽取一个空闲线程运行 Task,每一个
CoarseGrainedExecutorBackend 能并行运行 Task 的数量取决于分配给它的 CPU 个数
有且仅有一个 Executor 对象, 负责将 Task 包装成 taskRunner,并从线程池中抽取一个空闲线程运行 Task,每一个
CoarseGrainedExecutorBackend 能并行运行 Task 的数量取决于分配给它的 CPU 个数
Spark On Yarn分为
YARN Client
该模式提交应用程序后,会在客户端启动Driver 进程,适用于测试调试程序。这里的客户端就是指提交应用程序的当前节点。
生产环境下不能使用client 模式,因为 Driver 会和 Executor 通信来调度他们工作,
也就是说 Client 不能离开,所以该 JVM 进程直到 Spark Application 计算完并返回结果后才能退出
生产环境下不能使用client 模式,因为 Driver 会和 Executor 通信来调度他们工作,
也就是说 Client 不能离开,所以该 JVM 进程直到 Spark Application 计算完并返回结果后才能退出
ApplicationMaster 在此种模式下没有作业调度的功能
YARN cluster
该模式提交应用程序后,会向Master (ResourceManager)请求启动
AM 中启动Driver 进程,适用于生产环境
AM 中启动Driver 进程,适用于生产环境
YARN Client
在 YARN Client 模式下,Driver 在应用提交的本地机器上运行
ApplicationMaster 在 YARN Client 模式下没有作业调度的功能。因为该模式下,Application 的注册与 Job
和 Stoge 的划分以及 Task 的创建、分配和调度都是 Driver 负责的,而 ApplicationMaster(ExecutorLauncher ) 只负责了
Executor 资源的申请。原因就是因为在 YARN Client 模式下,Driver 在应用提交的本地机器上运行
和 Stoge 的划分以及 Task 的创建、分配和调度都是 Driver 负责的,而 ApplicationMaster(ExecutorLauncher ) 只负责了
Executor 资源的申请。原因就是因为在 YARN Client 模式下,Driver 在应用提交的本地机器上运行
YARN cluster
在 YARN Cluster 模式下,Driver 运行在 ApplicationMaster 中
在 YARN-Cluster 模式中,当用户向 YARN 中提交一个应用程序后,YARN 会分两个阶段运行该应用程序
第一个阶段是把 Spark 的 Driver 作为一个 ApplicationMaster 在 YARN 集群中先启动
第二个阶段是由 ApplicationMaster 创建应用程序,然后为它向 ResourceManager 申请资源,并启动 Executor 来运行
Task,同时监控它的整个运行过程,直到运行完成
Task,同时监控它的整个运行过程,直到运行完成
核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景
RDD:弹性分布式数据集
广播变量:分布式共享只读变量
累加器:分布式共享只写变量
RDD
RDD概述
RDD 是 Resilient Distributed Dataset 的缩写,意思为弹性分布式数据集(一种数据结构),是一个读取分区记录的集
合,是 Spark 对需要处理的数据的基本抽象
合,是 Spark 对需要处理的数据的基本抽象
RDD 有强大的血统(Lineage)特性,RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。
每一个 RDD 都可以根据其依赖关系一级一级向前回溯重新计算,这便是 Spark 实现容错的一种手段,如果某个 RDD 丢失了,则可以根据血缘关系,
从父 RDD 计算得来
每一个 RDD 都可以根据其依赖关系一级一级向前回溯重新计算,这便是 Spark 实现容错的一种手段,如果某个 RDD 丢失了,则可以根据血缘关系,
从父 RDD 计算得来
创建
通过加载外部物理存储(如HDFS)的数据集,或Application中定义的对象集合(如 List)来创建
RDD被创建后不可改变
RDD被创建后不可改变
转换
对已有的 RDD 中的数据执行计算并进行转换,从而产生新的 RDD,在这个过程中有时会产生中间 RDD。
Spark 对于 Transformation 采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而是要等遇到 Action 时才会一起执行
Spark 对于 Transformation 采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而是要等遇到 Action 时才会一起执行
行动
对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物理存储。
在Action 过程中同样有可能产生中间 RDD
在Action 过程中同样有可能产生中间 RDD
RDD总结
RDD 是 Resilient Distributed Dataset 的缩写,意思为弹性分布式数据集(一种数据结构),是一个读取分区记录的集合,
是 Spark 对需要处理的数据的基本抽象。源码中是一个抽象类,代表一系列弹性的、不可变、可分区、里面元素可并行计算的集合
是 Spark 对需要处理的数据的基本抽象。源码中是一个抽象类,代表一系列弹性的、不可变、可分区、里面元素可并行计算的集合
弹性
弹性存储:内存和磁盘自动切换
弹性容错:数据丢失可自动恢复
弹性计算:计算出错重试机制
弹性分片:可根据需求重新分片
分布式
数据存储在大数据集群不同节点上
数据集
RDD只是封装了计算逻辑,不保存数据
数据抽象
RDD是一个抽象类,需要子类具体实现
不可变
RDD封装了计算逻辑,是不可变的,要想改变,只能生成新的RDD,在新的RDD封装新的计算逻辑
可分区
RDD 是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个结点的分区当中
并行关系
一个分区对应一个任务。分区是 Spark 计算任务的基本处理单位,决定了并行计算的粒度
依赖关系
如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来
惰性执行
Spark 对于 Transformation 转换算子采用惰性计算机制,遇到 Transformation 时并不会立即计算结果,而
是要等遇到 Action 行动算子时才会一起执行
是要等遇到 Action 行动算子时才会一起执行
RDD五大属性
A list of partitions:分区列表。RDD 是由一系列的Partition 组成的,主要用于并行计算
A function for computing each split:分区计算函数。每个分区都有一个计算函数,将数据和计算逻辑关联起来
A list of dependencies on other RDDs:RDD 之间有一系列的依赖关系
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
可选的,针对于 K, V 类型的RDD 才具有这个特性,作用是决定了数据的来源以及数据处理后的去向
可选的,针对于 K, V 类型的RDD 才具有这个特性,作用是决定了数据的来源以及数据处理后的去向
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):计算向数据靠拢,移动数据不如移动计算
Partition(分区)
一个 Partition 对应一个 Task。Partition 是 Spark 计算任务的基本处理单位,决定了并行计算的粒度,而 Partition 中
的每一条 Record 为基本处理对象。例如对某个 RDD 进行 map 操作,在具体执行时是由多个并行的 Task 对各自分区的每
一条记录进行 map 映射
的每一条 Record 为基本处理对象。例如对某个 RDD 进行 map 操作,在具体执行时是由多个并行的 Task 对各自分区的每
一条记录进行 map 映射
集合的分区处理(通过集合构建 RDD 时)
文件的分区处理(通过文件构建 RDD 时)
支持重分区的算子(通过算子的numPartitions: Int 参数重新分区)
分区器的分区处理(通过partitionBy 算子重新分区)
HashPartitioner(哈希分区)
RangePartitioner(范围分区)
自定义分区
算子
算子概述
Spark 记录了 RDD 之间的生成和依赖关系。但是只有当进行行动操作时,Spark 才会根据 RDD 的依赖关系生成 DAG,
并从起点开始真正的计算
并从起点开始真正的计算
算子是一个函数空间到另一个函数空间的映射。广义的讲,对任何函数进行某一项操作都可以认为是一个算子。
在认知心理学领域中,人在解决问题时要利用各种算子来改变问题的起始状态,经过各种中间状态,逐步达到目标状
态,从而解决问题。解决问题中的种种操作被称为算子(Operator)
在认知心理学领域中,人在解决问题时要利用各种算子来改变问题的起始状态,经过各种中间状态,逐步达到目标状
态,从而解决问题。解决问题中的种种操作被称为算子(Operator)
Transformation:转换算子(本质就是函数)。转换往往是从一个 RDD 到另一个 RDD 的计算。在执行应用的程序时,
遇到转换算子,并不会立即触发计算操作,而是延时到遇到 Action 算子时才会操作。如 map、flatMap、filter、
sortByKey、reduceByKey 等
遇到转换算子,并不会立即触发计算操作,而是延时到遇到 Action 算子时才会操作。如 map、flatMap、filter、
sortByKey、reduceByKey 等
Action:行动算子(本质就是函数)。一个行动往往代表一种输出到外部系统的操作。在执行应用的程序时,遇到行
动算子,会立即产生一个 Job,对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物
理存储。如 count、take、first、foreach、collect 等
动算子,会立即产生一个 Job,对已有的 RDD 中的数据执行计算后产生结果,将结果返回 Driver 程序或写入到外部物
理存储。如 count、take、first、foreach、collect 等
控制算子,可以将RDD 持久化,持久化的单位是Partition
cache :保存到内存,效率高,数据不安全容易丢失
persist :保存到磁盘(临时文件,作业结束后会删除),效率低,数据安全
checkpoint :保存到磁盘(永久保存,一般存储在分布式文件系统中,例如 HDFS),效率低,数据安全
cache 和persist 都是懒执行的,必须由一个到磁盘,还能切断RDD 之间的依赖关系
RDD 根据数据处理方式的不同将算子分为单 Value 类型、双 Value 类型和 Key-Value 类型
转换算子(单 Value)
转换算子简单的理解就是:功能的补充与封装,将旧的 RDD 转换为 新的 RDD
map
将处理的数据逐条进行映射转换,将返回值构成新的 RDD
mapPartitions & mapPartitionsWithIndex(慎用)
mapPartitions 是以分区为单位进行数据转换操作,会将整个分区的数据加载到内存。处理完的数据不会立刻释放,因
为存在引用关系,所以在内存较小,数据量较大的情况下,容易出现 OOM 内存溢出
为存在引用关系,所以在内存较小,数据量较大的情况下,容易出现 OOM 内存溢出
flatMap
先 map,然后再 flatten,,就是首先将函数作用于集合中的每个元素,然后将结果展平,返回新的集合
glom
将同一个分区的多个单个数据直接转换为相同类型的单个数组进行处理,适用于以分区为单位的数据统计
groupBy(shuffle)
分组是指将数据按指定条件进行分组,从而方便我们对数据进行统计分析。按照传入函数的返回值进行分组,将相同
的 Key 对应的值放入一个迭代器中。使用 groupBy 算子后数据会被打乱重新组合,我们将这样的操作称之为 Shuffle
的 Key 对应的值放入一个迭代器中。使用 groupBy 算子后数据会被打乱重新组合,我们将这样的操作称之为 Shuffle
注意:相同组的数据会在一个分区中,一个分区中可能会有多个组
filter
过滤是指过滤出符合一定条件的元素。将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数
据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
sample
采样就是从大量的数据中获取少量的数据,获取的方法可以依据某种策略,得到的数据用于分析,企图使用少量数据
的分析结果代替全局
的分析结果代替全局
distinct(shuffle)
将数据集中重复的数据去重。Scala 的 distinct 底层采用了 HashSet 的方式,而 Spark 的 distinct 则是采用了 map
reduceByKey map 的方式进行去重
reduceByKey map 的方式进行去重
coalesce(shuffle)
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。(可以简单地理解为合并分区,可能导致数据倾斜)。
当 Spark 程序中存在过多小任务时,可以通过 coalesce 方法合并分区,减少分区的个数,减小任务调度成本
当 Spark 程序中存在过多小任务时,可以通过 coalesce 方法合并分区,减少分区的个数,减小任务调度成本
coalesce 函数除了可以缩减分区数,还可以扩增分区数,但是扩增分区数时一定要设置 shuffle 为 true,否则不起作
用
用
repartition(shuffle)
重新分配分区数,一定会产生 Shuffle 操作,底层就是调用了
coalesce
coalesce
sortBy(shuffle)
sortBy 函数可以根据指定的规则对数据源中的数据进行排序,默认为升序,该函数会产生 Shuffle 操作
转换算子(双 Value)
并集/交集/差集/笛卡尔积
操作数据时,我们可能会遇到求并集、交集、差集的需求,这时就要用到
union
intersection
subtract
这些方法了。值得注意的是,两个 RDD 的元素类型必须相同,否则编译不通过
union
intersection
subtract
这些方法了。值得注意的是,两个 RDD 的元素类型必须相同,否则编译不通过
union : 表示对两个 RDD 进行并集(合并)操作,且不去重
intersection :表示对两个 RDD 取交集(相同)
subtract :表示对两个 RDD 取差集(不同)
cartesian :表示对两个 RDD 进行笛卡尔积操作,尽量避免使用
拉链 zip
将两个 RDD 合并成一个 RDD,两个 RDD 的 Partition 数量以及元素数量都必须相同,否则会抛出异常
转换算子(Key-Value)
partitionBy(shuffle)
将数据按照指定分区数重新进行分区,Spark 默认采用 HashPartitioner
sortByKey(shuffle)
将 K, V 格式数据的 Key 根据指定的规则进行排序,默认为升序。如果 Key 是元组,如 (x1, x2, x3, ...),会先按照 x1 排
序,若 x1 相同,再按 x2 排序,依次类推。sortByKey 同样是来自 PairRDDFunctions 的函数
序,若 x1 相同,再按 x2 排序,依次类推。sortByKey 同样是来自 PairRDDFunctions 的函数
reduceByKey(shuffle)
将相同 Key 的值聚合到一起,Reduce 任务的个数可以通过 numPartitions 参数来设置。reduceByKey 同样是来自
PairRDDFunctions 的函数
PairRDDFunctions 的函数
groupByKey(shuffle)
按 K, V 格式数据的 Key 进行分组,会返回 (K, Iterable[V]) 格式数据。groupByKey 同样是来自
PairRDDFunctions 的函数
PairRDDFunctions 的函数
reduceByKey 和 groupByKey 的区别
从 Shuffle 的角度:reduceByKey 和 groupByKey 都存在 Shuffle 的操作,但是 reduceByKey 可以在 Shuffle 前对分区内相
同 Key 的数据进行预聚合(combine)操作,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量
减少的问题,reduceByKey 性能比较高
同 Key 的数据进行预聚合(combine)操作,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量
减少的问题,reduceByKey 性能比较高
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合
下,推荐使用 reduceByKey。如果仅仅只是分组而不聚合,那么还是只能使用 groupByKey
下,推荐使用 reduceByKey。如果仅仅只是分组而不聚合,那么还是只能使用 groupByKey
reduceByKey:先在分区内进行预聚合(Shuffle 前),再将所有分区的数据按 Key 进行分组并聚合
aggregateByKey(shuffle)
无需搭配其他算子就可以实现分区内和分区间不同的计算逻辑。aggregateByKey 同样是来自PairRDDFunctions 的函数
mapValues
针对于 K, V 形式的数据只对 V 进行操作
foldByKey(shuffle)
当分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 foldByKey 算子。foldByKey 同样是来
自 PairRDDFunctions 的函数
自 PairRDDFunctions 的函数
combineByKey(shuffle)
aggregateByKey 算子和 foldByKey 算子都是给每个分区的第一个 Key 的 Value 一个初始值,如果想要再灵活点,可以使用 combineByKey 算子,因为它的第一个参数是给每个分区的第一个 Key 的 Value 一个初始值规则(使用createCombiner 函数来初始化第一个 Key 的初始值)。combineByKey 同样是来自 PairRDDFunctions 的函数
cogroup
两个不同数据源中,先按 Key 分组,然后再将 V 使用迭代器连接在一起
join
在类型为 K, V 和 K, W 的 RDD 上调用,返回一个相同 Key 对应的所有元素对在一起的 K, (V, W) 的 RDD(简单的理解就是将相同 Key 的数据连接在一起,)
两个不同数据源中,相同的 Key 的 Value 会连接在一起,形成元组
两个不同数据源中,没有被匹配到的 Key 不会被连接(不会出现在结果中)
两个不同数据源中,如果出现了多个相同的 Key,则会产生笛卡尔积,工作中谨慎使用
当两个 RDD 分区数和分区方式一样时做 join 就不会产生 Shuffle
聚合算子之间的区别
reduceByKey
传入的值是什么就返回什么
groupByKey
传入的值是什么就返回什么(做了压缩处理使用CompactBuffer 来存储)
aggregateByKey
如果有初始值,给每个分区的 Key 的 Value 一个初始值,否则使用 Key 的 Value 值
foldByKey
如果有初始值,给每个分区的 Key 的 Value 一个初始值,否则使用 Key 的 Value 值
combineByKey
createCombiner: V => C,给每个分区的 Key 的 Value 一个初始值规则
行动算子
行动算子简单的理解就是:生成作业并触发任务的调度和执行
reduce
通过函数聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
count
返回 RDD 中元素的个数
take
返回 RDD 的前 n 个元素组成的数组
takeOrdered
返回 RDD 排序后的前 n 个元素组成的数组,默认正序
first
返回 RDD 中的第一个元素,底层就是take(1)
foreach
循环遍历数据集中的每个元素,运行相应的计算逻辑(函数)
foreachPartition
按分区循环遍历数据集中的每个元素,并运行相应的计算逻辑(函数)
collect
将不同分区的数据收集到 Driver 并以数组的形式返回数据
aggregate
aggregate 函数会将每个分区里面的元素通过 seqOp 函数和初始值进行聚合,然后用 combOp 函数将每个分区的结果
和初始值(zeroValue)进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致
和初始值(zeroValue)进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致
fold
当 aggregate 的分区内和分区间计算逻辑相同时,Spark 为了让程序员更方便的使用,提供了 fold 算子
countByValue & countByKey
针对 K, V 类型的 RDD,返回一个 K, Int 的 map,表示每一个 Key 对应的元素个数。countByValue 底层调用的是
map().countByKey()。countByKey 底层调用的是 reduceByKey
map().countByKey()。countByKey 底层调用的是 reduceByKey
save 系列
saveAsTextFile
将数据集的元素以 TextFile 的形式保存到 HDFS 文件系统或者其他文件系统,对于每个元素,Spark 将
会调用 toString 方法,将它装换为文件中的文本
会调用 toString 方法,将它装换为文件中的文本
saveAsSequenceFile
将数据集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系统或者其他文件系统
saveAsObjectFile
将 RDD 中的元素序列化成对象,存储到文件中
闭包检测
继承scala.Serializable
使用样例类(推荐),样例类默认继承了 Serializable
Kryo 序列化框架
由于 Java 自身的序列化比较重(字节多),所以出于性能的考虑,Spark 2.0 开始支持另外一种序列化机制 Kryo。
Kryo 的速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark
内部使用 Kryo 来序列化
Kryo 的速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark
内部使用 Kryo 来序列化
注意:使用 Kryo 序列化,也需要继承 Serializable 接口
血统 & 依赖关系
血统
Spark 根据用户 Application 中的 RDD 的转换算子和行动算子,会生成 RDD 之间的依赖关系,多个 RDD 之间的关系又
形成一条关系链叫做 RDD 的血统(Lineage)
形成一条关系链叫做 RDD 的血统(Lineage)
RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分
分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算
分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算
依赖关系
RDD 之间的依赖关系又分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)
窄依赖指的是父RDD 和子RDD 的Partition 之间的关系是一对一的(独生子女)。如 map、filter 等
宽依赖指的是父 RDD 与子 RDD 的 Partition 之间的关系是一对多 (多胎),宽依赖会有 shuffle 的产生,如reduceByKey、groupByKey等
Stage 阶段
从字面上理解宽窄依赖并不难,关键是它有什么用呢?其实宽窄依赖是为了划分任务阶段 Stage 用的
每个 Job 会被拆分成多个 Task,作为一个 TaskSet 任务集,其名称为 Stage,Stage 的划分和调度是由 DAGScheduler
来负责的。Stage 的切割规则为:从后往前,遇到宽依赖就切割Stage
来负责的。Stage 的切割规则为:从后往前,遇到宽依赖就切割Stage
因为窄依赖是一对一的关系,所以无需等他其他分区数据,即可继续往下执行,所以一般会划分在
一个阶段中。直到遇见宽依赖,此时数据会被打乱重组(Shuffle),需要开始一个新的阶段。Spark 中 Stage 的划分是根
据 Shuffle 来划分的,所以 Stage 的边缘就是产生 Shuffle 的时候
一个阶段中。直到遇见宽依赖,此时数据会被打乱重组(Shuffle),需要开始一个新的阶段。Spark 中 Stage 的划分是根
据 Shuffle 来划分的,所以 Stage 的边缘就是产生 Shuffle 的时候
从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到 Stage 中
默认情况下每个 Stage 里面的 Task 的数量是由该 Stage 中最后一个 RDD 的 Partition 数量决定的(一个 Partition 对应一个 Task)
最后一个 Stage(ResultStage) 里面的任务类型是 ResultTask,前面所有其他 Stage(ShuffleMapStage) 里面的任务类型都是ShuffleMapTask
代表当前 Stage 的算子一定是该 Stage 的最后一个计算步骤
Job
Job 包含了多个 Task 组成的并行计算,往往由 Spark Action 算子触发生成, 一个 Application 中往往会产生多个Job。
一个Job 包含 N 个 Transformation 算子和 1 个 Action 算子
一个Job 包含 N 个 Transformation 算子和 1 个 Action 算子
Application:初始化一个 SparkContext 即生成一个 Application
Job 是 Application 的子集,以 Spark Action 算子为界,遇到一个 Action 算子就触发一个 Job
Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 就做一次划分
Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,就有多少个 Task
提示:Spark 中所谓的并行度是指 RDD 中的分区数,即 RDD 中的 Task 数
Application → Job → Stage → Task,每一层都是 1 对 N 的关系
每个 Job 会被拆分成多个 Task,作为一个 TaskSet 任务集,其名称为 Stage,Stage 的划分和调度是由 DAGScheduler
来负责的
来负责的
SparkContext 分析用户提交的应用,根据 RDD 的依赖关系建立 DAG(Directed Acyclic Graph 有向无环图),且将 DAG
划分为不同的 Stage,其划分 Stage 的依据是根据 RDD 的依赖关系找出开销最小的调度方法。最后提交 Stage 给
TaskScheduler
划分为不同的 Stage,其划分 Stage 的依据是根据 RDD 的依赖关系找出开销最小的调度方法。最后提交 Stage 给
TaskScheduler
DAGScheduler 将划分完成的 Task 提交到 TaskScheduler,TaskScheduler 通过 Cluster Manager 在集群中的某个 Worker
的 Executor 上启动任务,实现类为 TaskSchedulerImpl
的 Executor 上启动任务,实现类为 TaskSchedulerImpl
TaskScheduler 维护了所有 TaskSet,当 Executor 向 Driver 发生心跳时,TaskScheduler 会根据资源剩余情况分配相应
的 Task 到 Executor 中。另外 TaskScheduler 还维护着所有 Task 的运行标签,以及重试失败的 Task
的 Task 到 Executor 中。另外 TaskScheduler 还维护着所有 Task 的运行标签,以及重试失败的 Task
从数据源(可以是本地 File,内存数据结构,HDFS,HBase 等)读取数据并创建最初的 RDD
对 RDD 进行一系列的 transformation() 操作,每一个 transformation() 会产生一个或多个包含不同类型 T 的 RDD[T]。T
可以是 Scala 里面的基本类型或数据结构,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等复杂类型(因为难以在复
杂类型上定义 Partition 函数)
可以是 Scala 里面的基本类型或数据结构,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等复杂类型(因为难以在复
杂类型上定义 Partition 函数)
对最后的 final RDD 进行 action() 操作,每个 Partition 计算后产生结果 Result
将 Result 回送到 Driver 端,进行最后的计算。RDD 可以被 cache 到内存或者 checkpoint 到磁盘上。RDD 中的 Partition
个数不固定,通常由用户设定。RDD 和 RDD 之间 Partition 的依赖关系可以是一对一,也可以是多对多
个数不固定,通常由用户设定。RDD 和 RDD 之间 Partition 的依赖关系可以是一对一,也可以是多对多
广播变量
广播变量的优点:不是每个 Task 一份变量副本,而是每个节点的 Executor 一份副本。这样的话,就可以让变量产生
的副本数大大减少。变量一旦被定义为一个广播变量,那么这个变量只能读取,不能修改
的副本数大大减少。变量一旦被定义为一个广播变量,那么这个变量只能读取,不能修改
注意
能不能将一个 RDD 使用广播变量广播出去?不能,因为 RDD 是不存储数据的。可以将 RDD 的结果广播出去
广播变量只能在 Driver 端定义,不能在 Executor 端定义
在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值
如果 Executor 端用到了 Driver 端的变量,不使用广播变量,在 Executor 有多少 Task 就有多少 Driver 端的变量副本
如果 Executor 端用到了 Driver 端的变量,使用广播变量,在每个 Executor 中只有一份 Driver 端的变量副本
累加器
变量在分布式运行时只在每个 Task 中进行了改变,改变的只是原始变量的一个副
本,并不能改变 Driver 端原始变量的值,但是当这个变量被声明为累加器后,该变量就会拥有分布式计数的功能
本,并不能改变 Driver 端原始变量的值,但是当这个变量被声明为累加器后,该变量就会拥有分布式计数的功能
累加器在 Driver 端定义并赋初始值,累加器只能在 Driver 端读取最后的值,在 Excutor 端更新
自定义累加器
看需求
0 条评论
下一页