Kafka
2025-07-21 11:10:33 4 举报
AI智能生成
q
作者其他创作
大纲/内容
MQ消息队列
队列:先进先出
堆:先进后出
作用:解耦
异步执行
同步执行:前面执行完之后后面才能执行
异步请求
数据削峰
秒杀:短时间要处理大量的请求
将用户的请求先放到消息队列,然后根据服务器的处理能力依次处理用户请求
大数据计算
小时榜
将生产者产生的数据快速发送给消费者
生产者和消费者和消费中间件有好多人
缺点
消息系统的原理
数据单元
消息必须关联到某些业务,必须有存在的意义和价值
完整性:要么传输成功要么传输失败 确保该数据单元的完整性
独立性:在各个数据单元之间没有互相依赖 防止导致被分割的消息不能被同一个消费者所消费
颗粒度:粒度太小传输次数变多,集群压力大
粒度太大:传输次数变少,数据有可能很大的延迟
粒度太大:传输次数变少,数据有可能很大的延迟
消息发送
点对点
生产者将消息放到队列中,消费者从队列中消费数据,每一条消息一旦被消费之后就会被从队列中删除,每条消息只能被消费一次,即使有多个消费者也能保证消息消费的顺序 10086
发布订阅
Topic 主题
java中的发布订阅模式,生产者模式
长链接
消息被持久化到一个Topic中,在消息队列中可以有多个Topic
消费者可以根据自己的需求订阅一个或者多个Topic
一个Topic也可以被多个消费者订阅
每个消费者可以消费一个Topic中所有的数据Topic中的数据被消费之后不会被马上删除
消息给予消费者的方式
POLL拉取
消费者根据自己的消费能力去Topic拉取数据
好处:每个消费者都能很好的处理数据
缺点:要经常去服务器询问是否有新数据
PUSH推送
将消息先存到消费者端的缓存中
好处:不用消费者时时刻刻拉取数据
缺点:每个消费者消费能力不同可以造成消息的积压
消费者组
为了照顾有些订阅者消费能力弱
可以将多个订阅者,消费者组成一个消费者组
每一个消费者对一个Topic 的数据只能消费一次
对比
kafka
系统架构
管理模型
Broker:提供服务(work)
每一个 kafkaServer 称为一个Broker,多个borker组成Kafka Cluster 一个broker可以维护多个topic
Controller 提供管理
选举成功的Broker 就是Controller
集群中任意一台Broker都能充当控制器的角色
管理Brocker 管理Topic 管理Parition
架构协调者
zookeeper
新旧版本
2.8之前的kafka考zookeeper
2.8之后的选举用KRaft
负责kafka集群的元数据管理
内部架构
Topic
发送到集群的数据会被存储到Topic 生产者和消费者操作Topic
partition:分区
如果数据都写入Topic 而Topic 只存在于一个节点那么速度就会受到单节点的性能瓶颈
kafka中的Topic 被分成多个Partition 分区,每个topic 至少有一个partition 每个partition 包含数据的一部分 每个partition都有一个log文件,所有的数据都以追加的形式写入而且不会被修改和删除
分区规则
如果指定了分区,则存放到指定的分区
如果没有指定分区但是指定了key hash(key)%分区数
如果没有指定分区还没有指定key轮询选出一个分区
标识数据offset
消息在写入到分区的时候,会自动为这个消息指定一个最大的offset
这个偏移量可以理解为这个数据的唯一标识
每个分区的偏移量都是不同的每个分区维护自己的偏移量
分区内部有序外部无需
如果需要排序
一个分区
把需要排序的数据放到一个分区
Offset
Offset-->生产者
当前分区最大的值 也相当于每条被产生的Message都有唯一的偏移量
Offset--> 消费者
消费者可以根据偏移量来决定消费的位置(位置或者时间)
新加一个zookeeper节点不会环节压力因为zookeeper是最终一致性每个节点上的都是一样的但是kafka不会
Replication:副本
为了保证数据的安全需要对分区的数据进行备份创建多个副本
kafka 为
一个partition
生成多个副本
分散在不同的Broker上
副本算法
4副本=1Leader+3Follower
Leader
负责读写
生产者和消费者只跟Leader打交道
Follower
只负责备份
将Leader和Follower 强行不放在相同的节点上,只有副本数超过节点数才会一个节点多个副本 所以我本一般要求 副本数不能大于节点数
分配算法
将n个broker 和待分配的partition排序
将第一个partition分配到
1 mod n 个brocker上
将第i个partition的第j 个replication 分配到第((i+j)mode n)个broker上
副本是不能用的什么时候成为leader什么时候能用后面会讲到吧
Message
kafka将消息封装为message对象所有的是消息对象都是如下结构
其中 key 和 value 存储的是实际的 Message 内容,长度不固定,而其他都是对 Message 内
容的统计和描述,长度固定。因此在查找实际 Message 过程中,磁盘指针会根据 Message 的
容的统计和描述,长度固定。因此在查找实际 Message 过程中,磁盘指针会根据 Message 的
offset 和 message length 计算移动位数,以加速 Message 的查找过程。之所以可以这
样加速,因为 Kafka 的 .log 文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随
机写的操作。
样加速,因为 Kafka 的 .log 文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随
机写的操作。
角色分配
生产者:Producer
生产者向Topic 发送信息 根据分区的规则将信息发送到指定分区
brock接收到生产者发送的消息后,brock将该信息追加到当前用于追加数据的segment文件中
如何发送给分区
一批
一条
消费者组:Consumer Group
每一Consumer属于一个特定的ConsumerGroup -- GroupID
Topic消费的偏移量是根据消费者组定义
Topic消费的偏移量是根据消费者组定义
整个消费者组共享一组偏移量
消费者:Consumer
消费者根据自身的消费能力,从kafka拉取数据根据偏移量记录数据消费的位置
···、
··
环境搭建
主题名:userlog
PartitionCount:分区数三
ReplicationFactor:2 2个副本
Partition: 0 1 2 0分区1分区2分区
Replicas: 1,2:列出了该分区所有副本所在的 Broker 节点 ID
数据存储
分层
Topic
partition-0
N个segment
Segment-0
00000.log 数据文件
0000.index 索引文件--稀疏索引
索引分类
稀疏索引
每间隔一段时间创建一个索引
稠密索引
每一个数据都需要被索引到 msyql B+ Tree
0000.timeindex 时间索引
Segment-1
00123.log
00123.index
00123.timeindex
index文件的名字和其名字相同的log文件对应
文件名字就是代表了这个segment中第一条数据的偏移量offset
为了便于管理才会被切分成一个一个的Segment , kafka自己完成切分的标准 时间 大小
partition-1
partittion-2
索引与数据·
超出4k 算一条
log
大偏移量减去文件名得到小偏移量
index
拿着小偏移量用二分查找进行查找数据
小偏移量和Position
timeindex
时间戳加小偏移量
日志清楚策略
设置过期时间
设置最大时间戳超过几天就进行删除
最大时间戳是最后的数据
选择清楚策略
删除
满足失效时间 删除
压缩
相同key 不同value 只保留最后的一个版本
java代码
数据发送流程
分区器
数据分区发送
副本
Partition 分区
Reaplication 副本
AR 123
全部副本
全部副本
OSR
离开同步队列的副本
离开同步队列的副本
ISR
加入同步队列的副本
即使副本在ISR也有可能和Leader的数据也是不同步的
Leader也属于ISR的一部分
加入同步队列的副本
即使副本在ISR也有可能和Leader的数据也是不同步的
Leader也属于ISR的一部分
评判 OSR和ISR的标准 默认10s isr中的follow没有向leader发送心跳包就会被移除 replica.lag.time.max/ms=10000
保持数据的一致性
LEO
因为leader和follower的偏移量是不同,下一跳待写入数据的偏移量Leader和每个副本有可能不同的LEO
leader:3334
followe1r:3332
follower2:3333
follower3:3331
followe1r:3332
follower2:3333
follower3:3331
HW
HW就决定了消费者可以读取的偏移量
高水位线 是通过LEO计算出来的,基于Leader的LEO和所有ISR中的副本的LEO取所有的LEO的最小值
HW:3331
CheckPoint
故障处理
leader崩溃
优先从ISR中选出一个新的Leader
为了保证多个副本之间的数据一致性,竞选失败的follower 会将各自的log文件高于HW的部分截掉然后从新的Leader同步数据
有可能选举成Leader的不是数据最多的需要把多余的数据切掉感觉会丢数据
有可能选举成Leader的不是数据最多的需要把多余的数据切掉感觉会丢数据
Follower崩溃
Follower发生故障后会被临时踢出ISR
恢复后直接拉取数据即可
恢复后直接拉取数据即可
Unclean
leader 崩了以后 ISR里面也没有副本了 使用脏选举
确认机制
消息语义
最多一条
丢失数据
至少一条
数据重复
只有一条
处理一次
截图
确认机制Ack
0
生产者完全不关心Kafka集群的确认信息
只要发送出去即可,效率高,安全性低
生产者不会重发:最多发一条
只要发送出去即可,效率高,安全性低
生产者不会重发:最多发一条
1
生产者需要等待Leader的确认信息,不需要等待Follower的确认信息
效率:中等 安全性:中等
生产者等不到确认信息,就会重新发送:至少一条
效率:中等 安全性:中等
生产者等不到确认信息,就会重新发送:至少一条
-1 all
生产者需要等到ISR中所有的节点给与确认信息
效率:相对低,安全性高
生产者等不到确认信息,就会重新发送:至少一条
效率:相对低,安全性高
生产者等不到确认信息,就会重新发送:至少一条
消息保障机制(实现消息只发送一次)
消息发送重复
幂等性
幂等性的概念 生产者无论将消息发送多少次集群只存储一份 好比数字取绝对值
实现方式:(PID) Producerld SequenceNum(SN)
每次partition 接收到数据之后会对比本身存放的sn号
生产者发送数据携带sn号
第一种情况
生产者sn<=分区记录的sn -- 数据重复发送
第二种情况
生产者sn - 分区记录的sn = 1 -- 直接写入【这属于正常情况 只接受这种数据】
第三种情况
生产者sn - 分区记录sn >1 丢失数据
缺陷
可以解决数据重复发送的问题,但是消息不能跨分区发送发送的数据必须和分区绑定
截图
事务
事务协调器 : 解决了幂等性不能跨分区的问题
只有一条=至少一条+幂等性+事务
数据的顺序保障
数据序列化
消费者
从kafka拉取数据
优化问题
kafka数据积压之后 不能单纯增加消费者 要增加消费者组
因为一个分区 只能发送给 一个消费者组中的一个消费者进行消费
分区策略(消费者)
RangeAssignor
消费者总数和分区总数进行整除运算来获得一个跨度
然后剩余的从第一个消费者开始多分配一个
n=p/c
m=p%c
前m个消费者每个消费n+1
m之后的消费者每人消费n
优点:简单方便 缺点:如果一个消费者组订阅多个topic 会导致前m 个压力过大
RoundRobinAssignor
将消费组内所有的消费者以及消费者订阅的所有topic 的partition 按照字典序排列,然后通过轮询消费者方式逐个将分区分配给每个消费者
优点将分区统一考虑,降低前m个消费者的压力
优点将分区统一考虑,降低前m个消费者的压力
StickyAssignor
分区的分配要尽可能的均匀
分区的分配尽可能的于上次分配的保持相同
两者发生冲突时第一个目标优于第二个
数据积压
增加分区数和消费者数
提高拉取数据最大的条数
KafkaOffset
生产者
LEO
HW
消费者
自动
手动
外部管理Offset
subscribe 读取整个topic
assign 标记分区
seek 指定分区的指定偏移量
Kafka3.3.1新特性
kraft
列出所有可用主题
kafka-topics.sh --bootstrap-server node01:9092 --list
高可用方式
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
创建 hello 主题,设置分区数为 1,设置分区副本数为 3
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--create --partitions 1 \
--replication-factor 3
--topic hello \
--create --partitions 1 \
--replication-factor 3
列出指定主题的详细信息
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--describe
--topic hello \
--describe
通过 topic-id 查看主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic-id a9Q84XrBT4SqXj2FwAEaYA \
--describe
--topic-id a9Q84XrBT4SqXj2FwAEaYA \
--describe
修改分区数(分区数只能增加不能减少,还有就是命令行的方式无法使用 --replication-factor 修改分区副
本数)
本数)
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--alter \
--partitions 3
--topic hello \
--alter \
--partitions 3
删除主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--delet
--topic hello \
--delet
Pro & Con 命令
# 创建消费者,从当前最新消息开始消费,不指定分区默认监听所有分区
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello
# 创建消费者,从当前最新消息开始消费,指定监听分区 0
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--partition 0
# 创建消费者,从日志中最早出现的消息开始消费
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--from-beginning
# 创建生成者
kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello
# 创建消费者,从当前最新消息开始消费,指定监听分区 0
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--partition 0
# 创建消费者,从日志中最早出现的消息开始消费
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello \
--from-beginning
# 创建生成者
kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic hello
集成Flume
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置 Agent a1 的 Source r1 的属性
# 使用 Taildir Source 断点续传
a1.sources.r1.type = TAILDIR
# 通过一个 JSON 格式的文件记录每个文件的 Inode 信息、绝对路径和最近一次读取的位置
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
# 定义文件组(被监控的文件夹目录集合),这些文件夹下的文件都会被监控,多个用空格分隔
a1.sources.r1.filegroups = f1
# 声明文件组 f1 监控的文件
a1.sources.r1.filegroups.f1 = /var/logs/test1/baidu.log
# 是否添加文件的绝对路径名(绝对路径+文件名)到 Header 中
a1.sources.r1.fileHeader = true
# 每次重新尝试轮询新数据时的最大时间延迟(毫秒)
a1.sources.ri.maxBatchCount = 1000
# 一次读取数据行和写入 Channel 的最大数量
a1.sources.ri.batchSize = 100
# 配置 Agent a1 的 Sink k1 的属性
# 使用 Kafka Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 主题
a1.sinks.k1.kafka.topic = baidu
# Kafka 服务器地址
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
# 一个批次中要处理的消息数,默认为 100。设置较大的值可以提高吞吐量,但是会增加延迟。
a1.sinks.k1.kafka.flumeBatchSize = 20
# 在考虑成功写入之前,要有多少个副本必须确认消息,默认为 1。
# 可选值 0:从不等待确认 1:只等待 Leader 确认;-1:等待所有副本确认。
# 设置为 -1 可以避免因某些情况下 Leader 失败而带来的数据丢失。
a1.sinks.k1.kafka.producer.acks = -1
# 等待时间,该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,生产者会在批次填满或者达到这个
时间时把批次发送出去
a1.sinks.k1.kafka.producer.linger.ms = 1
# 消息压缩算法,默认不启用压缩。可以指定为:gzip、snappy、lz4 或 zstd
a1.sinks.k1.kafka.producer.compression.type = snappy
# 配置 Agent a1 的 Channel c1 的属性,Channel 是用来缓冲 Event 数据的
# 使用 Memory Channel
# Channel 的类型是内存 Channel,顾名思义这个 Channel 是使用内存来缓冲数据
a1.channels.c1.type = memory
# 内存 Channel 的总容量大小是 1000,注意这个容量不是越大越好,配置越大一旦 Flume 挂掉丢失的 Event
也就越多
a1.channels.c1.capacity = 1000
# Source 和 Sink 从内存 Channel 每次事务传输的 Event 数量,该配置要小于总容量大小
a1.channels.c1.transactionCapacity = 100
# 把 Source 和 Sink 绑定到 Channel 上
# 与 Source r1 绑定的 Channel 有一个,叫做 c1,一个 Source 可以绑定多个 Channel
a1.sources.r1.channels = c1
# 与 Sink k1 绑定的 Channel 有一个,叫做 c1,一个 Sink 只能绑定一个 Channel
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置 Agent a1 的 Source r1 的属性
# 使用 Taildir Source 断点续传
a1.sources.r1.type = TAILDIR
# 通过一个 JSON 格式的文件记录每个文件的 Inode 信息、绝对路径和最近一次读取的位置
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
# 定义文件组(被监控的文件夹目录集合),这些文件夹下的文件都会被监控,多个用空格分隔
a1.sources.r1.filegroups = f1
# 声明文件组 f1 监控的文件
a1.sources.r1.filegroups.f1 = /var/logs/test1/baidu.log
# 是否添加文件的绝对路径名(绝对路径+文件名)到 Header 中
a1.sources.r1.fileHeader = true
# 每次重新尝试轮询新数据时的最大时间延迟(毫秒)
a1.sources.ri.maxBatchCount = 1000
# 一次读取数据行和写入 Channel 的最大数量
a1.sources.ri.batchSize = 100
# 配置 Agent a1 的 Sink k1 的属性
# 使用 Kafka Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 主题
a1.sinks.k1.kafka.topic = baidu
# Kafka 服务器地址
a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
# 一个批次中要处理的消息数,默认为 100。设置较大的值可以提高吞吐量,但是会增加延迟。
a1.sinks.k1.kafka.flumeBatchSize = 20
# 在考虑成功写入之前,要有多少个副本必须确认消息,默认为 1。
# 可选值 0:从不等待确认 1:只等待 Leader 确认;-1:等待所有副本确认。
# 设置为 -1 可以避免因某些情况下 Leader 失败而带来的数据丢失。
a1.sinks.k1.kafka.producer.acks = -1
# 等待时间,该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,生产者会在批次填满或者达到这个
时间时把批次发送出去
a1.sinks.k1.kafka.producer.linger.ms = 1
# 消息压缩算法,默认不启用压缩。可以指定为:gzip、snappy、lz4 或 zstd
a1.sinks.k1.kafka.producer.compression.type = snappy
# 配置 Agent a1 的 Channel c1 的属性,Channel 是用来缓冲 Event 数据的
# 使用 Memory Channel
# Channel 的类型是内存 Channel,顾名思义这个 Channel 是使用内存来缓冲数据
a1.channels.c1.type = memory
# 内存 Channel 的总容量大小是 1000,注意这个容量不是越大越好,配置越大一旦 Flume 挂掉丢失的 Event
也就越多
a1.channels.c1.capacity = 1000
# Source 和 Sink 从内存 Channel 每次事务传输的 Event 数量,该配置要小于总容量大小
a1.channels.c1.transactionCapacity = 100
# 把 Source 和 Sink 绑定到 Channel 上
# 与 Source r1 绑定的 Channel 有一个,叫做 c1,一个 Source 可以绑定多个 Channel
a1.sources.r1.channels = c1
# 与 Sink k1 绑定的 Channel 有一个,叫做 c1,一个 Sink 只能绑定一个 Channel
a1.sinks.k1.channel = c1
常用命令
创建主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --create --replication-factor 1 --partitions 2 --topic baidu
查看所有主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --list
创建生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
创建消费者
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic userlog --consumer-property group.id=yjx
删除主题
提前在配置文件server.properties增加设置,默认未开启
delete.topic.enable=true
delete.topic.enable=true
删除命令
kafka-topics --delete --topic userlog --zookeeper
node01:2181,node02:2181,node03:2181/kafka0110
node01:2181,node02:2181,node03:2181/kafka0110
0 条评论
下一页