Spark-Core总结
2023-03-27 15:38:05 0 举报
AI智能生成
登录查看完整内容
对spqrk的几种模式进行详细总结,根据本人开发经验对重点部分有所侧重
作者其他创作
大纲/内容
spark架构
client:该模式提交应用程序后,会在客户端启动Driver进程,适用于测试调试程序。这里的客户端就是指提交应用程序的当前节点,生产环境下不能用clientcluster:该模式提交应用程序后,会向Master请求启动ApplicationMaster,然后再AM中启动Driver进程,适用于生产环境。
Spark On Yarn
当 MapTask 执行完成后,会将 Task 的执行情况和磁盘小文件的地址封装到 MapStatus 对象中,通过MapOutputTrackerWorker 对象向 Driver 中的 MapOutputTrackerMaster 汇报;在所有的 MapTask 执行完毕后, Driver 中就掌握了所有的磁盘小文件的地址;在 ReduceTask 执行之前,会通过 Excutor 中 MapOutPutTrackerWorker 向 Driver 端的MapOutputTrackerMaster 获取磁盘小文件的地址;获取到磁盘小文件的地址后,会通过 BlockManager 中的 ConnectionManager 连接数据所在节点上的ConnectionManager ,然后通过 BlockTransferService 进行数据的传输。BlockTransferService 默认启动 5 个 Task 去节点拉取数据。默认情况下, 5 个 Task 拉取数据量不能超过48M 。
Shuffle文件寻址
Spark是专门为大规模数据处理而设计的快速通用的计算引擎。是一种类似Mapreduce的并行计算框架,它拥有MapReduce所有具有的优点,但不同于MapReduce的是job的中间输出结果可以缓存在内存中,从而不再需要读写HDFS,减少磁盘数据交互。Spark也被称为基于内存的分布式计算框架。
简介
Spark主要用于大数据的计算,而Hadoop由于计算方面采用MapReduce的方式,多次反复读写磁盘,使得速度远远不如Spark快,所以Spark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,而MR是基于磁盘。
Spark vs MapReduce
1、快:Spark基于内存的运算要比MR的运算快100倍以上,其实现了高效的DAG执行引擎。2、易用:Spark支持多种语言,以及多种交互方式3、通用:Spark提供了统一的解决方案,可用于批处理、交互式查询、实时流处理、机器学期等。4、兼容性:Spark可以非常方便的与其它开源产品进行融合。
特点
概述
RDD是弹性分布式数据集(一种数据结构),是一个读取分区记录的集合,是Spark对需要处理的数据的基本抽象。源码中是一个抽象类,代表一系列弹性的、不可变、可分区、里面元素可并行计算的集合。
1、弹性:弹性存储(内存与磁盘自动切换)、弹性容错(数据丢失可以自动恢复)、弹性计算(计算出错重试机制)、弹性分片(可根据需求重新分片)2、分布式:数据存储在大数据集群不同的节点上3、数据集:RDD只是封装了计算逻辑,并不保存数据4、数据抽象:RDD是一个抽象类,需要子类具体实现5、不可变:RDD封装了计算逻辑,是不可改变的,想要改变,只能产生新的RDD,在新的RDD中封装新的计算逻辑6、可分区:RDD是一种分布式的数据集,由于数据量很大,因此计算时要被切分并存储在各个节点的分区当中7、并行计算:一个分区对应一个任务,分区是Spark计算任务的基本处理单位,决定了并行计算的粒度8、依赖关系:如果某个RDD丢失了,则可以根据血缘关系,从父RDD计算 得来9、惰性执行:Spark对于Transformation转换算子采用惰性计算机制,遇到时并不会立即计算结果,而是要等遇到Action行动算子时才会一起执行。
五大属性
RDD
一个partition对应一个task。partition时spark计算任务的基本处理单位,决定了并行计算的粒度,而partition中的每一条record为基本处理对象。例如对某个RDD进行map操作,在具体执行时是由多个并行的Task对各自分区的每一条记录进行map映射。
Partition
转换往往是从一个RDD到另一个RDD的计算,在执行应用的程序时,遇到转换算子,并不会立即触发计算操作,而是延时遇到Action算子时才会操作。如map、flatmap、filter、sortByKey、reduceByKey
转换算子
一个行动往往代表一种输出到外部系统的操作。在执行应用的程序时,遇到行动算子,会立即产生一个job,对已有的RDD中的数据执行计算后产生结果,将结果返回Driver程序或写入到外部物理存储。如count、take、first、foreach、collect等。
行动算子
1、cache:保存到内存,效率高,数据不安全容易丢失适用场景:应用只提交一次,且某个RDD的数据会被多次使用。2、persist:保存到磁盘(临时文件,作业结束后会删除),效率低,数据安全适用场景:应用只提交一次,且某个RDD的数据会被多次使用3、checkpoint:保存到磁盘(永久保存,一般存储在分布式文件系统中,例如HDFS),效率低,数据安全cache和persist都是懒执行的,必须由一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系
控制算子
将处理的数据逐条进行映射转换,将返回值构成新的RDD。这里的转换可以是类型的转换,也可以时值的转换。
map
以分区为单位进行数据转换操作,会将整个分区的数据加载到内存。处理完的数据不会立刻释放,因为存在引用关系,所以在内存较小、数据量较大的情况下,容易出现OOM内存溢出。能用mapPartitions的地方都可以用map解决,两者的主要区别时调用的粒度不一样:map的输入变换函数时应用于RDD中的每个元素,而mapPartitions的输入函数是应用于每个分区有些时候比如连接数据库时用 mapPartitions 比较好,因为每次连接开销很大,每个分区连一次比每调用一次连接一次要好。mapPartitionsWithIndex 跟 mapPartitions 差不多,只是参数多了个分区号而已。
mapPartitions
扁平化映射可以理解为先map,然后再faltten。简单理解就是首先将函数作用与集合中的每个元素,然后将结果展平,返回新的集合。
flatMap
分组是指将数据按指定条件进行分组,从而方便我们对数据进行统计分析。按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器当中。使用gruopBy算子后数据会被打乱重新组合,我们将这样的操作称之为shuffle。
groupBy(shuffle)
过滤是指过滤出符合一定条件的元素,将数据根据指定的规则进行筛选过滤,符合规则和数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
filter
将数据集中重复的数据去重。Scala的distinct底层采用了HashSet的方式,而Spark的distinct则是采用了mapreduceByKey的方式进行去重。
distinct(shuffle)
重新分配分区数,一定会产生shuffle操作,底层就是调用了coalesce
repartition(shuffle)
sortBy函数可以根据指定的规则对数据源中的数据进行排序,默认为升序,该函数会产生shuffle操作
sortBy(shuffle)
算子转换算子(单Value)
求这些需求,需要两个RDD的元素类型必须相同,否则编译不通过其中:union:表示对两个RDD进行并集操作,且不去重intersection:表示对两个RDD取交集(相同)subtract:表示对两个RDD取差集(不同)
并集/交集/差集/笛卡尔积
将两个RDD合并成一个RDD,两个RDD的partition数量以及元素数量都必须相同,否则会抛出异常
拉链
转换算子(双Value)
将数据按照指定分区数重新进行分区,Spark默认采用HashPartitioner
partitionBy(shuffle)
将k,v格式数据的key根据指定的规则进行排序,默认为升序。
sortByKey(shuffle)
将相同key的值聚合到一起,reduce任务的个数可以通过numPartitions参数来设置。
reduceByKey(shuffle)
按k,v格式数据的key进行分组,会返回(k,iterable【v】)格式数据。
groupByKey(shuffle)
从 Shuffle 的角度:reduceByKey 和 groupByKey 都存在 Shuffle 的操作,但是 reduceByKey 可以在 Shuffle 前对分区内相同 Key 的数据进行预聚合(combine)操作,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey。如果仅仅只是分组而不聚合,那么还是只能使用 groupByKey。 总结下来就是: reduceByKey:先在分区内进行预聚合(Shuffle 前),再将所有分区的数据按 Key 进行分组并聚合。groupByKey:不会进行预聚合,直接将所有分区的数据一起分组(直接 Shuffle),如果要进行聚合,groupByKey 还需要搭配其他函数一起使用,比如 sum()。
reduceByKey与groupByKey的区别
转换算子(Key-Value)
由于Java自身的序列化比较重,所以出于性能考虑,Spark2.0开始支持另外一种序列化机制Kryo。Kryo的速度是Serialization的10倍。当RDD在shuffle数据的时候,简单的数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
Kryo序列化框架
Spark根据用户Application中的RDD的转换算子和行动算子,会生成RDD之间的依赖关系,多个RDD之间的关系又形成一条关系链叫做RDD的血统。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。因为保存血统比保存数据节省空间,如果血统和数据都不保存,那么分区数据丢失时,就需要重头开始计算。
血统
RDD之间的依赖关系又分为窄依赖和宽依赖窄依赖指的是父RDD和子RDD的Partition之间的关系时一对一的(独生子女),如map、filter等。宽依赖指的是父RDD与子RDD的Partition之间的关系时一对多(多胎),宽依赖会有shuffle的产生。如reduceByKey、groupByKey等通过源码发现 OneToOneDependency 继承了 NarrowDependency ,所以它也叫窄依赖。窄依赖指的是父 RDD 和子 RDD 的 Partition 之间的关系是一对一的(独生子女)。如 map、filter 等。通过源码发现 ShuffleDependency 直接继承了 Dependency ,为了方便记忆,一般叫它宽依赖。宽依赖指的是父RDD 与子 RDD 的 Partition 之间的关系是一对多(多胎),宽依赖会有 Shuffle 的产生。如 reduceByKey、groupByKey 等。
血统&依赖关系
每个job会被拆分成多个Task,作为一个TaskSet任务集,其名称为Stage,Stage的划分和调度是由DAGScheduler来负责的。Stage的切割规则为:从后往前,遇到宽依赖就切割Stage
Stage阶段
分析用户提交的应用,根据RDD的依赖关系建立DAG,且将DAG划分为不同的Stage,其划分Stage的依据是根据RDD的依赖关系找出开销最小的调度方法。
SparkContext
Shuffle 不可以避免是因为在分布式系统中的基本点就是把一个很大的的任务/作业分成一百份或者是一千份(分而治之),这一百份和一千份文件在不同的机器上独自完成各自不同的部份,我们是针对整个作业要结果,所以在后面会进行汇聚,这个汇聚的过程的前一阶段到后一阶段以至网络传输的过程就叫 Shuffle。Shuffle 是分布式系统的天敌。
基于 Hash 的 Shuffle 由于不要求数据有序,Shuffle Write 和 Shuffle Read 的任务非常简单:将数据 Partition 好,并持久化形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 Reducer 会去 Fetch 属于自己的 FileSegment,进入 ShuffleRead 阶段。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了容错性。这样的实现很简单,但有几个问题:产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(Reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark Job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 Bucket,M 个 ShuffleMapTask 就会产生 M * R 个 Bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 Worker Node 上同时存在的 Bucket 个数可以达到cores * R 个(一般 Worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了 cores * R * 32KB 。对于 8 核 1000 个 Reducer 来说,占用内存就是 250MB。如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。 为了解决相关问题(其实是第一个问题,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度,第二个问题直到引入 Sort Shuffle 才被解决),Hash Shuffle 进入了第二阶段。
Hash Shuffle
第二阶段的优化思路是在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的ShuffleMapTask 形成 ShuffleBlock 1,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock 1 后面,形成ShuffleBlock 1',每个 ShuffleBlock 被称为 FileSegment。下一个 Stage 的 Reducer 只需要 Fetch 整个 ShuffleFile 就行了。这样,每个 Worker 持有的文件数降为 cores * R。FileConsolidation 功能可以通过 spark.shuffle.consolidateFiles=true 来开启。这个优化后的 HashShuffle 叫 ConsolidatedShuffle,Consolidated HashShuffle 也有它的弱点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。
Consolidated Shuffle
为了缓解 Shuffle 过程产生文件数过多和 Writer 缓存开销过大的问题,Spark 引入了类似于 Hadoop MapReduce 的Shuffle 机制。该机制每一个 ShuffleMapTask 不会为后续的任务创建单独的文件,而是会将所有的 Task 结果写入同一个文件,并且对应生成一个索引文件。也就是说 Hash Shuffle 为每一个 Reducer 产生一个文件,但是 Sort Shuffle 只产生一个按照 Reducer ID 排序可索引的文件。这样,只需获取有关文件中的相关数据块的位置信息,并 fseek 就可以读取指定Reducer 的数据。
Sort Shuffle
Shuffle
一种分布式共享只读变量,使用了广播变量存储regex后,Executor端就只会存储一份该数据供多个Task使用,为了防止数据被修改,所以是只读变量。
广播变量初始的时候在Driver端会有一份副本,Task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对用的BlockManager中,尝试获取变量副本。如果本地没有,那么就从Driver端远程拉取变量副本,并保存在本地的Executor对应的BlockManager中,此后这个Executor上的其它Task都会直接使用本地的BlockManager中的副本。Executor的BlockManager除了从Driver端上拉取,也能从其他节点的BlockManager上拉取变量副本,距离越近越好。
工作流程
不是每个Task一份变量副本,而是每个节点的Executor一份副本。这样的话,就可以让变量产生的副本数大大减少。变量一旦被定义为一个广播变量,那么这个变量只能读取,不能修改
优点
能不能将一个 RDD 使用广播变量广播出去?不能,因为 RDD 是不存储数据的。可以将 RDD 的结果广播出去。广播变量只能在 Driver 端定义,不能在 Executor 端定义。在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。如果 Executor 端用到了 Driver 端的变量,不使用广播变量,在 Executor 有多少 Task 就有多少 Driver 端的变量副本。如果 Executor 端用到了 Driver 端的变量,使用广播变量,在每个 Executor 中只有一份 Driver 端的变量副本。
注意
广播变量
PV 是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主, PV 值可预期它能带来多少广告收入。 PV(Page View) 即页面浏览量或点击量。具体的说, PV 值就是所有访问者在 24 小时(0 点到 24 点)内看了某个网站多少个页面或某个网页多少次,每一次页面刷新,就算做一次 PV 流量。一般来说, PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的 PV 。 度量方法就是从浏览器发出一个对网络服务器的请求( Request ),网络服务器接到这个请求后,会将该请求对应的一个网页( Page )发送给浏览器,从而产生了一个 PV 。那么在这里只要是这个请求发送给了浏览器,无论这个页面是否完全打开(下载完成),那么都是应当计为 1 个 PV 。
pv
UV(Unique Visitor) 即独立访客数,指访问某个站点或点击某个网页的不同 IP 地址的人数。在同一天内,UV 只记录第一次进入网站的具有独立 IP 的访问者,在同一天内再次访问该网站则不计数。 UV 提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
uv
PV&UV
Spark总结
0 条评论
回复 删除
下一页