MQ消息队列<br>
队列:先进先出<br>
堆:先进后出<br>
异步执行
同步执行:前面执行完之后后面才能执行<br>
数据削峰
秒杀:短时间要处理大量的请求<br>
将用户的请求先放到消息队列,然后根据服务器的处理能力依次处理用户请求
大数据计算
小时榜
将生产者产生的数据快速发送给消费者
生产者和消费者和消费中间件有好多人
消息系统的原理
数据单元
消息必须关联到某些业务,必须有存在的意义和价值
完整性:要么传输成功要么传输失败 确保该数据单元的完整性<br>
独立性:在各个数据单元之间没有互相依赖 防止导致被分割的消息不能被同一个消费者所消费<br>
颗粒度:粒度太小传输次数变多,集群压力大<br>粒度太大:传输次数变少,数据有可能很大的延迟<br>
环境搭建
<br>
主题名:userlog<br>
PartitionCount:分区数三<br>
ReplicationFactor:2 2个副本
Partition: 0 1 2 0分区1分区2分区<br>
Replicas: 1,2:列出了该分区所有副本所在的 Broker 节点 ID
<br>
副本
Partition 分区<br>
Reaplication 副本<br>
AR 123<br>全部副本
<br>
OSR<br>离开同步队列的副本
ISR<br>加入同步队列的副本<br><font color="#e74f4c">即使副本在ISR也有可能和Leader的数据也是不同步的<br>Leader也属于ISR的一部分</font><br>
评判 OSR和ISR的标准 默认10s isr中的follow没有向leader发送心跳包就会被移除 replica.lag.time.max/ms=10000<br>
保持数据的一致性
LEO<br>
因为leader和follower的偏移量是不同,<font color="#e74f4c">下一跳待写入数据的偏移量</font>Leader和每个副本有可能不同的LEO<br>
leader:3334<br>followe1r:3332<br>follower2:3333<br>follower3:3331<br>
HW<br>
HW就决定了消费者可以读取的偏移量<br>
高水位线 是通过LEO计算出来的,基于Leader的LEO和所有ISR中的副本的LEO取所有的LEO的最小值<br>
HW:3331<br>
CheckPoint<br>
故障处理
leader崩溃<br>
优先从ISR中选出一个新的Leader<br>
为了保证多个副本之间的数据一致性,竞选失败的follower 会将各自的log文件高于HW的部分截掉然后从新的Leader同步数据<br><font color="#e74f4c">有可能选举成Leader的不是数据最多的需要把多余的数据切掉感觉会丢数据</font><br>
Follower崩溃<br>
Follower发生故障后会被临时踢出ISR<br>恢复后直接拉取数据即可
Unclean<br>
leader 崩了以后 ISR里面也没有副本了 使用脏选举<br>
消息保障机制(实现消息只发送一次)<br>
幂等性
幂等性的概念 生产者无论将消息发送多少次集群只存储一份 好比数字取绝对值
实现方式:(PID) Producerld SequenceNum(SN)<br>
每次partition 接收到数据之后会对比本身存放的sn号 <br>
生产者发送数据携带sn号<br>
第一种情况
生产者sn<=分区记录的sn -- 数据重复发送<br>
第二种情况
生产者sn - 分区记录的sn = 1 -- 直接写入【这属于正常情况 只接受这种数据】<br>
第三种情况
生产者sn - 分区记录sn >1 丢失数据<br>
缺陷
可以解决数据重复发送的问题,但是消息不能跨分区发送发送的数据必须和分区绑定
事务
事务协调器 : 解决了幂等性不能跨分区的问题<br>
只有一条=至少一条+幂等性+事务<br>
消费者
从kafka拉取数据 <br>
优化问题
kafka数据积压之后 不能单纯增加消费者 要增加消费者组<br>
<br>
因为一个分区 只能发送给 一个消费者组中的一个消费者进行消费
分区策略(消费者)<br>
RangeAssignor<br>
消费者总数和分区总数进行整除运算来获得一个跨度
然后剩余的从第一个消费者开始多分配一个
n=p/c<br>
m=p%c<br>
前m个消费者每个消费n+1<br>
m之后的消费者每人消费n<br>
优点:简单方便 缺点:如果一个消费者组订阅多个topic 会导致前m 个压力过大<br>
RoundRobinAssignor<br>
将消费组内所有的消费者以及消费者订阅的所有topic 的partition 按照字典序排列,然后通过轮询消费者方式逐个将分区分配给每个消费者 <br>优点将分区统一考虑,降低前m个消费者的压力<br>
<br>
StickyAssignor<br>
分区的分配要尽可能的均匀
分区的分配尽可能的于上次分配的保持相同
两者发生冲突时第一个目标优于第二个
<br>
Kafka3.3.1新特性<br>
<br>
kraft<br>
列出所有可用主题
kafka-topics.sh --bootstrap-server node01:9092 --list
高可用方式
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
创建 hello 主题,设置分区数为 1,设置分区副本数为 3
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--create --partitions 1 \<br>--replication-factor 3
列出指定主题的详细信息
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--describe
通过 topic-id 查看主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic-id a9Q84XrBT4SqXj2FwAEaYA \<br>--describe
修改分区数(分区数只能增加不能减少,还有就是命令行的方式无法使用 --replication-factor 修改分区副<br>本数)
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--alter \<br>--partitions 3
删除主题
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--delet
Pro & Con 命令
# 创建消费者,从当前最新消息开始消费,不指定分区默认监听所有分区<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello<br># 创建消费者,从当前最新消息开始消费,指定监听分区 0<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--partition 0<br># 创建消费者,从日志中最早出现的消息开始消费<br>kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello \<br>--from-beginning<br># 创建生成者<br>kafka-console-producer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \<br>--topic hello
集成Flume<br>
<br>a1.sources = r1<br>a1.sinks = k1<br>a1.channels = c1<br># 配置 Agent a1 的 Source r1 的属性<br># 使用 Taildir Source 断点续传<br>a1.sources.r1.type = TAILDIR<br># 通过一个 JSON 格式的文件记录每个文件的 Inode 信息、绝对路径和最近一次读取的位置<br>a1.sources.r1.positionFile = /var/log/flume/taildir_position.json<br># 定义文件组(被监控的文件夹目录集合),这些文件夹下的文件都会被监控,多个用空格分隔<br>a1.sources.r1.filegroups = f1<br># 声明文件组 f1 监控的文件<br>a1.sources.r1.filegroups.f1 = /var/logs/test1/baidu.log<br># 是否添加文件的绝对路径名(绝对路径+文件名)到 Header 中<br>a1.sources.r1.fileHeader = true<br># 每次重新尝试轮询新数据时的最大时间延迟(毫秒)<br>a1.sources.ri.maxBatchCount = 1000<br># 一次读取数据行和写入 Channel 的最大数量<br>a1.sources.ri.batchSize = 100<br># 配置 Agent a1 的 Sink k1 的属性<br># 使用 Kafka Sink<br>a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink<br># 主题<br>a1.sinks.k1.kafka.topic = baidu<br># Kafka 服务器地址<br>a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092<br># 一个批次中要处理的消息数,默认为 100。设置较大的值可以提高吞吐量,但是会增加延迟。<br>a1.sinks.k1.kafka.flumeBatchSize = 20<br># 在考虑成功写入之前,要有多少个副本必须确认消息,默认为 1。<br># 可选值 0:从不等待确认 1:只等待 Leader 确认;-1:等待所有副本确认。<br># 设置为 -1 可以避免因某些情况下 Leader 失败而带来的数据丢失。<br>a1.sinks.k1.kafka.producer.acks = -1<br># 等待时间,该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,生产者会在批次填满或者达到这个<br>时间时把批次发送出去<br>a1.sinks.k1.kafka.producer.linger.ms = 1<br># 消息压缩算法,默认不启用压缩。可以指定为:gzip、snappy、lz4 或 zstd<br>a1.sinks.k1.kafka.producer.compression.type = snappy<br># 配置 Agent a1 的 Channel c1 的属性,Channel 是用来缓冲 Event 数据的<br># 使用 Memory Channel<br># Channel 的类型是内存 Channel,顾名思义这个 Channel 是使用内存来缓冲数据<br>a1.channels.c1.type = memory<br># 内存 Channel 的总容量大小是 1000,注意这个容量不是越大越好,配置越大一旦 Flume 挂掉丢失的 Event<br>也就越多<br>a1.channels.c1.capacity = 1000<br># Source 和 Sink 从内存 Channel 每次事务传输的 Event 数量,该配置要小于总容量大小<br>a1.channels.c1.transactionCapacity = 100<br># 把 Source 和 Sink 绑定到 Channel 上<br># 与 Source r1 绑定的 Channel 有一个,叫做 c1,一个 Source 可以绑定多个 Channel<br>a1.sources.r1.channels = c1<br># 与 Sink k1 绑定的 Channel 有一个,叫做 c1,一个 Sink 只能绑定一个 Channel<br>a1.sinks.k1.channel = c1
常用命令
创建主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --create --replication-factor 1 --partitions 2 --topic baidu
查看所有主题
kafka-topics.sh --zookeeper node01:2181/kafka0110 --list
创建生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
创建消费者
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic userlog --consumer-property group.id=yjx
删除主题
提前在配置文件server.properties增加设置,默认未开启<br>delete.topic.enable=true
删除命令
kafka-topics --delete --topic userlog --zookeeper<br>node01:2181,node02:2181,node03:2181/kafka0110