消费者和消费者组
<b>每一个分区只能被一个消费组中的一个消费者所消费</b>。一个消费者可以订阅多个分区。
partition.assignment.strategy:设置消费者与订阅主体之间的分区分配策略。
Kafka支持两种消息投递模式:<b>点对点模式</b>(基于队列)、<b>发布/订阅模式</b>。
点对点模式:如果所有的消费者都隶属于同一个消费组,那么消息都会被均衡地投递给每一个消费者(<b>一对一</b>),即每条消息只会被一个消费者处理。
发布/订阅模式:如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者(<b>一对多</b>),即每条消息会被所有的消费者处理。
同一个消费组内的消费者即可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
<b>消费步骤</b>
配置消费者客户端参数及创建响应的消费者实例。
订阅主题。
拉取消息并消费。
提交消费位移。
关闭消费者实例。
必要参数
Kafka消费者客户端KafkaConsumer中的4个参数,如下:
bootstrap.servers:指定连接Kafka集群所需的broker地址清单。
group.id:消费者隶属的消费组的名称(一般设置具有业务意义的名称)。
key.deserializer和value.deserializer:与生产者的中的key和value的序列化对应。
<b>订阅主题与分区</b>
<b>一个消费者可以订阅一个或多个主题。</b>
如果消费者采用正则表达式的方式订阅,在之后如果有人创建了新的主题,并且名字和正则表达式相匹配,那么消费者可以消费到新添加的主题中的消息。
消费者还可以直接订阅某些主题的特定分区(KafkaConsumer的assign方法)。
Kafka消费者的订阅状态:<b>集合订阅</b>的方式(subscribe(Collection))、<b>正则表达式订阅</b>的方式(subscribe(Pattern))、<b>指定分区订阅</b>(assign(Collection))的的方式。这三种状态是互斥的。
通过subscribe方法订阅分区是具备消费者自动均衡的,而assign不具备。
反序列化
反序列化器:ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer。
实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的时候,推荐使用Avro、JSON、Thift、ProtoBuf或Protostuff等序列化工具来包装。
消费消息
Kafka中的消息是基于拉模式的。
消息的消费模式:<b>推模式</b>、<b>拉模式</b>。
Kafka消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll方法,而poll方法返回的是所订阅的主题上的一组消息。
ConsumerRecord类属性:主题、分区、偏移量、时间戳、TimestampType(两种类型,消息创建时间、消息存储时间)、Key和value经过序列化后的大小、消息的头部内容headers、CRC32的校验值。
<b>位移提交</b>
<b>偏移量</b>:对于分区而言,每条消息都有唯一的offset,用来表示消息在分区中对应的位置。
<b>消费位移</b>:对于消费者而言,也有一个offset,消费者使用offset表示消费到分区中某个消息所在的消息。
对于一条消息而言,偏移量和消费者消费它的消费位移是相等的。
在旧的消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。
把<b>消费位移存储起来</b>的动作称为 "<b>提交</b>",消费者在消费完消息之后需要执行消费位移的提交。
当前消费者消费到的位置为x,那么消费者需要提交的消费位移并不是x,而是x+1。
<b>消息丢失</b>:当前一次拉取消息集[x+2,x+7],如果拉取消息后就进行位移提交,即x+8,那么当前消费x+5遇到异常,故障恢复后,重新拉区消息是从x+8开始的。
<b>重复消费</b>:位移提交的动作是在消费完所有拉取到的消息之后才执行的。当消费x+5遇到异常,在故障恢复后,又重新拉区的消息是从x+2开始的。x+2到x+4之间的消息又重新消费了一遍。
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象。
Kafka提供手动位移提交的方式,手动的提交方式可以让开发人员根据程序逻辑在合适的地方进行位移提交。
enable.auto.commit:false。
<b>手动提交分为同步提交和异步提交。</b>
<b>同步提交</b>:对拉取到的每一条消息做相应的逻辑处理,或批量处理+批量提交的方式。<b>也有可能出现重复消费问题</b>(<b>消费位移提交前程序突然崩溃</b>)。commitSync()方法会根据poll方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。
<b>异步提交</b>:commitAsync()方法在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。
重复消费:commitAsync提交的时候也会失败。一般能想到的方法是重试。如果某一次提交的消费位移为x,但是提交失败了,然后下一次又异步提交了消费位移为x+y,这次成功了。如果这时重试第一次提交的消费位移x,那么此时的消费位移又变为了x。这样就会引入重复消费问题。
解决方法:设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号对应的值。遇到位移提交失败需要重试的时候,可以检查提交的位移和序号的值的大小,如果前者小于后者,说明有更大的位移已经提交了;如果相等,进行重试提交。
控制或关闭消费
KafkaConsumer提供了对消费速度进行控制的方法,有时我们需要暂停对某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。
KafkaConsumer是非线程安全的,调用wakeup方法后可以退出poll的逻辑。跳出循环后一定要显示地执行关闭动作以释放运行过程中占用的各种系统资源。
指定位移消费
当一个新的消费组建立的时候,或者消费组内的一个新消费者订阅了一个新的主题,或者__consumer_offsets主题中关于这个消费组的位移信息过期而被删除后,没有可以查找的消费位移。
Kafka中消费者找不到记录的消费位移时,会根据<b>消费者客户端参数auto.offset.reset</b>的配置决定从何处开始进行消费,<b>默认值"latest"</b>,表示从分区末尾开始消费消息。<b>参数"earliest"</b>表示消费者从0开始消费。参数"none"抛异常(前提是找不到消费位移)。
此时用一个新的消费组来消费某个主题时,客户端会报出重置位移的提示信息。<b>Resetting offset for partition</b>。
seek方法指定某个分区的位移消费
再均衡
指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
再均衡期间,消费组内的消费者是无法读取消息的。
为了防止重复消费问题,一般情况下,应该避免不必要的再均衡发生。当然也可以通过异步提交消费位移的操作避免重复消费。
消费者拦截器
主要在消费到消息或在提交消息位移时进行一些定制化的操作。
KafkaConsumer会在poll方法返回之前调用拦截器的onConsume方法来对消息进行定制化操作,比如修改返回的消息内容、按照某种规则过滤消息。
KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit方法,可以使用这个方法来记录跟踪所提交的位移信息。
某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效。自定义拦截器ConsumerInterceptorTTL实现ConsumerInterceptor<>接口。
<b>多线程实现</b>
KafkaConsumer定义了一个acquire方法,用来检测当前是否只有一个线程在操作,若有其他线程在操作则会抛出ConcurrentModifcationException异常。
<b>acquire通过线程操作计数标记的方式</b>检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()和release()表示加锁和解锁。
多个消费线程同时消费同一个分区,通过assign、seek 方法实现。这种实现对于位移提交和顺序控制的处理会变得非常复杂。实际应用使用的很少。
将处理消息模块改成多线程的实现方式。
重要的消费参数
fetch.min.bytes:配置Consumer在一次拉取请求中能从Kafka中拉取的最小数据量,默认1B。
fetch.max.bytes:配置Consumer在一次拉取请求中从Kafka拉取的最大数据量。默认值50MB。如果此值比Kafka任何一条消息都小,也是可以消费的。
fetch.max.wait.ms:指定Kafka的等待时间。
max.partition.fetch.bytes:配置从<b>每个分区</b>返回给Consumer的最大数据量,默认值1MB。
max.poll.records:Consumer在一次拉取请求中拉取的最大消息数,默认值500条。
connection.max.idle.ms:在多久之后关闭闲置的连接,默认值9分钟。
exclude.internal.topics:Kafka的两个内部主题,__consumer_offsets和__transaction_state。exclude~指定Kafka中的内部主题是否可以向消费者公开,默认true。
receive.buffer.bytes:设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认64KB。-1表示使用系统的默认值。
send.buffer.bytes:设置发送消息缓冲区(SO_SNDBUF)的大小,默认值128KB。
request.timeout.ms:Consumer等待请求响应的最长时间,默认30s。
metadata.max.age.ms:配置元数据的过期时间,默认值5分钟。
reconnect.backoff.ms:配置尝试重新连接指定主机之前的等待时间,避免频繁地连接主机,默认50ms。
retry.backoff.ms:配置尝试重新发送失败的请求到指定的主题分区之前的等待时间,避免在某些故障情况下频繁地重复发送,默认100ms。
isolation.level:配置消费者的事务隔离级别。<b>默认 "read_uncommitted",可以消费到HW处的位置</b>。"read_committed",可以消费到LSO处的位置。