Flink
2023-03-27 15:20:39 0 举报
AI智能生成
登录查看完整内容
对于flink最详细的总结,结合本人实际生产开发经验!需要的拿走
作者其他创作
大纲/内容
keyed&non-keyed
1、事件时间是每个事件在其生产设备上发生的时间。这个时间通常在记录进入flink之前嵌入在记录中,并且可以从每个记录中提取事件时间戳。2、在事件时间中,时间的进展取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是表示事件时间进度的机制。3、事件时间指的是数据本身携带的时间。这个时间是在事件产生时的时间。4、事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结果
事件时间(Event Time)
摄入时间指的是数据进入flink的时间
摄入时间(Ingestion Time)
1、处理时间是指正在执行相应操作的机器的系统时间2、当流式程序在基于处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应的机器的系统时钟。3、每小时处理时间窗口将包括在系统时钟指示整小时的时间之间到达特定操作员的所有记录。4、处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度(例如,从消息队列)、记录在系统内的操作员之间流动的速度以及中断(计划的或其他)的影响。
处理时间(Processing Time)
时间概念
时间语义
1、在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。2、Windows是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。Flink认为Batch是Streaming的一个特例,所以flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口就是从Streaming到Batch的一个桥梁。3、窗口分类:基于时间划分驱动。例如:每30秒钟 基于数据数量驱动。例如:每100个元素,与时间无关
基本概念
keyed streams要调用keyBy(...)后再调用window(...)对于keyed stream,其中数据的任何属性都可以作为key。属于同一个key的元素会被发送到同一个task使用keyed stream允许你的窗口计算由多个task并行,因为每个逻辑上的keyed stream都可以被单独处理。
keyed
non-keyed streams只用直接调用windowAll(...)对于non-keyed stream,原始的stream不会被分割为多个逻辑上的stream所以所有的窗口计算都会被同一个task完成,也就是parallelism为1.
non-keyed
1、计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口2、相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。3、计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。
Count Window
1、滚动窗口的assigner分发元素到指定大小的窗口。滚动窗口的大小时固定的,且各自范围之间不重叠。2、滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小3、使用场景:适用于按照指定的周期来统计指标,比如说,每两个事件做一次统计
Tumbling Window
1、滑动窗口的assigner分发元素到指定大小的窗口,窗口大小通过window size参数设置。2、滑动窗口需要传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。3、如果slide小于窗口大小,滑动窗口可以允许窗口重叠。在这种情况下,一个元素可能会被分发到多个窗口4、例如:每5个事件做一次统计,每次滑动长度2
Sliding Window
coutWindowAll数量窗口(不区分数量滚动窗口【滑动窗口与滚动窗口的区别,在于滑动窗口会有元素重叠可能,而滚动窗口不存在元素重叠】)
Window All
1、会话窗口的assigner会把数据按活跃的会话分组2、与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间3、会话窗口的assigner可以设置固定的会话间隔(session gap)或用session gap extractor函数来动态地定义多长时间算作不活跃4、当超出了不活跃地时间段,当前地会话就会关闭,并且将接下来地数据分发到新的会话窗口
Session Windowon
窗口函数
1、窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效2、为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
增量聚合函数
1、全量窗口函数需要对所有进入该窗口的数据进行缓存,等到窗口触发时才会遍历窗口内所有数据,进行结果计算。2、因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
全窗口聚合函数
Flink Window Functions
水位线就是用来度量事件事件水位线是数据流中的一部分,随着数据一起流动,在不同任务之间传输水位线可以看作是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间水位线插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
水位线意义
周期性的生成watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间AssignerWithPunctuatedWatermarks阶段性的生成 watermark,即每来一条数据就生成一个wm
AssignerWithPeriodicWatermarks
1.10版本之前
单调递增策略固定乱序长度策略不生成策略
WatermarkStrategy
1.11版本以后
内置水位线生成器
周期性调用方法中发出水位线Periodic Generator周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线
周期性水位线
在事件触发的方法中发出水位线定点式生成器会不停地检测onEvent()中地事件,当发现带有水位线信息地特殊事件时,就会立即发出水位线。WatermarkGenerator 接口中有两个方法onEvent():在每个事件到来时调用onPeriodicEmit():由框架周期性调用
定点式水位线
实际应用中上下游都有多个并行子任务,为了统一推进事件时间的进展,要求上游任务处理完水位线、时钟改变之后,将当前水位线广播给所有下游任务上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个分区水位线,当前任务时钟取最小水位线。
水位线传递
自定义水位线
Flink WaterMark
现实中很难生成一个完美的水位线,水位线就是在延迟与准确性之前做的一种权衡。如果生成的水位线过于紧迫,即水位线可能会大于后来数据的时间戳,这就意味着数据有延迟,关于延迟数据的处理,Flink提供了一些机制,具体如下:allowedLateness、sideOutputLateData
简述
flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后开始计算并销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算什么情况下数据会被丢弃或者说不会被计算?未设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间,则该条数据会被丢弃;设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间+延迟时间,则该条数据会被丢弃;
allowedLateness
保底方案,数据延迟严重,可以保证数据不丢失延迟的数据通过outputTag输出,必须要事件时间大于watermark+allowed lateness,数据才会存储在outputTag中
sideOutputLateData
延迟数据
无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果如map、filter、flatmap,计算时不依赖其他数据,就属于无状态算子
无状态算子
当前数据之外,还需要一些其他数据来得到计算结果。这里的“其它数据”就是所谓的状态比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态。
有状态算子
离线任务失败:重启任务,然后重新读一遍输入数据,最后把昨天数据重新计算一遍即可。实时任务失败:重启任务,然后重新读一遍输入数据,最后把昨日数据重新计算一遍不就可以了。因为实时任务第一重要的就是时效性,很明显重新计算违背了实时计算的原则。
为什么需要状态
状态定义
原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节数据来存储
原始状态
托管状态就是由flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由flink实现,我们只需要调用接口即可。托管状态是由flink的运行时来托管的,在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。
托管状态
托管方式
状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。算子状态也可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别。因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction接口。
算子状态
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(keyedStream)中,也就keyBy之后才可以使用聚合算子必须在keyBy之后才能使用,就是因为聚合的结果是以keyed state的形式保存的。
键分区状态
状态类型
状态后端(State Backends)的作用就是用来维护State的,状态本质上就是数据状态后端主要有两周形式的状态管理:1、直接将State以对象的形式存储到JVM的堆上面2、将State对象序列化后存储到RocksDB中Remote State CheckpointingState Backends提供了State Checkpointing的功能,将TaskManager本地的State的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。状态后端的主要作用包括在每一个TaskManager节点上存储和管理状态,将状态进行远程备份两个部分
状态后端
MemoryStateBackend基于内存存储FsStateBackend基于文件存储本地路径,其格式为“file:///data/flink/checkpoints”HDFS路径,其格式为“hdfs://nameservice/flink/checkpoints”RocksDBStateBackend基于RocksDB存储RocksDB 是一个 key/value 的内存存储系统,类似于HBase当写数据时会先写进write buffer(类似于HBase的memstore),然后在flush到磁盘文件,当读取数据时会现在block cache(类似于HBase的block cache),所以速度会很快。
flink1.13版以前
flink1.13版本以后
存储方式
flink可以对状态数据进行存活时长管理,即\"新城代谢\"淘汰的机制主要是基于存活时间(Time To Live);存活时长的计时器可以在数据被读、写时重置TTL存活管理粒度是到元素级的
setTtl:表示状态时间戳的过期时间setUpdateType:表示状态时间戳的更新的时机,是一个Enum对象setStateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是Enum对象ttlTimeCharacteristic:表示StateTTL功能所适用的时间模式,仍然是Enum对象cleanupStrategies:表示对象的清理策略,目前来说有三种Enum值
相关参数
状态TTL
Flink算子状态
对于两条流的合并,很多情况我们不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理
简介
执行翻滚窗口时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射
Tumbling Window Join
在执行滑动窗口连接时,具有公共键和公共滑动窗口的所有元素将作为成对组合连接,并传递给JoinFunction或FlatJoinFunction在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!
Sliding Window Join
在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!
Session Window Join
join
除了输出匹配的元素以外,未能匹配的元素也会输出用于DataStream时返回是CoGroupStreams,用于DataSet时返回时CoGroupOperatorSets
CoGroup
在有些场景下,我们需要处理的时间间隔可能并不是固定的。比如,在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓地“实时对账”两次转账地数据可能写入了不同的日志流,它们的时间戳应该相差不大,所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配
业务需求
实现原理
间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
代码实现
Interval Join
Flink窗口联结
flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的继续处理因此,必须要一种机制能够对系统内的各种状态进行持久化容错。Flink-EOS:Exactly-Once Semantics指端到端的一致性,从数据读取,引擎计算,写入外部存储的整个过程中,即使机器或软件出现故障,都确保数据仅处理一次,不会重复,也不会丢失一条(或者一批)数据,从注入系统,中间处理,到输出结果的整个流程中,要么每个环节都处理成功,要么失败回滚。
有可能会有数据丢失这本质上是最简单的恢复方式,也就是直接从失败处的下个数据开始恢复程序,之前的失败数据处理就不管了。可以保证数据或事件最多由应用程序中的所有算子处理一次,这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送
At-most-once
有可能重复处理数据应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失,则将从源头重放或重新传输事件。然而,由于事件是可以被重传的,因此一个事件有时会被处理多次,至于有没有重复数据,不会关心,所以这种场景需要人工干预自己处理重复数据
At-least-once
每一条消息只被流处理系统处理一次即使是在各种故障的情况下,流应用程序中的所有算子都保证事件只会被精确一次处理。flink实现精确一次的分布式快照/状态检查点方法受到Chandy-Lamport分布式快照算法的启发。流应用程序中每个算子的所有状态都会定期做checkpoint如果是在系统中的任何地方发生失败,每个算子的所有状态都回滚到最新的全局一致checkpoint点在回滚期间,将暂停所有处理,源也会重置为与最近checkpoint相对应的正确偏移量。整个流应用程序基本上是回到最近一次的一致状态,然后程序从该状态重新启动。
Exactly-once
End-to-End Exactly-Once
Flink容错机制
Flink分布式快照里面的一个核心元素就是流屏障。这些屏障会被插入到数据流中,并作为数据流的一部分随着数据流动。屏障不会持有任何数据,而是和数据一样线性的流动。可以看到屏障将数据流分成了两部分数据(实际上是多个连续的部分)。一部分是当前数据的快照数据,一部分下一个快照的数据。每个屏障会带有它的快照id,这个快照的数据都在这个屏障的前面。如果是多个输入数据流,多个数据流的屏障会被同时插入到数据流中,快照n的屏障被插入到数据流的点(我们称之为Sn),就是数据流中一直到的某个位置,也就是包含这部分数据的快照。然后屏障开始向下流动,当一个中间的operator收到它的所有输入源的快照n屏障后,它就会向它所有的输出流发射一个快照n的屏障,一旦一个sink的operator收到所有输入数据流的屏障n,它就会向checkpoint的协调器发送快照n确认。当所有的sink都确认了快照n,系统才认为当前快照的数据已经完成
数据分割
等到上游所有的并行子分区barrier都到齐,才去保存当前任务的状态缺点:先到达的分区要做缓存等待,会造成数据堆积
barrier对齐
数据屏障
任务启动
JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都会递增。
启动checkpoint
当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task:
source端
当task接收到某个上游发送来的barrier,会将该上游barrier之前的数据继续进行处理,而barrier之后发送来的消息不会进行处理,会被缓存起来。barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓存。
Transformation端
如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的Source 发来的数据不会处理,只会缓存。而未接收到 barrier 的Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为barrier的对齐 。barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对齐这个概念
当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里sum_even 会处理 Source1 发送来的数据4. 而且,在这个过程中 Source 会继续读取数据发送给下游,并不会中断。
缓存数据
当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。
Sink端
反压时无法做出checkpoint在反压时候 barrier 无法随着数据往下游流动,造成反压的时候无法做出 Checkpoint。但是其实在发生反压情况的时候,我们更加需要去做出对数据的Checkpoint,因为这个时候性能遇到了瓶颈,是更加容易出问题的阶段;Barrier 对齐阻塞数据处理 :阻塞对齐对于性能上存在一定的影响;恢复性能受限于 Checkpoint 间隔 :在做恢复的时候,延迟受到多大的影响很多时候是取决于 Checkpoint 的间隔,间隔越大,需要 replay 的数据就会越多,从而造成中断的影响也就会越大。但是目前 Checkpoint 间隔受制于持久化操作的时间,所以没办法做的很快。解决方案:Unaligned Checkpointbarrier 算子在到达 input buffer 最前面的时候,就会开始触发 Checkpoint 操作。它会立刻把 barrier 传到算子的 OutPut Buffer 的最前面,相当于它会立刻被下游的算子所读取到。通过这种方式可以使得 barrier 不受到数据阻塞,解决反压时候无法进行 Checkpoint 的问题。当我们把 barrier 发下去后,需要做一个短暂的暂停,暂停的时候会把算子的 State 和 inputoutput buffer 中的数据进行一个标记,以方便后续随时准备上传。对于多路情况会一直等到另外一路 barrier 到达之前数据,全部进行标注。通过这种方式整个在做 Checkpoint 的时候,也不需要对 barrier 进行对齐,唯一需要做的停顿就是在整个过程中对所有 buffer 和 state 标注。这种方式可以很好的解决反压时无法做出Checkpoint ,和 Barrier 对齐阻塞数据影响性能处理的问题。
原因
机制革新
流程详解
SavePoint
Flink先将待输出的数据保存下来暂时不向外部系统提交,等到Checkpoint结束时,Flink上下游所有算子的数据都是一致的时候,Flink将之前保存的数据全部提交并commit到外部系统。换句话说,只有经过Checkpoint确认的数据才向外部系统写入。如下图所示,如果使用事务写,那只把时间戳3之前的输出提交到外部系统,时间戳3以后的数据(例如时间戳5和8生成的数据)暂时保存下来,等待下次Checkpoint时一起写入到外部系统。这就避免了时间戳5这个数据产生多次结果,多次写入到外部系统
事务写入
预写日志事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,预写日志(WAL)就是一种非常简单的方式。先把结果数据作为日志(log)状态保存起来进行检查点保存时,也会将这些结果数据一并做持久化存储在收到检查点完成的通知时,将所有结果一次性写入外部系统。需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink 最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。这两种方式区别主要在于:WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的ExactlyOnce,因为WAL预习日志会先写内存,而内存是易失介质。如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。输出数据不再是实时写入到外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和Exactly-Once保障机制,对于开发者来说,需要在不同需求之间权衡。
预写日志
事务写入之预写日志
两阶段提交,顾名思义,即分两个阶段commit:preCommit和Commit。preCommit阶段协调者向所有参与者发起请求,询问是否可以执行提交操作,并开始等待所有参与者的响应。所有参与者节点执行协调者询问发起为止的所有事务操作,并将undo和redo信息写入日志进行持久化。所有参与者响应协调者发起的询问。对于每个参与者节点,如果他的事务操作执行成功,则返回“同意”消息;反之,返回“终止”消息。commit阶段如果协调者获取到的所有参与者节点返回的消息都为“同意”时,协调者向所有参与者节点发送“正式提交”的请求(成功情况);反之,如果任意一个参与者节点预提交阶段返回的响应消息为“终止”,或者协调者询问阶段超时,导致没有收到所有的参与者节点的响应,那么,协调者向所有参与者节点发送“回滚提交”的请求(失败情况)。成功情况所有参与者节点正式完成操作,并释放在整个事务期间占用的资源;反之,失败情况下,所有参与者节点利用之前持久化的预写日志进行事务回滚操作,并释放在整个事务期间占用的资源。成功情况下,所有参与者节点向协调者节点发送“事务完成”消息;失败情况下,所有参与者节点向协调者节点发送“回滚完成”消息。成功情况下,协调者收到所有参与者节点反馈的“事务完成”消息,完成事务;失败情况下,协调者收到所有参与者节点反馈的“回滚完成”消息,取消事务。
事务写入之两阶段提交
CheckPoint
如果你看到一个 task 发生 反压警告(例如: High ),意味着它生产数据的速率比下游 task 消费数据的速率要快。在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。反压沿着相反的方向传播,沿着数据流向上游传播。
监控反压
反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。流处理系统需要能优雅地处理反压(backpressure)问题。反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。反压会影响到两项指标: checkpoint 时长和 state 大小。前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to EndDuration)变长。后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
反压原因
对于一个 TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 NetworkBufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 LocalBufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为Used发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 NetworkBufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket写数据。Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 highwatermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测Netty 是否可写,发现不可写就会停止向 Netty 写数据。这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool和 Network BufferPool 申请内存。Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨TaskManager 的反压。
跨TaskManager反压过程
下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。
TaskMananger内反压过程
Flink Backpressure
Flink
0 条评论
回复 删除
下一页