Flink运行原理和算子逻辑
2024-05-23 21:18:20 1 举报
flink 运行原理和算子逻辑
作者其他创作
大纲/内容
data stream
storagecheckpoint
task slot
job.jar
XXXprocessFunction
1、保存checkpoint
Heap
处理节点 n(归因、dim...)
1、提交应用程序DAG有向无环图
3 TX 1 commit
container-1
state ID-100
5 收集完所有后就写入一个 completed checkpoint meta
bucket_1窗口 4:00 ~ 5:00空窗
actor system(Akka)
滚动 tumbling window固定长度、没有重叠timewindow(time)
sideOutputLateData
9
checksum
read commited
一个 Flink 程序由多个任务组成(source、transformation和 sink)。 一个任务由多个并行的实例(线程)来执行。 一个任务的并行实例(线程)数目就被称为该任务的并行度 一个任务的多个并行实例即多个 subtask, 每个substask由独立的线程执行,任务链优化后多个不同的 subtask 会组成 operator-chain 在一个 slot 中执行。所以当多个 subtask(线程)同用一个 slot 时,每个 subtask 占用 CPU core 的时间是被切割的。 举例: 3 个 subtask 被分到一个 slot 里, slot 被分配了一个 cpu core ,subtask 1 处理时占用一个 cpu core ,剩下 subtask 2 和 3 只能等待 cpu core 处理完后 subtask 1 的任务后,再处理。所以规划 slot 数量时,不仅要考虑 cpu core 的数量和 slot 数量,还需要考虑 subtask 的数量以及 operator chain 中有哪些 subtask。 每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率
更新ACCsum & count
window
全局链路健康状态
执行 dataflow graph 的数据交换
merge合并 ACC
resource manager
local state
每个task manager的slot数量配置numberOfTaskSlots:3推荐 number of CPU cores
滚动.countWindow(size)
算子1
分配容器
7
2
8
2、4、6 已处理完
windowFunctionapply 聚合
execution graph调度层级job manager 生成
6、根据申请发出提供slot的指令
datastream
collector
client
consume
2 checkpoint completed
GlobalStateGraph ID-100
- source1 state_id code desc- source1 state_id code desc
- map#1 state_id code desc- map#2 state_id code desc-------------------------- sink#1 state_id code desc
resource manager(flink)如果有资源管理平台yarn、k8s、mesos
基数
sink 1
processElement
5
6、写入
node manager 2
process
算子
从 event spanContext 反序列化
1 CheckPointTriggerID-100
barrier 分隔事务每一个 barrier 就是一个事务
5、注册 slots
SDK
11
Application Master
接收请求
task
2、启动job提交应用
keyBy
设置WM 的时间来源(时间中的timestamp)timeCharacteristic.eventTime设置WM 延迟:assignTimestampAndWatermarks( BoundedOutOrdernessTimestampExtractor(time.seconds(2)) )
回传
1、3、5已处理完
dispatcher
1、提交应用
application master
状态同步
trace
physical graph“服务端”层级task 上运行
HDFS
sink
8、提交具体的task取消/暂停checkpoint
stream graph代码层级client 生成
source
4
task manager
KAFKA
创建ACCcreateAccumulator
m
event'
job manager
假设算子有两个上游输入1. 算子收到通道 1 的Barrier,没收到通道 2 的 Barrier ,算子会继续接收通道 1 的数据,但不处理,直接保存的输入缓存中,等待通道 2 的 Barrier 2. 通道 2 的 Barrier 到达后,算子开始对其 State 进行异步快照,并将 Barrier 向下游传播,不用等待快照完成 3. 算子异步快照时,算子首先处理缓存中的数据,在从通道中接受数据
4、在 container 中启动 TM
netty
barrier ID-100
nginxlog
不会关窗,但会触发计算结果
1 TX 1 pre
3
Slot 1
taskA-1
1、重新读取或消费2、如果是 ListState 有可能会根据并行度重新分配
bucket_2窗口 3:00 ~ 4:00
上报
算子1 + 2
pre log
task manager (worker) JVM 进程
不会关窗
sum
rolling aggregation,sum、min、max 返回单一聚合字段;minBy、maxBy 返回其他辅助字段Reduce,对 keyedStream 进行split,打 tag,配合 select 进行提取connect & coMap, datastream + datastream [connect]-> connectedStreams [map coMapFunction ] -> 合流union,连接多条同类型datastreamrichFunction,通过 open close 控制 function 生命周期,通过 runtimeContext 等获取运行时上下文等信息分区,keyby、Rebalance、shuffle、partitionCustom自定义---------窗口:datastream -> keyby -> timeWindow -> [trigger] -> [evictor] -> [allowedLateness] -> [sideOutputLateDate] -> aggregation function收集延迟数据:SingleOutputStreamOperator.getSideOutPut.sink 之后通过其他批处理去补偿操作---------状态后端配置:memory、hdfs、rocksDB检查点配置:enableCheckpointing、setCheckpointMode、setCheckpointTimeout、setMaxConcurrentsetCheckpoints、minPauseBetweensetCheckpoints、prefersetCheckpointForRecovery、tolerablesetCheckpointFailureNumber重启策略:restartStrategy------------genericWriteAHeadSink,把结果数据当成状态保存,等收到 checkpoint 完成通知时,再写入真正的 sinktwoPhaseCommitSinkFunction,2PC 事务提交,将事务提交和 checkpoint 绑定在一起,完成 end2end
每个TaskManager有一个槽意味着每个任务组在单独的JVM中运行(例如,JVM可以在单独的容器中启动)。拥有多个槽意味着更多子任务共享同一个JVM。相同JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。
增量聚合aggregate / reduce
2PC + barrier 保证消息不丢失
bucket_3窗口 2:00 ~ 3:00
windowall
诊断分析
业务处理
会话特点:时间无对齐.window(eventTimeSessionWindows.withGap(time))
5、返回 containers2-1、3-1、3-2
编写算子和流程
job graph“编译”层级client 生成
singleOutputStream
resultSubPartition 3
设计分布式计算考虑的点:网络磁盘计算和存储节点划分数据交换并行度设计执行计划无界关注吞吐量和低延时有界的处理时间可以度量
6、启动并初始化 container8、对任务监控管理
算子2
数据源 2
做 CP
6
allowedLateness
合并关键步骤
1-2-1 forwarding模式(窄依赖)算子:filter、source、map、flatmap
SlotPool
dataFlow 转换为 graph 流程
windowAssigner 分发到window中
chandy-lamport 分布式快照
taskA-2
2、提交 job
生命周期open / close
node manager
5、启动 Task Manager
Off-Heapmemory segmentmemory 段bufferbuffer pool
2、task 故障
container
TaskManager
source 2
time window
SlotManager
backpressure反压与流控优化
15
avg
Coordinator
1
flink 故障恢复依靠的就是 checkpoint 机制checkpoint 其实就是所有任务的状态在某个时刻的 snapshot这个时间点恰好是所有任务都处理完一个相同的输入(source)数据的时候
2 先传递barrier
code
window 0~10s 延迟 2s
关窗,单独处理迟到数据
WM = ??触发窗口计算
DataStream
connectedStream
// Keyed Windowstream .keyBy(...) <- 按照一个Key进行分组 .window(...) <- 将数据流中的元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) [.allowedLateness] <- 迟到数据时长,还算做窗口内的数据 [.sideOutputLateDate] <- 超过迟到数据,打迟到tag,已不算窗口内数据, .reduce/aggregate/process() <- 窗口处理函数Window Function [.getSideOutPut] <- 获取迟到tag的数据进行后续批处理// Non-Keyed Windowstream .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) [.allowedLateness] <- 迟到数据时长,还算做窗口内的数据 [.sideOutputLateDate] <- 超过迟到数据,打迟到tag,已不算窗口内数据, .reduce/aggregate/process() <- 窗口处理函数Window Function [.getSideOutPut] <- 获取迟到tag的数据进行后续批处理
在计算之前要创建一个新的ACC,这时ACC还没有任何实际表示意义,当有新数据流入时,Flink会调用add方法,更新ACC,并返回最新的ACC,ACC是一个中间状态数据。当有一些跨节点的ACC融合时,Flink会调用merge,生成新的ACC。当所有的ACC最后融合为一个ACC后,Flink调用getResult生成结果
数据源
3、启动 AM
TX 2
4、申请任务所需要的资源
封装
source 1
处理节点 n(数据处理、归因、dim...)
snapshot:source 读取完 5sum 处理完 5map 不处理 5,也代表完成
Slot 2
9、RPC 汇报进度状态
checkpoint file ID-100
- source1 offset1000- source2 offset500
- map#1 state=100- map#2 state=??-------------------------- sink#1 state=??
4、继续处理
yarn client
TM 之间只建立一个 netty 网络连接
AllWindowedStream
上下文context
operator chains 优化,节省通信开销1、相同 slot 共享组2、1-2-1模式3、相同并行度
算子3
诊断工具
JobManager
ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。比如,每天的数据在TB级,我们需要Slide为十分钟Size为一小时的滑动窗口,这种设置会导致窗口数量很多,而且一个元素会被复制好多份分给每个所属的窗口,这将带来巨大的内存压力。
3、申请 slot 资源
并行度的设置:一个任务的并行度设置可以从多个层次指定* Operator Level(算子层次)* Execution Environment Level(执行环境层次)* Client Level(客户端层次)* System Level(系统层次)
coMap
4 上报 state 地址
3 上报
node manager 1
下游 App
resource manager(yarn)
event
job manager(standby)
窗口内所有数据计算 + Context处理
数据传输形式
watermark 是带着 timestamp 的特殊数据watermark 就是数据间隔最大时间,也就是允许的乱序程度在各窗口中,watermark 向下游所有 slot 同步最小记录,同理 kafka HWAssignerWithPunctuatedWatermarks:间断性生成,每个事件后面跟一个 WM,好处:实时性高,坏处:大小翻倍,适用:稀疏数据AssignerWithPeriodicWatermarks:周期性生成 setautowatermarkinterval,好处:不会增大数据,坏处:延迟,适用:稠密数据时区问题通过 offset time.hour(-8)
1、3、5 已处理完
合并
map 1
task执行计划
actor system
abstractRichFunction
算子4
4、申请资源
7、保持心跳
WM = 15 触发窗口计算
2 先传递 event
count window
dataflowgraph
CheckPointCoordinator
state manager
7、提供 slot
input buffer queue等待所有相同 barrier ID 到达后才能处理 data
迟到数据
生成GlobalStateGraph
recordA traceid 100
- source1 span1 parentSpan0- source1 span2 parentSpan0
- map#1 span3 parentSpan1- map#2 span4 parentSpan3-------------------------- sink#1 span5 parentSpan4
2、4 已处理完
10、注销并允许属于AM的container被回收
窗口函数
ProcessAllWindowFunction和ProcessFunction类相似,都是用来对上游过来的元素做处理,不过ProcessFunction是每个元素执行一次processElement方法,ProcessAllWindowFunction是每个窗口执行一次process方法(方法内可以遍历该窗口内的所有元素)ProcessWindowFunction和KeyedProcessFunction类似,都是处理分区的数据,不过KeyedProcessFunction是每个元素执行一次processElement方法,而ProcessWindowFunction是每个窗口执行一次process方法(方法内可以遍历该key当前窗口内的所有元素)
中间结果state
map 2
记录窗口内所有数据
watermark 的 timestamp 从事件中提取 eventTime 的场景
connect(stream)
keyedStream
全窗口 (拿到更多信息)process /apply
onTimer
memoryManager
watermark
windowStream
处理节点 n
resultSubPartition 2
3 做CP
偶数
taskA-3
多个操作算子可能在一个任务槽中执行每个线程执行一个task,意味着三个线程在同一个 slot 竞争资源
窗口内全部数据
6、加载 Flink jar 配置构建环境
2、启动 AM
s
制定并行执行计划
1 detect eventID-100
task slot 分配内存1/2
1、上传jar
task slot 数量 = 并发处理任务的数量 = job 最高并行度原因: 1、相同的算子放到同样的slot中,就无法实现并行,所以不同类型的算子才可以实现 slot 共享。比如 source-map-keyby-sink 可以共享 slot 2、默认开启共享 slot ,如果不想共享可以进行分组 3、所以一个job 的并行度 = MAX(各算子并行度)
redistributing 模式(宽依赖)算子:Rebalance、shuffle、hash(keyby)、rescale
task slot 分配内存1/2
window 10~20s 延迟 2s
3、重启应用或 region 重启从 checkpoint 读取拓扑和状态,并重置各 task 状态
node manager 3
aggregate
source 在数据流中插入 barrier
KUDUads_trace_table
resultSubPartition 1
checkpoint
bucket_4窗口 1:00 ~ 2:00
getResult
JM
task manager (worker)
3、注册 / 心跳
1.ResourceManager(RM)RM是一个全局的资源管理器,管理整个集群的计算资源,并将这些资源分配给应用程序。包括:与客户端交互,处理来自客户端的请求启动和管理ApplicationMaster,接收来自ApplicationMaster 的资源申请请求,并为之分配资源,并在它运行失败时重新启动它管理NodeManager ,接收来自NodeManager 的资源汇报信息,并向NodeManager下达管理指令资源管理与调度,2.ApplicationMaster(AM)应用程序级别的,管理运行在YARN上的应用程序。包括:用户提交的每个应用程序均包含一个AM,它可以运行在RM以外的机器上。负责与RM调度器协商以获取资源(用Container表示)将得到的资源进一步分配给内部的任务(资源的二次分配)与NM通信以启动/停止任务。监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务3.NodeManager(NM)YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点。包括:启动和监视节点上的计算容器(Container)以心跳的形式向RM汇报本节点上的资源使用情况和各个Container的运行状态(CPU和内存等资源)接收并处理来自AM的Container启动/停止等各种请求4.ContainerContainer是YARN中资源的抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。Container由AM向RM申请的,由RM中的资源调度器异步分配给AM。Container的运行是由AM向资源所在的NM发起。一个应用程序所需的Container分为两大类:(1) 运行AM的Container:这是由RM(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的AM所需的资源;(2) 运行各类任务的Container:这是由AM向RM申请的,并由AM与NM通信以启动之。以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即AM可能与它管理的任务运行在一个节点上。
container-2
0 条评论
下一页