RocketMq_JAVA_流程图_架构图_系统图_数据库
2024-11-03 14:30:41 30 举报
为你推荐
查看更多
RocketMq(底层解析)消息发送流程
作者其他创作
大纲/内容
处理存储结果
processResponseCommand
CommitLogputMessage
MappedFileappendMessage
doCommit
1、broker是否有写权限2、topic是否是默认topic3、获取或创建topicConfig4、判断queueId是否超过限制
rsp
this.mappedByteBuffer.force();
异步刷盘根据是否开启transientStorePoolEnable机制,刷盘会有点区别。如果transientStorePoolEnable为true,mq会单独申请一个与目标物理文件(CommitLog)相同大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存中 然后提交到与物理文件对应的内存映射内存中,再flush到磁盘。 如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘=============================================在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入(超过10s,或达到4页数据,才会被刷)
invokeAsyncImpl
sendMessage
1、线程中塞入请求,run方法中就能感知到,并处理刷盘请求service.putRequest(request)2、waitForFlush,等待刷盘结果
broker响应之后逻辑
client发送请求
刷盘
DefaultAppendMessageCallbackdoAppend
NettyRemotingAbstractprocessMessageReceived
DefaultMQProducerImplupdateFaultItem
GroupCommitServicerun
consumer消费失败后,发回的重试消息处理逻辑(含发到死信队列)
DefaultMessageStoreputMessage
DefaultMQProducersend
sendKernelImpl
1、END_OF_FILE:创建新MappedFile文件,再走之前的appendMessage2、发送消息的一些统计
ChannelwriteAndFlush
返回存储结果
FlushRealTimeServicewakeup
msgCheck
appendMessagesInner
sendDefaultImpl
handleRetryAndDLQ
MappedFileQueueflush
如果从catch异常,递归
其实就是countDownLatch.await在死等响应,netty客户端发送完之后,等待服务端的响应结果
返回成功
异步线程池,new Runnable来执行
CommitLoghandleDiskFlush
MQClientAPIImplprocessSendResponse
FlushRealTimeServicerun
NettyRemotingClientinvokeSync
MQClientAPIImplsendMessageAsync
异步
MQClientAPIImplonExceptionImpl
isTransientStorePoolEnable
NettyRemotingClientinvokeAsync
同步
Y
client接收broker端响应
DefaultMQProducerImplsend
processRequestCommand
N
等待netty回调
ResponseFutureexecuteInvokeCallback
await和countDown
响应client
borker响应
异步消息发送的重试,是靠递归来实现的
SendMessageProcessorprocessRequest
做了一堆基础校验不通过,则直接返回对应错误
网络等待
NettyRemotingClient$NettyClientHandlerchannelRead0
putResponse就是把返回的结果赋值一下,同时执行countDown,唤醒发送时候wait的线程
MQClientAPIImplsendMessage
broker启动的时候,会针对不同的请求,注册不同的处理器,针对发送消息的处理器,代码如下:font color=\"#e74f4c\
NettyRemotingServer$NettyServerHandlerchannelRead0
sendMessageAsync
ResponseFutureputResponse
同步消息
handleHA
executeInvokeCallback
刷盘机制
invokeSyncImpl
通过netty进行网络发送
通过校验
异常返回
1、获取brokerAddr(IP+PORT)2、给普通message添加uniqId3、对大于4k的普通消息进行压缩4、设置一些标志位(sysFlag,事务消息)5、hook检查6、构建发送消息请求头,requestHeader
是否同步
异步消息
sendMessageSync
wakeup后,run方法就不会一直轮询等待了即又走回到上面的那种异步方式了
InvokeCallbackoperationComplete
CommitRealTimeServicerun
ResponseFuturewaitResponse
10s一次
MappedFileQueuecommit
0 条评论
回复 删除
下一页