架构 - MQ消息队列,rabbitmq,kafka
2025-11-25 20:08:57 14 举报AI智能生成
消息队列 - activemq, rabbitmq, kafka, redis 知识图谱,架构设计,中间件,主从,分布式,消息队列设计,
消息队列
架构设计
RabbitMQ
Kafka
中间件
模版推荐
作者其他创作
大纲/内容
MQ基本知识
MQ优点
1.解偶
本质把问题拆分为生产(1-n个生产者),消费(1-n个消费者)的概念
2.异步
3.削峰
4.解决高并发
MQ缺点
1.系统可用性降低
MQ一旦故障,系统A就没办法发送消息到MQ了,系统BCD也没法接收到消息。<b>下游流程没有办法运转了。</b>
2.导致系统致系统复杂性变高
系统A本来给系统B发送一条就可以,但是由于系统A和MQ协调问题,<b>系统A重复发送了两条数据给MQ。</b>
3.一致性问题
有人给A发送请求,本来这个请求ABCD都执行成功才能返回,结果D执行失败了,就导致整个请求给用户返回失败。<b>后台逻辑实际上差了一点需要解决。</b>
MQ调研对比
ActiveMQ
定位小规模吞吐量<br>
成熟,功能强大
偶尔丢失数据,维护越来越少
RocketMQ
吞吐量万级,消息队列<font color="#c41230"></font>
分布式扩展起来比较方便
java开发,源码可以研究点
如果阿里不支持了就黄了
RabbitMQ
万级吞吐量,MQ功能比较完备
用erlang开发,性能极好,延迟最低:微秒级(实时性更强)<br>
可靠性比较高,有队列模式,发布订阅模式,路由模式等方便业务。
社区相对比较活跃,开源提供管理界面很棒。
Redis
十万级别吞吐量,定位灵活的队列消息
多路复用 IO 模型,提升队列效率,使用简单。
基于内存,高效的数据结构。
场景的限制,如果需要更复杂的数据流处理,建议选择更适合的框架或者二次开发。
Kafka
百万万级吞吐量<br>
使用java语言开发,多节点broke的高吞吐量性能,多partition分区的高并发,多rep副本的高可用。
数据功能比较少。延迟一般:毫秒级(批量优化)<br>
扩展
Kafka 通过 Kafka Streams 和 ksqlDB 支持实时流处理,形成“流式数据平台”。<br>
典型能力:<br>窗口计算(如滑动窗口、会话窗口)。<br>状态管理(本地 RocksDB 存储状态)。<br>流表二元性(Table 与 Stream 的相互转换)。
Kafka 作为分布式日志存储系统,常用于数据管道(Data Pipeline)和实时数据集成。<br>
典型应用:<br>数据湖/仓的实时接入:通过 Kafka Connect 将数据同步到 HDFS、S3、数据库等。<br>CDC(变更数据捕获):捕获数据库变更日志(如 Debezium + Kafka)。
天然适合大数据的实时计算,数据流式处理。作为消息队列,结合流处理引擎可以实现高性能、可伸缩、可靠且低延迟的数据流处理框架。
如何保证mq消息不被重复消费(幂等性)?<br>
怎么保证幂等性
幂等性:一个数据或者一个请求,给你重复来多次,你得保证对应的数据是不会变的
根据业务来解决
基于Redis,生产者发送每条数据时,里面加一个全局唯一的id,类似订单id之类的东西。消费者先根据id去Redis里查一之前消费过么?<br>如果没有消费过,就正常处理,然后将id写到redis。<br>如果有消费过,那就不处理了,保证别重复处理相同的消息即可。<br>
kafka消费端出现非幂等
<b>每个消息都有一个offset,代表这个消息的顺序的序号</b><br>消费者从kafka消费的时候,是按照这个offset顺序去消费的<br>消费者会去提交offset,告诉kafka已经消费到offset=153的这条数据了,offset是存储在zookeeper里的<br>消费者系统被重启,重启以后,去kafka继续上次接着消费,<br>但是offset是定时定期提交一次,可能有几条数据消费了,但是没有告诉kafka就重启了<br>导致这几条数据在重启完后重新消费了一次
如何保证从mq拿出来的数据按照顺序执行?
场景:通过mysql binlog进行数据复制,新增,修改,删除操作时,必须保证顺序一致。<br>
RabbitMQ
<font color="#c41230">原因:rabbitMQ顺序不一致情况:一个queue多个消费者consumer</font><br>
<font color="#c41230">解决:设置多个queue,每个queue对应一个消费者,这种顺序操作的数据写入到一个queue里面去</font>
kafka
kafka能做到的保障:<br>1.写入partition中的数据一定是有顺序的。<br>
解决:<br>1.生产者对需要保证顺序的数据指定一个key,保证这些数据都写入同一个partition中。<br>2.消费者来说:一个partition只能被一个消费者消费。<br>
如何使用延迟队列?
生成订单之后,放入TTL队列设置超时时间。<br>
消息超过了有效期,没有人处理,成为死信
然后消费者处理交换机绑定的队列消息,判断是否支付等操作,进行解锁库存取消订单等操作。
消息队列满了以后该怎么处理<br>
场景:消费者故障。一天会有多少数据问题?怎么处理?<br>问题1:1个消费者1s 1000条,一秒2个消费者是2000条,一分钟是10万+条。1000多万+条积压数据。<br>问题2:所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。<br>
rabbitMQ
如果消费程序没有异常,恢复硬件可以新建一个MQ,写到新建的MQ中进行正常消费,保证业务。
如果消费者故障问题没解决,可以先改下消费者代码,拿到消费者的数据直接扔掉或放在某个库里,先保证MQ中的磁盘空间<br>等晚上再临时写程序找到丢失的数据重新补回来。
批量重导:假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,<br>你只能手动写程序把1000个订单查出来,手动发到mq里再去补一次。<br>
kafka
1.先修复consumer消费者的问题,确保其恢复消费速度
2.对接当前topic,临时征用10倍的机器来部署consumer,每一批consumer消费partition数据。
3.积压的数据,消费之后不做耗时的处理。或者考虑使用临时的topic和直接均匀轮训写入到临时建立好的topic里的10倍数量的partition里<br>
RabbitMQ篇
基本结构
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收
发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道Channel。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息根据规则转发到指定的Queue。
接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立Channel
3、消费者监听指定的Queue
4、当有消息到达Queue时,Broker默认将消息推送给消费者。Comsumer收到消息。
工作模式
work queues(工作队列模式)
生产者发送消息到一个工作队列,多个消费者同时从队列中接收消息并进行处理。
一条消息只会被一个消费者接收;可以有多个消费者同时处理队列中的消息。
可以设置消息的确认机制和fair dispatch(公平调度)来保证消息处理的可靠性和均衡性。
特点:一个生产者,一个queue,多个消费者也智能在唯一的queue取到的消息唯一<br>
publish/subscribe(发布订阅模式)
生产者发送消息到一个被称为"交换器(Exchange)"的组件,然后交换器将消息广播到绑定在上面的所有队列。
一条消息只会被多个queue接收,一个queue有多个consumer,publish面向交换机发送消息;<br>
通过交换器(exchange)实现消息分发到多queue,每个queue都有自己的消费者进行处理;<br>
特点:一个生产者,发送的消息会被多个queue获取。忽略 routing key。
Routing(路由模式)
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列
特点:在发布/订阅基础上,增加消息过滤功能,更精确地控制消息流向。
Topics(通配符模式)
队列绑定交换机指定通配符,和路由模式类似,但支持模式匹配(wildcards),如 *(匹配一个词)和 #(匹配多个词)。
统配符规则:中间以“.”分隔。
headers(头部模式)<br>
根据消息的 header 属性进行匹配,而不是 routing key。
RabbitMQ 注意事项<br>
如果让你来开发一个消息队列中间件,如何设计架构?
1.首先MQ得支持扩容吧,所以可以设计一个分布式的MQ,参考kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。等资源不够了,直接给topic增加partition,然后做数据迁移,增加机器,就可以存放更多数据,增加吞吐量。
2.考虑MQ的数据要持久化,落磁盘的数据要顺序写,这样就没有了磁盘随机读写的寻址开销,性能更高。kafka思路
3.mq可用性。参考kafka,多个副本 leader&follower->broker 挂了重新选举leader即可对外服务
上面整体的总结
消息持久化<br>
场景:防止消息因节点重启或崩溃丢失。<br>生产者设置 delivery_mode=2(持久化消息)。<br>队列声明时设置 durable=true(持久化队列)。<br>交换机声明时设置 durable=true(持久化交换机)。<br><b>注意:仅消息持久化无法完全避免丢失,需结合磁盘写入机制(如刷盘策略)。</b>
消息确认机制(ACK)<br>
手动确认(Manual Ack):消费者处理完消息后需显式发送 basic_ack,避免消息丢失。<br>自动确认(Auto Ack):仅适合非关键任务,可能因消费者故障导致消息丢失。
消费者端的可靠性<br>
实现幂等性:通过唯一ID(如业务主键)避免重复处理。<br>设置合理的 prefetch_count(QoS),防止单个消费者过载。<br>消费者重试策略:结合死信队列(Dead Letter Exchange, DLX)处理失败消息。
资源管理<br>
避免队列无限增长:设置 x-max-length(队列最大消息数)或 x-message-ttl(消息存活时间)。<br>监控连接和通道泄漏:定期检查未关闭的客户端连接。
网络与超时<br>
配置合理的心跳(heartbeat)时间(默认60秒),避免网络波动导致连接断开。<br>客户端需处理连接断开重连逻辑。
Kafka篇
基本结构
组件架构
服务协调
使用 Zookeeper
Zookeeper是Kafka集群的协调者,负责管理Broker的状态、Partition的分配和偏移量的管理。
Kafka依赖Zookeeper来实现集群的协调和一致性。
生产者Producer
中间人Broker
broker就是kafka server,每一台kafka服务器都是一个brocker<br>
broker内部有物理的分区partiton,用作高并发(提高吞吐量)<br>
broker之间有数据副本follower,用作高可用<br>
消费者Consumer
Kafka 使用消费者组(Consumer Group)来管理多个消费者的消费行为。<br>每个消费者属于一个消费者组,而同一个消费者组中的消费者会分区地共享消息。<br>
如果有多个消费者在一个消费者组中,Kafka 会将主题的分区分配给这些消费者。<br>
每个分区只会被分配给一个消费者(在同一个消费者组中),以确保消息的顺序性。<br>
如果消费者数量超过分区数量,多余的消费者将处于空闲状态,不会消费任何消息。<br>
核心设计
topic
<b>topic是kafka的基础单元,是一个<i>逻辑概念</i>。</b>数据实际写入的是物理的partition,一个topic包含多个partition。<br>
每条消息属于且仅属于一个Topic <br>
发送和订阅消息都必须指定topic
broker<br>
<b>Kafka集群中的每个服务器节点都是一个Broker,负责存储和管理消息数据。</b><br>
每个Broker可以容纳多个Partition。
每个Partition又可以分为多个Segment。
partition
partition中消息持久化时,每条消息都是根据一定的分区规则路由到对应的partition中,并append到log文件的尾部,提<b>高并发</b>。<br>
partition的副本可以被分布在不同的broker上,某个broker的故障不会影响到整个主题的<b>可用性</b>。<br>
partition中消息是顺序写入磁盘且有序的,但不同partiton之间不能保证消息的有序性,提<b>高效率</b>。<br>
partition个数最好与服务器个数相当<br>
副本 replication
partition可以有指定数据的副本。主从模式producer和consumer只与leader交互。<br>
replication follower从leader复制数据
Kafka会在Zookeeper上针对每个Topic维护一个in-sync replica(ISR)已同步的副本。<br>如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
setment file
segment file (多个大小相等的段) 组成了一个partition,每个partition 就相当于一个巨型的文件<br>
segment file 的消息数量并不一定相等
组成
.index 索引文件
包含若干索引条目,每个条目表示数据文件中一条message的索引<br>
.log 数据文件<br>
offset
位移
partition中的每个消息都有一个连续的序号,用于partition标识唯一的ID序号消息。<br>Offset记录着下一条将要发送给Consumer的消息的ID序号。<br>Offset从语义上来看拥有两种:Current Offset和Committed Offset。 <br>
Current Offset
<b>Current Offset保存在Consumer中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。</b><br>例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。<br>这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样能够保证每次Consumer poll消息时,收到不重复的消息。 <br>
Committed Offset <br>
<b>已提交位移,保存在Broker上,表示Consumer已经确认消费过的消息的序号。</b><br>举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()或consumer.commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。 <br>
总结
Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;<br>Committed Offset能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。 <br>
生产者Producer
self.producer.send(topic=kafka_topic_name, value=message.encode('utf-8'), key=key, partition=partition)<br>
消费者Consumer
self.consumer.poll(self, timeout_ms=0, max_records=None, update_offsets=True)<br>
Kafka 注意事项 <br>
1. 合理规划分区数<br>
分区数直接影响吞吐量和并行度:分区越多,吞吐量越高,但也会增加 ZooKeeper 的负担和客户端元数据开销。<br><br>分区数一旦增加无法减少:需提前预估业务增长,避免后期调整困难。<br><br>建议:根据目标吞吐量、消费者数量和硬件资源综合评估,通常从较小分区开始(如 6-10),逐步扩展。
2. 生产者确认机制(acks)<br>
acks=0:生产者不等待 Broker 确认,吞吐量最高,但可能丢失数据。<br><br><b>acks=1:Leader 副本写入即确认,平衡了可靠性和性能。<br></b><br>acks=all:所有 ISR(In-Sync Replicas)副本写入后才确认,数据最可靠,但延迟较高。<br><br>建议:根据业务容忍度选择。金融场景建议 acks=all,日志收集可降低要求。<br>
经典场景
设置acks=1:partition0 中有一个leader, 两个follower, 在leader写入后,就算写入成功,生产者会收到确认。<br>
设置acks=2:如果设置`min.insync.replicas=2`,那么当acks=all时,必须至少有2个副本(Leader和1个Follower)写入成功,生产者才会收到确认。<br>
3. 消费者偏移量(Offset)管理<br>
自动提交(enable.auto.commit=true):可能导致重复消费(如消费者崩溃后未提交 Offset)。<br><br>手动提交:在消息处理完成后手动提交 Offset,但需处理重复和遗漏问题。<br><br>建议:结合 enable.auto.commit=false 和手动提交,确保 Exactly-Once 语义(如配合事务)。
4. 消息顺序性有吗?<br>
Kafka 仅保证分区内消息顺序:同一分区的消息按写入顺序消费。<br><br>全局顺序性需单分区:若需全局有序,Topic 只能有 1 个分区,但会限制吞吐量。<br><br>建议:通过业务键(如用户 ID)作为key,将相关消息路由到同一分区,局部有序。<br>
典型应用场景<br>
用户行为跟踪<br>需求:同一用户的 登录 -> 浏览 -> 下单 动作必须按顺序处理。<br>实现:以 user_id 为键,确保用户行为序列进入同一分区,消费者按顺序处理。<br><br>实时会话管理<br>需求:聊天消息按 发送时间 顺序展示。<br>实现:以 session_id 为键,同一会话的消息集中处理。
注意事项与优化<br>
消费者并发处理:<br>挑战:单个分区只能由一个消费者线程处理,高吞吐场景可能成为瓶颈。<br>优化方案:<br>分区内多线程处理:在消费者内部按子键(如 user_id 的尾号)进一步拆分,但需业务端保证顺序。<br>
数据倾斜问题:<br>现象:某些键(如热门用户)数据量过大,导致对应分区成为瓶颈。<br>解决方案:<br>键加盐:将 user_id 拼接随机后缀(如 user123_01),分散数据。<br>二级分区:组合键(如 user_id + date),按日期拆分热点。<br>
5. 多个kafka的消费者能同时消费一个topic吗?<br>
可以的,如果有多个消费者在一个消费者组中,Kafka 会将主题Topic的分区分配给这些消费者。(消费者数量 ≤ 分区数量)<br><br>同一消费者组内的消费者不能重复消费消息(即每个消息只会被该组内的一个消费者处理)。<br><br>不同的消费组之间互相不干扰,各个有各自的id记录消费顺序。<br><br>相同key的data应该在同一个partition中,相同的partition 中的消费顺序一致,不同的partition中没有前后顺序。<br>
典型场景
负载均衡:多个消费者可以并行处理消息,提升吞吐量。<br>
高可用性:如果某个消费者宕机,其负责的分区会被重新分配给其他存活的消费者。<br>
6. 数据保留与清理策略<br>
基于时间(retention.ms):默认 7 天,超时数据自动删除。<br><br>基于大小(retention.bytes):限制 Topic 总大小。<br><br>压缩策略(cleanup.policy=compact):保留每个键的最新值,适用于状态更新场景(如数据库变更捕获)。<br><br>建议:根据数据重要性配置保留策略,避免磁盘爆满。
7. 副本与高可用性<br>
ISR 机制:Leader 副本和所有同步的 Follower 副本组成 ISR 列表。<br><br>min.insync.replicas:控制写入成功所需的最小 ISR 副本数(如设为 2,则至少 2 个副本写入成功)。<br><br>建议:生产环境至少部署 3 个 Broker,设置 replication.factor=3 和 min.insync.replicas=2。
8. 监控与运维<br>
关键指标:Broker 的 CPU/磁盘 IO、分区 Leader 分布、消费者 Lag(未消费消息数)。<br><br>工具:Kafka Manager、Prometheus + Grafana、Confluent Control Center。<br><br>建议:设置告警阈值(如消费者 Lag 超过 1 万条时触发告警)。<br>
9. 应用层python包的区别
confluent_kafka 和 kafka-python 用于生产者时,由于它们的底层架构(C扩展 vs 纯Python)和API设计不同
底层架构 confluent_kafka基于 librdkafka (C库) 的绑定;kafka-python基于纯 Python 实现;confluent 性能更高,但安装可能需C编译环境。
发送模式 confluent 必须处理回调来确认消息状态纯异步;kafka-python同步/异步可选 。
错误处理 confluent 的错误是异步报告的,更容易被忽略。kafka-python错误同步:捕获异常;异步:通过回调或Future
保证mq高可用
RabbitMQ集群模式
主备集群模式
每个节点上都有queue的完整镜像,包含了queue的全部数据<br>
优点
能够从多个机器上去消费提高消费的吞吐量而已
缺点
如果queue所在的节点宕机了,可用性几乎没有什么保障,就导致该queue的数据丢失了;数据量大的时候,高并发问题
镜像集群模式<br>
每个节点上都有queue的完整镜像,包含了queue的全部数据<br>
优点:任何一个节点宕机了,高可用。其他节点上还包含了这个queue的完整数据,别的consumer都可以到其他活着的节点上去消费
<font color="#c41230">缺点:不是分布式,</font>如果queue的数据量很大,大到机器上的容量无法容纳了,此时该怎么办呢
如何开启镜像集群模式:在管理控制台,后台新增一个策略。这个策略就是镜像集群模式,指定的时候可以要求数据同步到所有节点的,也可以要求同步到指定数量节点,然后再次创建queue的时候应用这个策略,就会自动将数据同步到其他节点上去了
分布式集群
集群分布式无主架构:弹性扩缩容,容量无上限。<br>
Kafka集群高可用架构
强大天生的分布式系统:每台机器上的broker进程,就可以认为是kafka集群中的一个节点<br>
每个节点存储一部分topic的partition<br>
每个节点可以设置多个副本,选举一个为leader,其他副本为follower<br>生产者只能往leader里写数据,写入数据到leader的时候,leader就会将数据同步到follower上去
保证mq的可靠性
RabbitMQ
1.生产者丢失数据
写消息过程中,消息都没到rabbitmq在网络传输过程中就丢了
解决
方案1:commit事务
使用rabbitMQ提供的事务功能,发送数据前开启rabbitmq事务(<b>channel.txSelect</b>),然后发送消息。<br>如果没有被rabbitMQ接收到消息,生产者会收到异常报错,此时就可以回滚事务(<b>channel.txRollback</b>),然后重新发送消息;<br>如果收到了消息,那么可以<font color="#121212"><b>channel.txCommit</b> 用于提交事务</font>;<br>
缺点:这个事务机制是同步的,生产者发送消息会阻塞卡住等待成功,会导致生产者发送消息的吞吐量降下来
方案2:confirm确认
<font color="#c41230">先把channel设置成confirm模式,</font>发送一个消息,发送完消息后就不用管了。<br><font color="#c41230">rabbitmq如果接收到这条消息,就会回调你生产者本地的一个接口,通知说这条消息已经收到了;<br>rabbitmq如果在接收消息的时候报错了,就会回调你的接口告诉这个消息接收失败了,你可以再次重发;</font>
生产者这块如果要保证消息不丢,一般是用confirm机制,异步的模式,你发送消息之后不会阻塞
2.rabbitMQ丢失数据
rabbitMQ将数据暂存在自己的内存里,结果消费者还没来得及消费,rabbitmq挂掉了,就导致暂存在内存里的数据丢失
解决
rabbitMQ开启持久化
1.创建queue的时候设置queue为持久化,但是queue里的消息不会持久化
2.发送消息msg的时候将msg设置为持久化的,deliveMode设置为2
<font color="#121212">必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启也会从磁盘上重启恢复queue,恢复这个queue里的数据。</font><br>
3.消费者丢失数据<br>
打开了autoAck机制,消费者消费到了这个消息,但是没处理就挂掉了,rabbitmq以为这个消费者已经处理完自动ack了。
解决
在消息消费完后,手动ack通知数据成功消费。
在应用端,用逻辑或者约束实现幂等来保证一致性。
4.消息的持久化
存储
RabbitMQ的消息可以存储在磁盘,但会增加开销
删除
消费者从queue获取消息后,ack后消息自动删除
不需要考虑清楚消息的问题
Kafka
生产者会不会丢数据?
生产者确认机制(acks)<br>
如果按照上面的配置设置acks=all,一定不会丢
acks=0:生产者不等待 Broker 确认,吞吐量最高,但可能丢失数据。<br><b>acks=1:Leader 副本写入即确认,平衡了可靠性和性能。</b><br>acks=all:所有 ISR(In-Sync Replicas)副本写入后才确认,数据最可靠,但延迟较高。<br>
建议:根据业务容忍度选择。金融场景建议 acks=all,日志收集可降低要求。
kafka丢失数据
生产者将数据传给了主节点 partition1 leader,但是leader还没有把数据同步到自己的从节点partition1 follower就挂掉了。<br>经过重新指定leader后,新leader里没有这个未同步的数据。
解决
1.给topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本
2.在kafka服务器端设置min.insync.replicas参数:这个值必须大于1,这是要求一个leader至少感知到至少有一个follower还跟自己保持联系,没掉队
3.在producer(生产者)端设置 acks=all:这个是要求每条数据,必须写入所有replica之后,才能认为是写成功了。
acks=0:发送完数据就不管了。
acks=1:发送完数据只要leader接收到就算成功,默认设置这个比较好
acks=all:必须所有分片都同步完数据才算成功
消费者丢失数据
消费者消费到消息,自动提交offset,让kafka以为你已经消费好了消息。但是实际上你刚拿到这个消息还没处理就挂了,导致数据丢失
解决
取消自动offset,进行手动offset,彻底处理完数据再进行offset传输
但是容易出现消费完消息还没提交offset就挂了的情况,导致重复消费
根据实际情况保证幂等性就行
消息的持久化
存储
key value 这种消息存储在不通节点broker的分区partition
删除
消费者消费了消息后,commit后消息仍然存在
可以通过设置 retention 来控制数据的过期和删除
通过过期时间
通过过期数量
收藏
立即使用
收藏
立即使用
收藏
立即使用
收藏
立即使用
Collect
Get Started
Collect
Get Started
Collect
Get Started
Collect
Get Started
评论
0 条评论
下一页