Task部署、Task间数据交换、Task(线程)复用
2022-04-08 17:27:17 15 举报
Task部署、Task间数据交换、Task(线程)复用
作者其他创作
大纲/内容
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { kafkaFetcher.runFetchLoop(); } else { runWithPartitionDiscovery(); }
transformPartition()
DefaultScheduler
\tcurrentExecution.deploy();
deployTaskSafe()
异步向TaskManager提交、部署Task
举例执行以下应用代码
transform(transformation)
DataStream.map()
executionVertex.deploy();
创建kafkaFetcher
2
Flink源码(1.11.x)——Task部署、Task间数据交换、Task(线程)复用
doRun();
1、 userFunction.map() 就是用户定义的 MapFunction 里的 map 方法2、 数据经过用户定义的 map 算子,通过采集器往下游发送
配置检查点
getStreamGraph(jobName)
isChainable()解释
运行任务之后的清理工作
setPhysicalEdges()
生成物理执行图(Task 的调度和执行)
mailbox.tryTakeFromBatch()
StreamGraph.getJobGraph()
将JobVertex放入缓存
new Transformation()
把source添加集合合
1、mail入队queue.addFirst(mail);2、唤醒当前Task
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {\t\t\t\ttaskEventDispatcher.registerPartition(partitionWriter.getPartitionId());\t\t\t}
创建Tashk输出的bufferPool
this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
return streamGraph.getJobGraph(null);
初始化slotPool
返回origSecuQuotSource执行以下应用代码
Task输出的buffer数据
isBackPressured返回改Task是否发压,获取全部的AvailableFuture,若是有没完成了则有反压
根据streamGraph.getSourceID找到各节点的输出节点来递归创建JobVertex
transformOneInputTransform()
source
run
createRecordWriter创建Task的输出的缓冲区
createExecutionGraph()
buffers入队
如果是 map 算子,processElement 在 StreamMap.java 调用
RecordWriterOutput.collect()
返回到LocalExecutor.execute()
buffers.add()
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
output.collect(element.replace(userFunction.map(element.getValue())));
this.bufferPool
partitionManager.registerResultPartition(this);
\tfor (JobVertex jobVertex : topologiallySorted)
requestNewBufferBuilder再调用子类ChannelSelectorRecordWriter的
add source()
MailboxProcessor.DefaultActionSuspension.resume()
列举一个Task的执行类StreamTask
1、为产出的每一个 ResultPartition 都有一个关联的 ResultPartitionWriter,同时也都有一个独立的 LocalBufferPool负责提供写入数据所需的 buffer2、ResultPartitionManager 会管理当前 Task 的所有 ResultPartition。
jobVertices.get();downStreamVertex.connectNewDataSetAsInput()
PipelineExecutorUtils.getJobGraph()
构造IntermediateResult中,共有numTaskVertices并行度个IntermediateResultPartition
processMail()
flush
1、向BlobLibraryCacheManager注册该Job; 2、构建ExecutionGraph对象; 3、对JobGraph中的每个顶点进行初始化; 4、将DAG拓扑中从source开始排序,排序后的顶点集合附加到Exec - utionGraph对象;5、获取检查点相关的配置,并将其设置到ExecutionGraph对象; 6、向ExecutionGraph注册相关的listener; 7、执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢 复目的; 8、响应给客户端JobSubmitSuccess消息; 9、对ExecutionGraph对象进行调度执行;
\tbatch.addLast(mail);
开始启动JobMaster
SourceStreamTask朝下游写数据出去
会将 queue 中的消息移到 batch,然后从 batch queue 中依次 take;新 mail 写入 queue。从 batch take 时避免加锁
new SchedulerBase()
运行任务之前的准备工作
schedulerAssignedFuture.thenRun(this::startScheduling);
开始构建Task1、构建ResultPartition2、构建Input Gate
StreamExecutionEnvironment.getStreamGraph()
persistAndRunJob()
LocalInputChannel.LocalInputChannel()
true,创建带有JobVertex的StreamConfig对象
获取缓存physicalEdgesInOrder数据
sources.add(vertexID;
根据不同的实现,执行不同类的方法
Execution
添加到transformations
创建Task执行环境
初始化Task实例
创建Task的输出RecordWriter实例
StreamGraph.addEdge()
StreamingJobGraphGenerator.createJobGraph()
void startScheduling()
build(bufferWriter);
JobManagerRunnerImpl.grantLeadership()
开始部署Task
循环创建RecordWrite
runJob(jobGraph);
向 ResultPartition 添加一个 BufferConsumer, ResultPartition 会将其转交给对应的 ResultSubpartition,消费ResultSubpartition的数据
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { InputStatus status = inputProcessor.processInput(); //note: event 处理 if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) { //note: 如果输入还有数据,并且 writer 是可用的,这里就直接返回了 return; } if (status == InputStatus.END_OF_INPUT) { //note: 输入已经处理完了,会调用这个方法 controller.allActionsCompleted(); return; } CompletableFuture<?> jointFuture = getInputOutputJointFuture(status); //note: 告诉 MailBox 先暂停 loop MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction(); //note: 等待 future 完成后,继续 mailbox loop(等待 input 和 output 可用后,才会继续) jointFuture.thenRun(suspendedDefaultAction::resume);}
如若buffer 写满了,调用 finishBufferBuilder方法,则调用requestNewBufferBuilder方法
while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {\t\t\tmaybeMail = mailbox.tryTake(MIN_PRIORITY);\t\t\tif (!maybeMail.isPresent()) {\t\t\t\tlong start = System.currentTimeMillis();\t\t\t\tmaybeMail = Optional.of(mailbox.take(MIN_PRIORITY));\t\t\t\tidleTime.markEvent(System.currentTimeMillis() - start);\t\t\t}\t\t\tmaybeMail.get().run();\t\t}
setConnectionType()
start()
TaskMailboxImpl.createBatch()()
设置chain的首选资源
for (Transformation<?> transformation: transformations) { transform(transformation); }
StreamExecutionEnvironment.addSource()
DefaultExecutionVertexOperations
public void notifyDataAvailable() { notifyChannelNonEmpty(); }
SingleInputGate.notifyChannelNonEmpty()
transformLegacySource()
add()
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.addSource();
朝好下游数据了
executionGraph.enableCheckpointing()
启动JobMaster
PipelinedSubpartition.add()
map
算子执行完成后写数据到下游
启动 JobMaster
1、获取配置文件2、kerberos认证3、*********
创建JobGraph
设置ck
FlinkPipelineTranslationUtil.getJobGraph()
Mail mail; while ((mail = queue.pollFirst()) != null) { batch.addLast(mail); }
真正的部署逻辑
如果是 map 算子,emitRecord 应该在 OneInputStreamTask.java
构造resultTransform
开始执行任务生成StreamGraph
初始inputEdges
循环所有ExecutionVertexfor (final DeploymentHandle deploymentHandle : deploymentHandles) {}
将每个JobVertex的入边集合也序列化到该JobVertex的StreamConfig中 (出边集合已经在setChaining的时候写入了
1、在这里创建生成ExecutionGraph的每个节点2、首先是进行了一堆赋值,将任务信息交给要生成的图节点,以及设定并行度等3、然后是创建本节点的IntermediateResult,根据本节点的下游节点的个数确定创建几份 4、最后是根据设定好的并行度创建用于执行task的ExecutionVertex5、如果job有设定inputsplit的话,这里还要指定inputsplits
streamGraph.addLegacySource()
返回varSecuQuotRealSource
向taskEventDispatcher注册partitionWriter
private void notifyDataAvailable() { if (readView != null) { readView.notifyDataAvailable(); } }
new StreamConfig(new Configuration());
执行应用端其他代码
分布式缓存
开始创建ExecutionEdge
createScheduler(slotPool);
ResultPartition.flushAll()
提交Job
每个source一个分区数对应一个RUNNING的线程拉取数据
MailboxProcessor.sendControlMail()
在每个并行下创建IntermediateResultPartition实例
\tmailboxProcessor.runMailboxLoop();
JobManagerRunnerImpl.start()
1、创建StreamEdge对象2、给sourceId的StreamNode输出集合添加StreamEdge3、给targetId的StreamNode输入集合添加StreamEdge
1
output.emitRecord(recordOrMark.asRecord());
CountingOutput.collect()
PartitionTransformation
return returnStream;
operator.processElement(record);
run()
ExecutionGraphBuilder
运行邮箱处理循环。这是完成主要工作的地方。
// 通过执行图的节点 ID 获取执行图的节点final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);// deploy 方法用来部署执行图节点executionVertexOperations.deploy(executionVertex);
循环拓扑图,开始创建ExecutionJobVertex
new JobMaster()
StreamGraph.addLegacySource()
createAndRestoreExecutionGraph()
启动JM
\tflushAll();
SchedulerBase.startScheduling()
jointFuture.thenRun(suspendedDefaultAction::resume);
ResultPartitionManager 会管理当前 Task 的所有 ResultPartition
false,创建带空结构的StreamConfig对象
setManagedMemoryFraction()
Task.run()
getStreamGraphGenerator().setJobName(jobName).generate();
Task task = new Task()
schedulerNGFactory.createInstance()
transform(transform.getInput());
设置InputGates
Kafka010Fetcher.runFetchLoop()
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
添加到新mail队列
allocateSlotsAndDeploy()
jobGraph.getVerticesSortedTopologicallyFromSources();
确保能接收到RPC和异步调用
DataStream<String> origSecuQuotSource = streamEnv.addSource();origSecuQuotSource = origSecuQuotSource.rescale();SingleOutputStreamOperator<JSONObject> varSecuQuotRealSource = origSecuQuotSource .map(new UpperField2LowerCase()) .name(\"varSecuQuotRealSource\");
StreamSource.run
Kafka Source
JobVertex.connectNewDataSetAsInput()
生成StreamNode
return internalSubmitJob(jobGraph);
创建JobEdge
TaskExecutor.submitTask()
resume()1、等待 future 完成后,继续 mailbox loop(等待 input 和 output 可用后,才会继续)
new RecordWriter()
StreamGraph创建完成
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
JobMaster.start
startJobExecution()
if (!mailbox.createBatch())
关键逻辑:运行任务
ExecutionJobVertex
Task.loadAndInstantiateInvokable()
ExecutionVertex
异步执行
new\"Partition\"
处理添加StreamEdge
递归transform方法,来获取每个算子的input算子信息,如果alreadyTransformed已经存在,不再递归
对作业顶点进行拓扑排序,并将图附加到现有的图上
StreamExecutionEnvironment.execute()
从集合中获取对应节点
处理MailboxDefaultAction事件
if (processMail(localMailbox))
缓冲区由数据
queue.addFirst(mail);notEmpty.signal();
protected void notifyChannelNonEmpty() {\t\tinputGate.notifyChannelNonEmpty(this);\t}
真正执行Task
继承
MailboxProcessor.runMailboxLoop()
Flink应用服务主类启动Start
streamEnv.addSource(FlinkKafkaConsumer010);
return transform(\"Map\
deployTaskSafe(executionVertexId);
每个并行创建节点开始
StreamGraphGenerator.generate()
1、校验作业调度状态2、启动管理器
requestNewBufferBuilder先调用父类RecordWriter的
startJobMasterServices()
queueChannel()
batch 获取 mail 执行,直到 batch 中的 mail 处理完
startJobManagerRunner()
初始化资源池的调度
初始化任务调度器,模式:EAGER
maybeMail.get().run();
afterInvoke();
new ChannelSelectorRecordWriter()
StreamTaskNetworkInput.emitNext()
OneInputStreamTask.emitRecord
schedulingStrategy.startScheduling();
将vertexConfigs存到缓存中
if (notifyDataAvailable) {\t\t\tnotifyDataAvailable();\t\t}
FutureUtils.assertNoException(\t\t\tassignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
回调处理任务的线程,发送mail
提取输出类型
生成JobGraph
SchedulerBase.createAndRestoreExecutionGraph()
ejv.connectToPredecessors(this.intermediateResults);
回调方法
taskManagerGateway.submitTask()
InputStatus status = input.emitNext(output);
每一个ExecutionJobVertex有parallelism个ExecutionVertex;ExecutionVertex是并行任务的一个子任务
transform()\"Map\"
一个input
StreamSourceContexts.collectWithTimestamp()
currentNodeId.equals(startNodeId)
保存点恢复设置
两个input
emitRecordsWithTimestamps()
queue.pollFirst()
resetAndStartScheduler()
new PartitionTransformation
如果source端产生数据,反复回调
processInput方法: InputStatus status = inputProcessor.processInput();
return startJobMaster(leaderSessionId);
创建ResultPartitionr和InputGate详细流程
DataStream.rescale()
如果source端产生数据,反复调用
Dispatcher.submitJob()
attachJobGraph()
invokable.invoke();
StreamMap.processElement()
jobGraph
deployOrHandleError()
notifyDataAvailable = !isBlockedByCheckpoint && buffers.size() == 1 && buffers.peek().isDataAvailable(); flushRequested = buffers.size() > 1 || notifyDataAvailable;
1、dataStream.map()2、dataStream.filter()3、dataStream.process()4、dataStream.addSink()5、******
阻塞当前线程操作1、mailbox.take获取不到数据,当前线程阻塞
return builtStreamGraph;
循环 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false
new LegacySourceTransformation()
ExecutionJobVertex ejv = new ExecutionJobVertex()
加载和实例化 task 的可执行代码根据nameOfInvokableClass加载对应的类并实例化
启动RpcServer RpcService 提 供 了 启 动 RpcServer
toNotify.complete(null);
\tResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
如果source有数据,则继续执行mail入队
jobMasterFactory.createJobMasterService()
buffers.peek()
把StreamVertex添加到集合
new ExecutionGraph()
new StreamTask()
把转换类的算子加入集合中this.transformations.add(transformation);集合中是Transformation对象
设置chain的最小资源
new FlinkKafkaConsumer010()
LegacySourceTransformation
ChannelSelectorRecordWriter.emit()
创建executionGraph
StreamGraphTranslator.translateToJobGraph()
return statelessCtor.newInstance(environment);
return executionGraph;
setSlotSharingAndCoLocation()
startSchedulingInternal();
output.collect(reuse.replace(element));
transformTwoInputTransform()
this.executionGraph = createAndRestoreExecutionGraph()
添加到的StreamGraph中
MiniCluster.submitJob()
******
提交作业jobGraph,JobGraph的提交依赖于JobClient和JobManager之间的异步通信
1、recordWriter 2、mailboxProcessor
emit
if (processMail(localMailbox)) { mailboxDefaultAction.runDefaultAction(defaultActionContext); return true; } return false;
创建JobManager
执行算子
createScheduler(jobManagerJobMetricGroup);
miniCluster .submitJob(jobGraph)
executionGraph.attachJobGraph(sortedTopology)完成
jobManagerRunnerFuture .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
部署完Task后
设置slot的共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行CoLocationGroup =设置标识共存组的键。具有相同共定位键的操作符将由调度器,将它们对应的子任务放到相同的槽中
根据jobVertex的输入个数以及并行度来创建ExecutionEdge
executeAsync()
if (processMail(localMailbox))判断执行processMailTask的现场复用逻辑,processMail方法里面实现了线程阻塞
根据输出Edge的个数来创建recordWriter ,每一个输入节点都会有对应一个RecordWrite线程用来定时拉取数据到下游节点
configureCheckpointing()
setup()
创建IntermediateDataSet
new StreamSource(sourceFunction)
创建一个邮件new Mail
pushToRecordWriter(record);
if (availableChannels == 0) { inputChannelsWithData.notifyAll(); toNotify = availabilityHelper.getUnavailableToResetAvailable(); }if (toNotify != null) { toNotify.complete(null); }
deployAll(deploymentHandles)
StreamOneInputProcessor.processInput()
ResultPartition.setup()
从queue获取
创建执行图对象executionGraph
根据JobVertex的IntermediateDataSet来创建ExecutionGraph的中间结果数据集IntermediateResult节点,每一个JobVertex可能有n(n=0)个IntermediateDataSet -- n个IntermediateResult
mailbox.take() notEmpty.await();
PerJobMiniClusterFactory.submitJob()
ExecutionGraphBuilder.buildGraph()
EagerSchedulingStrategy
循环算子列表,生成SteamGraph
InputChannel.notifyChannelNonEmpty()
new PhysicalTransformation()
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>() .setChannelSelector(outputPartitioner) .setTimeout(bufferTimeout) .setTaskName(taskName) .build(bufferWriter); output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
ResultPartition.addBufferConsumer()
死循环从source拉取数据
StreamTask.invoke()
1、创建JobEdge2、把JobEdge放入到缓存Iinputs中3、创建IntermediateDataSet和Jobedge关系4、设置physicalEdgesInOrder
JobMaster
deploy();
new ChannelSelectorRecordWriter
LocalExecutor.execute()
beforeInvoke();这里执行算子的open方法
recordWriter.emit(serializationDelegate);
\trunMailboxLoop();
获取上一个task的输出数据为下一个task的输入数据
生成当前节点的显示名,如:timeDelayStartDS - Sink: timeDelayStartSink
设置 job execution 配置
添加到sources集合
userFunction.run(ctx);
根据同步的分布模式创建ExecutionEdge
RpcTaskManagerGateway.submitTask()
从缓存中获取JobVertex
rescale()
FlinkKafkaConsumerBase.run
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
new DefaultScheduler()
1、对于每个不可连接的StreamEdge,则将对应的StreamEdge就是当前链的一个输出StreamEdge,所以会添加到transitiveOutEdges 2、添加到transitiveOutEdges这个集合然后递归调用其Target node,注意,startNodeID变成了nonChainable这个 3、StreamEdge的输出节点id,chainIndex也赋值为0,说明重新开始一条链的建立
new RpcEndpoint()
读取配置等
创建DataStreamSource\"Custom Source\"
queue是邮件对队列,获取mail
创建定期刷新输出的线程outputFlusher,timeout=100轮询一次
1、之前已经为 IntermediateResult 添加了 consumer,2、这里为 IntermediateResultPartition 添加 consumer,即关联到 ExecutionEdge 上
rpcService.startServer(this);
new JobManagerRunnerImpl()
schedulerNG.startScheduling();
Task的输入buffer出队返回队列第一个元素
StreamGraph.addOperator()
SingleInputGate.setup()
DefaultJobManagerRunnerFactory.createJobManagerRunner()
为每个JobVertex的出边集合设置序列化到该JobVertex的StreamConfig中
设置相关参数相关代码
SourceStreamTask
currentNodeId.equals(startNodeId通过StreamEdge构建出JobEdge,创建 IntermediateDataSet ,用来将JobVertex和JobEdge相连)
createAndAddResultDataSet()
返回
origSecuQuotSource = origSecuQuotSource.rescale();
createBatch()
TaskMailboxImpl.putFirst()
task.startTaskThread();
jobManagerRunner.start();
PipelinedSubpartition.flush()
生成StreamGraph
for (int i = 0; i < numTaskVertices; i++) { ExecutionVertex vertex = new ExecutionVertex()}
1、包含了从 Execution Graph 到真正物理执行图的转换。2、比如将 IntermediateResultPartition 转化成 ResultPartition,3、ExecutionEdge 转成 InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。
inputProcessor.processInput();
开始创建ExecutionGraph
创建StreamNode
createSlotPool(jobGraph.getJobID())
createBatch()
其他
ExecutionGraph
copyFromSerializerToTargetChannel()
真正提交jobGraph
runFetchLoop()
start();
创建ResultPartitionWriter和InputGate
addBufferConsumer()
Optional<Mail> maybeMail; while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) { maybeMail.get().run(); }
jobMasterService.start(new JobMasterId(leaderSessionId));
new JobMaster创建JobMaster的rpc通信RpcEndpoint 定义了一个 Actor 的 路 径
GrantLeadershipCall.run()
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
AbstractFetcher.emitRecordsWithTimestamps()
waitForAllSlotsAndDeploy(deploymentHandles);
StreamTask.processInput()
如果工程不挂则一直循环
input Gate对应的bufferPool
创建JobMaster
1span style=\"font-size: inherit;\
executionGraph.attachJobGraph(sortedTopology);
对排序后的拓扑结构生成ExecutionGraph内部的节点和连接
创建stream环境
RecordWriter.emit()
streamGraph.addOperator()
Task 通过 RecordWriter 将结果写入 ResultPartition 中,主要流程1.通过 ChannelSelector 确定写入的目标 channel2.使用 RecordSerializer 对记录进行序列化3.向 ResultPartition 请求 BufferBuilder,用于写入序列化结果4.向 ResultPartition 添加 BufferConsumer,用于读取写入 Buffer 的数据
调度
启动调度
1、获取queue的数据;2、如果没有数据则阻塞当前Task
设置托管内存权重比(托管内存是由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。)
SingleOutputStreamOperator<JSONObject> varSecuQuotRealSource = origSecuQuotSource .map(new UpperField2LowerCase()) .name(\"varSecuQuotRealSource\");
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
XXX.Main
streamEnv.execute();
0 条评论
回复 删除
下一页