springboot 整合 apache rocketmq 源码
2024-07-24 09:28:36 0 举报
登录查看完整内容
springboot整合apache rocketmq源码主要包括了一些关键的配置文件和类,这些配置文件和类主要负责与rocketmq的交互和管理。其中,`application.properties`文件配置了rocketmq的连接信息,如nameserver地址等。`RocketMQConfig`类是一个配置类,它定义了一些rocketmq相关的Bean,如`rocketMQTemplate`,这个Bean用于发送和接收消息。`RocketMQListener`类是一个消息监听器,它实现了`RocketMQListener`接口,用于处理接收到的消息。此外,还有`RocketMQMessageConverter`类,它用于将POJO对象转换为消息。整个源码通过对这些配置文件和类的配置和实现,完成了springboot与apache rocketmq的整合,使得开发者可以在springboot应用中方便地使用rocketmq进行消息的发送和接收。
作者其他创作
大纲/内容
throw exception
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter())
RocketMQReplyListener
executeLocalTransaction
TransactionMQProducer
DefaultMessageListenerConcurrently
监听器获取到 broker 推送的消息进行消费
((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(...)
RocketMQLocalTransactionListener
消费者启动
applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
创建消费者用的是 PushConsumer
rocketMQReplyListener.onMessage(doConvertMessage(messageExt))
afterPropertiesSet
每个 @RocketMQMessageListener 注解的 bean 都会注册一个 DefaultRocketMQListenerContainer Bean对应的 beanname 是 ClassName_计数器
这两个也没啥好说的,就是 RocketMQTemplate 的扩展,producer 和 consumer 分别创建 RocketMQTemplate
在所有单例 bean 都创建完成后执行将事务监听器设置到 producer 中
RocketMQProperties
消费者执行业务逻辑的地方
producer.start()
可以将 DefaultRocketMQListenerContainer 认为是消费者
创建DefaultRocketMQListenerContainer 对象
bean: 有 @RocketMQMessageListener 注解的 beanannotation:bean 对应的@RocketMQMessageListener 注解信息
ListenerContainerConfiguration#createRocketMQListenerContainer
consumer.shutdown()
并发消费监听器
消费者关闭
如果rocketMQListener != null则执行
half 消息回复后执行本地事务rocketmq 的 Message 会先转成 Spring 的 Message
判断事务监听器是否实现 RocketMQLocalTransactionListener 接口
DefaultMessageListenerOrderly
ExtRocketMQTemplateConfiguration
!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())
在所有单例 bean 都创建完成后执行手动注册 DefaultRocketMQListenerContainer
将事务监听器的设置给 producer
@Bean@ConditionalOnMissingBean(DefaultLitePullConsumer.class)@ConditionalOnProperty(prefix = \"rocketmq\
stop()
RocketMQListener
将 RocketMQLocalTransactionListener 转化成 rocketmq 的 TransactionListener
DefaultRocketMQListenerContainer#handleMessage
事务回查时rocketmq 的 Message 会先转成 Spring 的 Message
DefaultRocketMQListenerContainer
DefaultMessageListenerOrderly#consumeMessage
创建DefaultRocketMQListenerContainer 对象并注册进 Spring 容器中
checkLocalTransaction
listener.checkLocalTransaction(convertToSpringMessage(messageExt))
这里有有个扩展方法当有 @RocketMQMessageListener 注解的bean 实现了RocketMQPushConsumerLifecycleListener 时可用
在初始化方法中进行生产者和消费者的启动
消息转换器 RocketMQMessageConverter 是在ListenerContainerConfiguration 配置类的构造方法中注入的
bean 初始化
@Import@AutoConfigureAfter
RocketMQListener.class.isAssignableFrom(bean.getClass())有 @RocketMQMessageListener 注解的 bean 同时还实现了 RocketMQListener 接口
((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null
DefaultLitePullConsumer
@Bean@Conditional(ProducerOrConsumerPropertyCondition.class) // 只要存在生产者 bean 或 消费者 bean 就符合条件@ConditionalOnMissingBean(name = \"rocketMQTemplate\") // 没有 beanname 为 rocketMQTemplate 的 RocketMqTemplate bean 存在就符合条件
注册 bean
一个 RocketMQTemplate 只能对应一个事务监听器否则会报错,所以在创建事务监听器时需要注意如果对应的 rocketMQTemplateBeanName 已经被别的事务监听器使用了那么就不要再去设置相同的 RocketMQTemplate了,可以新建一个 RocketMQTemplate bean
MessageListenerConcurrently
生产者已经设置了事务监听器则报错
RocketMQUtil#convert(org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener)
consumer.start()
消费者执行业务逻辑的地方 业务逻辑执行完返回的数据要回复给 broker
ExtConsumerResetConfiguration
执行本地事务、事务回查的逻辑
container.setRocketMQListener((RocketMQListener) bean)
DefaultRocketMQListenerContainer#initRocketMQPushConsumer
事务监听器没有实现 RocketMQLocalTransactionListener则报错
MessageConverterConfiguration
String.format(\"%s_%s\
container.setRocketMQReplyListener((RocketMQReplyListener) bean)
applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
@Bean@ConditionalOnMissingBean(DefaultMQProducer.class)@ConditionalOnProperty(prefix = \"rocketmq\
consumer.setMessageListener(new DefaultMessageListenerConcurrently())
顺序消费监听器用于消费顺序消息
RocketMQTransactionListener
rocketmq 的消息转换器内部封装了一批实际执行消息类型转换的转换器
rocketMQListener.onMessage(doConvertMessage(messageExt)
RocketMQTemplate
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer)
RocketMQReplyListener.class.isAssignableFrom(bean.getClass())有 @RocketMQMessageListener 注解的 bean 同时还实现了 RocketMQReplyListener接口
DefaultMessageListenerConcurrently#consumeMessage
执行消费消息的业务逻辑
使用的是拉模式中的订阅模式
DefaultMQProducer
@Import@AutoConfigureBefore
RocketMQTemplate 的功能是发布和拉取消息发布消息就用这个RocketMQTemplate 就行了消费消息的话因为 RocketMQTemplate 提供的是拉模式,所以要起个线程去不停的拉取消息(一般不使用它的消费消息功能)
订阅消息
RocketMQMessageListener
MessageListenerOrderly
start()
RocketMQTransactionConfiguration
<!-- 引入依赖时需要注意下版本,看下依赖包引入的 rocketmq 的版本 具体可以看下源码:https://github.com/apache/rocketmq-spring 对应的依赖的版本在 parent pom 中:https://github.com/apache/rocketmq-spring/blob/rocketmq-spring-all-2.2.2/rocketmq-spring-boot-parent/pom.xml rocketmq-spring-boot-starter 2.2.2 -> rocketmq 版本是 4.9.3 -> spring-boot 版本是 2.5.9 -> spring 版本是 5.3.18 - 我的 rocketmq 安装的是 4.9.4,这里使用 rocketmq-spring-boot-starter 2.2.2 基本没问题 rocketmq-spring-boot-starter 2.2.3 对应的 rocketmq 版本是 5.0.0,和安装的 rocketmq 版本差距很大,跨了一个大版本,不能使用这个版本 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
afterSingletonsInstantiated
消费者:bean、有 @RocketMQMessageListener 注解、实现RocketMQListener或RocketMQReplyListener 接口事务监听器:bean、有 @RocketMQTransactionListener 注解、实现 RocketMQLocalTransactionListener(注意消息的类 Messge 是Spring 的而不是 RocketMQ 的)生产者:生产者配置、RocketMQTemplate 发送消息如果是 PullConsumer 则使用 RocketMQTemplate如果是 PushConsumer 则使用 @RocketMQMessageListener 注解 这种方式
找有 @RocketMQMessageListener 注解的 bean
TransactionListener
如果rocketMQListener == null 且 rocketMQReplyListener != null则执行
事务监听器实现了 RocketMQLocalTransactionListener获取注解中指定的 RocketMQTemplate bean
beans.forEach(this::registerContainer)
创建的是这个带事务消息功能的生产者
找有 @RocketMQTransactionListener 注解的 bean
((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean))
@Import
beans.forEach(this::registerTransactionListener)
判断生产者是否已经设置了事务监听器
ListenerContainerConfiguration
生产者还没有设置事务监听器
设置线程池
RocketMQMessageConverter
CompositeMessageConverter
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
(RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName())
根据 @RocketMQMessageListener 注解中的 ConsumeMode 来设置不同的消息监听器
该接口中的消息对象是 Spring 的 Message,和 rocketmq 的 消息对象不是同一个类
@EnableConfigurationProperties
0 条评论
回复 删除
下一页