MQ ElasticSearch
2021-06-29 12:02:02 92 举报
AI智能生成
登录查看完整内容
MQ ElasticSearch
作者其他创作
大纲/内容
MQ ElasticSearch
MQ
为什么要用MQ?
1.解偶
2.异步
3.削峰
消息队列优点缺点?
1.系统可用性降低
MQ一旦故障,系统A就没办法发送消息到MQ了,系统BCD也没法接收到消息。整个系统就崩溃了,就没有办法运转了
2.导致系统要考虑的问题变多,进而导致系统复杂性变高
系统A本来给系统B发送一条就可以,但是由于系统A和MQ协调问题,系统A发送了两条一摸一样的数据。
3.一致性问题
有人给A发送请求,本来这个请求ABCD都执行成功才能返回,结果D执行失败了,就导致整个请求给用户返回的是成功,结果后台逻辑实际上差了一点,没有完全执行完
MQ对比
ActiveMQ
成熟,功能强大
偶尔丢失数据,维护越来越少
RabbitMQ
吞吐量万级,相对较小,MQ功能比较完备
用erlang开发,性能极好,延迟最低
不会丢失消息
开源提供管理界面很棒
社区相对比较活跃
RocketMQ
阿里开发,支持高吞吐量
分布式扩展起来比较方便
java开发,源码可以研究点
如果阿里不支持了就黄了
Kafka
吞吐量十万级
功能比较少
天然适合大数据的实时计算和日志采集
如何保证消息队列高可用?
Kafka高可用架构
每台机器+机器上的broker进程,就可以认为是kafka集群中的一个节点
每个节点存储一部分topic的partition每个节点可以设置多个副本,选举一个为leader,其他副本为follower生产者只能往leader里写数据,写入数据到leader的时候,leader就会将数据同步到follower上去
RabbitMQ集群模式
普通集群模式
缺点
1.可能在rabbitMQ集群内部产生大量的数据传输
2.可用性几乎没有什么保障,如果queue所在的节点宕机了,就导致那个queue的数据丢失了,你就没法消费了
唯一好处
能够从多个机器上去消费提高消费的吞吐量而已
只是做了集群,不算分布式
如果找的不是queue所在的主节点,找的这个节点回去主节点去要所需的queue,然后给到客户端消费
镜像集群模式
每个节点上都有queue的完整镜像,包含了queue的全部数据
高可用:任何一个节点宕机了,没问题,其他节点上还包含了这个queue的完整数据,别的consumer都可以到其他活着的节点上去消费
不是分布式的:如果queue的数据量很大,大到机器上的容量无法容纳了,此时该怎么办呢
如何开启镜像集群模式:在管理控制台,后台新增一个策略,这个策略就是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求同步到指定数量节点,然后再次创建queue的时候应用这个策略,就会自动将数据同步到其他节点上去了
如何保证消息不被重复消费?(如何保证消息消费时的幂等性?)
kafka消费端可能出现的数据重复问题
每个消息都有一个offset,代表这个消息的顺序的序号消费者从kafka消费的时候,是按照这个offset顺序去消费的消费者会去提交offset,告诉kafka已经消费到offset=153\b的这条数据了offset是存储在zookeeper里的消费者系统被重启,重启以后,去kafka继续上次接着消费,但是offset是定时定期提交一次,可能有几条数据消费了,但是没有告诉kafka就重启了导致这几条数据在重启完后重新消费了一次
重复了怎么保证幂等性
幂等性:一个数据或者一个请求,给你重复来多次,你得保证对应的数据是不会改变的,不能出错
具体根据业务来解决
1.生产者发送每条数据时,里面加一个全局唯一的id,类似订单id之类的东西,然后消费到了之后,先根据id去redis里查一下,之前消费过么?如果没有消费过,就正常处理,然后将id写到redis。如果查到了消费过,那就不处理了,保证别重复处理相同的消息即可
如何保证消息的可靠性传输?(如何处理数据丢失问题)
rabbitMQ
1.生产者丢失数据
写消息过程中,消息都没到rabbitmq在网络传输过程中就丢了
消息到了rabbitmq但是人家内部出错了没保存下来
解决
方案1:事务
缺点:这个事务机制是同步的,生产者发送消息会阻塞卡住等待成功,会导致生产者发送消息的吞吐量降下来
方案2:confirm
先把channel设置成confirm模式发送一个消息发送完消息后就不用管了rabbitmq如果接收到这条消息,就会回调你生产者本地的一个接口,通知你说这条消息已经收到了rabbitmq如果在接收消息的时候报错了,就会回调你的接口告诉你这个消息接收失败了,你可以再次重发
生产者这块如果要保证消息不丢,一般是用confirm机制,异步的模式,你发送消息之后不会阻塞,直接你可以发送下一个消息
2.rabbitMQ
rabbitMQ将数据暂存在自己的内存里,结果消费者还没来得及消费,rabbitmq挂掉了,就导致暂存在内存里的数据丢失
1.rabbitMQ开启持久化
1.创建queue的时候设置queue为持久化,但是queue里的消息不回持久化
2.发送消息的时候将消息的 deliveMode设置为2,就是将消息设置为持久化的
2.开启了持久化,但是消息写到了MQ,但是MQ挂了,此时rabbitMQ挂了
3.消费者
打开了autoAck机制,消费者消费到了这个消息,但是还没来得及处理就挂掉了,但是rabbitmq以为这个消费者已经处理完自动ack了。
取消自动ack
在消息消费完后手动ack通知数据成功消费
kafka
消费者弄丢了数据
消费者消费到消息,自动提交offset,让kafka以为你已经消费好了这个消息,但是实际上你刚拿到这个消息还没处理就挂了,导致数据丢失
取消自动offset
进行手动offset,彻底处理完数据再进行offset传输
但是容易出现消费完消息还没提交offset就挂了的情况,导致重复消费
根据实际情况保证幂等性就行
kafka弄丢了数据
生产者将数据传给了主节点 partition1 leader,但是leader还没有把数据同步到自己的从节点partition1 follower就挂掉了。经过重新指定leader后,新leader里没有这个未同步的数据。
解决(设置4个参数)
1.给topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本
2.在kafka服务器端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有一个至少有一个follower还跟自己保持联系,没掉队,确保leader挂了还有一个follower
3.在producer(生产者)端设置 acks=all:这个是要求每条数据,必须写入所有replica之后,才能认为是写成功了。
acks=0:发送完数据就不管了。
acks=1:发送完数据只要leader接收到就算成功,默认设置这个比较好
acks=all:必须所有分片都同步完数据才算成功
4.在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了
生产者会不会丢数据?
如果按照上面的配置设置acks=all,一定不会丢
如何保证从消息队列拿出来的数据按照顺序执行?
场景:通过mysql binlog进行数据复制,新增,修改,删除操作语句,必须保证顺序一致。
造成原因:rabbitMQ顺序不一致情况:一个queue多个消费者consumer
解决:设置多个queue,每个queue对应一个消费者,这种顺序操作的数据写入到一个queue里面去
kafka能做到的保障:1.写入partition中的数据一定是有顺序的。2.生产者在写的时候,可以指定一个key,比如指定某个订单的id作为key3.这个订单相关的数据,一定会被分发到一个partition中。而且这个partition中的数据一定是有顺序的4.一个消费者只能消费一个partition中的数据。
1.首先对需要保证顺序的数据指定一个key,保证这些数据都写入同一个partition中2.一个partition只能被一个消费者消费。但是消费者内部如果多线程
但是消费者内部如果多线程
解决:使用内存队列处理,将key hash后分发到内存队列中,然后每个线程处理一个内存队列的数据。
使用延迟队列
生成订单之后,放入TTL队列设置超时时间。如果信息超过设置时间没处理,就进入处理死信的交换机。然后消费者处理交换机绑定的队列消息,判断是否支付等操作,进行解锁库存取消订单等操作。
消息超过了有效期,没有人处理,成为死信
如何解决消息队列的延迟以及过期失效问题?消息队列满了以后该怎么处理?有几百万条消息持续积压几小时,怎么解决?
场景:消费者故障 1个消费者1s 1000条,一秒三个消费者是3000条,一分钟是18万条,1000多万条积压数据。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
1.先修复consumer消费者的问题,确保其恢复消费速度,然后将现有consumer都停掉
2.新建一个topic,临时建立好partition原来10倍或者20倍的数量
3.然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮训写入到临时建立好的topic里的10倍数量的partition里
4.接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时的partition数据。
还设置了过期时间,消息延时了并且过期失效了怎么办?
正常不要设置过期失效,这个东西很坑。
批量重导:假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,你只能手动写程序把1000个订单查出来,手动发到mq里再去补一次。
如果消息积压在mq里,很长时间都没有处理掉,此时导致MQ都快写满了,咋办?或者说消费者消费的消息没解决,磁盘快满了
如上面所诉,新建一个MQ,之前的消费者拿到数据什么都不处理,写到新建的MQ中进行10倍消费
如果消费者故障问题没解决,可以先改下消费者代码,拿到消费者的数据直接扔掉,先保证MQ中的磁盘空间等晚上再临时写程序找到丢失的数据重新补回来。
如果让你来开发一个消息队列中间件,如何设计架构?
1.首先MQ得支持扩容吧,所以可以设计一个分布式的MQ,参考kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。等资源不够了,直接给topic增加partition,然后做数据迁移,增加机器,就可以存放更多数据,增加吞吐量。
2.考虑MQ的数据要持久化,落磁盘的数据要顺序写,这样就没有了磁盘随机读写的寻址开销,性能更高。kafka思路
3.mq可用性。参考kafka,多个副本 leader&follower->broker 挂了重新选举leader即可对外服务
上面整体的总结
ElasticSearch
准备
了解下lucene是什么
全文索引是什么
es中存储数据的基本单位是索引,相当于mysql里的一张表。mapping代表了表的结构定义。
倒排索引
传统的我们的检索是通过文章,逐个遍历找到对应关键词的位置。而倒排索引,是通过分词策略,形成了词和文章的映射关系表,这种词典+映射表即为倒排索引。有了倒排索引,就能实现 o(1)时间复杂度的效率检索文章了,极大的提高了检索效率。
es接收到一个文档后,进行字符过滤->分词->归一化(停用词,大小写,单数和复数,相近词(相似词))
BKD树(K-D数和B+树)
如何实现master选举
ZenDiscover模块负责
对所有可以成为master的节点(node.master: true)根据nodeId字典排序,每次选举每个节点都把自己所知道的节点排一次序,然后选出第一个节点,暂且认为它是master节点。
如果对某个节点的投票数达到一定值并且该节点自己也选举自己,那这个节点就是master
脑裂问题
主节点作用
负责集群层面的相关操作,管理集群变更,如创建删除索引,跟踪哪些节点是集群的一部分,并决定哪些分片分配给相关的节点。
es的分布式架构原理能说一下么(es是如何实现分布式的)
1.核心思想就是在多个机器上启动多个es进程实例,组成一个集群。2.创建一个索引,这个索引可以被分成多个shard(分片),每个shard存储一部分数据3.多台机器中设置其中一台作为master node主节点,shared分片也会有主分片primary副分片replica4.主副分片均匀分布在各个机器上5.数据都是写入到主分片,然后主分片同步写入到副分片上。读数据则可读取主分片或者副分片的数据。6.如果某台机器进程宕机,master进程宕机,选举其他进程作为master,并且将宕机的进程里的主分片的副分片转为主分片。7.宕机的进程好了以后,便不再是master 节点,里面的主分片parimary shard转为副分片 replica shard。
es写入数据的工作原理是什么?es查询数据的工作原理是什么?
写数据
基本写入流程1.首先客户端随便选择一个节点去写,此时这个节点称为协调节点2.协调节点对写的数据进行hash,确定这个数据属于哪个shard(分片)3.发现当前协调节点不存在数据应该存储分片的主分片(primary shard),那么就对数据进行路由,到有pimary shard的节点上去4.主节点同步数据到从节点,如果主节点和从节点都写完了,那么协调节点会返回写成功的响应给客户端。
primary shard存储底层原理(refresh,flush,translog,merge)
1.数据写入shard的时候,先写入内存buffer里,同时它会写入到translog日志文件里。(此时如果客户端要查询数据是查不到的)
2.如果buffer快满了或者每隔一段(默认1s)时间,es会把内存buffer中的数据 refresh刷到到一个新的segment file,每隔1秒就会产生一个新的segment文件(存入这1s的数据) 但是如果buffer里面此时没有数据,就不会执行refresh。数据在写入segment file之后,同时就建立好倒排索引了。
3.操作系统中,磁盘文件其实都有一个东西,叫os cache,操作系统缓存。就是说数据写入磁盘文件之前,会先进入os cache。 只要buffer里的数据写入到了os cache里面,客户端就能搜索到这部分数据了。
为什么es是准实时的?因为写入1s后才会刷到os cache里。 写入到os cache里之后,buffer里的数据就会清空,translog会保留。
translog也是磁盘文件,所以也是先写入os cache里的,默认5秒刷新数据到磁盘中
4.当translog不断变大,大到一定阈值,或者30分钟 就会触发commit(flush)操作。(默认30分钟会自动执行)整个commit过程叫flush,手动根据es api也可以执行flush。 commit操作: 1.写commit point 2.将os cache fsync强刷到磁盘上去 3。清空translog日志文件 1.将buffer里的数据都写入os cache里面去,然后清空buffer。 2.将一个commit point文件写入到磁盘,里面标示着之前写入的所有segment file,但是数据还是在os cache中。 3.把os cache缓冲的所有的数据都fsync到磁盘上面的每个segment file中去。 4.刷完以后会删除并新建translog
translog日志作用:数据一般都是存储在buffer或者os cache内存里,一旦服务器宕机重启,内存中的数据就会丢失。所以将es操作日志存储在translog里,es重启时通过translog将数据恢复到buffer及os cache中。
删除数据写入.del文件中标识一下这个数据被删除了,里面某个doc标识为deleted状态客户端搜索某条数据,一旦发现这条数据在.del文件中找到这条数据被标识成删除状态了,就不会搜索出来。
在新的文档被创建时,Elasticsearch会为该文档指定一个版本号,当执行更新时,旧版本的文档在.del文件中被标记为删除,新版本的文档被索引到一个新段。旧版本的文档依然能匹配查询,但是会在结果中被过滤掉。
由于每隔1s生成一个segment file,当文件多到一定程度的时候,es会merge成一个大的segment file,然后删除旧的文件在merge的时候,会看一下如果某条数据在.del文件中标识为删除,那么merge后的新文件里这条数据就没了(物理删除)
数据是准实时的
1s后才将数据刷新到os cache中
丢失数据情况
默认5s才会将translog从os cache写入到磁盘文件中,所以会有5s数据丢失的可能
解决:可以设置个参数,官方文档。每次写入一条数据,都是写入buffer,同时写入磁盘上的translog。 但是会导致写性能,写入吞吐量下降一个数量级。本来1s可以写入2000条,现在1s钟可能只能写200条。
读数据过程根据doc id查询
查询,GET某一条数据。数据写入了某个document,这个document会自动给你分配一个全局唯一的id (doc id),同时也是根据doc id进行hash路由到对应的primary shard上去的。也可以手动指定doc id,比如用户id,订单id。
客户端发送一个请求到任意一个node,成为协调节点协调节点对doc id进行hash,找到对应shard,然后对document进行路由,请求转发到对应的节点,此时会使用round-robin随机轮询算法,在primary shard及所有replica shard中随机选择一个,让读请求负载均衡。接收请求的节点 返回 document 给协调节点(coordinate node)协调节点返回 document 给客户端
搜索数据过程
es最强大的是做全文检索,比如三条数据java真好玩,java好难学,j2ee真好根据java搜索出来
客户端发送一个请求到协调节点协调节点将搜索请求转发到所有的shard对应挑选的primary shard或者replica shard也可以。query phase:每个shard将自己的搜索结果(其实就是一些 doc id)返回给协调节点,由协调节点进行数据的合并,排序,分页等操作,产出最终结果。fetch phase:接着由协调节点,根据doc id去各个节点上拉取实际的document数据,最终返回给客户端。
es调优
es在数据量很大的情况下(数十亿级别),es怎么提高查询性能?
十亿数据,第一次5~10s,第二次就快了
es性能优化是没有什么银弹的。不要期待随手调一个参数,就可以万能的应对所有性能慢的场景。有些场景换个参数,或者调整个语法就能搞定,但是绝对不是所有场景都是这样的。
1.性能优化杀手锏 filesystem cache
第一次从磁盘查出数据会存到内存的fileSystem Cache,es搜索引擎严重依赖底层的os cache。
如果走磁盘一般肯定上秒, 但是如果走filesystem cache,走纯内存,那么基本上就是毫秒级的。从几毫秒到几百毫秒不等。
1.如果要es性能好,最佳情况下,机器的内存要容纳你总数据量的一半。
比如es中要存储1T数据,那么你多台机器留给filesystem cache的内存要加起来综合到512g。
2.往es里存少量的数据,比如30个字段只用到了三个就存三个。让内存留给filesystem cache的大小跟数据量一致。性能就会非常高,一般可以在1s以内
3.其他字段的数据可以存在mysql里面,建议采用es+hbasehbase的特点就是适用于海量数据的在线存储,就是可以对hbase写入海量数据,不要做复杂的搜索,就是做很简单的一些根据id或者范围查询的操作
总结:最好写入es数据小于 fileSystem cache内存大小
2.缓存预热
假如说,按照上面的方案去做了,es集群中每个机器写入的数据量还是超过了filesystem cache的一倍,60g数据,filesystem cache就30g,还有30g在磁盘中
可以自己后台搞个系统,每隔一会就去搜索一下热数据,刷到filesystem cache中。后面用户搜索热数据就是直接去内存里查了
3.冷热分离
1.将大量不搜索的字段,拆分到别的存储引擎里去,这个类似于mysql分库分表的垂直拆分。
2.可以做类似mysql水平拆分,就是说将大量的访问很少,频率很低的数据,单独写一个索引,然后将访问很频繁的热数据单独写一个索引。
比如:6台机器,2个索引,一个放冷数据,一个放热数据,每个索引3个shard 3台放热数据index;3台放冷数据index; 这样的话,大量的时候是在访问热数据,热数据可能就占总数据的10%,此时数据量很少,几乎能确保数据全部保留在filesystem cache 对于冷数据而言,是在别的index里面,跟热数据都不在同一个index机器上,如果有人访问冷数据,在磁盘上,此时性能差点就差点了。
子主题
4.document模型设计
es里的复杂的关联查询,复杂的查询语法,尽量别用,一旦用了性能一般都不太好。所以要好好设计es里的数据模型。
写入es的java系统里,就完成关联,将关联好的数据直接写入es中,搜索的时候就不需要利用es的搜索语法
比如 mysql两个表需要join在写入es的时候java直接将join好的数据写入es,不用es的join语法查询
5.分页性能优化
es分页性能比较坑假设每页10条数据,现在要查询第100页,实际上是会把每个shard上存储前1000条数据都查到一个协调节点上,如果你有5个shard,那么就有5000条数据,接着协调节点对这5000条数据进行一些合并,处理。再获取到最终第100页的10条数据。翻页的时候,翻的越深,每个shard返回的数据就越多,协调节点处理数据时间越长,非常坑爹。
1.不允许深度分页/默认深度分页性能很差。
系统不允许翻那么深的页,或者告诉产品默认翻的越深性能越差
2.类似于app里的推荐商品或者微博,不断下拉出现一页一页的。可以用scroll api来进行处理scroll会一次性给你生成所有数据的快照,每次翻页通过游标移动,获取下一页这样子,性能会比上面说的那种分页性能高很多。无论分多少页,性能基本上都是毫秒级的。因为scroll api 只能一页一页往后翻,不允许先第十页再120页。
es生产集群的部署架构是什么?每个索引的数据量大概有多少?每个索引大概有多少分片?
基本的版本,不要虚,说完就过去了
0 条评论
回复 删除
下一页