RocketMQ存储机制
2022-12-27 23:35:15   4  举报             
     
         
 RocketMQ存储机制架构与源码
    作者其他创作
 大纲/内容
 BrokerControoler.iniitialize()
    两个线程FlushCommitLogService extends ServiceThread
  waitPoint.await
  构造方法
          // 不断交替 写入列表  requestsWrite 和 读取列表 requestsRead        // 类似于生产者和消费者 ,但是生产和消费队列分开,不断交替
  获取消费分区初始值---topic+queueId CommitLog的Offset----fileFromOffset+ByteBuffer.position.....多个属性写入ByteBuffer
  二级缓存[MappedFile.writeBuffer]isTransientStorePoolEnable
  同步刷盘线程GroupCommitService
  doCommit
  同步刷盘
  1-获取当前内存对象的写入位置(wrotePostion变量值),若写入位置没有超过文件大小则继续顺序写入2-由内存对象MappedByteBuffer创建一个指向同一块内存的ByteBuffer对象,并将内存对象的写入指针指向写入位置3-以font color=\"#d32f2f\
  与运算<<右移。
  this.onWaitEnd
  获取groupCommitService
  自定义CountDownnLatch
  MPSC多生产者单消费者
  通过MappedFileQueue获取MappedFile
  锁
  增加的重置latch的特性
  GroupTransferService 继承于ServiceThread
  可重入锁自旋锁
  数据写入流程
  run
  public final boolean releaseShared(int arg) {//共享锁   if (tryReleaseShared(arg)) {        doReleaseShared();  //唤醒所有等待的线程        return true;   }        return false;   }
  private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    }
  同步复制的场景
  异步刷盘
  是否需要睡眠
  计数器值不能重置
  释放锁
  消息存储CommitLog.putMessage(..)
  刷盘线程
  JUC的CountDownLatch
  1-找到模板方法2-找到子类钩子方法的实现
  有啥妙的?
  DefaultAppendMessageCallback.doAppend(...)
  子类钩子实现
  自定义了自旋锁或重入锁
  wakeup()
  异步刷盘线程FlushRealTimeService
  对锁进行抢占,    public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }
      public final void acquireSharedInterruptibly(int arg) throws InterruptedException {       if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);    }
  MappedFile.appendFile()
  共享锁的释放
  DefaultmessageStore.的构造方法调用
  上锁
  入口
  创建一个映射文件队列-new MappedFileQueue(..)创建刷盘线程-new GroupCommitService/FlushRealTimeService()创建可重入锁/自旋锁
  requestRequest记录了下一条消息要写入的位置,当前消息的尾巴? nextOffsetflushWhere=表示该位置的消息已经刷入磁盘
  GroupCommitService       --读写分离模式--onwatiEnd钩子回时,进行读队列与写队列的交换。
  同步器-new CountDownLactch(2)==this.sync=new Sync(2)==setState(2)队列同步器,抢占不成功,就会等待=>doAcquireSharedInterruptibly(x);
  二级缓存提交线程
  构造GroupCommitRequest(result.getWroteOffset+result+getWroteBytes())//nextOffset到达CommitLog的最后面了。service.putRequest(req); //背后做了什么?boolean flushOk= request.waitForFlush(timeOut);//等待闭锁
  waitForRunning()
  CountDownLatch2
  HAService调用GroupTransferService.putRequest(req)
  根据需要设置延迟级别,topic系统标志
  核心成员变量与方法
  钩子实现
  业务线程等待-waitFor?
  文件队列
  indexFile
  主从复制handleHA
  public void countDown() {     sync.releaseShared(1); }
  因为ServiceThread是单消费者的模式。重置的时候,等待队列里面没有等待线程在等待了
  是否中断?
  模板方法--定义在AQS中
  waitPoint.reset
  waitForRunning
  CommitLog
  ConsumeQueue
  获取或创建最后一个映射文件MapedFileQueue.getLastMapedFile()
  protected int tryAcquireShared(int acquires) {  return (getState() == 0) ? 1 : -1;}即判断getState()==0? 如果为0,则抢占成功,不为0则失败,排队。
  可以执行定时任务+紧急任务周期性的任务执行-waitForRunning(interval)打开闭锁,使得具有执行紧急任务的特性-wakeUp()等待的时候,不会去耗费CPU的时间片?
  0x1 <<1 0x1 <<20x1 <<4
  this.swapRequests
  有waitStoreMsgOK的标志同步消息会带有该属性
  刷盘失败-则?
  同步器里面的模板方法
  数据刷盘handleDiskFlush
  核心成员变量
  ServiceThread线程模式
  HAService.putRequest(new GroupCommitRequest(nextOffset))request.waitForFlush(timeOut)
    
    收藏 
      
    收藏 
     
 
 
 
 
  0 条评论
 下一页
 为你推荐
 查看更多
    
   
  
  
  
  
  
  
  
  
 