spark driver资源申请任务提交时序图
2021-02-25 15:50:50   2  举报             
     
         
 spark yarn-cluster 资源申请任务提交时序图
    作者其他创作
 大纲/内容
 添加到消息队列中 receivers.offer(data)
  receivers:LinkedBlockingQueue[EndpointData] 
  dagScheduler.handleJobSubmitted
    extends EventLoop[DAGSchedulerEvent]
  YarnRMClient
  mainMethod.invoke
  yarnClient.start()
  run()
  new()
  prepareCommand()(bin/java CoarseGrainedExecutorBackend)
  rpcEnv.setupEndpoint()
  Client
  new Thread().run()
  new DriverEndpoint
  processRpcRequest
  NettyRpcEnv
  createAMRMClient()
  YarnClusterApplication
   instanceof ResponseMessage
  Dispatcher
  font color=\"#000099\
   instanceof RequestMessage
  SchedulerBackend
  createSparkEnv()
  (class)ApplicationMaster
  SparkSubmitArguments
  DAGSchedulerEventProcessLoop 
  exec CMD(bin/java org.apache.spark.deploy.SparkSubmit)
  prepareSubmitEnvironment(args)
  org.apache.spark.launcher.Main.main()-->builder.buildCommand(env)
  DriverEndpoint
  org.apache.spark.launcher.Main.main
  submitApplication()
  doOnReceive()
  OneWayOutboxMessage
  sendwith()
  org.apache.spark.deploy.yarn.Client
  Thread: DAGSchedulerEventProcessLoop
  runJob()
  registerAM()
  createDriverEndpoint()
  driver Thread
  runImpl()
  AMRMClient(和RM交互)
  _taskScheduler.start()
  org.apache.spark.deploy.SparkSubmit.main
  createServer()
  handleAllocatedContainers
  classForName
  request instanceof ChunkFetchRequest || RpcRequest || OneWayMessage || StreamRequest
  data.inbox.process()
  虚线箭头在别处调用Thread.start()后启动
  eventQueue阻塞队列
  JVM主要验证submit参数
  eventQueue: BlockingQueue 
  case RegisterExecutor
  executorRef.send(RegisteredExecutor)
  org.apache.spark.deploy.SparkSubmit
  NettyRpcEnvFactory
  action 算子
  onReceive()
  createTaskScheduler
  new
  TransportChannelHandler
  TransportClient / TransportResponseHandler / TransportRequestHandler
  channelActive / channelInactive
  SparkSubmit JVM
  request instanceof RequestMessage or  ResponseMessage
  SparkEnv
  DAGScheduler 
  run(while(!stopped.get))..
  dagScheduler.runJob()
  ChannelInboundHandlerAdapter
    backend.start()
  Dispatcher 
  create()
  take()
  CMD(java .....)
  getOrCreateParentStages()
  createNMClient() -> init(conf) -> start()
  new DAGScheduler
  createDriverEnv()
  eventQueue.put(event)
  createSchedulerBackend
  spark-submit.sh 
  createDriverEndpointRef
  MessageLoop: Thread
  new Thread(driver)
  RpcRequest
  Spark-class.sh
  rmClient
  Inbox
  createYarnClient
  start()
  inbox.process()
  TransportContext
  new YarnRMClient()
  TransportClient
  send()
   runMain
  runDriver()
  createResultStage()
  YarnAllocator
  startUserApplication()
  client: RegisteredExecutor
  Server
  postMessage()
  (object)ApplicationMaster.main
  spark RPC通信
  userThread.setName(\"Driver\")
  NettyRpcHandler
  font color=\"#000066\
  ExecutorRunnable
  EndpointData
  DriverEndpoint in CoarseGrainedSchedulerBackend
  endpoint.receiveAndReply()
  1. SparkSubmit        // 启动进程    -- main            // 封装参数        -- new SparkSubmitArguments                // 提交        -- submit                    // 准备提交环境            -- prepareSubmitEnvironment                            // Cluster                -- childMainClass = \"org.apache.spark.deploy.yarn.YarnClusterApplication\
  new Thread(和NM交互)
  NMClient 
  new Thread()
  ApplicationMaster JVM
  TransportServer(netty)
  run(while(true))
  startContainer()
  registerApplicationMaster
  eventQueue.take()
  responseHandler.handle((ResponseMessage) request)
  RpcMessage
  take()是阻塞的
  getShuffleDependencies()
  doRunMain()
  runAllocatedContainers
   requestHandler.handle((RequestMessage) request)
  new SparkSubmitArguments
  userAPP.main 
  case
   getOrCreateShuffleMapStage
  requestHandler.handle()
  NettyRpcHandler 
  onReceive(eventQueue.take())
  startServer()
  allocateResources()
  SparkContext
  submitJob()
  TaskScheduler
  RpcEnv
  YarnClient
  register()
  new Thread().start() userAPP
  post()
  new ResultStage
  parse(args.asJava)
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
  
  
  
  
  
  
  
  
 