Kafka
2023-06-13 20:39:07 1 举报
AI智能生成
登录查看完整内容
Kafka
作者其他创作
大纲/内容
是一个font color=\"#e74f4c\
是什么?
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition(分区)内的消息顺序传输。
同时支持离线数据处理(持久化)和实时数据处理(消费)。
支持在线水平扩展
Kafka主要设计目标
单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能
高吞吐量
单节点支持上千个客户端,并保证零停机和零数据丢失。
高性能
将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。
零拷贝
顺序读写
Page Cache
持久化数据存储
无需停机即可扩展机器
分布式系统,易于向外扩展
Kafka是分布式,分区,复制和容错的
可靠性
消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡(再平衡)
客户端状态维护
支持online【实时数据消费】和 offline 【离线数据持久化处理】场景
支持多种客户端语言,Kafka支持Java、.NET、PHP、Python等多种语言
优势
一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放 给各种Consumer
日志收集
解耦生产者和消费者、缓存消息等
消息系统
Kafka经常被用来记录Web用户或者App用户的各种活动,如浏览网页、搜索、点击 等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后消费者通过订阅这些Topic来做实时的监控分析,亦可保存到数据库;
用户活动跟踪
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的 集中反馈,比如报警和报告
运营指标(监控数据)
Spark Streaming和Storm。
流式处理
应用场景
介绍
为了提高效率,消息被分批写入Kafka。批次就是一组消息
消息和批次
xml
简单,可视性好
json
Avro提供了一种紧凑的序列化格式,模式和消息体分开。当模式发生变化时,不需要重新生成代码,它还支持强类型和模式进化,其版本既向前兼容,也向后兼容
性能高
Avro(kafka使用)
模式(数据格式)
Kafka的消息通过主题进行分类。主题可比是数据库的表或者文件系统里的文件夹。主题可以被分为若干分区,一个主题通过分区分布于Kafka集群中,提供了横向扩展的能力。
主题和分区
直接指定消息的分区
根据消息的key散列取模得出分区
轮询指定分区
生产者在默认情况下把消息均衡地分布到主题的所有分区上
消费者通过偏移量来区分已经读过的消息,从而消费消息。 消费者是消费组的一部分。消费组保证每个分区只能被一个消费者使用,避免重复消费
生产者、消费者
一个独立的Kafka服务器称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘上的消息。单个broker可以轻松处理数千个分区以及每秒百万级的消息量
Broker 和 集群
基本架构
该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到 当前用于追加数据的 segment 文件中
轮询(默认)
生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上
自定义分区器,根据不同的业务规则将消息映射到分区。
Topic分区如何负载均衡
Producer
消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。 消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper(低版本) 或Kafka(高版本) 上,如果消费者关闭或重启,它的读取状态不会丢失。消费者是消费组的一部分。群组保证每个分区只能被一个消费者使用。 如果一个消费者失效,消费组里的其他消费者可以接管失效消费者的工作,再平衡,分区重新分配。
Consumer
broker 是集群的组成部分。每个集群都有一个broker 同时充当了集群控制器的角色(自动从集群 的活跃成员中选举出来)。 控制器负责管理工作,包括将分区分配给broker 和监控broker。 在集群中,一个分区从属于一个broker,该broker 被称为分区的首领
Broker
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,物理上不同Topic的消息分开存储
主题就好比数据库的表,尤其是分库分表之后的逻辑表
Topic
主题可以被分为若干个分区,一个分区就是一个提交日志
消息以追加的方式写入分区,然后以先入先出的顺序读取
无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序
Kafka 通过分区来实现数据冗余和伸缩性。
需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Partition
每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本
Master(首领副本)
首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领
Follower(跟随者副本)
Replica
生产者Offset
消费者Offset
Offset
分区中的所有副本统称为AR(Assigned Repllicas),AR = ISR + OSR
AR(Assigned Replicas)
与leader副本保持一定程度同步的副本(包括Leader)
ISR集合 是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息 进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度” 是指可以忍受的滞后范围,这个范围可以通过参数进行配置。
ISR(In-Sync Replicas)
与leader副本同步滞后过多的副本(不包括leader)副本
OSR(Out-Sync Replicas)
HW是High Watermak的缩写, 俗称高水位,它表示了一个特定消息的偏移量(offset),消费时只能拉取到这个offset之前的消息
HW(消费者读指针limit)
Log End Offset,它表示了当前日志文件中下一条待写入消息的offset
LEO(生产者写指针)
副本
核心概念
概念与基础架构
生产者主要的对象有: `KafkaProducer` , `ProducerRecord` 。 其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息
其他参数可以在 org.apache.kafka.clients.producer.ProducerConfig
消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认
消息发送配置
生产者:消息的发送
其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到
消息消费配置
消费者:消息的消费
依赖
application.yml 配置
配置类
生产者
消费者
SpringBoot Kafka
Kafka开发使用
数据生产流程
必要参数配置
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消 息从而形成拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
1. 实现ProducerInterceptor接口
2. 在KafkaProducer的设置中设置自定义的拦截器
自定义拦截器
拦截器
Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组
序列化器
分区计算
1. 首先开发Partitioner接口的实现类
2. 在KafkaProducer中进行设置:configs.put(\"partitioner.class\
自定义分区器
分区器
消息发送
原理剖析
参数配置补充
消费组的主要作用就是保证消费不会被重复消费(一个分区只能分配给消费者组中的一个消费者)
消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的 主题中。消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。 推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发
必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是不要让消费者的数量超过主题分区的数量
除了通过增加消费者来横向扩展单个应用的消费能力之外,经常出现多个应用程序从同一个主题消费的情况。 此时,每个应用都可以获取到所有的消息。只要保证每个应用都有自己的消费组,就可以让它们获取到主题所有的消息。横向扩展消费者和消费组不会对性能造成负面影响
向消费组添加消费者是横向扩展消费能力的主要方式
消费者组、消费者
目的是为了检测消费者或者Broker是否宕机,从而触发再平衡
参数
心跳机制
概念
consumer 采用 pull 模式从 broker 中读取数据,采用 pull 模式,consumer 可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条 消费),还可以选择不同的提交方式从而实现不同的传输语义
订阅
Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 提交位移(Committing Offsets)Consumer 需要为分配给它的每个分区提交各自的位移数据位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets位移提交分为自动提交和手动提交手动位移提交分为同步提交和异步提交
enable.auto.commit=true
开启自动提交
auto.commit.interval.ms ,默认 5s
配置自动提交间隔:Consumer端
自动提交
会提交 KafkaConsumer#poll() 返回的最新 offset(等待直到 offset 被成功提交才返回)
KafkaConsumer#commitSync()
同步提交
KafkaConsumer#commitAsync()
异步提交出现问题不会重试
处理方式
异步提交
手动提交
位移提交
Kafka提供了消费者API,让消费者可以管理自己的位移
消费者位移管理
消费者组内成员发生变更:这个变更包括了增加和减少消费者,比如消费者宕机退出消费组
主题的分区数发生变更: kafka目前只支持增加分区,当增加的时候就会触发重平衡
订阅的主题发生变化:当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
触发的条件
因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS 影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有 可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。
为什么说重平衡为人诟病呢?
在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在kafka消费 者场景中,`session.timout.ms`参数就是规定这个超时时间是多少。`heartbeat.interval.ms`,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。max.poll.interval.ms`,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 `max.poll.interval.ms` 这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些
session.timout.ms
控制心跳的超时时间
heartbeat.interval.ms
控制心跳的发送频率
max.poll.interval.ms
控制poll的间隔
session.timout.ms:设置为6s heartbeat.interval.ms:设置2s max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟
合理的配置
尽可能的避免Rebalance
Rebalance
反序列化
消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。 处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理
消费端定义消息拦截器,需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口
一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定 clientID,还可以获取KafkaConsumer生成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突。ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。ConsumerInterceptor回调发生在 `org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)`方法同一个线程
消费者配置:
消费者参数补充
消息接收
1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程2. group.id是一个字符串,唯一标识一个消费组3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。
consumer group是kafka提供的可扩展且具有容错性的消费者机制
消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息(消费者组)。每个消费组保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入 checkpoint机制定期持久化。
消费者位移
Kafka默认定期自动提交位移( enable.auto.commit = true ),也手动提交位移。另外kafka会定期把group消费情况保存起来,做成一个 offset map
自动 VS 手动
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter \"kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter\" -- consumer.config /opt/kafka_2.12-1.0.2/config/consumer.properties --frombeginning | head
上图标出来的,表示消费组为 test-consumer-group ,消费的主题为 __consumer_offsets , 消费的分区是4,偏移量为5。__consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的
位移是提交到Kafka中的 __consumer_offsets 主题。 __consumer_offsets中的消息保存了每个消费组某一时刻提交的offset信息
位移管理(offset management)
再均衡(Rebalance)本质上是一种协议,规定了一个消费组中所有消费者如何达成一致来分配订阅主题的每个分区
RangeAssignor
RoundRobinAssignor
StickyAssignor
组内分配分区策略
Group Coordinator(消费组协调器)
谁来执行再均衡和消费组管理 ?
groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认是50个分区
1、确定消费组位移信息写入 `__consumers_offsets` 的哪个分区。具体计算公式 : __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
2、该分区leader所在的broker就是组协调器
如何确定coordinator ?
它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组, 隔离无效偏移量提交的 ,每次 Rebalance后,generation号会+1
Rebalance Generation
consumer需要定期给组协调器发送心跳来表明自己还活着
Heartbeat请求
主动告诉组协调器我要离开消费组
LeaveGroup请求
消费组Leader把分配方案告诉组内所有成员
SyncGroup请求
成员请求加入组
joinGroup请求
显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
DescribeGroup请求
协议(protocol)
通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“REBALANCE_IN_PROGRESS”,告诉 其他消费者:重新分配分区
消费者如何向消费组协调器证明自己还活着?(Liveness)
再均衡分为2步:Join和Sync
① join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader
② Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发 SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup 的response中发给各个消费者
在协调器收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。然后是分发分配方案的过程,即SyncGroup请求
再均衡过程
Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_IDEmpty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求PreparingRebalance:组准备开启新的rebalance,等待成员加入AwaitingSync:正在等待leader consumer将分配方案传给各个成员 Stable:再均衡完成,可以开始消费
消费组状态机
再均衡 Rebalanace
消费者组
kafka-topics.sh脚本
创建主题
查看主图
修改主题
删除是给主题加上标记,过一段时间后在删除
删除主题
主题的管理
通过命令行工具操作,主题的分区只能增加,不能减少
增加分区
副本因子 = (主分区 + 副本分区)
均衡地将副本分散于各个broker上对于某个broker上分配的分区,它的其他副本在其他broker上如果所有的broker都有机架(机房)信息,尽量将分区的各个副本分配到不同机架(机房)上的broker
副本分配的三个目标
1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。2. 其余副本通过增加偏移进行分配。
实际上也不全是按照上面的规则,如下图的p5分区,偏移一个broker位在继续进行增量分配,这样做的目的是尽可能将分区分配的位置打散。随着分区数量的增加偏移的位置会越来越多。如果15个分区会偏移2个broker位置。这是个基于 broker数量的基础上。
在不考虑机架信息的情况下
首先为每个机架创建一个broker列表
如: 三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)
通过简单的轮询将分区分配给不同机架上的broker
考虑到机架信息
分区副本的分配
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看 的功能集成到系统中
KafkaAdminClient应用
早期由zookeeper管理消费组的偏移量
__consumer_offsets主题中保存各个消费组的偏移量
通过原生 kafka 提供的工具脚本进行查询(bin/kafka-consumer-groups.sh)
偏移量管理
主题
如果想扩展Topic的读写能力,就得多创建几个分区
font color=\"#e74f4c\
--replication-factor 3 1leader+2follower
节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的心跳机制) 对于Follower副本分区,它复制在Leader分区上的写入,并且不要延迟太多
同步节点定义
Kafka提供的保证是,只要有至少一个同步副本处于活动状态,提交的消息就不会丢失
当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader里pull数据
少部分副本宕机
当全部副本宕机了有两种恢复方式 1、等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性) 2、选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的 数据,因此造成数据丢失)
全部副本宕机
宕机如何恢复 ?
副本机制
只有跟Leader保持同步的Follower才应该被选作新的Leader。 Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者
如何选举?
少数服从多数是一种比较常见的一致性算发和Leader选举法。 它的含义是只有超过半数的副本同步了,系统才会认为数据已同步; 选择Leader时也是从超过半数的同步的副本中选择。 这种算法需要较高的冗余度,跟Kafka比起来,浪费资源。 譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。 而kafka的ISR集合方法,分别只需要两个和三个副本
为什么不用少数服从多数的方法(过半机制)?
如果所有的ISR副本都失败了怎么办?
Leader选举
分区重新分配
自动再均衡
分区
在分区日志文件中,你会发现很多类型的文件,比 如: .index、.timestamp、.log、.snapshot等。其中,文件名一致的文件集合就称为 LogSement。
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。 每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件
分区日志文件中包含很多的 LogSegment Kafka 日志追加是顺序写入的LogSegment 可以减小日志文件的大小进行日志删除的时候和数据查找的时候可以快速定位。ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限
日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log三种类型
类别
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志 文件都由该作为文件名命名规(00000000000000000000.index、 00000000000000000000.timestamp、00000000000000000000.log
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121(偏移量从 0 开始)
每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中第一条消息的 offset
日志与索引文件
配置项默认值说明
大小
时间
1024 * 1024 * 1024=1073741824 在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。 相对偏移量和物理地址
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
为什么是 Integer.MAX_VALUE ?
切分文件
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值。当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。 这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性
索引文件切分过程
LogSegment
日志存储概述
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里只使用了20位,应付生产是足够的。一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G后,会进行log rolling形成一个新的组合来记录消息,这个是通过broker端 `log.segment.bytes =1073741824`指定的。index和timeindex在刚使用时会分配10M的大小,当进行 log rolling 后,它会修剪为实际的大小。
如果想查看这些文件,可以使用kafka提供的shell来完成,几个关键信息如下
(1)offset是逐渐增加的整数,每个offset对应一个消息的偏移量。 (2)position:消息批字节数,用于计算物理地址。 (3)CreateTime:时间戳。 (4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。 (5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1- GZIP、2-snappy、3-lz4。 (6)crc:对所有字段进行校验后的crc值。
查看存储文件
文件
offset 与 position 没有直接关系,因为会删除数据和清理日志
Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
如何查看偏移量为23的消息 ?
偏移量
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前八个字节表示时间戳,后四个字节表示偏移量
timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的 写入是各自追加。
1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳largestTimeStamp逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。
查找时间戳为 1557554753430 开始的消息?
时间戳
索引文件
Kafka 提供两种日志清理策略: 日志删除(delete):按照一定的删除策略,将不满足条件的数据进行数据删除 日志压缩(compact):针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。
Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值: delete ,还可以选择 compact主题级别的配置项是 cleanup.policy
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优先级最高
1、从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。2、这些日志分段所有文件添加 上 .delete 后缀。3、交由一个以 \"delete-file\" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置
删除过程
基于时间
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 `log.retention.bytes` ,单个日志分段的大小由 `log.segment.bytes` 进行设定
1、计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。2、从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。3、执行删除。
基于日志大小
根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段
基于偏移量
日志删除
日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的保留对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除
主题的 cleanup.policy = compact。
1、记录每个key的hash值最后一次出现的偏移量2、第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日志(保留最新的数据)。
Kafka的后台线程会定时将Topic遍历两次
压缩是在Kafka后台通过定时重新打开Segment来完成的
任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使用Topic的`min.compaction.lag.ms`属性来保证消息在被压缩之前必须经过的最短时间。 也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的 `max.compaction.lag.ms`属性来保证从收到消息到消息符合压缩条件之间的最大延时。
消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
消息的偏移量永远不会改变,它是日志中位置的永久标识符
从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使用者在比Topic的 log.cleaner.delete.retention.ms 短的时间内到达日志的头部,则会看到已删除记录的所有delete标记。保留时间默认是24小时。
日志压缩可以确保
`log.cleanup.policy` 设置为 compact ,Broker的配置,影响集群中所有的Topic。
`log.cleaner.min.compaction.lag.ms` ,用于防止对更新超过最小消息进行压缩,如果没有设置,除最后一个Segment之外,所有Segment都有资格进行压缩
`log.cleaner.max.compaction.lag.ms` ,用于防止低生产速率的日志在无限制的时间内不压缩。
默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志清理器
Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就是可以接受的
日志压缩策略
清理
日志存储
传统IO
DirectByteBuf 直接内存
进一步优化(底层采用了 linux 2.1 后提供的 `sendFile` 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
进一步优化(linux 2.4)
NIO优化
更少的用户态与内核态的切换不利用 cpu 计算,减少 cpu 缓存伪共享零拷贝适合小文件传输
零拷贝优点
磁盘数据通过DMA(Direct Memory Access,直接存储器访问) 拷贝到内核态 Buffer,直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。除了减少数据拷贝外,整个读文件 ==> 网络发送由一个 `sendfile` 调用完成,整个过程只有两次上下 文切换,因此大大提高了性能
Java NIO对sendfile的支持就是 FileChannel.transferTo()/transferFrom()
网络数据持久化到磁盘 (Producer 到 Broker)
把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是 `sendfile`。消费者从broker读取数据,就是由此实现
具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝。
磁盘文件通过网络发送(Broker 到 Consumer)
kafka的两个过程
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。 具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用mmap内存文件映射
消息先被写入页缓存,由操作系统负责刷盘任务。
Kafka提供了一个参数 `producer.type` 来控制是不是主动flush; 如果Kafka写入到mmap之后就立即`flush`然后再返回Producer叫同步(`sync`); 写入mmap之后立即返回Producer不调用flush叫异步(`async`)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的 I/O提升,省去了用户空间到内核空间复制的开销。mmap也有一个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘
Java NIO,提供了一个MappedByteBuffer 类可以用来实现内存映射。 MappedByteBuffer只能通过调用FileChannel.map()取得,再没有其他方式。`FileChannel.map()`是抽象方法,具体实现是在 `FileChannelImpl.map()`可自行查看JDK源码,其 map0()方法就是调用了Linux内核的mmap的API。
使用 MappedByteBuffer类要注意的是 mmap的文件映射,在Full gc时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner方法
Java NIO对文件映射的支持
1、 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中) 则直接返回数据,从而避免了对物理磁盘的 I/O 操作; 2、如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。
一个进程读取磁盘上的文件
1、操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。 2、被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性
一个进程将数据写入磁盘
对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难被禁止。 当使用页缓存的时候,即使Kafka服务重启, 页缓存还是会保持有效,然而`进程内的缓存却需要重建`。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一
页缓存
Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储介质,也能承载非常大的吞吐量
操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。
顺序写入
磁盘存储
Kafka速度快的原因
Linux内核提供、实现零拷贝的API;
sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
RocketMQ 在消费消息时,使用了mmap。kafka 使用了 sendFile。
mmap和sendfile
partition`顺序读写`,充分利用磁盘特性,这是基础;
Producer生产的数据持久化到broker,采用`mmap文件映射`,实现顺序的快速写入;
Customer从broker读取数据,采用`sendfile`,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer进行网络发送。
读写数据的批量batch处理以及压缩传输
Kafka速度快的原因 ?
物理存储
事务
控制器就是一个broker。控制器除了一般broker的功能,还负责Leader分区的选举。
集群里第一个启动的broker在Zookeeper中创建临时节点 /controller。 其他broker在该控制器节点创建Zookeeper watch对象,使用Zookeeper的监听机制接收该节点的变更。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防止“脑裂”
借助zk Watch机制(类似于分布式锁)
当控制器发现一个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要一个新 Leader(这些分区的首领刚好是在这个 broker 上)控制器需要知道哪个broker宕机了?控制器需要知道宕机的broker上负责的时候哪些分区的Leader副本分区?
Leader分区的选举也是基于 zk Watch机制(监听leader分区节点)
实现方式
1、Kafka使用 Zookeeper 的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器 2、控制器负责在节点加入或离开集群时进行分区Leader选举3、控制器使用epoch来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器
结论
控制器
当某个topic的 --replication-factor 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡
均衡地将副本分散于各个broker上 对于某个broker上分配的分区,它的其他副本在其他broker上 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker
副本的分配
当ISR中的一个Follower副本滞后Leader副本的时间超过参数 `replica.lag.time.max.ms【默认 10000】` 指定的值 时即判定为副本失效,需要将此Follower副本剔出除ISR。
失效副本
日志复制算法(log replication algorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,而当前Leader出现故障,新选出的Leader也必须具有该消息。在出现故障时,Kafka会从挂掉 Leader的ISR里面选择一个Follower作为这个分区新的Leader
ACKS=ALL,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交
慢副本(Slow replica):follower replica 在一段时间内一直无法赶上 leader 的写进度。造成这种情况的最常见原因之一是 follower replica 上的 I/O瓶颈,导致它持久化日志的时间比它从 leader 消费消息的时间要长;
卡住副本(Stuck replica):follower replica 在很长一段时间内停止从 leader 获取消息。 这可能是以为 GC 停顿,或者副本出现故障;
刚启动副本(Bootstrapping replica):当用户给某个主题增加副本因子时,新的 follower replicas 是不同步的,直到它跟上 leader 的日志。
什么情况下会导致一个副本与 leader 失去同步
副本复制
可靠性保证(副本机制)
水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(high watermark)
水位标记 (HW)
Kafka分区使用多个副本(replica)提供高可用
副本角色
每个分区副本对象都有两个重要的属性:LEO和HW- LEO:即日志末端位移(log end offset),记录了font color=\"#e74f4c\
LEO & HW
Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,Kafka有两套Follower副本 LEO:Follower副本所在Broker的副本管理机中Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的 follower副本的LEO
Follower副本的LEO值就是日志的LEO值,每当新写入一条消息,LEO值就会被更新。当 Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据, 从而自动更新LEO值。
Follower副本的本地LEO更新
Leader端的Follower的LEO更新发生在Leader在处理Follower FETCH请求时。一旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO
Leader端Follower的LEO更新
Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW
Follower副本何时更新LEO ?
Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。 比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值(Follower HW值不会大于Leader的HW值)
Follower副本何时更新HW ?
和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值
Leader副本何时更新LEO ?
Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性
Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需 要更新
Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值
当Kafka broker都正常工作时,分区HW值的更新时机有两个: Leader处理PRODUCE请求时Leader处理FETCH请求时。
结论:
Leader会尝试去更新分区HW的四种情况
Leader broker上保存了一套Follower副本的LEO以及自己的LEO。 当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(包括Leader的LEO),并选择最小的LEO值作为HW值
1、处于ISR中 2、副本LEO落后于Leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)【使得OSR可以晋升到ISR】
如果Kafka只判断第一个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。 因为分区HW定义就是ISR中所有副本LEO的最小值
需要满足的条件
Leader如何更新自己的HW值?
Leader副本何时更新HW值 ?
HW和LEO正常更新案例
Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成。 但这种设计是有问题的,可能引起的问题包括:备份数据丢失备份数据不一致
数据丢失
Leader和Follower数据离散
HW和LEO异常案例
Kafka解决方案:造成上述两个问题的根本原因在于:HW值被用于衡量副本备份的成功与否。 在出现失败重启时作为日志截断的依据
HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的 任何崩溃都可能导致HW值的过期。Kafka从0.11引入了 leader epoch 来取代HW值。Leader端使用内存保存Leader的epoch信息
1、 Leader broker中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。2、当Leader写Log时它会尝试更新整个缓存:如果这个Leader首次写消息,则会在缓存中增加一 个条目;否则就不做更新。 3、 每次副本变为Leader时会查询这部分缓存,获取出对应Leader版本的位移,则不会发生数据不一致和丢失的情况
规避数据丢失
规避数据不一致
Leader Epoch使用
一致性保证
生产者发送的消息没有收到正确的broke响应,导致生产者重试。 生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复
new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;如果发送成功,那么返回成功;如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
重试过程
异常是`RetriableException`类型或者`TransactionManager`允许重试
可恢复异常说明
生产者消息重复场景
要启动kafka的幂等性,设置: `enable.idempotence=true` ,以及 `ack=all` 以及 `retries > 1` 。
启动kafka的幂等性
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集
ack=0,不重试
重复解决方案
生产者阶段
消息丢失场景
解决
生产者-Broker阶段
数据消费完没有及时提交offset到broker
重复场景
取消自动提交 :每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。做幂等: 一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset 或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下 游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新
解决方案
消费者阶段
消息重复消费
Zookeeper不适合大批量的频繁写入操作。Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题
默认情况下__consumer_offsets有50个分区
_consume_offset
稳定性
两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而 leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给 follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数 fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息
延迟队列
kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能
创建新的kafka主题作为重试队列: 1、创建一个topic作为重试topic,用于接收等待重试的消息 2、普通topic消费者设置待重试消息的下一个重试topic3、从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序 4、定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic 5、同一个消息重试次数过多则不再重试
具体代码实现查看 md文档
实现
重试队列
Customer从broker读取数据,采用`sendfile`,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer进行网络发送。
高级特性
集群架构
Kafka集群
线上环境规划
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可以如下设置
export KAFKA_HEAP_OPTS=\"-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M\"
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G
JVM参数调配
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量
从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了(吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关)
分区数越多吞吐量越高吗
线上环境
Kafka
0 条评论
回复 删除
下一页