Fink
2025-08-04 08:45:08 1 举报
AI智能生成
1
作者其他创作
大纲/内容
有界数据无界数据
前置技能
Lambda<br>
SQL<br>
常见函数
DQL<br>
DML<br>
Scala
编程模型
FinkSQL & Flink Table<br>代码量少 以实现常见功能为主<br>以SQL语句为主<br>
Stream & Batch Data Processing<br>代码量增多<br>以java 和Scala 代码为主<br>
Process Function<br>代码量非常多,功能都是自己实现<br>以java 和 scala代码为主<br>
编程模型
运行环境获取集群运行环境
Source<br>
获取数据源 file MQ HDFS<br>
Transformation<br>
数据的转换
Sink<br>
数据输出<br>
<br>
代码实现
Source<br>
读取数据
<br>
Transformation<br>
FlatMap<br>
Map
Group
Sum
Sink
print
TypeInfomation<br>
基础框架
任务并行度
执行环境、
批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
流处理
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
本地web环境<br>
StreamExecutionEnvironment environment =<br>StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new<br>Configuration());
通用运行架构
<br>
<br>
<br>
<br>
流程
1.提交flink jar (source1 -> map2 -> keyBy2 -> sink2)<br>
2.提交作业到yarn<br>
3.yarn RM 初始化 AM<br>
3.Clink 将作业构建为 StreamGraph 提交到 AM<br>
4.在AM中 会初始化flinkMaster 三个角色 Dispatch jobmanager flink RM<br>
Disptch:初始化对象 webUI<br>
jobManager<br>
协调分配任务的
Flink RM :计算需要申请多少资源 <br>向YARN RM 申请资源<br>
5.JobMaster 将作业继续解析为<br>
StreamGraph<br>
<br>
中间进入缓冲区 序列化和反序列化 并行度为2 <br>总共做了两次序列化两次反序列化 两次cpu调度<br>
因为并行度相同
内存需要切换 需要序列化和返序列化
算子链技术
jobGraph 优化 算子链优化 不产生序列化 cpu调度<br>
ExecutionGraph<br>
物理执行图
solt<br>
并行度支持<br>Flink 作业的并行度(Parallelism)通过 Slot 实现。每个并行任务(Subtask)会被分配到一个 Slot 中执行。例如,若作业并行度为 3,则至少需要 3 个 Slot 来运行这些并行任务。<br>任务链优化<br>属于同一作业且上下游相连的算子(Operator)会被优化为一个任务链(Task Chain),这些任务可以共享同一个 Slot,减少线程间通信开销,提高执行效率。
Source<br>
Transformation<br>
Common<br>
map:转换<br>
source.map(word -> "flink_"+word).print().setParallelism(1);
flatMap:压缩<br>
filter:过滤<br>
sum min max minBy maxBy :滚动聚合<br>
reduce:自定义的滚动聚合<br>
lterate
Sink
Common<br>
File<br>
Console
System.currentTimeMillis()
source.writeAsText
.writeAsCsv
Socket
Connectors
Kafka
nc -lp 19523<br>
environment.socketTextStream("localhost", 19523);
<br>
JDBC<br>
Custom
自定义
Partition<br>
GlobalPartitioner
时间语义
事件时间<br>EventTime<br>
事件时间程序员需要将其嵌入到数据的内部<br>将来分析数据的时候,只需要将数据的时间戳解析出来就可以获取到事件事件,而且事件时间是唯一的
默认:1.12以后<br>
摄入时间
处理时间<br>ProcessTime<br>
处理时间是指正在执行相应操作的机器的系统时间<br>每个机器的系统时间有可能略有差异<br>数据在上下游处理的过程中,上下游的处理时间也有可能略有差异
默认:1.12以前<br>
窗口函数
定义
查询网站最近100笔交易的平均成交价<br>查询网站每分钟被点击了多少次<br>将flink的无界流数据按照一定的规则{次数-时间}切分成多个小窗口 然后计算窗口的结果 将Stream->Batch的过程<br>
分类
基于时间驱动<br>基于数量驱动
Keyed
根据数据进入窗口前是否需要进行key分组<br>如果分组 则每一个分组内的数据要单独进行计算
zr:1<br>hsy:2<br>zr:2<br>hsy:1<br>zr:3<br>hsy4<br>zr:5<br>hsy:8<br>
keyed<br>
window 每三次数据计算一次<br>
zr:1<br>zr:2<br>zr:3<br>--------6------窗口结束<br>
hsy:2<br>hsy:3<br>hsy:4<br>--------9------窗口结束
no-keyed<br>
windowAll 每三次数据计算一次<br>
zr:1<br>hsy:2<br>zr:2<br>--------5------窗口结束<br>
窗口函数
Count Window<br>
keyed<br>
<font color="#e74f4c">Tumbling Window</font>滚动窗口<br>我们需要指定一下窗口的大小{数据的数量}<br>滑动窗口的大小是固定的,且各自范围之间不会重叠<br><font color="#e74f4c">触发条件:数据的数量等于窗口的数量</font><br>
<font color="#e74f4c">SlidingWindow</font> 滑动窗口<br>滑动窗口需要传入两个参数 size 和 slide 前者标识窗口的大小 后者标识 滑动的步长<br>如果 slide 小于 窗口大小 ,滑动窗口可以窗口叠加<br>如果 slide 大于窗口大小 可能会丢失数据<br>slide 等于 size = 滚动窗口<br><font color="#e74f4c">触发条件:只要满足滑动的条件即可,窗口的大小就是一个上限 数据量不满足上下也会被计算</font><br>
No-Keyed<br>
Window All<br>
Window All 也分为滚动 和 滑动 不需要keyed<br>
代码实现
<br>
数量
如果使用count 进行统计如果达不到触发条件 是不会进行计算的 有时候不知道下次的计算什么时候开始 就无法触发<br>
Time Window<br>
类型分类
时间分类
事件时间<br>EventTime<br>
处理时间<br>ProcessTime<br>
假设时间窗口定于为五秒 是从当前整点的五秒为一个窗口 不是开启程序的时间 5 - 10 = 15
Keyde 基于 Processing Time <br>
keyed
Tumbling<br>
滚动窗口大小是固定的各自范围之间不重叠<br><font color="#e74f4c">触发条件:时间窗口得到五秒的宽度并且当前窗口包含数据</font><br>
Sliding
size slide 滑动窗口可以窗口重叠<br>触发条件:以滑动的时间为计算标准 只要当前窗口有数据 即使窗口的宽度不足窗口的宽度也会进行计算<br>
Session
<br>
会话:服务器默认会保存客户端的session 时长为30分钟 如果在这三十分钟内再次访问服务器 则时间会被重置<br>
窗口是从第一次发送数据开始,就会创建一个会话,保存的时常为Size 如果超过这个时长没发送过数据 则窗口就开始计算<br>
没有固定的开始或者结束时间 开始:第一次接收到数据 结束:超过会话时长还没接收到数据<br>
特点:窗口内的数据量不可控和会话内接受数据量相关会话窗口不会重叠<br>
no-keyed
Keyde 基于Event Time<br>
Global <br>
增量聚合函数
Reduce Function AggregateFunction<br>
全窗口聚合函数
WindowFunction Process WindowFunction<br>
Flink WaterMark 水位线<br>
无序数据:本窗口最大的水位线 要比上一个大<br>
水位线需要周期性生成,不能一对一<br>有序数据:周期内最后一条<br>无序数据:周期内最大的一条<br>
无序数据的特殊处理<br>如果迟到的数据在窗口之外有可能就会被拉下<br>所以设置延迟时间,就会将整个系统的水位线拉低
<br>
为了统一系统时间
引入事件时间
引入水位线
导致<br>有序数据最后<br>无序数据最大
无序数据导致迟到
延迟水位线
状态
状态后端
状态后端就是维护状态的
<br>
当这个算子的操作计算需<font color="#e74f4c">要借助历史数据的情况下</font>才需要保存状态
Local State Management 本地状态管理<br>
保证状态的更新和访问<br>存储在JVM中或者Fs中<br>存储在RocksDB中<br>
旧版本
MemoryStateBackend 基于内存<br>
FsStateBackend 基于磁盘<br>
RocksDBStateBackend 基于RocksDB 存储<br>
新版本
HashMapStateBackend【默认】<br>
基于内存和磁盘
<br>
EmbeddedRocksDBStateBackend<br>
<br>
抉择
1.1.3以后统一了存储格式<br>
Remote State Checkpointing 远程状态备份<br>
将本地的State备份到远端的存储介质上<br>内存 (测试)<br>分布式文件系统 HDFS<br>
<br>
JobManagerCK 内存<br>
FlieSystemCk 磁盘<br>
总结:状态后端就是维护本地数据状态的读取访问和远端的备份<br>
代码:<br>
<br>
·
<br>
TTL Tiome To Live<br>
为状态设置时长<br>当我们对数据进行更新或者读写 时长重置
代码: <br>
setTtl:设置一个过期时长 <br>上一次访问的时间戳+ttl超过了这个时间 表明状态过期<br>
SetUpdateType: 状态的更新策略<br>
<br>
禁用
创建或写入
读取或写入
SetStateVlsibility:失效状态的可见性<br>
<br>
可以返回未被清理的状态
永远不返回失效的状态
SetTtlTimeCharacteristic:时间语义<br>
<br>
<br>
窗口联结
Inner<br>
<br>
容错机制ESO<br>
数据处理语义
多个Source 读取数据 节点 ,多个Operator 节点 多个Sink 节点<br>每个节点的并行度有可能有差异,且每个节点都有可能发生故障
At-most-once<br>
数据最多发送一次,有可能丢失数据
At-lest-once<br>
数据最少发送一次 无应答继续发送,可能导致数据重复
Exactly-once<br>
每一条消息只能被流处理系统处理一次<br>At-lest-once + 去重操作<br>
End-to-End Exactly-Once<br>
端到端的只有一次
数据从其他组件读取到flink然后再写到其他组件<br>
CheckPoint<br>
是flink的容错机制,使得任务失败的时候可以进行重启而不丢失之前的一些信息<br>
分布式快照<br>Chandy-Lamport<br>
特定的时间<br>记录下来分布式系统的全局状态 【global state】<br>
global state 要包含<font color="#ec7270">所有进程的状态以及所有</font><font color="#70d5d7">channel的状态</font><br>
Barrier<br>
同时存储状态和管道数据太复杂了<br>所以flink想到一个方法只存储状态<br>如果只存储状态--让管道没有数据即可,需要拍摄快照时将后续的<font color="#70d5d7">数据全部拦截</font>不进行发送等最后一条记录依次通过各个算子,计算完成后拍摄算子的状态信息
<br>
发生故障时候去检查点找最后一次有效状态
上面的思想有个重大问题就是:检查点触发的时候会导致数据源无法向下游派发数据 ,导致集群的实时计算被停滞<br>
数据在传递的过程中有可能只进入某个算子的task中进行计算<br>
解决这些问题引入Barrier :会以检查点ID为数据发送一个barrier这个barrier 在传递的过程中都会被广播到下游<br>
barrier不会进行任何的计算 就是为了分割数据<br>
barrier之前是当前检查点数据<br>
barrier之后是下一个检查点数据<br>
当算子的task接受到barrier说明当前检查点的数据全部被执行完,就会保存当前的task状态<br>
发送barrier之后 后续的数据继续传输 整个集群还是一个实时计算 flink只需要保存状态就可以<br>
<br>
Barrier对齐:<br>等到上游的barrier都到齐,才能保存当前任务的状态<br>等到上游的所有并行子分区barrier都到齐,才去保存当前任务状态<br>
如果barrier1 先到达下游 barrier2还在管道中<br>此时无法存储状态,因为barrier2到来之前说明当前检查点的数据还有一部分缺失<br>barrier1:<font color="#ec7270">不能计算barrier之后的数据【这些数据属于下一个检查点</font>】还要将自己管道的数据库<font color="#e74f4c">先存储到当前算子缓存中</font><br>barrier2:继续发送数据知道barrrier2到达<br>当1和2都到之后才能保存状态 <br>
<br>
<br>
<br>
检查点调度器
Barrier缺点:先到达分区的要做缓存,会造成数据堆积(背压)(对不齐)<br>
第一次收到barrier就开始存档<br>
后续的数据都进行标记
保存状态和标记数据
<br>
定时执行 自动存档
SavePoint<br>
手动存档
<br>
触发方式
<br>
容错策略<br>
重启策略
恢复策略
端到端 STS<br>
source<br>
<br>
端到端的source 前提<br>
能够重复消费
kafkaSource 能够记录偏移量 能够重放数据 将偏移量记录在state中<br>
精确一次&有效一次<br>
没有引擎可以保证正好处理一次 精确一次
处理一次+恢复一次=2次<br>
同一个数据及时处理多次得到的效果还是相同的
At-least-once +去重<br>
At-least-once +幂等<br>
分布式快照 CK<br>
Transformation<br><Poerator><br>
算子 state + checkPoint<br>
Sink
端到端的Sink前提:防止数据重复写入 要支持回滚 <br>
文本型:幂等方式<br>
数据库型: 幂等 事务 2pc<br>
<br>
集群搭建
Local<br>
测试自己开发的代码
standalone
资源管理+任务调度 Flink本身自带的<br>
环境搭建
JobManager<br>
TaskManage<br>
Yarn
资源管理+任务调度<br>
CEP<br>
反压
Sql
编程流程
Environment<br>
Source<br>
Transformation<br>
Sink
运行环境
TableEnvironment是TableAPI 和SQL的核心概念
<br>table总是与特定的TableEnvironment绑定<br>
不论输入数据是流还是批 TableAPI 和 sql 查询都会被转换成DataStream 程序<br>
创建表和视图<br>Catalog:数据库:表名<br>
表:保存数据 和真实数据对应<br>
临时表<br>依赖于会话,会话结束数据消失
屏蔽:如果临时表和永久表的标识符相同那么临时表会屏蔽永久表知道临时表失效<br>
永久表<br>依赖于元数据 需要手动删除
视图:显示数据内存中进行操作<br>
临时视图<br>依赖于会话,会话结束视图消失
<br>
数据类型
基础类型常用类型
Pojo Get Set <br>
Tuple <br>
Row 类型<br>
数据库日志相关
给我数据库的日志就可以进行解析
Mysql Oracle<br>
查询
TableAPI<br>
是一个类
SQL<br>
字符串
输出表
insertinto<br>
表流转换
执行计划
CBO<br>
基于成本
RBO<br>
基于规则
常量折叠
谓词下推
投影下推
Hash Join<br>
Transformation Tree<br>
0 条评论
下一页