Kafka 原理详解(详解在注释里面,图片加载需要时间)
2023-07-04 08:40:44 0 举报
AI智能生成
知识要点和原理解析,详细信息和图片均在黄色图标的注释中,鼠标移动到黄色图标上即会显示,图片加载有时较慢。
作者其他创作
大纲/内容
消息队列实现方式<br>
点对点模式:一对一<br>
发布/订阅模式:一对多<br>
push(推)模式<br>
pull(拉)模式(kafka采用该方式)
架构图<br>
组件构成<br>
broker<br>
一台 kafka 服务器就是一个 broker<br>
一个kafka集群由多个 broker 组成<br>
一个 broker 可以容纳多个 topic的多个partition<br>
Topic(主题)<br>
kafka将消息以topic为单位进行归类<br>
在kafka集群中,可以有无数的主题<br>
生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别<br>
一个topic 可以划分为多个partition,分布到多个 broker上管理<br>
Partition(分区)<br>
特征
topic是逻辑的概念,partition是物理的概念<br>
每个partition由一个kafka broker服务器管理(即一个broker包含一个或多个partition)<br>
partition 中的每条消息都会被分配一个递增的id(offset),每个 partition 是一个有序的队列,<b><font color="#f44336">kafka 只保证按一个 partition 中的消息的顺序</font></b>,不保证一个 topic 的整体(多个 partition 间)的顺序<br>
每个partition都可以有多个副本<br>
partition的表现形式就是一个一个的文件夹
每一个分区会有一个编号,编号从0开始<br>
Partition的副本数<br>
目的
保障 partition 的高可用<br>
特征
leader replica分布<br>
<b>轮询算法</b><br>
默认副本的<b><font color="#f44336">最大数量是10个</font></b>,且<b>副本的数量不能大于Broker的数量</b>,否则报错,一般情况下等于broker的个数
<b><font color="#f44336">follower和leader绝对是在不同的机器</font></b>,<b>同一机器对同一个分区也只可能存放一个副本</b>(包括自己)<br>
处于同步状态的副本叫做in-sync-replicas(ISR)<br>
<b><font color="#f44336">follower</font></b>通过<b>拉的方式</b>从<b><font color="#f44336">leader</font></b>同步数据<br>
消费者和生产者都是从<b>leader</b>读写数据,不与follower交互
好处<br>
对于 kafka 集群<br>
实现topic数据的负载均衡<br>
对于消费者<br>
提高并发度,提高效率
Segment<br>
特征
一个partition当中由多个segment文件组成<br>
每个segment文件,包含两部分,一个是.log文件,另外一个是.index文件。<br>
.log文件包含了发送的数据存储,.index文件记录的是.log文件的数据索引值,以便于加快数据的查询速度;
索引文件与数据文件的关系<br>
索引文件中元数据指向对应数据文件中message的物理偏移地址
消息Message<br>
字段的含义<br>
Producer(生产者)<br>
Consumer<br>
Consumer Group<br>
特征<br>
同一个消费者组里的所有消费者不能同时消费消息,只能有一个消费者去消费<br>
同一个消费者组里面是不会重复消费消息的<br>
同一个消费者组的一个消费者不是以一条一条数据为单元的,是以分区(partition)为单元,就相当于消费者和分区建立某种socket,进行传输数据,所以,一旦建立这个关系,这个分区的内容只能是由这个消费者消费
模式转换<br>
队列模型<br>
发布-订阅模型<br>
消息分流<br>
注意<br>
消费者数目及partition数目对应关系<br>
partition数目 > 消费者数目<br>
partition数目 = 消费者数目<br>
partition数目 < 消费者数目<br>
两个或多个消费者组<br>
Group Coordinator<br>
cluster、broker、topic、partition、消费者、费者组 关系<br>
线程安全<br>
Metadata<br>
MetadataCache<br>
topic 的详细信息<br>
概括
元数据的作用<br>
客户端<br>
可以通过元数据获取服务地址,进行通信。(类似于服务发现)<br>
服务端<br>
可以通过元数据共享集群状态,一旦出现状态变化能够快速感知到,并且让各个 broker 快速更新元数据去保持一致。
Producer Metadata 的更新策略<br>
1、 周期性的更新<br>
2、失效检测,强制更新<br>
如何触发<br>
在 NetworkClient 的 <b><font color="#f44336">poll() </font></b>方法调用时,就会去检查这两种更新机制,<b><font color="#f44336">只要达到其中一种,就行触发更新操作</font></b>
Metadata更新时特点<br>
异步发送<br>
负载选择<br>
生产者<br>
生产者缓存架构<br>
主线程的逻辑<br>
Sender 线程的逻辑<br>
生产者拦截器 ProducerInterceptor<br>
分类
生产者拦截器<br>
自定义拦截器<br>
序列化(Serializer)<br>
分区器(Partitioner)
分区策略
分区作用
解决水平扩展的问题<br>
解决消息顺序读取的问题<br>
解决负载均衡的问题<br>
键(key)的作用<br>
ProducerRecord对象<br>
key的作用<br>
作为消息的附加信息<br>
用来决定消息被写到主题的哪个分区中。拥有相同键的消息会被写到同一个分区中。
情况分类<br>
情况一:键为空,不指定分区器<br>
情况二:键为空,指定了分区器<br>
partition方法<br>
情况三:键不为空,指定了分区器<br>
情况四:键不为空,没有指定分区器<br>
分区策略<br>
Partitioner接口<br>
策略分类<br>
BuiltInPartitioner(内置的默认分区器):<b>随机</b><br>
nextPartition方法<br>
DefaultPartitioner(默认分区器)- <font color="#f44336"><b>已废弃</b></font><br>
DefaultPartitioner核心逻辑<br>
UniformStickyPartitioner(统一粘性分区器)- <font color="#f44336"><b>已废弃</b></font><br>
如何选择新的粘性分区<br>
与DefaultPartitioner不同点<br>
RoundRobinPartitioner(轮询分区器)<br>
逻主要辑源码<br>
不是真的轮询
自定义分区策略
实现Partitioner接口、partition方法<br>
配置自定义分区策略<br>
弃用默认分区器DefaultPartitioner
DefaultPartitioner策略<br>
弃用的原因<br>
分配倾斜<br>
配倾斜出现的原因<br>
linger.ms <br>
举例<br>
对粘性分区策略问题的优化方案<br>
<b><font color="#f44336">partitioner.class</font></b>将具有<b><font color="#f44336">默认值nul</font></b><font color="#f44336"><b>l</b></font>
改进的主要变化<br>
2.8.0版本的partition()方法逻辑
3.3.0版本partition()方法逻辑
消息累加器(<b><font color="#000000">优化点</font></b>)<br>
构成<br>
结构图
消息缓存模型<br>
ProducerBatch的内存大小<br>
内存分配<br>
ProducerBatch的创建和释放<br>
1、内存16K,<b>缓存池</b>中<b>有可用内存</b><br>
2、内存16K,<b>缓存池</b>中<b>无可用内存</b><br>
3、内存非16K,<b>非缓存池</b>中<b>内存够用</b>
4、内存非16K <b>非缓存池</b>内存<b>不够用</b><br>
消息累加器作用<br>
减少网络传输的资源消耗
减少磁盘I/O资源消耗
消息累加器的结构<br>
Sender线程<br>
KafkaProducer.send()逻辑<br>
生产者消息产生及发送流程<br>
比较重要的生产者参数<br>
acks<br>
acks=1<br>
acks=0<br>
acks=-1/all<br>
max.request.size<br>
retries和retry.backoff.ms<br>
max.in.flight.requests.per.connection<br>
compression.type<br>
connection.max.idle.ms<br>
linger.ms(<b><font color="#000000">优化点</font></b>)<br>
receive.buffer.bytes&send.buffer.bytes<br>
request.timeout.ms<br>
问题和答案<br>
发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗<br>
当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢?
那么创建ProducerBatch的时候,应该分配多少的内存呢?<br>
消费者、消费者组<br>
消费者组是什么<br>
三个特性<br>
Consumer Group 下可以有<b>一个或多个Consumer 实例</b><br>
<b>Group ID </b>是一个字符串, 在Kafka集群中<b>唯一标识Consumer Group</b><br>
Consumer Group 下所有实例订阅主体的单个分区,只能分配给组内某个Consumer实例消费。同一个分区消息可能被多个Group 消费。
Kafka消费者组解决了哪些问题?(与传统消息系统比较)<br>
消息队列模型伸缩性差
发布/订阅模型下伸缩性差<br>
Consumer Group 之间彼此队里,互不影响<br>
用Consumer Group机制,实现了传统两大消息引擎<br>
分区策略(<font color="#000000"><b>重点</b></font>)
设置partition值需要考虑的因素<br>
推荐partition的数量一定要大于等于同时运行的consumer的数量<br>
建议partition的数量大于等于集群broker的数量<br>
分配策略<br>
RangeAssignor(范围)<b>(默认分配策略</b>)<br>
分配<br>
1、<font color="#f44336"><b>以topic为单位</b></font>
2、先对topic下的partition进行排序
3、再对topic下的consumer进行排序
4、将partition依次分配给consumer
每个topic都会重复上面4步的分配流程
配置参数<br>
如何进行计算分区<br>
解析<br>
举例<br>
缺点
分区数和消费者数无法整除时会造成倾斜
RoundRobin(轮询)<br>
两种情况<br>
如果<font color="#f44336"><b>所有consumer实例的订阅是相同的</b></font>,那么partition会<b><font color="#f44336">均匀分布</font></b>
如果<font color="#f44336"><b>同一消费者组内</b>,<b>所订阅的消息是不相同的</b></font>,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致<font color="#f44336"><b>分区分配的不均匀</b></font><br>
配置参数<br>
工作原理:<b><font color="#000000">TopicAndPartition组合给consumer均分</font></b><br>
举例<br>
由于分配时是按所有<font color="#a23735">Partition</font>来的,所以即使Topic之间Partition的数量是不平均的,分配结果也是基本平均的,克服了<font color="#a23735">RangeAssignor</font>的缺点<br>
缺点<br>
示例<br>
consumer订阅信息不一致时造成分配不平衡
总结<br>
使用RoundRobin策略有两个前提条件<br>
<b><font color="#f44336">同一个Consumer Group</font></b>里面的所有消费者的<b><font color="#f44336">num.streams</font></b>( <font color="#ed9745">这个参数就是告诉 MirrorMaker 要创建多少个 KafkaConsumer 实例</font>)必须相等
每个消费者订阅的主题必须相同
StickyAssignor(粘滞策略)<br>
目标<br>
1、分区的分配尽可能的均匀<br>
2、分区的分配尽可能和上次分配保持相同<br>
配置参数
示例一(消费者的订阅信息都是相同)<br>
C1下线<br>
采用RoundRobinAssignor策略<br>
采用StickyAssignor策略<br>
示例二(订阅信息不同的情况)<br>
初始状态
采用RoundRobinAssignor策略
采用StickyAssignor策略<br>
C0下线<br>
采用RoundRobinAssignor策略(重新分配)<br>
采用StickyAssignor策略
自定义分配策略<br>
Coordinator-协调者<br>
请求类型<br>
组协调器<br>
GroupCoordinator 的启动<br>
Coordinator的确定与分区分配<br>
1、确定consumer group位移信息写入<b><font color="#f44336">__consumers_offsets</font></b>这个topic的那个分区<br>
2、<font color="#f44336"><b>该分区leader所在的broker</b></font>就是被选定的coordinator<br>
分区步骤<br>
1、<b>第1步就是找到这个coordinator</b>,对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。<br>
2、找到coordinator之后,发送JoinGroup请求。<br>
消费者加入组流程 <font color="#f44336"><b>JoinGroup</b></font>
3、JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition<br>
partition的分配策略和分配结果其实是由client决定的<br>
组协调器同步流程 SyncGroup<br>
heartbeat的实现原理
那这个定期发送如何实现呢?<br>
是通过DelayedQueue来实现的<br>
重平衡Rebalance<br>
触发与通知<br>
Rebalance 的触发条件<br>
1、当 Consumer Group<b><font color="#f44336"> 组成员数量</font></b>发生变化(主动加入或者主动离组,故障下线等);<br>
2、当订阅<font color="#f44336"><b>主题数量</b></font>发生变化;<br>
3、当订阅主题的<font color="#f44336"><b>分区数</b></font>发生变化;
Rebalance 如何通知其他 consumer 进程?<br>
靠 Consumer 端的心跳线程<br>
协议 (protocol) 说明<br>
Heartbeat请求<br>
LeaveGroup请求<br>
SyncGroup请求<br>
JoinGroup请求<br>
DescribeGroup请求<br>
consumer group状态机
核心是 rebalance 操作<br>
重平衡发生在 <b><font color="#f44336">PreparingRebalance</font></b> 和 <font color="#f44336"><b>AwaitingSync </b></font>状态机中
重平衡所涉及的参数<br>
session.timeout.ms<br>
heartbeat.interval.ms<br>
max.poll.interval.ms<br>
Rebalance Generation<br>
重平衡场景举例<br>
有新的成员加入消费组<br>
消费组成员崩溃<br>
消费组成员主动离开<br>
消费组成员提交位移时<br>
优缺点<br>
优点<br>
给消费者组带来了高可用性和伸缩性。<br>
缺点<br>
再均衡期间消费者无法读取消息,整个群组有一小段时间不可用<br>
partition被重新分配给一个消费者时,消费者当前的读取状态会丢失,有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。因此需要进行安全的再均衡和避免不必要的再均衡
kafka 静态消费组成员(<b><font color="#000000">优化点</font></b>)<br>
为什么需要<br>
基本原理<br>
静态消费者情况下重平衡逻辑及注意事项<br>
参数说明<br>
group.instance.id<br>
session.timeout.ms<br>
问题
为什么在一个group内部,1个parition不能被多个consumer拥有?
时序性<br>
offset<br>
如果有多个客户端配置了不同的分配策略, 那么会以哪个配置生效呢?<br>
1、选择所有 Member 都支持的分配策略;<br>
2、在 1 的基础上,优先选择每个partition.assignment.strategy配置靠前的策略。
消费者消费并提交了之后,其他消费者是如何知道我已经消费了,从而不会重新消费的呢?
为什么要在consumer中选一个leader出来,进行分配,而不是由coordinator直接分配呢?<br>
offset管理机制
位移保存
老版本(Kafka0.9版本之前)<br>
保存在 ZooKeeper 中<br>
好处<br>
减少了 Kafka Broker 端的状态保存开销<br>
服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性<br>
缺点<br>
ZooKeeper不适合进行频繁的写更新<br>
新版本(从0.9版本开始)<br>
位移保存在 Kafka内部主题的方法,也就是__consumer_offsets
broker无状态<br>
那访问压力去哪了呢?<br>
位移主题(Offsets Topic)<br>
特征
<b><font color="#f44336">__consumer_offsets</font></b> 的主要作用是保存 Kafka 消费者的位移信息<br>
它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作<br>
消息格式却是 Kafka 自己定义的,用户不能修改<br>
Kafka Consumer 有 API 帮你提交位移<br>
分区数<br>
消息格式<br>
key<br>
Group ID,主题名,分区号
value<br>
主要保存的是offset 的信息,当然还有时间戳等信息<br>
offset 的分类<br>
LogStartOffset<br>
ConsumerOffset<br>
HighWatermark<br>
特征
在<b><font color="#000000">分区高水位以下的消息被认为是已提交消息</font></b>,反之就是未提交消息。
<b style=""><font color="#ed9745">位移值等于高水位的消息也属于未提交消息</font></b>。也就是说,高水位上的消息是不能被消费者消费的。
主要作用<br>
<b><font color="#f44336">定义消息可见性</font></b>,即用来标识分区下的哪些消息是可以被消费者消费的。<br>
帮助 Kafka 完成副本同步<br>
LogEndOffset
offset内部原理<br>
原理图<br>
位移提交<br>
<b>自动提交</b><br>
相关参数<br>
enable.auto.commit
auto.commit.interval.ms<br>
提交的时机<br>
存在的问题
数据丢失<br>
重复消费<br>
<b>手动提交</b><br>
参数配置<br>
设置 enable.auto.commit 为 false<br>
提交方式
同步提交<br>
存在的问题<br>
同步操作,即该方法会一直等待,直到位移被成功提交才会返回<br>
异步提交<br>
存在的问题
不会自动重试<br>
当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失<br>
解决
解决方案<br>
分区级别提交造成的数据重复<br>
解决方法<br>
异步和同步提交相结合<br>
精细化提交(分批提交)
再平衡监听器提交<br>
Rebalance 可能导致数据重复消费<br>
对于Kafka而言,从poll方法返回消息的那一刻开始这条消息已经算是“消费”完成了
实现ConsumerRebalanceListener 接口
1、初始化消费者<br>
2、实现监听接口
3、运行程序<br>
无论是同步提交还是异步提交都会造成消息丢失或重复<br>
位移提交的异常处理-CommitFailedException(<b><font color="#5c4038">优化点</font></b>)<br>
原因:<b><font color="#f44336">消息的处理时间超长</font></b><br>
避免因处理超时导致的ReBanlance情况采取的措施<br>
缩短单条消息处理的时间<br>
增加 Consumer 端允许下游系统消费一批消息的最大时长<br>
减少下游系统一次性消费的消息总数<br>
下游系统使用多线程来加速消费<br>
<b><font color="#f44336">自动提交</font></b>会不会发生这样的情况
消费者位移重设<br>
位移维度<br>
Earliest<br>
Latest<br>
Current<br>
Specified-Offset<br>
hift-By-N<br>
时间维度<br>
DateTime<br>
Duration<br>
位移重设的方式<br>
代码实现<br>
Offset Commit提交过程<br>
消费端<br>
broker端<br>
Offset Fetch获取过程<br>
消费端<br>
broker端<br>
Compact 策略<br>
索引及日志文件
文件存储机制
<b><font color="#f44336">每个分区</font>就对应<font color="#f44336">一个Log对象</font>,在物理磁盘上则对应于一个子目录</b><br>
<b>日志段是Kafka保存消息的最小载体</b><br>
<b>kafka利用<font color="#f44336">分段+索引</font>的方式来解决查找效率问题</b>
参数<b><font color="#5c4038">log.segment.bytes</font></b><br>
限定了每个日志段文件的大小,最大为1GB。
超过该限制会进行日志切分滚动<b><font color="#f44336">log rolling</font></b><br>
<b>active log segment</b>:正在被写入的日志
<b>ConcurrentSkipListMap:跳表-索引段</b><br>
Message结构<br>
offset<br>
消息大小<br>
消息体<br>
索引机制<br>
分类
偏移量索引文件<br>
格式<br>
index文件<br>
如何通过offset找到对应的消息?<br>
时间戳索引文件<br>
查找<br>
查找原理<br>
格式<br>
索引项:稀疏索引<br>
<b>索引项间隔:log.index.interval.bytes</b>
<b>索引文件以<font color="#f44336">稀疏索引</font>的方式来构建</b>
<b>稀疏索引是通过<font color="#4669ea">MappedByteBuffer</font>将索引文件映射到内存中(pageCache),加快索引的查询速度</b>
索引文件排序<br>
<b><font color="#f44336">按照位移/时间戳升序排序</font></b><br>
用二分法查找索引,时间复杂度是O(logN)
检索原理<br>
索引段:ConcurrentSkipListMap
ConcurrentSkipListMap的方法<br>
索引项:稀疏矩阵<br>
<b>MMAP(MappedByteBuffer)</b><br>
有限内存加载所有segment?
检索整体流程图
日志定位过程<br>
例子<br>
日志切分
每个LogSegment中的日志数据文件大小均相等<br>
消息⽇志清理
两种⽇志清理策略
⽇志删除
⽇志删除策略<br>
基于时间<br>
删除时机<br>
删除过程<br>
基于⽇志⼤⼩<br>
删除时机<br>
删除过程<br>
基于偏移量<br>
删除时机<br>
删除过程<br>
⽇志压缩
⽇志压缩策略<br>
参数配置<br>
图示<br>
⽇志压缩过程<br>
⽇志压缩原理<br>
改进后的二分法<br>
log文件存储格式<br>
offset index<br>
传统二分法源码<br>
问题(pageCache)<br>
改进后的二分法<br>
源码<br>
效果<br>
为什么设置热区大小为8192字节<br>
副本同步机制<br>
高可靠性<br>
概念<br>
AR(Assigned Repllicas)<br>
ISR(In-Sync Replicas)<br>
ISR是AR中的一个子集<br>
OSR(Out-Sync Relipcas)<br>
公式:AR = ISR + OSR
高水位<br>
作用主要<br>
定义消息可见性<br>
帮助 kafka 完成副本机制的同步<br>
三种角色<br>
leader 副本<br>
<font color="#f44336"><b>相应 clients 端读写请求的副本</b></font><br>
Follower 副本<br>
被动的备注 leader 副本的内容,<b><font color="#f44336">不能响应 clients 端读写请求</font></b><br>
ISR 副本<br>
包含了 leader 副本和所有与 leader 副本保持同步的 Followerer 副本
LEO<br>
HW
图示<br>
repcation机制<br>
ISR机制<br>
ISR (In-Sync Replicas):副本同步队列<br>
延迟
replica.lag.time.max.ms:延迟时间<br>
replica.lag.max.messages:延迟条数<br>
ISR机制选择<br>
最后选择了时间,去掉消息数差异
replica.lag.time.max.ms理解<br>
follower故障及leader故障<br>
follower故障<br>
leader故障<br>
副本不同的异常情况<br>
如果leader crash时,ISR为空怎么办?<br>
unclean.leader.election<br>
true(默认)<br>
false<br>
HW和LEO更新机制<br>
高水位更新机制<br>
远程副本的主要作用<br>
运行过程中哪些数据会发生变更<br>
更新部分<br>
不会更新部分<br>
更新时机<br>
follower和leader更新follower的LEO的时间<br>
follower的LEO更新时间<br>
leader端的follower副本的LEO更新时间<br>
leader 副本和 Follower 副本更新流程<br>
更新流程图<br>
follower默认每500ms从leader拉取一次数据
follower同步成功后也会给leader发送ack
副本同步机制举例<br>
1、初始状态<br>
2、第一次同步<br>
1、Leader 副本处理生产者的逻辑<br>
2、Leader 副本处理 Follower 副本拉取消息的逻辑<br>
3、Follower 副本从 Leader 拉取消息的处理逻辑<br>
经过这一次拉取,我们的 Leader 和 Follower 副本的 LEO 都是 1,各自的高水位依然是0,没有被更新。
3、第二次同步<br>
1、Leader 副本处理 Follower 副本拉取消息的逻辑<br>
2、Follower 副本从 Leader 拉取消息的处理逻辑<br>
至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。
HW保证消费者消费数据一致性,是否丢数据由ack保证。(<font color="#4669ea" style="font-weight: normal;">重点</font>)
副本同步过程中一致性保证(<b><font color="#0d47a1">重点</font></b>)<br>
<b>单纯的多副本可以保证kafka高可用,但是并不能保证kafka数据的不丢失</b>
如果要保证写入kafka的数据不丢失<br>
同步过程中数据丢失/不一致 - Leader Epoch<br>
为什么Kafka需要Leader Epoch?<br>
Kafka通过Leader Epoch来解决follower日志恢复过程中潜在的数据不一致问题
问题前提
高水位(High Watermark)<br>
副本策略(ISRs)<br>
有一个前提:min.isr设置的是1<br>
问题分析
场景一:<b>日志丢失</b>问题
第1步状态<br>
第2步状态:<font color="#f44336"><b>A重启了</b></font><br>
第3步状态:<b><font color="#f44336">B崩溃了</font></b><br>
问题在哪里?<br>
场景二:<b>日志错乱</b>问题
第1步状态<br>
第2步状态<br>
第3步状态<br>
问题在哪里?<br>
ISRs中选出的leader一定是安全(包含所有已提交数据)的吗?<br>
Leader Epoch的引入
leader epoch<br>
KIP-101引入如下概念<br>
Leader Epoch解决方案
场景一
如果发起LeaderEpochRequest的时候 B 就已经挂了怎么办?<br>
场景二<br>
不保证数据不丢失,只保证副本间一致性
leader后不会截断自己的日志,日志截断只会发生在follower身上
Kafka Broker 写入数据的过程<br>
隐患:如果数据写入 PageCache 后 Kafka Broker宕机会怎样?机子宕机/掉电?<br>
<b><font color="#f44336">Kafka Broker 宕机: 消息不会丢失</font></b><br>
<b><font color="#f44336">机子宕机/掉电: 消息会丢失</font></b><br>
拓展:Kafka 日志刷盘机制<br>
推荐采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
相关配置<br>
针对 broker 配置<br>
针对 topic 配置<br>
查看 Linux 后台线程执行配置<br>
保证数据可靠性(<b><font color="#4669ea">重点</font></b>)
哪些环节可能丢消息?<br>
producer可靠性
生产者发送消息一般流程<br>
Producer丢失消息
发生在生产者客户端<br>
几个解决的思路
发生在消息发送过程中
生产端发送消息流程<br>
此环节丢失消息的场景<br>
Producer消息没有发送成功<br>
不恰当配置<br>
回顾下重要的参数: <b>acks</b><br>
acks=0<br>
acks=1<br>
ack=all / -1<br>
Acks=all 就可以代表数据一定不会丢失了吗?<br>
数据重复<br>
解决办法:<b><font color="#0d47a1">幂等性</font></b><br>
幂等原理<br>
broker端可靠性<br>
Kafka Broker 写入数据的过程数据丢失<br>
pageCache刷盘触发条件<br>
主动调用sync或fsync函数<br>
可用内存低于阀值<br>
dirty data时间达到阀值<br>
理论上,<b><font color="#f44336">要完全让kafka保证单个broker不丢失消息是做不到的</font></b>,只能通过调整刷盘机制的参数缓解该情况。比如,减少刷盘间隔,减少刷盘数据量大小。时间越短,性能越差,可靠性越好(尽可能可靠)
主从同步数据丢失
副本之间的数据同步也可能出现问题
解决方案:<b><font color="#f44336">ISR </font></b>和 <b><font color="#f44336">Epoch </font></b>机制<br>
对应需要的配置参数如下<br>
1、acks=-1 或者 acks=all<br>
2、replication.factor >= 3<br>
3、min.insync.replicas > 1<br>
consumer可靠性
自动提交<br>
自动提交存在的问题<br>
数据丢失<br>
重复消费<br>
手动提交<br>
手动<b><font color="#f44336">同步</font></b>提交存在的问题<br>
手动<b><font color="#f44336">异步</font></b>提交存在的问题<br>
手动提交会出现数据重复
消息堆积造成数据丢失<br>
解决措施<br>
怎么确保消息 100% 不丢失?<br>
生产端<br>
设置重试:props.put("retries", "10")<br>
设置acks=all<br>
设置回调:producer.send(msg, new CallBack(){...})<br>
Broker<br>
内存:使用带蓄电池后备电源的缓存cache<br>
Kafka 版本 0.11.x 以上:支持 Epoch 机制<br>
replication.factor >= 3: 副本数至少有 3 个<br>
min.insync.replicas > 1: 代表消息至少写入 2个副本才算发送成功。前提需要 acks=-1<br>
unclean.leader.election.enable=false: 防止不在 ISR 中的 Follower 被选举为 Leader<br>
消费端<br>
客户端版本升级至0.10.2 以上版本<br>
取消自动提交auto.commit = false,改为手动 ack<br>
尽量提高客户端的消费速度,消费逻辑另起线程进行处理
幂等生产者<br>
原理<br>
注意问题<br>
只能保证单分区上的幂等性
只能实现单会话上的幂等性<br>
想实现多分区(partition)以及多会话(session)上的消息无重复,应该怎么做呢?<br>
事务(transaction)
事务<br>
Kafka为什么要引入事务<br>
1、跨会话的幂等性写入<br>
2、跨会话的事务恢复<br>
3、跨多个 Topic-Partition 的幂等性写入<br>
<b><font color="#f44336">幂等性不能跨多个分区运作,而事务可以弥补这个缺陷。</font></b>
前面3个是针对producer的,Consumer 端难以保证事务性<br>
事务性保证<br>
拒绝僵尸实例(Zombie fencing)<br>
Kafka中的事务特性主要用于以下两种场景<br>
1、生产者发送多条消息可以封装在一个事务中,形成一个原子操作<br>
2、read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作<br>
使用示例源码<br>
事务性要解决的问题<br>
在写多个 Topic-Partition 时,执行的一批写入操作,有可能出现部分 Topic-Partition 写入成功,部分写入失败(比如达到重试次数),这相当于出现了中间的状态,这并不是我们期望的结果;
Producer 应用中间挂之后再恢复,无法做到 Exactly-Once 语义保证(<b><font color="#f44336">幂等性无法保证重启后精准一次性</font></b>);<br>
事务性实现的关键<br>
1、事务原子性:2PC<br>
2、TransactionCoordinator高可用<br>
3、Producer 在 Fail 恢复后操作<br>
4、如何标识一个事务操作的状态<br>
事务性的整体流程<br>
1. Finding a TransactionCoordinator<br>
2. Getting a PID<br>
3. Starting a Transaction<br>
4. Consume-Porcess-Produce Loop
5.Committing or Aborting a Transaction<br>
思考<br>
如果多个 Producer 使用同一个 txn.id 会出现什么情况?<br>
Consumer 端如何消费事务数据?<br>
Consumer 的消费策略<br>
read_committed<br>
read_uncommitted<br>
Last Stable Offset(LSO)<br>
Server 处理 read_committed 类型的 Fetch 请求?<br>
这种机制有没有什么问题呢?<br>
Consumer 如何过滤 abort 的事务数据<br>
方案
Consume过滤abort
1、如果这个数据是 control msg(也即是 marker 数据)<br>
2、如果这个数据是正常的数据<br>
3、检查abortedProducerIds队列<br>
Consumer 消费数据时,其顺序如何保证<br>
如果 txn.id 长期不使用,server 端怎么处理?<br>
消息有序性
全局有序<br>
如何保证:<b>需要1个Topic只能对应1个Partition</b><br>
consumer也要使用<b>单线程</b>或者<b>保证消费顺序的线程</b>模型<br>
局部有序<br>
不增加partition数量的情况下想提高消费速度<br>
消息重试对顺序消息的影响<br>
max.in.flight.requests.per.connection<br>
不设置该参数,失败记录捕获后自行处理(<b><font color="#0d47a1">优化点</font></b>)<br>
控制器组件(Controller)<br>
控制器是如何被选出来的?<br>
启动时选举<br>
leader异常选举<br>
控制器是做什么的?<br>
1、主题管理(创建、删除、增加分区)<br>
2、分区重分配<br>
3、Preferred 领导者选举<br>
4、集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)<br>
5、数据服务<br>
控制器保存了什么数据?<br>
这里面比较重要的数据有<br>
所有主题信息<br>
所有 Broker 信息<br>
所有涉及运维任务的分区<br>
控制器故障转移(Failover)<br>
控制器故障转移的过程<br>
epoch防止脑裂<br>
每个和控制器交互的请求都会携带controller_epoch字段<br>
分区Leader的选举<br>
消费组Leader的选举<br>
PageCache
概念<br>
基于两个因素进行缓存<br>
1、磁盘访问的速度比内存慢好几个数量级(毫秒和纳秒的差距);<br>
2、被访问过的数据,有很大概率会被再次访问。<br>
PageCache文件读写流程<br>
读Cache<br>
写Cache<br>
触发脏数据刷新到磁盘的条件<br>
超时<br>
脏数据占用内存空间过大<br>
Buffer cache<br>
page cache与buffer cache作用<br>
两类缓存的逻辑关系(PageCache 和 Buffer cache)
第一阶段:仅有Buffer Cache<br>
第二阶段:Page Cache、Buffer Cache两者并存<br>
Page Cache仅负责其中mmap部分的处理,而Buffer Cache实际上负责所有对磁盘的IO访问。
冗余存储<br>
第三阶段:Page Cache、Buffer Cache两者融合
两者的关系<br>
不适应大文件<br>
带来 2 个问题<br>
解决方案<br>
零拷贝<br>
写数据用mmap
读操作用sendfile<br>
CommitFailedException异常怎么处理?<br>
处理的总时间超时<br>
解决方法<br>
1、缩短单条消息处理的时间<br>
2、增加 Consumer 端允许下游系统消费一批消息的最大时长
3、减少下游系统一次性消费的消息总数<br>
4、下游系统使用多线程来加速消费<br>
综合以上这 4 个处理方法,推荐你首先尝试采用方法 1 来预防此异常的发生<br>
Standalone Consumer独立消费者group.id冲突<br>
扩容
0 条评论
下一页