Kafka
2023-06-13 20:39:07 1 举报
AI智能生成
Kafka
作者其他创作
大纲/内容
概念与基础架构
介绍
<b>是什么?</b>
是一个<b><font color="#e74f4c">分布式、分区的、多副本的、多生产者、多订阅者</font></b>,基于 <b>zookeeper协调</b>的分布式日志系统(也可以当做MQ系统),主要的应用场景:<b>日志收集系统和消息系统</b>
<b>Kafka主要设计目标</b>
以时间复杂度为O(1)的方式提供消息持久化能力,即使对<b>TB级</b>以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到<b>单机支持每秒100K</b>条消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition(分区)内的消息顺序传输。
同时支持<b>离线数据处理(持久化)</b>和<b>实时数据处理(消费)</b>。
支持在线水平扩展
优势
<b>高吞吐量</b>
单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能
<b>高性能</b>
单节点支持上千个客户端,并保证零停机和零数据丢失。
<b>持久化数据存储</b>
将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。
零拷贝
顺序读写
Page Cache
<b>分布式系统,易于向外扩展</b>
无需停机即可扩展机器
<b>可靠性</b>
Kafka是分布式,分区,复制和容错的
<b>客户端状态维护</b>
消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡(再平衡)
支持online【实时数据消费】和 offline 【离线数据持久化处理】场景
支持多种客户端语言,Kafka支持Java、.NET、PHP、Python等多种语言
应用场景
<b>日志收集</b>
一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放 给各种Consumer
<b>消息系统</b>
解耦生产者和消费者、缓存消息等
<b>用户活动跟踪</b>
Kafka经常被用来记录Web用户或者App用户的各种活动,如浏览网页、搜索、点击 等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后消费者通过订阅这些Topic来做实时的监控分析,亦可保存到数据库;
<b>运营指标(监控数据)</b>
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的 集中反馈,比如报警和报告
<b>流式处理</b>
Spark Streaming和Storm。
基本架构
<b>消息和批次</b>
为了提高效率,消息被分批写入Kafka。<b>批次就是一组消息</b>
<b>模式(数据格式)</b>
xml
json
简单,可视性好
<b>Avro(kafka使用)</b>
<b>Avro提供了一种紧凑的序列化格式,模式和消息体分开。</b>当模式发生变化时,不需要重新生成代码,它还支持强类型和模式进化,其版本既向前兼容,也向后兼容
性能高
<b>主题和分区</b>
Kafka的消息通过主题进行分类。主题可比是数据库的表或者文件系统里的文件夹。主题可以被分为若干分区,一个主题通过分区分布于Kafka集群中,提供了<b>横向扩展</b>的能力。
<b>生产者、消费者</b>
生产者在默认情况下把消息均衡地分布到主题的所有分区上
直接指定消息的分区
根据消息的key散列取模得出分区
轮询指定分区
消费者通过偏移量来区分已经读过的消息,从而消费消息。 消费者是消费组的一部分。<b><font color="#e74f4c">消费组保证每个分区只能被一个消费者使用</font></b>,避免重复消费
<b>Broker 和 集群</b>
一个独立的Kafka服务器称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘上的消息。<font color="#e74f4c">单个broker可以轻松处理数千个分区以及每秒百万级的消息量</font>
核心概念
<b>Producer</b>
该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到 当前用于追加数据的 <b>segment 文件</b>中
Topic分区如何负载均衡
轮询(默认)
生产者会把消息直接写到指定的分区。这通常是通过<b>消息键和分区器</b>来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上
自定义分区器,根据不同的业务规则将消息映射到分区。
<b>Consumer</b>
<ul><li>消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。 </li><li>消费者通过检查消息的<b>偏移量</b>来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,<font color="#e74f4c">每个消息的偏移量都是唯一的</font>。消费者把每个分区<b>最后读取的消息偏移量保存在Zookeeper(低版本) 或Kafka(高版本) 上</b>,如果消费者关闭或重启,它的读取状态不会丢失。</li><li>消费者是消费组的一部分。<font color="#e74f4c">群组保证每个分区只能被一个消费者使用</font>。 </li><li>如果一个消费者失效,消费组里的其他消费者可以接管失效消费者的工作,<b>再平衡,分区重新分配</b>。</li></ul>
<b>Broker</b>
<ul><li>broker 是集群的组成部分。<b>每个集群都有一个broker 同时充当了<font color="#e74f4c">集群控制器</font>的角色(自动从集群 的活跃成员中选举出来)</b>。 </li><li>控制器负责管理工作,包括将分区分配给broker 和监控broker。 </li><li>在集群中,<font color="#e74f4c">一个分区从属于一个broker,该broker 被称为分区的首领</font></li></ul>
<b>Topic</b>
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,物理上不同Topic的消息分开存储
<font color="#e74f4c">主题就好比数据库的表,尤其是分库分表之后的逻辑表</font>
<b>Partition</b>
主题可以被分为若干个分区,一个分区就是一个提交日志
消息<b>以追加的方式写入分区</b>,然后<b>以先入先出的顺序读取</b>
无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序
Kafka 通过分区来实现数据冗余和伸缩性。
需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。<br>
<b>Replica</b>
Master(首领副本)
每个分区都有一个首领副本。<b>为了保证一致性,所有生产者请求和消费者请求都会经过这个副本</b>
Follower(跟随者副本)
首领以外的副本都是跟随者副本。<font color="#e74f4c">跟随者副本不处理来自客户端的请求</font>,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领
<b>Offset</b>
生产者Offset
消费者Offset
<b>副本</b>
<b>AR(Assigned Replicas)</b>
分区中的所有副本统称为AR(Assigned Repllicas),<font color="#e74f4c">AR = ISR + OSR</font>
<b>ISR(In-Sync Replicas)</b>
<font color="#e74f4c">与leader副本保持一定程度同步的副本(包括Leader)</font>
ISR集合 是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息 进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度” 是指可以忍受的滞后范围,这个范围可以通过参数进行配置。
<b>OSR(Out-Sync Replicas)</b>
<font color="#e74f4c">与leader副本同步滞后过多的副本(不包括leader)副本</font>
<b>HW(消费者读指针limit)</b>
HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费时只能拉取到这个offset之前的消息
<b>LEO(生产者写指针)</b>
Log End Offset,它表示了当前日志文件中<b>下一条待写入</b>消息的offset
Kafka开发使用
<b>生产者:消息的发送</b>
生产者主要的对象有: `<b>KafkaProducer</b>` , `<b>ProducerRecord</b>` 。 其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息
消息发送配置
其他参数可以在 <b>org.apache.kafka.clients.producer.ProducerConfig</b>
消费者生产消息后,需要broker端的确认,可以<b>同步确认</b>,也可以<b>异步确认</b>
<b>消费者:消息的消费</b>
消息消费配置
其他参数可以从 <b>org.apache.kafka.clients.producer.ProducerConfig</b> 中找到
<b>SpringBoot Kafka</b>
依赖
application.yml 配置
配置类
生产者
消费者
高级特性
生产者
消息发送
<b>数据生产流程</b>
<b>必要参数配置</b>
<b>拦截器</b>
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个<font color="#e74f4c">Interceptor</font>按序作用于同一条消 息从而形成<b>拦截链(interceptor chain)</b>。Intercetpor的实现接口是<font color="#e74f4c">org.apache.kafka.clients.producer.ProducerInterceptor</font>,其定义的方法包括:<br>
<ul><li><b>onSend(ProducerRecord) :</b> Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算</li><li><b>onAcknowledgement(RecordMetadata, Exception)</b>:在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。<font color="#e74f4c">onAcknowledgement运行在 Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率</font></li><li><b>close:</b>关闭Interceptor,主要用于执行一些资源清理工作</li></ul>
自定义拦截器
1. 实现ProducerInterceptor接口
2. 在KafkaProducer的设置中设置自定义的拦截器
<b>序列化器</b>
<font color="#e74f4c"><b>Kafka中的数据都是字节数组</b></font>,在将消息发送到Kafka之前需要先将数据序列化为字节数组
分区器
分区计算
自定义分区器
1. 首先开发<b>Partitioner</b>接口的实现类
2. 在KafkaProducer中进行设置:<b>configs.put("partitioner.class", "xxx.xx.Xxx.class")</b>
原理剖析
参数配置补充
消费者
概念
消费者组、消费者
消费组的主要作用就是保证消费不会被重复消费(<b>一个分区只能分配给消费者组中的一个消费者</b>)<br>
<ul><li>消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 <b>__consumer_offsets</b> 的 主题中。</li><li>消费者还可以将自己的偏移量存储到Zookeeper,需要设置<font color="#a23c73"><b>offset.storage=zookeeper</b></font>。 </li><li><font color="#e74f4c">推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发</font></li></ul>
<b>向消费组添加消费者是横向扩展消费能力的主要方式</b>
必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是<b>不要让消费者的数量超过主题分区的数量</b>
除了通过增加消费者来横向扩展单个应用的消费能力之外,经常出现多个应用程序从同一个主题消费的情况。 <br>此时,每个应用都可以获取到所有的消息。只要保证每个应用都有自己的消费组,就可以让它们获取到主题所有的消息。<br>横向扩展消费者和消费组不会对性能造成负面影响<br>
<b>心跳机制</b>
目的是为了检测消费者或者Broker是否宕机,从而触发<b>再平衡</b>
参数
消息接收
<b>必要参数配置</b>
<b>订阅</b>
consumer 采用 <b><font color="#e74f4c">pull 模式</font></b>从 broker 中读取数据,采用 pull 模式,consumer 可<b style=""><font color="#262626">自主控制消费消息的速率</font></b>, 可以<b>自己控制消费方式(批量消费/逐条 消费)</b>,还可以<b>选择不同的提交方式从而实现不同的传输语义</b>
<b>位移提交</b>
<ul><li>Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 <b>提交位移(Committing Offsets)</b></li><li>Consumer 需要为分配给它的每个分区提交各自的位移数据</li><li>位移提交的由Consumer端负责的,Kafka只负责保管。<b>__consumer_offsets</b></li><li>位移提交分为<b>自动提交和手动提交</b></li><li>手动位移提交分为<b>同步提交</b>和<b>异步提交</b></li></ul>
<b>自动提交</b>
开启自动提交
<font color="#e74f4c">enable.auto.commit=true</font>
配置自动提交间隔:Consumer端
<font color="#e74f4c">auto.commit.interval.ms </font>,默认 5s
<b>手动提交</b>
同步提交
<font color="#a23c73"><b>KafkaConsumer#commitSync()</b></font>
会提交 KafkaConsumer#poll() 返回的最新 offset(等待直到 offset 被成功提交才返回)
异步提交
<font color="#a23c73"><b>KafkaConsumer#commitAsync()</b></font>
<font color="#e74f4c">异步提交出现问题不会重试</font>
处理方式
<b>消费者位移管理</b>
Kafka提供了消费者API,让消费者可以管理自己的位移
<b>Rebalance</b>
触发的条件
<b>消费者组内成员发生变更</b>:这个变更包括了增加和减少消费者,比如消费者宕机退出消费组
<b>主题的分区数发生变更:</b> kafka目前只支持增加分区,当增加的时候就会触发重平衡
<b>订阅的主题发生变化</b>:当消费者组使用<b>正则表达式订阅主题</b>,而恰好又新建了对应的主题,就会触发重平衡
<b><font color="#314aa4">为什么说重平衡为人诟病呢?</font></b>
因为<b><font color="#e74f4c">重平衡过程中,消费者无法从kafka消费消息</font>,这对kafka的TPS 影响极大</b>,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有 可能,而这段时间kafka基本处于<b>不可用状态</b>。所以在实际环境中,应该尽量避免重平衡发生。
<b style=""><font color="#000000">尽可能的避免Rebalance</font></b>
<ul><li><span style="font-size: inherit;">在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。</span></li><li><span style="font-size: inherit;">在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。</span><b style="font-size: inherit;">而在kafka消费 者场景中,`<font color="#a23c73">session.timout.ms</font>`参数就是规定这个超时时间是多少</b><span style="font-size: inherit;">。</span></li><li><font color="#a23c73" style="font-size: inherit;"><b>`heartbeat.interval.ms</b></font><span style="font-size: inherit;">`,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。</span></li><li><span style="font-size: inherit;"><font color="#a23c73"><b>max.poll.interval.ms</b></font>`,消费者poll数据后,需要一些处理,再进行拉取。如果<b>两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组</b>。也就是说,拉取,然后处理,这个处理的时间不能超过 `max.poll.interval.ms` 这个参数的值。这个参数的默认值是<b>5分钟</b>,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些</span></li></ul>
控制心跳的超时时间
session.timout.ms
控制心跳的发送频率
heartbeat.interval.ms
控制poll的间隔
max.poll.interval.ms
合理的配置
<ul><li>session.timout.ms:设置为6s </li><li>heartbeat.interval.ms:设置2s </li><li>max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟</li></ul>
<b>反序列化</b>
<b>拦截器</b>
消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。 处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理
消费端定义消息拦截器,需要实现 <font color="#a23c73">org.apache.kafka.clients.consumer.ConsumerInterceptor </font>接口
<ul><li>一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入 消费者应用程序,<b>用于定制的监控、日志处理等</b></li><li>该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定 clientID,还可以获取KafkaConsumer生成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突。</li><li><b>ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播</b>。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。</li><li>ConsumerInterceptor回调发生在 `<font color="#a23c73">org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)</font>`方法<b>同一个线程</b></li></ul>
消费者配置:
<b>消费者参数补充</b>
消费者组
<b>consumer group是kafka提供的<font color="#e74f4c">可扩展且具有容错性</font>的消费者机制</b>
<ul><li>1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程</li><li>2. group.id是一个字符串,唯一标识一个消费组</li><li>3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。</li></ul>
<b>消费者位移</b>
<ul><li>消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息(消费者组)。</li><li><span style="font-size: inherit;">每个消费组保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入</span><b style="font-size: inherit;"> checkpoint机制</b><span style="font-size: inherit;">定期持久化。</span></li></ul>
<b>位移管理(offset management)</b>
自动 VS 手动
Kafka默认定期自动提交位移( <font color="#a23c73"><b>enable.auto.commit = true </b></font>),也手动提交位移。另外kafka会定期把group消费情况保存起来,做成一个 offset map
位移提交
位移是提交到Kafka中的<b> __consumer_offsets 主题</b>。 <b>__consumer_offsets</b>中的消息保存了每个消费组某一时刻提交的offset信息
<font color="#a23c73">kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" -- consumer.config /opt/kafka_2.12-1.0.2/config/consumer.properties --frombeginning | head</font>
<ul><li>上图标出来的,表示消费组为<b> test-consumer-group</b> ,消费的主题为 <b>__consumer_offsets</b> , 消费的分区是4,偏移量为5。</li><li>__consumers_offsets 主题配置了<b>compact策略</b>,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的</li></ul>
<b>再均衡 Rebalanace</b>
再均衡(Rebalance)本质上是一种<b>协议</b>,规定了一个消费组中所有消费者如何达成一致来分配订阅主题的每个分区
组内分配分区策略
RangeAssignor
RoundRobinAssignor
StickyAssignor
谁来执行再均衡和消费组管理 ?
<b>Group Coordinator(消费组协调器)</b>
如何确定coordinator ?
1、确定消费组位移信息写入 `<font color="#e74f4c">__consumers_offsets</font>` 的哪个分区。具体计算公式 : <br><b>__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)</b><br>
groupMetadataTopicPartitionCount 由 <font color="#a23c73"><b>offsets.topic.num.partitions</b></font> 指定,默认是<b>50个分区</b>
2、该分区leader所在的broker就是组协调器
Rebalance Generation
它表示Rebalance之后主题分区到消费组中消费者<b>映射关系的一个版本</b>,主要是<b>用于保护消费组, 隔离无效偏移量提交的 ,</b><font color="#e74f4c">每次 Rebalance后,generation号会+1</font>
<b>协议(protocol)</b>
<b>Heartbeat请求</b>
consumer需要定期给组协调器发送心跳来表明自己还活着
<b>LeaveGroup请求</b>
主动告诉组协调器我要离开消费组
<b>SyncGroup请求</b>
消费组Leader把分配方案告诉组内所有成员
<b>joinGroup请求</b>
成员请求加入组
<b>DescribeGroup请求</b>
显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
<span style="font-weight: normal;">消费者如何向消费组协调器证明自己还活着?(Liveness)</span>
通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“<font color="#e74f4c">REBALANCE_IN_PROGRESS</font>”,告诉 其他消费者:重新分配分区
<b>再均衡过程</b>
再均衡分为2步:<b>Join和Sync</b><br>
<b>① join, 加入组。</b>所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader
<b>② Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区</b>。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发 SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup 的response中发给各个消费者
在协调器收集到所有成员请求前,它会把已收到请求放入一个叫p<b>urgatory(炼狱)</b>的地方。然后是分发分配方案的过程,即SyncGroup请求
<b>消费组状态机</b>
<ul><li><b>Dead:</b>组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID</li><li><b>Empty:</b>组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求</li><li><b>PreparingRebalance:</b>组准备开启新的rebalance,等待成员加入</li><li><b>AwaitingSync</b>:正在等待leader consumer将分配方案传给各个成员 </li><li><b>Stable</b>:再均衡完成,可以开始消费</li></ul>
主题
主题的管理
kafka-topics.sh脚本
创建主题
查看主图
修改主题
删除主题
<font color="#e74f4c">删除是给主题加上标记,过一段时间后在删除</font>
增加分区
通过命令行工具操作,<b>主题的分区只能增加,不能减少</b>
分区副本的分配
<b>副本因子 = (主分区 + 副本分区)</b>
副本分配的三个目标
<ul><li>均衡地将副本分散于各个broker上</li><li>对于某个broker上分配的分区,它的其他副本在其他broker上</li><li>如果所有的broker都有机架(机房)信息,尽量将分区的各个副本分配到不同机架(机房)上的broker</li></ul>
在不考虑机架信息的情况下
1. 第一个副本分区通过<b>轮询</b>的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。<br>2. 其余副本通过增加偏移进行分配。
实际上也不全是按照上面的规则,如下图的p5分区,偏移一个broker位在继续进行增量分配,这样做的目的是尽可能将分区分配的位置打散。随着分区数量的增加偏移的位置会越来越多。如果15个分区会偏移2个broker位置。这是个基于 broker数量的基础上。
考虑到机架信息
首先为每个机架创建一个broker列表
如: 三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)
通过简单的轮询将分区分配给不同机架上的broker
<b>必要参数配置</b>
<b>KafkaAdminClient应用</b>
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看 的功能集成到系统中
<b>偏移量管理</b>
<font color="#e74f4c" style="">__consumer_offsets主题</font>中保存各个消费组的偏移量
早期由zookeeper管理消费组的偏移量<br>
通过原生 kafka 提供的工具脚本进行查询(<font color="#a23c73"><b>bin/kafka-consumer-groups.sh</b></font>)
分区
<b>副本机制</b>
<b><font color="#e74f4c">Leader副本负责读写,Follower副本只做消息的备份,提供高可用。<br></font>Follower分区像普通的Kafka消费者一样,消费来自Leader分区的消息,并将其持久化到自己的日志中</b><br>
<font color="#e74f4c">如果想扩展Topic的读写能力,就得多创建几个分区</font>
<font color="#a23c73"><b>--replication-factor 3 1leader+2follower</b></font>
同步节点定义
<ul><li>节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的心跳机制) </li><li>对于Follower副本分区,它复制在Leader分区上的写入,并且不要延迟太多</li></ul>
<b>Kafka提供的保证是,只要有至少一个同步副本处于活动状态,提交的消息就不会丢失</b>
<b><font color="#314aa4">宕机如何恢复 ?</font></b>
少部分副本宕机
当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader里pull数据
全部副本宕机
当全部副本宕机了有两种恢复方式 <br>1、等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性) <br>2、选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的 数据,因此造成数据丢失)
<b>Leader选举</b>
如何选举?
<b>只有跟Leader保持同步的Follower才应该被选作新的Leader</b>。 Kafka会在Zookeeper上针对每个Topic维护一个称为<b>ISR(in-sync replica,已同步的副本)</b>的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者
为什么不用少数服从多数的方法(过半机制)?
少数服从多数是一种比较常见的一致性算发和Leader选举法。 它的含义是只有超过半数的副本同步了,系统才会认为数据已同步; 选择Leader时也是从超过半数的同步的副本中选择。 这种算法需要较高的冗余度,跟Kafka比起来,<b><font color="#e74f4c">浪费资源</font></b>。 譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。 而kafka的ISR集合方法,分别只需要两个和三个副本
如果所有的ISR副本都失败了怎么办?
分区重新分配
自动再均衡
物理存储
日志存储概述
<b>Kafka 消息是以主题为单位进行归类</b>,各个主题之间是彼此独立的,互不影响。 每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件
在分区日志文件中,你会发现很多类型的文件,比 如: <b>.index、.timestamp、.log、.snapshot</b>等。<br>其中,文件名一致的文件集合就称为 <b>LogSement</b>。
<b>LogSegment</b>
<ul><li>分区日志文件中包含很多的 LogSegment </li><li>Kafka 日志追加是<b>顺序写入</b>的</li><li>LogSegment 可以减小日志文件的大小</li><li>进行日志删除的时候和数据查找的时候可以快速定位。</li><li>ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限</li></ul>
类别
日志文件存在多种后缀文件,重点需要关注 <font color="#e74f4c">.index、.timestamp、.log</font>三种类型
<b>每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中第一条消息的 offset</b>
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志 文件都由该作为文件名命名规(00000000000000000000.index、 00000000000000000000.timestamp、00000000000000000000.log
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121(偏移量从 0 开始)
<b>日志与索引文件</b>
配置项默认值说明
切分文件
大小
时间
<b><font color="#314aa4">为什么是 Integer.MAX_VALUE ?</font></b>
1024 * 1024 * 1024=1073741824 在偏移量索引文件中,<b>每个索引项共占用 8 个字节,并分为两部分。 相对偏移量和物理地址</b>
<ul><li><b>相对偏移量</b>:表示消息相对与基准偏移量的偏移量,占 4 个字节</li><li><b>物理地址</b>:消息在日志分段文件中对应的物理位置,也占 4 个字节</li></ul>
<b>索引文件切分过程</b>
<ul><li>索引文件会根据 <font color="#e74f4c">log.index.size.max.bytes</font> 值进行预先分配空间,即文件创建的时候就是最大值。</li><li>当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。 这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性</li></ul>
日志存储
索引文件
<ul><li><b>偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。</b></li><li><b>时间戳索引文件则根据时间戳查找对应的偏移量。</b></li></ul>
<b>文件</b>
<ul><li><b>log文件名是以文件中第一条message的offset来命名的</b>,<b>实际offset长度是64位</b>,但是这里只使用了20位,应付生产是足够的。</li><li>一组index+log+timeindex文件的名字是一样的,并且log文件默认写满<b>1G</b>后,会进行log rolling形成一个新的组合来记录消息,这个是通过broker端 `<font color="#e74f4c">log.segment.bytes =1073741824</font>`指定的。</li><li>index和timeindex在刚使用时会分配<b>10M</b>的大小,当进行 log rolling 后,它会修剪为实际的大小。</li></ul>
查看存储文件
如果想查看这些文件,可以使用kafka提供的shell来完成,几个关键信息如下
(1)offset是逐渐增加的整数,每个offset对应一个消息的偏移量。 <br>(2)position:消息批字节数,用于计算物理地址。 <br>(3)CreateTime:时间戳。 <br>(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。 <br>(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1- GZIP、2-snappy、3-lz4。 <br>(6)crc:对所有字段进行校验后的crc值。
<b>偏移量</b><br>
offset 与 position 没有直接关系,因为会删除数据和清理日志
<font color="#314aa4"><b>如何查看偏移量为23的消息 ?</b></font>
Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过<b>跳跃表</b>方式,定位到在 00000000000000000000.index ,通过<b>二分法</b>在偏移量索引文件中找到不大于 23 的最大索引项,即offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
<b>时间戳</b>
<b>通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。</b>
时间戳索引索引格式:<b><font color="#e74f4c">前八个字节表示时间戳,后四个字节表示偏移量</font></b>
<font color="#e74f4c">timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的 写入是各自追加。</font>
<b><font color="#314aa4">查找时间戳为 1557554753430 开始的消息?</font></b>
1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳largestTimeStamp逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。<br>2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。<br>3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。
清理
Kafka 提供两种日志清理策略: <br><ul><li><b>日志删除(delete)</b>:按照一定的删除策略,将不满足条件的数据进行数据删除 </li><li><b>日志压缩(compact)</b>:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。</li></ul>
Kafka 提供 <font color="#a23c73">log.cleanup.policy </font>参数进行相应配置,默认值: <b>delete </b>,还可以选择 <b>compact</b><br>主题级别的配置项是 <font color="#a23c73">cleanup.policy</font><br>
<b>日志删除</b>
基于时间
日志删除任务会根据 <font color="#a23c73"><b>log.retention.hours/log.retention.minutes/log.retention.ms</b></font> 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。<b>默认是 7 天</b>, <b>log.retention.ms 优先级最高</b>
删除过程
1、从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。<br>2、这些日志分段所有文件添加 上 <b>.delete 后缀</b>。<br>3、交由一个以 "<b>delete-file" 命名的延迟任务</b>来删除这些 <b>.delete 为后缀</b>的文件。延迟执行时间可以通过 <font color="#a23c73"><b>file.delete.delay.ms</b></font> 进行设置
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 `<font color="#a23c73"><b>log.retention.bytes</b></font>` ,单个日志分段的大小由 `<b><font color="#a23c73">log.segment.bytes</font>`</b> 进行设定
删除过程
1、计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。<br>2、从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。<br>3、执行删除。
基于偏移量
根据<b>日志分段的下一个日志分段的起始偏移量</b>是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段
<b>日志压缩策略</b>
日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留<br><font color="#e74f4c"><b>对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除</b></font><br>
主题的 <font color="#a23c73"><b>cleanup.policy = compact</b></font>。
<b>Kafka的后台线程会定时将Topic遍历两次</b>
1、记录每个key的hash值最后一次出现的偏移量<br>2、第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志(保留最新的数据)。
压缩是在Kafka后台通过<b>定时重新打开Segment</b>来完成的
日志压缩可以确保
任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的<font color="#a23c73"><b>`min.compaction.lag.ms</b></font>`属性来保证消息在被压缩之前必须经过的最短时间。 也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的 `<font color="#a23c73"><b>max.compaction.lag.ms</b></font>`属性来保证从收到消息到消息符合压缩条件之间的最大延时。
<font color="#e74f4c">消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已</font>
消息的偏移量永远不会改变,它是日志中位置的永久标识符
从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的 <font color="#a23c73"><b>log.cleaner.delete.retention.ms </b></font>短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时。
默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器
<b><font color="#a23c73">`log.cleanup.policy`</font></b> 设置为 compact ,Broker的配置,影响集群中所有的Topic。
<b><font color="#a23c73">`log.cleaner.min.compaction.lag.ms</font></b>` ,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩
<font color="#a23c73"><b> `log.cleaner.max.compaction.lag.ms`</b></font> ,用于防止低生产速率的日志在无限制的时间内不压缩。
<b>Kafka的日志压缩原理并不复杂,就是<font color="#e74f4c">定时把所有的日志读取两遍,写一遍</font>,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的</b>
磁盘存储
<b>零拷贝</b>
传统IO
NIO优化
<font color="#b71c1c"><b>DirectByteBuf 直接内存</b></font><br>
进一步优化(底层采用了 <b><font color="#b71c1c">linux 2.1 后提供的 `sendFile` 方法</font></b>),java 中对应着两个 channel 调用 <b><font color="#e74f4c">t</font><font color="#e74f4c" style="">ransferTo/transferFrom</font></b><font color="#b71c1c" style="font-weight: bold;"> </font>方法拷贝数据
进一步优化(<b><font color="#b71c1c">linux 2.4</font></b>)
零拷贝优点
<ul><li>更少的用户态与内核态的切换</li><li>不利用 cpu 计算,减少 cpu 缓存伪共享</li><li>零拷贝适合小文件传输</li></ul>
kafka的两个过程<br>
<b>网络数据持久化到磁盘 (Producer 到 Broker)</b>
磁盘数据通过<b>DMA(Direct Memory Access,直接存储器访问) </b>拷贝到内核态 Buffer,直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。<br>除了减少数据拷贝外,整个读文件 ==> 网络发送由一个 <b>`sendfile`</b> 调用完成,整个过程只有两次上下 文切换,因此大大提高了性能
Java NIO对sendfile的支持就是<font color="#e74f4c"> FileChannel.transferTo()/transferFrom()</font><br>
<b>磁盘文件通过网络发送(Broker 到 Consumer)</b>
把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是 `<b>sendfile</b>`。消费者从broker读取数据,就是由此实现
具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 <b>transferTo </b>和 <b>transferFrom </b>方法实现零拷贝。<br><br>
<b>页缓存</b>
<ul><li>页缓存是操作系统实现的一种主要的<b>磁盘缓存,以此用来减少对磁盘 I/O 的操作</b>。 具体来说,就是<b>把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。<br></b></li><li>Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用<b>mmap内存文件映射</b></li></ul>
<font color="#e74f4c">Memory Mapped Files 简称mmap</font>,简单描述其作用就是:<b>将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件<br><br></b>它的工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)<b><br></b>
<b><font color="#e74f4c">消息先被写入页缓存,由操作系统负责刷盘任务。</font></b>
<ul><li>通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的 I/O提升,省去了用户空间到内核空间复制的开销。</li><li>mmap也有一个很明显的缺陷:<b>不可靠</b>,写到mmap中的数据并没有被真正的写到硬盘,<b>操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘</b></li></ul>
Kafka提供了一个参数 `<font color="#a23c73"><b>producer.type</b></font>` 来控制是不是主动flush; <br><br><ul><li>如果Kafka<b>写入到mmap之后就立即`flush`然后再返回Producer叫同步(`sync`)</b>; </li><li><b>写入mmap之后立即返回Producer不调用flush叫异步(`async`)</b>。</li></ul>
Java NIO对文件映射的支持
Java NIO,提供了一个<font color="#e74f4c">MappedByteBuffer</font> 类可以用来实现内存映射。 <font color="#e74f4c">MappedByteBuffer</font>只能通过调用<b>FileChannel.map()</b>取得,再没有其他方式。<br>`FileChannel.map()`是抽象方法,具体实现是在 `<b>FileChannelImpl.map()</b>`可自行查看JDK源码,其 <b>map0()</b>方法就是调用了Linux内核的mmap的API。
使用 MappedByteBuffer类要注意的是 mmap的文件映射,在<b>Full gc</b>时才会进行释放。当close时,<b>需要手动清除内存映射文件</b>,可以反射调用<font color="#e74f4c">sun.misc.Cleaner</font>方法
<b>一个进程读取磁盘上的文件</b>
<ul><li>1、 操作系统会先查看待读取的数据所在的页 (page)是否在<b>页缓存(pagecache)</b>中,如果存在(命中) 则直接返回数据,从而避免了对物理磁盘的 I/O 操作; </li><li>2、如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。</li></ul>
<b>一个进程将数据写入磁盘</b>
<ul><li>1、操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。 </li><li>2、被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性</li></ul>
<ul><li><b>对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中</b>,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难被禁止。 </li><li>当使用页缓存的时候,即使<b>Kafka服务重启, 页缓存还是会保持有效,然而`进程内的缓存却需要重建`</b>。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。</li></ul>
<font color="#e74f4c"><b>Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一</b></font>
<b>顺序写入</b>
Kafka 在设计时采用了<b>文件追加的方式</b>来写入消息,即只能在日志文件的尾部追加新的消息,并且也<b>不允许修改已写入的消息</b>,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储介质,也能承载非常大的吞吐量
操作系统可以针对线性读写做深层次的优化,比如<b>预读(read-ahead,提前将一个比较大的磁盘块读入内存)</b> 和<b>后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)</b>技术。
Kafka速度快的原因<br>
<b><font color="#314aa4">mmap和sendfile</font></b>
Linux内核提供、实现零拷贝的API;
sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
<font color="#e74f4c">RocketMQ 在消费消息时,使用了mmap。kafka 使用了 sendFile。</font>
<b><font color="#314aa4">Kafka速度快的原因 ?</font></b><br>
partition`<b>顺序读写</b>`,充分利用磁盘特性,这是基础;
Producer生产的数据持久化到broker,采用`<b>mmap文件映射</b>`,实现顺序的快速写入;
Customer从broker读取数据,采用`<b>sendfile</b>`,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer进行网络发送。
读写数据的<b>批量batch处理</b>以及<b>压缩传输</b>
稳定性
<b>事务</b>
控制器
<b>控制器就是一个broker。<br>控制器除了一般broker的功能,还负责Leader分区的选举。</b>
<b>实现方式</b>
<b>借助zk Watch机制(类似于分布式锁)</b>
集群里第一个启动的broker在Zookeeper中创建<b>临时节点 /controller</b>。 其他broker在该控制器节点创建Zookeeper watch对象,使用Zookeeper的监听机制接收该节点的变更。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的<b> controller epoch</b>。其他 broker 在知道当前 controller epoch 后,如果<b>收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防止“脑裂”</b>
<b>Leader分区的选举也是基于 zk Watch机制(监听leader分区节点)</b>
当控制器发现一个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要一个新 Leader(这些分区的首领刚好是在这个 broker 上)<br><ul><li>控制器需要知道哪个broker宕机了?</li><li>控制器需要知道宕机的broker上负责的时候哪些分区的Leader副本分区?</li></ul>
<b>结论</b>
<ul><li>1、Kafka使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器 </li><li>2、控制器负责在节点加入或离开集群时进行分区Leader选举</li><li>3、控制器使用epoch来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器</li></ul>
可靠性保证(副本机制)<br>
副本的分配
当某个topic的 <font color="#a23c73"><b>--replication-factor</b></font> 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡
副本分配的三个目标
<ul><li>均衡地将副本分散于各个broker上 </li><li>对于某个broker上分配的分区,它的其他副本在其他broker上 </li><li>如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker</li></ul>
<b>失效副本</b>
当ISR中的一个Follower副本滞后Leader副本的时间超过参数 `<font color="#a23c73"><b>replica.lag.time.max.ms【默认 10000】</b></font>` 指定的值 时即判定为副本失效,需要将此Follower副本剔出除ISR。
<b>副本复制</b><br>
日志复制算法(log replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,而<font color="#e74f4c">当前Leader出现故障,新选出的Leader也必须具有该消息</font>。在出现故障时,Kafka会从挂掉 Leader的ISR里面选择一个Follower作为这个分区新的Leader
<b>ACKS=ALL</b>,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交
<b><font color="#314aa4">什么情况下会导致一个副本与 leader 失去同步</font></b>
<b>慢副本(Slow replica)</b>:follower replica 在一段时间内一直无法赶上 leader 的写进度。造成这种情况的最常见原因之一是 <font color="#e74f4c">follower replica 上的 I/O瓶颈</font>,导致它持久化日志的时间比它从 leader 消费消息的时间要长;
<b>卡住副本(Stuck replica)</b>:follower replica 在很长一段时间内停止从 leader 获取消息。 这可能是以为 <font color="#e74f4c">GC 停顿,或者副本出现故障;</font>
<b>刚启动副本(Bootstrapping replica)</b>:当用户给某个主题增加副本因子时,新的 follower replicas 是不同步的,直到它跟上 leader 的日志。
一致性保证
概念
<b>水位标记 (HW)</b>
水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)
<b>副本角色</b>
Kafka分区使用多个副本(replica)提供高可用
<b>LEO & HW</b>
每个分区副本对象都有两个重要的属性:LEO和HW<br><ul><li><b>- LEO:</b>即日志末端位移(log end offset),记录了<font color="#e74f4c">该副本日志中下一条消息的位移值</font>。如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有区别的。 </li><li><b>- HW</b>:即上面提到的水位值。<font color="#e74f4c">对于同一个副本对象而言,其HW值不会大于LEO值</font>。小于等于 HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同。</li></ul>
<b>Follower副本何时更新LEO ? </b>
Follower副本不停地向Leader副本所在的broker发送<b>FETCH请求</b>,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,<font color="#e74f4c">Kafka有两套Follower副本 LEO</font>:<br><ul><li>Follower副本所在Broker的副本管理机中</li><li>Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的 follower副本的LEO</li></ul>
<b>Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW</b>
Follower副本的本地LEO更新
Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当 Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据, 从而自动更新LEO值。
Leader端Follower的LEO更新
Leader端的Follower的LEO更新发生在Leader在处理Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO
<b>Follower副本何时更新HW ?</b>
Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。 比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值(<font color="#e74f4c">Follower HW值不会大于Leader的HW值</font>)
<b>Leader副本何时更新LEO ?</b>
和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值
<b>Leader副本何时更新HW值 ?</b>
<font color="#e74f4c">Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性</font>
Leader会尝试去更新分区HW的四种情况
Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需 要更新
Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值
<b>结论: </b><br>
当Kafka broker都正常工作时,分区HW值的更新时机有两个: <br><ul><li><b>Leader处理PRODUCE请求时</b></li><li><b>Leader处理FETCH请求时。</b></li></ul>
Leader如何更新自己的HW值?
Leader broker上保存了一套Follower副本的LEO以及自己的LEO。 当尝试确定分区HW时,它会选出所有<b>满足条件的副本</b>,比较它们的LEO(包括Leader的LEO),并<font color="#e74f4c">选择最小的LEO值作为HW值</font>
<b>需要满足的条件</b>
1、处于ISR中 <br>2、副本LEO落后于Leader LEO的时长不大于 <font color="#a23c73"><b>replica.lag.time.max.ms</b></font> 参数值(默认是10s)【使得OSR可以晋升到ISR】
如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“<b>立刻进入ISR”的资格</b>,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。 因为分区HW定义就是ISR中所有副本LEO的最小值
<b>HW和LEO正常更新案例</b>
<b>HW和LEO异常案例</b>
Kafka使用HW值来决定副本备份的进度,而<font color="#e74f4c"><b>HW值的更新通常需要额外一轮FETCH RPC才能完成。</b></font> 但这种设计是有问题的,可能引起的问题包括:<br><ul><li>备份数据丢失</li><li>备份数据不一致</li></ul>
数据丢失
Leader和Follower数据离散
<b>Leader Epoch使用</b>
Kafka解决方案:造成上述两个问题的根本原因在于:<br><ul><li>HW值被用于衡量副本备份的成功与否。 </li><li>在出现失败重启时作为日志截断的依据</li></ul>
HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的 任何崩溃都可能导致HW值的过期。<br>Kafka从0.11引入了 leader epoch 来取代HW值。Leader端使用<b>内存</b>保存Leader的epoch信息
所谓Leader epoch实际上是一对值:<br><ul><li>1、epoch表示Leader的版本号,从0开始,Leader变更过1次,epoch+1 </li><li>2.、offset对应于该epoch版本的Leader写入第一条消息的offset。因此假设有两对值 <0, 0> <1, 120>,则表示第一个Leader从位移0开始写入消息;共写了120条[0, 119];而第二个Leader版本号是1, 从位移120处开始写入消息。</li></ul>
1、 Leader broker中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。<br>2、当Leader写Log时它会尝试更新整个缓存:如果这个Leader首次写消息,则会在缓存中增加一 个条目;否则就不做更新。 <br>3、 每次副本变为Leader时会查询这部分缓存,获取出对应Leader版本的位移,则不会发生数据不一致和丢失的情况
<b>规避数据丢失</b>
<b>规避数据不一致</b>
消息重复消费
<b>生产者阶段</b>
生产者消息重复场景
<b>生产者发送的消息没有收到正确的broke响应,导致生产者重试</b>。 生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个<b>可恢复的Exception重试消息</b>导致消息重复
重试过程
<ul><li>new KafkaProducer()后创建一个<b>后台线程KafkaThread</b>扫描<b>RecordAccumulator</b>中是否有消息;</li><li>调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;</li><li>后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;</li><li>如果发送成功,那么返回成功;</li><li>如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;</li></ul>
可恢复异常说明
异常是`<b>RetriableException</b>`类型或者`<b>TransactionManager</b>`允许重试
重复解决方案
<b>启动kafka的幂等性</b>
要启动kafka的幂等性,设置: `<font color="#a23c73"><b>enable.idempotence=true</b></font>` ,以及 `<font color="#a23c73"><b>ack=all`</b></font> 以及 <font color="#a23c73"><b>`retries > 1</b></font>` 。
<b>ack=0,不重试</b>
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集
<b>生产者-Broker阶段</b>
消息丢失场景
解决
<b>消费者阶段</b>
重复场景
数据消费完没有及时提交offset到broker
解决方案
<ul><li><b>取消自动提交 </b>:每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。</li><li><b>做幂等:</b> 一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset 或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下 游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新</li></ul>
_consume_offset
<b>Zookeeper不适合大批量的频繁写入操作。<br>Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题<br></b>
默认情况下<b>__consumer_offsets有50个分区</b>
延迟队列
两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而 leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给 follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数 <b>fetch.min.bytes</b>配置,默认值为1)的消息,那么就会创建一个<b>延时拉取操作(DelayedFetch)</b>以等待拉取到足够数量的消息
重试队列
kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能
<b>实现</b>
创建新的kafka主题作为重试队列: <br>1、创建一个topic作为重试topic,用于接收等待重试的消息 <br>2、普通topic消费者设置待重试消息的下一个重试topic<br>3、从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序 <br>4、定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic <br>5、同一个消息重试次数过多则不再重试
具体代码实现查看 md文档
<b><font color="#314aa4">Kafka速度快的原因 ?</font></b><br>
partition`<b>顺序读写</b>`,充分利用磁盘特性,这是基础;
Producer生产的数据持久化到broker,采用`<b>mmap文件映射</b>`,实现顺序的快速写入;
Customer从broker读取数据,采用`<b>sendfile</b>`,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer进行网络发送。
读写数据的批量batch处理以及压缩传输
Kafka集群
集群架构
线上环境
线上环境规划
JVM参数调配
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题<br>修改<font color="#a23c73"><b>bin/kafka-start-server.sh</b></font>中的jvm设置,假设机器是32G内存,可以如下设置
<font color="#a23c73"><b>export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"</b></font>
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的<b>page cache</b>,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G
<b>分区数越多吞吐量越高吗</b>
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量
从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了(吞吐量的数值和走势还会和<b>磁盘、文件系统、 I/O调度策略</b>等因素相关)<br>
0 条评论
下一页
为你推荐
查看更多