kafka核心
2021-11-22 10:41:10 0 举报AI智能生成
KAFKA核心
KAFKA核心
kafka
kafka原理
模版推荐
作者其他创作
大纲/内容
kafka原理
备份机制
好处
方便实现“Read-your-writes”
方便实现单调读(Monotonic Reads)
副本角色
基于领导者(Leader-based)的副本机制
追随者副本
不对外提供服务
请求处理
方案
顺序请求
缺陷
吞吐量差
适用
请求发送非常不频繁的系统
每个请求使用单独线程处理
缺陷
开销大
适用
请求发送频率很低的业务场景
Reactor模式
Reactor模式是事件驱动架构的一种是先方式,特别适合应用于<br>处理多个客户端并发向服务器端发送请求的场景。
num.network.threads
默认3,表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求
Rebalance全流程解析
Controller
职责
1、主题管理(创建、删除、增加分区)
kafka-topic脚本
2、分区重分配
kafka-reassign-partitions脚本
3、Preferred领导者选举
只要是kafka为了避免Broker负载过量而<br>提供的一种换Leader的方案。<br>
4、集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
包括自定检测新增Broker、Broker主动或被动宕机
5、数据服务
控制器故障转移<br>(Failover)
当运行中的控制器突然宕机或意外终止时,<br>kafka能够快速地感知到,并立即启用备用<br>控制器来替代之前失效的控制器。<br>
高水位
作用
定义消息可见性,即用来标识分区下的哪些消息是可以被消费的
帮助kafka完成副本同步
运维与监控
主题管理
动态配置
消费者组位移管理
KafkaAdminClient
认证机制
MirrorMaker
监控框架
授权管理
kafka调优
流处理应用搭建实例
高级kafka应用
Kafka Streams
Kafka DSL开发
应用实例
基本使用
线上方案制定
操作系统
Windows
只适合于个人测试或用于功能验证,<br>千万不要应用于生产环境
Linux
最多,最优
I/O模型的使用
阻塞式I/O
非阻塞式I/O
I/O多路复用
信号驱动I/O
异步I/O
数据网络传输效率
社区支持度
能享受到零拷贝基数所带来的的快速数据传输特性
macOS
磁盘
使用机械磁盘完全能够胜任kafka线上环境
追求性价比的公司可以不搭建RAID,使用普通磁盘组成存储空间即可
磁盘容量
新增消息条数
消息留存时间
平均消息大小
备份数
是否启用压缩
带宽
集群配置参数
参数类型
静态参数:指必须在Kafka的配置文件server.properties中进行设置的参数,<br>同时必须重启Broker进程才能生效。
主体级别参数:Kafka提供专门的kafka-configs命令来修改
JVM和操作系统级别参数
Broker端参数
针对存储信息
log.dirs:重要!指定Broker需要使用的若干个文件目录的路径。<br>没有默认值,需亲自指定。例:/home/kafka1,/home/kafka2,...
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据,有更高的吞吐量
能够实现在故障转移,即Failover
log.dir:单个路径,对上一个参数的补充<br>(只设置log.dirs就行,不要设置这个参数)<br>
Zookeeper参数
Zookeeper的作用
负责协调管理并保存Kafka集群的所有元数据信息,比如集群有哪些Broker在运行、<br>创建了哪些Topic、每个Topic都有多少分区以及这些分区的Leader副本都在哪些<br>机器上等信息。
zookeeper.connect:Kafka与zookeeper相关最重要的参数,<br>例如:zk1:2181,zk2:2181,zk3:2181,...2181是zookeeper默认端口
chroot
多个kafka集群使用同一套zookeeper集群
zookeeper.connect配置如:<br>zk1:2181,zk2:2181,zk3:2181/kafka1和<br>zk1:2181,zk2:2181,zk3:2181/kafka2<br>切记chroot只需要写1次,而且是加到最后
Broker之间通讯
listeners
学名监听器,告诉外部连接者要通过什么协议访问<br>指定主机名和端口开放的kafka服务<br>
若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号><br>协议名称可以是标准的名字(PLAINTEXE、SSL、TLS等)也可以自定义
一旦自定义,就要指定listener.scurity.protocol.map参数,告诉这个协议底层使用了哪种安全协议。<br>比如listener.scurity.protocol.map=CONTROLLER:PLAINTEXT。表示CONTROLLER这个自定义协议<br>底层使用明文不加密传输数据。
主机名建议:全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。
advertisend.listeners
和listeners相比多了个advertised。Advertised的含义表示宣称的、公布的,<br>就是说这组监听器是Broker用于对外发布的。
host.nome/port
过期的参数,不需要指定,可以忘掉
Topic管理
auto.create.topics.enable:是否允许自动创建Topic
建议设置成false,防止线上存在稀奇古怪的topic
unclean.leader.election.enable:是否允许Unclean Leader选举
默认false,建议显示设置成false
auto.leader.rebalance.enable:是否云系定期进行Leader选举
建议生产环境设置成false
数据留存
log.retention.{hours|minutes|ms}:都是控制一条消息数据被保存多长时间,<br>优先级:ms>minutes>hours
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小,<br>默认-1,表明不限制。
message.max.bytes:控制Broker能够接受的最大消息大小。<br>默认1000012,不到1MB,太少了。建议设置一个比较大的值。
Topic级别参数
retention.ms:规定该消息被保存的时长,默认7天。<br>设置该参数会覆盖Broker端的全局参数。
retention.bytes:规定了要为Topic预留多大空间,默认设置为-1.<br>该参数再多租户的Kafka集群中会有用武之地。
设置方式
创建Topic时进行设置
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
修改Topic时设置
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
个人建议使用这种方式
JVM参数
将JVM堆大小设置成6GB
GC设置
Java7
如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。<br>启用方法是指定XX:+UseCurrentMarSweepGC
否则使用吞吐量收集器。开启方法时指定XX:+UseParalleGC
Java8
手动设置使用G1收集器
设置(环境变量)
KAFKA_HEAP_OPTS:指定堆大小
KAFKA_JVM_PERFOR<ANCE_OPTS:指定GC参数
例:<br>$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g<br>$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true<br>$> bin/kafka-server-start.sh config/server.properties
操作系统参数
文件描述符限制
ulimirt -n 1000000
文件系统类型
ext3
ext4
XFS
最好选用XFS
Swappiness
配置成一个接近0但不是0的值
提交时间
适当拉大提交间隔
kafka入门
消息引擎基础
编码格式
使用纯二进制字节序列
传输协议
点对点模型<br>(也叫消息队列模型)
发布/订阅模型
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
kafka基本术语
消息(Record)
主题(topic)
发布订阅的对象叫做主题
客户端(clients)
生产者(Producer)
向主题发送消息的客户端应用程序叫生产者
消费者(Consumer)
订阅主题消息的客户端应用程序叫做消费者
消费者位移(Consumer Offset)
记录当前消费了分区的哪个位置上<br>(消费者消费进度的指示器)<br>
消费者组(Consumer Group)
多个消费者实例共同组成一个组来消费一组主题。<br>这个组主题中的每个分区都只会被组内的一个消费者实例消费
服务器端
由被称为Broker的服务进程构成
Broler负责接受和处理客户端发送过来的请求,以及对消息进行持久化
通常将不同的Broker分散运行在不同的机器上,<br>避免一台机器宕机,全部Broker进程都挂掉。
高可用
备份机制(Replication)
就是把相同的数据拷贝到多台机器上,<br>而这些相同的数据拷贝称为副本(Replica)<br>
领导者副本(Leader Replica)
对外(客户端)进行交互
追随者副本(Follower Replica)
只是被动地追随领导者副本而已,<br>不能与外界交互
副本的工作机制
生产者总是向领导者副本写消息
消费者总是从领导者副本读消息
追随者副本只做一件事:向领导者副本发送请求,<br>请求领导者把最新生产的消息发给它,这样它能<br>保持与领导者的同步。<br>
伸缩性(Scalability)
分区(Partitioning)
将每个主题划分成多个分区,每个分区是一组有序的消息日至。<br>生产者生产的每条消息只会发送到一个分区中。
位移(Offset)
每条消息在分区中的位置信息由一个叫位移的舒俱来表征
三层消息架构
第一层是主题层
每个主题可以配置M个分区,<br>而每个分区又可以配置N个副本。<br>
第二层是分区层
每个分区的N个副本中只能有一个充当领导角色,对外提供服务。<br>其他N-1个副本是追随者副本,只是提供数据冗余之用。
第三层是消息层
分区中包含若干条消息,每条消息位移从0开始,依次递增。
持久化
kafka使用消息日志(Log)来保存数据,一个日志就是在磁盘上一个只能追加写(Append-only)消息的物理文件
因为是追加,避免了缓慢的随机I/O操作,采用顺序I/O写操作
删除旧日志:通过日志段(Log Segment)机制
kafka后台有定时任务定期检查老日志段是否被删除,<br>实现回收磁盘空间的目的<br>
重平衡(Rebalance)
消费者组内某个消费者实例挂掉后,<br>其他消费者实例自动重新分配订阅<br>主题分区的过程。Rebalance是kafka<br>消费者端实现高可用的重要手段。
kafka角色定位
是消息引擎系统,也是一个分布式流处理平台
优势
更容易实现端对端的正确性
kafka自己对于流式计算的定位
kafka版本选择
Apache Kafka
社区版
Confluent Kafka
商业化
跨数据中心备份
Schema注册中心
集群监控工具
Cloudera/Hortonworks Kafka
提供大数据平台
版本号
客户端
生产者
分区机制
作用:提升负载均衡的能力,实现系统的高伸缩性(Scalability)
分区策略
默认分区策略
自定义分区策略
需要显示地配置生产者端的参数partitioner.class
轮询策略(Round-robin,顺序分配)
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地<br>平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是<br>最常用的分区策略之一。
随机策略(Randomness)
按消息键保存策略(Key-ordering)
其他策略
基于地理位置的分区策略
一般只针对于大规模Kafka集群,特别是跨城市、跨国家、<br>甚至是跨大洲的集群
压缩算法
合适压缩
生产者端
生产者程序中配置compression.type参数表示启用指定的压缩算法
Broker端
2.1.0版本前
GZIP
Snappy
Lz4
2.1.0后
增加zstd(Zstandard)算法
算法优劣
吞吐量
LZ4>Snappy>zstd和GZIP
压缩比
zstd>LZ4>GZIP>Snappy
无消息丢失配置
kafka只对“已提交”的消息(committed message)做有限度的持久化保证
生产者丢失数据
producer.sens(msg)
不等待确认消息,立即返回
producer.send(msg,callback)
带回调通知的发送API
消费者程序丢失数据
解决方案:维持先消费消息,再更新位移的顺序
重复消费(多线程消费)
Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移
最佳实践
1、不是要使用producer.send(msg),而是用producer.send(msg,callback)
2、设置acks=all。acks是Producer的一个参数,代表了对“已提交”消息的定义,<br>如果设置成all,则表明所有副本Broker都要接收到消息,该消息才是“已提交”。<br>这是最高等级的“已提交”定义
3、设置retries为一个较大的值。这里的retries是Producer的参数,对应Producer自动重试。<br>当出现网络的瞬时抖动时,消息发送可能会失败,此时设置了retries>0的Producer能够自动<br>重试,避免消息丢失。
4、设置unclean.leader.election.enable=false。这是Broker端的参数,它控制的是<br>哪些Broker有资格竞选分区Leader。如果一个Broker落后原先的Leader太多,那么它<br>一旦成为新的Leader,就会造成数据丢失,设置为false是不允许这种情况发生。<br>
5、设置replication.factor>=3。Broker端的参数,标识最好将消息保存几份,<br>毕竟目前防止消息丢失的主要机制就是冗余。
6、设置min.insync.replicas>1。Broker端参数,控制的是消息至少要被写入<br>多少个副本才算“已提交”。设置成大于1可以提升消息持久性。实际环境中<br>千万不要使用默认值1.
7、确保replication.factor>min.insync.replicas。如果两者相等,那么只要其中一个<br>副本挂机,整个分区就无法正常工作了。设置成replication.factor=min.insybc.replicas+1
8、确保消息消费完再提交。Counsumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。
高级功能
Kafka拦截器
生产者拦截器
允许再发送消息前以及消息提交后植入拦截器逻辑
消费者拦截器
下消费消息前以及提交为以后编写特定逻辑
TCP连接管理
幂等性生产者与事务
消息可靠性保证
最多一次(at most once):消息可能会丢失,但绝不会被重复发送
(默认)至少一次(at least once):消息不会丢失,但有可能被重复发送
精确一次(exactly once):消息不会丢失,也不会被重复发送
幂等性(Idempotence)
指某些操作或函数能够被执行多次,但每次得到的结果都是不变的。
幂等性Producer
producer默认不是幂等的
props.put("enable.idempotence",true)或者props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。<br>enable.idempotence被设置成true后,Producter自动升级成幂等性Producer。<br>
所用范围
只能保证单分区上的幂等性
只能实现单会话上的幂等性
事务(Transaction)
事务Producer
开启enable.idempotence=true
设置Producer端参数transactional.id。最好设置一个有意义的名字
API
初始化事务:initTranscation
事务开始:beginTransaction
事务提交:commitTransaction
终止事务:abortTransaction
Consumer端设置
设置isolation.level参数
read_uncommitted:这事默认值,表明Consumer能读取kafka写入的任何消息。
read_committed:表明Consumer只会读事务型Producer成功提交事务写入的消息。<br>当然也能看到非事务型Producer写入的所有信息。
消费者
消费者组(Consumer group)
Consumer Group是kafka提供的可扩展且具有容错性的消费者机制
理解
1、consumer Group下可以有一个或者多个Consumer实例。这里的实例可以是一个<br>单独的进程,也可以是同一个进程下的线程。在实际场景中,使用进程更为常见。<br>
2、Group ID是一个字符串,在一个kafka集群中,它标识位移的一个Consumer Group。
3、Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个<br>Consumer实例消费。这个分期当然也可以呗其他的Group消费。
位移主题(_consumer_offsets)
原理
将Consumer的位移数据作为一条条普通的kafka消息,提交到_consumer_offsets中,<br>_consumer_offsets的主要作用是保存Kafka消费者的位移信息<br>
格式
key格式
唯一主题的Key中应该保存3部分内容<Group Id,主题名,分区号>
消息格式
第一种格式:位移值、时间戳、用户自定义的数据等
第二种格式:用于保存Consumer Group信息的消息
第三种格式(墓碑消息/delete mark):用于删除Group过期位移设置是删除Group的消息
创建
当kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题
分区数
Broker端offsets.topic.num.paritions的取值,默认50
副本数和备份因子
Broker端offsets.topic.replication.factor参数,默认3
提交位移
参数
Consumer端的enable.auto.commit
true时自动提交
auto.commit.interval.ms
控制提交间隔
自动提交位移
问题:只要Consumer启动着,就会无限期地向位移主题写入消息
手动提交位移
API:consumer.commitSync等
删除过期消息
Compact策略
Kafka提供了专门的后台线程定期地巡检待Compact的主题,<br>看看是否存在满足条件的可删除数据
重平衡(Rebalance)
本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,<br>来分配订阅的每个分区。
触发条件
组成员数发生变更
订阅主题数发生变更
订阅主题的分区发生变更
缺点
对Consumer Group消费国成有极大的影响。在Reblalance过程中,<br>所有的Consumer实例都会停止消费,等待Rebalance完成。<br>
Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区。
Rebalance太慢
非必要Rebalance
未及时发送信条,导致Consumer被踢出
设置session.timeout.ms=6s
设置:heartbeat.interval.ms=2s
保证Consumer实例在被判定为“dead”之前,能够发送至少<br>3轮心跳请求,即session.timeout.ms>=3*heartbeat.interval.ms
Consumer消费时间过程导致
设置max.poll.interval.ms大一点的值
位移提交
用户角度
自动提交
缺陷:比如5秒提交一次,3秒的时候发生Rebalance,导致3面数据重复消费
手动提交
缺陷:在调用commitSync()时,COnsumer程序会处于阻塞状态,知道Broker返回提交结果。
consumer.commitAsync():异步操作,不会阻塞
commitASync()不能替代commitSync()
commitAsync()失败后不会自动重试
Consumer角度
同步提交
异步提交
异常处理
原因
CommitFailedException,就是Consumer客户端在提交位移时出现了错误<br>或异常,而且还是那种不可恢复的严重异常。
在手动提交位移时发生(commitSync方法)
解决办法
增加期望的时间间隔max.poll.interval.ms参数值
减少poll方法一次性返回的消息数量,即减少max.poll.records参数值
发生场景
场景一:当消息处理的总时间超过了预设的max.poll.inerval.ms参数值时
1、缩短单条消息处理的时间
2、增加Consumer端允许下游系统消费一批消息的最大时长(max.poll.interval.ms)
3、减少下游系统一次性消费的消息总数(max.poll.records参数值)
4、下游系统采用多线程来加速消费
场景二:设置了相同group.id值的消费者组程序和<br>独立消费者程序。在独立消费者程序手动提交位移<br>时抛出异常。<br>
多线程开发实例
1、消费者程序启动多个线程,每个线程维护专属的kafkaConsumer实例<br>负责完整的消息获取、消息处理流程。
2、消费者程序使用单或多线程获取消息,同时创建<br>多个消费者线程执行消息处理逻辑。
TCP连接管理
group监控
消费进度\滞后进度<br>(消费者Log\Consumer Log)
指消费者当前落后生产者的程度
监控方法
1、使用kafka自带的命令行工具kafka-consumer-groups脚本
bin/kafka-consumer-groups.sh(bat)
2、使用kavka javaConsumerAPI编程
3、使用kafka自带的JMX监控指标
Collect
Get Started
Collect
Get Started
Collect
Get Started
Collect
Get Started
评论
0 条评论
下一页