broker工作流程
1. 首先启动的时候将broker启动信息在zk里面注册
2. 然后抢占,看谁抢的快,谁就是监控的controller
3. 监控的controller监控brokers的节点变化,然后选举leader
选举规则是:按照isr中存活前提,然后按照在ar中的顺序
(isr是能互相连通的所有节点,ar是kafka中所有的副本,or是follower和leader副本同步延迟过多的副本
ar=isr+or)
排在ar前面的优先。
leader负责处理读写操作,而follower只负责副本数据的同步
4. 选举完,controller会将选举的信息上传到zk中
5. 其他controller会从zk同步信息
6. 如果有一个节点挂了,监听的controller监听到了变化会去获取isr,然后重新选举新的leader
再更新leader和isr
<br>
leader和follower故障处理
follower故障处理细节
<br>
1. 当出现follower故障的时候首先会临时踢出isr
2. 在这期间的leader和follower会继续接收数据
3. follower恢复后,follower会读取本地磁盘中在这个follower故障时候的HW,然后将自己高于上一次HW的数据都切掉,因为这些数据是没有验证的数据,然后去从这个上一次的HW开始,同步leader,当follower追上当前的hw就可以加入isr了
leo是每个副本的最后一个offset也就是每个副本的最新的那个数据
hw是所有副本中最小的那个数据,是全局的
生产经验
手动调整分区副本,创建副本存储计划
手动调整分区副本存储的步骤如下:<br>(1)创建一个新的topic,名称为three。<br>bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three<br>(2)查看分区副本存储情况。<br>bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three<br>(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。<br>vim increase-replication-factor.json<br>输入如下内容:<br>{<br> "version":1,<br> "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},<br> {"topic":"three","partition":1,"replicas":[0,1]},<br> {"topic":"three","partition":2,"replicas":[1,0]},<br> {"topic":"three","partition":3,"replicas":[1,0]}]<br>}<br>(4)执行副本存储计划。<br>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute<br>(5)验证副本存储计划。<br> bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify<br>(6)查看分区副本存储情况。<br>bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three<br>
leader Partition自动平衡
auto.leader.rebalance.enable 建议设置false
增加副本因子
由于某个主题很重要,所以我们要对这个重要的主题数据增加副本
1)创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入如下内容:
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
(2)执行副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
文件存储机制
首先kafka中topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,数据是不断追加到log文件的末尾的。为了防止文件过大定位太慢,kafka使用分片和稀疏索引机制,每个log分为多个segment,segment中包括index文件,log文件和timeindex文件,文件命名规则是:topic名称+分区序号
log文件和index文件详解
如何在log文件中找到offset为600的数据
1. 首先根据segment文件的名字找打segment文件。这个文件里面存储的是相对offset,通过相对offset+segment文件名获取绝对offset
2. 找到小于等于目标pffset的最大的offset对应的索引
3. 然后定位到log文件
4. 向下遍历找到想要的record
注意:
1. kafka的index是稀疏索引,每向log文件中写入4kb数据,就会向index文件写入一条索引。
2. index文件中保存的offset是相对offset
<br>
文件清理策略
日志保存时间默认为7天,一旦超过了设置的时间有两种清理策略
1. delete
2. compact
delete是将过期数据删除
compact是将相同key的不同value值只保留最后一个版本
kafka如何高效读写数据
1. kafka本身是分布式的集群,使用分区技术,并行度高
2. 读数据使用稀疏索引,快速定位
3. 顺序写磁盘,kafka写文件一直是追加写,而不是更新
4. 页缓存和零拷贝技术
kafka生产者的数据会直接写入到linux系统内核的页缓存上,页缓存上数据什么时候落盘到file文件由linux系统确定,然后kafka消费者消费数据的时候先去页缓存上拿取数据,再去file的segment中读取数据,然后直接走网卡到消费者,不做任何处理。因为kafka拿取数据的时候不进行处理,就不走应用层,而是直接将数据网卡给到消费者