Kafka
2024-08-01 18:39:25 22 举报
AI智能生成
Kafka是一种开源的分布式消息传递系统,专为高吞吐量和低延迟的应用而设计。它提供了一个高容错的平台,能够处理大量数据,同时保证数据的实时性和可靠性。Kafka通过分区、主题和消费者组等概念来实现大规模消息处理,并且支持横向扩展和弹性配置。此外,Kafka还提供了强大的数据持久性保证,确保消息不会因系统故障而丢失。因此,Kafka被广泛用于各种场景,包括实时数据分析、流处理、物联网(IoT)和微服务架构等。
作者其他创作
大纲/内容
概述
高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,<br>并且内部使用了零拷贝技术,性能比较好<br>Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术<br>
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区(parition)、多副本的(replica),<br>基于Zookeeper协调的分布式消息系统,它最大的特性就是可以实时地处理大量数据以满足各种需求场景<br>比如:基于Hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志<br>、访问日志,消息服务等等,用scala语言编写
吞吐量在10w~100w
模型设计图
特点
一个parition只能交给一个consumer消费,因为交给多个consumer让其进行poll拉取消息进行消费,会引起重复消费的问题
服务端(broker)和客户端(producer、consumer)之间通信通过TCP协议来完成
Topic、Partition和Broker之间的关系.<br>一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,<br>订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的.<br>订单消息很可能时非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可能会有<br>容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的parition可以位于不同的机器上,<br>每台机器上都运行一个Kafka的进程Broker
同时未发布和订阅提供高吞吐量,Kafka的设计目标是以时间复杂度O(1)的方式提供消息持久化能力的,<br>即使对TB级别以上数据也能保证常数时间的访问性能,即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
消费消息采用Pull模式,消息被处理的状态是在Consumer端维护的,而不是由服务端维护,Broker无状态,Consumer自己保存offset,<br>这样消息的消费速度直接和消费者有关,broker只是存储消息
核心组件
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Parition
物理上的概念,一个topic可以分为多个partition,每个partition的内部时有序的
Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
ConsumerGroup
每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费<br>到那时一个ConsumerGroup中只能有一个Consumer能够消费该消息
Consumer
消息消费者,从Broker读取消息的客户端
Producer
消息生产者,向Broker发送消息的客户端
设计机制
Kafka核心总控制器Controller
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),<br>它负责管理整个集群中所有分区和副本的状态<br><ul><li>1.当某个分区的leader副本出现故障时,由控制器负责为该分区选举出新的leader副本</li><li>2.当检测到某个分区的ISR(In-Sync-Replica)集合发生变化时,由控制器负责通知所有的broker更新元数据信息</li><li>3.当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到</li></ul>
Controller选举机制
在Kafka集群启动的时候,会自动选举以太broker作为controller来管理整个集群,<br>选举的过程时集群中每个broker都会尝试在zookeeper上创建一个/controller的临时节点,<br>zookeeper会保证有且仅有一个broker能创建成功,这个broker就会成为集群的总控制器controller<br>当这个controller角色的broker宕机了,此时zookeeper临时节点就会消失,集群里其他broker<br>会一直监听这个临时节点,发现临时节点消失了,就竞争再次创建临时节点,这就是我们说的选举机制<br>zookeeper又会保证有一个broker成为新的controller
Partition副本选举Leader机制
controller感知到分区Leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活)<br>controller会从ISR(In-sync replica)列表(参数unclean.leader.election.enable=false)里挑第一个broker作为leader(第一个broker最先放进ISR列表,<br>可能时同步数据最多的副本)<br>如果参数unclean.leader.election.enable为true,代表ISR列表里所有副本都挂了的时候可以在ISR列表以外的副本中选leader<br>这种设置,可以提高可用性,但是选出的新leader有肯恩那个数据少很多,<br>副本进入ISR列表有两个条件:<br><ul><li>1.副本节点不能那个产生分区,必须能与zk保持会话以及跟leader副本网络连通</li><li>2.副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步之后的副本,是由replica.lag.time.max.ms配置决定的,超过</li><li>这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)</li></ul>
消费者消费消息的offset记录机制
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:_consumer_offsets,<br>提交过去的时候,key时consumerGroupId+topic+分区号,value就是当前offset的值,<br>kafka会定期清理topic里的消息,最后就保留最新的那条数据,因为_consumer_offset可能会接收高并发的请求,<br>kafka默认给其分配50个分区(可以通过offsets.topic.num.paritions设置),这样可以通过加机器的方式抗大并发
消费者的rebalance机制
rebalance就是说如果消费组里的消费者数量有变化或者消费的分区数有变化,kafka会重新分配消费者消费分区的关系<br>比如consumer group中某个消费者挂了,此时会自动把分配给它的分区交给其他的消费者,如果它又重启了,那么<br>又会把一些分区重新交还给他<br>注意:relablance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebalance<br>如下情况可能会触发消费者realance:<br><ul><li>1.消费组里的consumer增加或减少了</li><li>2.动态给topic增加了分区</li><li>3.消费者组订阅了更多的topic</li><li>rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,</li><li>那rebalance可能会耗时极多,所以尽量避免在系统高峰期的rebalance发生</li></ul>
producer发布消息机制
1.写入方式<br>producer采用push模式将消息发布到broker,每条消息都被append到partition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)<br>2.消息路由<br>producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition,路由机制为:<br><ul><li>2.1 指定了partition,则直接使用</li><li>2.2 未指定partition但指定key,通过对key的value进行hash选出一个partition</li><li>2.3 partition和key都未指定,使用轮询选出一个partition<br></li></ul>3.写入流程<br><ul><li>3.1 producer先从zookeeper的/brokers/.../state/节点找到该partition的leader</li><li>3.2 producer消息发送给该leader</li><li>3.3 leader将消息写入本地log</li><li>3.4 followers从leader pull消息,写入本地log后向leader发送ACK</li><li>3.5 leader收到所有的ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK</li></ul>
consumer消费消息机制
Kafka的设计是一个partition在同一时刻只能被一个消费者组中的一个消费者消费,用来保证partion的顺序性,<br>如果一个partition让两个消费者同时消费,则无法保证一个parition里面的消息消费是有序的,反观RocketMQ<br>中的MessageQueue可以被多个消费者消费,因为RocketMQ保证顺序消费的机制是通过将一类key的消息发送到<br>同一个队列当中来保证有序的
HW与LEO机制
HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offse)作为HW,<br>consumer最多只能消费到HW所在的位置,另外每个replica都有HW,leader和follower各自负责更新自己的状态<br>对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replica同步后更新HW<br>此时消息才能被Consumer消费,这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取<br>对于来自内部broker的读取请求,没有HW的限制
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完<br>,这条消息才会被commit,这种方式极大的影响了吞吐率。而异步复制的方式下,follower异步的从leader复制数据,数据只要被<br>leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据<br>而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率,还可以设置消息发送端对于发出消息持久化机制参数acks的设置<br>
模型图解释
日志分段存储
Kafka一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(Segment)存储,<br>每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的log文件<br>最大为1G,做这个限制的目的是为了方便把log文件加载到内存去操作<br>
文件类型
index文件
部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index中,<br>如果要定位消息的offset会在这个文件里快速定位,再去log文件里找具体消息<br>00000000000000000000.index
log文件
消息存储文件,主要存offset和消息体<br>00000000000000000000.log<br>
timeindex文件
消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息的发送时间戳与对应的<br>offset到timeindex文件,如果需要按照时间来定位消息的offset,会先在这个文件里找<br>00000000000000000000.log<br>
KafkaBroker有一个参数,log.setgment.bytes,限定了每个日志段文件的大小,最大就是1GB.<br>一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程<br>叫做log rolling,正在被写入的那个日志段文件,叫做active log segment
支持的消息类型
普通消息
顺序消息
顺序消息
延时消息
消费模式
集群消费
一个ConsumerGroup中,只能有一个消费者消费消息
广播消费
一个ConsumerGroup中,每个消费者都可以消费到消息
线上规划
亿级流量模型设计
JVM参数设置
Kafka是由Scala语言开发,运行在JVm上,需要对JVM参数合理设置<br>修改/bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可进行如上设置:<br><br>这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于<br>一次minor gc就花费太长时间,当然,因为像Kafka、RocketMQ ES这些中间件,写数据到磁盘会用到操作系统<br>的Page Cache,所以JVM内存不宜分配过大,需要给操作系统的缓存预留出几个G<br><br>
线上问题及优化
为什么要对topic下数据进行分区存储?<br>
1.commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,<br>相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据<br>2.提高并行度
如何在多个partition中保证顺序消费?
方案一:首先将需要保证顺序的消息收集起来,然后交给一个consumer去进行处理,然后内部维护一个线程池,让其中某一个线程去顺序执行这些消息eg:用户下单流程,支付成功消息 -> 库存消息<br>方案二:让多个消息构造一个特殊结构的顺序消息,当consumer收到时,在一个线程中依次进行消费
消息丢失
4个过程都有可能造成消息丢失
Producer
acks=0,表示producer不需要等待任何broker确认收到消息的回复,就可以发送下一条消息,性能最高,但是最容易丢消息<br>大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种
acks=1,表示至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入,就可以继续发送<br>下一条消息,这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
ack=-1或者all,这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,<br>这种策略会保证只要由一个备份存活就不会丢失数据,这是最强的数据保证,一般除非是金融级别,或跟钱<br>打交道的场景才会使用这种配置,当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似
Consumer
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时consumer直接宕机了<br>未处理完的数据丢失了,下次也消费不到了
消费重复
Producer
发送消息如果配置了重试机制,比如网络抖动事件过长导致发送端发送超时,实际broker可能已经接收到消息,<br>但发送方会重新发送消息
Consumer
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,<br>下次重启又会拉取相同的一批数据重复处理<br>一般消费端都是要做消息幂等处理的
消息乱序
如果发送端配置了重试机制,Kafka不会等之前那条消息完全成功了才去发送下一条消息,这样就可能出现<br>发送了1,2,3条2消息,第一条超时了,后面两条发送成功,再重试发送第一条消息,这时消息在broker端的顺序就是2,3,1了<br>所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式取发消息,当然acks不能设置为0,<br>这样也能保证消息从发送端到消费端全链路有序<br>kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,<br>但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消息发到内存队列(可以多搞几个),一个内存队列<br>开启一个线程顺序消费处理
一个parition同一时刻在一个consumer group中只能有一个consumer实例在消费<br>,从而保证消费顺序。consumer group中的consumer数量不能比一个topic中的partion数量还要多,否则<br>多出来的consumer消费不到消息。<br>Kafka只在parition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费性<br>如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的<br>consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用
消息积压
1.线上有时因为发送方发送消息速度过快,或者消费放处理消息过慢,可能会导致broker挤压大量未消费消息<br>此种情况如果挤压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到地消息快速转发到其他<br>topic(可以设置很多分区),然后再启动多个消费者同时消费新主题地不同分区
2.由于消息数据格式变动或者消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未<br>消费消息.此种情况可以将这些消费不成功地消息转发到其他队列里去(类似死信队列),后面再慢慢分析死信队列<br>里地消息处理问题
延时队列
延时队列存储的对象是延时消息,所谓的"延时消息"是指消息被发送以后,并不想让消费者立刻获取,而是等待<br>特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多。比如:<br>1.在订单系统中,一个用户下单之后通常有30分钟的时候进行支付,如果30分钟之内没有支付成功,那么这个订单<br>将进行异常处理,这时就可以使用延时队列来处理这些订单了<br>2.订单完成1小时后通知用户进行评价
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s、topic_5s、topic_10s....2h)<br>这个一般不能支持任意时间段的延时),然后通过定时器进行轮询这些topic,查看消息是否到期,如果到期就把这个消息<br>发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息<br>的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了
消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序buf修复后,这时可能需要<br>对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetForTimes、seek等<br>方法指定从某个offset偏移量的消息开始消费
分区数越多吞吐量越高吗
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量<br>#往test里发送一百万消息,每条设置1kb<br># throughout用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时,当发送的吞吐量大于该值时就会被<br>阻塞一段时间<br>bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughout -1<br>--producer-props bootstrap.servers=192.168.65.60:9092 acks=1
网络上很多资料都说分区数越多吞吐量很高,但从压测结果来看,分区数到达某个值,吞吐量反而开始下降<br>实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同,<br>一般情况下跟集群机器数量相当就差不多了,比如分区对文件描述符的占用,以及缓冲区的竞争,<br>类似于Redis集群数量不能超过1000个,当超过1000个时,<br>整体的网络心跳将会特别长,心跳数据包会比较大,心跳包中会包含,比较容易产生网络分区<br>当然吞吐量的数值和走势还会和磁盘、文件系统、IO调度策略等因素有关<br>注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台可能会报错<br>"java.io.IOException: Too many open files"异常中最关键的信息是"Too many open files",<br>这是一种常见的Linux系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建socket、<br>打开文件这些场景下,在Linux系统的默认设置下,这个文件描述符的个数不是很多,通过ulimit -n命令可以查看<br>一般默认是1024,可以将该值调大比如 ulimit -n 65535
消息传递保障
at most once(消费者最多收到一次消息, 0-1次) acks = 0可以实现<br>at least once(消费者至少收到一次消息,1-多次) acks = all可以实现<br>exactly once(消费者刚好收到一次消息): at least once加上消费者幂等可以实现,还可以用kafka生产者的幂等性<br>来实现
kafka生产者的幂等性:<br>因为发送端充值导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息指接收一次,只需要<br>在生产者加上参数props.put("enable.idempotence",true)即可,默认是false不开启<br>具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker<br>broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查<br>PID和Sequence Number,如果相同不会再接收
PID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户完全是透明的,生产者<br>如果重启则会生成新的PID,<br>Sequence Number:对于每个PID,该Producer发送到每个Partition的数据都有对应的序列号,这些<br>序列号是从0开始递增的
Kafka的事务
Kafka的事务不同于RocketMQ,RocketMQ是保障本地事务(比如数据库)与MQ消息发送的事务一致性,<br>Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)<br>一般在Kafka的流式计算场景用得多一点,比如,kafka需要对于给topic里的消息做不同的流式计算处理<br>处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如HBase、Redis、ES等)<br>这种我们肯定希望系统发送到多个topic的数据保持事务一致性
Kafka高性能的原因
磁盘顺序读写
kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是<br>追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘的顺序写,反观MySQL却是会经常<br>需要产生随机IO
数据传输的零拷贝
kafka的消费者底层通过操作系统的sendfile来实现零拷贝<br>Kafka的生产者则是通过操作系统的mmap来实现零拷贝<br>1.减少了两次内核与用户空间的数据拷贝<br>2.减少了内核与用户空间上下文切换<br><br>
网上很多人说sendfile是直接从内核读取缓冲区拷贝到网卡接口里面,也有人说拷贝到socket缓冲区当中,<br>我们通过```mand systemcalls sendfile```会发现,在Linux 内核2.6.33之前是拷贝到socket缓冲区当中,<br>之后的版本是直接拷贝到了网卡接口
子主题
读写数据的批量batch处理以及压缩传输
0 条评论
下一页