ES灾备数据同步
2025-12-16 16:29:28 0 举报
数据同步
作者其他创作
大纲/内容
捞取的数据后进行多线程处理,保证每行任务分配到不同的微服务中
索引操作失败,则认为失败
告知业务部对比结果uuid
taskReq 请求对象
str uuid;//配置表的uuid,通知执行结果时,回传回来int indexType;//es索引所在集群标识:0-默认;1-业务集群1;2-业务集群2;3-业务集群3str indexName;es索引名称(落库索引);str indexAlias ;//索引别名;str dbName;//库str tabName;//表str sql; 迁移的数据SQL;如涉及动态赋值,需在es-sync服务中拼接完整...
ProductDbSource
getSqlSession();getConnect();
listener
请求JSON
sendKafka(TaskReq req);//发送Kafka
kafka
使用新队列通知
增加同步结果表记录被调用的处理结果(写ES的结果);前期处理结果均记录,后期稳定后,不在记录成功的数据
专门的灾备同步ES的服务
发送成功即任务成功
es-sync
冷热
(优先级不高)重要/不紧急
updateResult(Res res);//
查询验证数据是否操作成功(删除、插入);如不符合预期,则认为失败
灾备ES
canal增加往灾备topic里推送消息,范围同生产topic
补偿任务
任务发起
...
通知的消息对象-Res
int configUuid;//业务ES分索引配置表uuid;str taskReq;//任务请求参数(json格式)int state ;//同步结果0-失败(默认);1-成功;2-同步中str msg;//同步结果信息...
请求参数:1、任务配置表的id集合2、库名;只有当id集合传值时,该字段可不传3、表名集合;4、时间范围start5、时间范围end 说明:1、均不传:则直接返回,不进行任何数据处理;2、只传id集合:则只处理任务配置表的指定任务3、只传库名:则查询 该库名 的所有数据,数据范围:数据表中的 数据范围类型+数据范围值4、只传库名+表名集合:则查询 该 库名 下的该 表名集合 数据,数据范围:数据表中的 数据范围类型+数据范围值5、只传库名+表名集合+时间范围start/end:则查询 该 库名 下的该 表名集合 数据,数据范围:传过来的开始时间和结束时间6、只传库名+时间范围start/end:则查询 该 库名 下的数据,数据范围:传过来的开始时间和结束时间
str dbName ;str tabName;List<str> uuids;
HTTP
新实例DBMySql
esTools里是否已有对应功能增量数据对比(DB<=>ES)
<<接口>>AbstractDbQueryService<T>
RetDTO handleTask(Object req);Pager queryPage(req);private warpEsData();RetDTO writeEs(BaseEs);//返回处理结果
读
1、根据不同的库切换数据源;2、根据不同的SQL查询数据(注意分页处理[各查count])
具体同步逻辑同生产逻辑,但落灾备集群ES宽表
灾备DB
业务ES分索引配置表
str uuid ;//主键int index_type;//集群标识str index_name;//索引名称str db_name;//库str tab_name;//表int index_split_type ;//索引切割方式;1-冷热;2-时间str sql ;//对应的查询数据的sql...
getXxxx();updataXxx();...
注意:索引的创建等等操作
sendTaslResultMsg
send(Res res);//
1、根据库表。定位配置表;2、根据配置表,拼接SQL:并定位出索引名2.1、传业务表uuid 的,拼uuid;2.2、全量的(只支持动态索引); 根据规则,拼接SQL;
根据集群标识切换ES连接
esDataMoveHandler
时间
默认记录失败
db-sync-task
异步业务表数据对比
getResult();//需要返回对比结果
改造功能点1、业务组-组内功能2、周边系统(??)3、运营后台4、数据修改单(esTools)5、es-sync/es-sync-task
周期处理
业务组
1、默认按每个库配置定时2、自定义配置定时
xxl-admin配置对应的执行任务
(优先级不高)不重要/不紧急
N
1
调用es-sync服务新增的灾备topic
记录执行结果表数据
增加任务配置表1、【库名】为必填2、【表名】为必填 3、【es索引所在集群标识】为必填,枚举值;1-业务集群1;2-业务集群24、【es索引名】为必填5、【数据范围类型】为必填;1-分钟;2-小时;3-天6、【数据范围值】为必填;范围类型+值来捞取当前时间之前时间范围的时间
es_task_info 表
公共字段....str uuid;str db_name;//库名称,用于定位DB,默认空字符串str table_name;//DB 下的 表名称,默认空字符串int env_type;//环境标识;0-灾备;1-生产str 表名对应实体类全路径int index_type;//es索引所在集群标识:0-默认;1-业务集群1;2-业务集群2str index_name;es索引名称,默认空字符串int data_type;数据范围类型;0-默认(秒);1-分钟;2-小时;3-天str data_value;数据范围值,与类型搭配使用,默认空字符串
esTools
1、读取到的数据为历史数据;2、把数据写入到对应的索引2.1、写入成功,删除当前索引数据;2.2、写入失败,则认为失败;3、删除失败,则认为失败;
调用es-sync服务(发起的入口)
配置多个定时任务;默认按库配置定时;当灾备切换后,灾备同步的相关定时任务需要停止
根据查询条件返回所需数据
切割规则
1、DB读取到的数据为冷数据;2、把冷数据写入到冷索引;2.1、写入成功,删除热索引数据;2.2、写入失败,则认为失败;3、删除失败,则认为失败;
重点内容1、多数据源配置及切换;2、单任务多线程数据处理;3、注意批量落ES时,需拿到每条的结果,只要有失败的即为任务失败;4、从数据库抽数的效率;深度分页后,单次查询耗时需控制;5、整体同步的效率;例:增量30分钟的数据,需在30分钟内处理完成;6、任务的请求参数合规组合的校验;例:请求参数的 开始时间需小于结束时间7、补偿任务:优先处理新失败的任务;其次是失败次数少的任务span style=\
TaskReq 请求对象
List<str> uuidList;//任务表uuidstr db_name;//库名List<str> tableNameList;//不为空时,db_name必须传值str startTimestr endTime
esTools:1、特殊表不支持重建索引2、判断是否为特殊表(拆索引的表);1、全量同步2、增量同步
updateResult();//
暂定:如部分失败,则认为失败;失败的补偿执行一次;
OrderDbSource
任务结束
业务表同步方案
AbstractDbSource
default void setSession();default Connect getConnect();...
1、存量数据数据初始化问题;2、数据检查问题(重复&缺失);3、索引变化,数据同步问题;4、相同数据被同时触发问题;
xxl
异步调用,不等待微服务处理结果
es_task_info_result 表
公共字段....str es_task_info_uuid;//es_task_info表uuid;默认空字符串str task_req;//任务请求参数(TaskReq toJSON);默认空字符串int state;//处理结果,0-默认(失败);1-成功;int err_count;//失败次数 0-默认。补偿失败则+1次str msg ;//处理结果信息;默认空字符串str end_time;任务结束时间;默认空字符串;公共字段的【create_time】 为任务开始时间str xxxxx ;//estools 对应库的业务表 uuid
推送成功即成功
业务ES分索引配置执行结果表
int config_uuid;//业务ES分索引配置表uuid;str task_req;//任务请求参数(json格式)int state ;//同步结果0-失败(默认);1-成功;2-同步中int err_count;//失败次数str msg;//同步结果信息int task_source;//任务来源;默认:1-xxl;2-esTools;3-Kafka(业务组发起的)...
异步
processingData(TaskReq req);//监听消息进行处理
UserDbSource
消息监听
更新对比结果
特殊表全量(增量)同步接口
str dbName;str tabName;List<str> uuidList ;//传入则按uuid 同步,不传,则全量表同步
1、根据切割方式,确定索引名称2、传入业务uuid的,则需查库定位索引名称
推送请求队列
DbToEsHandler
incrSync(TaskReq req);
业务组: 增量同步
索引操作成功后,推送请求队列
读|写
esDataMoveMakeOkHandler
syncEsDataMoveMakeOk(int num);//失败次数,不传默认5次内的失败数据;逐条发送
AbstractDao<T>
宽表同步方案
索引操作同别名多索引
推送返回队列-通知处理结果
incryData();//监听消息进行处理
1、es-sync服务中,增加补偿定时入口;2、处理失败的数据,处理5次后不在进行补偿
任务中,需在根据数据量进行多线程数据处理,汇总后更新任务结果
写
0 条评论
下一页