RocketMq消息发送流程
2021-08-25 10:26:14   45  举报             
     
         
 rocketMq 同步消息,异步消息,broker端处理流程图
    作者其他创作
 大纲/内容
 处理存储结果
  processResponseCommand
  CommitLogputMessage
  MappedFileappendMessage
  doCommit
  rsp
  this.mappedByteBuffer.force();
  1、broker是否有写权限2、topic是否是默认topic3、获取或创建topicConfig4、判断queueId是否超过限制
  异步刷盘根据是否开启transientStorePoolEnable机制,刷盘会有点区别。如果transientStorePoolEnable为true,mq会单独申请一个与目标物理文件(CommitLog)相同大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存中 然后提交到与物理文件对应的内存映射内存中,再flush到磁盘。 如果transientStorePoolEnable为false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘=============================================在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入(超过10s,或达到4页数据,才会被刷)
  invokeAsyncImpl
  sendMessage
  1、线程中塞入请求,run方法中就能感知到,并处理刷盘请求service.putRequest(request)2、waitForFlush,等待刷盘结果
  broker响应之后逻辑
  刷盘
  DefaultAppendMessageCallbackdoAppend
  NettyRemotingAbstractprocessMessageReceived
  DefaultMQProducerImplupdateFaultItem
  GroupCommitServicerun
  DefaultMessageStoreputMessage
  DefaultMQProducersend
  sendKernelImpl
  异步线程池,new Runnable来执行的
  1、END_OF_FILE:创建新MappedFile文件,再走之前的appendMessage2、发送消息的一些统计
  ChannelwriteAndFlush
  返回存储结果
  FlushRealTimeServicewakeup
  msgCheck
  appendMessagesInner
  sendDefaultImpl
  handleRetryAndDLQ
  MappedFileQueueflush
  如果从catch异常,递归
  其实就是countDownLatch.await在死等响应,netty客户端发送完之后,等待服务端的响应结果
  返回成功
  异步消息发送的重试,是靠递归来实现的
  CommitLoghandleDiskFlush
  MQClientAPIImplprocessSendResponse
  FlushRealTimeServicerun
  NettyRemotingClientinvokeSync
  MQClientAPIImplsendMessageAsync
  异步
  MQClientAPIImplonExceptionImpl
  isTransientStorePoolEnable
  NettyRemotingClientinvokeAsync
  同步
  consumer消费失败后,发回的重试消息处理逻辑(含发到死信队列)
  Y
  client接收broker端响应
  DefaultMQProducerImplsend
  processRequestCommand
  N
  等待netty回调
  ResponseFutureexecuteInvokeCallback
  await和countDown
  响应client
  borker响应
  SendMessageProcessorprocessRequest
  做了一堆基础校验不通过,则直接返回对应错误
  网络等待
  NettyRemotingClient$NettyClientHandlerchannelRead0
  putResponse就是把返回的结果赋值一下,同时执行countDown,唤醒发送时候wait的线程
  MQClientAPIImplsendMessage
  NettyRemotingServer$NettyServerHandlerchannelRead0
  sendMessageAsync
  ResponseFutureputResponse
  同步消息
  handleHA
  executeInvokeCallback
  刷盘机制
  invokeSyncImpl
  通过netty进行网络发送
  通过校验
  异常返回
  1、获取brokerAddr(IP+PORT)2、给普通message添加uniqId3、对大于4k的普通消息进行压缩4、设置一些标志位(sysFlag,事务消息)5、hook检查6、构建发送消息请求头,requestHeader
  client发送请求
  是否同步
  异步消息
  sendMessageSync
  wakeup后,run方法就不会一直轮询等待了即又走回到上面的那种异步方式了
  InvokeCallbackoperationComplete
  CommitRealTimeServicerun
  ResponseFuturewaitResponse
  10s一次
  MappedFileQueuecommit
    
    收藏 
      
    收藏 
     
 
 
 
 
  0 条评论
 下一页
  
  
  
  
  
  
  
  
  
 