Flink
2023-03-27 15:20:39 0 举报
AI智能生成
对于flink最详细的总结,结合本人实际生产开发经验!需要的拿走
作者其他创作
大纲/内容
keyed&non-keyed
时间语义
时间概念
事件时间(Event Time)
1、事件时间是每个事件在其生产设备上发生的时间。这个时间通常在记录进入flink之前嵌入在记录中,并且可以从每个记录中提取事件时间戳。
2、在事件时间中,时间的进展取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是表示事件时间进度的机制。
3、事件时间指的是数据本身携带的时间。这个时间是在事件产生时的时间。
4、事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结果
2、在事件时间中,时间的进展取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是表示事件时间进度的机制。
3、事件时间指的是数据本身携带的时间。这个时间是在事件产生时的时间。
4、事件时间对于乱序、延时、或者数据重放等情况,都能给出正确的结果
摄入时间(Ingestion Time)
摄入时间指的是数据进入flink的时间
处理时间(Processing Time)
1、处理时间是指正在执行相应操作的机器的系统时间
2、当流式程序在基于处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应的机器的系统时钟。
3、每小时处理时间窗口将包括在系统时钟指示整小时的时间之间到达特定操作员的所有记录。
4、处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度(例如,从消息队列)、记录在系统内的操作员之间流动的速度以及中断(计划的或其他)的影响。
2、当流式程序在基于处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应的机器的系统时钟。
3、每小时处理时间窗口将包括在系统时钟指示整小时的时间之间到达特定操作员的所有记录。
4、处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统的速度(例如,从消息队列)、记录在系统内的操作员之间流动的速度以及中断(计划的或其他)的影响。
窗口函数
基本概念
1、在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
2、Windows是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。Flink认为Batch是Streaming的一个特例,所以flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口就是从Streaming到Batch的一个桥梁。
3、窗口分类:基于时间划分驱动。例如:每30秒钟
基于数据数量驱动。例如:每100个元素,与时间无关
2、Windows是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。Flink认为Batch是Streaming的一个特例,所以flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口就是从Streaming到Batch的一个桥梁。
3、窗口分类:基于时间划分驱动。例如:每30秒钟
基于数据数量驱动。例如:每100个元素,与时间无关
keyed&non-keyed
keyed
keyed streams要调用keyBy(...)后再调用window(...)
对于keyed stream,其中数据的任何属性都可以作为key。属于同一个key的元素会被发送到同一个task
使用keyed stream允许你的窗口计算由多个task并行,因为每个逻辑上的keyed stream都可以被单独处理。
对于keyed stream,其中数据的任何属性都可以作为key。属于同一个key的元素会被发送到同一个task
使用keyed stream允许你的窗口计算由多个task并行,因为每个逻辑上的keyed stream都可以被单独处理。
non-keyed
non-keyed streams只用直接调用windowAll(...)
对于non-keyed stream,原始的stream不会被分割为多个逻辑上的stream
所以所有的窗口计算都会被同一个task完成,也就是parallelism为1.
对于non-keyed stream,原始的stream不会被分割为多个逻辑上的stream
所以所有的窗口计算都会被同一个task完成,也就是parallelism为1.
Count Window
1、计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口
2、相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。
3、计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。
2、相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。
3、计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。
Tumbling Window
1、滚动窗口的assigner分发元素到指定大小的窗口。滚动窗口的大小时固定的,且各自范围之间不重叠。
2、滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小
3、使用场景:适用于按照指定的周期来统计指标,比如说,每两个事件做一次统计
2、滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小
3、使用场景:适用于按照指定的周期来统计指标,比如说,每两个事件做一次统计
Sliding Window
1、滑动窗口的assigner分发元素到指定大小的窗口,窗口大小通过window size参数设置。
2、滑动窗口需要传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
3、如果slide小于窗口大小,滑动窗口可以允许窗口重叠。在这种情况下,一个元素可能会被分发到多个窗口
4、例如:每5个事件做一次统计,每次滑动长度2
2、滑动窗口需要传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
3、如果slide小于窗口大小,滑动窗口可以允许窗口重叠。在这种情况下,一个元素可能会被分发到多个窗口
4、例如:每5个事件做一次统计,每次滑动长度2
Window All
coutWindowAll数量窗口(不区分数量滚动窗口【滑动窗口与滚动窗口的区别,在于滑动窗口会有元素重叠可能,而滚动窗口不存在元素重叠】)
Session Windowon
1、会话窗口的assigner会把数据按活跃的会话分组
2、与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间
3、会话窗口的assigner可以设置固定的会话间隔(session gap)或用session gap extractor函数来动态地定义多长时间算作不活跃
4、当超出了不活跃地时间段,当前地会话就会关闭,并且将接下来地数据分发到新的会话窗口
2、与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间
3、会话窗口的assigner可以设置固定的会话间隔(session gap)或用session gap extractor函数来动态地定义多长时间算作不活跃
4、当超出了不活跃地时间段,当前地会话就会关闭,并且将接下来地数据分发到新的会话窗口
Flink Window Functions
增量聚合函数
1、窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结果的一瞬间再去进行聚合,显然就不够高效
2、为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
2、为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。
全窗口聚合函数
1、全量窗口函数需要对所有进入该窗口的数据进行缓存,等到窗口触发时才会遍历窗口内所有数据,进行结果计算。
2、因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
2、因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
Flink WaterMark
水位线意义
水位线就是用来度量事件事件
水位线是数据流中的一部分,随着数据一起流动,在不同任务之间传输
水位线可以看作是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间
水位线插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
水位线是数据流中的一部分,随着数据一起流动,在不同任务之间传输
水位线可以看作是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间
水位线插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
内置水位线生成器
1.10版本之前
AssignerWithPeriodicWatermarks
周期性的生成watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间
AssignerWithPunctuatedWatermarks
阶段性的生成 watermark,即每来一条数据就生成一个wm
AssignerWithPunctuatedWatermarks
阶段性的生成 watermark,即每来一条数据就生成一个wm
1.11版本以后
WatermarkStrategy
单调递增策略
固定乱序长度策略
不生成策略
固定乱序长度策略
不生成策略
自定义水位线
周期性水位线
周期性调用方法中发出水位线
Periodic Generator
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线
Periodic Generator
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线
定点式水位线
在事件触发的方法中发出水位线
定点式生成器会不停地检测onEvent()中地事件,当发现带有水位线信息地特殊事件时,就会立即发出水位线。
WatermarkGenerator 接口中有两个方法
onEvent():在每个事件到来时调用
onPeriodicEmit():由框架周期性调用
定点式生成器会不停地检测onEvent()中地事件,当发现带有水位线信息地特殊事件时,就会立即发出水位线。
WatermarkGenerator 接口中有两个方法
onEvent():在每个事件到来时调用
onPeriodicEmit():由框架周期性调用
水位线传递
实际应用中上下游都有多个并行子任务,为了统一推进事件时间的进展,要求上游任务处理完水位线、时钟改变之后,将当前水位线广播给所有下游任务
上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个分区水位线,当前任务时钟取最小水位线。
上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个分区水位线,当前任务时钟取最小水位线。
延迟数据
简述
现实中很难生成一个完美的水位线,水位线就是在延迟与准确性之前做的一种权衡。如果生成的水位线过于紧迫,即水位线可能会大于后来数据的时间戳,这就意味着数据有延迟,关于延迟数据的处理,Flink提供了一些机制,具体如下:allowedLateness、sideOutputLateData
allowedLateness
flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟
正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后开始计算并销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算
什么情况下数据会被丢弃或者说不会被计算?
未设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结
束时间,则该条数据会被丢弃;
设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束
时间+延迟时间,则该条数据会被丢弃;
正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后开始计算并销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算
什么情况下数据会被丢弃或者说不会被计算?
未设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结
束时间,则该条数据会被丢弃;
设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束
时间+延迟时间,则该条数据会被丢弃;
sideOutputLateData
保底方案,数据延迟严重,可以保证数据不丢失
延迟的数据通过outputTag输出,必须要事件时间大于watermark+allowed lateness,数据才会存储在outputTag中
延迟的数据通过outputTag输出,必须要事件时间大于watermark+allowed lateness,数据才会存储在outputTag中
Flink算子状态
状态定义
无状态算子
无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果
如map、filter、flatmap,计算时不依赖其他数据,就属于无状态算子
如map、filter、flatmap,计算时不依赖其他数据,就属于无状态算子
有状态算子
当前数据之外,还需要一些其他数据来得到计算结果。这里的“其它数据”就是所谓的状态
比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态。
比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态。
为什么需要状态
离线任务失败:重启任务,然后重新读一遍输入数据,最后把昨天数据重新计算一遍即可。
实时任务失败:重启任务,然后重新读一遍输入数据,最后把昨日数据重新计算一遍不就可以了。因为实时任务第一重要的就是时效性,很明显重新计算违背了实时计算的原则。
实时任务失败:重启任务,然后重新读一遍输入数据,最后把昨日数据重新计算一遍不就可以了。因为实时任务第一重要的就是时效性,很明显重新计算违背了实时计算的原则。
托管方式
原始状态
原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节数据来存储
flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节数据来存储
托管状态
托管状态就是由flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由flink实现,我们只需要调用接口即可。
托管状态是由flink的运行时来托管的,在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。
当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。
托管状态是由flink的运行时来托管的,在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。
当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。
状态类型
算子状态
状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效
这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。
算子状态也可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别。因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction接口。
这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。
算子状态也可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别。因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction接口。
键分区状态
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(keyedStream)中,也就keyBy之后才可以使用
聚合算子必须在keyBy之后才能使用,就是因为聚合的结果是以keyed state的形式保存的。
聚合算子必须在keyBy之后才能使用,就是因为聚合的结果是以keyed state的形式保存的。
状态后端
简述
状态后端(State Backends)的作用就是用来维护State的,状态本质上就是数据
状态后端主要有两周形式的状态管理:
1、直接将State以对象的形式存储到JVM的堆上面
2、将State对象序列化后存储到RocksDB中
Remote State Checkpointing
State Backends提供了State Checkpointing的功能,将TaskManager本地的State的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。
状态后端的主要作用包括在每一个TaskManager节点上存储和管理状态,将状态进行远程备份两个部分
状态后端主要有两周形式的状态管理:
1、直接将State以对象的形式存储到JVM的堆上面
2、将State对象序列化后存储到RocksDB中
Remote State Checkpointing
State Backends提供了State Checkpointing的功能,将TaskManager本地的State的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。
状态后端的主要作用包括在每一个TaskManager节点上存储和管理状态,将状态进行远程备份两个部分
存储方式
flink1.13版以前
MemoryStateBackend
基于内存存储
FsStateBackend
基于文件存储
本地路径,其格式为“file:///data/flink/checkpoints”
HDFS路径,其格式为“hdfs://nameservice/flink/checkpoints”
RocksDBStateBackend
基于RocksDB存储
RocksDB 是一个 key/value 的内存存储系统,类似于HBase
当写数据时会先写进write buffer(类似于HBase的memstore),然后在flush到磁盘文
件,
当读取数据时会现在block cache(类似于HBase的block cache),所以速度会很快。
基于内存存储
FsStateBackend
基于文件存储
本地路径,其格式为“file:///data/flink/checkpoints”
HDFS路径,其格式为“hdfs://nameservice/flink/checkpoints”
RocksDBStateBackend
基于RocksDB存储
RocksDB 是一个 key/value 的内存存储系统,类似于HBase
当写数据时会先写进write buffer(类似于HBase的memstore),然后在flush到磁盘文
件,
当读取数据时会现在block cache(类似于HBase的block cache),所以速度会很快。
flink1.13版本以后
HashMapStateBackend【默认】
Fsstatebackend 和 MemoryStatebackend 整合成了 HashMapStateBackend
存储管理状态类似于管理一个java堆中的对象,key/value的状态和窗口操作都会在一个
hash table中进行存储状态。
默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数
增加最大大小。
内存空间不够时,也会溢出一部分数据到本地磁盘文件;
可以支撑大规模的状态数据,只不过在状态数据规模超出内存空间时,读写效率就会明显
降低
EmbeddedRocksDBStateBackend
该状态后端存储管理状态在RocksDB数据库中,该状态存储在本地taskManager的磁盘
目录。
该状态后端按照指定的类型序列化后变成字节数组将数据存储在磁盘,按照key的字典
序排列
RocksDB 的每个 key 和 value 的最大大小为 2^31 字节。这是因为 RocksDB 的 JNI API
是基于 byte[] 的。
Rockdb 中的数据,有内存缓存的部分,也有磁盘文件的部分;
Rockdb 的磁盘文件数据读写速度相对还是比较快的,所以在支持超大规模状态数据
时,数据的读写效率不会有太大的降低
Fsstatebackend 和 MemoryStatebackend 整合成了 HashMapStateBackend
存储管理状态类似于管理一个java堆中的对象,key/value的状态和窗口操作都会在一个
hash table中进行存储状态。
默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数
增加最大大小。
内存空间不够时,也会溢出一部分数据到本地磁盘文件;
可以支撑大规模的状态数据,只不过在状态数据规模超出内存空间时,读写效率就会明显
降低
EmbeddedRocksDBStateBackend
该状态后端存储管理状态在RocksDB数据库中,该状态存储在本地taskManager的磁盘
目录。
该状态后端按照指定的类型序列化后变成字节数组将数据存储在磁盘,按照key的字典
序排列
RocksDB 的每个 key 和 value 的最大大小为 2^31 字节。这是因为 RocksDB 的 JNI API
是基于 byte[] 的。
Rockdb 中的数据,有内存缓存的部分,也有磁盘文件的部分;
Rockdb 的磁盘文件数据读写速度相对还是比较快的,所以在支持超大规模状态数据
时,数据的读写效率不会有太大的降低
状态TTL
基本概念
flink可以对状态数据进行存活时长管理,即"新城代谢"
淘汰的机制主要是基于存活时间(Time To Live);存活时长的计时器可以在数据被读、写时重置
TTL存活管理粒度是到元素级的
淘汰的机制主要是基于存活时间(Time To Live);存活时长的计时器可以在数据被读、写时重置
TTL存活管理粒度是到元素级的
相关参数
setTtl:表示状态时间戳的过期时间
setUpdateType:表示状态时间戳的更新的时机,是一个Enum对象
setStateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是Enum对象
ttlTimeCharacteristic:表示StateTTL功能所适用的时间模式,仍然是Enum对象
cleanupStrategies:表示对象的清理策略,目前来说有三种Enum值
setUpdateType:表示状态时间戳的更新的时机,是一个Enum对象
setStateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是Enum对象
ttlTimeCharacteristic:表示StateTTL功能所适用的时间模式,仍然是Enum对象
cleanupStrategies:表示对象的清理策略,目前来说有三种Enum值
Flink窗口联结
简介
对于两条流的合并,很多情况我们不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理
join
Tumbling Window Join
执行翻滚窗口时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction
因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射
因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射
Sliding Window Join
在执行滑动窗口连接时,具有公共键和公共滑动窗口的所有元素将作为成对组合连接,并传递给JoinFunction或FlatJoinFunction
在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!
在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!
Session Window Join
在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。
同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!
同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!
CoGroup
除了输出匹配的元素以外,未能匹配的元素也会输出
用于DataStream时返回是CoGroupStreams,用于DataSet时返回时CoGroupOperatorSets
用于DataStream时返回是CoGroupStreams,用于DataSet时返回时CoGroupOperatorSets
Interval Join
业务需求
在有些场景下,我们需要处理的时间间隔可能并不是固定的。
比如,在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓地“实时对账”
两次转账地数据可能写入了不同的日志流,它们的时间戳应该相差不大,所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配
比如,在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓地“实时对账”
两次转账地数据可能写入了不同的日志流,它们的时间戳应该相差不大,所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配
实现原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound)
对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔,把这段时间作为可以匹配另一条流数据的“窗口”范围。
以a的时间戳为中心,下至下界点、上至上界点的一个闭区间[a.timestamp + lowerBound, a.timestamp + upperBound]
对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界
upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔,把这段时间作为可以匹配另一条流数据的“窗口”范围。
以a的时间戳为中心,下至下界点、上至上界点的一个闭区间[a.timestamp + lowerBound, a.timestamp + upperBound]
对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界
upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
代码实现
间隔联结在代码中,是基于KeyedStream的联结(join)操作。
DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数
同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作
同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹
配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处
理联结函数”ProcessJoinFunction。
DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数
同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作
同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹
配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处
理联结函数”ProcessJoinFunction。
Flink容错机制
简述
flink是一个stateful(带状态)的数据处理系统;系统在处理数据的过程中,各算子所记录的状态会随着数据的处理而不断变化
一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的继续处理
因此,必须要一种机制能够对系统内的各种状态进行持久化容错。
Flink-EOS:Exactly-Once Semantics
指端到端的一致性,从数据读取,引擎计算,写入外部存储的整个过程中,即使机器或软件出现故障,都确保数据仅处理一次,不会重复,也不会丢失
一条(或者一批)数据,从注入系统,中间处理,到输出结果的整个流程中,要么每个环节都处理成功,要么失败回滚。
一旦系统崩溃,需要重启后能够恢复出崩溃前的状态才能进行数据的继续处理
因此,必须要一种机制能够对系统内的各种状态进行持久化容错。
Flink-EOS:Exactly-Once Semantics
指端到端的一致性,从数据读取,引擎计算,写入外部存储的整个过程中,即使机器或软件出现故障,都确保数据仅处理一次,不会重复,也不会丢失
一条(或者一批)数据,从注入系统,中间处理,到输出结果的整个流程中,要么每个环节都处理成功,要么失败回滚。
At-most-once
有可能会有数据丢失
这本质上是最简单的恢复方式,也就是直接从失败处的下个数据开始恢复程序,之前的失败数据处理就不管了。可以保证数据或事件最多由应用程序中的所有算子处理一次,这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送
这本质上是最简单的恢复方式,也就是直接从失败处的下个数据开始恢复程序,之前的失败数据处理就不管了。可以保证数据或事件最多由应用程序中的所有算子处理一次,这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送
At-least-once
有可能重复处理数据
应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失,则将从源头重放或重新传输事件。然而,由于事件是可以被重传的,因此一个事件有时会被处理多次,至于有没有重复数据,不会关心,所以这种场景需要人工干预自己处理重复数据
应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失,则将从源头重放或重新传输事件。然而,由于事件是可以被重传的,因此一个事件有时会被处理多次,至于有没有重复数据,不会关心,所以这种场景需要人工干预自己处理重复数据
Exactly-once
每一条消息只被流处理系统处理一次
即使是在各种故障的情况下,流应用程序中的所有算子都保证事件只会被精确一次处理。
flink实现精确一次的分布式快照/状态检查点方法受到Chandy-Lamport分布式快照算法的启发。
流应用程序中每个算子的所有状态都会定期做checkpoint
如果是在系统中的任何地方发生失败,每个算子的所有状态都回滚到最新的全局一致checkpoint点
在回滚期间,将暂停所有处理,源也会重置为与最近checkpoint相对应的正确偏移量。
整个流应用程序基本上是回到最近一次的一致状态,然后程序从该状态重新启动。
即使是在各种故障的情况下,流应用程序中的所有算子都保证事件只会被精确一次处理。
flink实现精确一次的分布式快照/状态检查点方法受到Chandy-Lamport分布式快照算法的启发。
流应用程序中每个算子的所有状态都会定期做checkpoint
如果是在系统中的任何地方发生失败,每个算子的所有状态都回滚到最新的全局一致checkpoint点
在回滚期间,将暂停所有处理,源也会重置为与最近checkpoint相对应的正确偏移量。
整个流应用程序基本上是回到最近一次的一致状态,然后程序从该状态重新启动。
End-to-End Exactly-Once
端到端的精确一次
flink在1.4.0版本引入Exactly-Once,并号称支持端到端精确一次语义。它指的是flink应用从source端开始到sink端结束,数据必须经过的起始点和结束点
flink在1.4.0版本引入Exactly-Once,并号称支持端到端精确一次语义。它指的是flink应用从source端开始到sink端结束,数据必须经过的起始点和结束点
CheckPoint
数据屏障
数据分割
Flink分布式快照里面的一个核心元素就是流屏障。这些屏障会被插入到数据流中,并作为数据流的一部分随着数据流动。屏障不会持有任何数据,而是和数据一样线性的流动。可以看到屏障将数据流分成了两部分数据(实际上是多个连续的部分)。一部分是当前数据的快照数据,一部分下一个快照的数据。每个屏障会带有它的快照id,这个快照的数据都在这个屏障的前面。
如果是多个输入数据流,多个数据流的屏障会被同时插入到数据流中,快照n的屏障被插入到数据流的点(我们称之为Sn),就是数据流中一直到的某个位置,也就是包含这部分数据的快照。
然后屏障开始向下流动,当一个中间的operator收到它的所有输入源的快照n屏障后,它就会向它所有的输出流发射一个快照n的屏障,一旦一个sink的operator收到所有输入数据流的屏障n,它就会向checkpoint的协调器发送快照n确认。当所有的sink都确认了快照n,系统才认为当前快照的数据已经完成
如果是多个输入数据流,多个数据流的屏障会被同时插入到数据流中,快照n的屏障被插入到数据流的点(我们称之为Sn),就是数据流中一直到的某个位置,也就是包含这部分数据的快照。
然后屏障开始向下流动,当一个中间的operator收到它的所有输入源的快照n屏障后,它就会向它所有的输出流发射一个快照n的屏障,一旦一个sink的operator收到所有输入数据流的屏障n,它就会向checkpoint的协调器发送快照n确认。当所有的sink都确认了快照n,系统才认为当前快照的数据已经完成
barrier对齐
等到上游所有的并行子分区barrier都到齐,才去保存当前任务的状态
缺点:先到达的分区要做缓存等待,会造成数据堆积
缺点:先到达的分区要做缓存等待,会造成数据堆积
流程详解
任务启动
假设任务从 Kafka 的某个 Topic 中读取数据,该Topic 有 2 个 Partition,故任务的并行度为
2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。某一时刻,状态如下:
Source1的偏移量为 3,即读取到了 1,2,3 三条数据。数据1已经发送到 sum_odd。
Source2的偏移量为 4,即读取到了1,2,3,4 四条数据。数据1,3已经发送到sum_odd,数据2
已经发送到sum_even
此时 sum_even 的状态为 2,sum_odd 的状态为 5
2。根据读取到数据的奇偶性,将数据分发到两个 task 进行求和。某一时刻,状态如下:
Source1的偏移量为 3,即读取到了 1,2,3 三条数据。数据1已经发送到 sum_odd。
Source2的偏移量为 4,即读取到了1,2,3,4 四条数据。数据1,3已经发送到sum_odd,数据2
已经发送到sum_even
此时 sum_even 的状态为 2,sum_odd 的状态为 5
启动checkpoint
JobManager 根据 Checkpoint 间隔时间,启动 Checkpoint。此时会给每个 Source 发送一
个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都
会递增。
个 barrier 消息,消息中的数值表示 Checkpoint 的序号,每次启动新的 Checkpoint 该值都
会递增。
source端
当Source接收到barrier消息,会将当前的状态(Partition、Offset)保存到
StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广
播给下游的每一个 task:
StateBackend,然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广
播给下游的每一个 task:
Transformation端
当task接收到某个上游发送来的barrier,会将该上游barrier之前的数据继续进行处理,而
barrier之后发送来的消息不会进行处理,会被缓存起来。
barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以
下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓
存。
barrier之后发送来的消息不会进行处理,会被缓存起来。
barrier之前的数据属于本次Checkpoint,barrier之后的数据属于下一次Checkpoint,所以
下次Checkpoint的数据是不应该在本次Checkpoint过程中被计算的,因此会将数据进行缓
存。
barrier对齐
如果某个task有多个上游输入,如这里的 sum_even 有两个 Source 源,当接收到其中一个
Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的
Source 发来的数据不会处理,只会缓存。而未接收到 barrier 的
Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为
barrier的对齐 。
barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如
reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对
齐这个概念
Source 的barrier后,会等待其他 Source 的 barrier 到来。在此期间,接收到 barrier 的
Source 发来的数据不会处理,只会缓存。而未接收到 barrier 的
Source 发来的数据依然会进行处理,直到接收到该Source 发来的 barrier,这个过程称为
barrier的对齐 。
barrier对齐只会发生在多对一的Operator(如 join)或者一对多的Operator(如
reparation/shuffle)。如果是一对一的Operator,如map、flatMap 或 filter 等,则没有对
齐这个概念
缓存数据
当task接收到所有上游发送来的barrier,即可以认为当前task收到了本次 Checkpoint 的所
有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里
sum_even 会处理 Source1 发送来的数据4. 而且,在这个过程中 Source 会继续读取数据
发送给下游,并不会中断。
有数据。之后 task 会将 barrier 继续发送给下游,然后处理缓存的数据,比如这里
sum_even 会处理 Source1 发送来的数据4. 而且,在这个过程中 Source 会继续读取数据
发送给下游,并不会中断。
Sink端
当sink收到barrier后,会向JobManager上报本次Checkpoint完成。至此,本次Checkpoint
结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。
结束,各阶段的状态均进行了持久化,可以用于后续的故障恢复。
机制革新
原因
反压时无法做出checkpoint
在反压时候 barrier 无法随着数据往下游流动,造成反压的时候无法做出 Checkpoint。但是
其实在发生反压情况的时候,我们更加需要去做出对数据的Checkpoint,因为这个时候性能
遇到了瓶颈,是更加容易出问题的阶段;
Barrier 对齐阻塞数据处理 :
阻塞对齐对于性能上存在一定的影响;
恢复性能受限于 Checkpoint 间隔 :
在做恢复的时候,延迟受到多大的影响很多时候是取决于 Checkpoint 的间隔,间隔越大,
需要 replay 的数据就会越多,从而造成中断的影响也就会越大。
但是目前 Checkpoint 间隔受制于持久化操作的时间,所以没办法做的很快。
解决方案:Unaligned Checkpoint
barrier 算子在到达 input buffer 最前面的时候,就会开始触发 Checkpoint 操作。它会立刻
把 barrier 传到算子的 OutPut Buffer 的最前面,相当于它会立刻被下游的算子所读取到。
通过这种方式可以使得 barrier 不受到数据阻塞,解决反压时候无法进行 Checkpoint 的问
题。
当我们把 barrier 发下去后,需要做一个短暂的暂停,暂停的时候会把算子的 State 和 input
output buffer 中的数据进行一个标记,以方便后续随时准备上传。对于多路情况会一直等到
另外一路 barrier 到达之前数据,全部进行标注。
通过这种方式整个在做 Checkpoint 的时候,也不需要对 barrier 进行对齐,唯一需要做的停
顿就是在整个过程中对所有 buffer 和 state 标注。这种方式可以很好的解决反压时无法做出
Checkpoint ,和 Barrier 对齐阻塞数据影响性能处理的问题。
在反压时候 barrier 无法随着数据往下游流动,造成反压的时候无法做出 Checkpoint。但是
其实在发生反压情况的时候,我们更加需要去做出对数据的Checkpoint,因为这个时候性能
遇到了瓶颈,是更加容易出问题的阶段;
Barrier 对齐阻塞数据处理 :
阻塞对齐对于性能上存在一定的影响;
恢复性能受限于 Checkpoint 间隔 :
在做恢复的时候,延迟受到多大的影响很多时候是取决于 Checkpoint 的间隔,间隔越大,
需要 replay 的数据就会越多,从而造成中断的影响也就会越大。
但是目前 Checkpoint 间隔受制于持久化操作的时间,所以没办法做的很快。
解决方案:Unaligned Checkpoint
barrier 算子在到达 input buffer 最前面的时候,就会开始触发 Checkpoint 操作。它会立刻
把 barrier 传到算子的 OutPut Buffer 的最前面,相当于它会立刻被下游的算子所读取到。
通过这种方式可以使得 barrier 不受到数据阻塞,解决反压时候无法进行 Checkpoint 的问
题。
当我们把 barrier 发下去后,需要做一个短暂的暂停,暂停的时候会把算子的 State 和 input
output buffer 中的数据进行一个标记,以方便后续随时准备上传。对于多路情况会一直等到
另外一路 barrier 到达之前数据,全部进行标注。
通过这种方式整个在做 Checkpoint 的时候,也不需要对 barrier 进行对齐,唯一需要做的停
顿就是在整个过程中对所有 buffer 和 state 标注。这种方式可以很好的解决反压时无法做出
Checkpoint ,和 Barrier 对齐阻塞数据影响性能处理的问题。
SavePoint
Savepoint 作为实时任务的全局镜像,其在底层使用的代码和Checkpoint的代码是一样的
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像;
Checkpoint 的主要目的是为意外失败的作业提供恢复机制(如 tm/jm 进程挂了)。
Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交
互。
Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。
Savepoint 应用场景,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。
Savepoint 更多地关注可移植性
Savepoint触发方式触发方式目前有三种
使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。
使用 flink cancel -s 命令,取消作业时,并触发 Savepoint。
使用 Rest API 触发 Savepoint,格式为:*/jobs/:jobid /savepoints*
Savepoint注意点
由于 Savepoint 是程序的全局状态,对于某些状态很大的实时任务,当我们触发
Savepoint,可能会对运行着的实时任务产生影响,个人建议如果对于状态过大的实时任务,
触发 Savepoint 的时间,不要太过频繁。根据状态的大小,适当的设置触发时间。
当我们从 Savepoint 进行恢复时,需要检查这次 Savepoint 目录文件是否可用。可能存在你
上次触发 Savepoint 没有成功,导致 HDFS 目录上面 Savepoint 文件不可用或者缺少数据文
件等,这种情况下,如果在指定损坏的 Savepoint 的状态目录进行状态恢复,任务会启动不
起来。
Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像;
Checkpoint 的主要目的是为意外失败的作业提供恢复机制(如 tm/jm 进程挂了)。
Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交
互。
Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。
Savepoint 应用场景,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。
Savepoint 更多地关注可移植性
Savepoint触发方式触发方式目前有三种
使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。
使用 flink cancel -s 命令,取消作业时,并触发 Savepoint。
使用 Rest API 触发 Savepoint,格式为:*/jobs/:jobid /savepoints*
Savepoint注意点
由于 Savepoint 是程序的全局状态,对于某些状态很大的实时任务,当我们触发
Savepoint,可能会对运行着的实时任务产生影响,个人建议如果对于状态过大的实时任务,
触发 Savepoint 的时间,不要太过频繁。根据状态的大小,适当的设置触发时间。
当我们从 Savepoint 进行恢复时,需要检查这次 Savepoint 目录文件是否可用。可能存在你
上次触发 Savepoint 没有成功,导致 HDFS 目录上面 Savepoint 文件不可用或者缺少数据文
件等,这种情况下,如果在指定损坏的 Savepoint 的状态目录进行状态恢复,任务会启动不
起来。
事务写入之预写日志
事务写入
Flink先将待输出的数据保存下来暂时不向外部系统提交,等到Checkpoint结束时,Flink上下
游所有算子的数据都是一致的时候,Flink将之前保存的数据全部提交并commit到外部系
统。换句话说,只有经过Checkpoint确认的数据才向外部系统写入。
如下图所示,如果使用事务写,那只把时间戳3之前的输出提交到外部系统,时间戳3以后的
数据(例如时间戳5和8生成的数据)暂时保存下来,等待下次Checkpoint时一起写入到外部
系统。这就避免了时间戳5这个数据产生多次结果,多次写入到外部系统
游所有算子的数据都是一致的时候,Flink将之前保存的数据全部提交并commit到外部系
统。换句话说,只有经过Checkpoint确认的数据才向外部系统写入。
如下图所示,如果使用事务写,那只把时间戳3之前的输出提交到外部系统,时间戳3以后的
数据(例如时间戳5和8生成的数据)暂时保存下来,等待下次Checkpoint时一起写入到外部
系统。这就避免了时间戳5这个数据产生多次结果,多次写入到外部系统
预写日志
预写日志
事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般
不支持事务的存储系统,预写日志(WAL)就是一种非常简单的方式。
先把结果数据作为日志(log)状态保存起来
进行检查点保存时,也会将这些结果数据一并做持久化存储
在收到检查点完成的通知时,将所有结果一次性写入外部系统。
需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之
后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检
查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复
时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位
置。
但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功
地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink 最终还是会认为没有
成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致
这批数据的重复写入。
这两种方式区别主要在于:
WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的ExactlyOnce,因为WAL预习日志会先写内存,而内存是易失介质。
如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百
端到端的Exactly-Once。
事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。
输出数据不再是实时写入到外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和
Exactly-Once保障机制,对于开发者来说,需要在不同需求之间权衡。
事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般
不支持事务的存储系统,预写日志(WAL)就是一种非常简单的方式。
先把结果数据作为日志(log)状态保存起来
进行检查点保存时,也会将这些结果数据一并做持久化存储
在收到检查点完成的通知时,将所有结果一次性写入外部系统。
需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之
后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检
查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复
时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位
置。
但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功
地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink 最终还是会认为没有
成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致
这批数据的重复写入。
这两种方式区别主要在于:
WAL方式通用性更强,适合几乎所有外部系统,但也不能提供百分百端到端的ExactlyOnce,因为WAL预习日志会先写内存,而内存是易失介质。
如果外部系统自身就支持事务(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百
端到端的Exactly-Once。
事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。
输出数据不再是实时写入到外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和
Exactly-Once保障机制,对于开发者来说,需要在不同需求之间权衡。
事务写入之两阶段提交
两阶段提交,顾名思义,即分两个阶段commit:preCommit和Commit。
preCommit阶段
协调者向所有参与者发起请求,询问是否可以执行提交操作,并开始等待所有参与者的
响应。
所有参与者节点执行协调者询问发起为止的所有事务操作,并将undo和redo信息写入
日志进行持久化。
所有参与者响应协调者发起的询问。对于每个参与者节点,如果他的事务操作执行成
功,则返回“同意”消息;反之,返回“终止”消息。
commit阶段
如果协调者获取到的所有参与者节点返回的消息都为“同意”时,协调者向所有参与者节
点发送“正式提交”的请求(成功情况);反之,如果任意一个参与者节点预提交阶段返
回的响应消息为“终止”,或者协调者询问阶段超时,导致没有收到所有的参与者节点的
响应,那么,协调者向所有参与者节点发送“回滚提交”的请求(失败情况)。
成功情况所有参与者节点正式完成操作,并释放在整个事务期间占用的资源;反之,失
败情况下,所有参与者节点利用之前持久化的预写日志进行事务回滚操作,并释放在整
个事务期间占用的资源。
成功情况下,所有参与者节点向协调者节点发送“事务完成”消息;失败情况下,所有参
与者节点向协调者节点发送“回滚完成”消息。
成功情况下,协调者收到所有参与者节点反馈的“事务完成”消息,完成事务;失败情况
下,协调者收到所有参与者节点反馈的“回滚完成”消息,取消事务。
preCommit阶段
协调者向所有参与者发起请求,询问是否可以执行提交操作,并开始等待所有参与者的
响应。
所有参与者节点执行协调者询问发起为止的所有事务操作,并将undo和redo信息写入
日志进行持久化。
所有参与者响应协调者发起的询问。对于每个参与者节点,如果他的事务操作执行成
功,则返回“同意”消息;反之,返回“终止”消息。
commit阶段
如果协调者获取到的所有参与者节点返回的消息都为“同意”时,协调者向所有参与者节
点发送“正式提交”的请求(成功情况);反之,如果任意一个参与者节点预提交阶段返
回的响应消息为“终止”,或者协调者询问阶段超时,导致没有收到所有的参与者节点的
响应,那么,协调者向所有参与者节点发送“回滚提交”的请求(失败情况)。
成功情况所有参与者节点正式完成操作,并释放在整个事务期间占用的资源;反之,失
败情况下,所有参与者节点利用之前持久化的预写日志进行事务回滚操作,并释放在整
个事务期间占用的资源。
成功情况下,所有参与者节点向协调者节点发送“事务完成”消息;失败情况下,所有参
与者节点向协调者节点发送“回滚完成”消息。
成功情况下,协调者收到所有参与者节点反馈的“事务完成”消息,完成事务;失败情况
下,协调者收到所有参与者节点反馈的“回滚完成”消息,取消事务。
Flink Backpressure
监控反压
如果你看到一个 task 发生 反压警告(例如: High ),意味着它生产数据的速率比下游 task 消
费数据的速率要快。
在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。反压沿着相反的方向传
播,沿着数据流向上游传播。
费数据的速率要快。
在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。反压沿着相反的方向传
播,沿着数据流向上游传播。
反压原因
反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输
入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂
时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量
时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知
上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。
流处理系统需要能优雅地处理反压(backpressure)问题。反压通常产生于这样的场景:短时负
载高峰导致系统接收数据的速率远高于它处理数据的速率
反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致
更大的数据处理延迟。
通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,
然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。
反压会影响到两项指标: checkpoint 时长和 state 大小。
前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致
checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End
Duration)变长。
后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道
的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的
barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这
些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂
时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量
时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知
上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。
流处理系统需要能优雅地处理反压(backpressure)问题。反压通常产生于这样的场景:短时负
载高峰导致系统接收数据的速率远高于它处理数据的速率
反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致
更大的数据处理延迟。
通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,
然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。
反压会影响到两项指标: checkpoint 时长和 state 大小。
前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致
checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End
Duration)变长。
后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道
的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的
barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这
些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
跨TaskManager反压过程
对于一个 TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在初始化
时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network
BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为
每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为
ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内
存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请
到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后
会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将
消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local
BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为
Used
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请
Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每
个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network
BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的
TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket
写数据。
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无
界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high
watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测
Netty 是否可写,发现不可写就会停止向 Netty 写数据。
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool
和 Network BufferPool 申请内存。
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨
TaskManager 的反压。
时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network
BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为
每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为
ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内
存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请
到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后
会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将
消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local
BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为
Used
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请
Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每
个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network
BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的
TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket
写数据。
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无
界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high
watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测
Netty 是否可写,发现不可写就会停止向 Netty 写数据。
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool
和 Network BufferPool 申请内存。
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨
TaskManager 的反压。
TaskMananger内反压过程
下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是
Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出
都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,
这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。
Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出
都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,
这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。
0 条评论
下一页