kafka (消息队列)
2023-11-27 13:55:36 11 举报AI智能生成
kafka相关的概念和操作
kafka
模版推荐
作者其他创作
大纲/内容
消费端口的提交机制
__consumer_offsets中指针移动过程
消费提交结果
消息已提交
知道这条消息消费端已经消费完了,下次消费端重启,继续发送下一条消息给它
消息未提交
未超过保留时间(retention time)
下次消费端重启时再次发送这条未提交的消息给消费者
已超过保留时间(retention time)
消息本身仍然存在于broker上,但无法直接获取。只有在消费者请求拉取消息时,它们才能访问到这些超过保留时间的消息
消费端消息提交方式
参数enable_auto_commit
是否启用自动提交机制,默认是开启的
参数auto_commit_interval
执行自动提交的时间间隔,默认5秒
测试
默认自动提交
消费端程序重启后,之前已经消费过的消息是否会再次收到
结果是不会收到此前曾经处理过的消息
测试不提交
spring.kafka.listener.ack-mode用于配置Spring Kafka消费者监听器的ACK模式
record模式
在每条消息成功处理后自动提交偏移量
batch模式
在一批消息全部成功处理后自动提交偏移量
time模式
定期自动提交偏移量
count模式
在达到指定的记录数后自动提交偏移量
manual模式
手动提交偏移量
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,确保在适当的时候进行偏移量的提交
manual_immediate模式
手动提交偏移量,并立即提交
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,但在处理消息后立即提交偏移量
评价
手动提交:一个消息执行一次提交,更准确
自动提交:在一定的时间间隔之后,执行提交,存在一定的风险
例如:在时间间隔内,消费者宕机、网络不稳定等原因导致某些消息没有提交,进而导致消息重复投递
异常状态
在接收消息的过程中抛出异常而没有手动提交偏移量时,Kafka会将该消息视为未被成功处理,并尝试重新投递给消费者
Kafka会根据一定的重试策略来决定重新投递消息的次数
max.poll.retries
指定了在发生可恢复异常时,Kafka将尝试重新投递消息的最大次数,默认为10次
retry.backoff.ms
指定了两次重试之间的退避时间间隔,即每次重试失败后等待的时间,默认为100毫秒
等参数控制
如果在所有的重试尝试都失败后,消息仍然未被成功处理,那么这条消息将被视为无法恢复的错误消息,并且可能进入到死信队列等特殊处理机制中,具体取决于您的应用程序的配置
死信主题
借助死信主题机制来接收那些处理失败的消息,做一些后续的善后处理
注解
①@RetryableTopic注解
attempts
失败后重试次数
3
backoff
详情在@Backoff注解设置
@Backoff
autoCreateTopics
设置是否自动创建死信主题死信主题会自动在原主题名称后附加“-dlt”
true
②@Backoff注解
代码
监听死信主题
查看死信主题
kafka-topics.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --list
削峰限流
设置@KafkaListener注解的concurrency属性
Offset机制
kafka保存消息
给生产者返回ACK确认信息
消息持久化保存到硬盘上的日志文件中
Topic+Partition+Offset组合在一起可以唯一定位到一条具体的消息
日志文件(6个字段)
1.分区号(Partition)
表示消息所属的分区
2.主题(Topic)
表示消息所属的主题
3.消息的偏移量(Offset)
表示消息在分区中的唯一位置
4.消息的时间戳(Timestamp)
表示消息的发送时间或其他自定义时间戳属性
5.消息的键(Key)
可选项,某些情况下用于消息路由和分发
6.消息的值(Value)
消息的实际内容
消息举例
分支主题
__consumer_offset
数据结构
Topic
表示消费者组订阅的主题
Partition
表示消费者订阅的分区
Consumer group
消费者组的标识
分为两种
组内竞争(只一个能获取到消息)
组间共享(都会接收到消息)
Offset
消费者组再该分区中的当前消费进度,指示已经消费的偏移量
Metadata
其他与偏移量相关的元数据信息
HW(High Watermark)
每个分区的高水位线,表示已经被确认的最大偏移量
LEO(Log End Offset)
每个分区的日志结束偏移量,即下一条即将被写入的消息的位置
通过Topic和Partition的组合,定位到一个具体的分区
在__consumer_offsets主题中记录各个Partition中的消息对消费者来说消费到了哪一条
kafka-console-consumer.sh
--offset earliest
本次从Offset最小值开始消费,有可能导致消息重复投递
--offset latest
本次从Offset最小值开始消费,有可能导致消息重复投递
--offset 非负整数
本次从非负整数指定的位置开始消费
旧版本
--to-earliest
本次从Offset最小值开始消费,有可能导致消息重复投递
--to-latest
本次从最新的消息开始消费,而不会考虑之前已经消费的消息,可能会导致消费者跳过一些历史消息
urrent意思是当前,这里指当前已经提交的最新Offset
--to-current
current意思是当前,这里指当前已经提交的最新Offset
--to-offset 非负整数值
本次从非负整数指定的位置开始消费
--execute
上面的参数只是设定方案,必须再带上--execute才表示执行方案
命令行测试
--offset earliest
--offset latest
--offset 非负整数
消费者组查看偏移量
客户端原生API
生产者
创建主题
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic topic-java-client
启动消费者监听主题
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:9092 --topic topic-java-client
引入依赖
java程序
send()方法返回Future类型的对象
可以调用Future的get()方法同步获取任务执行结果
代码示例
获取消息发送结果
消费者
kafka整合springboot
流程
生产者
pom
配置文件
启动类
配置类
运行后查看主题
发送消息
linux上监听
java发送消息代码
消费者
pom
配置文件
group-id必须指定(一个具体的consumer是存在一个组里面)
启动类
配置类
接收消息--接收不到就删除对应的Zookeeper下__consumer_offsets
deleteall /brokers/topics/__consumer_offsets
传递实体对象类型的消息
实体类
发送消息的方法
会出现的异常(java.lang.ClassCastException)
原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化
解决办法:把序列化器换成支持复杂类型的
生产者分区策略
指定分区(以分区为准)
send方法中,Integer partition(分区)传递的值,方法中的第二个参数,必须传递key(即便是空),第二个参数才是partition
没有parttion的话(不指定分区),传递的key起作用
key值会首先基于murmur2hash算法,得到HASH值,然后HASH值对分区数量取模
分区和key都不指定
Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区
待该分区的batch(默认16k)已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
轮询分区器
轮询分区器:轮询
spring.kafka.producer下面:
properties:
partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
测试代码
自定义分区器
实现自定义逻辑
spring.kafka.producer下面:
properties:
partitioner.class: com.atguigu.kafka.partitioner.AtguiguPartitioner
添加配置自定义类
测试
生产者拦截器
实现接口
mplements ProducerInterceptor<String,Object>
泛型说明
- K:发送消息时指定的key
- V:发送消息时消息数据本身
重写方法
onSend()
消息发送前执行(已经调用了 KafkaTemplate 对象的 send() 方法,但是消息还没有发送到 broker)
onAcknowledgement()
Kafka服务端应答后,应答到达生产者确认回调前执行
close()
拦截器对象销毁之前执行
配置类加上
@PostConstruct注解
被修饰的方法执行时机:Web环境下Servlet对象创建完成,初始化操作之前
对被修饰方法的要求
不能有参数
不能有返回值
必须是public权限修饰符
用之前方法测试
生产者数据有序
Kafka 最多只保证单分区内的消息是有序的
所以如果要保证业务全局严格有序,就要设置 Topic 为单分区
但是,某批次的数据发送失败后,进行了重试,也可能出现后边的批次先于它到达的情况
解决方案
需要有序的消息发送到同一个分区
实现方式一:直接指定相同的partition值
实现方式二:不指定partition,而是指定相同的key
设置retries值为0
有数据丢失的风险
测试
配置文件
spring.kafka.producer下面:
retries: 0
测试同分区代码
调用 get() 方法(能保证)
并不是为了获取任务执行的结果
而是确保第一个任务执行完成之后,再执行第二个任务
如果不这样做,各个子线程负责发送消息没办法保证发送消息的顺序
测试不同分区代码
无法保证有序
- 在生产者端发送消息之前,把消费端程序停止
- 把消息全部发送到broker之后,再启动消费端程序接收消息
结果是有一定顺序
生产者ACK确认
ack
为了确认消息发送结果,我们需要引入ACK确认机制
acknowledge单词的缩写
spring.kafka.producer.acks可配置值
0
生产者发送数据后就不管了,不会等待broker的ack,这个延迟最低但是存储的保证最弱。当server挂掉的时候就会丢数据
几乎不用
1
默认值
生产者会等待ack值 ,leader确认接收到消息后发送ack,不需要follower确认。但是如果leader挂掉后他不确保消息是否同步到了所有的follower中,新leader也会导致数据丢失,可靠性中等,效率中等
一般用于传输普通日志,允许丢个别数据
-1(all)
- 生产者会等所有的follower的副本受到数据后才会收到leader发出的ack,也即Leader和ISR队列里面所有Follwer应答,可靠性最高、效率最低
一般用于传输重要不能丢失的数据(例如:钱、订单、积分等),对可靠性要求比较高的场景
如果没有接收到ack,生产者端会考虑参照retries参数执行重试操作
生产者事务
提出问题
要么都发送到消息队列
要么都不发送
例子
下单成功后,把以下消息存入消息队列:
send()方法
一个事务中所有send()方法都执行成功,所有要发送的消息都存入缓存了,再一起往broker发送
分支主题
生产者端事务仅仅对消息是否全部进入缓冲区进行管理,至于消息在网络上传输过程中是否会丢失就需要借助ack和retries机制了
kafka
对应的spring.kafka.producer下的配置
retries必须大于0
acks必须设置为-1(或all)
测试代码
消息重试机制
事务只负责保证消息存入缓冲区,已经存入缓冲区的消息都是确定要发送到broker的
但是从生产者到broker需要经过网络,broker也有宕机的风险
所以事务提交并不能保证消息一定能写入broker
数据发送可靠性水平
①At Most Once(至多一次)
②At Least Once(至少一次)
③Exactly Once(精准一次)
幂等性
广义理解
个操作执行一次和执行多次对系统的影响是一样的,执行多次不会破坏数据完整性
狭义理解
不论Producer向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
总结
在Kafka中可以通过事务、重试机制和幂等性等特性来实现类似于"Exactly Once"的效果
风险和应对方式总结
风险
应对方式
效果
业务代码出错,导致在一个逻辑整体中的消息部分发送、部分没有发送
事务
消息要么都发送
要么都不发送
网络传输过程中消息丢失
重试
多次重试保证消息发送成功
重复消息导致数据计算错误
幂等性
每次操作都针对一个具体的id执行对其它数据没有影响
broker实例宕机导致无法接收消息
集群
集群中包含多个broker实例,避免单点故障
数据分区:给数据分区配备复制分片
火山爆发、地震、海啸、火灾……
跨区域容灾
比如:东京地震了,但是我们在北京有跨区域容灾机制
kafka集群
注册到同一个Zookeeper上就代表它们是同一个集群的
Kafka通过broker.id来区分集群中的不同节点
构建集群
分支主题
端口号
配置文件
日志目录
实例01
6000
/opt/k-cluster/server6000.properties
/opt/k-cluster/log6000
实例02
7000
/opt/k-cluster/server7000.properties
/opt/k-cluster/log7000
实例03
8000
/opt/k-cluster/server8000.properties
/opt/k-cluster/log8000
创建目录
mkdir -p /opt/k-cluster/log6000
mkdir -p /opt/k-cluster/log7000
mkdir -p /opt/k-cluster/log8000
复制配置文件
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server6000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server7000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server8000.properties
修改配置文件
6000端口
7000端口
8000端口
启动集群实例
验证端口(lsof -i:端口号)
启动失败
虚拟机内存不足
增加内存
关闭一些进程
修改对应启动脚本程序中的内存大小
Zookeeper
zookeeper-server-start.sh
KAFKA_HEAP_OPTS
Kafka
kafka-server-start.sh
KAFKA_HEAP_OPTS
停止集群
使用集群
创建主题
查看主题
集群消息发送
集群消息消费
集群消息接收的时候,收不到
方案一
集群查看
方案二
第一步:把apache-zookeeper-3.9.1-bin.tar.gz上传到Linux系统/opt目录下
第二步:解压apache-zookeeper-3.9.1-bin.tar.gz文件
cd /opt
tar -zxvf apache-zookeeper-3.9.1-bin.tar.gz
第三步:运行zkCli.sh脚本文件,登录到Zookeeper服务器
/opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh
第四步:删除__consumer_offsets主题
deleteall /brokers/topics/__consumer_offsets
第五步:退出Zookeeper
quit
第六步:重启
先关闭然后重新启动Zookeeper
先关闭然后重新启动集群各实例
Kafka图形化监控Eagle
1.搜索镜像
docker search efak
2.下载镜像运行容器
使用
访问
http://192.168.200.100:8048
登陆
可视化界面查看broker实例
新建相关主题
注意:Kafka集群中broker实例的数量需要大于等于复制因子(Replication factor)
主题列表中可以看到对应的broker拥有主题,主题中可以看到对应的分区,分区中可以看到对应的消息
引入
业务逻辑(例子:下单功能)
同步:串行(多功能)
每一步都满足才有返回
响应时间长
并发压力传递
系统结构传递不足
异步:解耦
存入消息队列就能返回结果(异步处理)
添加新功能植入(应用解耦)
流量削锋
不同进程(process)之间传递消息
两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程(要做隔离)
某一个进程接受的消息太多,一下子无法处理完,对收到的消息进行排队
Apache Kafka
是一个分布式流处理平台,具有高吞吐量、低延迟和可靠性等特点
它广泛应用于实时数据处理、日志收集、消息队列等场景
日志收集
一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer
消息系统
解耦生产者和消费者、缓存消息等
用户活动跟踪
用来记录web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。
运营指标
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
流式处理
比如Spark Streaming和Storm
一款基于zookeeper协调的分布式消息系统
扩展
基于JMS的产品
JMS: Java Message Service API(Java消息服务)只能在java中使用
Apache ActiveMQ:比较成熟的产品
RocketMQ:阿里巴巴产品,目前交由Apache基金会
基于AMQP的产品
AMQP:Advanced Message Queuing Protocol(高级消息队列协议) 基于协议的,可以跨语言使用
RabbitMQ:erlang语言开发,稳定性好,响应快。
消息分类
Peer-to-Peer(Queue)
一般基于Pull或者Polling 接收消息
发送到队列中的消息被一个而且只有一个接受者接受,即便有多个接受者在同一个队列中侦听到同一个消息
即支持异步“即发即弃”的消息传送方式,也支持同步请求、应答的传送方式
发布/订阅(Topic)
发布到一个主题的消息,可以被多个订阅者接收
发布、订阅可基于Push消费数据,也可以基于Pull或者Polling消费数据
解耦能力比P2P模型更强
消息队列产品
分支主题
RabbitMQ
ActiveMQ
RocketMQ
Kafka
研发团队
Rabbit(公司)
Apache(社区)
阿里(公司)
Apache(社区)
开发语言
Erlang
Java
Java
Scala&Java
核心机制
基于AMQW的消息队列模型使用生产者-消费者模式
将消息发布到队列中,然后被消费者订阅和处理
基于JMS的消息传递模型支持点对点模型和发布-订阅模型
分布式的消息队列模型采用主题(Topic)和标签(Tag)的方式
进行消息的分类和过滤
分布式流平台,通过发布-订阅模型进行高吞吐量的消息处理
协议支持
XMPP
STOMP
SMTP
XMPP
STOMP
OpenWireREST
自定义协议
自定义协议社区封装了HTTP协议支持
客户端支持语言
官方支持Erlang,Java,Ruby等社区产出多种API,几乎支持所有语言
JavaC/C++PythonPHPPerl.NET等
JavaC++不成熟
官方支持Java社区产出多种API,如PHP,Python等
可用性
镜像队列
主从复制
主从复制
分区和副本
单机吞吐量
每秒十万左右级别
每秒数万级
每秒十万+级(双十一)
每秒百万级
消息延迟
微秒级
毫秒级
毫秒级
毫秒以内
消息确认
完整的消息确认机制
分支主题
内置消息表,消息保存到数据库实现持久化
分支主题
功能特性
并发能力强,性能极好,延时低,社区活跃,管理界面丰富
老牌产品成熟度高文档丰富
MQ功能比较完备扩展性佳
只支持主要的MQ功能毕竟是专门为大数据领域服务的
Kafka基本结构
Producer
生产者负责将消息发送到 Kafka 集群
Consumer
消费者负责从 Kafka 集群中拉取并消费消息
broker
Broker 是 Kafka 集群中的一个服务代理节点,可以看作是一台服务器
Kafka 集群通常由多个 Broker 组成,以实现负载均衡和容错
防止单点故障
高并发,扩容方便
Topic
对消息进行分类
生产者在发送消息的时候,需要指定发送到某个Topic,然后消息者订阅这个Topic并进行消费消息
Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据
Partition(分区)
提升性能
每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入
Topic是逻辑概念,而Partition是物理分组
生产者在发送消息的时候,需要指定发送到某个Topic的某个Partition,然后消息者订阅这个Topic并消费这个Partition中的消息
为了提高系统的吞吐量和可扩展性,把一个Topic的不同Partition放到多个Broker节点上,充分利用机器资源,也便于扩展Partition
ConsumerGroup(CG)
消费者组,由多个Consumer组成,每个ConsumerGroup中可以有多个consumer,每个consumer属于一个ConsumerGroup。
同一个Topic下的某一个分区只能被某个消费者组内的同一个消费者所消费,但可以被多个 consumer group 消费
Replication
为了保证数据的安全性和服务的高可用,又在Partition的基础上,引入Replica(副本)的概念
一个Partition包含多个Replica,Replica之间是一主多从的关系,有两种类型Leader Replica(领导者副本)和Follower Replica(跟随者副本)
Replica分布在不同的Broker节点上。
Leader负责接收生产者push的消息和消费者poll消费消息。Follower会实时从自己的Leader中同步数据保持同步
Leader故障时,某个Follower会上位为新的Leader。保证高可用
Offset
生产者Offset:消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。
消费者Offset:某个分区的offset情况。例如:生产者写入的offset是最新值是10,当一个Consumer开始消费时,从0消费,一直消费到了5,消费者的offset为5。
Message
kafka集群存储的消息是以topic为类别记录的,每个消息(也叫记录record)是由
In-sync Replicas(ISR)
ISR)已同步副本:表示存活且副本都已和Leader同步的的broker集合,是Leader所有replicas副本的子集。如果某个副本节点宕机,该副本就会从ISR集合中剔除
Kafka中的Broker、Topic、Consumer都会注册到zookeeper
图片
分支主题
kafka安装
kafka下载
https://kafka.apache.org/downloads
解压
cd /opt
tar -zxvf kafka_2.12-3.4.0.tgz
# 修改解压后的文件夹名称为Kafka
mv kafka_2.12-3.4.0 kafka
目录
分支主题
配置环境变量
vim /etc/profile
# 将kafka加入到系统bin命令下配置,实现处处都可以直接使用kafka命令
JAVA_HOME=/opt/jdk1.8.0_152
KAFKA_HOME=/opt/kafka
PATH=/opt/jdk1.8.0_152/bin:/opt/kafka/bin:$PATH
export JAVA_HOME KAFKA_HOME PATH
激活:source /etc/profile
配置kafka的配置文件
编辑config目录下的server.properties(记得提前备份)
listeners=PLAINTEXT://192.168.111.172:9092
advertised.listeners=PLAINTEXT://192.168.111.172:9092
# 上面IP地址为你自己Linux系统的真实IP地址(远程访问需要)
zookeeper.connect=192.168.111.172:2181
# 上面IP地址为你自己Linux系统的真实IP地址
......
示例图片:
分支主题
测试:kafka-topics.sh -version
启动Zookeeper
启动Kafka自身
子主题 9
使用
创建主题Topic
查看主题列表
查看已经创建的Topic信息(查看主题列表)
发送消息
消费消息
停止
zookeeper-server-stop.sh
kafka-server-stop.sh
验证
lsof -i:9092
lsof -i:2181
同一消费者组内竞争
1.新建主题
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic kafka-test-group01
2.发送消息
kafka-console-producer.sh --bootstrap-server 192.168.200.100:9092 --topic kafka-test-group01
3.两个消费端接收消息
打开两个不同命令行窗口
执行命令
4.查看消费者组
图片
分支主题
不同消费者组间广播
1.新建主题
2.发送消息
3.两个消费端接收消息
组1接收
组2接收
4.查看消费者组
图片
分支主题
收藏
立即使用
Collect
Get Started
Collect
Get Started
Collect
Get Started
Collect
Get Started
评论
0 条评论
下一页