kafka介绍
kafka最初由linkedin使用Scala语言开发,后贡献给Apache基金会。它是一个高吞吐量、多分区、多副本、多订阅,基于Zookeeper协调的分布式发布订阅消息系统。
使用场景
日志收集
kafka可以收集系统中各个服务的日志,再通过kafka已统一接口的形式暴露给消费者
运营指标
kafka也用来记录平台运营中的各种指标数据。如:报警、报告等数据
流式处理
spark streaming和storm。
用户追踪
记录用户在web、app等平台上的活动。如:浏览网页、点击事件等
kafka基本概念
producer生产者
消息生产者,向Broker发送消息
broker
kafka节点,一个broker可以看成一个kafka服务
partition分区
消息分区,kafka消息存储在分区上的,一个topic可以关联多个分区,分区内的消息是有序的
topic 主题
kafka发送的消息是按照topic进行分组的。每条消息都需要指定topic
consumer消费者
消息消费者,从broker消费消息
consumer group 消费者组
每个consumer属于一个特定的ConsumerGroup。一条消息可以被多个ConsumerGroup消费,但同一个组中只有一个Consumer能消费这条消息
kafka安装
环境
- 操作系统centos7、jdk8、kafka_2.11‐2.4.1、zookeeper-3.5.8
配置
修改kafka 配置文件 config/server.properties<br>
启动
kafka-server-start.sh [-daemon] server.properties
基本使用(控制台命令—了解)
创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181 --replication-factor 1 --partitions 1 --topic test
replication-factor : 副本数<br>partitions:分区数<br>
删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.0.2:2181
查看主题
bin/kafka-topics.sh --list --zookeeper 192.168.0.2:2181
生产者发送消息
bin/kafka-console-producer.sh --broker-list 192.168.0.2:9092 --topic test
消费者消费消息
默认是消费最新的消息:<br>bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --topic test<br>
消费之前的消息:<br>bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --from-beginning --topic test<br>
订阅多主题
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --whitelist "test|test-2"
单播消费
让所有消费者在同一个消费组里即:<br>bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup --topic test<br>
多播消费
保证这些消费者属于不同的消费组即可<br>bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.2:9092 --consumer-property group.id=testGroup-2 --topic test<br>
查看消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --list
查看消息消费情况
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group testGroup<br>
current-offset:当前消费组的已消费偏移量<br>log-end-offset:主题对应分区消息的结束偏移量(HW)<br>lag:当前消费组未消费的消息数<br>
kafka集群
kafka启动一个broker 本身可以看做一个集群,只是集群中只有一个节点。组成真正意义上的集群只需要多启动几个节点就行。
集群配置:<br>#broker.id属性在kafka集群中必须要是唯一<br>broker.id=1<br>#kafka部署的机器ip和提供服务的端口号<br>listeners=PLAINTEXT://192.168.0.3:9092 <br>#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同<br>zookeeper.connect=192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181,192.168.0.5:2181<br>
可以通过连接zookeeper查看集群是否注册成功:ls /brokers/ids<br>
Kafka的总控制器Controller
kafka Controller作用
kafka集群中有多个broker,其中某一个broker会被选举为Controller,负责整个集群中的分区和副本的管理。例如:<br> 1. 当分区中的某个leader挂掉后,Kafka Controller负责leader选举<br> 2. 集群中的分区、topic isr列表等元数据发生改变时,Kafka Controller 让其他分区感知道。
kafka Controller选举机制
Kafka Controller的选举借助于zookeeper实现,在broker启动过程中会再zookeeper中创建/controller 临时节点,有zookeeper保证只有一个broker创建成功。创建成功的broker就成为Kafka Controller
Kafka 的partition 分区leader选举机制
当前leader挂掉后,会被Kafka Controller感知道,有Kafka Controller 从分区中的ISR列表中选出第一个副本作为leader。
ISR列表: 1.副本节点不能产生网络分区,必须能与zookeeper保持会话以及跟leader副本网络连通<br> 2. 副本能复制leader上的所有写操作,并且不能落后太多
Consumer 消费者维护Offset记录
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,<br>key是 consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。如上图所示:Kafka默认会给50个分区,应对高并发请求。
Kafka 的Rebalance机制
Rebalance机制指的是如果Kafka的分区数量或者消费者组中的消费者发送了变化。Kafka的ReBalance机制会对消费者进行重新分配。如某个消费者组中的一个消费者宕机了,Kafka会将分配给它的分区重新分配给其他消费者,如果消费者重启了,又会分配一些分区给它消费。
ReBalance策略
RangeAssignor范围分区分配策略
partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor<br>以单个Topic维度来分配分区的,每个Topic的分区尽量均衡的分配给消费者。
消费者组中的消费者按照字母顺序排序,主题上的分区按照分区序号排序
先计算出每个消费者最少平均分配的分区数,剩下的分区按顺序分配给消费者
RoundRobinAssignor轮询分区策略
针对所有主题Topic分配分区的。使用轮询策略将分区分配给主题下的消费者
将所有主题下的消费者按照字母顺序排序,Topic下的分区按照分区序号排序
按照轮询策略,依次给消费者分配主题下的分区
StickyAssignor 粘性分区策略
分配给消费者的分区尽可能均衡,分配给消费者的主题分区最多相差一个
分区重新分配时尽可能保证和上传分配的一致
CooperativeStickyAssignor策略
Kafka2.4.0开始引入CooperativeStickyAssignor策略。CooperativeStickyAssignor与之前的StickyAssignor虽然都是维持原来的分区分配方案。最大的区别是:StickyAssignor仍然是基于eager协议,分区重分配时候,都需要consumers先放弃当前持有的分区,重新加入consumer group;<br>而CooperativeStickyAssignor基于cooperative协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。渐进式的重平衡。<br>
producer 发送消息流程
producer采用push模式将消息发送到选择的分区中,顺序写入文件
分区选择:producer在发送消息是如果指的了分区,就直接发送到指定的分区;如果发送时没有指定分区而是指定了key的话,通过对key的value值进行hash计算得到目标分区。如果未指定分区又没有指定key,默认采用轮询策略选择分区。
消息写入
producer从zookeeper中获取leader分区,将消息发送给leader。leader收到消息后顺序写入本地文件
其他flower节点从leader pull 拉取消息,并写入到本地log文件,然后向leader返回ACK
Leader收到所有ISR列表中Leader的ACK后增加HW,向producer发送ACK
HW&LEO
HW(High WaterMark)高水位,ISR列表中所有副本的LEO(log end Offset)最后提交数据日志偏移。Consumer 只能消费到高水位的消息
Kafka日志存储结构
Kafka消息日志采用分区分段存储,存储目录为Topic-partition分区号,在该目录下日志文件采用分段存储。每个分段存储不一样的log消息。Kafka规定单个log日志文件大小为1G,方便加载到内存中。
日志存储结构如如上所示。<br>
Kafka高性能机制
读写数据批量传输和数据压缩
消息持久化时使用磁盘顺序读写机制
Kafka数据传输中的零拷贝机制
Kafka利用操作系统的sendFile机制,减少数据的拷贝次数。不用再从内核空间拷贝到jvm用户空间,减少了两次磁盘拷贝、内核和用户上下文切换