RocketMq框架源码
2022-12-03 00:12:39   0  举报             
     
         
 rocketMq框架源码流程图
    作者其他创作
 大纲/内容
 CommitLogDispatcherBuildIndex
  run()
  检测环境变量 
  创建线程 run方法
  switch (communicationMode) 
  创建 处理 consumer 的pull 请求线程池
    serverHandler
  this.filterServerManager.start()
  创建消息存储组件
  this.tryToFindTopicPublishInfo(msg.getTopic())
  对不同的请求 使用不同的 处理器进行处理
  RouteInfoManager 路由表
  启动客户端 就是netty连接
  while (!this.isStopped())
  进行处理请求
  初始化 acl
  this.mQClientAPIImpl.start()
  启动netty 客户端
  添加请求
  设置topic 和 tag
  else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
  创建 brokerController 对象
  设置监听端口 9876
  namesrv的 handler
  if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType())
  this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor
  Netty的 handler
  broker netty 客户端
  创建心跳线程池
  寻找队列 负载均衡实现
  this.thread.start()
  可信客户端工厂
  consumerQueue分发器 用于在 consumerQueue记录偏移量
  创建好 具体的RemotingCommand 获通过netty发送 
  this.pullRequestHoldService.start()
  new NettyServerConfig()
  this.registerProcessor()
  this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor
  this.reputMessageService.start()
  异步写入 commitlog文件 后面有一个延迟队列处理方式
  this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName())
  this.brokerController.getMessageStore().asyncPutMessage(msgInner)
  tpInfo.getMessageQueueList().get(pos)
  通过netty 去 namesrv获取路由信息
  加载 就是 kvConfig.json 文件
  this.adminBrokerExecutor = Executors.newFixedThreadPool
  this.doReput()
  创建服务端线程池
  执行发送方法
  异步刷盘
  this.namesrvController.getRouteInfoManager().registerBroker
  tpInfo.getSendWhichQueue().incrementAndGet()
  this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor
  this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor
  this.topicConfigManager.load()
  创建 producer对象
  实现 handler 方法 处理业务channelRead0
  大部分都是集群模式,只有存在远端裁可以保证消息只往一个消费者组的一个消费者发送
  进行初始化
  处理请求 这里是 DefaultRequestProcessor
  initialRpcHooks()
  new NamesrvConfig()
  创建定时任务延迟5秒 每隔10描扫一次 Broker 移除不活跃的Broker
  执行处理
  注册
   this.scheduledExecutorService.scheduleAtFixedRate
  consumerqueue文件会由线程去扫描文件是否变化去写入 consumerqueue
  实现刷盘
  延迟消息实现 如果延迟时间 > 0
  每隔1毫秒检查一次 commitlog 偏移量 是否由变化
  创建线程
  会通过netty 发送到 服务端  handler 处触发读请求
  注册很多处理器  内容太多源码上看
  启动存储组件 主要用于 commitlog的写入是事件 分发给 comsumerQueue 喝 indexFile
  创建同步刷盘请求
  启动控制器
  没有环境变量直接退出
  线程任务
  以每隔10ms的时间调用操作系统刷盘
  if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())
  创建 任务 把run方法传入
  this.brokerOuterAPI.start()
  NamesrvController.this.kvConfigManager.printAllPeriodically()
  重新加载配置 防止丢失
  发送消息到 broker
  获取
  Broker入口
  CommitLogDispatcherBuildConsumeQueue
  switch (this.defaultMQPushConsumer.getMessageModel())
  根据索引返回具体的 queue
  注册 broker
  客户端工厂
  start()
  new NettyClientConfig()
  执行处理 reuqest
  根据不通类型设置不通消费消息模式 顺序消费和并发消费
  创建 Namesrv对象 传入两个配置对象
  this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor
  启动netty 整个核心交互业务
  this.remotingServer.start()
  初始化事务
  启动 netty服务 vip
                  this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  广播消息 本地存储 offset偏移量
  这里会初始化 commitlog 对象 用于写入文件
  this.runnable.run()
  PipeLine
  index 分发器构建 维护index索引
  客户端 发送消息入口
  同步刷盘
  实现机制,把消息存到 修改的topic中去,后端有线程根据时间在进行处理转发到 需要发送的 topic
  initialAcl()
  创建 Netty服务端配置
  加载磁盘文件
  会通过netty 发送到 服务端  handler 处触发读请求
  new CommitLog(this)
  执行心跳注册 30 秒执行一次
  consumer在构造方法中 就声明了负载均衡策略 AllocateMessageQueueAveragely
  this.consumerManageExecutor = Executors.newFixedThreadPool
  broker 是服务端 也是客户端 服务端连接java程序 客户端连接 namesrv注册
  创建 broker配置类
  设置监听方法 等待消费消息
  注册的处理器
  创建 Netty 客户端配置
  创建回复消息线程池
  如果有变化会进行分发
  if (this.getMessageListenerInner() instanceof MessageListenerOrderly)
  构造方法中会创建 RouteInfoManager路由表
  由start方法启动
  this.consumerOffsetManager.load()
  consumer.subscribe(\"orderTopic\
  创建 Netty 配置类
  设置负载均衡策略
  this.clientManageExecutor = new ThreadPoolExecuto
  启动的这个线程任务  调用run方法
  启动commitlog start线程
   this.fileWatchService.start()
  this.defaultMQProducerImpl.start()
  获取 namesrv地址
  创建 Netty 服务端对象
  同步
  MessageModel 来区分是集群模式还是广播模式
  for (CommitLogDispatcher dispatcher : this.dispatcherList) {
  if (msg.getDelayTimeLevel() > 0)
  new GroupCommitService()
  生产者获取 topic 通过topic 可以获取 具体的消息队列 messageQueue
  维护客户端工厂 缓存好实例和topic关系 也就是生产者组
  创建 Namesrv配置类
  执行run方法
  创建 Netty远程服务端
  this.fastRemotingServer.start()
  initialTransaction()
  处理请求
  CommitLog.this.mappedFileQueue.flush(0)
  NamesrvStartup
  设置好 并发消息还是顺序消息后 运行start  这里只画并发消息
  start(createBrokerController(args))
  Netty
  this.commitLog.start()
  写入commitlog 
  DefaultMessageStore.this.doDispatch(dispatchRequest)
  再次加载属性 防止丢失
  this.processorTable.get(cmd.getCode())
  启动netty
  客户端请求数据 
  this.kvConfigManager.load()
  this.messageStore.start()
  this.commitLog.asyncPutMessage(msg)
  mQClientFactory.start()
  加载磁盘配置 conf目录下的 json文件
  获取路由信息
  执行传入的 run方法
  创建客户端线程池
  执行异步处理request SendMessageProcessor 这个类
  获取对应的处理器这里是 DefaultRequestProcessor
  启动 netty 服务
  this.messageStore.load();
  this.clientHousekeepingService.start()
  构建了 fast监听 算是vip通道 端口是 监听端口 - 2
  this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor
  start(controller)
  controller.getConfiguration().registerConfig(properties)
  createNamesrvController(args)
  this.doCommit()
  传入
  Thread.sleep(1)循环
  实现非常简单 每次进行+1操作 取模对垒数量 
  调用启动方法 
  集群模式在broker远端存储 offset 偏移量
  controller.initialize()
  new DefaultMQPushConsumer(“consumerGroup)
  注册心跳
  通过连接namesrv netty服务端获取 路由信息
  broker 注册 processor
  while循环 每隔10ms进行处理一次刷盘
  if (null == namesrvConfig.getRocketmqHome())
  nettyServerConfig.setListenPort(9876)
  创建commitServeice线程 用于刷盘
  producer.send(zeroMsg)
  this.defaultMQPushConsumerImpl.start()
  pull 长轮询组件
  修改 topic 为 SCHEDULE_TOPIC_XXXX和 queueId
  while (!this.isStopped())
  找到相应的 MessageQueue在根据 queue找到broker 往broker发送消息
                  msg.setTopic(topic);                msg.setQueueId(queueId)
  启动 broker
  执行线程的start方法
  new DefaultMQProducer(\"producerGroup\")
  createBrokerController
  回调 方法 执行 callback
  启动客户端 netty
  NameSrv 入口
  触发netty进行注册路由信息
  controller.start()
  this.consumerFilterManager.load()
  如果不是延迟消息以零拷贝的方式写入文件存储消息如果commitlog文件不够存储消息 会根据偏移量创建新的文件
  这是两个方法 先创建BrokerController
   new BrokerConfig()
  创建发送消息线程池
  客户端 消费消息入口
  (AsyncNettyRequestProcessor)pair.getObject1()
  (AsyncNettyRequestProcessor)pair.getObject1();
  ServerUtil.buildCommandlineOptions(new Options())
  获取  broker地址
  入口
  创建查询消息线程池
  调用 send方法  发送消息
  brokerConfig.getNamesrvAddr()
  这些线程池有一系列任务 统计任务持久化任务等等
  逻辑大致相同 创建了 netty 客户端 加入handler
  通过netty 客户端进行发送到服务端
  service.putRequest(request)
  获取MessageStroe 进行异步写入数据
  BrokerStartup
  创建 BrokerController
  循环进行分发
  后面代码是 如果以前有过发送失败 就会尽量绕过失败的 broker
  启动
  整个过程是创建controller
  this.consumeMessageService.start()
  绝大数是并发消息
  处理 channel 注册路由
  检测命令行参数 -c 不是重点
  选择具体的 处理器
  System.exit(-2)
  this.subscriptionGroupManager.load()
  callback(RemotingCommand response)
  调用 ReputMessageService run方法ReputMessageService 是继承了ServiceThread 所以执行具体类的 run方法 
  根据不通的请求 找到对应的 RemotingCommand 进行发送 
  dispatcher.dispatch(req)
  顺序消息
  扫描任务
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 