canal源码全流程分析1.1.5
2022-08-02 13:48:21 0 举报
登录查看完整内容
canal源码
作者其他创作
大纲/内容
start(destination)
InstanceConfigMonitor.start()
EvenSink
FileMixedMetaManager
初始化ServerRunningMonitors.runningMonitors
启动Netty网络接口
subscribeChange
zk操作
初始化CanalInstanceGenerator(manager|spring)
defaultAction.reload(destination);
initInstanceConfig(properties);
提交到store前回调
数据存储
embededCanalServer.start(destination);
afterStartEventParser(eventParser);
初始化instanceConfigMonitors
EventParser
new CanalController(properties);
实现类
erosaConnection.dump()
自增batchId以及记录该batchId的Event点位范围
metaManager.listAllSubscribeInfo
canalInstance.start();
CanalEventParser.start()
canalServer = CanalServerWithNetty.instance();
用来获取SpringInstanceConfigMonitor或者ManagerInstanceConfigMonitor
mysql
初始化一些变量
doSink(events);
ack
CanalEventStore.start()
CanalAlarmHandler.start()
CanalLogPositionManager记录binlog最后一次解析成功位置信息,主要是描述下一次canal启动的位点
eventStore.rollback()
handler.after(events);
canalServer(netty)
清除对应的batch信息
sinkData()
CanalHAController.start()
transactionBuffer.add(entry);
CanalController.start()
start()
getWithoutAck
updateSettings
get()
用于监控每一个CanalInstance。
获取主库checksum信息
发送注册Slave命令报文
CanalEventSink.start()
通过网络最终到达CanalServerWithEmbedded
MetaManager
TransactionFlushCallback.flush()
更新消费点位
sendRegisterSlave
initGlobalConfig(properties)
eventSink.sink()
解析LogEvent转化为Entry
PeriodMixedLogPositionManager
将EventStore中获取的Events转化成Message
MixedLogPositionManager
对当前链接进行一些参数设置
rollback
发送Binlog Dump命令报文
MemoryMetaManager
对一个batch的确认
eventStore.ack()
clearAllBatchs
BinlogParser
CanalLauncher
setMode(manager|spring)setLazysetManagerAddresssetSpringXml
MixedMetaManager
满足条件触发事务刷新机制
cleanUntil
放入缓冲区
提交store成功后回调,目前没有实现
ZooKeeperMetaManager
获取上一次成功的位置
listener.processStart();
flush()
数据源接入,协议解析
更新下历史订阅的filter信息
MemoryEventStoreWithBuffer
SinkFunction.sink(event)
client
PeriodMixedMetaManager
记录当前解析的位置
findStartPosition
autoScan
socket
defaultAction.stop(destination);
embededCanalServer.start();
doGet
embededCanalServer = CanalServerWithEmbedded.instance();
从内存Event[]中取数据
logPositionManager.persistLogPosition();
1. 首先查询上一次解析成功的最后一条记录 2. 存在最后一条记录,判断一下当前记录是否发生过主备切换 a. 无机器切换,直接返回 b. 存在机器切换,按最后一条记录的stamptime进行查找 3. 不存在最后一条记录,则从默认的位置开始启动
sendBinlogDump
初始化自动扫描机制InstanceAction
beforeStartEventParser(eventParser)
CanalInstance
MemoryLogPositionManager
MetaLogPositionManager
loadBinlogChecksum
(若有配置)初始化ZkClientx
deployer
(若有配置)ZkClientx创建整个canal的工作节点
CanalHAController控制Evenparser的链接主机管理,判断当前该链接哪个mysql数据库
启动类
listener.processActiveEnter();
更新put点位
CanalStarter.start()
配置canal.withoutNetty=false时
canalServer.start();
FailbackLogPositionManager
getEvents
handler.before(events);
更新get点位
eventParser.setEventFilter()
将ack之前的内存全部释放掉
EventTransactionBuffer
FileMixedLogPositionManager
将get的位置重新设为ack的位置
ServerRunningMonitor.start();
CanalLogPositionManager.start()
CanalMetaManager.start()
继承
启动一个定时线程扫描查询配置是否有变动,若有变化
doPut(data);
读取一下历史订阅的filter信息
getSequence.set(ackSequence.get());getMemSize.set(ackMemSize.get());
ZooKeeperLogPositionManager
TableMetaCache
增量订阅&消费信息管理器
Parser和Store的链接器,数据过滤,加工,分发到工作
doFilter(entry)
建立数据库连接
defaultAction.start(destination);
logPositionManager.getLatestIndexBy
eventStore.tryPut(events)
erosaConnection.connect();
CanalServerWithEmbedded
定时将内存中的值刷到zookeeper
setServerRunningListener
0 条评论
回复 删除
下一页