Kafka
2025-07-21 11:10:33 9 举报
AI智能生成
q
作者其他创作
大纲/内容
MQ消息队列<br>
队列:先进先出<br>
堆:先进后出<br>
作用:解耦<br>
<br>
异步执行
同步执行:前面执行完之后后面才能执行<br>
异步请求
<br>
数据削峰
秒杀:短时间要处理大量的请求<br>
将用户的请求先放到消息队列,然后根据服务器的处理能力依次处理用户请求
大数据计算
小时榜
将生产者产生的数据快速发送给消费者
生产者和消费者和消费中间件有好多人
缺点
<br>
消息系统的原理
数据单元
消息必须关联到某些业务,必须有存在的意义和价值
完整性:要么传输成功要么传输失败 确保该数据单元的完整性<br>
独立性:在各个数据单元之间没有互相依赖 防止导致被分割的消息不能被同一个消费者所消费<br>
颗粒度:粒度太小传输次数变多,集群压力大<br>粒度太大:传输次数变少,数据有可能很大的延迟<br>
消息发送
点对点
生产者将消息放到队列中,消费者从队列中消费数据,每一条消息一旦被消费之后就会被从队列中删除,每条消息只能被消费一次,即使有多个消费者也能保证消息消费的顺序 10086
<br>
发布订阅
Topic 主题<br>
java中的发布订阅模式,生产者模式<br>
长链接
消息被持久化到一个Topic中,在消息队列中可以有多个Topic<br>
消费者可以根据自己的需求订阅一个或者多个Topic<br>
一个Topic也可以被多个消费者订阅<br>
每个消费者可以消费一个Topic中所有的数据Topic中的数据被消费之后不会被马上删除<br>
消息给予消费者的方式
POLL拉取<br>
消费者根据自己的消费能力去Topic拉取数据<br>
好处:每个消费者都能很好的处理数据<br>
缺点:要经常去服务器询问是否有新数据<br>
PUSH推送<br>
将消息先存到消费者端的缓存中
好处:不用消费者时时刻刻拉取数据<br>
缺点:每个消费者消费能力不同可以造成消息的积压<br>
消费者组
为了照顾有些订阅者消费能力弱
可以将多个订阅者,消费者组成一个消费者组
每一个消费者对一个Topic 的数据只能消费一次<br>
<br>
对比
<br>
kafka<br>
系统架构
管理模型
Broker:提供服务(work)<br>
每一个 kafkaServer 称为一个Broker,多个borker组成Kafka Cluster 一个broker可以维护多个topic<br>
Controller 提供管理<br>
选举成功的Broker 就是Controller<br>
集群中任意一台Broker都能充当控制器的角色<br>
管理Brocker 管理Topic 管理Parition <br>
<br>
架构协调者
zookeeper
新旧版本
2.8之前的kafka考zookeeper<br>
2.8之后的选举用KRaft<br>
负责kafka集群的元数据管理<br>
<br>
内部架构
Topic
发送到集群的数据会被存储到Topic 生产者和消费者操作Topic<br>
partition:分区<br>
如果数据都写入Topic 而Topic 只存在于一个节点那么速度就会受到单节点的性能瓶颈<br>
kafka中的Topic 被分成多个Partition 分区,每个topic 至少有一个partition 每个partition 包含数据的一部分 每个partition都有一个l<font color="#e74f4c">og文件</font>,所有的数据都以追加的形式写入而且不会被修改和删除<br>
分区规则
如果指定了分区,则存放到指定的分区
如果没有指定分区但是<font color="#ec7270">指定了key</font> hash(key)%分区数<br>
如果没有指定分区还没有指定key轮询选出一个分区<br>
标识数据offset<br>
消息在写入到分区的时候,会自动为这个消息指定一个最大的offset<br>
<br>
这个偏移量可以理解为这个数据的唯一标识
每个分区的偏移量都是不同的每个分区维护自己的偏移量
分区内部有序外部无需
如果需要排序
一个分区
把需要排序的数据放到一个分区
Offset<br>
Offset-->生产者<br>
当前分区最大的值 也相当于每条被产生的Message都有唯一的偏移量<br>
Offset--> 消费者<br>
消费者可以根据偏移量来决定消费的位置(位置或者时间)<br>
新加一个zookeeper节点不会环节压力因为zookeeper是最终一致性每个节点上的都是一样的但是kafka不会<br>
Replication:副本<br>
为了保证数据的安全需要对分区的数据进行备份创建多个副本
kafka 为<br>
一个partition <br>
生成多个副本
分散在不同的Broker上<br>
副本算法
4副本=1Leader+3Follower<br>
Leader<br>
负责读写
生产者和消费者只跟Leader打交道<br>
Follower<br>
只负责备份
将Leader和Follower 强行不放在相同的节点上,只有副本数超过节点数才会一个节点多个副本 所以我本一般要求 <font color="#e74f4c">副本数不能大于节点数</font><br>
分配算法
将n个broker 和待分配的partition排序 <br>
将第一个partition分配到<br>
1 mod n 个brocker上<br>
将第i个partition的第j 个replication 分配到第((i+j)mode n)个broker上<br>
<br>
副本是不能用的什么时候成为leader什么时候能用后面会讲到吧<br>
Message
kafka将消息封装为message对象所有的是消息对象都是如下结构<br>
<br>
其中 key 和 value 存储的是实际的 Message 内容,长度不固定,而其他都是对 Message 内<br>容的统计和描述,长度固定。因此在查找实际 Message 过程中,磁盘指针会根据 Message 的
offset 和 message length 计算移动位数,以加速 Message 的查找过程。之所以可以这<br>样加速,因为 Kafka 的 .log 文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随<br>机写的操作。
角色分配
生产者:Producer<br>
生产者向Topic 发送信息 根据分区的规则将信息发送到指定分区<br>
brock接收到生产者发送的消息后,brock将该信息追加到当前用于追加数据的segment文件中<br>
如何发送给分区
一批
一条
消费者组:Consumer Group<br>
每一Consumer属于一个特定的ConsumerGroup -- GroupID <br>Topic消费的偏移量是根据消费者组定义<br>
整个消费者组共享一组偏移量
消费者:Consumer<br>
消费者根据自身的消费能力,从kafka拉取数据根据偏移量记录数据消费的位置<br>
<br>
···、
<br>
··
<br>
环境搭建
<br>
主题名:userlog<br>
PartitionCount:分区数三<br>
ReplicationFactor:2 2个副本
Partition: 0 1 2 0分区1分区2分区<br>
Replicas: 1,2:列出了该分区所有副本所在的 Broker 节点 ID
<br>
数据存储
<br>
分层
Topic<br>
partition-0
N个segment<br>
Segment-0<br>
00000.log 数据文件<br>
0000.index 索引文件--稀疏索引<br>
索引分类
稀疏索引
每间隔一段时间创建一个索引
稠密索引
每一个数据都需要被索引到 msyql B+ Tree<br>
0000.timeindex 时间索引<br>
Segment-1
00123.log
00123.index
00123.timeindex
index文件的名字和其名字相同的log文件对应<br>
文件名字就是代表了这个segment中第一条数据的偏移量offset<br>
为了便于管理才会被切分成一个一个的Segment , kafka自己完成切分的标准 时间 大小<br>
partition-1
partittion-2
<br>
索引与数据·
<br>
<br>
超出4k 算一条<br>
log
大偏移量减去文件名得到小偏移量
index
拿着小偏移量用二分查找进行查找数据
小偏移量和Position<br>
timeindex<br>
时间戳加小偏移量
日志清楚策略
设置过期时间
设置最大时间戳超过几天就进行删除
最大时间戳是最后的数据
选择清楚策略
删除
满足失效时间 删除
压缩
相同key 不同value 只保留最后的一个版本<br>
java代码<br>
数据发送流程
<br>
<br>
分区器
数据分区发送
<br>
副本
Partition 分区<br>
Reaplication 副本<br>
AR 123<br>全部副本
<br>
OSR<br>离开同步队列的副本
ISR<br>加入同步队列的副本<br><font color="#e74f4c">即使副本在ISR也有可能和Leader的数据也是不同步的<br>Leader也属于ISR的一部分</font><br>
评判 OSR和ISR的标准 默认10s isr中的follow没有向leader发送心跳包就会被移除 replica.lag.time.max/ms=10000<br>
保持数据的一致性
LEO<br>
因为leader和follower的偏移量是不同,<font color="#e74f4c">下一跳待写入数据的偏移量</font>Leader和每个副本有可能不同的LEO<br>
leader:3334<br>followe1r:3332<br>follower2:3333<br>follower3:3331<br>
HW<br>
HW就决定了消费者可以读取的偏移量<br>
高水位线 是通过LEO计算出来的,基于Leader的LEO和所有ISR中的副本的LEO取所有的LEO的最小值<br>
HW:3331<br>
CheckPoint<br>
故障处理
leader崩溃<br>
优先从ISR中选出一个新的Leader<br>
为了保证多个副本之间的数据一致性,竞选失败的follower 会将各自的log文件高于HW的部分截掉然后从新的Leader同步数据<br><font color="#e74f4c">有可能选举成Leader的不是数据最多的需要把多余的数据切掉感觉会丢数据</font><br>
Follower崩溃<br>
Follower发生故障后会被临时踢出ISR<br>恢复后直接拉取数据即可
Unclean<br>
leader 崩了以后 ISR里面也没有副本了 使用脏选举<br>
确认机制
消息语义
最多一条
丢失数据
至少一条
数据重复
只有一条
处理一次
截图
<br>
确认机制Ack<br>
0
生产者完全不关心Kafka集群的确认信息<br>只要发送出去即可,效率高,安全性低<br>生产者不会重发:最多发一条<br>
1
生产者需要等待Leader的确认信息,不需要等待Follower的确认信息<br>效率:中等 安全性:中等<br>生产者等不到确认信息,就会重新发送:至少一条<br>
-1 all
生产者需要等到ISR中所有的节点给与确认信息<br>效率:相对低,安全性高<br>生产者等不到确认信息,就会重新发送:至少一条<br>
消息保障机制(实现消息只发送一次)<br>
消息发送重复
<br>
幂等性
幂等性的概念 生产者无论将消息发送多少次集群只存储一份 好比数字取绝对值
实现方式:(PID) Producerld SequenceNum(SN)<br>
每次partition 接收到数据之后会对比本身存放的sn号 <br>
生产者发送数据携带sn号<br>
第一种情况
生产者sn<=分区记录的sn -- 数据重复发送<br>
第二种情况
生产者sn - 分区记录的sn = 1 -- 直接写入【这属于正常情况 只接受这种数据】<br>
第三种情况
生产者sn - 分区记录sn >1 丢失数据<br>
缺陷
可以解决数据重复发送的问题,但是消息不能跨分区发送发送的数据必须和分区绑定
截图
<br>
事务
事务协调器 : 解决了幂等性不能跨分区的问题<br>
只有一条=至少一条+幂等性+事务<br>
数据的顺序保障
<br>
<br>
数据序列化
消费者
从kafka拉取数据 <br>
优化问题
kafka数据积压之后 不能单纯增加消费者 要增加消费者组<br>
<br>
因为一个分区 只能发送给 一个消费者组中的一个消费者进行消费
<br>
<br>
分区策略(消费者)<br>
RangeAssignor<br>
消费者总数和分区总数进行整除运算来获得一个跨度
然后剩余的从第一个消费者开始多分配一个
n=p/c<br>
m=p%c<br>
前m个消费者每个消费n+1<br>
m之后的消费者每人消费n<br>
优点:简单方便 缺点:如果一个消费者组订阅多个topic 会导致前m 个压力过大<br>
RoundRobinAssignor<br>
将消费组内所有的消费者以及消费者订阅的所有topic 的partition 按照字典序排列,然后通过轮询消费者方式逐个将分区分配给每个消费者 <br>优点将分区统一考虑,降低前m个消费者的压力<br>
<br>
StickyAssignor<br>
分区的分配要尽可能的均匀
分区的分配尽可能的于上次分配的保持相同
两者发生冲突时第一个目标优于第二个
<br>
数据积压
增加分区数和消费者数
提高拉取数据最大的条数
<br>
KafkaOffset<br>
生产者
LEO<br>
HW<br>
消费者
自动
手动
外部管理Offset<br>
subscribe 读取整个topic<br>
assign 标记分区<br>
seek 指定分区的指定偏移量<br>
Kafka3.3.1新特性<br>
<br>
kraft<br>
列出所有可用主题
kafka-topics.sh --bootstrap-server node01:9092 --list
高可用方式
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
创建 hello 主题,设置分区数为 1,设置分区副本数为 3
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--create --partitions 1 \<br>--replication-factor 3
列出指定主题的详细信息
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--describe
通过 topic-id 查看主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic-id a9Q84XrBT4SqXj2FwAEaYA \<br>--describe
修改分区数(分区数只能增加不能减少,还有就是命令行的方式无法使用 --replication-factor 修改分区副<br>本数)
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--alter \<br>--partitions 3
删除主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--delet
Pro & Con 命令
# 创建消费者,从当前最新消息开始消费,不指定分区默认监听所有分区<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello<br># 创建消费者,从当前最新消息开始消费,指定监听分区 0<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--partition 0<br># 创建消费者,从日志中最早出现的消息开始消费<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--from-beginning<br># 创建生成者<br>kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello
集成Flume<br>
<br>a1.sources = r1<br>a1.sinks = k1<br>a1.channels = c1<br># 配置 Agent a1 的 Source r1 的属性<br># 使用 Taildir Source 断点续传<br>a1.sources.r1.type = TAILDIR<br># 通过一个 JSON 格式的文件记录每个文件的 Inode 信息、绝对路径和最近一次读取的位置<br>a1.sources.r1.positionFile = /var/log/flume/taildir_position.json<br># 定义文件组(被监控的文件夹目录集合),这些文件夹下的文件都会被监控,多个用空格分隔<br>a1.sources.r1.filegroups = f1<br># 声明文件组 f1 监控的文件<br>a1.sources.r1.filegroups.f1 = /var/logs/test1/baidu.log<br># 是否添加文件的绝对路径名(绝对路径+文件名)到 Header 中<br>a1.sources.r1.fileHeader = true<br># 每次重新尝试轮询新数据时的最大时间延迟(毫秒)<br>a1.sources.ri.maxBatchCount = 1000<br># 一次读取数据行和写入 Channel 的最大数量<br>a1.sources.ri.batchSize = 100<br># 配置 Agent a1 的 Sink k1 的属性<br># 使用 Kafka Sink<br>a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink<br># 主题<br>a1.sinks.k1.kafka.topic = baidu<br># Kafka 服务器地址<br>a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092<br># 一个批次中要处理的消息数,默认为 100。设置较大的值可以提高吞吐量,但是会增加延迟。<br>a1.sinks.k1.kafka.flumeBatchSize = 20<br># 在考虑成功写入之前,要有多少个副本必须确认消息,默认为 1。<br># 可选值 0:从不等待确认 1:只等待 Leader 确认;-1:等待所有副本确认。<br># 设置为 -1 可以避免因某些情况下 Leader 失败而带来的数据丢失。<br>a1.sinks.k1.kafka.producer.acks = -1<br># 等待时间,该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,生产者会在批次填满或者达到这个<br>时间时把批次发送出去<br>a1.sinks.k1.kafka.producer.linger.ms = 1<br># 消息压缩算法,默认不启用压缩。可以指定为:gzip、snappy、lz4 或 zstd<br>a1.sinks.k1.kafka.producer.compression.type = snappy<br># 配置 Agent a1 的 Channel c1 的属性,Channel 是用来缓冲 Event 数据的<br># 使用 Memory Channel<br># Channel 的类型是内存 Channel,顾名思义这个 Channel 是使用内存来缓冲数据<br>a1.channels.c1.type = memory<br># 内存 Channel 的总容量大小是 1000,注意这个容量不是越大越好,配置越大一旦 Flume 挂掉丢失的 Event<br>也就越多<br>a1.channels.c1.capacity = 1000<br># Source 和 Sink 从内存 Channel 每次事务传输的 Event 数量,该配置要小于总容量大小<br>a1.channels.c1.transactionCapacity = 100<br># 把 Source 和 Sink 绑定到 Channel 上<br># 与 Source r1 绑定的 Channel 有一个,叫做 c1,一个 Source 可以绑定多个 Channel<br>a1.sources.r1.channels = c1<br># 与 Sink k1 绑定的 Channel 有一个,叫做 c1,一个 Sink 只能绑定一个 Channel<br>a1.sinks.k1.channel = c1
常用命令
创建主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --create --replication-factor 1 --partitions 2 --topic baidu
查看所有主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --list
创建生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
创建消费者
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic userlog --consumer-property group.id=yjx
删除主题
提前在配置文件server.properties增加设置,默认未开启<br>delete.topic.enable=true
删除命令
kafka-topics --delete --topic userlog --zookeeper<br>node01:2181,node02:2181,node03:2181/kafka0110
0 条评论
下一页