Spark core
2020-06-09 09:44:13 0 举报AI智能生成
sparkcore知识点覆盖
sparkcore
软件开发
模版推荐
作者其他创作
大纲/内容
架构
driver
clustermanager 程序提供计算资源的外部服务
standalone
mesos
yarn
worker
master
申请计算资源
executor
task
任务提交
bin/spark-submit \<br><br>--class org.apache.spark.examples.SparkPi \<br><br>--master spark://node1:7077,node2:7077,node3:7077 \<br><br>--executor-memory 1G \<br><br>--total-executor-cores 2 \<br>examples/jars/spark-examples_2.11-2.3.3.jar \<br>10
spark shell使用
spark-shell --master local[N]
spark-shell --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4
RDD
弹性分布式数据集<br>
不可变、可分区、里面的元素可并行计算的集合
五大属性
A list of partitions 一个分区(Partition)列表
A function for computing each split 一个计算每个分区的函数<br>
A list of dependencies on other RDDs 一个rdd会依赖于其他多个rdd<br>
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 一个Partitioner,即RDD的分区函数(可选项)<br>
HashPartitioner
RangePartitioner
只针对k-v格式的RDD
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 一个列表,存储每个Partition的优先位置
数据本地性,会优先考虑在存有数据的节点开启计算任务
RDD的创建方式<br>
对已经存在的scala集合去构建
sc.parallelize(List(1,2,3,4,5))
sc.parallelize(Array("hadoop","hive","spark"))
sc.makeRDD(List(1,2,3,4))
加载外部的数据源去构建
sc.textFile("/words.txt")
从已经存在的rdd进行转换生成一个新的rdd
rdd1.flatMap(_.split(" "))
rdd2.map((_,1))
RDD的算子分类
transformation(转换)=》 返回一个新的RDD
map / flatMap /mapPartitions/join/union/distinct/filter/reduceByKey/sortByKey /coalesce/repartition 等
延迟加载
action (动作) =》 没有返回值
它会真正触发任务的运行
collect/reduce/count/first /take/ saveAsTextFile/countByKey/foreach/foreachPartition/ foreachRdd 等<br>
spark读取文件数据写入hbase表中
//4.1 获取hbase的数据库连接<br><br> val configuration: Configuration = HBaseConfiguration.create()
//指定zk集群的地址<br><br> configuration.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181")
val connection: Connection = ConnectionFactory.createConnection(configuration)
//4.2 对于hbase表进行操作这里需要一个Table对象<br><br> val table: Table = connection.getTable(TableName.valueOf("person"))
table.put(puts)
依赖关系
窄依赖
map/flatMap/filter/union...
所有的窄依赖不会产生shuffle
宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
reduceByKey/sortByKey/groupBy/groupByKey/join等
join分为宽依赖和窄依赖,如果RDD有相同的partitioner,那么将不会引起shuffle,这种join是窄依赖,反之就是宽依赖
lineage(血统) => 容错机制
RDD的Lineage会记录RDD的元数据信息和转换行为,lineage保存了RDD的依赖关系,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
血统就是 一级级Rdd传递下来,可以通过重新计算来恢复得到的结果
RDD的缓存机制
cache
cache最终也是调用了persist方法==,默认的存储级别都是==仅在内存存储一份
persist
unpersis 手动清除缓存
一个application应用程序结束后,缓存自动清除
action触发后才会将数据缓存
RDD的checkpoint机制
把数据保存在分布式文件系统,比如HDFS上
设置ck
sc.setCheckpointDir("hdfs://node1:9000/checkpoint")
val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
rdd2.collect //action操作触发
会开启新的job执行checkpoint操作
它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复
官方建议:先cache再checkpoint,这样checkpoint时不需要重新计算,可直接从cache拿
DAG划分stage
ShuffleMapStage<br>
最后一个shuffle之前的所有变换叫ShuffleMapStage
ResultStage
最后一个shuffle之后的操作叫ResultStage,它是最后一个Stage
stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task
如何划分stage?划分stage的依据就是宽依赖<br>
spark的任务调度
(1) Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler
1.首先从zk找到meta表的region位置,然后读取meta表中的数据,meta表中存储了用户表的region信息
(2) 按照rdd的一系列操作顺序,来生成DAG有向无环图
(3) DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler
4)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行。
(5)所有task运行完成,整个任务也就结束了
spark的运行架构
(1) Driver端向资源管理器Master发送注册和申请计算资源的请求<br><br><br>(2) Master通知对应的worker节点启动executor进程(计算资源)<br><br><br>(3) executor进程向Driver端发送注册并且申请task请求<br><br><br>(4) Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler<br><br><br>(5) 按照客户端代码洪rdd的一系列操作顺序,生成DAG有向无环图<br><br><br>(6) DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler<br><br><br>(7)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行<br><br><br>(8)所有task运行完成,Driver端向Master发送注销请求,Master通知Worker关闭executor进程,Worker上的计算资源得到释放,最后整个任务也就结束了。
spark自定义分区
继承org.apache.spark.Partitioner
重写numPartitions<br>
重写getPartition
spark的共享变量
将变量只广播给各个Executor,该Executor上的各个Task再从所在节点的BlockManager获取变量,而不是从Driver获取变量
使用
val broadCast = sc.broadcast(word)
broadCast.value //获取广播变量的值<br>
广播变量使用注意事项
1、不能将一个RDD使用广播变量广播出去。(<font color="#f15a23">可先取到RDD的数据,如collect,再将结果广播出去</font>)
2、广播变量只能在Driver端定义,不能在Executor端定义<br>
3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值
4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本
5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本
spark的累加器(accumulator)<br>
spark程序的序列化问题
driver本地序列化 => 对象序列化后传输到远程executor节点 => 程executor节点反序列化对象 => 最终远程节点执行<br>
将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化
使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
解决序列化
类 <font color="#f15a23">extends Serializable</font>
如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
对于不能序列化的成员变量使用“<font color="#f15a23">@transient</font>”标注,告诉编译器不需要序列化
也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
可以把对象的创建直接在该函数中构建这样避免需要序列化
application、job、stage、task之间的关系
一个application就是一个应用程序,包含了客户端所有的代码和计算资源
一个action操作对应一个DAG有向无环图,即一个action操作就会触发产生一个job
一个job中包含了大量的宽依赖(shuffle),按照宽依赖进行stage划分,一个job产生了很多个stage
一个stage中有很多分区,一个分区就是一个task,即一个stage中有很多个task
总结:- 一个application包含了很多个job<br>- 一个job包含了很多个stage<br>- 一个stage包含了很多个task
spark on yarn
yarn-cluster模式
spark-submit --class org.apache.spark.examples.SparkPi \<br><br>--master yarn \<br><br>--deploy-mode cluster \<br><br>--driver-memory 1g \<br><br>--executor-memory 1g \<br><br>--executor-cores 1 \<br><br>/opt/bigdata/spark/examples/jars/spark-examples_2.11-2.3.3.jar \<br><br>10
yarn-client模式
spark-submit --class org.apache.spark.examples.SparkPi \<br><br>--master yarn \<br><br>--deploy-mode client \<br><br>--driver-memory 1g \<br><br>--executor-memory 1g \<br><br>--executor-cores 1 \<br><br>/opt/bigdata/spark/examples/jars/spark-examples_2.11-2.3.3.jar \<br><br>10<br>
collect 算子
默认Driver端的内存大小为1G,由参数 spark.driver.memory 设置
实际企业中一般都会把该参数调大,比如5G/10G等
new SparkConf().set("spark.driver.memory","5G")
spark任务中资源参数剖析
executor-memory
表示每一个executor进程需要的内存大小,它决定了后期操作数据的速度
--total-executor-cores
表示任务运行需要总的cpu核数,它决定了任务并行运行的粒度
调优
十大开发原则
免创建重复的RDD
尽可能复用同一个RDD
当一个rdd是另外一个rdd的子集的时候
对多次使用的RDD进行持久化,避免从源头处重新计算一遍这个RDD<br>
通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低
尽量避免使用shuffle类算子
尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子,<br>
如果其中的一个rdd数据量要小,就把小的这个rdd的数据给广播出去,通过map遍历,将key相同的进行处理,实现join<br>
使用map-side预聚合的shuffle操作
建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子
groupByKey算子是不会进行预聚合
使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后进行coalesce操作,手动减少RDD的partition数量
使用repartitionAndSortWithinPartitions替代repartition+sort类操作
如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子
如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子
广播大变量
使用Kryo优化序列化性能
sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。<br><br>conf.registerKryoClasses(Array(classOf[Studnet], classOf[School]))
优化数据结构
有三种类型比较耗费内存:<br>- 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。<br>- 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。<br>- 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
尽可能数据本地化
PROCESS_LOCAL:进程本地化,
NODE_LOCAL:节点本地化
NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分<br>
RACK_LOCAL:机架本地化
ANY:性能最差
配置
spark.locality.wait.process 30s<br>spark.locality.wait.node 30s
task等待时间,看能不能按照最优级别去分配
基于Spark内存模型调优
Executor的内存
静态内存模型
execution内存:让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,shuffle、join、groupByBkey
storage内存:让RDD持久化时使用,缓存、广播变量
其它:让task执行我们自己编写的代码时使用<br>
配置参数
spark.storage.memoryFraction:默认0.6<br>spark.shuffle.memoryFraction:默认0.2
缺点:Storage内存区域和execution区域,不能互相借用空闲内存
统一内存模型
动态内存模型先是预留了300m内存,防止内存溢出
spark.memory.fraction 这个指的默认值是 0.6
Storage内存
配置参数:spark.memory.storageFraction :0.5
execution内存
0.6 - 0.5 = 0.1
其它 = 1- 0.6 = 0.4
特点:
Storage内存和execution内存 可以相互借用
Execution使用的时候发现内存不够了,然后就会把storage的内存里的数据驱逐到磁盘上
一开始execution的内存使用得不多,但是storage使用的内存多,所以storage就借用了execution的内存,但是后来execution也要需要内存了,这个时候就会把storage的内存里的数据写到磁盘上,腾出内存空间
资源调优
num-executors
如果不设置的话,默认只会给你启动少量的Executor进程
每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适
有多少分区就设置多少个executor
分区数 = task个数, task个数最小2个,默认是 Executor使用的总核数<br>
https://blog.csdn.net/jiede1/article/details/103799012 配置计算
executor-memory
个Executor进程的内存设置4G~8G较为合适
看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的
如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2
executor-cores
设置为2~4个较为合适
看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core
跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适
driver-memory<br>
如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题
spark.default.parallelism
设置每个stage的默认task数量
设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的
spark.storage.memoryFraction(Spark1.6之前的参数)
spark.shuffle.memoryFraction(Spark1.6之前的参数)
示例
./bin/spark-submit \<br><br> --master yarn-cluster \<br><br> --num-executors 100 \<br><br> --executor-memory 6G \<br><br> --executor-cores 4 \<br><br> --driver-memory 1G \<br><br> --conf spark.default.parallelism=1000 \<br><br> --conf spark.storage.memoryFraction=0.5 \<br><br> --conf spark.shuffle.memoryFraction=0.3 \
异常 报告
java.lang.OutOfMemoryError<br><br>ExecutorLostFailure<br><br>Executor exit code 为143<br><br>executor lost<br><br>hearbeat time out<br><br>shuffle file lost
很有可能就是内存除了问题,可以先尝试增加内存。如果还是解决不了,尝试 数据倾斜的解决方案
数据倾斜调优
shuffle类算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等
定位:可以通过Spark Web UI来查看当前运行到了第几个stage,看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜,根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage
解决方案
使用Hive ETL预处理数据((即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join))
导致数据倾斜的是Hive表
该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据)
业务场景需要频繁使用Spark对Hive表执行某个分析操作
过滤少数导致倾斜的key
导致倾斜的key就少数几个,而且对计算本身的影响并不大
提高shuffle操作的并行度(效果差)
RDD执行shuffle算子
reduceByKey(1000)
设置了这个shuffle算子执行时shuffle read task的数量
Spark SQL中的shuffle类语句
spark.sql.shuffle.partitions
shuffle read task的并行度,该值默认是200
两阶段聚合(局部聚合+全局聚合)
聚合类的shuffle的算子都可以适用(groupBykey 、Spark SQL中使用group by)
join类的shuffle操作不适用
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
将reduce join转为map join
适用join类shuffle算子
rdd1 rdd2(其中的一个rdd1的数据量要少,少于 10G)
将较小的RDD直接广播出去,在executor中map全量遍历,根据业务对key相同的数据 进行联结处理
采样倾斜key并分拆join操作(企业中常出现)
1、对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。<br> 2、然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。<br> 3、接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。<br> 4、再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。<br> 5、而另外两个普通的RDD就照常join即可。<br> 6、最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
使用随机前缀和扩容RDD进行join
1、该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。<br><br> 2、然后将该RDD的每条数据都打上一个n以内的随机前缀。<br><br> 3、同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。<br><br> 4、最后将两个处理后的RDD进行join即可。<br><br> 方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
解决方案八:前面的几种解决方案,灵活配合使用
Shuffle调优
Shuffle的核心组件
ShuffleMapStage<br>
为shuffle提供数据的中间stage
ResultStage
为一个action操作计算结果的stage
Spark Shuffle参数调优
spark.shuffle.file.buffer 默认值:32k
将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k)
spark.reducer.maxSizeInFlight 默认值:48m
决定了每次能够拉取多少数据
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m)
spark.shuffle.io.maxRetries 默认值:3<br>
shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的
对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次)
spark.shuffle.io.retryWait 默认值:5s<br>
每次重试拉取数据的等待间隔<br>
:建议加大间隔时长(比如60s<br>
spark.shuffle.memoryFraction(Spark1.6,后面的名字变了) 默认值:0.2<br>
Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%<br>
。如果内存充足,而且很少使用持久化操作,建议调高这个比例<br>
spark.shuffle.manager 默认值:sort<br>
用于设置ShuffleManager的类型<br>
如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能
spark.shuffle.sort.bypassMergeThreshold 默认值:200<br>
当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销
Collect
Get Started
Collect
Get Started
Collect
Get Started
Collect
Get Started
评论
0 条评论
下一页