KAFKA
2021-12-26 18:14:22 3 举报
AI智能生成
思维导图
作者其他创作
大纲/内容
开源、分布式、分区和复制的基于提交日志的分布式发布-订阅消息系统
接收来自不同源系统的数据,并实时将数据提供给目标系统
以 Scala 和 Java 编写,通常与大数据的实时事件流处理相关
Kafka 简介
能够无缝处理多个生产者
多个生产者 Multiple Producers
Kafka 专为多个消费者设计,无需相互干扰即可读取任何单个消息流
多个消费者 Multiple Consumers
基于磁盘的持久化 Disk-Based Retention
可扩展 Scalable
高性能 High Performance
Apache Kafka 的一个关键特性是保留功能,即在一段时间内持久地存储消息。Kafka 代理配置了主题的默认保留设置,保留消息一段时间(例如,7 天)或直到主题达到某个字节大小(例如 1GB)。
为什么选择 Kafka
消息代理用于将数据处理与数据生产者分离。
信息系统 Messaging
网站活动跟踪 Website Activity Tracking
监控 Metrics
使用 Kafka 可以将日志或事件数据抽象为消息流,从而消除对文件细节的任何依赖。
日志收集 Log aggregation
流处理 Stream processing
Kafka 应用场景
Kafka 是在 2010 年底作为一个开源项目在 GitHub 上发布的
2011 年 7 月被提议并被接受为 Apache 软件基金会孵化器项目
阿帕奇·卡夫卡于2012 年 10 月从孵化器毕业
Kafka 的诞生
Topics 就像一个类别/一个索引,它把消息一起组在一起
Kafka 中的消息被分类为 Topics。
Topics 还被分解为多个 Partitions。回到\"提交日志\"描述,分区是单个日志
Topics
将消息推送到 Kafka Topics 的流程
Producer 创建新消息
生产者 Producer
使用来自 Kafka Topics 的消息的进程
消费者阅读信息。
消费者通过跟踪消息的偏移量来跟踪它已经消费了哪些消息
消费者 Consumer
一个不可变的 Topics 消息序列,连续地附加到结构化提交日志中
分区也是 Kafka 提供冗余性和可扩展性的方式
由于 Topics 通常具有多个分区,因此不能保证整个 Topics(只需在单个分区内)的消息时间排序
分区 Partition
单个 Kafka 服务器称为一个broker
代理接收来自生产者的消息,为其分配偏移量,并将消息提交到磁盘上的存储
它还为使用者提供服务,响应分区的提取请求,并响应已提交到磁盘的消息
Kafka broker
消费者作为消费者群体的一部分工作,消费者群体是一个或多个消费者共同消费一个主题。
该组确保每个分区仅由一个成员使用。
消费者可以横向扩展以使用包含大量消息的主题
如果单个使用者失败,组的其余成员将重新平衡正在使用的分区,以接管丢失的成员。
消费者组 Consumer Groups
Kafka 内部的数据单位称为消息
可以把消息本身理解成一个不透明的数组
消息被成批写入 Kafka。批处理只是消息的集合,所有这些消息都是针对同一Topics 和分区生成的。
消息和批处理
模式 Schemas
Kafka 组件
基础
安装 Java
Apache Kafka 使用 Zookeeper 存储有关 Kafka 群集的元数据以及使用者客户端详细信息
安装 Zookeeper
Kafka 代理接收来自生产者的消息,并将它们存储在由唯一偏移键键的磁盘上
Kafka 代理允许使用者按 topic、分区和偏移提取消息
Kafka 代理可以通过使用 Zookeeper 直接或间接地相互共享信息来创建 Kafka 群集
Kafka 群集只有一个代理充当控制器
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1 --partitions 1 --topic test
创建并验证 topic
/usr/local/kafka/bin/kafka-console-producer.sh --broker-listlocalhost:9092 --topic testTest Message 1
测试主题生成消息
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeperlocalhost:2181 --topic test --from-beginningTest Message 1
测试使用来自主题的消息
安装 Kafka broker
设置 kafka 的环境
General Broker
每个 Kafka 代理都必须有一个整数标识符,该标识符是用来设置broker.id 配置。
默认情况下,此整数设置为 0,但可以是任何值。但在一个 Kafka cluster中必须是唯一的。
broker.id
示例配置文件使用 TCP 端口 9092 上的侦听器启动 Kafka。这可以通过更改端口配置参数设置为任何可用端口
port
用于存储代理元数据的 Zookeeper 的位置。
hostname、主机名或 Zookeeper 服务器的 IP 地址
port,服务器的客户端端口号
/path,可选的 Zookeeper 路径,用作卡夫卡的 chroot 环境
zookeeper.connect
Kafka 将所有消息保存到磁盘,这些日志段存储在日志目录配置
log.dirs
Kafka 服务器配置为创建的主题指定了许多默认配置。
确定创建新主题时使用的分区数,主要是在启用自动主题创建时(这是默认设置)。
num.partitions
默认值是在配置文件中使用 log.retention.ms参数指定的,它被设置为 168 小时或一周。
log.retention.ms
消息保留值的总字节数是使用log.retention.bytes 参数设置的,它将应用于每个分区
log.retention.bytes
日志段已达到 log.segment.bytes 参数指定的大小(默认值为 1 GB),日志段将关闭并打开一个新的日志段。日志段一旦关闭,就可以考虑过期。
log.segment.bytes
Topic 默认值
broker配置
kafka-topics --create --zookeeper localhost:2181 --replication\u0002factor 1 --partitions 1 --topic test
创建 Kafka 主题
kafka-topics --zookeeper localhost:2181 --describe --topic test
Describe a topic
afka-topics --zookeeper localhost:2181 –list
Topics 列表
# change configurationkafka-topics --zookeeper localhost:2181 --alter --topic test --configmax.message.bytes=128000# add a partitionkafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 2
更改 Topic
此工具用于将消息写入 topic
当消息采用基于文本的格式时,它很有用
kafka-console-producer --broker-list localhost:9092 --topic testhere is a message
向 topic 发送简单的字符串消息
用 key 发送信息
kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log
从文件发送消息
Kafka CLI Producer
从 Kafka 主题读取数据并写入标准输出(控制台)
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
消费消息
要查看旧邮件,可以使用--from-beginning
consume 旧message
显示 key-value 信息
此工具允许您使用来自特定分区的消息。偏移量和副本
parition: 要从中使用的特定分区(默认为全部)
offset: 起始偏移量。使用-2 从开始使用消息,-1 从结尾使用。
max-messages: 要打印的邮件数
replica: 复制副本,默认为broker-leader(-1)
kafka-simple-consumer-shell
Kafka CLI Consumer
kafka-consumer-groups --bootstrap-server localhost:9092 --list
log-end-offset 是分区的最高偏移量
current-offset 是使用者实例最后提交的偏移量
owner 是客户端.id(如果未指定,则显示默认值)
lag 是当前消费者补偿和最高补偿之间的差值
列出消费者组
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group octopus
删除消费者组
Kafka CLI Conusmer Group
Kafka 控制台工具
安装
生产者可以向一个或多个 Kafka topics 发布消息。
Kafka 生产者将记录发送到 topics。这些记录有时被称为消息。
生产者为每个 topics 选择要向哪个分区发送记录。
生产者可以循环发送记录。
生产者可以根据记录的优先级将记录发送到特定分区,从而实现优先级系统
Kafka broker还通过复制为我们提供可靠性和数据保护
生产者概述
Topic
Kafka 主题被划分为多个分区
分区允许您通过在多个代理之间拆分特定主题中的数据来并行化主题。每个分区都可以放在单独的机器上,以允许多个使用者并行地从一个主题中读取数据。
分区
将对象转换为字节流以进行传输的过程称为序列化
序列化器
broker是一个无状态的 Kafka 服务器
Kafka 生产者和消费者不直接交互,而是使用 Kafka 服务器作为代理来交换消息服务。
broker
生产者组件
KafkaProducer
ProducerRecord
ProducerConfig
Java中通过org.apache.kafka.clients.admin.AdminClient操作topics
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics)
创建topic
public DeleteTopicsResult deleteTopics(Collection<String> topics)
删除topic
public ListTopicsResult listTopics()
列表topic
public DescribeTopicsResult describeTopics(Collection<String> topicNames)
查询topic
Java 生产者 API
要创建 Kafka producer,首先需要设置属性,然后在 ProducerRecord 的帮助下发送消息。
key.serializer class:实现的键的序列化程序类
value.serializer class: 实现的值的序列化程序类
acks string:生产者要求领导者在考虑完成请求之前收到的确认数。这将控制发送的记录的持久性。
acks=0 如果设置为零,则生产者将不等待来自服务器的任何确认
acks=1 这意味着领导者会将记录写入本地日志,但不会等待所有追随者的完全确认。
acks=all 这意味着领导者将等待所以的同步副本集来确认记录。这可确保只要至少一个同步副本保持活动状态,记录就会丢失。这是最强的保证。这等效于 acks=-1 设置。
生产者可用于缓冲等待发送到服务器的记录的内存总字节数
bootstrap.servers list
生产者生成的所有数据的压缩类型。默认值为无(即无压缩)。
buffer.memory
配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor()将阻止多长时间
max.block.ms long
请求的最大大小(字节)
max.request.size int
配置控制客户机等待请求响应的最长时间
request.timeout.ms int
Kafka 生产者的属性列表
消息发送,生产者等待第一条消息的确认以发送第二条消息。
同步发送消息
有时我们不需要等待回复,然后发送下一条消息
为了异步发送消息并仍然处理错误方案,生产者支持在发送记录时添加回调
异步发送消息
向 Kafka 发送消息
创建 Kafka 生产者
Properties props = new Properties();props.setProperty(\"bootstrap.servers\
使用属性创建生产者配置对象
使用我们刚才提供的设置创建生产者对象
创建要推送到 Kafka 主题的消息
producer.send(record);
调用发送方法
producer.close();
<artifactId>kafka-clients</artifactId>
生产者的 maven 依赖
配置 Kafka Producer
Kafka生产者
消费者和消费者组
消费者组和分区再平衡
Kafka 消费者概述
每个 topic 每个分区的偏移量管理(自动读取 Zookeeper 中消费者组的最后一个偏移量)
代理故障转移,以及添加或减去分区和消费者时
负载平衡
整合 API
减少依赖性
更好的安全性
高级 API
⚫ 多次读取消息⚫ 在一个进程中只使用 topic 中分区的一个子集⚫ 管理事务,确保一条消息只处理一次
更好地控制 Kafka 消息的过度消耗
⚫ 偏移不再透明⚫ 需要处理代理自动故障转移⚫ 添加消费者、分区和代理需要自己进行负载平衡
更灵活的控制
Low Level API
Java 消费者 API
要创建 Kafka 消费者,首先需要设置属性,然后在 ConsumerRecord 的帮助下接收消息。
subcribe()方法将 topic 列表作为参数,以订阅:consumer.subscribe(Collections.singletonList(\"topicName\"));
订阅 topic
一旦消费者订阅了 topic,poll 循环将处理协调、分区重新平衡、心跳和数据获取的所有细节
poll 循环
创建 Kafka 消费者
创建消费者属性
创建消费者
配置 Kafka 消费者
consumer.subscribe(Collections.singletonList(this.topic));
让消费者订阅特定 topic
获得一些新数据
消费记录
读取消息
Kafka 消费者属性列表
Kafka消费者
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 1
创建主题
kafka-topics.sh --bootstrap-server localhost:2181 --alter --topic mytopic --partitions 2
为主题添加分区
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic
如果想要真正将其删除,需要在 server.properties 里面加上一行配置如下:delete.topic.enable=true
删除主题
> bin/kafka-topics.sh --zookeeper localhost:2181 --list
列出集群中的主题
主题操作
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
可以通过将--list 更改为--describe 并添加--group 参数来获得更多详情信息,比如偏移量。这将列出指定消费者组正在使用的所有主题,以及每个主题分区的偏移量
列出和描述消费者群组
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 -delete --group my-group --group my-other-group
当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException。
手动删除一个或多个消费者组,可以使用--delete
删除消费者群组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-group --topic my-topic-1 --topic my-topic-2
使用--delete-offsets 选项删除消费者组的偏移量此选项同时支持一个消费者组和一个或多个主题
重置使用者组的偏移量,可以使用“--reset-offsets”选项。此选项一次只支持一个消费者组。它需要定义以下范围:--all-topic 或--topic
偏移量管理
消费者操作
覆盖主题默认配置
Kafka 客户端唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端 ID 的所有客户端在每个 broker 上每秒生产或者消费的字节数
覆盖客户端默认配置
可以使用命令行工具 kafka-configs.sh 来检查主题或客户机的特定配置。显示覆盖的配置(Describing Configuration Overrides)需要使用--describe 选项。
显示覆盖的配置
可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter 命令以及--delete-config 命令。
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
删除覆盖的配置
包含在一个名为 kafka-config.sh 的命令行工具脚本
动态配置修改
Kafka 在服务器和客户端都使用 Yammer Metrics 来测量系统的运行状态
查看可用指标(Metrics)的最简单方法是启动 Java 自带的 Jconsole 工具
Jconsole 是基于Java Management Extensions的,通过 JMX 就可以查看连接到的客户端或者服务器上的所有指标
监控服务器状态
监控生产者状态
监控消费者状态
Kafka 默认禁用远程 JMX,但我们通过为执行 CLI 命令所在的进程中设置环境变量 JMX_PORT,或者设置标准Java 系统属性,就可以通过编程方式启用远程 JMX
监控 Kafka
监控和管理
数据流(也称为事件流或流数据)是表示无限数据集的抽象
流处理是指对一个或多个事件流进行的处理。流处理与请求-响应和批处理一样是一种编程范式
无界性
对事件流进行排序
数据记录不可变
事件流可重播
请求-响应
批处理
Stream 处理
事件流模型的属性
是我们跟踪的事件发生和记录创建的时间
事件时间
是事件到达 Kafka 代理并存储在那里的时间
日志附加时间
是流处理应用程序接收事件以执行某些计算的时间
处理时间
时间
只能由流处理应用程序的特定实例访问的状态
本地或内部状态
优点是它的大小几乎是无限的,并且可以从应用程序的多个实例甚至不同的应用程序访问它。缺点是额外的系统会带来额外的延迟和复杂性
外部状态
状态
流包含更改的历史记录
表包含当前状态
流和表的对偶性
窗口大小
窗口移动的频率(提前间隔)
窗口保持可更新的时间
时间窗口
流处理的一些关键概念
流处理的最基本模式是单独地处理每个事件。这也被称为 map/filter 模式
它通常用于从流中过滤不必要的事件或转换每个事件
在这种模式中,流处理应用程序使用流中的事件,修改每个事件,然后将事件生成到另一个流。
例如,一个应用程序从流中读取日志消息并将错误事件写入高优先级流,将其余事件写入低优先级流。另一个例子是从流中读取事件并将其从 JSON 修改为 Avro 的应用程序
单事件处理
大多数流处理应用程序都关注聚合信息,尤其是时间窗口聚合
本地处理状态
流处理设计模式
每个 streams 应用程序至少实现和执行一个拓扑
Building a Topology(构建拓扑)
Kafka Streams 通过允许在应用程序的一个实例中执行多个线程以及支持应用程序的分布式实例之间的负载平衡来进行扩展
Streams 引擎通过将拓扑拆分为任务来并行执行拓扑
Scaling the Topology(扩展拓扑)
允许我们扩展应用程序的同一个模型也允许我们处理失败
Surviving Failures(故障容错)
Kafka Streams: 架构概述
Customer Service(客户服务)
Internet of Things(物联网)
Fraud Detection(欺诈检测)
Stream 处理用例
流处理
KAFKA
0 条评论
回复 删除
下一页