seata分布式事务源码解析
2021-07-30 18:21:07 1 举报
登录查看完整内容
seata源码解析
作者其他创作
大纲/内容
初始化branchSession
branchCommit
获取全局锁
初始化RM客户端
TC(server)2.分支事务注册
准备向TC服务端请求
afterImage
Defaultsqlsession#insert
ServerOnRequestProcessor#processMessage
放入集合中,供后续调用
DataSourceProxy
handleAsyncCommitting
全局事务DefaultCoordinator#doGlobalBegin
GlobalTransactionScanner#wrapIfNecessary
定时任务:每隔1秒
Server#main
(GlobalCommitResponse) syncCall
1.提交分支事务
spring aop扩展点:继承AbstractAutoProxyCreator,会调用wrapIfNecessary
globalSession#addBranch
db删除
GlobalSession#createGlobalSession
本地事务提交
根据表名:id去数据库seata/lock_table中查
connectionProxy#register
读取@GlobalTransactional注解
向服务端获取XID
分支事务DefaultCoordinator#doBranchRegister
logStore#updateGlobalTransactionDO
1.全局事务注册TC(server)
AbstractDMLBaseExecutor#executeAutoCommitTrue
2.分支事务入库
XID生成
根据sql语句类型获取excutor类型
Excutor:UpdateExecutor/DeleteExecutor
异常释放锁
branchSession#lock
handleGlobalTransaction
transactionMessageHandler#onRequest
GlobalTransactionalInterceptor
AbstractSessionManager#writeSession
RMClient#init
this.xid = XID.generateXID
TMClient#init
1.收集行锁到lock_table2.保存分支事务到branch_table
执行业务SQL
undolog数据库删除
扩展点:实现了MethodInterceptor接口,调用业务方法前会调用拦截器的invoke方法
spring ioc扩展点:GlobalTransactionScanner实现了InitializingBean接口,会调用afterPropertiesSet
插入到seata/global_table
全局事务-获取XID:Netty远程调用TC-server
DefaultCoordinator#onRequest
LockStoreDataBaseDAO#acquireLock
rs = business.execute()
初始化客户端请求处理handler
GlobalTransactionScanner
DefaultGlobalTransaction#begin
执行开发人员业务逻辑,如订单插入
生成前置镜像
准备undolog
RPC调用删除undolog
branchSession.unlock()
AbstractSessionManager#updateGlobalSessionStatus
全局事务开始
修改全局事务状态
初始化全局事务拦截器
RootContext.bind(xid)
connectionProxy.commit()
同步请求:会重试5次提交
ConnectionProxy
undolog入库,此时是order库
MySQLUndoLogManager#insertUndoLog
(GlobalBeginResponse) syncCall
delete lock_table where xid=? and branchid=?
分支事务RPC调用:获取分支ID
LockStoreDataBaseDAO#doAcquireLock
ExecuteTemplate#execute
后置镜像
AbstractDMLBaseExecutor#doExecute
1.全局事务开始
connectionProxy#processGlobalTransactionCommit
flushUndoLogs(this)
DefaultCore#commit
implements
GlobalSession#begin
入口
assertGlobalSessionNotNull
writeSession
seata源码开始
XID上下文绑定
同步请求TC
分支事务
1.全局锁保存到数据库lock_table
SessionHolder.findGlobalSession(xid)
TransactionalTemplate#commitTransaction
GlobalTransactionScanner#afterPropertiesSet
lifecycleListener#onBegin
RPC远程调用
初始化nettyServer
GlobalTransactionalInterceptor#invoke
DefaultCore#doGlobalCommit
批量处理注册的processor
全局事务提交DefaultCoordinator#doGlobalCommit
如果可以提交事务
根据XID获取globalSession
DefaultCore#begin
ServerHandler#channelRead
LockStoreDataBaseDAO#unLock
PreparedStatementProxy
getConnection
SessionHelper#endCommitFailed
TransactionalTemplate#beginTransaction
xid = transactionManager.begin
SessionHelper#newBranchByGlobal
异步提交事务
globalSession#asyncCommit()
List<SQLUndoLog> sqlUndoItemsBuffer
@Bean初始化GlobalTransactionScanner
2.分支事务开始
executor#execute
2.删除全局事务
update global_table set status=8(asycommit) where xid=xxx
logStore#insertBranchTransactionDO
undolog入库
分布式事务,数据库会交由seata代理管理
lifecycleListener#onAddBranch
DefaultCoordinator.#init
4.删除分支事务
globalSession#changeStatus
beforeImage
statementCallback#execute
3.提交全局事务开始
connectionProxy#doCommit
targetConnection#commit()
监听客户端请求
TransactionalTemplate#execute
extends
prepareUndoLog
orderDao.saveOrder(order)
branchRegister
TC(server)3.全局事务提交
根据XID获取全局事务,去数据库中查
DataBaseTransactionStoreManager#writeSession
AbstractCore#branchRegister
客户端
AbstractCore#branchCommitSend
DefaultGlobalTransaction#commit
初始化TM客户端
NettyRemotingServer
3.删除全局锁
AbstractDMLBaseExecutor#executeAutoCommitFalse(args)
AsyncWorker#doBranchCommits
logStore#insertGlobalTransactionDO
SeataAutoConfigurationseata-spring-boot-starter-1.4.0/spring.factories
AbstractNettyRemoting#processMessage
writeSession入口
本地事务提交,此时order表会生成数据
globalSession#removeBranch
branchSessionUnlock
transactionManager#commit
0 条评论
回复 删除
下一页