资源调优
<span style="font-size: inherit;">在部署Spark集群中指定资源分配的默认参数</span><br>
Spark_worker_cores
spark_worker_memory
spark_worker_instances
在提交Application的时候给当前的Application分配更多资源<br>
动态分派资源(代码)<br>
提交命令选项(任务提交)
配置信息(配置文件)
并行度调优
读取数据在hdfs中,降低block大小,提高了RDD中partition个数
sc.parallelize
sc.makeRDD
sc.parallelizePair
repartions/coalesce
增大分区和减少分区
产生shuffle和不产生shuffle
redecBuKey/groupByKey/join
reducByKey预聚合操作
groupByKey分区聚合
spark.default.parallelism net set
设置默认分区数
只属于RDD
spark.sql.shuffle.partitions---200
配置在为连接或聚合转移数据时要使用的分区数
只属于Spark sql
如果读取数据是在SparkStreaming中
Receiver
spark.streaming.blockInterval—200ms
代码调优
避免重复的RDD
对多次使用的RDD进行持久化操作
checkpoint
载入点
切断RDD之间的依赖关系
shuffle
避免使用shuffle算子
使用广播变量来模拟使用join,使用情况: 一个RDD比较大,一个RDD比较小
使用预聚合的shuffle算子
reduceByKey
aggregateByKey
combinerByKey
尽量使用高性能算子
使用reduceByKey代替groupBykey
使用mapPartition代替map
使用foreachPartition代替map
filter后使用coalesce减少分区
使用repartition和coalesce算子操作分区
使用广播变量
较少网络传输性能开销,用时降低Executor内存的占用开销,较少gc频率
使用Kryo优化序列化性能
速度快,体积小,需要手动注册需要进行序列化的自定义类型
优化数据结构
字符串代替对象
原始类型(int,long)代替字符串
数组代替集合类型
使用高性能的库fastutil
JDK原生类型占内存太大,使用fastutil可以直观减少内存
数据本地化
数据本地化级别
PROCESS_LOCAL
task要计算的数据在本进程(Executor)的内存中
NODE_LOCAL
task所计算的数据在本节点所在的磁盘上
task所计算的数据在本节点所在的磁盘上。
NO_PREF
task所计算的数据在关系型数据库中,如mysql
RACK_LOCAL
task所计算的数据在同机架的不同节点的磁盘或者Executor进程的内存中
每分发失败后,会重新发送,每3秒一次,默认重试5次
Spark数据本地化调优
可以增加每次发送task的等待时间,将3秒倍数调大
Spark Shuffle调优
buffer大小--默认32k
shuffle read拉取数据量的大小--默认48M
shuffle聚合内存的比例--默认20%
拉取数据重试次数--默认5次
重试间隔时间--默认60s
Spark Shuffle的种类
hashshuffle
sortshuffle
SortShuffle bypass机制
调节Executor的堆外内存
堆外内存就是把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机),这样做的结果就是能够在一定程度上减少垃圾回收对应用程序造成的影响。
yarn下
--conf spark.yarn.executor.memoryOverhead=2048 单位M
standalone下
--conf spark.executor.memoryOverhead=2048单位M