Kafka知识体系
2024-06-07 20:15:02 0 举报
AI智能生成
Kafka知识体系
作者其他创作
大纲/内容
超链接
http://matt33.com/2018/11/04/kafka-transaction
Kafka 事务性最开始的出发点是为了在 Kafka Streams 中实现 Exactly-Once 语义的数据处理,这个问题提出之后,在真正的方案讨论阶段,社区又挖掘了更多的应用场景,也为了尽可能覆盖更多的应用场景,在真正的实现中,在很多地方做了相应的 tradeoffs
txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义
消费者可以消费到 HW (High Watermark) 位置
read_uncommitted(默认)
消费者忽略事务未提交的消息,即只能消费到 LSO (LastStableOffset) 的位置
read_committed
事务隔离级别(isolation.level)
1. 在隔离级别为 read_uncommited 的情况下,消息堆积量为 HW - ConsumerOffset
2. 在隔离级别为 read_commited 的情况下,消息堆积量为 LSO - ConsumerOffset
该值会影响消息堆积量 Kafka Lag的计算,消息堆积可以通过自带工具查看 kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
LSO存在一个问题,即当前若有一个 long transaction,比如其 first offset 是 1000,另外有几个已经完成的小事务操作,比如:txn1(offset:1100~1200)、txn2(offset:1400~1500),假设此时的 LSO 是 1000,也就是说这个 long transaction 还没有完成,那么已经完成的 txn1、txn2 也会对 consumer 不可见(假设都是 commit 操作),此时受 long transaction 的影响可能会导致数据有延迟。在实际的生产场景中,尽量避免 long transaction 这种操作
LastStableOffset
Broker 会追踪每个 Partition 涉及到的 abort transactions,Partition 的每个 log segment 都会有一个单独后缀为 .txnindex 的文件(append-only file)来存储 abort transaction 信息,因为 abort transaction 并不是很多,所以这个开销是可以可以接受的,之所以要持久化到磁盘,主要是为了故障后快速恢复,要不然 Broker 需要把这个 Partition 的所有数据都读一遍,才能知道哪些事务是 abort 的,这样的话开销太大(如果这个 Partition 没有事务操作,就不会生成这个文件)
有了这个设计,Consumer 在拉取数据时,Broker 会把这批数据涉及到的所有 abort transaction 信息都返回给 Consumer,Server 端会根据拉取的 offset 范围与 abort transaction 的 offset 做对比,返回涉及到的 abort transaction 集合
Consumer 如何过滤 abort 的事务数据
对于每个 Topic-Partition,Broker 都会在内存中维护其 PID 与 sequence number(最后成功写入的 msg 的 sequence number)的对应关系。
Broker 重启时,如果想恢复上面的状态信息,那么它读取所有的 log 文件。相比于之下,定期对这个 state 信息做 checkpoint(Snapshot),明显收益是非常大的,此时如果 Broker 重启,只需要读取最近一个 Snapshot 文件,之后的数据再从 log 文件中恢复即可
PID Snapshot 是做什么的?用来解决什么问题?
producer 端默认事务超时时间 为60s transaction.timeout.ms 。Producer 设置超时时间不能超过 Server,否则会抛出异常
对于 txn.id,我们知道 TransactionCoordinator 会缓存 txn.id 的相关信息,如果没有超时机制,这个 meta 大小是无法预估的,Server 端提供了一个 transaction.id.expiration.ms 参数来配置这个超时时间(默认是 7 天),如果超过这个时间没有任何事务相关的请求发送过来,那么 TransactionCoordinator 将会使这个 txn.id 过期
若 txn.id 长期不使用,server 端怎么处理
关于事务的部分思考
幂等性实现是事务性实现的基础,幂等性提供了单会话单 Partition Exactly-Once 语义的实现,正是因为 Idempotent Producer 不提供跨多个 Partition 和跨会话场景下的保证
只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)
幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。
在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到数据不丢,也就是 at least once,但是在一些网络异常情况下,可能会导致数据重复
幂等性用来解决什么问题
PID 用于标识每一个 producer
sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复。sequence number 将会从 0 开始自增,每个 Topic-Partition 都会有一个独立的 sequence number
PID + Sequence Number
Producer 幂等性
跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。
kafka的事务保证
TransactionCoordinator 在遇到上 long FGC 时,可能会导致 脑裂 问题,FGC 时会 stop-the-world,这时候可能会与 zk 连接超时导致临时节点消失进而触发 leader 选举,如果 __transaction_state 发生了 leader 选举,TransactionCoordinator 就会切换,如果此时旧的 TransactionCoordinator FGC 完成,在还没来得及同步到最细 meta 之前,会有一个短暂的时刻,对于一个 txn.id 而言就是这个时刻可能出现了两个 TransactionCoordinator
发生场景
通过 CoordinatorEpoch 来判断,每个 TransactionCoordinator 都有其 CoordinatorEpoch 值,这个值就是对应 __transaction_state Partition 的 Epoch 值(每当 leader 切换一次,该值就会自增 1)
解决方式(Fencing)
HDFS 选举得到NN后会在ZK创建一个持久节点,用于记录当前NN的地址信息。当NN会话状态发生正常切换时,该持久节点也会被删除。而异常状态下该节点不会被删除,故NN选举成功后可以检查是否存在该持久节点,存在则说明出现了脑裂的情况, 直接 Kill 旧的 NN 进程或者强制切换状态
与 HDFS 不同(Fencing)
TransactionCoordinator 脑裂问题(brain split)
事务
1. 数据一致性,leader同步到ISR中的副本存在一定时间差,容易造成数据不一致
2. 负载不均衡
为什么kafka不支持读写分离
疑问
Timingwheel
延迟消息
情况2:standalone consumer与consumer group冲突时:这里所说的standalone consumer指的是使用KafkaConsumer.assign()而非subscribe()的消费者。当用户系统中同时出现了standalone consumer和consumer group,并且它们的group id相同时,此时standalone consumer手动提交位移时就会立刻抛出此异常。这是因为group coordinator无法识别这个具有相同group id的consumer,从而向它返回“你不是一个合法的组成员”错误
CommitFailedException
异常
基本消息队列,无副本机制
0.7
引入副本机制,真正意义上完备的分布式高可靠消息队列
0.8
增加基础安全认证及权限功能
使用Java重写新版本consumer API
引入Kafka Connect组件用于实现高性能的数据抽取
Kafka可以自行维护Offset、消费者的Position。也可以开发者自己来维护Offset,实现相关的业务需求。
自行控制Consumer消费消息的位置
可以使用外部存储记录Offset,如数据库之类的
可以使用多线程进行消费
新的Comsumer API
0.9
引入Kafka Streams,正式成为流式处理框架
内置机架感知以便隔离副本,提高可用性
消息格式增加时间戳
引入max.poll.records参数,允许开发者控制返回消息的条数
0.10
实现幂等性producer
支持事务
重构消息格式
新的分配算法 Sticky
老版本controller在执行多步操作时,若其中一步出错,则无法回滚之前的操作
多线程同时访问controller上下文,重构后采用单线程+基于事件队列的方式
重构controller
0.11
支持精确一次性语义EOS
改进Streams Api
改进Connect的度量指标
调整了 SASL 认证模块的错误处理逻辑,原先的认证错误信息现在被清晰地记录到日志当中
更好地支持磁盘容错,更优雅地处理磁盘错误,单个 JBOD 上的磁盘错误不会导致整个集群崩溃。
0.11.0 版本中引入的幂等性生产者需要将max.in.flight.requests.per.connection 参数设置为 1,这对吞吐量造成了一定的限制。而在 1.0.0 版本里,这个参数最大可以被设置为 5,极大提升了吞吐量范围。
1.x (2017.11)
版本历史
Kafka仅提供单个分区内的消费顺序,而不会维护全局的消费顺序
若要实现全局的消费顺序就只能通过让每个consumer group下只包含一个consumer实例来间接实现
消费顺序
version
host
port
jmx_port
timestamp
指明传输协议类型、主机名及端口号,如PLAINTEXT://host1:9092
endpoints
机架信息,若设置了该信息则Kafka在分配副本时会考虑把某个分区的多个副本分配到多个机架上,以保证高可用
rack
broker与外界通信的安全协议类型
listener_security_protocol_map
/<broker.id>保存成员节点信息
/ids
/topics
/seqid
/brokers
/controller
/delete_topics
/reassign_partitions
/preferred_replica_election
/admin
/isr_change_notification
/changes
/clients
/users
/config
保存j集群的简要信息
/cluster
保存controller组件版本号,Kafka使用该版本号来隔离无效的controller请求
/controller_epoch
zookeeper路径
分区中的所有副本统称为 AR
AR(Assigned Replicas)
瞬时高峰流量可能会大于该消息数,故会导致follower被不断的踢出加入,再踢出再加入
replica.lag.max.messages 用于控制follower落后leader副本的消息数
replica.lag.time.max.ms 表示若follower副本无法在该时间内向leader请求数据,则会被踢出ISR
0.9.0版本之前
统一使用参数 replica.lag.time.max.ms,默认10s
0.9.0版本之后
如何判定ISR
1. 请求速度追不上
2. 进程卡住,如频繁GC或程序bug
3. 创建新副本
follower与leader不同步的原因
ISR(In-Sync Replicas)
与 leader 副本同步滞后过多的副本
OSR (Out-of-Sync Replicas )
该副本第一条消息的offset
logStartOffset 受日志清除策略影响,注意 LogStartOffset 不可以缩写为 LSO,因为在 Kafka 中,LSO 特指 LogStableOffset(与事务实现息息相关)
起始位移 (base offset / LogStartOffset)
表示该副本最新一条已提交消息的下一条位移,消费端只能拉取到此offset之前的消息
1. follower成为leader时
2. broker出现崩溃导致副本被踢出ISR时
3. producer向leader写入消息时
4. leader处理follower的FETCH请求时:leader处理follower的FETCH请求时,首先会从底层的log读取数据,之后再尝试更新分区的HW
leader会尝试更新HW的情况
leader broker尝试确定分区HW时,会选出所有满足条件的副本比较他们的LEO以及自己的LEO,并选择最小的LEO最为HW
leader更新HW
处于ISR中的副本
副本落后于leader LEO 的时长不大于replica.lag.time.max.ms(默认10s). 因为有可能不在ISR中的副本会追上leader的进度
满足条件的副本
follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它就会尝试更新HW。具体算法是比较当前LEO与FETCH响应中leader的HW值,取两者的最小值作为HW
follower更新HW
1. leader接收到生产者消息后将其写入到底层日志,同时更新自身LEO
2. 尝试更新leader的HW:follower尚未发起FETCH请求,故leader保存的remote LEO=0,将remote LEO与leader LEO对比取最小值即 HW = 0,与原HW相同,故不更新分区HW
3. leader 接收到follower的FETCH请求,开始读取底层log数据,并根据FETCH请求中的fetch offset=0 更新remote LEO
4. leader再次尝试更新HW,leader LEO=1,remote LEO =0,故分区HW=0,此时consumer无可消费的消息
5. leader将数据及当前HW返回给follower
6. follower接收到FETCH响应后将消息写入log日志并更新自身 LEO = 1
7. follower根据响应返回的HW与本地LEO做对比,取其中最小的值来更新HW,故HW=0。此时consumer仍无可消费的消息
8. follower发起第二轮FETCH请求,同时带上参数fetch offset=1(自身的LEO=1)发送给leader
9. leader 接收到请求后读取底层log数据,并更新自身的remote LEO=1
11. leader将数据(空数据)及当前的HW返回给follower
12. follower接收到响应后将消息写入log,因为是空数据所有无消息可写,LEO仍然为1
13. follower更新自身HW,对比本地LEO=1与fetch请求返回的HW=1,获取最小值即1,故follower 的HW=1。此时消息已经被成功拷贝到leader及follower的log中且分区HW=1,所以consumer可消费offset=0的消息
情况1:leader副本写入消息后,follower副本发送FETCH请求
第一轮FETCH请求
第二轮FETCH请求
PRODUCE请求
原理与情况1基本类似
情况2:FETCH请求保存在purgatory中时生产者发来消息
完整流程
前提是min.insync.repicas=1
3. 此时B宕机,则重启B后自动将LEO调整为之前的HW,故B会做日志截取,将offset=1的消息从log中删除,并调整LEO=1. 即 B (HW=LEO=1)
4. B重启后需要向A发起FETCH请求,但此时A宕机,B成为新的leader。而当A重启后又向B发起FETCH请求,对比自身LEO与leader的HW,发现应该取leader的HW=1,故A做日志截取,将offset=1的消息删除。此时offset=1的消息在集群所有副本中被永久删除
场景
延迟一轮FETCH请求更新HW值的设计使得follower HW值是异步延迟更新的,若这个过程中leader发生变更,那么成为新leader的follower的HW值就有可能是过期的
总结
备份数据丢失
4. 此时A重启,执行FETCH请求,发现自身LEO=2,而leader的HW=2,两者相同无需更新。这就造成leader中offset=1的消息与follower中offset=1的消息不一致问题
备份数据不一致
缺陷
使用leader epoch解决(0.11.0)
更新机制
AR 集合中最小的 logStartOffset 值
LW ( Low Watermark)
副本日志中下一条待写入消息的offset。每当leader接收到producer推送的消息,它会更新自己的LEO,通常是加1
每个副本都由LEO,不论leader与follower。leader除保存自己的LEO以外还保存这所有follower的LEO
术语及概念
AR = ISR + OSR,默认情况下,当 leader 副本发生故障时,只有 ISR 集合中的副本才有资格被选举为新的 leader
1. broker上的leader副本接收到消息后将自己的LEO值加一
2. follower副本各自发送请求给leader
3. leader分别将该消息推送给follower副本
4. follower副本接收到消息后各自将更新自己的LEO,加一
5. leader副本接收到其他follower副本的响应后,更新HW。也即代表该条消息可以被consumer消息
同步流程
副本同步
每个topic都会有对应的子目录,目录名称为 <topic>_<分区号>
以该文件的第一条消息对应的offset来命名
log.segment.bytes 指定文件大小,默认1G.日志段被填满后,Kafka会自行切分,创建出新的日志段及索引文件
日志段 .log
Kafka强制要求必须是8的整数倍,因为每个索引项占用8字节
.index 位移索引文件
Kafka强制要求必须是12的整数倍. 因为每个索引项占用12字节
.timeindex 时间戳索引文件(0.10.0引入)
分类
全部按升序排列,若个别消息的顺序错乱则不会被写入到索引项中,因为会造成乱序。特别是时间戳
每个索引文件由多条索引项(log entry)组成.Kafka会在写入消息的记录满足 log.index.interval.bytes (默认4K)后增加一个索引项
log.index.size.max.bytes 索引文件大小,默认10M.
方便通过二分查找快速定位(O(LogN))
索引文件
文件结构
默认清除7天前的日志数据(包含日志段文件及两个索引文件)
log.retention.{hours|minutes|ms} 可配置时间间隔,ms优先级最高,minutes次之,hours最后
0.10版本之前使用当前时间与文件最近修改时间做比较,而该时间会时常变动。因此0.10开始取日志段首条消息的时间做对比
1. 基于时间清除
2. 基于文件大小清除
在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms 来配置,默认值为 300000,即 5 分钟
3. 定时清除
日志清除策略
日志清除对当前日志段不生效。若日志段文件设置过大而没有日志切分,那么日志清除永远无法执行
基于时间的保留策略
基于日志大小的保留策略
基于 logStartOffset 来实现
基于日志起始偏移量的保留策略
LogSegment保留策略
对于相同key不同value的消息,仅保留最新的一条,多余的会被清除
必须设置可以,否则无法执行该操作
高可用日志化
数据库变更订阅
时间溯源
使用场景
log compaction策略
0.10.1开始同时支持两种策略
log.cleanup.policy
是否启用Cleaner
0.9.0及之前默认为false,之后默认为true
log.cleaner.enable
表示比某个时间段新的数据无需清理,默认为0
log.cleaner.min.compaction.lag.ms
相关参数
日志压缩(compaction)
日志存储
某个broker被选举出来管理集群所有分区状态并执行相应的管理操作
分区创建完成,且分区已确定副本列表,但尚未选出leader和ISR
New
分区leader被选出
Online
崩溃状态
Offline
分区不存在或已删除
NonExisten
状态
分区状态
controller创建副本时的状态,只能成为follower
在线状态,既可成为follower也可成为leader
若开启了topic删除操作,则topic下所有分区的所有副本都会被删除
DeletionStarted
副本删除失败
DeletionIneligible
副本删除成功
副本状态
维护的状态分两类
当有分区信息发生变更时,controller将变更后的信息封装进UpdateMetadataRequests请求发给集群中的broker
更新集群元数据
创建、删除topic
对topic的所有分区重新分配副本所在broker的位置,以期望实现更均匀的分配效果
分区重分配
broker崩溃或加入
在众多 leader 的转移过程中,就会产生 leader 不均衡现象,可能一小部分 broker 上有大量的 leader,影响了整个集群的性能,所以就需要把 leader 调整会最初的 broker 上,这就需要 preferred leader 选举
preferred leader副本选举
若某些topic的现有分区数不足以支撑clients的业务量,因此需要增加分区
topic分区扩展
在zk创建一个临时节点/controller,节点保存了当前controller所在的broker id。集群首次启动时,所有broker都会抢着创建该节点,但zk保证最终只会有一个broker创建成功
controller leader选举
受控关闭可最大限度的降低broker的不一致性
仅依赖RPC实现,无需zk
受控关闭
职责
0.11版本之前controller的设计是多线程的,故使用了大量同步机制
ControllerContext
数据类
ZkClient
负责向其他broker发送请求
ControllerChannelManager
ControllerBrokerRequestBatch
RequestSendThread
ZookeeperLeaderElector
基础功能类
组件
多线程共享状态
代码组织混乱
管理类请求与数据类请求未分开
controller同步些zk且是一个分区一个分区的写
controllerg给broker的请求无版本号信息
老版本设计缺陷
controller
处理请求的模式是Reactor模式
一个acceptor线程及若干个processor线程组成,num.network.threads可配置processor的线程数
1. processor线程接收acceptor线程分配的新socket连接通道,然后开始监听该通道上的数据传输
2. processor将接收到的请求放入broker启动时创建的全局唯一请求队列(queued.max.requests,默认500),一旦队列已满则clients端发送给broker的请求将会被阻塞
3. 队列中的请求由KafkaRequestHandler线程池分配具体线程来处理,线程池大小可以通过 num.io.threas 配置,默认为8个线程
4. broker还会创建与processor线程数等量的响应队列,故processor线程的另一个重要的任务是实时处理各自响应队列中的响应结果
流程
broker请求处理
Broker
consumer group 组内成员发生崩溃时,coordinator检测失败的时间,默认10s
session.timeout.ms
设置消息处理逻辑的最大时间
max.poll.interval.ms
从最早的位移开始消息,不一定时0
earliest
latest
none
无位移信息或位移越界时Kafka的应对策略
auto.offset.reset
comsumer是否自动提交位移
enable.auto.commit
指定了consumer端单次获取数据的最大字节数
fetch.max.bytes
max.poll.records
通知其他成员开启新一轮rebalance的心跳间隔时间
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROCESS异常的形式放入consumer心跳请求的reponse中,这样其他成员拿到response后才能知道它需要重新加入group
heartbeat.interval.ms
空闲socket的存活时间,默认9min,推荐设置为-1,即无需关闭空闲连接
connections.max.idle.ms
主要参数
订阅列表
基于正则表达式订阅
订阅topic
每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条信息
at most once
at least once
exactly once (EOS)
实现消息交付语义保证的基石(message delivery semantic)
broker从此变成了有状态的,增加同步成本,影响伸缩性
需要引入应答机制来确认消费成功与否
由于要保存offset,故需引入复杂的数据结构,从而造成不必要的资源浪费
实现简单,但有几方面影响
保存在服务器端
只需简单保存一个长整型数据即可
引入checkpointing机制,定期对offset进行持久化,简化应答机制实现
保存在consumer group
保存方式对比
consumer客户端需要定期向集群汇报自己消费数据的进度
consumer定期将位移信息提交到zookeeper下的固定节点中
旧版
故新版consumer不再依赖zookeeper
consumer把位移提交到Kafka的一个内部topic中(__consumer_offsets)
新版
位移提交
consumer会在Kafka集群的所有broker中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及位移提交
当consumer运行一段时间后,必须提交自己的位移信息。如果consumer崩溃或关闭,它负责的分区就会被分配给其他consumer。因此一定要在其他consumer读取这些分区前就做好位移提交工作,否则会出现消息的重复消费。
consumer向所属的coordinator发送位移信息,每个位移提交请求都会往__consumer_offsets对应分区上追加写入一条信息。
消息的key由group.id,topic及分区组成,value是位移值
若consumer为同一个group的同一个topic提交了多次位移,那么__consumer_offsets对应的分区上就会有若干条key相同但value不同的消息,默认仅会关心最新一条消息,其他旧消息会在compact过程中被清理
提交位移的机制
参数
auto.commit.enable
位移管理
offset
实现简单
速度快,无线程间交互
易于维护分区间的消息消费顺序
方便位移管理
优势
socket连接开销大
consumer数受限于topic分区数,扩展性差
broker端处理负载高
rebalance可能性增大
1. 每个线程维护一个KafkaConsumer实例
消息获取与处理解耦
可独立扩展consumer数及worker数,伸缩性好
难以维护分区内的消息顺序
处理l链路变长,导致位移管理困难
worker线程异常可能导致消费数据丢失
2. 单个KafkaConsumer实例 + 多 worker 线程
多线程消费
consumer group
standalone consumer
新版本
记录consumer的订阅信息
消费线程是新旧版本consumer在设计上的重大区别
保存consumer各个消费线程的Id
/owners
保存该group消费指定分区的位移信息
/offsets
<groupid>
/consumers
zookeeper负责管理group节点
high level consumer
1. 消息重演
2. 只想消费部分分区数据
即把数据处理与位移提交放入一个事务中处理
3. 实现精确一次处理语义
storm-kafka插件就是使用low level,它把位移保存在zookeeper中特定的位置下
用户必须自行处理位移提交
用户必须寻找分区的leader broker
用户必须自行处理leader变更
劣势
low level consumer
旧版本
新旧版本对比
用于实现高伸缩、高容错的消费机制
组内多个consumer实例可以同时读取Kafka消息,一旦由某个consumer挂了,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer来负责,从而保证整个group可以继续工作,不会丢失数据
定义
规定了一个consumer group下所有consumer如何达成一致来分配订阅topic的所有分区
新consumer加入
已有consumer主动离开
最常见的场景是:consumer消费逻辑过重,无法在指定时间内完成消息的处理,那么coordinator认为该consumer已经崩溃,从而引发rebalance
已有consumer崩溃
1. 组成员发生变更
如使用基于正则表达式的订阅
2. 组订阅topic数发生变更
如使用命令行脚本增加订阅topic的分区数
3. 组订阅topic的分区数发生变更
触发条件
range策略,新版本默认
round-robin策略
避免前两种策略无视历史分配方案的缺陷
可规避极端情况下的数据倾斜并且在两次rebalance间最大限度的维持之前的分配方案
sticky策略 (0.11版本引入)
分区分配策略(partition.assignment.strategy)
每个goup进行rebalance后,generation号都会加1,表示group进入了一个新的版本号
rebalance后,若consumer提交的是rebalance前延迟的offset信息则会被group拒绝
generation(分代概念)
consumer请求加入组
1. JoinGroup
group leader把分配方案同步更新到组内所有成员
2. SyncGroup
consumer定期向coordinator汇报心跳表明自己依然存活
consumer也是根据heartbeat请求的响应中是否包含REBALANCE_IN_PROCESS来判断当前group是否开启新一轮rebalance
3. Heartbeat
consumer主动通知coordinator该consumer即将离组
4. LeaveGroup
查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。主要供管理员使用,coordinator不使用该请求执行rebalance
5. DescribeGroup
协议
1. 计算Math.abs(groupID.hashCode) % offsets.topic.num.partitions(默认50),获得分区
2. 寻找该分区在__consumer_offsets中的leader副本所在的broker,该broker即这个group的coordinator
确定coordinator所在的broker
组内所有的consumer向coordinator发送JoinGroup请求
当收集全JoinGroup后(若个别consumer指定了自定义分配策略而其他consumer都不支持,则该consumer被拒绝加入),coordinator从中选择一个consumer作为group的leader并把所有成员信息以及他们的订阅信息发送给leader
加入组
leader根据系统指定的分区分配策略决定每个consumer都负责哪些topic的哪些分区
组内成员都会发送SyncGroup请求,只不过只有leader发送的请求才会包含分配方案
分配完成后,leader会把这个分配方案装进SyncGroup请求并发送给coordinator。
coordinator接收到分配方案后把属于每个consumer的方案单独抽取出来作为SyncGroup请求的response返回给各自的consumer
同步更新分配方案
执行流程
便于维护与升级
便于实现自定义策略
解耦组管理与分区分配
分区分配在consumer端执行而非broker执行
ConsumerRebalanceListener
监听器
ByteArrayDeserializer
ByteBufferDeserializer
BytesDeserializer
DoubleDeserializer
IntegerDeserializer
LongDeserializer
StringDeserializer
反序列化
rebalance
group下没有任何active consumer,但可能包含位移信息
Empty
该状态下的group依然可能保存有位移信息,因此clients可以发起OffsetFetch及OffsetCommit请求
preparingRebalance
group所有成员都已加入并等待leader consumer发送分区分配方案。
此时依然可以获取位移,但若提交位移coordinator将会抛出REBALANCE_IN_PROCESS异常
AwaitingSync
Stable
Dead
consumer group执行rebalance
新版KafkaConsumer属非线程安全,若未显示增加同步锁机制,则Kafka会抛出KafkaConsumer is not safe for multi-threaded access
清除consumer创建的各种socket资源
通知consumer group主动离组从而更快的开启新一轮rebalance
consumer消费完成后需要调用close方法关闭consumer
消息获取
coordinator管理
异步任务结果的处理
位移的提交
用户主线程
后台心跳线程
多线程程序(新版本)
Consumer
大量使用操作系统页缓存,速度快,命中率高
以追加的方式写文件,将随机写改为顺序写
使用sendfile系统调用,实现数据零拷贝
高吞吐、低延时
解耦消息的发送与消费
实现灵活的消息处理
消息持久化
保证高可用
负载均衡与故障转移
服务器状态的保存和管理交由专门的协调服务来处理,无需集群间共享,方便集群节点扩容
伸缩性
特性
CRC(4byte)
版本号(1b)
高五位保留
0 无压缩
1 GZIP
2 Snappy
3 LZ4
低三位用于保存压缩类型
属性(1b)
key长度(4b)
key
value长度(4b)
value
V0 (0.10.0.0之前)
时间戳(8byte)
表示消息创建时由producer指定时间戳
CREATE_TIME
后者表示消息被发送到broker时由broker指定时间戳
LONG_APPEND_TIME
第四位用于指定时间戳类型
低三位保存压缩类型不变
属性(1byte)
V1(0.10.0.0)
消息总长度(可变长度)
时间戳增量(可变长度)
位移增量(可变长度)
key lenght(可变长度)
value size(可变长度)
header个数(可变长度)
headers(可变长度)
V2(0.11.0)
一般情况下,使用较多的是小整数,那么较小的整数应使用更少的byte来编码
32位 : h(n) = (n << 1) ^ (n >> 31)
64位 :h(n) = (n << 1) ^ (n >> 63)
直接去掉hash值的前导0之后的byte作为压缩编码
最高位为1表示编码尚未结束,仍需读取后面的字节来获取完整编码
最高位为0表示已获取到完整编码
固定将每个字节的第一位表示该字节是否是某个数编码的最后一个字节
编码
(n >>> 1) ^ -(n & 1)
解码
借鉴Google ProtoBuffer中的Zig-zag编码,降低字节数
增加消息总长度,避免重复计算
不再使用8字节保存时间戳,而是使用一个可变长度保存与消息集合batch起始时间戳的差值,节省空间
增加消息位移增量,保存与消息集合batch起始位移的差值
增加消息头部headers,对用户可见,每个头部包含两个字段key(string)及value(byte[]). 可满足用户定制化需求,如集群间消息路由或承载消息的一些特定元数据信息
去除消息级CRC校验,仅对整个batch校验
放弃attribute字段
概要
消息格式
每个消息集合包含若干日志项,V2版本之前日志项称为log entry,V2版本则称为消息批次(record batch)
1. 若未启用压缩,则浅层消息就是消息本身
Kafka会将多条消息压缩到一起封装到该条浅层消息的value字段
此时该浅层消息被称为wrapper消息,而value字段包含的消息被称为inner消息
2. 若启用压缩
浅层消息(shallow message)
该消息在Kafka分区日志中的offset
1. 未启用压缩
表示wrapper消息中最后一条inner消息的offset
2. 启用压缩
offset (8byte)
size (4byte)
日志项头部(log entry header)
1. offset (8byte)
2. size (4byte)
3. message (由size决定)
日志项格式
V0及V1
3. 分区leader版本号 (4byte)
4. 版本(1byte)
5. CRC(4byte)
第四位依然保存时间戳类型
第五位表示事务类型
第六位表示控制类型
6. attribute(2byte)
7. 最大位移增量(4byte)
8. 起始时间戳 (8byte)
9. 最大时间戳(8byte)
一个幂等性producer的ID
10. PID (8byte)
PID携带的当前版本号
11. Producer epoch (2byte)
12. 起始序列号 (4byte)
13. 消息个数 (4byte)
14. 消息 (可变)
为实现幂等性producer及支持事务而引入
V2
各版本日志项区别
消息集合格式
topic
partition
leader replica
follower replica
replica
不响应客户端发来的写入及消费请求,只与leader保存同步,只有leader replica宕机后才会被选举为新的leader replica
ISR至少存在一个活着的replica
只有ISR集合中的所有replica都接收到了同一条信息,Kafka才会将消息置于已提交状态
只有该集合中的replica才能被选举为leader
ISR (in-sync replica)
消息
运行Java的操作系统通常默认开启页缓存机制,即堆上保存的对象很有可能在页缓存中还保留一份,这就造成了资源浪费。kafka特意避开了Java堆上内存分配,直接使用紧凑二进制字节数组ByteBuffer而不是独立对象
消息引擎
流式处理框架
设计初衷:解决超大数据集的实时传输
kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
查看消费者列表 --list
kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
该命令可以查看 Lag 消息积压数量
查看消费者组详情 --describe
kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查询消费者成员信息 --members
kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
查询消费者状态信息 --state
删除消费者组 --delete
重置消费组的偏移量 --reset-offsets
消费者组管理 : kafka-consumer-groups.sh。旧版使用的是 ZkConsumerGroupService ,新版使用的 KafkaConsumerGroupService
运维
1. 横向扩展问题
通过增加 broker 机器性能实现纵向扩容
2. 读写热点问题
缺陷与不足
LeaderNotAvailableException
NotControllerException
NetworkException
可重试异常(都继承于RetrizbleException)
换届选举期间,重试后可恢复
RecordTooLargeException
SerializationException
KafkaException
不可重试异常
错误类型
0:无需理睬leader broker是否已经写入成功,吞吐量最高
all / -1:即待ISR中的所有副本都写入成功后才返回,吞吐量最低
1:leader broker仅将消息写入本地日志,无需等待ISR中其他副本写入,默认参数
acks : 确保写入消息的副本数
用于缓存待发送消息的缓冲区大小,单位为字节,默认32M
Java版本producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区读取消息执行真正的发送。
若producer向缓冲区写消息的速度超过了专属IO线程发送的速度,必然会造成缓冲区的不断增大。此时producer会停止手头的工作等待IO线程追上来,若一段时间之后IO线程还是无法追上producer速度,那么producer就会抛出异常并期望用户介入
buffer.memory
gzip
snappy
lz4
compression.type
重试次数,默认为0
0.11版本开始支持精准一次处理语义,从设计上避免了类似问题
1. 重试可能造成消息的重复发送
2. 重试可能造成消息的乱序
设置该参数需注意
retries
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
该参数设置为1是为了防止topic同分区下的消息乱序问题
如顺序发送1,2两条消息,结果1发送失败,2发送成功。若此时该配置大于1,则producer会重试发送1,待发送成功时1与2的顺序已经混乱
max.in.flight.requests.per.connection
两次重试间会停顿一段时间
retry.backoff.ms
调优producer吞吐量及延时性能的重要指标
批次大小,默认大小 16384 即16k
batch.size
表示batch中的消息是否需要延时发送,即吞吐量与延时之间的权衡
默认为0 ,即消息需要立即发送,无需关心batch是否已被填满
linger.ms
控制producer能够单个请求可发送最大消息的大小
max.request.size
即producer发送请求到broker后,broker需要在该段时间内返回响应,默认30s
request.timeout.ms
实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法
自定义分区
消息分区机制
ByteArraySerializer
ByteBufferSerializer
BytesSerializer
DoubleSerializer
IntegerSerializer
LongSerializer
StringSerializer
系统序列化器
实现org.apache.kafka.common.serialization.Serializer接口
自定义
消息序列化
运行在用户主线程,在消息被序列化及计算分区前被调用
onSend
在消息被应答之前或消息发送失败时调用,运行在IO线程因此不要在该方法中放入比较中的逻辑,否则会拖慢消息发送速率
onAcknowledgement
close
实现org.apache.kafka.clients.producer.ProducerInterceptor接口
拦截器
1. 将待发送消息进行序列化并计算目标分区
compressor :负责执行追加写入操作
batch缓冲区
thunks:保存消息回调逻辑的集合
每个batch包含最重要的三个组件
2. 追加写入消息到缓冲区(accumulator)
1. 不断轮询缓冲区寻找已做好发送准备的分区
2. 将查找到的各个batch按照目标分区所在的leader broker进行分组
3. 将分组后的batch通过底层创建的socket连接发送给各个broker,并等待response返回
3. Sender线程预处理及消息发送
4. Sender线程处理response
工作流程
发送事件完全异步
Producer
待缓冲区被填满时producer处于阻塞状态并停止接收新消息而不是抛出异常
该参数在0.9版本已经废弃,改用 max.block.ms
block.on.buffer.full=true
acks=all
retries=integer.MAX_VALUE
max.in.flight.requests.per.connection=1
使用带回调机制的send
若不使用close(0),默认情况下producer会被允许将未完成的消息发送出去,这样有可能造成消息乱序
callback逻辑中显式立即关闭producer
producer端
不允许非ISR中的副本被选举为leader,从而避免broker端日志水位截断而造成的消息丢失
unclean.leader.election.enable=false
replication.factor>=3
min.insync.replicas=1
确保replication.factor>min.insync.replicas
broker端
无消息丢失配置
Kafka
0 条评论
回复 删除
下一页