基于消息的分布式事务
2021-01-30 10:15:08   0  举报             
     
         
 分布式事务 消息
    作者其他创作
 大纲/内容
 虚拟节点
  transaction
  H1
    XMLMapperBuilder解析mq-tx的XML注册到mybatis的Configuration
  P4
  节点编号:1
  业务处理
  阶段四:创建返回阿里云Producer代理对象
  H4
  2
  P
  :
  常态部署节点
  P1/P2/P3/P4
  消息落库
  P2
  K2
  【0 ~ uint32】
  H2
  H
  消息落库TOPIC_TABLE
  1
  阶段一:扫描mq-tx注册beanDefinition交由spring管理
  consumer
  处理BeanProcess实现一
  节点编号:2
  本地业务处理。。。。。
  3
  P3
  index=(uint32)hash(key)
  节点编号:3
  投递/可重试
  H3
  处理BeanProcess实现二
  调用
  消息落库询标1:通过hash算法对key进行hash,得到uint32的index2:寻找hash环中最近的一个虚拟节点3:寻找虚拟节点映射的后端真实节点节点编号
  K4
  P1
  mq-tx中间插件
  TransactionSynchronizationAdapterafterCommit
  K3
  spring
  节点编号
  启动类加入@EnableLefitMqTransaction实现了ImportBeanDefinitionRegistrar
  producer
  H1/H2/H3/H4
  异步任务待处理
  消费成功
  K
  扩容增加部署节点
  K1
  index
  减少部署节点
  K1/K2/K3/K4
  调用阿里云producer真实对象send更新本地库msgId
  扫描被spring管理的bean 即被定义为@Compten@Configuration等
  mybatis
  调用producer的send
  消息落库由一致性hash得到最近虚拟index节点对应真实节点打标
  Rocket MQ
  代理ONS的Producerbean instanceof Producerorbean field instanceof Producer
  真实节点
  重写mybatis @MapperScan
  扫描配置原业务mapper
  启动
  阶段二:扫描mq-tx的mapper,目的统一交由spring transaction维护
  自定义@MapperSacn
  消费回执
  处理实现ImportBeanDefinitionRegistrar
  @Transaction
  基于spring无侵入的分布式消息落地织入流程
  扫描mq-tx中的mapper
  阶段三:扫描mq-tx的mybatis的xml创建阶段二中mapper的代理实现
  保证幂等
  重写postProcessAfterInitialization方法处理SqlSessionFactoryBean创建阶段二mq-tc中mapper代理实现
  服务下线:1:K服务下线选定第一个(策略待定)虚拟节点;延时触发(时间待定)2:寻找hash环中最近一个虚拟节点;3:获取映射虚拟节点对应的真实节点;例如:P节点服务4:P节点服务查询编号3对应的DB数据,创建新队列,启用线程池消费;服务上线:1:ZK通知在线节点服务,处理是否存在非自己节点以外的队列数据,存在即立即停止;2:上线服务获取自己节点对应的节点编号查询DB数据,加入队列线程池消费;
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 