kafka--发布-订阅消息系统
1. Kafka概述
1. kafka是什么
kafka是分布式的
、高并发的
、基于发布/订阅模式
的消息队列软件系统。
kafka中的重要组件
- Producer:消息生产者,发布消息到Kafka集群的终端或服务
- Consume:消费者,从Kafka集群中消费消息的终端或服务
- Broker: 一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并
存入磁盘
;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。 - 集群(cluster):若干个 Broker 组成一个 集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。
-
主题(Topic):主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。
-
Partition(分区):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
-
Replica:即副本,为实现数据备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,为此 Kafka 提供了副本机制,一个 Topic 的每个 Partition 都有若干个副本,一个 Leader 副本和若干个 Follower 副本。
-
Leader:即每个
分区
多个副本的主副本
,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。 -
Follower:即每个分区多个副本的
从副本
,会实时从 Leader 副本中同步数据,并保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会被选举并成为新的 Leader , 且不能跟 Leader 在同一个 Broker 上, 防止崩溃数据可恢复。 -
Offset:消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
-
ZooKeeper:ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后产品和消费者采取决定并开始与某些其他代理协调他们的任务。
kafka 特点
- kafka 是一个基于发布-订阅的分布式消息系统(消息队列)
- Kafka 面向大数据,消息保存在主题中,而每个 topic 有分为多个分区
- kafka 的消息数据保存在磁盘,每个 partition 对应磁盘上的一个文件,消息写入就是简单的文件追加,文件可以在集群内复制备份以防丢失
- 即使消息被消费,kafka 也不会立即删除该消息,可以通过配置使得过一段时间后自动删除以释放磁盘空间
- kafka依赖分布式协调服务Zookeeper,适合离线/在线信息的消费,与 storm 和 spark 等实时流式数据分析常常结合使用
Kafka 中 AR、ISR、OSR 三者的概念
- AR:分区中所有副本称为 AR
- ISR:所有与主副本保持一定程度同步的副本(包括主副本)称为 ISR
- OSR:与主副本滞后过多的副本组成 OSR
ZooKeeper的作用
Kafka 架构中 ZooKeeper 以怎样的形式存在?
kafka的目录:
ZooKeeper 是一个分布式、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 开源实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有 Broker 注册
、Topic 注册
、Producer 和 Consumer 负载均衡
、维护 Partition 与 Consumer 的关系
、记录消息消费的进度
以及 Consumer 注册
等。
1. Broker 在 ZooKeeper 中的注册
ZooKeeper 是一个共享配置中心,我们可以将一些信息存放入其中,比如 Broker 信息,本质上就是存放一个文件目录
。这个配置中心是共享的,分布式系统的各个节点都可以从配置中心访问到相关信息。同时,ZooKeeper 还具有 Watch 机制(Raft 算法),一旦注册信息发生变化,比如某个注册的 Broker 下线,ZooKeeper 就会删除与之相关的注册信息
,其它节点可以通过 Watch 机制
监听到这一变化,进而做出响应。
言归正传,Broker 注册,也就是 Kafka 节点注册,本质上就是在 ZooKeeper 中创建一个专属的目录(又称为节点),其路径为 / brokers。
在专属节点创建好后,Kafka 会将该 Broker 相关的信息存入其中,包括 broker.name 、端口号。
需要特别说明的是,Broker 在 ZooKeeper 中注册的节点是“临时节点”,一旦 Broker 故障下线,ZooKeeper 就会将该节点删除。同时,可以基于 Watch 机制监听到这一删除事件,进而做出响应(如负载均衡)。
2. Topic 在 ZooKeeper 中的注册
在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护,在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为
/brokers/topics/{topic_name}
前面说过,为了保障数据的可靠性,每个 Topic 的 Partition 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 Replicas
来控制。那么问题来了,如下图所示,假设某个 TopicA 被分为 2 个 Partition,并且存在两个备份,由于这 2 个 Partition(1-2)被分布在不同的 Broker 上,同一个 Partiton 与其备份不能(也不应该)存储于同一个 Broker 上
。以 Partition1 为例,假设它被存储于 Broker2,其对应的备份分别存储于 Broker1 和 Broker4,有了备份,可靠性得到保障,但数据一致性
却是个问题。
为了保障数据的一致性,ZooKeeper 机制得以引入。基于 ZooKeeper,Kafka 为每一个 Partition 找一个节点作为 Leader,其余备份作为 Follower;接续上图的例子,就 TopicA 的 Partition1 而言,如果位于 Broker2(Kafka 节点)上的 Partition1 为 Leader,那么位于 Broker1 和 Broker4 上面的 Partition1 就充当 Follower,则有下图:
基于上图的架构,当 Producer Push 的消息写入 Partition(分区)时,作为 Leader 的 Broker(Kafka 节点)会将消息写入自己的分区,同时还会将此消息复制到各个 Follower,实现同步。如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息;如果 Leader 挂了,Follower 们会选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的。
3. Consumer 在 ZooKeeper 中的注册
- consumer group 注册
与 Broker、Topic 注册类似,Consumer Group
注册本质上也是在 ZooKeeper 中创建专属的节点,以记录相关信息,其路径为 /consumers/{group_id}
。
这里补充一点,在 ZooKeeper 中,/consumers/{group_id}
虽然被称为节点,但本质上是一个目录
。既然是目录,在记录信息时,就可以根据信息的不同,进一步创建子目录(子节点),分别记录不同类别的信息。对于 Consumer Group 而言,有三类信息需要记录,因此,/consumers/{group_id} 下还有三个子目录,如下所示。
ids
:Consumer Group 中有多个 Consumer,ids 用于记录这些 Consumer;owners
:记录该 Consumer Group 可消费的 Topic 信息;offsets
:记录 owners 中每个 Topic 的所有 Partition 的 Offset。
- consumer注册
原理同 Consumer Group 注册,其节点路径比较特殊,需在路径 /consumers/{group_id}/ids
下创建专属子节点,它是临时的。比如,某 Consumer 的临时节点路径为 :
/ consumers/{group_id}/ids/my_consumer_for_test-1223234-fdfv1233df23
- 负载均衡
通过前面的学习,我们知道,对于一条消息,订阅了它的 Consumer Group 中只能有一个 Consumer 消费它。那么就存在一个问题:一个 Consumer Group 中有多个 Consumer,如何让它们尽可能均匀地消费订阅的消息呢(也就是负载均衡)?这里不讨论实现细节,但要实现负载均衡,实时获取 Consumer 的数量显然是必要的,通过 Watch 机制
监听/ consumers/{group_id}/ids
下子节点的事件便可实现
- Producers 负载均衡
前面已经介绍过,为了负载均衡和避免连锁反应,Kafka 中,同一个 Topic 的 Partition 会尽量分散到不同的 Broker 上。而 Producers 则根据指定的 Topic 将消息 Push 到相应的 Partition,那么,如何将消息均衡地 Push 到各个 Partition 呢?这便是 Producers 负载均衡的问题。
Producers 启动后同样也要进行注册(依然是创建一个专属的临时节点),为了负载均衡,Producers 会通过 Watcher 机制监听 Brokers 注册节点的变化。一旦 Brokers 发生变化,如增加、减少,Producers 可以收到通知并更新自己记录的 Broker 列表 。此外,基于 ZooKeeper 提供的 Watcher 机制,还可以监听其它在 ZooKeeper 上注册的节点,如 Topic、Consumer 等。
Producer 向 Kafka 集群 Push 消息的时候,必须指定 Topic
,不过,Partition 却是非必要的。事实上,目前高级的客户端已经不提供指定 Partition 的接口。虽然不提供,但并不代表无须指定 Partition,只是隐藏了细节。通常有两种方式用于指定 Partition。
-
低级接口
在指定 Topic 的同时,需指定 Partition
编号(0、1、2……N),消息数据将被 Push 到指定的 Partition 中。从负载均衡的角度看,这并不是一种友好的方式。 -
高级接口
不支持指定 Partition,隐藏相关细节,内部则采用轮询
、对传入 Key 进行 Hash 等策略将消息数据均衡地发送到各个 Partition
。此外,有一些 Kafka 客户端还支持自定义负载均衡策略
。
- Consumer 负载均衡
基于 Producer 的负载均衡策略,对于任意一个 Topic,其各个 Partition 中消息量相对均衡。进一步,对于 Topic 的任意一条消息,订阅了它的任何一个 Consumer Group 中都只能有一个 Consumer 消费它,在此约束下,如何实现 Consumer 均衡地消费消息呢?
一种最朴实的想法是,对于订阅的 Topic,既然 Partition 中的消息是均衡的,那么,可以为 Consumer Group 中的各个 Consumer 分别指定不同的 Partition,只要保证该过程“相对公平”即可。不过,需要注意的是,Consume Group 中 Consumer 的数量是动态变化的,Topic 的 Partition 数量也不是固定值,如何“均匀”分配呢?
借助 ZooKeeper 实现负载均衡 :
在 Consumer 消费消息时,高级别 API 只需指定 Topic 即可,隐藏了负载均衡策略;而低级别的 API 通常需要同时指定 Topic 和 Partition,需要自行实现负载均衡策略。高级别 API 的负载均衡策略需借助 ZooKeeper 实现,具体原理如下。
前已述及,Consumer Group、Consumer、Broker 都会在 ZooKeeper 中注册节点,因此,基于 ZooKeeper 提供的 Watcher,Consumer 可以监听同一 Group 中 Consumers 的变化,以及 Broker 列表的变化。进一步,根据 Consumer 列表,将 Partition 排序后,轮流进行分配。由于这是一个动态过程,相应的负载均衡被称为 Rebalance,其描述如下:
6.记录消费进度offset
offset是 /consumers/[group_id] 下的一个子节点。Kafka 中,Consumer 采用 Pull 模式消费相应 Partition 中的消息,是一种异步消费模式。为了避免因 Consumer 故障、重启、Rebalance 等原因造成重复消费、遗漏消费消息,需要记录 Consumer 对 Partition 中消息的消费进度,即偏移量 Offset。Offset 在 ZooKeeper 中,有一个专属的节点(目录)用于记录 Offset
- 记录 Partition 与 Consumer 的关系
Consumer Group 在 ZooKeeper 上的注册节点为 /consumers/[group_id]
,而 Consumer Group 中的 Consumer 在 ZooKeeper 上的注册节点为 /consumers/[group_id] / owners
,它们共享一个 Group ID。为了 Consumer 负载均衡,同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个 Consumer。Partition 与 Consumer 的对应关系也需要在 ZooKeeper 中记录,路径为:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
补充:这个路径也是一个临时节点,进行 Rebalance 时会被删除,而后依据新的对应关系重建。此外,[broker_id-partition_id] 是一个消息分区的标识,其内容就是该消息分区消费者的 Consumer ID,通常采用 hostname:UUID 形式表示。
2. Kafka工作流程
Kafka - 3.x 图解Broker总体工作流程
消息队列的通信模式
1. 点对点模式(queue)
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
特点:点对点:Queue,不可重复消费
2. 订阅/发布模式(topic)
在发布-订阅消息系统中,消息被持久化到一个topic
中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费
,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
特点:发布/订阅:Topic,可以重复消费
kafka工作特点
- 同一主题下的
不同分区包含的消息是不同
的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序
。 - 一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。
- Kafka 为分区引入了
多副本
(Replica)机制,通过增加副本数量可以提升容灾
能力。 - 同一分区的
不同副本中保存的是相同的消息
(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader 副本负责处理读写请求
,follower 副本只负责与 leader 副本的消息同步
。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举
新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。
topic组成
Kafka 工作流程和存储机制
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
在 Kafka 中,一个 topic 可以分为多个 partition,一个 partition 分为多个 segment,每个 segment 对应两个文件:.index 和 .log 文件
topic 是逻辑
上的概念,而 patition 是物理
上的概念,每个 patition 对应一个 log 文件,而 log 文件中存储的就是 producer 生产的数据,patition 生产的数据会被不断的添加到 log 文件的末端,且每条数据都有自己的 offset。
消费组中的每个消费者,都是实时记录自己消费到哪个 offset,以便出错恢复,从上次的位置继续消费。
生产者发送流程
在消息发送的过程中,涉及到了两个线程——main 线程
和 Sender 线程
。在 main 线程中创建了一个双端队列 RecordAccumulator
。main 线程
将消息发送给 RecordAccumulator
,Sender 线程
不断从 RecordAccumulator
中拉取消息发送到 Kafka Broker
。
1. 主线程
在主线程中由 kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,
也称为消息收集器)中。
拦截器
: 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。序列化器
: 用于在网络传输中将数据序列化为字节流进行传输,保证数据不会丢失。分区器
: 用于按照一定的规则将数据分发到不同的kafka broker节点中
2. Sender 线程
Sender线程是Kafka Producer内部的一个后台线程,它负责从RecordAccumulator中拉取消息并发送到Kafka broker。Sender线程的主要工作如下:
从RecordAccumulator拉取消息
:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。这个轮询是异步的,因此主线程不需要等待消息被发送。构建请求
:当Sender线程发现有消息需要发送,它会构建一个或多个ProducerRequest,每个请求包含多个消息,以便进行有效的批量发送。发送消息到Kafka broker
:Sender线程将构建的请求发送到Kafka broker,等待来自broker的响应。一旦消息被成功接收并记录在Kafka broker中,Sender线程会通知RecordAccumulator,以便它可以更新消息的状态。
3. RecordAccumulator
RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。主要功能包括:
暂存消息
:主线程将消息发送到RecordAccumulator中,使其在等待Sender线程处理。管理消息的状态
:RecordAccumulator跟踪每条消息的发送状态,以确保消息被成功发送到Kafka broker。一旦消息被成功写入到Kafka broker的日志中,RecordAccumulator会将消息的状态标记为已发送。负责消息批量化
:RecordAccumulator也有助于消息的批量发送,以减少网络开销和提高性能。
重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地址清单。可以设置 1 个或多个,中间用逗号隔开。生产者从给定的 broker 里查找到其他 broker 信息。 |
key.serializer, value.serializer | 指定发送消息的 key 和 value 的序列化类型。要写全类名。(反射获取) |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 数据落盘后应答. -1(all):生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点数据都落盘后应答。默认值是 -1 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
kafka生产者工作流程
- 生产者从Kafka集群获取分区leader信息
- 生产者将消息发送给leader
- leader将消息写入到本地磁盘
- follower从leader拉取消息数据
- follower将消息写入到本地磁盘后向leader发送ACK
- leader收到所有follow的ACK后向生产者发送ACK
Kafka 消费方式
Kafka 消费者工作流程
Kafka 消费者之消费方式、工作流程、消费者案例(订阅主题、订阅分区)、消费者组案例、分区的分配以及再平衡、offset 位移、消费者事务、数据积压(消费者如何提高吞吐量)
消费者总体工作流程
- 生产者将数据发送到指定topic中
- Kafka将数据以partition的方式存储到broker上。Kafka支持数据均衡,例如生产者生成了两条消息,topic有两个partition,那么Kafka将在两个partition上分别存储一条消息
- 消费者订阅指定topic的数据
- 当消费者订阅topic中消息时,Kafka将当前的offset发给消费者,同时将offset存储到Zookeeper中
- 消费者以特定的间隔(如100ms)向Kafka请求数据
- 当Kafka接收到生产者发送的数据时,Kafka将这些数据推送给消费者
- 消费者受到Kafka推送的数据,并进行处理
- 当消费者处理完该条消息后,消费者向Kafka broker发送一个该消息已被消费的反馈
- 当Kafka接到消费者的反馈后,Kafka更新offset包括Zookeeper中的offset。
- 以上过程一直重复,直到消费者停止请求数据
- 消费者可以重置offset,从而可以灵活消费存储在Kafka上的数据
消费者组原理
简言之,可以在GC有分叉,partition不可以有分叉
消费者是以 consumer group
消费者组的方式工作,由一个或者多个消费者组成一个组, 共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。
消费者组最为重要的一个功能是实现广播与单播的功能。一个消费者组可以确保其所订阅的 Topic 的每个分区只能被从属于该消费者组中的唯一一个消费者所消费;如果不同的消费者组订阅了同一个 Topic,那么这些消费者组之间是彼此独立的,不会受到相互的干扰。
如果我们希望一条消息可以被多个消费者所消费,那么可以将这些消费者放到不同的消费者组中,这实际上就是广播的效果;如果希望一条消息只能被一个消费者所消费,那么可以将这些消费者放到同一个消费者组中,这实际上就是单播的效果。
消费者组初始化流程
先选出使用哪台服务器的coordinator,根据groupid的hashcode值进行50求模处理,就得到了分区编号,该分区编号在哪个broker节点上,则该节点的coordinator就负责本次消费者组的事务协调。
消费者重要参数
bootstrap.servers
向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id
标记消费者所属的消费者组。enable.auto.commit
默认值为true,消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms
如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。auto.offset.reset
当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。anything:向消费者抛异常。offsets.topic.num.partitions
__consumer_offsets 的分区数,默认是 50 个分区。heartbeat.interval.ms
Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。session.timeout.ms
Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。max.poll.interval.ms
消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。fetch.min.bytes
默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms
默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
-fetch.max.bytes
默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。max.poll.records
一次 poll 拉取数据返回消息的最大条数,默认是 500条。
Kafka选择分区的模式(3种)
-
方便在集群中扩展
,每个 Partition 可以通过调整以适应它所在的机器,而一个topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;消息日志文件会受到所在机器的文件系统大小的限制,分区之后,理论上一个topic可以处理任意数量的消息数据。 -
可以
提高并发
,因为可以以 Partition 为单位读写了。
分区的原则
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 进行 hash 出一个 patition
- patition 和 key 都未指定,使用轮询选出一个 patition。
Kafka 判断一个节点是否还活着的条件
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过
心跳机制
检查每个节点的连接 - 如果节点是个 follower,他必须能
及时的同步
leader 的写操作,延时不能太久
kafka 的 ack 的三种机制
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack
(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据.
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
0
:这是最不可靠的模式。生产者在发送消息后不会等待
来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器1
:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认
,但不会等待所有副本(replicas)的确认
。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。-1
(all):这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认
。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。
3. kafka 中zookeeper的作用
深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列
Broker总体工作流程
1. broker启动后在zk中注册,如下图所示:
2. controller谁先注册,谁就会成为leader,如下图中如果broler0中controller先注册,即broler0中controller会成为leader
3. 由选举出来的Controller监听brokers节点变化
4. Controller决定Leader选举。
选举规则:在isr中存活为前提,按照AR(即Kafka分区中的所有副本统称)中排在前面的优先。例如ar[1,0,2], isr [1,0,2],那么leader就会按照1,0,2的顺序轮询
5. Controller将节点信息上传到ZK
6. 其他contorller从zk同步相关信息
7. 生产者往集群发送信息,fllower主动跟leader同步消息,并在底层采用log方式存储
8. 假设Broker1中Leader挂了
- Controller监听到节点变化
- 获取ISR
- 选举新的Leader(在isr中存活为前提,按照AR中排在前面的优先)
- 更新Leader及ISR
实例:
1)案例内容:模拟kafka上下线,查看zookeeper中数据变化2)查看kafka节点相关信息:① 查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 103, 104]
② 查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":103,"timestamp":"1637292471777"}
③ 查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,103,104]}3)模拟kafka下线:停止hadoop103上的kafka
[xxx@hadoop103 kafka]$ bin/kafka-server-stop.sh4)查看kafka相关节点信息
① 查看zookeeper上的kafka集群节点信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[102, 104]
② 查看当前kafka集群节点中的controller信息
[zk: localhost:2181(CONNECTED) 2] ls /kafka/controller
{"version":1,"brokerid":102,"timestamp":"1637292471777"}
③ 查看kafka中的first主题的0号分区的状态
[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/topics/partitions/0/state
{"controller_epoch":24,"leader":102,"version":1,"leader_epoch":18,"isr":[102,104]}5)重新启动hadoop103上的kafka服务
[xxx@hadoop103 kafka]$ bin/kafka-server-stop.sh6)再次查看上述节点,观察区别变化
4. kafka基本操作
启停
1. 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2. 启动kafka
bin/kafka-server-start.sh config/server.properties
3. 启动producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
4. 启动consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic quickstart-events --from-beginning
Topic
1. 创建Topic
/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
–topic 指定 Topic 名,–partitions 指定分区数,–replication-factor 指定备份数
2. 列出所有Topic
/bin/kafka-topics.sh --list --zookeeper localhost:2181
3. 查看Topic
/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
4. 增加 Topic 的 partition 数
/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
5. 删除Topic
/bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --delete
消息
1. 启动生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
- –broker-list: 指定 Kafka 集群的 broker 列表。在本例中,broker 列表是 localhost:9092,表示 Kafka broker 运行在本地机器的 9092 端口上。
- –topic: 指定要发送消息的主题名称。在本例中,主题名称是 topic-name。
2. 启动消费者消费消息
./bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test-topic --from-beginning
- –bootstrap-server: 指定 Kafka 集群的 bootstrap 服务器地址。在本例中,bootstrap 服务器地址是 kafka-1:9092,表示 Kafka bootstrap 服务器运行在名为 kafka-1 的机器的 9092 端口上。
- –topic: 指定要消费消息的主题名称。在本例中,主题名称是 test-topic。
- –from-beginning: 指示从主题的开头开始消费消息。
3. 指定分区
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
- –offset: 指定要消费消息的偏移量。在本例中,偏移量是 latest,表示消费最新的消息。
- –partition: 指定要消费消息的分区号。在本例中,分区号是 0,表示消费分区 0 的消息。
4. 指定个数
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1
- –max-messages: 指定要消费的最大消息数量。在本例中,最大消息数量是 1,表示只消费一条消息。
消费者Group
1. 查看列表
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
2. 查看详情
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
3. 删除group中的Topic
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
4. 删除Group
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete
相关文章:

kafka--发布-订阅消息系统
1. Kafka概述 1. kafka是什么 kafka是分布式的、高并发的、基于发布/订阅模式的消息队列软件系统。 kafka中的重要组件 Producer:消息生产者,发布消息到Kafka集群的终端或服务Consume:消费者,从Kafka集群中消费消息的终端或服…...

2024最新软件测试面试题。内附答案+文档
🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 1、你以前工作时的测试流程是什么? 参考答案:(灵活回答&…...

新加坡很火的slots游戏代投Facebook广告新流量趋势
新加坡很火的slots游戏代投Facebook广告新流量趋势 在新加坡这片充满活力的土地上,Slots游戏以其独特的魅力和吸引力,迅速成为了许多玩家的心头好。而Facebook,作为全球最大的社交媒体平台之一,为Slots游戏的推广提供了得天独厚的…...

C++ 实现字符串逆序
C 实现字符串逆序 思路: 输入一个字符串。使用双指针法,交换字符串的首尾字符,逐步向中间移动。输出逆序后的字符串。 #include <iostream> #include <string>using namespace std;void reverseString(string &str) {int …...

【项目实践】贪吃蛇
一、游戏效果展示二、博客目标三、使用到的知识四、Win32 API 介绍 4.1 WIn32 API4.2 控制台程序4.3 控制屏幕上的坐标COORD4.4 GetStdHandle4.5 GetConsoleCursorInfo 4.5.1 CONSOLE_CURSOR_INFO 4.6 SetConsoleCursorInfo4.7 SetConsoleCursorPosition4.8 GetAsyncKeyState 五…...

将exe文件添加到注册表中,实现开机时自动运行
目录 一、前言 二、代码 三、使用步骤 1.编译生成exe文件、 2.以管理员身份运行代码 3.打开注册表,验证结果 一、前言 在Windows操作系统中,将exe文件的路径添加到注册表下,主要用于实现程序的开机自动运行功能。 注册表路径为…...

SQL使用注意事项
作为开发人员日常最为熟悉的工具sql。但是在实际使用中,有一些坑需要尽量避免,本文是对一些常用注意事项的总结 查询需要的。不要全部都查询。禁止使用存储过程,禁止使用外键。使用sql进行计算,要小心。(数据量大的情况…...

uniapp小程序IOS端,uni.createInnerAudioContext()无声音
可能的问题 路径中有中文字符需要使用uni.getBackgroundAudioManager()播放其他问题 解决办法 首先我的路径中没有中文字符,如果有的,可能需要转义一下或者干脆不使用中文字符,第二个也是从其他博客中看到的,我这边分享一下我的…...

第二节-K8s词汇表
关键字词汇表 https://kubernetes.io/zh-cn/docs/reference/glossary/?fundamentaltrue API Group (API 组)Kubernetes API 中的一组相关路径。 API 服务器亦称作:kube-apiserver API 服务器是 Kubernetes 控制平面的组件, 该组件负责公开了 Kubernetes API&…...

命令行运行git reflog(reference log)报错的解决办法
文章目录 1. 检查 Git 是否已安装2. 检查 PATH 环境变量3. 重新安装 Git 在Git中, reflog的英文全称是 “ reference log”。意思是 引用日志(参考日志)。它记录了本地仓库中HEAD和分支引用所指向的提交的变更历史。这包括了你所有的提交&…...

python3 imwrite 中文路径不成功解决方法
filename 中文路径 #cv2.imwrite(filename, frame) cv2.imencode(.jpg, frame)[1].tofile(filename)...

tapd 与国内外主流的8大项目管理软件大对比
对比Tapd与8大项目管理工具:PingCode、Worktile、Redmine、Teambition、广联达、Jira、禅道、飞书。 Tapd 是腾讯推出的一款敏捷开发管理工具,特别适合那些需要高效协作和快速迭代的敏捷开发团队。它支持多种敏捷方法论,包括Scrum和Kanban&am…...

IP地址配置
1.为虚拟机配置IP地址,网关,DNS 例如:手动给虚拟机配置IP地址为 192.168.5.50/24;网关地址为:192.168.5.2;DNS地址为:192.168.5.2 解题步骤如下: #配置IP地址 [rootlocalhost ~]#…...

【C#】ProgressBar进度条异步编程思想
1.控件介绍 进度条通常用于显示代码的执行进程进度,在一些复杂功能交互体验时告知用户进程还在继续。 在属性栏中,有三个值常用: Value表示当前值,Minimum表示进度条范围下限,Maximum表示进度条范围上限。 2.简单实…...

深入浅出3D感知中的优化与基于学习的技术1(原创系列)
近期几乎看了所有有关NERF技术论文,本身我研究的领域不在深度学习技术方向,是传统的机器人控制和感知。所以总结了下这部分基于学习的感知技术,会写一个新的系列教程讲解这部分三维感知技术的发展到最新的技术细节,并支持自己最近…...

【CentOS 7 上安装 Oracle JDK 8u333】
文章目录 下载 Oracle JDK 8u333:上传 RPM 包到服务器安装 Oracle JDK设置 JAVA_HOME 环境变量验证 下载 Oracle JDK 8u333 访问 https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html 找到 JDK 8u333 版本,并下载适用于 L…...

Nginx 常用配置与应用
Nginx 常用配置与应用 官网地址:https://nginx.org/en/docs/ 目录 Nginx 常用配置与应用 Nginx总架构 正向代理 反向代理 Nginx 基本配置反向代理案例 负载均衡 Nginx总架构 进程模型 正向代理 反向代理 Nginx 基本配置反向代理案例 负载均衡 Nginx 基本配置…...

基于Springboot的智慧养老中心管理系统
文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于Springboot的智慧养老中心管理系统,…...

数据结构笔记第3篇:双向链表
1、双向链表的结构 注意:这里的 "带头" 跟前面我们说的 "头结点" 是两个概念,实际前面的在单链表阶段称呼不严谨,但是为了同学们更好的理解就直接称为单链表的头结点。 带头链表里的头结点,实际为 "哨兵…...

详细对比Java SPI、Spring SPI 和 Dubbo SPI
SPI(Service Provider Interface)概述 定义:SPI是一种动态替换发现机制,用于实现接口与实现的解耦,提高框架的可扩展性。核心思想:解耦和方便扩展。 Java SPI 约定规范: 扩展类文件放在META-…...

CPU的核心数和线程数
CPU的核心数和线程数 一、关系: 1、线程数可以模拟出不同的CPU核心数。 CPU的核心数指的是硬件上存在着几个核心,而线程数可以模拟出多个核心数的功能。线程数越多,越有利于同时运行多个程序,因为线程数等同于在某个瞬间CPU能同…...

电脑游戏录屏,3款实用软件推荐给你
在电竞游戏热潮席卷全球的今天,电脑游戏录屏早已不再是简单的画面捕捉,它成为了记录电竞风采、打造专属游戏记忆的重要手段。通过游戏录屏,我们可以定格游戏中的精彩瞬间,重温那些令人热血沸腾的电竞时刻。那么,在进行…...

C#桌面应用开发:番茄定时器
C#桌面应用开发:番茄定时器 1、环境搭建和工程创建: 步骤一:安装visual studio2022 步骤二:新建工程 2、制作窗体部件 *踩过的坑: (1)找不到工具箱控件,现象如下:…...

PHP智慧门店微信小程序系统源码
🔍【引领未来零售新风尚】🔍 🚀升级启航,智慧零售新篇章🚀 告别传统门店的束缚,智慧门店v3微信小程序携带着前沿科技与人性化设计,正式启航!这个版本不仅是对过往功能的全面优化&a…...

SerDes介绍以及原语使用介绍(2)OSERDESE2原语仿真
文章目录 前言一、SDR模式1.1、设计代码1.2、testbench代码1.3、仿真分析 二、DDR模式下2.1、设计代码2.2、testbench代码2.3、仿真分析 三、OSERDES2级联3.1、设计代码3.2、testbench代码3.3、代码分析 前言 上文通过xilinx ug471手册对OSERDESE有了简单的了解,接…...

【稳定检索/投稿优惠】2024年教育、人文发展与艺术国际会议(EHDA 2024)
2024 International Conference on Education, Humanities Development and Arts 2024年教育、人文发展与艺术国际会议 【会议信息】 会议简称:EHDA 2024 大会时间:点击查看 截稿时间:点击查看 大会地点:中国北京 会议官网&#…...

Docker拉取失败,利用 Git将 Docker镜像重新打 Tag 推送到阿里云等其他公有云镜像仓库里
目录 一、开通阿里云容器镜像服务 二、Git配置 三、去DockerHub找镜像 四、编写images.txt文件 五、演示 六、其他注意事项 最近一段时间 Docker 镜像一直是 Pull 不下来的状态,想直连 DockerHub 是几乎不可能的。更糟糕的是,很多原本可靠的国内…...

【区分vue2和vue3下的element UI Breadcrumb 面包屑组件,分别详细介绍属性,事件,方法如何使用,并举例】
在 Vue 2 中,Element UI 提供了 el-breadcrumb 面包屑组件,而在 Vue 3 中,Element UI 的官方版本并没有直接更新以支持 Vue 3,但有一个类似的库叫做 Element Plus,它是为 Vue 3 设计的。 Vue 2 Element UI 在 Vue 2…...

gdb调试命令大全
基本命令 #gdb test test是要调试的程序,由gcc test.c -g -o test生成。进入后提示符变为(gdb) 。 start : 指令会执行程序至main() 主函数的起始位置,即在main() 函数的第一行语句处停止执行(该行代码尚未执行) cont…...

ESP32之arduino环境安装及点灯
目录 前言 前两天安装了VScode,奈何资源找的困难,于是咨询淘宝客服,他说arduino用的多,资源多.然后就安装了a…...