rocketmq 发送消息
2024-04-05 15:48:22   9  举报             
     
         
 rocketmq 发送消息简单流程
    作者其他创作
 大纲/内容
 TimerMessageStore#doEnqueue
  TopicQueueLock#lock用了32个锁根据mq进行hash
  是
  ServiceThread#waitForRunning执行一次等待 30 ms
  TransactionalMessageService#prepareMessage
  state
  TimerMessageStore#convert转换为正常消息,topic等信息
    commit
  java.nio.channels.FileChannel#forcejava.nio.MappedByteBuffer#force真正刷盘
  处理 RequestHeaderV
  MessageStore#putMessage写消息
  TimerMessageStore.TimerEnqueuePutService#fetchTimerRequests一次最多取10条
  rollback
  CommitLog#asyncPutMessage写入消息
  是否是事务消息
  DefaultMappedFile#flush
  TransactionalMessageCheckService
  FlushRealTimeService#run根据 flushIntervalCommitLog(默认500ms) 配置的时间间隔进行定时刷盘
  TimerMessageStore.TimerEnqueueGetService#run间隔100ms
  一大堆 if else 校验
  否
  返回结果
  TimerMessageStore#dequeue
  是否同步刷盘
  TimerWheel#getSlot获取延迟时间所在的 slot
  EndTransactionProcessor#processRequest
  TimerWheel#putSlot修改指定槽的偏移量
  MQClientInstance#updateTopicRouteInfoFromNameServer组装 RemotingCommand 发送请求获取队列信息
  .MessageStore#putMessage写入消息
  AbstractTransactionalMessageCheckListener#sendCheckMessage发送消息到client
  ndTransactionProcessor#endMessageTransaction组装消息,主要是修改topic。从事务队列转到真实的队列
  MappedFileQueue#getLastMappedFile获取要写入的映射文件
  TimerWheel 是一个时间轮,一个有 7*24*3600 个槽默认精度 1s,也就是1s一个槽
  font color=\"#323232\
  CommitLog#handleDiskFlushAndHA刷盘和高可用处理
  .UtilAll#crc32(byte[])计算crc
  DefaultMQProducerImpl#sendDefaultImpl
  TransactionalMessageCheckService#run
  TimerMessageStore#enqueue
  .MessageStore#asyncPutMessage写入消息
  TransactionalMessageServiceImpl#check
  TimerRequest构建一个对象
  延时消息流程
  TimerMessageStore.TimerDequeueGetService#run间隔100ms
  dequeuePutQueue.put
  MessageStore#putMessage写入消息
  如果是设置了org.apache.rocketmq.client.sendSmartMsg或者是批量数据则使用 v2 的头
  v2的头相对于v1就是把一些字段名称给变成了 a-z 。压缩了传输数据量
  整个 rocket 大部分的异步都是通过 CompletableFuture.completedFuture来包装成异步.但是在写入消息的时候是把异步包装成同步(等待执行完成)
  MQClientAPIImpl#sendMessage
  是否需要高可用或者主从同步
  TimerLog#append保存时间轮队列的消息偏移量以及延迟信息
  PutMessageHook#executeBeforePutMessage写入数据前的一个钩子
  executeSendMessageHookAfter和上面相同的钩子
  需要等到满足要求的slave返回ack
  HookUtils#handleScheduleMessage一个钩子在保存消息的时候把延时消息的topic改成默认的
  ServiceThread#onWaitEnd
  校验当前可用接节点是否满足要求否则抛出 IN_SYNC_REPLICAS_NOT_ENOUGH
  DefaultMQProducerImpl#checkTransactionState
  如果是 TRANSACTION_NOT_TYPE 状态不处理,下次继续check
  这些信息会定时的从nameServer获取
  DefaultMQProducer#send各种send方法
  TransactionListener#checkLocalTransaction
  TimerDequeuePutMessageService#run间隔 10ms 一次
  2 个队列4 个定时任务一直倒腾
  getBrokerNameFromMessageQueue找到 brokerName
  TransactionalMessageServiceImpl#deletePrepareMessage删除消息
  findBrokerAddressInPublish找到发送地址
  ClientRemotingProcessor#checkTransactionState
  正常保存消息到 rmq_sys_wheel_timer
  MappedFile#appendMessage
  HookUtils#transformTimerMessage把真实的信息保存到属性中
  PutMessageLock#unlockTopicQueueLock#unlock解锁
  sendMessage
  ServiceThread#startbroker 启动的时候调用
  TopicQueueMappingManager#rewriteRequestForStaticTopic判断当前broker是不是leader
  TimerMessageStore#doPut最多重试 3 次
  ransactionalMessageCheckService#onWaitEnd
  预计执行时间是否到了
  HookUtils#handleScheduleMessage延迟任务校验
  EndTransactionProcessor#sendFinalMessage
  TopicQueueMappingManager#buildTopicQueueMappingContext获取topic的详细信息
  事务消息处理
  设置msg的各种标记
  processTransactionState
  MQClientAPIImpl#endTransactionOneway发送消息到broker
  MessageStore#getConsumeQueue从 rmq_sys_wheel_timer 获取消息
  FlushManager#handleDiskFlush刷盘
  记录执行情况
  GroupCommitRequest#wakeupCustomer刷盘后把同步的请求都标记完成
  AbstractTransactionalMessageCheckListener#resolveHalfMsg
  TimerWheel#getSlot获取当前时间对应的 slot
  TimerMessageStore.TimerEnqueuePutService#putMessageToTimerWheel
  通过前面设置的发送类型调用 remotingClient 的接口
  enqueuePutQueue.offer入队
  MessageClientIDSetter#setUniqID设置消息的唯一标识
  needHandleHA
  SendMessageProcessor#processRequest所有发送的消息都会在 broker 里这个处理器处理
  .HookUtils#checkInnerBatch校验批量数据
  DefaultMQProducerImpl#sendKernelImpl
  needRoll判断是否在当前时间轮中,如果超出则标记到下一轮
  TimerMessageStore.TimerEnqueuePutService#run间隔100ms
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 