消息消费流程
2021-08-25 10:34:41   3  举报             
     
         
 并发消费,顺序消费,broker端处理拉取消息流程等
    作者其他创作
 大纲/内容
 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())      && !this.processQueue.isLocked()) {      log.warn(\
  构建请求头PullMessageRequestHeader
    invokeAsyncImpl
  找到消息:1:设置下一次拉取的消息偏移量参数2:统计拉取RT      DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT3:如果经过过滤之后的消息为空,则立马再拉取一次     DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)4:将拉取的消息放入到processQueue     processQueue.putMessage(pullResult.getMsgFoundList())5:开启线程异步消费ProcessQueue中的数据     DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest6:进行下一次数据拉取
  this.mQClientFactory.getMQClientAPIImpl()pullMessage
  在执行消费消息之前,还有如下操作:1:listener = ConsumeMessageConcurrentlyService.this.messageListener2:context = new ConsumeConcurrentlyContext(messageQueue);3:ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);      如果topic是%retry%开始的,需要还原成原始topic4:listener.consumeMessage     业务系统自己消费逻辑
  while(true)
  LocalFileOffsetStore
  END
  pullMessageService.start
  1、font color=\"#d32f2f\
  把pullRequest放入ManyPullRequest
  processQueue.isLocked()
  并发消费场景
  processQueue.getMaxSpan > consumeConcurrentlyMaxSpan
  此处put请求到queue中了,前面pullMessageService.start阻塞在那的线程,才可以继续往下走了
   如果支持挂起,则将response设为null,netty服务端将不会立即向netty客户端写入响应——channel.writeAndFlush
  OffsetStoreupdateOffset
  defaultMQPushConsumerchangeInstanceNameToPID
  从本地记录的offset.json文件中获取进度
  ProcessQueueremoveMessage
  每隔15分钟清理超过15分钟都没消费的消息
  lockMQPeriodically
  this.brokerController.getConsumerOffsetManager().queryOffset
  线程run
  获取消费锁,然后执行业务系统自身的逻辑
  isTransferMsgByHeaptrue(默认):从堆外内存copy回jvm内存一次,然后将byteBuffer的字节数组塞到body种false:直接构建FileRegion对象,通过channel直接发送出去,少一次copy了
  startScheduledTask
  load
  ConsumeMessageOrderlyService.this.processConsumeResult
  defaultMQProducer.getDefaultMQProducerImpl().start
  mQClientFactoryrebalanceImmediately
  1、判断拉取回来的数据是否超过1000条都没有消费了2、判断拉取回来的数据是否超过100MB都没有消费了如果是促发流控,过50ms再来拉数据
  CONSUME_SUCCESS
  updateProcessQueueTableInRebalance
  cachedMessageCount>pullThresholdForQueuecachedMessageSizeInMiB>pullThresholdSizeForQueue
  registerMessageListener
  NettyClientHandlerprocessMessageReceived
  成功就略过失败的话,把消息逐条发送给broker
  BROADCASTING||processQueue.isLocked
  consumeMessageService
  一样了
  defaultMQPushConsumerImpl.subscribe
  过3秒再来拉取消息executePullRequestLater
  顺序
  mqClientFactory.doRebalance
  commitOffsetValue
  广播模式或者有未过期的锁
  notifyMessageArriving
  广播
  RequestHoldServicesuspendPullRequest
  NettyRemotingClientinvokeAsync
  死循环消费
  this.submitConsumeRequestLater
  new DefaultMQPushConsumerImpl
  RECONSUME_LATER
  MQClientManagermQClientFactory=getAndCreateMQClientInstance
  this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset
  processQueue没加锁
  构建PullCallback
  run
  支持挂起的情况下
  RemoteBrokerOffsetStorefetchConsumeOffsetFromBroker
  ConsumeMessageOrderlyService
  this.rebalanceImpl.computePullFromWherepullRequest.setNextOffset(offset)
  判断消费模式
  Remote模式
  挂起请求
  FOUND
  pullAPIWrapperprocessPullResult
  并发
  ConsumerManageProcessorqueryConsumerOffset
  broker查询满足条件的数据
  1、4.1.0以上才可以使用SQL92过滤2、如果是slave,则清除commit offset标记
  mQClientAPIImpl.start
  PULL_OFFSET_MOVED
  DefaultMQPushConsumerImpldoRebalance
  processQueue加锁
  MessageListenerOrderly
  获取borker地址信息
  rebalanceImpl.setConsumerGrouprebalanceImpl.setMessageModelrebalanceImpl.setAllocateMessageQueueStrategyrebalanceImpl.setmQClientFactory
  PullMessageProcessorexecuteRequestWhenWakeup
  response赋值
  defaultMQPushConsumerImplcheckConfig
  processQueuetakeMessags
  rebalanceImpl.getSubscriptionInner().get
  集群模式,如果没锁了或者锁过期了延迟10ms再消费
  RemoteBrokerOffsetStorereadOffset
  mQClientFactoryregisterConsumer
  CLUSTERING
  广播消费的时候,进度是存本地的,这个地方就是更新下本地内存中记录的消息进度offsetTable.putIfAbsent每5s会持久化到本地文件中
  messageListener.consumeMessage
  实例化offsetStore
  PullCallbackonSuccess
  继续
  mQClientFactorystart
  到broker端拉取数据
  DefaultMQPushConsumer::new
  成功
  response=null
  1、确保当前状态是runnibg,否则默认3秒后再来拉取2、判断当前consumer是否被暂停拉取数据了,如果是1s后再来拉取
  有新消息了,并且匹配
  defaultMQPushConsumerImplcopySubscription
  RebalanceServicerun
  很重要:处理返回的ACK
  subscribe(\"topic\
  构建PullRequest
  OFFSET_ILLEGAL
  check 
  synchronized (objLock)
  RebalancePushImpldispatchPullRequest
  记录成功和失败消息的数量this.getConsumerStatsManager().incConsumeOKTPSthis.getConsumerStatsManager().incConsumeFailedTPS
  start
  顺序消费场景
  InvokeCallbackoperationComplete
  构建MessageFilter
  pullRequest.setNextOffset(pullResult.getNextBeginOffset())DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);1:设置下次拉取的进度2:offsetTable中修改进度3:重新拉取一次
  SUSPEND_CURRENT_QUEUE_A_MOMENT
  mQClientFactorysendHeartbeatToAllBrokerWithLock
  拉取结果
  channel.writeAndFlush
  发生给broker
  RemoteBrokerOffsetStore
  updateOffset
  能获取到锁,就是在这里加的
  ackIndex置为-1,为后面发回broker准备记录失败的消息数量this.getConsumerStatsManager().incConsumeFailedTPS
  成功就略过失败的话打印下每条消息的错误记录
  ResponseFutureexecuteInvokeCallback
  ConsumeMessageConcurrentlyService
  processRequest
  RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
  如果消费模式为集群模式默认每隔20s执行一次锁定分配给自己的消息消费队列
  this.pullAPIWrapper.pullKernelImpl
  checkHoldRequest
  incGroupGetNumsincGroupGetSizeincBrokerGetNums
  构建返回的结果对象
  this.pullRequestQueue.take()pullMessage
  this.defaultMQPushConsumerImplexecutePullRequestImmediately(pullRequest)
  集群模式下,把内存中的消费进度提交给broker
  PullAPIWrapperfindBrokerAddressInSubscribe
  统计信息修改
  SUCCESS
  mQClientFactorycheckClientInBroker
  默认最多65535个异步请求同时拉取消息
  PullMessageProcessorprocessRequest
  dispatchPullRequest
  BROADCASTING
  Local模式
  rebalanceByTopic
  pullAPIWrappernew PullAPIWrapper
  NO_NEW_MSGNO_MATCHED_MSG
  PULL_NOT_FOUND
  cleanExpireMsg
  ConsumeMessageConcurrentlyServicesubmitConsumeRequest
  重试消息的起点
  1:构建response、responseHeader2:获取requetHeader3:判断当前broker是否可读4:根据requestHeader中的consumerGroup来获取其订阅信息SubscriptionGroupConfig,如果group不存在,默认回自动创建一个5:根据请求参数,获取一些请求flag:    a: hasSuspendFlag ---消息是否支持被挂起    b: hasCommitOffsetFlag --- 是否从内存中读取消费进度    c: hasSubscriptionFlag ---  消息过滤机制的支持,天然就是true6: topic和queue做一些基本校验、TagType的一些校验等等
  DefaultMQPushConsumerImplpullMessage
  有一把写锁,来保证取消息的顺序把要消费的消息从msgTreeMap移动到consumingMsgOrderlyTreeMap
  LocalFileOffsetStorereadOffset
  这个方法有两处被调用:1、其中一处是PullRequestHoldService中的定时任务,每隔 一段时间就会去查啊看是否有新消息到来2、doReput;也就是broker收到消息,异步构建ConsumeQueue、IndexFile的过程
  先获取borker的ip和端口,再去服务端拉数据
  this.semaphoreAsync.tryAcquire
  ConsumeMessageOrderlyServicesubmitConsumeRequest
  ConsumeMessageConcurrentlyService.this.processConsumeResult
  this.brokerController.getMessageStore().getMinOffsetInQueue
  rebalanceService.start
  并发消费的时候,如果maxSpan>2000,则触发流控,50s后再来看看maxSpan并不是消息的数量,而是processQueue内部treeMap中首尾消息的偏移量差值;这个偏移量,是在consumeQueue文件的逻辑offset
  1、incConsumeFailedTPS2、checkReconsumeTimes,检查重试次数,如果小于16次,可以重新消费:makeMessageToCosumeAgainsubmitConsumeRequestLater3:否则进死信队列了(会sendMessageBack)
  套娃了,又到了broker处理拉取消息的流程了这次brokerAllowSuspend=false了,首次默认是true的
  集群
  结果为-1
  DefaultMessageStoregetMessage
  MQClientAPIImplprocessPullResponse
  获取当前messageQueue的锁
  tryLockLaterAndReconsume
  broker端只是根据tagCode来过滤数据了,可能有hash冲突consumer端进行二次过滤,这次是名称equals比较
  把消费成功的消息从processQueue中删除
  makeSureStateOKisPause
  失败
  listener.consumeMessage
  获取返回的对象的字节数组response.setBody(r)
  defaultMQPushConsumerImplstart
  this.rebalanceService.wakeup()
  RemoteBrokerOffsetStorefindBrokerAddressInAdmin
  设置nextBeginOffset、minOffset、maxOffset如果master太忙,设置从salve拉取信息
  PullMessageServicethis.pullRequestQueue.put(pullRequest)
  PullResultExt
  RebalanceImpldoRebalance
  font color=\"#1976d2\
  如果sendMessageBack失败,则将失败的消息存入新list,延迟5S再消费
  MessageListenerConcurrently
  updateTopicSubscribeInfoWhenSubscriptionChanged
    
    收藏 
      
    收藏 
     
 
 
 
 
  0 条评论
 下一页
  
  
  
  
  
  
  
  
  
 