Fink
2025-08-04 08:45:08 1 举报
AI智能生成
1
作者其他创作
大纲/内容
有界数据无界数据
前置技能
Lambda
SQL
常见函数
DQL
DML
Scala
编程模型
FinkSQL & Flink Table
代码量少 以实现常见功能为主
以SQL语句为主
代码量少 以实现常见功能为主
以SQL语句为主
Stream & Batch Data Processing
代码量增多
以java 和Scala 代码为主
代码量增多
以java 和Scala 代码为主
Process Function
代码量非常多,功能都是自己实现
以java 和 scala代码为主
代码量非常多,功能都是自己实现
以java 和 scala代码为主
编程模型
运行环境获取集群运行环境
Source
获取数据源 file MQ HDFS
Transformation
数据的转换
Sink
数据输出
代码实现
Source
读取数据
Transformation
FlatMap
Map
Group
Sum
Sink
print
TypeInfomation
基础框架
任务并行度
执行环境、
批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
流处理
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
本地web环境
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration());
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration());
通用运行架构
流程
1.提交flink jar (source1 -> map2 -> keyBy2 -> sink2)
2.提交作业到yarn
3.yarn RM 初始化 AM
3.Clink 将作业构建为 StreamGraph 提交到 AM
4.在AM中 会初始化flinkMaster 三个角色 Dispatch jobmanager flink RM
Disptch:初始化对象 webUI
jobManager
协调分配任务的
Flink RM :计算需要申请多少资源
向YARN RM 申请资源
向YARN RM 申请资源
5.JobMaster 将作业继续解析为
StreamGraph
中间进入缓冲区 序列化和反序列化 并行度为2
总共做了两次序列化两次反序列化 两次cpu调度
总共做了两次序列化两次反序列化 两次cpu调度
因为并行度相同
内存需要切换 需要序列化和返序列化
算子链技术
jobGraph 优化 算子链优化 不产生序列化 cpu调度
ExecutionGraph
物理执行图
solt
并行度支持
Flink 作业的并行度(Parallelism)通过 Slot 实现。每个并行任务(Subtask)会被分配到一个 Slot 中执行。例如,若作业并行度为 3,则至少需要 3 个 Slot 来运行这些并行任务。
任务链优化
属于同一作业且上下游相连的算子(Operator)会被优化为一个任务链(Task Chain),这些任务可以共享同一个 Slot,减少线程间通信开销,提高执行效率。
Flink 作业的并行度(Parallelism)通过 Slot 实现。每个并行任务(Subtask)会被分配到一个 Slot 中执行。例如,若作业并行度为 3,则至少需要 3 个 Slot 来运行这些并行任务。
任务链优化
属于同一作业且上下游相连的算子(Operator)会被优化为一个任务链(Task Chain),这些任务可以共享同一个 Slot,减少线程间通信开销,提高执行效率。
Source
Transformation
Common
map:转换
source.map(word -> "flink_"+word).print().setParallelism(1);
flatMap:压缩
filter:过滤
sum min max minBy maxBy :滚动聚合
reduce:自定义的滚动聚合
lterate
Sink
Common
File
Console
System.currentTimeMillis()
source.writeAsText
.writeAsCsv
Socket
Connectors
Kafka
nc -lp 19523
environment.socketTextStream("localhost", 19523);
JDBC
Custom
自定义
Partition
GlobalPartitioner
时间语义
事件时间
EventTime
EventTime
事件时间程序员需要将其嵌入到数据的内部
将来分析数据的时候,只需要将数据的时间戳解析出来就可以获取到事件事件,而且事件时间是唯一的
将来分析数据的时候,只需要将数据的时间戳解析出来就可以获取到事件事件,而且事件时间是唯一的
默认:1.12以后
摄入时间
处理时间
ProcessTime
ProcessTime
处理时间是指正在执行相应操作的机器的系统时间
每个机器的系统时间有可能略有差异
数据在上下游处理的过程中,上下游的处理时间也有可能略有差异
每个机器的系统时间有可能略有差异
数据在上下游处理的过程中,上下游的处理时间也有可能略有差异
默认:1.12以前
窗口函数
定义
查询网站最近100笔交易的平均成交价
查询网站每分钟被点击了多少次
将flink的无界流数据按照一定的规则{次数-时间}切分成多个小窗口 然后计算窗口的结果 将Stream->Batch的过程
查询网站每分钟被点击了多少次
将flink的无界流数据按照一定的规则{次数-时间}切分成多个小窗口 然后计算窗口的结果 将Stream->Batch的过程
分类
基于时间驱动
基于数量驱动
基于数量驱动
Keyed
根据数据进入窗口前是否需要进行key分组
如果分组 则每一个分组内的数据要单独进行计算
如果分组 则每一个分组内的数据要单独进行计算
zr:1
hsy:2
zr:2
hsy:1
zr:3
hsy4
zr:5
hsy:8
hsy:2
zr:2
hsy:1
zr:3
hsy4
zr:5
hsy:8
keyed
window 每三次数据计算一次
zr:1
zr:2
zr:3
--------6------窗口结束
zr:2
zr:3
--------6------窗口结束
hsy:2
hsy:3
hsy:4
--------9------窗口结束
hsy:3
hsy:4
--------9------窗口结束
no-keyed
windowAll 每三次数据计算一次
zr:1
hsy:2
zr:2
--------5------窗口结束
hsy:2
zr:2
--------5------窗口结束
窗口函数
Count Window
keyed
Tumbling Window滚动窗口
我们需要指定一下窗口的大小{数据的数量}
滑动窗口的大小是固定的,且各自范围之间不会重叠
触发条件:数据的数量等于窗口的数量
我们需要指定一下窗口的大小{数据的数量}
滑动窗口的大小是固定的,且各自范围之间不会重叠
触发条件:数据的数量等于窗口的数量
SlidingWindow 滑动窗口
滑动窗口需要传入两个参数 size 和 slide 前者标识窗口的大小 后者标识 滑动的步长
如果 slide 小于 窗口大小 ,滑动窗口可以窗口叠加
如果 slide 大于窗口大小 可能会丢失数据
slide 等于 size = 滚动窗口
触发条件:只要满足滑动的条件即可,窗口的大小就是一个上限 数据量不满足上下也会被计算
滑动窗口需要传入两个参数 size 和 slide 前者标识窗口的大小 后者标识 滑动的步长
如果 slide 小于 窗口大小 ,滑动窗口可以窗口叠加
如果 slide 大于窗口大小 可能会丢失数据
slide 等于 size = 滚动窗口
触发条件:只要满足滑动的条件即可,窗口的大小就是一个上限 数据量不满足上下也会被计算
No-Keyed
Window All
Window All 也分为滚动 和 滑动 不需要keyed
代码实现
数量
如果使用count 进行统计如果达不到触发条件 是不会进行计算的 有时候不知道下次的计算什么时候开始 就无法触发
Time Window
类型分类
时间分类
事件时间
EventTime
EventTime
处理时间
ProcessTime
ProcessTime
假设时间窗口定于为五秒 是从当前整点的五秒为一个窗口 不是开启程序的时间 5 - 10 = 15
Keyde 基于 Processing Time
keyed
Tumbling
滚动窗口大小是固定的各自范围之间不重叠
触发条件:时间窗口得到五秒的宽度并且当前窗口包含数据
触发条件:时间窗口得到五秒的宽度并且当前窗口包含数据
Sliding
size slide 滑动窗口可以窗口重叠
触发条件:以滑动的时间为计算标准 只要当前窗口有数据 即使窗口的宽度不足窗口的宽度也会进行计算
触发条件:以滑动的时间为计算标准 只要当前窗口有数据 即使窗口的宽度不足窗口的宽度也会进行计算
Session
会话:服务器默认会保存客户端的session 时长为30分钟 如果在这三十分钟内再次访问服务器 则时间会被重置
窗口是从第一次发送数据开始,就会创建一个会话,保存的时常为Size 如果超过这个时长没发送过数据 则窗口就开始计算
没有固定的开始或者结束时间 开始:第一次接收到数据 结束:超过会话时长还没接收到数据
特点:窗口内的数据量不可控和会话内接受数据量相关会话窗口不会重叠
no-keyed
Keyde 基于Event Time
Global
增量聚合函数
Reduce Function AggregateFunction
全窗口聚合函数
WindowFunction Process WindowFunction
Flink WaterMark 水位线
无序数据:本窗口最大的水位线 要比上一个大
水位线需要周期性生成,不能一对一
有序数据:周期内最后一条
无序数据:周期内最大的一条
有序数据:周期内最后一条
无序数据:周期内最大的一条
无序数据的特殊处理
如果迟到的数据在窗口之外有可能就会被拉下
所以设置延迟时间,就会将整个系统的水位线拉低
如果迟到的数据在窗口之外有可能就会被拉下
所以设置延迟时间,就会将整个系统的水位线拉低
为了统一系统时间
引入事件时间
引入水位线
导致
有序数据最后
无序数据最大
有序数据最后
无序数据最大
无序数据导致迟到
延迟水位线
状态
状态后端
状态后端就是维护状态的
当这个算子的操作计算需要借助历史数据的情况下才需要保存状态
Local State Management 本地状态管理
保证状态的更新和访问
存储在JVM中或者Fs中
存储在RocksDB中
存储在JVM中或者Fs中
存储在RocksDB中
旧版本
MemoryStateBackend 基于内存
FsStateBackend 基于磁盘
RocksDBStateBackend 基于RocksDB 存储
新版本
HashMapStateBackend【默认】
基于内存和磁盘
EmbeddedRocksDBStateBackend
抉择
1.1.3以后统一了存储格式
Remote State Checkpointing 远程状态备份
将本地的State备份到远端的存储介质上
内存 (测试)
分布式文件系统 HDFS
内存 (测试)
分布式文件系统 HDFS
JobManagerCK 内存
FlieSystemCk 磁盘
总结:状态后端就是维护本地数据状态的读取访问和远端的备份
代码:
·
TTL Tiome To Live
为状态设置时长
当我们对数据进行更新或者读写 时长重置
当我们对数据进行更新或者读写 时长重置
代码:
setTtl:设置一个过期时长
上一次访问的时间戳+ttl超过了这个时间 表明状态过期
上一次访问的时间戳+ttl超过了这个时间 表明状态过期
SetUpdateType: 状态的更新策略
禁用
创建或写入
读取或写入
SetStateVlsibility:失效状态的可见性
可以返回未被清理的状态
永远不返回失效的状态
SetTtlTimeCharacteristic:时间语义
窗口联结
Inner
容错机制ESO
数据处理语义
多个Source 读取数据 节点 ,多个Operator 节点 多个Sink 节点
每个节点的并行度有可能有差异,且每个节点都有可能发生故障
每个节点的并行度有可能有差异,且每个节点都有可能发生故障
At-most-once
数据最多发送一次,有可能丢失数据
At-lest-once
数据最少发送一次 无应答继续发送,可能导致数据重复
Exactly-once
每一条消息只能被流处理系统处理一次
At-lest-once + 去重操作
At-lest-once + 去重操作
End-to-End Exactly-Once
端到端的只有一次
数据从其他组件读取到flink然后再写到其他组件
CheckPoint
是flink的容错机制,使得任务失败的时候可以进行重启而不丢失之前的一些信息
分布式快照
Chandy-Lamport
Chandy-Lamport
特定的时间
记录下来分布式系统的全局状态 【global state】
记录下来分布式系统的全局状态 【global state】
global state 要包含所有进程的状态以及所有channel的状态
Barrier
同时存储状态和管道数据太复杂了
所以flink想到一个方法只存储状态
如果只存储状态--让管道没有数据即可,需要拍摄快照时将后续的数据全部拦截不进行发送等最后一条记录依次通过各个算子,计算完成后拍摄算子的状态信息
所以flink想到一个方法只存储状态
如果只存储状态--让管道没有数据即可,需要拍摄快照时将后续的数据全部拦截不进行发送等最后一条记录依次通过各个算子,计算完成后拍摄算子的状态信息
发生故障时候去检查点找最后一次有效状态
上面的思想有个重大问题就是:检查点触发的时候会导致数据源无法向下游派发数据 ,导致集群的实时计算被停滞
数据在传递的过程中有可能只进入某个算子的task中进行计算
解决这些问题引入Barrier :会以检查点ID为数据发送一个barrier这个barrier 在传递的过程中都会被广播到下游
barrier不会进行任何的计算 就是为了分割数据
barrier之前是当前检查点数据
barrier之后是下一个检查点数据
当算子的task接受到barrier说明当前检查点的数据全部被执行完,就会保存当前的task状态
发送barrier之后 后续的数据继续传输 整个集群还是一个实时计算 flink只需要保存状态就可以
Barrier对齐:
等到上游的barrier都到齐,才能保存当前任务的状态
等到上游的所有并行子分区barrier都到齐,才去保存当前任务状态
等到上游的barrier都到齐,才能保存当前任务的状态
等到上游的所有并行子分区barrier都到齐,才去保存当前任务状态
如果barrier1 先到达下游 barrier2还在管道中
此时无法存储状态,因为barrier2到来之前说明当前检查点的数据还有一部分缺失
barrier1:不能计算barrier之后的数据【这些数据属于下一个检查点】还要将自己管道的数据库先存储到当前算子缓存中
barrier2:继续发送数据知道barrrier2到达
当1和2都到之后才能保存状态
此时无法存储状态,因为barrier2到来之前说明当前检查点的数据还有一部分缺失
barrier1:不能计算barrier之后的数据【这些数据属于下一个检查点】还要将自己管道的数据库先存储到当前算子缓存中
barrier2:继续发送数据知道barrrier2到达
当1和2都到之后才能保存状态
检查点调度器
Barrier缺点:先到达分区的要做缓存,会造成数据堆积(背压)(对不齐)
第一次收到barrier就开始存档
后续的数据都进行标记
保存状态和标记数据
定时执行 自动存档
SavePoint
手动存档
触发方式
容错策略
重启策略
恢复策略
端到端 STS
source
端到端的source 前提
能够重复消费
kafkaSource 能够记录偏移量 能够重放数据 将偏移量记录在state中
精确一次&有效一次
没有引擎可以保证正好处理一次 精确一次
处理一次+恢复一次=2次
同一个数据及时处理多次得到的效果还是相同的
At-least-once +去重
At-least-once +幂等
分布式快照 CK
Transformation
<Poerator>
<Poerator>
算子 state + checkPoint
Sink
端到端的Sink前提:防止数据重复写入 要支持回滚
文本型:幂等方式
数据库型: 幂等 事务 2pc
集群搭建
Local
测试自己开发的代码
standalone
资源管理+任务调度 Flink本身自带的
环境搭建
JobManager
TaskManage
Yarn
资源管理+任务调度
CEP
反压
Sql
编程流程
Environment
Source
Transformation
Sink
运行环境
table总是与特定的TableEnvironment绑定
TableEnvironment是TableAPI 和SQL的核心概念
table总是与特定的TableEnvironment绑定
不论输入数据是流还是批 TableAPI 和 sql 查询都会被转换成DataStream 程序
创建表和视图
Catalog:数据库:表名
Catalog:数据库:表名
表:保存数据 和真实数据对应
临时表
依赖于会话,会话结束数据消失
依赖于会话,会话结束数据消失
屏蔽:如果临时表和永久表的标识符相同那么临时表会屏蔽永久表知道临时表失效
永久表
依赖于元数据 需要手动删除
依赖于元数据 需要手动删除
视图:显示数据内存中进行操作
临时视图
依赖于会话,会话结束视图消失
依赖于会话,会话结束视图消失
数据类型
基础类型常用类型
Pojo Get Set
Tuple
Row 类型
数据库日志相关
给我数据库的日志就可以进行解析
Mysql Oracle
查询
TableAPI
是一个类
SQL
字符串
输出表
insertinto
表流转换
执行计划
CBO
基于成本
RBO
基于规则
常量折叠
谓词下推
投影下推
Hash Join
Transformation Tree
0 条评论
下一页