当前位置: 首页 > news >正文

探究Kafka原理-3.生产者消费者API原理解析

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • API 开发:producer 生产者
    • 生产者 api 示例
    • 必要的参数配置
    • 发送消息
      • 发后即忘( fire-and-forget)
      • 同步发送(sync )
      • 异步发送(async )
  • API 开发:consumer 消费
    • subscribe 订阅主题
      • 消费者组再均衡分区分配策略
        • Range Strategy
        • Round-Robin Strateg
        • Sticky Strategy
        • Cooperative Sticky Strategy
      • 消费者组再均衡流程
        • GroupCoordinator 介绍
      • eager 协议再均衡步骤细节
        • 定位 Group Coordinator
        • 加入组 Join The Group
        • 组信息同步 SYNC Group
        • 心跳联系 HEART BEAT
        • 再均衡流程
    • assign 订阅主题
    • subscribe 与 assign 的区别
    • 取消订阅
    • 消息的消费模式
    • 指定位移消费
    • 自动提交消费者偏移量
    • 手动提交消费者偏移量(调用 kafka api)
    • 手动提交位移(时机的选择)
    • 消费者提交偏移量方式的总结

API 开发:producer 生产者

生产者 api 示例

一个正常的生产逻辑需要具备以下几个步骤
(1)配置生产者参数及创建相应的生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例

首先,引入 maven 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version>
</dependency>

采用默认分区方式将消息散列的发送到各个分区当中

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
/*kafka生产者api代码示例*/
public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();//设置 kafka 集群的地址 必选props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");//ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的 消息发送,应答级别props.put("acks", "all");//序列化器 因为业务数据有各种类型的,但是kafka底层存储里面不可能有各种类型的,只能是序列化的字节,所以不管你要发什么东西给它,都要提供一个序列化器,帮你能够把key value序列化成二进制的字节// 因为kafka底层的存储是没有类型维护机制的,用户所发的所有数据类型,都必须 序列化成byte[],所以kafka的producer需要一个针对用户所发送的数据类型的序列化工具类,且这个序列化工具类,需要实现kafka所提供的序列工具接口。props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");/*需要额外的指定泛型,key value*/Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++)// 其调用是异步的,数据的发送动作在producer的底层是异步线程的producer.send(new ProducerRecord<String, String>("test",Integer.toString(i), "dd:"+i));// 在这里面可以通过逻辑判断去指定发送到那个topic中//Thread.sleep(100);producer.close();}
}

消息对象 ProducerRecord,除了包含业务数据外,还包含了多个属性:

public class ProducerRecord<K, V> {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;

其发送方法中,根据参数的不同,有不同的构造方法

在这里插入图片描述

其实这样也就意味着我们可以把消息发送到不同的topic。

必要的参数配置

Kafka 生产者客户端 KakaProducer 中有 3 个参数是必填的。

bootstrap.servers / key.serializer / value.serializer

为了防止参数名字符串书写错误,可以使用如下方式进行设置:

pro.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

发送消息

创建生产者实例和构建消息之后 就可以开始发送消息了。发送消息主要有 3 种模式:

发后即忘( fire-and-forget)

发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。
在大多数情况下,这种发送方式没有问题;
不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。
这种发送方式的性能最高,可靠性最差。

Future<RecordMetadata> send = producer.send(rcd);

同步发送(sync )

try {producer.send(rcd).get();
} catch (Exception e) {e.printStackTrace();
}

因为Future的get方法是同步阻塞的。

异步发送(async )

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 服务端的主机名和端口号props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key 序列化props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// value 序列化props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<String, String>("test", "hello" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata != null) {System.out.println(metadata.partition()+ "-"+ metadata.offset());}}});}kafkaProducer.close();}
}

API 开发:consumer 消费

import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {public static void main(String[] args) {Properties props = new Properties();// 定义 kakfa 服务的地址,不需要将所有 broker 指定上// 客户端只要知道一台服务器,就能通过这一台服务器来获知整个集群的信息(所有的服务器、主机名等)// 如果你只填写一台,万一,你得客户端启动的时候,宕机了不在线,那就无法连接到集群了// 如果你填写了堕胎,有一个好处就是,万一连不上其中一个,可以去连接其它的props.put("bootstrap.servers", "doitedu01:9092");// 制定 consumer groupprops.put("group.id", "g1");// 按照一个时间间隔自动去提交偏移量// 是否自动提交 offsetprops.put("enable.auto.commit", "true");// 自动提交 offset 的时间间隔props.put("auto.commit.interval.ms", "1000");// key 的反序列化类props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// value 的反序列化类props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// kafka的消费者,默认是从属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略确定消费起始偏移量// 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none/*earliest 自动重置到每个分区的最前一条消息latest   自动重置到每个分区的最新一条消息none	 没有重置策略*/props.put("auto.offset.reset","earliest");// 定义 consumerKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费者订阅的 topic, 可同时订阅多个// subscribe订阅,是需要参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区// 只要消费者组里的消费者 变化了 就要发生再均衡consumer.subscribe(Arrays.asList("first", "test","test1"));// 显式指定消费起始偏移量(如果同时设置了消费者 偏移策略的话,以手动指定的为准)// 在设置消费分区起始偏移量这里,存在一个点,如果此时到这里了然后消费者组再均衡机制还没有做完,那么就会报错,因为可能这个消费者还没有被分配到这个分区  针对这个问题,其实动态再分配是有一个过程 和 时间的,谁也不知道要等多久,所以最好想的sleep就不容易实现了。想要解决这个问题有两种办法1.在这个过程中 拉一次数据,能拉到就代表再均衡机制完成了 consumer.poll(Long.MAX_VALVE);这里是无意义的拉一次数据,主要是为了确保分区分配已完成,然后就能够去定位偏移量了。但是这种方式不符合最初的设计初衷,如果是使用subscribe来订阅主题,那就意味着是应该参与这个组的均衡的,参与了,那就不要去指定组的偏移量了,应该听从组的分配。2.既然要自己指定一个确定的起始消费位置,那通常隐含之意就是不需要去参与消费者组的自动再均衡机制那么就不要使用subscribe来订阅主题consumer.assign(Arrays.asList(new TopicPartition("ddd",0))) 使用这个是不参与消费者的自动再均衡的。//TopicPartition first0 = new TopicPartition("first",0);//TopicPartition first1 = new TopicPartition("first",1);//consumer.seek(first0,10);//consumer.seek(first1,15);/*kafka消费者的起始消费位置有两种决定机制1.手动指定了起始位置,它肯定从你指定的位置开始2.如果没有手动指定位置,它会在找消费组之前所记录的偏移量开始3.如果之前的位置也获取不到,就看参数 : auto.offset.reset 所指定的重置策略*/while (true) {// 读取数据,读取超时时间为 100msConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)// ConsumerRecord中,不光有用户的业务数据,还有kafka塞入的元数据String key = record.key();String value = record.value();// 本条数据所属的topicString topic = record.topic();// 本条数据所属的分区int partition = record.partition// 本条数据的offsetlong offset = record.offset();// 当前这条数据所在分区的leader的朝代纪年Optional<Integer> leaderEpoch = record.leaderEpoch();// 在kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据,timestamp就是其中之一:记录本条数据的时间戳,但是时间戳有两种类型,本条数据的创建时间(生产者)、本条数据的追加时间(broker写入log文件的时间)TimestampType timestampType = record.timestampType();long timestamp = record.timestamp();// 数据头,是生产者在写入数据时附加进去的(相当于用户自己的元数据)// 在生产者发送数据的时候,有一个构造方法可以允许你自己携带自己的 headersHeaders headers = record.headers();System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());}}
}

如果消息还没生产到指定的位置呢?这是一个很有趣的问题,到底是等,还是报错

kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic test --offset 100000 --partition 0  

假设分区0 中并没有offset >= 100000 的消息,执行之后,并不会报错,但是如果超标了,就会自动重置到最新的(lastest)。

如果如果指定的offset大于最大可用的offset,那么就会定义到最后一条消息。


subscribe 订阅主题

subscribe 有如下重载方法:

public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

通过这几个构造函数来看,其中有ConsumerRebalanceListener listener 其实就是 再均衡 的监听器,再均衡的过程中,会调用这个方法。

Properties props = new Properties();
// 从配置文件中加载写好的参数
props.load(Consumer.class.getClassLoader.getResourceAsStream("consumer.properties"));
// 手动set一些参数进去
props.setProperty();
......KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);// reb-1 主题 3个分区
// reb-2 主题 2个分区
consumer.subscribe(Arrays.asList("reb-1","reb-2"),new ConsumerRebalanceListener(){// 再均衡分配过程中,消费者会取消先前所分配的主题、分区// 取消了之后,consumer会调用下面的方法public void onPartitionsRevoked(Collection<TopicPartition> partitions){}// 再均衡过程中,消费者会重新分配到新的主题、分区// 分配了新的主题 和 分区之后,consumer底层会调用下面的方法public void onPartitionAssigned(Collection<TopicPartition> partitions){}
});
但是以上的过程 懒加载,只有消费者真正 开始 poll的时候,才会实现再均衡分配的过程。

在这里插入图片描述

现有的再均衡原则就是每次有消费者增减 都会重新分配,其实就是先全部取消,然后又重新分配了呢,这过程中肯定存在消耗,得先把工作暂停,把偏移量记好,另外一个人接手的时候,还需要另外去读偏移量,重新从对应的位置开始。

而在kafka2.4.1中解决了这个重分配的问题。但是大多数使用的框架没有到这个版本,或者所使用的如spark flink等底层所依赖的kafka没有2.4.1这个版本。

消费者组再均衡分区分配策略

消费者组的意义何在?为了提高数据处理的并行度!

在这里插入图片描述

会触发 rebalance 的事件可能是如下任意一种:

  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
  • 有消费者主动退出消费组(发送 LeaveGroupRequest 请求):比如客户端调用了 unsubscrible()方法取消对某些主题的订阅。
  • 消费组所对应的 GroupCoorinator 节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。

将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何 rebalance 也涉及到分区分配策略。

kafka 有两种的分区分配策略:range(默认) 和 round robin(新版本中又新增了另外 2 种)

我们可以通过 partition.assignment.strategy 参数选择 range 或 roundrobin。
partition.assignment.strategy 参数默认的值是 range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

这个参数属于“消费者”参数!

Range Strategy
  • 先将消费者按照 client.id 字典排序,然后按 topic 逐个处理;
  • 针对一个 topic,将其 partition 总数/消费者数得到 商 n 和 余数 m,则每个 consumer 至少分到 n个分区,且前 m 个 consumer 每人多分一个分区;

举例说明 2:假设有 TOPIC_A 有 5 个分区,由 3 个 consumer(C1,C2,C3)来消费;

5/3 得到商 1,余 2,则每个消费者至少分 1 个分区,前两个消费者各多 1 个分区 C1: 2 个分区,C2:2 个分区, C3:1 个分区

接下来,就按照“区间”进行分配:

C1: TOPIC_A-0 TOPIC_A-1
C2: TOPIC_A-2 TOPIC_A_3
C3: TOPIC_A-4

举例说明 2:假设 TOPIC_A 有 5 个分区,TOPIC_B 有 3 个分区,由 2 个 consumer(C1,C2)来消费

先分配 TOPIC_A:

5/2 得到商 2,余 1,则 C1 有 3 个分区,C2 有 2 个分区,得到结果

C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4

再分配 TOPIC_B:

3/2 得到商 1,余 1,则 C1 有 2 个分区,C2 有 1 个分区,得到结果

C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2

最终分配结果:

C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2

如果共同订阅的主题很多,那也就意味着,排在前面的消费者拿到的分区会明显多余排在后面的。

而消费者本身有一个id,是根据id号去排序

以上就是该种模式的弊端,其实就是一个topic一个topic去分的。这个问题尤其是在订阅多个topic的时候最明显,分配单个topic的情况,也就多一个分区。

Round-Robin Strateg

将所有主题分区组成 TopicAndPartition 列表,并对 TopicAndPartition 列表按照其 hashCode 排序,然后,以轮询的方式分配给各消费者。

以上述问题来举例:

先对 TopicPartition 的 hashCode 排序,假如排序结果如下:

TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-

然后按轮询方式分配

C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1 TOPIC_A-4

C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3 TOPIC_B-2

Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor

sticky 策略的特点:

  • 要去打成最大化的均衡
  • 尽可能保留各消费者原来分配的分区

再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)

以一个例子来看

---开始
C1:A-P0		B-P1	B-P2
C2:B-P0		A-P1---加入C3后再分配
Range Strategy
C1:A-P0		A-P1
C2:B-P0		B-P2
C3:B-P1Sticky Strategy
C1:A-P0		B-P1
C2:B-P0		A-P1
C3:B-P2
Cooperative Sticky Strategy

对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor(最新的一种 2.4.1)

sticky 策略的特点:

  • 逻辑与 sticky 策略一致
  • 支持 cooperative 再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配,影响到谁,就针对那个消费者进行即可)

消费者组再均衡流程

消费组在消费数据的时候,有两个角色进行组内的各事务的协调;

  • 角色 1: Group Coordinator (组协调器) 位于服务端(就是某个 broker)
  • 角色 2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)
GroupCoordinator 介绍

每个消费组在服务端对应一个 GroupCoordinator 其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。

消费者客户端中由 ConsumerCoordinator 组件负责与 GroupCoordinator 行交互;

ConsumerCoordinator 和 GroupCoordinator 最重要的职责就是负责执行消费者 rebalance 操作

eager 协议再均衡步骤细节

定位 Group Coordinator

coordinator 在我们组记偏移量的__consumer_offsets 分区的 leader 所在 broker 上

查找 Group Coordinator 的方式:

先根据消费组 groupid 的 hashcode 值计算它应该所在_consumer_offsets 中的分区编号:

在这里插入图片描述

Utils.abc(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 为 __consumer_offsets 的 分 区 总 数 , 这 个 可 以 通 过 broker 端 参 数
offset.topic.num.partitions 来配置,默认值是 50;

找到对应的分区号后,再寻找此分区 leader 副本所在 broker 节点,则此节点即为自己的 Grouping
Coordinator;

在这里插入图片描述

加入组 Join The Group

此阶段的重要操作之 1:选举消费组的 leader

private val members = new mutable.HashMap[String, MemberMetadata]var leaderid = members.keys.head
set集合本身无序的,取头部的一个,自然也是无序的

消费组 leader 的选举,策略就是:随机!

此阶段的重要操作之 2:选择分区分配策略

最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:

(1)收集各个消费者支持的所有分配策略,组成候选集 candidates。

(2)每个消费者从候选集 candidates 找出第一个自身支持的策略,为这个策略投上一票。

(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略(如果得票一样,那就以组长的为主)。

其实,此逻辑并不需要 consumer 来执行,而是由 Group Coordinator 来执行。

组信息同步 SYNC Group

此阶段,主要是由消费组 leader 将分区分配方案,通过 Group Coordinator 来转发给组中各消费者

在这里插入图片描述

心跳联系 HEART BEAT

进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。

各消费者在消费数据的同时,保持与 Group Coordinator的心跳通信。

消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定,默认值为 3000 ,即这个参数必须比
session.timeout.ms 参 数 设 定 的 值 要 小 ; 一 般 情 况 下 heartbeat.interval.ms 的 配 置 值 不 能 超 过
session.timeout.ms 配置值的 1/3 。这个参数可以调整得更低,以控制正常重新平衡的预期时间;

如果一个消费者发生崩溃,并停止读取消息,那么 GroupCoordinator 会等待一小段时间确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。
这 个 一 小 段 时 间 由 session.timeout. ms 参 数 控 制 , 该 参 数 的 配 置 值 必 须 在 broker 端 参 数
group.min.session.timeout. ms (默认值为 6000 ,即 6 秒)和 group.max.session. timeout. ms (默认值为 300000 ,即 5 分钟)允许的范围内

再均衡流程

eager 协议的再均衡过程整体流程如下图:

在这里插入图片描述

特点:再均衡发生时,所有消费者都会停止工作,等待新方案的同步

Cooperative 协议的再均衡过程整体流程如下图:

在这里插入图片描述

特点:cooperative 把原来 eager 协议的一次性全局再均衡,化解成了多次的小均衡,并最终达到全局均衡的收敛状态

指定集合方式订阅主题

consumer.subscribe(Arrays.asList(topicl));consumer.subscribe(Arrays.asList(topic2))

正则方式订阅主题

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。

正则表达式的方式订阅的示例如下

consumer.subscribe(Pattern.compile ("topic.*" ));

利用正则表达式订阅主题,可实现动态订阅;

assign 订阅主题

消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;

在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:

public void assign(Collection<TopicPartition> partitions)

这个方法只接受参数 partitions,用来指定需要订阅的分区集合。示例如下:

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;

subscribe 与 assign 的区别

通过 subscribe()方法订阅主题具有消费者自动再均衡功能 ;

在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;

其实这一点从 assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而 assign()方法却没有。

取消订阅

既然有订阅,那么就有取消订阅;
可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过subscribe( Collection)方式实现的订阅;也可以取消通过 subscribe(Pattem)方式实现的订阅,还可以取消通过 assign( Collection)方式实现的订阅。示例码如下

consumer.unsubscribe();

如果将 subscribe(Collection )或 assign(Collection)集合参数设置为空集合,作用与 unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();consumer.subscribe(new ArrayList<String>()) ;consumer.assign(new ArrayList<TopicPartition>());

消息的消费模式

Kafka 中的消费是基于拉取模式的。

消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll( ) 方法, poll( )方法返回的是所订阅的主题(分区)上的一组消息。

对于 poll ( ) 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空,如果订阅的所有分区中都没有可供消费的消息,那么 poll( )方法返回为空的消息集;

poll ( ) 方法具体定义如下:

public ConsumerRecords<K, V> poll(final Duration timeout)

超时时间参数 timeout ,用来控制 poll( ) 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据,则为了提高吞吐率,可以把 timeout 设置为Long.MAX_VALUE;

消费者消费到的每条消息的类型为 ConsumerRecord

public class ConsumerRecord<K, V> {public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;public static final int NULL_SIZE = -1;public static final int NULL_CHECKSUM = -1;private final String topic;private final int partition;private final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。
offset 表示消息在所属分区的偏移量。
timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。
timestampType 有两种类型 CreateTimeLogAppendTime ,分别代表消息创建的时间戳和消息追加
到日志的时间戳。
headers 表示消息的头部内容。
key value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ;
serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小,如果 key 为空,
则 serializedKeySize 值为 -1,同样,如果 value 为空,则 serializedValueSize 的值也会为 -1;
checksum 是 CRC32 的校验值。

示例代码片段

/**
* 订阅与消费方式 2
*/
TopicPartition tp1 = new TopicPartition("x", 0);
TopicPartition tp2 = new TopicPartition("y", 0);
TopicPartition tp3 = new TopicPartition("z", 0);
List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3);
consumer.assign(tps);
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (TopicPartition tp : tps) {List<ConsumerRecord<String, String>> rList = records.records(tp);for (ConsumerRecord<String, String> r : rList) {r.topic();r.partition();r.offset();r.value();//do something to process record.}}
}

指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们可以追前消费或回溯消费。
seek()方法的具体定义如下:

public void seek(TopicPartiton partition,long offset)

代码示例:

public class ConsumerDemo3 指定偏移量消费 {public static void main(String[] args) {Properties props = new Properties();props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g002");props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 是否自动提交消费位移props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 限制一次 poll 拉取到的数据量的最大值props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// assign 方式订阅 doit27-1 的两个分区TopicPartition tp0 = new TopicPartition("doit27-1", 0);TopicPartition tp1 = new TopicPartition("doit27-1", 1);consumer.assign(Arrays.asList(tp0,tp1));// 指定分区 0,从 offset:800 开始消费 ; 分区 1,从 offset:650 开始消费consumer.seek(tp0,200);consumer.seek(tp1,250);// 开始拉取消息while(true){ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(3000));for (ConsumerRecord<String, String> rec : poll) {System.out.println(rec.partition()+","+rec.key()+","+rec.value()+","+rec.offset());}}}
}

自动提交消费者偏移量

Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable. auto.commit 参数为 true。

在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费消息丢失的问题。

  • 重复消费

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

  • 丢失消息

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:

拉取线程不断地拉取消息并存入本地缓存,比如在 BlockingQueue 中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第 m 次位移提交的时候,也就是x+6 之前的位移己经确认提交了,处理线程却还正在处理 x+3 的消息;此时如果处理线程发生了异常,待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

在这里插入图片描述

手动提交消费者偏移量(调用 kafka api)

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费;

手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false ,示例如下:

props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。

  • 同步提交的方式

commitSync()方法的定义如下:

/**
* 手动提交 offset
*/
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {//do something to process record.}consumer.commitSync();
}

对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync()的另一个有参方法,具体定义如下:

public void commitSync(final Map<TopicPartitionOffsetAndMetadata> offsets

示例代码如下:

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {long offset = r.offset();//do something to process record.TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());consumer.commitSync(Collections.singletonMap(topicPartition,new
OffsetAndMetadata(offset+1)));}
}

提交的偏移量 = 消费完的 record 的偏移量 + 1

因为,__consumer_offsets 中记录的消费偏移量,代表的是,消费者下一次要读取的位置!!!

  • 异步提交方式

异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。commitAsync 方法有一个不同的重载方法,具体定义如下:

在这里插入图片描述

示例代码

/**
* 异步提交 offset
*/
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {long offset = r.offset();//do something to process record.TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());consumer.commitSync(Collections.singletonMap(topicPartition,new
OffsetAndMetadata(offset+1)));consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e == null ){System.out.println(map);}else{System.out.println("error commit offset");}}});}
}

手动提交位移(时机的选择)

  • 数据处理完成之前先提交偏移量

可能会发生漏处理的现象(数据丢失)

反过来说,这种方式实现了: at most once 的数据处理(传递)语义

  • 数据处理完成之后再提交偏移量

可能会发生重复处理的现象(数据重复)

反过来说,这种方式实现了: at least once 的数据处理(传递)语义

当然,数据处理(传递)的理想语义是: exactly once(精确一次)

Kafka 也能做到 exactly once(基于 kafka 的事务机制)

消费者提交偏移量方式的总结

consumer 的消费位移提交方式:

全自动

  • auto.offset.commit = true
  • 定时提交到 consumer_offsets

半自动

  • auto.offset.commit = false;
  • 然后手动触发提交 consumer.commitSync()
  • 提交到 consumer_offsets

全手动

  • auto.offset.commit = false;
  • 写自己的代码去把消费位移保存到你自己的地方 mysql/zk/redis
  • 提交到自己所涉及的存储;初始化时也需要自己去从自定义存储中查询到消费位移

相关文章:

探究Kafka原理-3.生产者消费者API原理解析

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理&#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44…...

Linux系统iptables扩展

目录 一. iptables规则保存 1. 导出规则保存 2. 自动重载规则 ①. 当前用户生效 ②. 全局生效 二. 自定义链 1. 新建自定义链 2. 重命名自定义链 3. 添加自定义链规则 4. 调用自定义链规则 5. 删除自定义链 三. NAT 1. SNAT 2. DNAT 3. 实验 ①. 实验要求 ②. …...

Openwrt 系统安装 插件名称与中文释义

系统镜像 当时是去官网找对应的&#xff0c;但是作为门外汉&#xff0c;想简单&#xff0c;可以试试这个网站 插件 OpenWrt/Lede全部插件列表功能注释...

[原创]Delphi的SizeOf(), Length(), 动态数组, 静态数组的关系.

[简介] 常用网名: 猪头三 出生日期: 1981.XX.XXQQ: 643439947 个人网站: 80x86汇编小站 https://www.x86asm.org 编程生涯: 2001年~至今[共22年] 职业生涯: 20年 开发语言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 开发工具: Visual Studio、Delphi…...

C++(20):bind_front

C(11)&#xff1a;bind_c11 bind_风静如云的博客-CSDN博客 提供了方法来绑定函数参数的方法。 C20提供了bind_front用于简化这个绑定。 #include <iostream> #include <functional> using namespace std;void func1(int d1, int d2) {cout<<__func__<&l…...

【spring】bean的后处理器

目录 一、作用二、常见的bean后处理器2.1 AutowiredAnnotationBeanPostProcessor2.1.1 说明2.1.2 代码示例2.1.3 截图示例 2.2 CommonAnnotationBeanPostProcessor2.2.1 说明2.2.2 代码示例2.2.3 截图示例 2.3 ConfigurationPropertiesBindingPostProcessor2.3.1 说明2.3.2 代码…...

Centos7安装docker、java、python环境

文章目录 前言一、docker的安装二、docker-compose的安装三、安装python3和配置pip3配置python软链接&#xff08;关键&#xff09; 四、Centos 7.6操作系统安装JAVA环境 前言 每次vps安装docker都要看网上的文章&#xff0c;而且都非常坑&#xff0c;方法千奇百怪&#xff0c…...

简单小结类与对象

/*** Description 简单小结类与对象*/ package com.oop;import com.oop.demo03.Pet;public class Application {public static void main(String[] args) {/*1.类与对象类是一个模版&#xff1a;抽象&#xff0c;对象是一个具体的实例2.方法定义、调用&#xff01;3.对象的引用…...

ABAP 如何获取内表行的索引值(index) ?

获取索引值 在ABAP中&#xff0c;如果需要获取一个内表中某条记录的索引&#xff08;index&#xff09;&#xff0c;可以使用 READ TABLE 语句。在 READ TABLE 语句后面的 WITH KEY 子句可以指定搜索条件&#xff0c;如果找到了匹配的记录&#xff0c;系统字段 SY-TABIX 将保存…...

ESP32-Web-Server编程- 使用表格(Table)实时显示设备信息

ESP32-Web-Server编程- 使用表格&#xff08;Table&#xff09;实时显示设备信息 概述 上节讲述了通过 Server-Sent Events&#xff08;以下简称 SSE&#xff09; 实现在网页实时更新 ESP32 Web 服务器的传感器数据。 本节书接上会&#xff0c;继续使用 SSE 机制在网页实时显…...

vue3 Hooks函数使用及常用utils封装

hooks 是什么 vue3使用了composition API&#xff0c;我们可自定义封装hooks&#xff0c;达到复用&#xff0c;在Vue2中采取的mixins&#xff0c;对mixins而言&#xff0c; hooks更清楚复用功能代码的来源, 更清晰易懂。 简单来说&#xff1a;hooks 就是函数的一种写法&#xf…...

matlab 无迹卡尔曼滤波

1、内容简介 略 26-可以交流、咨询、答疑 2、内容说明 无迹卡尔曼滤波 无迹卡尔曼滤波 无迹卡尔曼滤波 3、仿真分析 %该文件用于编写无迹卡尔曼滤波算法及其测试 %注解&#xff1a;主要子程序包括&#xff1a;轨迹发生器、系统方程 % 测量方程、UKF滤波器 %----…...

大脑--学习方法

1.大脑喜欢色彩。平时使用高质量的有色笔或使用有色纸&#xff0c;颜色能帮助记忆。 2.大脑集中精力最多只有25分钟。这是对成人而言&#xff0c;所以学习20到30分钟后就应该休息10分钟。你可以利用这段时间做点家务&#xff0c;10分钟后再回来继续学习&#xff0c;效果会更好…...

4.C转python

1.建立函数: def 函数名(形参): 函数体(记得写缩进) return 返回值(python中可以没有return) 2.调用函数: 函数名(实参) 实参和形参个数相等即可,类型不需要相同 其中接收返回值与C中的差不多 3.如果只是定义而不调用则函数不会执行 4.先定义函数,后调用 5.python中可以…...

YOLOv5项目实战(5)— 算法模型优化和服务器部署

前言:Hello大家好,我是小哥谈。近期,作者所负责项目中的算法模型检测存在很多误报情况,为了减少这种误报情况,作者一直在不断优化算法模型。鉴于此,本节课就给大家详细介绍一下实际工作场景中如何去优化算法模型和进行部署,另外为了方便大家进行模型训练,作者在文章中提…...

JavaScript类型判断:解密变量真实身份的神奇技巧

文章目录 1. typeof运算符2. instanceof运算符3. Object.prototype.toString4. Array.isArray5. 使用constructor属性6. 使用Symbol.toStringTag7. 使用is类型判断库8. 谨慎使用隐式类型转换结语 &#x1f389;JavaScript类型判断&#xff1a;解密变量真实身份的神奇技巧 ☆* o…...

MT6893_天玑 1200芯片规格参数介绍_datasheet规格书

天玑 1200(MT6893)是一款专为旗舰级全新5G芯片&#xff0c;它融合了先进的AI、相机和多媒体技术&#xff0c;为用户带来令人惊叹的体验。采用先进的6纳米制程设计&#xff0c;内置各种先进技术。该芯片采用旗舰级的八核CPU架构设计&#xff0c;支持16GB强大的四通道内存以及双通…...

【Android踩过的坑】13.Android Studio 运行成功,但APP没有安装上的问题

【Android踩过的坑】13.Android Studio 运行成功&#xff0c;但APP没有安装上的问题 解决办法&#xff1a; 在app的build.gradle文件下添加以下代码 android {...//android.useNewApkCreatorfalse 在高版本gradle下无效&#xff0c;添加以下代码解决冲突即可packagingOptions…...

redis安装配置

Windows 下 Redis 安装与配置 教程_redis windows-CSDN博客 启动Redis服务 打开cmd窗口&#xff0c;切换到Redis安装路径&#xff0c;输入 redis-server 启动 redis 服务...

企业数字化转型应对传统网络挑战的关键策略

数字化变革正在以前所未有的速度和规模改变着我们的生活和工作方式&#xff0c;使得传统网络架构面临着巨大的挑战。其中包括带宽需求增加、多云应用增加、安全威胁增加以及传统网络设备无法满足需求等问题。 数字化时代需要更高速、更可靠、更安全的网络支持&#xff0c;传统网…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞&#xff0c;需构建多层防御体系&#xff0c;结合技术验证、存储隔离与权限控制&#xff1a; &#x1f512; 一、基础防护层 前端校验&#xff08;仅辅助&#xff09; 通过JavaScript限制文件后缀名&#xff08;白名单&#xff09;和大小&#xff0c;提…...