ES索引拆分同步方案
2026-01-05 14:44:17 0 举报
本文档详细介绍了一种基于时间序列的数据存储优化策略:ES索引拆分同步方案。该方案设计了一套精巧的算法,实现了高效的数据分割和自动化同步流程,其目的在于提升大数据处理的效率和查询性能。核心内容包括弹性搜索(Elasticsearch,简称ES)索引的动态拆分机制,以及确保数据完整性和一致性同步的方法。此方案适用于需要处理大规模、高速度生成的时间序列数据的应用场景。通过本方案,能够实现数据按预定时间段(如按周、月)自动拆分到新索引中,并将分散的数据综合同步到全局视图中,增强数据检索的能力和扩展数据处理平台的吞吐量。整体而言,该方案通过高级索引设计和智能同步策略,有效解决了ES在大规模时间序列数据存储和检索中常见的性能瓶颈问题。
作者其他创作
大纲/内容
任务发起
注意:索引的创建后的读写设置及查询验证
1、根据不同的库切换数据源;2、根据不同的SQL查询数据(注意分页处理[各查count])
1、DB读取到的数据为冷数据;2、把冷数据写入到冷索引;2.1、写入成功,删除热索引数据;2.2、写入失败,则认为失败;3、删除热数据失败,则认为失败;
推送请求队列
AbstractDbSource
default void setSession();default Connect getConnect();...
索引操作同别名多索引
db-sync-task
1、根据库表。定位配置表;2、根据配置表,拼接SQL:并定位出索引名(传uuid 时,无法定位索引)2.1、传业务表uuid 的,拼uuid;(此时,索引名称无法定位,Kafka消息中,不传索引名称)2.2、全量的(只支持动态索引); 根据规则,拼接SQL;
listener(warpMsgTopic)
请求JSON
listenerAndSendKafka(TaskReq req);//先落库,后发送Kafka消息;
冷热
发送指定Kafka消息队列的接口(warpMsgTopic)
xxl
esDataMoveMakeOkHandler
syncEsDataMoveMakeOk(int num);//失败次数,不传默认5次内的失败数据;逐条发送
业务组
暂定:如部分失败,则认为失败;失败的补偿执行一次;
改造功能点1、业务组-组内功能2、周边系统(??)3、运营后台(例:订单数据处理的)4、数据修改单(esTools)5、es-sync/es-sync-task(注意:对现有的定时任务的影响)6、写/读数据库的一套新方法;(业务表的)7、新接口:数据移动(读->写)索引数据
切割规则
推送返回队列-通知处理结果
记录执行结果表数据
任务结束
默认记录失败
listener
updateResult(Res res);//
索引操作失败,则任务失败
esDataMoveHandler
str dbName ;str tabName;int task_source;//任务来源;默认:1-xxl;2-esTools;3-Kafka(业务组发起的)List<str> uuids;
推送请求队列/http
esTools
1、根据切割方式,确定索引名称2、传入业务uuid的,则需查库定位索引名称(task服务)
根据查询条件返回所需数据
更新任务结果
通知的消息对象-Res
int configUuid;//业务ES分索引配置表uuid;str taskReq;//任务请求参数(json格式)int state ;//同步结果0-失败(默认);1-成功;2-同步中str msg;//同步结果信息...
例:”冷热: orderfinished时间: order_20120101、order_20130101、如为:时间切割方式。需在年度切换时自动创建索引,并设置为读;如为:冷热切割方式。如未进行拆分,需新增冷索引,并设置为读;
1、读取到的数据为历史数据;2、把数据写入到对应的索引2.1、写入成功,删除当前索引数据;2.2、写入失败,则认为失败;3、删除es数据失败,则认为失败;
调用/推送成功即成功
业务ES分索引配置执行结果表
int config_uuid;//业务ES分索引配置表uuid;str task_req;//任务请求参数(json格式)int state ;//同步结果0-失败(默认);1-成功;2-同步中int err_count;//失败次数str msg;//同步结果信息int task_source;//任务来源;默认:1-xxl;2-esTools;3-Kafka(业务组发起的)int incr_sync;//增量标识;默认:0-全量;1-增量;...
getXxxx();updataXxx();...
HTTP
异步
业务ES分索引配置表
str uuid ;//主键int index_type;//集群标识str index_alias;//索引别名str db_name;//库str tab_name;//表str 表名对应实体类全路径int index_split_type ;//索引切割方式;1-冷热;2-时间str index_suffix;索引后缀;例冷热时:finxx,时间时:2022-2025str sql ;//对应的查询数据的sql...
业务组: 增量同步
查询验证数据是否操作成功(删除、插入);如不符合预期,则认为失败
OrderDbSource
getSqlSession();getConnect();
1、DB读取到的数据为冷数据;2、把冷数据写入到冷索引;2.1、写入成功,删除热索引数据;2.2、写入失败,则认为失败;3、删除失败,则认为失败;
根据集群标识切换ES连接
1、读取到的数据为历史数据;2、把数据写入到对应的索引2.1、写入成功,删除当前索引数据;2.2、写入失败,则认为失败;3、删除失败,则认为失败;
补偿任务
时间
任务结果体现索引操作结果
kafka&HTTP
sendTaslResultMsg
send(Res res);//
发送成功即任务成功
ProductDbSource
需关注的问题:1、存量数据数据初始化问题; 所需时间及结果,不影响查询使用2、数据检查问题(重复&缺失); 定时对比删除&补偿3、索引变化,数据同步问题; 更换索引,只能人为介入处理4、相同数据被同时触发问题; 1.存在么? 2.如果存在,可能会有数据不在对应索引内问题,但不会重复;定时再次执行时会处理掉;
...
读取配置表,根据规则处理索引;1、根据集群标识切换ES连接2,有索引,不处理;3、没有索引,进行新索引创建等动作;
es-sync
esTools:1、特殊表重建索引;重建索引切换使用时,会引起拆分失效,需在次进行索引数据迁移动作2、新增页面-特殊表(拆索引的表);1、全量同步(需控制请求频率);2、增量同步
kafka
processingData(TaskReq req);//监听消息进行处理
taskReq 请求对象
str uuid;//配置表的uuid,通知执行结果时,回传回来int indexType;//es索引所在集群标识:0-默认;1-业务集群1;2-业务集群2;3-业务集群3str indexName;es索引名称(落es索引);增量时,该字段无值str indexAlias ;//索引别名;str dbName;//库str sql ; 迁移的数据SQL;如涉及动态赋值,需在es-sync服务中拼接完整int incrSync;//增量标识;默认:0-全量;1-增量;int indexSplitType ;//索引切割方式;1-冷热;2-时间...
特殊表全量(增量)同步接口
str dbName;str tabName;List<str> uuidList ;//传入则按uuid 同步,不传,则全量表同步
0 条评论
下一页
为你推荐
查看更多