spark源码之DAGScheduler
2019-07-18 11:32:34 0 举报
AI智能生成
spark源码的DAGScher解读
作者其他创作
大纲/内容
0.包路径
包:org/apache/spark/scheduler/DAGScheduler.scala
1.提交Job的入口<br>runJob()
1)生成job的启动时间<br>
2)submitJob()<br>提交job,执行Job过程是异步的,<br>因此submitJob()将立即返回JobWaiter对象
3)利用JobWaiter等待Job处理完毕,成功:打印日志,失败:日志+异常
2.DAGScheduler调度的核心入口<br>handleJobSubmitted()
1)调用createResultStage(),创建ResultStage<br>
2)创建ActiveJob。<br>3)调用clearCacheLocs方法(见代码清单7-23 )清空cacheLocs。<br>4)生成Job提交的时间。<br>5)将jobld与刚创建的ActiveJob之间的对应关系放人jobldToActiveJob中。<br>6)将刚创建的ActiveJob放人activeJobs集合中。<br>7 )使ResultStage的。activeJob 属性持有刚创建的ActiveJob。 <br>
8)获取当前Job的所有Stage对应的StageInfo (即数组stagelnfos)。<br>9)向LiveListenerBus投递SparkL itenerlobSart事件,进而引发所有关注此事件的监<br>听器执行相应的操作。 <br>
10)调用submitage(),<br>提交ResultStage<br>---stage划分算法的入口
01.activeJobForStage(),找到当前stage的所有ActiveJob的身份标识
02.若存在01,判断stage是否还未提交,然后进行以下操作----stage划分算法的精髓<br>①调用getMissingParentStages(),获取当前所有未提交stage的父Stage<br>②如果不存在未提交的父stage,则调用submitMissingTasks()提交当前所有未提交的额Task<br>否则,递归调用submitStage(),提交所有未提交的父stage,并将当前stage加入waitingStages,<br>(表示当前stage必须等待所有的父stage执行完成)
getMissingParentStages()----stage的划分算法<br>如果stage最后一个rdd的所有依赖,都是窄依赖,那么就不会创建任何新的stage<br>但是只要发现这个stage的rdd宽依赖了某个rdd,那么<br>用宽依赖的那个rdd,创建一个新的stage,然后立即将新的stage返回<br>
submitMissingTasks(),<br>提交stage,为stage创建一批task,task数量与partiton数量相同
03.若不存在01,则调用abortStage()终止依赖于当前Stage的所有Job
3.构建stage
创建ResultStage的方法<br>createResultStage()
获取或创建父stage的列表<br>getOrCreateParentStages()
getShuffleDependencies<br>获取RDD所有shuffleDependency的序列,<br>逐个访问每个RDD及其依赖的非shuffle的RDD,<br>获取所有非shuffle的RDD的shuffleDependency
getOrCreateShuffleMapStage<br>为每一个ShuffleDependency获取或者创建对应的ShuffleMapStage
job包含多个stage,<br>划分方式从Resultstage开始从后往前边划分边创建
生成stage身份标识
将ResultStage注册到stageTdToStage中
调用updateJobIdStageIdMaps(),<br>更新Job的身份标识与ResultStage机器祖先的映射关系
stage划分算法总结:<br>1.从fianlkStage倒推<br>2.通过宽依赖来进行stage的划分<br>3.使用递归有限提交父stage<br>
收藏
0 条评论
下一页