MQ整理心得
2021-06-24 17:50:14 0 举报
AI智能生成
RocketMQ知识整理
作者其他创作
大纲/内容
综述
什么是消息队列
本质是两个进程传递消息的一种方法,
两个进程可以分布在同一台机器上也可以分布在不同机器上面
两个进程可以分布在同一台机器上也可以分布在不同机器上面
为什么需要消息队列
1、削峰填谷
2、程序间解耦
3、异步处理
4、数据的最终一致性
常见的消息队列
ActiveMQ
1、支持多协议
2、消息持久化的Jdbc
宕机:自动切换
kafka
1、超高写入速率
2、end-to-end 耗时毫秒级
宕机:自动选主
RocketMQ
1、万亿级消息支持
2、万级Topic数量支持
3、end-to-end 耗时毫秒级
宕机:手动重启
RocketMQ的发展史
前世
2001年,阿里巴巴内部为适应淘宝B2C更快、更复杂的业务,启动了“五彩石项目”,第一个消息队列服务产生
2010年,阿里巴巴内部为应对于海量消息对接、顺序消费、完全自主控制消息队列服务,以此背景下,在2011年,MetaQ诞生
云化
2011年,Kafka开源,在2012年阿里巴巴参考kafka的设计,与MetaQ的理解和实际使用,研发了一套通用的消息队列引擎,也就是现在的RocketMQ
2016年,阿里云上线云RocketMQ消息队列服务
毕业
2016年11月,阿里巴巴将RocketMQ捐献给Apache基金会
2017年9月25日,RocketMQ成功在Apache中毕业,成为Apache的顶级项目
RocketMQ的生产者原理和最佳实践
生产者
生产者概述
发送消息的一方被称为生产者
生产者组:一个逻辑概念,在使用生产者实例的时候,需要指定一个组名。一个生产者可以生产多个topic消息
生产者实例:一个生产者组部署了多个进程,每个进程都可以称为一个生产者实例。
Topic:主题名字,一个Topic由若干Queue组成。
RocketMQ客户端的生产者有两个独立的实现类
1、com.alibaba.rocketmq.client.producer.DefaultMQProducer
用于生产普通消息、顺序消息、单向消息、批量消息、延迟消息
2、org.apache.rocketmq.client.producer.TransactionMQProducer
用于生产事务消息
消息结构和消息类型
消息结构
Topic
主题名字,可以通过RocketMQ控制台创建
Properties
消息扩展信息,tag、keys、延迟级别都保存在这边
Body
消息体,字节数组。需要注意编码格式(生产者与消费者的编码格式需要统一)
Keys
设置消息的Key,可以多个Key,帮助用户快速查询使用的
Tags
消息过滤标记,用户可以订阅某个Topic下的某些Tag,这样Broker只会把订阅了Topic-tag的消息发送给消费者
DelayTimeLevel
设置延迟级别、延迟多久消费者可以消费
UserProperty
其它扩展信息。(内部是一个Map)
消息类型
普通消息
并发、无序、单机性能可达10W级别TPS
SendResult sendResult = producer.send(msg);
分区有序消息
把Topic消息分成多个分区,每个分区遵循FIFO(先进先出)的原则
根据 sharding key 进行区块分区
根据 sharding key 进行区块分区
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object shardingKey) {
int select = Math.abs(shardingKey.hashCode());
if (select < 0) {
select = 0;
}
return mqs.get(select % mqs.size());
}
}, shardingKey);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object shardingKey) {
int select = Math.abs(shardingKey.hashCode());
if (select < 0) {
select = 0;
}
return mqs.get(select % mqs.size());
}
}, shardingKey);
全局有序消息
把Topic消息分区数设置为1,那么消息就是单分区,所有消息都遵循FIFO(先进先出)的原则
延迟消息
消息发送后,消费者要在一定时间后/指定时间点才可以消费,在没有延迟消费消息时,基本作法是基于定时计划任务调度,定时发送消息。在RocketMQ中只需要在发送消息时设置 延迟级别即可实现
RocketMQ 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
例1:
Message msg1 = new Message("TOPIC","消息体".getBytes());
msg1.setDelayTimeLevel(2);//延迟5秒
Message msg1 = new Message("TOPIC","消息体".getBytes());
msg1.setDelayTimeLevel(2);//延迟5秒
例2:
Message msg1 = new Message("TOPIC","消息体".getBytes());
msg1.setDelayTimeLevel(4);//延迟30秒
Message msg1 = new Message("TOPIC","消息体".getBytes());
msg1.setDelayTimeLevel(4);//延迟30秒
事务消息
主要涉及分布式事务,即保证多个操作同时成功/失败时,才能消费消息。
RocketMQ通过发送Half消息、处理本地事务、提交(Commit)/回滚(RollBack)消息,实现分布式事务
RocketMQ通过发送Half消息、处理本地事务、提交(Commit)/回滚(RollBack)消息,实现分布式事务
1、producer 发送half message到broker中;
2、broker接收到half message后给producer发送成功的发聩,这时,half message才算真正生成完了;
3、producer执行本地事务;
4、producer根据第3步得到执行本地事务的结果,向MQ进行二次确认(到底是commit还是roll back)。如果是commit就将这个half message变为消费可以消费的消息,此时消费者接收到这条half message。如果是roll back,那么MQ就将这条half message丢弃掉;
5、如果MQ在第4步的时候,拿不到producer执行完本地事务的结果,那么告知producer要进行回查;
6、producer 查询执行本地事务结果;
7、producer根据第6步返回的结果再次执行第4步;
2、broker接收到half message后给producer发送成功的发聩,这时,half message才算真正生成完了;
3、producer执行本地事务;
4、producer根据第3步得到执行本地事务的结果,向MQ进行二次确认(到底是commit还是roll back)。如果是commit就将这个half message变为消费可以消费的消息,此时消费者接收到这条half message。如果是roll back,那么MQ就将这条half message丢弃掉;
5、如果MQ在第4步的时候,拿不到producer执行完本地事务的结果,那么告知producer要进行回查;
6、producer 查询执行本地事务结果;
7、producer根据第6步返回的结果再次执行第4步;
生产者高可用
1、客户端保证
重试机制
可以配置失败后重试,如果单个Broker发生故障,重试会选择其它Broker保证消息正常发送
配置项【retryTimesWhenSendFaild(同步)】\【retryTimesWhenSendAsyncFailed(异步)】,
表示同步重试次数,默认【2】次,加上正常发送1次,总共有【3】次发送机会
表示同步重试次数,默认【2】次,加上正常发送1次,总共有【3】次发送机会
保证机制
RocketMQ Client会维护一个 【Broker-发送延迟】列表,发送时选择延迟较低的Broker,剔除已经宕机、不可用的Broker,保证消息正常发送
发送延迟容错开关【sendLatencyFaultEnable】
开
1、获取自增号Index,通过[取模(取余)]获取Queue的位置下标,获取对应的Broker的延迟,
并且是第一次或者和上一次发送的Broker相同的,则选中
并且是第一次或者和上一次发送的Broker相同的,则选中
2、如果第一步没有选中一个Broker,则选择一个延迟较低的Broker
3、如果第一步、第二步都没有选中一个Broker,则随机选中一个Broker
1、如果没有上一次的Broket作为参考,那就随机一个Broker
2、如果有上一次的broker,则选择非上一次使用的Broker
3、打底逻辑为随机选一个
关
2、Broker端保证
数据同步方式保证
(Master Broker ->Slave Broker)
(Master Broker ->Slave Broker)
同步复制
异步复制
生产者启动流程
DefaultMQProducer
RocketMQ中的默认生成者实现,用于生产普通消息、顺序消息、单向消息、批量消息、延迟消息
核心属性
namesrvAddr
继承ClintConfig,表示RocketMQ集群的Namesrv地址,如果是多个则用“分号”分割。
如:
192.168.0.1:9876;192.168.0.2:9876
192.168.0.1:9876;192.168.0.2:9876
clientIP
客户端程序所在机器的IP地址。(支持IPV4/IPV6)(注意:Docker容器中,获取的是Doker容器的IP,非宿主机的IP)
instanceName
实例名,实例名需要唯一,重复会导致启动失败
vipChannelEnabled
boolean,表示是否开启VIP通道(VIP通道区别在通讯时使用的端口不同)
clientCallBackExecutorThreads
客户端回调线程数。表示Netty通讯层回调线程的个数,默认为Runtione,getRuntime().availableProcessors()->表示当前CPU的有效个数
pollNameServerInterval
获取Topic路由信息的间隔时长,单位ms,默认30000ms
heartbeatBrokerInterval
与Broker心跳的间隔时长,单位ms,默认30000ms
defaultMQProducerImpl
默认生产者的实现类,封装了Broker的各种API。支持自定义实现一个生产者
producerGroup
生产者组名
sendMsgTimeout
发送超时时间,单位ms
compressMsgBodyOverHowmuch
消息体的容量上线,超过改上限时消息体会通过ZIP进行压缩,默认为4MB
retryTimesWhenSendFailed
同步发送失败后重试次数,默认2次,连本身1次,共3次机会
retryTimesWhenSendAsyncFailed
异步发送后重试次数,默认2次,连本身1次,共3次机会
start()
启动生产者实例的入口
shutdown()
关闭本地已注册的生产者,关闭已注册到的Broker的客户端
fetchPublishMessageQueues(Topic)
获取Topic中有哪些Queue,在发送消息
send(Message msg)
同步发送消息
send(Message msg ,long timeout)
同步发送普通消息(超时设置)
send(Message msg,SendCallback sendCallback)
异步发送普通消息
send(Message msg,SendCallback sendCallback,long timeout)
异步发送普通消息(超时设置)
sendOneway(Message msg)
发送单向消息。只负责发送消息,不管发送结果
send(Message msg,MessageQueue mq)
同步向指定队列发送消息
send(Message msg,MessageQueue mq,long timeout)
同步向指定队列发送消息(设置超时时间)
send(Message msg,MessageQueue mq ,SendCallback sendCallback)
异步发送消息到指定队列
send(Message msg,MessageQueue mq ,SendCallback sendCallback,long timeout)
异步发送消息到指定队列(设置超时时间)
send(Message msg,MessageQueueSelector selector,MessageQueue mq ,SendCallback sendCallback)
自定义消息发送到指定队列。通过实现MessageQueueSelector接口现在发送到哪个队列
send(Collection<Message> msgs
批量发送消息
createTopic(String key,String newTopic,int queueNum)
创建Topic
viewMessage(String offsetMsgId)
根据消息id查询消息
生产者启动流程
1、DefaultMQProduct
1、构造函数初始化实例->start()
2、启动生产者实例->start()
4.1、抛出异常程序退出
2、DefaultMQProductImpl
3、启动生产者实实现类
4、判断当前生产者服务状态
是否CREATE_JUST
是否CREATE_JUST
否->4.1
5、将生产者状态设置启动失败
6、生产者参数校验
registerProducter()->6.1
7、注册本地路由信息
start() ->7.1
3、MQClientInstance
6.1、注册生产者
7.1、置为服务启动失败状态
8、如果没有配置Namesrv,远程获取
9、启动通讯模块服务
10、启动各种定时任务
11、启动消息拉取服务
12、启动RebalanceService服务
13、启动Push服务
14、置为服务启动状态
消息发送流程
客户端发送消息分为3层
业务层
调用RocketMQ Client 发送API业务代码
消息处理层
RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作
通讯层
基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通讯全部使用改通信层
发送消息流程
业务层
1、准备消息对象->send()
DefaultMQProducer
2、准备消息对象-> send()
DefaultMQProducerImpl
3、调用发送方法 -> send()
4、设置超时时间->sendDefaultImpl()
5、设置同步发送->sendKernelImpl()
6、主要的发送逻辑
6.1、发送前check
6.2、选择Queue发送
6.3、可靠发送
6.3.1->8.1
6.4、发送结果处理
6.5、
sendMessageAsync()
sendMessageSync()
sendMessageAsync()
sendMessageSync()
6.5.1->8.2
7、发送Request封装
MQClientAPIImpl
8.1、invokeOneway()
8.2、invokeAnsy()
invokeSync()
invokeSync()
RemotingClient
9、
发送网络请求
发送消息
发送网络请求
发送消息
发送消息样例
发送普通消息
//发送普通消息
public static void main(String[] args) throws Exception {
//第一步 设置生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//第二步 初始化MQ地址
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.start();
//第三步 发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
//第四步 关闭MQ
producer.shutdown();
}
public static void main(String[] args) throws Exception {
//第一步 设置生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//第二步 初始化MQ地址
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.start();
//第三步 发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
//第四步 关闭MQ
producer.shutdown();
}
发送延迟消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
msg.setDelayTimeLevel(2);
msg.setDelayTimeLevel(2);
不同延迟等级表
1=1s 2 = 2s 3=10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送顺序消息
//发送顺序消息
public static void main2(String[] args) throws Exception {
//第一步 设置生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//第二步 初始化MQ地址
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.start();
int hashKey = 123;
//第三步 发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object hashkeyArg) {
int id = (int) hashkeyArg;
int index = id % mqs.size();
return mqs.get(index);
}
}, hashKey);
System.out.println(sendResult.toString());
//第四步 关闭MQ
producer.shutdown();
}
public static void main2(String[] args) throws Exception {
//第一步 设置生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//第二步 初始化MQ地址
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.start();
int hashKey = 123;
//第三步 发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object hashkeyArg) {
int id = (int) hashkeyArg;
int index = id % mqs.size();
return mqs.get(index);
}
}, hashKey);
System.out.println(sendResult.toString());
//第四步 关闭MQ
producer.shutdown();
}
发送事务消息
public static void main(String[] args) throws Exception {
//第一步 初始化生产者
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
//第二步 初始化MQ地址,配置生产者的各个参数和Broker回调,检查本地事务处理并启动生产者
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(200);
producer.setTransactionCheckListener(new TransactionCheckListener() {
private AtomicInteger tramsactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println(msg);
int value = tramsactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
producer.start();
//第三步:设置本地事务处理器,发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
private AtomicInteger tramsactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
int value = tramsactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}, null);
System.out.println(sendResult.toString());
}
public static void main(String[] args) throws Exception {
//第一步 初始化生产者
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
//第二步 初始化MQ地址,配置生产者的各个参数和Broker回调,检查本地事务处理并启动生产者
producer.setNamesrvAddr("Addresses");
producer.setRetryTimesWhenSendFailed(2);
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(200);
producer.setTransactionCheckListener(new TransactionCheckListener() {
private AtomicInteger tramsactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println(msg);
int value = tramsactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
producer.start();
//第三步:设置本地事务处理器,发送消息
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
private AtomicInteger tramsactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
int value = tramsactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}, null);
System.out.println(sendResult.toString());
}
发送单向消息
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("Addresses");
producer.setInstanceName("instance name");
producer.start();
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
producer.sendOneway(msg);
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("Addresses");
producer.setInstanceName("instance name");
producer.start();
Message msg = new Message("Topic", "TagA", "searchKeys", "body".getBytes());
producer.sendOneway(msg);
}
批量消息发送
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("Addresses");
producer.setInstanceName("instance name");
producer.start();
List<Message> messages = Lists.newArrayList();
messages.add(new Message("Topic", "TagA", "searchKeys", "body1".getBytes()));
messages.add(new Message("Topic", "TagA", "searchKeys", "body2".getBytes()));
messages.add(new Message("Topic", "TagA", "searchKeys", "body3".getBytes()));
producer.send(messages);
producer.shutdown();
}
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("Addresses");
producer.setInstanceName("instance name");
producer.start();
List<Message> messages = Lists.newArrayList();
messages.add(new Message("Topic", "TagA", "searchKeys", "body1".getBytes()));
messages.add(new Message("Topic", "TagA", "searchKeys", "body2".getBytes()));
messages.add(new Message("Topic", "TagA", "searchKeys", "body3".getBytes()));
producer.send(messages);
producer.shutdown();
}
RocketMQ的消费组
消费者概述
消费者一般指获取消息、转发消息给业务代码处理的一系列代码实现。
消费者组:一个逻辑概念,在使用消费者时徐璈指定一个组名。一个消费者组可以订阅多个Topic
消费者实例:一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例
订阅关系: 一个消费者组订阅一个Topic的某一个Tag,这种记录被称为订阅关系。(消费者组名-Topic-tag)必须一致
可靠消费
重试-死信机制
RocketMQ消费过程分3个阶段
1、正常消费
失败进入重试消费
2、重试消费
重试16次进入死信
3、死信
待人工处理
Rebalance机制
客户端通过Rebalance服务做到高可靠的,当发生Broker掉线、消费者实例掉线、Topic扩容等突发情况时,触发
消费模式
集群消费模式
在同一个消费者组中的消费者实例,是负载均衡(策略配置)的消费Topic中的消息,假如有一个Producer发送了120条消息,其所属的Topic中有3个Consumer组,每个消费者组设置为集群消费,分别有2个消费者实例。那每个消费者实例有60条数据
广播消费模式
消息是广播分发的,及消费者组中的消费者实例将消费整个Topic的全部消息
消费者启动机制
核心属性
DefaultMQPullConsumer
namesrvAddr: 表示RocketMQ集群Namesrv地址,如果有多个,用分号分开。如:127.0.0.1:9876;127.0.0.2:9876;
clientIP: 使用客户端的程序所在机器的地址
instanceName: 实例名,每个进程的实例名必须不同,否则程序会启动失败。
vipChannelEnable: 表示是否开启VIP通道,VIP通道区别为单独一个端口进行通讯。
clientCallBackExecutorThreads: 客户端回调线程数。等于Netty通讯层回调线程的个数,默认值:Runtime.getRuntime().availableProcessors(),表示当前有效CPU个数
pollNameServerInterval: 获取Topic路由信息间隔,单位ms,默认30000ms
heartbeatBrokerInterval: 客户端和Broker心跳间隔,单位ms,默认30000ms
persistConsumerOffsetInterval: 持久化消费位点时间间隔,单位ms,默认5000ms
defaultMQPullConsumerImpl: 默认Pull消费者的具体实现
consumerGroup: 消费者组名字。
brokerSuspendMaxTimeMillis: 在长轮询模式下,Broker的最大挂起请求时间,不建议修改
consumerTimeoutMillisWhenSuspend: 在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMillis大,不建议修改
consumerPullTimeoutMillis: 消费者Pull消息时Socket的超时时间
messageModel: 消费模式,集群/广播
messageQueueListener: 消息路由信息变化时回调处理监听器,一般在重新平衡是会被调用。
offsetStore: 位点存储模块。集群模式会持久化到Broker中,广播模式会持久在本地文件中。有两个实现类,RemoteBrokerOffsetStore 和LocalFileOffsetStore.
allocateMessageQueueStrategy: 消费Queue分配策略管理器
maxReconsumeTimes: 最大重试次数,可以配置
registerMassageQueueListener(): 注册队列变化监听器,当队列发生变化时会被监听到。
pull(): 从Broker中Pull消息,如果有PullCallBacke参数,表示异步拉取。
pullBlockIfNotFound(): 长轮询方式拉取,如果没有拉取到消息,那么Broker会将请求Hold住一段时间。
updateConsumeOffset(final MessageQueue mq,final long offset): 更新某一个Queue的消费位点。
fetchConsumeOffset(final MessageQueue mq,final boolean fromStore): 查找某个Queue的消费位点。
sendMessageBack(MessageExt msg ,int delayLevel,String brokerName ,String consumerGroup): 如果消费发送失败,则可以将消息重新发回给Broker,这个消费组延迟一段时间可以再消费(也就是重新消费)
fetchSubscribeMessageQueues(final String topic): 获取一个Topic的全部Queue信息。
DefaultMQPushConsumer
大部分与DefaultMQPullConsumer是一样的
defaultMQPushConsumerImpl: 默认的Push消费者具体实现类
consumerFromWhere: 一个枚举值,表示从什么位点开始消费
CONSUME_FROM_LAST_OFFSET: 从上次消费的点位开始消费
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: RocketMQ 4.2.0 不支持,处理同CONSUME_FROM_LAST_OFFSET
CONSUM_FROM_MIN_OFFSET: RocketMQ 4.2.0不支持
CONSUM_FROM_MAX_OFFSET: RocketMQ 4.2.0不支持
CONSUME_FROM_FIRST_OFFSET: 从ConsumeQueue的最小位点开始消费。
CONSUME_FROM_TIMESTAMP: 从时间开始消费。
consumeTimestamp: 表示从哪一时刻开始消费,格式为yyyyMMDDHHmmss,默认为半小时前,当consumerFromWhere=CONSUME_FROM_TIMESTAMP 才生效
allocateMessageQueueStrategy: 消费者订阅topic-queue策略
subscription: 订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag。
messageListener: 消息Push回调监听器
consumeThreadMin: 最小消费线程数,必须小于ConsumeThreadMax。
ConsumeThreadMax: 最大消费线程数,必须大于consumeThreadMin。
adjustThreadPollNumsThreshold: 动态调整消费线程池的线程数大小。(开源版本不支持)
consumeConcurrentlyMaxSpan: 并发消息的最大位点查。如果Pull消息的位点差超过该值,拉取变慢
pullThresholdForQueue: 一个Queue能缓存的最大消息数。超过该值则采取拉取流控措施。
pullThresholdForTopic: 一个Topic最大能缓存的消息字节数,超过则采取流控
pullThresholdSizeForTopic: 一个Topic最大能缓存的消息字节数,单位是MB。默认-1 ,结合pullThresholdForQueue配置项生效
pullInterval: 拉取间隔,单位ms
consumMessageBatchMaxSize: 消费者每次批量消费时,最多消费多少条消息。
pullBatchSize: 一次最多拉取多少条消息
postSubscriptionWhenPull: 每次拉取消息时是否更新订阅关系,默认为false
maxReconsumeTimes:最大重试次数,默认返回-1,表示默认最大为16次
suspendCurrentQueueTimeMillis: 为短轮询场景设置的挂起时间,比如顺序消息的场景
consumeTimeout
消费超时时间,单位min,默认15min
启动流程
DefaultMQPullConsumer
1、DefaultMQPullConsumer
1.1、构造函数初始化实例 -> start()
1.2、启动消费者实例 -> start() -> 2.1
1.3、启动失败
2、DefaultMQPullConsumerImpl
2.1、启动消费者实现类
2.2、是否初始化启动
否-> 1.3
是-> 2.3
2.3、
1)将消费者状态置为启动失败
2)消费者参数校验
3)默认消费者组名转换为进程Id
1)将消费者状态置为启动失败
2)消费者参数校验
3)默认消费者组名转换为进程Id
2.4、拷贝订阅关系
2.5、设置重平衡服务参数
2.6、初始化Pull接口包装类
2.7、向Pull接口包装类注册消息过滤器
2.8、初始化位点管理器,并加载消费位点
2.9、本地注册消费者
2.10、启动Client实例 -> start() -> 3.1
3、MQClientInstance
3.1、置为服务启动失败状态
3.2、如果没有配置Namesrv,远程获取
3.3、启动通讯模块服务
3.4、启动各种定时任务
3.5、启动Pull服务
3.6、启动RebalanceService服务
3.7、启动Push服务
3.8、置为服务启动状态
DefaultMQPushConsumer
1、DefaultMQPushConsumer
1.1、构造函数初始化实例 -> start()
1.2、启动消费者实例 -> start() -> 2.1
1.3、启动失败
2、DefaultMQPushConsumerImpl
2.1、启动消费者实现类
2.2、是否初始化启动
否-> 1.3
是-> 2.3
2.3、
1)将消费者状态置为启动失败
2)消费者参数校验
3)默认消费者组名转换为进程Id
1)将消费者状态置为启动失败
2)消费者参数校验
3)默认消费者组名转换为进程Id
2.4、拷贝订阅关系
2.5、设置重平衡服务参数
2.6、初始化Pull接口包装类
2.7、向Pull接口包装类注册消息过滤器
2.8、初始化位点管理器,并加载消费位点
2.9、初始化、启动Push消费服务
2.10、启动Client实例 -> start() -> 3.1
2.11、
1)更新Topic路由信息
2)消费者检查
3)发送心跳给Broker
4)立即重新平衡
1)更新Topic路由信息
2)消费者检查
3)发送心跳给Broker
4)立即重新平衡
3、MQClientInstance
3.1、置为服务启动失败状态
3.2、如果没有配置Namesrv,远程获取
3.3、启动通讯模块服务
3.4、启动各种定时任务
3.5、启动拉取服务
3.6、启动RebalanceService服务
3.7、启动Push服务
3.8、置为服务启动状态
消费进度保存机制
广播消费模式
消费位点存在消费者本地
集群消费模式
消费位点存在Broker中
消费流程
pull消费流程
push消费流程
消息过滤
Tag过滤
SQL过滤
MessageSelector.bySql(" name IS NOT NULL AND name like '%A%' ")
0 条评论
下一页