flink
2022-04-07 13:19:28 0 举报
AI智能生成
登录查看完整内容
flink知识点总结
作者其他创作
大纲/内容
查询小数据量的维表情况下才使用这种方式,并且要妥善处理连接外部系统的线程,一般还会用到线程池。
实时查询维表
适用于那些实时场景不是很高,维表数据较小的场景
预加载全量数据
如果维表的数据比较大,无法一次性全部加载到内存中,可以使用LRU策略加载维表数据。
利用 Flink 的 RichAsyncFunction 读取 Hbase 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 Hbase,然后插入到缓存中
LRU 缓存
将维表消息广播出去
维度表关联
pre style=\
//3:获取数据Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable(\"broadcastSetName\");
实例
广播
重分区算子用来对数据进行重新分区,可以用来解决数据倾斜问题
dataStream.shuffle()
根据均匀分布随机分配元素,(类似于random.nextInt(3),0 - 3 在概率上是均匀的)
Random Partitioning
dataStream.rebalance()
分区元素循环,每个分区创建相等的负载。数据发生倾斜的时候可以用于性能优化
对数据集进行再平衡,重分组,消除数据倾斜
Rebalancing
dataSteam.rescale()
rescale与rebalance很像,也是将数据均匀分布到各下游各实例上,但它的传输开销更小,因为rescale并不是将每个数据轮询地发送给下游每个实例,而是就近发送给下游实例
Rescaling
自定义分区需要时间Paritition接口
Custom Partitioning
种类
推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。
如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据
需要设置 Flink 的 Redistributing,也就是数据重分配。
为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数
dataStream .setParallelism(2) // 采用REBALANCE分区策略重分区 .rebalance() //.rescale() .print() .setParallelism(4);
Rebalance 分区策略,数据会以 round-robin 的方式对数据进行再次分区,可以全局负载均衡。
Rescale 分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中
数据重分配-再分区
并发
重分区
状态
Flink重启策略
// 1 设置全局并行度设置为3 kafka分区数为6 env.setParallelism(3);
// 4. 设置状态后端 env.setStateBackend(new FsStateBackend(CHECK_POINT_PATH + \"/\" + jobName));
Flink设置环境
// Keyed Windowstream .keyBy(...) <- 按照一个Key进行分组 .window(...) <- 将数据流中的元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function// Non-Keyed Windowstream .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function
骨架
分组
keyBy
不分组窗口
所有数据将发送给下游的单个实例,或者说下游算子的并行度为1
windowAll
不保证顺序
一种基于数量(Count-based Window)
TumblingEventTimeWindows
TumblingProcessingTimeWindows
滚动窗口:Tumbling
SlidingEventTimeWindows
SlidingProcessingTimeWindows
滑动窗口:Sliding
EventTimeSessionWindows
ProcessingTimeSessionWindows
DynamicProcessingTimeSessionWindows
会话窗口:Session
val input: DataStream[T] = ...// event-time session windows with static gapinput .keyBy(...) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<window function>(...)// event-time session windows with dynamic gapinput .keyBy(...) .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] { override def extract(element: T): Long = { // determine and return session gap } })) .<window function>(...)// processing-time session windows with static gapinput .keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<window function>(...)// processing-time session windows with dynamic gapinput .keyBy(...) .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] { override def extract(element: T): Long = { // determine and return session gap } })) .<window function>(...)
一种基于时间(Time-based Window):TimeWindow
窗口分配器(WindowAssigner)
增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中
ReduceFunction
AggregateFunction
增量计算
全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。
ProcessWindowFunction
ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。比如,每天的数据在TB级,我们需要Slide为十分钟Size为一小时的滑动窗口,这种设置会导致窗口数量很多,而且一个元素会被复制好多份分给每个所属的窗口,这将带来巨大的内存压力。
全量计算
对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。
ProcessWindowFunction与增量计算相结合
窗口函数
增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去
全量计算窗口函数需要将窗口内的元素缓存,Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去
每个窗口都有一个默认的Trigger,比如前文这些例子都是基于Processing Time的时间窗口,当到达窗口的结束时间时,Trigger以及对应的计算被触发
如果我们有一些个性化的触发条件,比如窗口中遇到某些特定的元素、元素总数达到一定数量或窗口中的元素到达时满足某种特定的模式时,我们可以自定义一个Trigger
甚至可以在Trigger中定义一些提前计算的逻辑,比如在Event Time语义中,虽然Watermark还未到达,但是我们可以定义提前计算输出的逻辑,以快速获取计算结果,获得更低的延迟。
决定何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理
每当窗口的Watermark时间戳到达窗口的结束时间,Trigger会发送FIRE
Event Time的窗口会有一个EventTimeTrigger
ProcessingTimeTrigger对应Processing Time窗口
CountTrigger对应Count-based窗口。
WindowAssigner的默认trigger
CONTINUE:什么都不做
FIRE:启动计算并将结果发送给下游,不清理窗口数据。
PURGE:清理窗口数据但不执行计算。
FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据。
TriggerResult的结果
在股票或任何交易场景中,我们比较关注价格急跌的情况,默认窗口长度是60秒,如果价格跌幅超过5%,则立即执行Window Function,如果价格跌幅在1%到5%之内,那么10秒后触发Window Function。
在自定义Trigger时,如果使用了状态,一定要使用clear方法将状态数据清理,否则随着窗口越来越多,状态数据会越积越多。
自定义Trigger
触发器:Trigger
清除器(Evictor)是在WindowAssigner和Trigger的基础上的一个可选选项,用来清除一些数据。我们可以在Window Function执行前或执行后调用Evictor。
evictBefore和evictAfter分别在Window Function之前和之后被调用,窗口的所有元素被放在了Iterable<TimestampedValue<T>>,我们要实现自己的清除逻辑。当然,对于增量计算的ReduceFunction和AggregateFunction,我们没必要使用Evictor。
CountEvictor保留一定数目的元素,多余的元素按照从前到后的顺序先后清理
TimeEvictor保留一个时间段的元素,早于这个时间段的元素会被清理。
Flink提供了几个实现好的Evictor:
清除器:Evictor
https://zhuanlan.zhihu.com/p/102325190
窗口
为了解决数据到达 Flink 之前发生的乱序问题,用 EventTime 和 WaterMark 进行配合使用。
what
水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素
如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。
也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记
在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。
was
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
watermark 时间 >= window_end_time;
Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:
如何计算Watermaker
Watermaker是用来触发窗口计算的!
Watermaker有什么用?
1.窗口中有数据
2.Watermaker >= 窗口的结束时间
窗口计算的触发条件为:
Watermaker如何触发窗口计算的?
/** * @author WGR * @create 2021/9/13 -- 15:18 */public class WindowTest3_EventTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(100); // socket文本流 DataStream<String> inputStream = env.socketTextStream(\"192.168.1.180\
标准
/** * @author WGR * @create 2021/9/14 -- 11:04 */public class WindowTest4_EventTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(100); // socket文本流 DataStream<String> inputStream = env.socketTextStream(\"localhost\
OutputTag+allowedLateness
watermarker是将窗口触发时间延迟n秒,用以接收延迟到来的时间窗口期内的eventtime范围时间,窗口开始结束时间是严格的窗口时间范围,但是出发结束的时间被延迟了。
滑动窗口是将窗口前沿时间扩大到n时间以前,扩大统计时间范围
滚动窗口是 统计时间和滑动时间相同的滑动窗口
自我总结:
https://blog.csdn.net/q322625/article/details/110176094
https://www.cnblogs.com/dalianpai/p/15268363.html
refer
水位线 Watermaker
checkpoint
短时间内流量陡增造成数据的堆积或者消费速度变慢
数据的消费速度小于数据的生产速度
特点
导致checkpoint超时
影响state大小:拖慢checkpoint,设置到时OOM
危害
排查时先把operator chain禁用,方便定位到具体算子
该节点发送速率赶不上数据接收速率
下游节点接收速率较慢,通过反压限制了该节点的发送
Flink Web Ul
利用metic定位
定位
可以在 Flink 的后台管理页面看到每个 Task 处理数据的大小。当数据倾斜出现时,通常是简单地使用类似 KeyBy 等分组聚合函数导致的,需要用户将热点 Key 进行预处理,降低或者消除热点 Key 的影
数据倾斜
通过查看运行机器节点的 CPU 和内存情况定位问题
代码执行效率问题
通过 -XX:+PrintGCDetails 参数查看 GC 的日志
分析GC情况
外部组件交互
原因
backpress
flink
0 条评论
回复 删除
下一页