kafka (消息队列)
2023-11-27 13:55:36 9 举报
AI智能生成
登录查看完整内容
kafka相关的概念和操作
作者其他创作
大纲/内容
__consumer_offsets中指针移动过程
知道这条消息消费端已经消费完了,下次消费端重启,继续发送下一条消息给它
消息已提交
下次消费端重启时再次发送这条未提交的消息给消费者
未超过保留时间(retention time)
消息本身仍然存在于broker上,但无法直接获取。只有在消费者请求拉取消息时,它们才能访问到这些超过保留时间的消息
已超过保留时间(retention time)
消息未提交
消费提交结果
是否启用自动提交机制,默认是开启的
参数enable_auto_commit
执行自动提交的时间间隔,默认5秒
参数auto_commit_interval
结果是不会收到此前曾经处理过的消息
消费端程序重启后,之前已经消费过的消息是否会再次收到
默认自动提交
在每条消息成功处理后自动提交偏移量
record模式
在一批消息全部成功处理后自动提交偏移量
batch模式
定期自动提交偏移量
time模式
在达到指定的记录数后自动提交偏移量
count模式
手动提交偏移量
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,确保在适当的时候进行偏移量的提交
manual模式
手动提交偏移量,并立即提交
显式调用`Acknowledgment`对象的`acknowledge()`方法来提交偏移量,但在处理消息后立即提交偏移量
manual_immediate模式
spring.kafka.listener.ack-mode用于配置Spring Kafka消费者监听器的ACK模式
手动提交:一个消息执行一次提交,更准确
例如:在时间间隔内,消费者宕机、网络不稳定等原因导致某些消息没有提交,进而导致消息重复投递
自动提交:在一定的时间间隔之后,执行提交,存在一定的风险
评价
测试不提交
测试
在接收消息的过程中抛出异常而没有手动提交偏移量时,Kafka会将该消息视为未被成功处理,并尝试重新投递给消费者
指定了在发生可恢复异常时,Kafka将尝试重新投递消息的最大次数,默认为10次
max.poll.retries
指定了两次重试之间的退避时间间隔,即每次重试失败后等待的时间,默认为100毫秒
retry.backoff.ms
等参数控制
Kafka会根据一定的重试策略来决定重新投递消息的次数
如果在所有的重试尝试都失败后,消息仍然未被成功处理,那么这条消息将被视为无法恢复的错误消息,并且可能进入到死信队列等特殊处理机制中,具体取决于您的应用程序的配置
异常状态
借助死信主题机制来接收那些处理失败的消息,做一些后续的善后处理
3
失败后重试次数
attempts
@Backoff
详情在@Backoff注解设置
backoff
true
设置是否自动创建死信主题死信主题会自动在原主题名称后附加“-dlt”
autoCreateTopics
①@RetryableTopic注解
②@Backoff注解
注解
代码
监听死信主题
查看死信主题
死信主题
设置@KafkaListener注解的concurrency属性
削峰限流
消费端消息提交方式
消费端口的提交机制
给生产者返回ACK确认信息
消息持久化保存到硬盘上的日志文件中
Topic+Partition+Offset组合在一起可以唯一定位到一条具体的消息
表示消息所属的分区
1.分区号(Partition)
表示消息所属的主题
2.主题(Topic)
表示消息在分区中的唯一位置
3.消息的偏移量(Offset)
表示消息的发送时间或其他自定义时间戳属性
4.消息的时间戳(Timestamp)
可选项,某些情况下用于消息路由和分发
5.消息的键(Key)
消息的实际内容
6.消息的值(Value)
日志文件(6个字段)
分支主题
消息举例
kafka保存消息
表示消费者组订阅的主题
Topic
表示消费者订阅的分区
Partition
消费者组的标识
组内竞争(只一个能获取到消息)
组间共享(都会接收到消息)
分为两种
Consumer group
消费者组再该分区中的当前消费进度,指示已经消费的偏移量
Offset
其他与偏移量相关的元数据信息
Metadata
每个分区的高水位线,表示已经被确认的最大偏移量
HW(High Watermark)
每个分区的日志结束偏移量,即下一条即将被写入的消息的位置
LEO(Log End Offset)
数据结构
通过Topic和Partition的组合,定位到一个具体的分区
__consumer_offset
在__consumer_offsets主题中记录各个Partition中的消息对消费者来说消费到了哪一条
本次从Offset最小值开始消费,有可能导致消息重复投递
--offset earliest
--offset latest
本次从非负整数指定的位置开始消费
--offset 非负整数
--to-earliest
urrent意思是当前,这里指当前已经提交的最新Offset
本次从最新的消息开始消费,而不会考虑之前已经消费的消息,可能会导致消费者跳过一些历史消息
--to-latest
current意思是当前,这里指当前已经提交的最新Offset
--to-current
--to-offset 非负整数值
上面的参数只是设定方案,必须再带上--execute才表示执行方案
--execute
旧版本
消费者组查看偏移量
命令行测试
kafka-console-consumer.sh
Offset机制
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic topic-java-client
创建主题
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:9092 --topic topic-java-client
启动消费者监听主题
引入依赖
java程序
可以调用Future的get()方法同步获取任务执行结果
代码示例
send()方法返回Future类型的对象
获取消息发送结果
生产者
消费者
客户端原生API
pom
配置文件
启动类
配置类
运行后查看主题
linux上监听
java发送消息代码
发送消息
group-id必须指定(一个具体的consumer是存在一个组里面)
deleteall /brokers/topics/__consumer_offsets
接收消息--接收不到就删除对应的Zookeeper下__consumer_offsets
实体类
发送消息的方法
解决办法:把序列化器换成支持复杂类型的
原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化
会出现的异常(java.lang.ClassCastException)
传递实体对象类型的消息
流程
send方法中,Integer partition(分区)传递的值,方法中的第二个参数,必须传递key(即便是空),第二个参数才是partition
指定分区(以分区为准)
key值会首先基于murmur2hash算法,得到HASH值,然后HASH值对分区数量取模
没有parttion的话(不指定分区),传递的key起作用
Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区
待该分区的batch(默认16k)已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
分区和key都不指定
轮询分区器:轮询
spring.kafka.producer下面:properties: partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
测试代码
轮询分区器
实现自定义逻辑
spring.kafka.producer下面:properties: partitioner.class: com.atguigu.kafka.partitioner.AtguiguPartitioner
添加配置自定义类
自定义分区器
生产者分区策略
- K:发送消息时指定的key- V:发送消息时消息数据本身
泛型说明
消息发送前执行(已经调用了 KafkaTemplate 对象的 send() 方法,但是消息还没有发送到 broker)
onSend()
Kafka服务端应答后,应答到达生产者确认回调前执行
onAcknowledgement()
拦截器对象销毁之前执行
close()
重写方法
实现接口
被修饰的方法执行时机:Web环境下Servlet对象创建完成,初始化操作之前
不能有参数
不能有返回值
必须是public权限修饰符
对被修饰方法的要求
@PostConstruct注解
配置类加上
用之前方法测试
生产者拦截器
所以如果要保证业务全局严格有序,就要设置 Topic 为单分区
但是,某批次的数据发送失败后,进行了重试,也可能出现后边的批次先于它到达的情况
Kafka 最多只保证单分区内的消息是有序的
实现方式一:直接指定相同的partition值
实现方式二:不指定partition,而是指定相同的key
需要有序的消息发送到同一个分区
有数据丢失的风险
设置retries值为0
解决方案
spring.kafka.producer下面:retries: 0
并不是为了获取任务执行的结果
而是确保第一个任务执行完成之后,再执行第二个任务
如果不这样做,各个子线程负责发送消息没办法保证发送消息的顺序
调用 get() 方法(能保证)
测试同分区代码
无法保证有序
- 在生产者端发送消息之前,把消费端程序停止- 把消息全部发送到broker之后,再启动消费端程序接收消息结果是有一定顺序
测试不同分区代码
生产者数据有序
为了确认消息发送结果,我们需要引入ACK确认机制
acknowledge单词的缩写
ack
生产者发送数据后就不管了,不会等待broker的ack,这个延迟最低但是存储的保证最弱。当server挂掉的时候就会丢数据
几乎不用
0
默认值
生产者会等待ack值 ,leader确认接收到消息后发送ack,不需要follower确认。但是如果leader挂掉后他不确保消息是否同步到了所有的follower中,新leader也会导致数据丢失,可靠性中等,效率中等
一般用于传输普通日志,允许丢个别数据
1
- 生产者会等所有的follower的副本受到数据后才会收到leader发出的ack,也即Leader和ISR队列里面所有Follwer应答,可靠性最高、效率最低
一般用于传输重要不能丢失的数据(例如:钱、订单、积分等),对可靠性要求比较高的场景
-1(all)
如果没有接收到ack,生产者端会考虑参照retries参数执行重试操作
spring.kafka.producer.acks可配置值
生产者ACK确认
要么都发送到消息队列
要么都不发送
下单成功后,把以下消息存入消息队列:
例子
提出问题
一个事务中所有send()方法都执行成功,所有要发送的消息都存入缓存了,再一起往broker发送
生产者端事务仅仅对消息是否全部进入缓冲区进行管理,至于消息在网络上传输过程中是否会丢失就需要借助ack和retries机制了
send()方法
retries必须大于0
acks必须设置为-1(或all)
对应的spring.kafka.producer下的配置
kafka
事务只负责保证消息存入缓冲区,已经存入缓冲区的消息都是确定要发送到broker的
但是从生产者到broker需要经过网络,broker也有宕机的风险
所以事务提交并不能保证消息一定能写入broker
①At Most Once(至多一次)
②At Least Once(至少一次)
③Exactly Once(精准一次)
数据发送可靠性水平
消息重试机制
个操作执行一次和执行多次对系统的影响是一样的,执行多次不会破坏数据完整性
广义理解
不论Producer向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
狭义理解
幂等性
在Kafka中可以通过事务、重试机制和幂等性等特性来实现类似于"Exactly Once"的效果
效果
应对方式
风险
消息要么都发送要么都不发送
事务
业务代码出错,导致在一个逻辑整体中的消息部分发送、部分没有发送
多次重试保证消息发送成功
重试
网络传输过程中消息丢失
每次操作都针对一个具体的id执行对其它数据没有影响
重复消息导致数据计算错误
集群中包含多个broker实例,避免单点故障数据分区:给数据分区配备复制分片
集群
broker实例宕机导致无法接收消息
比如:东京地震了,但是我们在北京有跨区域容灾机制
跨区域容灾
火山爆发、地震、海啸、火灾……
风险和应对方式总结
总结
生产者事务
kafka整合springboot
注册到同一个Zookeeper上就代表它们是同一个集群的
Kafka通过broker.id来区分集群中的不同节点
日志目录
端口号
/opt/k-cluster/log6000
/opt/k-cluster/server6000.properties
6000
实例01
/opt/k-cluster/log7000
/opt/k-cluster/server7000.properties
7000
实例02
/opt/k-cluster/log8000
/opt/k-cluster/server8000.properties
8000
实例03
构建集群
mkdir -p /opt/k-cluster/log6000
mkdir -p /opt/k-cluster/log7000
mkdir -p /opt/k-cluster/log8000
创建目录
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server6000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server7000.properties
cp /opt/kafka_2.13-3.6.0/config/server.properties /opt/k-cluster/server8000.properties
复制配置文件
6000端口
7000端口
8000端口
修改配置文件
启动集群实例
增加内存
关闭一些进程
虚拟机内存不足
zookeeper-server-start.sh
KAFKA_HEAP_OPTS
Zookeeper
kafka-server-start.sh
Kafka
修改对应启动脚本程序中的内存大小
启动失败
验证端口(lsof -i:端口号)
停止集群
查看主题
集群消息发送
集群查看
方案一
第一步:把apache-zookeeper-3.9.1-bin.tar.gz上传到Linux系统/opt目录下
cd /opt
tar -zxvf apache-zookeeper-3.9.1-bin.tar.gz
第二步:解压apache-zookeeper-3.9.1-bin.tar.gz文件
/opt/apache-zookeeper-3.9.1-bin/bin/zkCli.sh
第三步:运行zkCli.sh脚本文件,登录到Zookeeper服务器
第四步:删除__consumer_offsets主题
quit
第五步:退出Zookeeper
先关闭然后重新启动Zookeeper
先关闭然后重新启动集群各实例
第六步:重启
方案二
集群消息接收的时候,收不到
集群消息消费
使用集群
kafka集群
docker search efak
1.搜索镜像
2.下载镜像运行容器
http://192.168.200.100:8048
访问
登陆
可视化界面查看broker实例
注意:Kafka集群中broker实例的数量需要大于等于复制因子(Replication factor)
新建相关主题
主题列表中可以看到对应的broker拥有主题,主题中可以看到对应的分区,分区中可以看到对应的消息
使用
Kafka图形化监控Eagle
每一步都满足才有返回
响应时间长
并发压力传递
系统结构传递不足
同步:串行(多功能)
存入消息队列就能返回结果(异步处理)
添加新功能植入(应用解耦)
流量削锋
异步:解耦
业务逻辑(例子:下单功能)
两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程(要做隔离)
某一个进程接受的消息太多,一下子无法处理完,对收到的消息进行排队
不同进程(process)之间传递消息
引入
是一个分布式流处理平台,具有高吞吐量、低延迟和可靠性等特点
一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer
日志收集
解耦生产者和消费者、缓存消息等
消息系统
用来记录web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。
用户活动跟踪
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
运营指标
比如Spark Streaming和Storm
流式处理
它广泛应用于实时数据处理、日志收集、消息队列等场景
一款基于zookeeper协调的分布式消息系统
JMS: Java Message Service API(Java消息服务)只能在java中使用
Apache ActiveMQ:比较成熟的产品
RocketMQ:阿里巴巴产品,目前交由Apache基金会
基于JMS的产品
AMQP:Advanced Message Queuing Protocol(高级消息队列协议) 基于协议的,可以跨语言使用
RabbitMQ:erlang语言开发,稳定性好,响应快。
基于AMQP的产品
扩展
一般基于Pull或者Polling 接收消息
发送到队列中的消息被一个而且只有一个接受者接受,即便有多个接受者在同一个队列中侦听到同一个消息
即支持异步“即发即弃”的消息传送方式,也支持同步请求、应答的传送方式
Peer-to-Peer(Queue)
发布到一个主题的消息,可以被多个订阅者接收
发布、订阅可基于Push消费数据,也可以基于Pull或者Polling消费数据
解耦能力比P2P模型更强
发布/订阅(Topic)
消息分类
Apache Kafka
RocketMQ
ActiveMQ
RabbitMQ
Apache(社区)
阿里(公司)
Rabbit(公司)
研发团队
Scala&Java
Java
Erlang
开发语言
分布式流平台,通过发布-订阅模型进行高吞吐量的消息处理
分布式的消息队列模型采用主题(Topic)和标签(Tag)的方式进行消息的分类和过滤
基于JMS的消息传递模型支持点对点模型和发布-订阅模型
基于AMQW的消息队列模型使用生产者-消费者模式将消息发布到队列中,然后被消费者订阅和处理
核心机制
自定义协议社区封装了HTTP协议支持
自定义协议
XMPPSTOMPOpenWireREST
XMPPSTOMPSMTP
协议支持
官方支持Java社区产出多种API,如PHP,Python等
JavaC++不成熟
JavaC/C++PythonPHPPerl.NET等
官方支持Erlang,Java,Ruby等社区产出多种API,几乎支持所有语言
客户端支持语言
分区和副本
主从复制
镜像队列
可用性
每秒百万级
每秒十万+级(双十一)
每秒数万级
每秒十万左右级别
单机吞吐量
毫秒以内
毫秒级
微秒级
消息延迟
内置消息表,消息保存到数据库实现持久化
完整的消息确认机制
消息确认
只支持主要的MQ功能毕竟是专门为大数据领域服务的
MQ功能比较完备扩展性佳
老牌产品成熟度高文档丰富
并发能力强,性能极好,延时低,社区活跃,管理界面丰富
功能特性
消息队列产品
生产者负责将消息发送到 Kafka 集群
Producer
消费者负责从 Kafka 集群中拉取并消费消息
Consumer
Broker 是 Kafka 集群中的一个服务代理节点,可以看作是一台服务器
Kafka 集群通常由多个 Broker 组成,以实现负载均衡和容错
防止单点故障
高并发,扩容方便
broker
对消息进行分类
生产者在发送消息的时候,需要指定发送到某个Topic,然后消息者订阅这个Topic并进行消费消息
Topic 是一个逻辑概念,Partition 是最小的存储单元,掌握着一个 Topic 的部分数据
提升性能
每个 Partition 都是一个单独的 log 文件,每条记录都以追加的形式写入
Topic是逻辑概念,而Partition是物理分组
生产者在发送消息的时候,需要指定发送到某个Topic的某个Partition,然后消息者订阅这个Topic并消费这个Partition中的消息
为了提高系统的吞吐量和可扩展性,把一个Topic的不同Partition放到多个Broker节点上,充分利用机器资源,也便于扩展Partition
Partition(分区)
消费者组,由多个Consumer组成,每个ConsumerGroup中可以有多个consumer,每个consumer属于一个ConsumerGroup。
同一个Topic下的某一个分区只能被某个消费者组内的同一个消费者所消费,但可以被多个 consumer group 消费
ConsumerGroup(CG)
为了保证数据的安全性和服务的高可用,又在Partition的基础上,引入Replica(副本)的概念
一个Partition包含多个Replica,Replica之间是一主多从的关系,有两种类型Leader Replica(领导者副本)和Follower Replica(跟随者副本)
Replica分布在不同的Broker节点上。
Leader负责接收生产者push的消息和消费者poll消费消息。Follower会实时从自己的Leader中同步数据保持同步
Replication
生产者Offset:消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。
消费者Offset:某个分区的offset情况。例如:生产者写入的offset是最新值是10,当一个Consumer开始消费时,从0消费,一直消费到了5,消费者的offset为5。
kafka集群存储的消息是以topic为类别记录的,每个消息(也叫记录record)是由
Message
ISR)已同步副本:表示存活且副本都已和Leader同步的的broker集合,是Leader所有replicas副本的子集。如果某个副本节点宕机,该副本就会从ISR集合中剔除
In-sync Replicas(ISR)
Kafka中的Broker、Topic、Consumer都会注册到zookeeper
图片
Kafka基本结构
https://kafka.apache.org/downloads
kafka下载
cd /opttar -zxvf kafka_2.12-3.4.0.tgz# 修改解压后的文件夹名称为Kafkamv kafka_2.12-3.4.0 kafka
解压
目录
vim /etc/profile# 将kafka加入到系统bin命令下配置,实现处处都可以直接使用kafka命令
JAVA_HOME=/opt/jdk1.8.0_152KAFKA_HOME=/opt/kafkaPATH=/opt/jdk1.8.0_152/bin:/opt/kafka/bin:$PATHexport JAVA_HOME KAFKA_HOME PATH
激活:source /etc/profile
配置环境变量
编辑config目录下的server.properties(记得提前备份)
listeners=PLAINTEXT://192.168.111.172:9092advertised.listeners=PLAINTEXT://192.168.111.172:9092# 上面IP地址为你自己Linux系统的真实IP地址(远程访问需要)
zookeeper.connect=192.168.111.172:2181# 上面IP地址为你自己Linux系统的真实IP地址......
示例图片:
配置kafka的配置文件
测试:kafka-topics.sh -version
启动Zookeeper
启动Kafka自身
子主题 9
kafka安装
创建主题Topic
查看主题列表
查看已经创建的Topic信息(查看主题列表)
消费消息
zookeeper-server-stop.sh
kafka-server-stop.sh
停止
lsof -i:9092
lsof -i:2181
验证
kafka-topics.sh --bootstrap-server 192.168.200.100:9092 --create --topic kafka-test-group01
1.新建主题
kafka-console-producer.sh --bootstrap-server 192.168.200.100:9092 --topic kafka-test-group01
2.发送消息
打开两个不同命令行窗口
执行命令
3.两个消费端接收消息
4.查看消费者组
同一消费者组内竞争
组1接收
组2接收
不同消费者组间广播
kafka(消息队列)
收藏
0 条评论
回复 删除
下一页