当前位置: 首页 > news >正文

消息消费过程

前言

本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。

消费者与消费组

Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。

开发流程

  1. 配置consumer参数并创建consumer实例;
  2. 订阅主题;
  3. 拉取消息并消费;
  4. 提交消费偏移量;
  5. 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}

主题与分区

每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。

例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。

反序列化

consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。

interface Deserializer {T deserialize(String topic, byte[] data);void close();
}

主题订阅

  1. 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
  2. 订阅多个特定主题, subscribe(collection);
  3. 订阅某种模式的主题, subscribe(pattern);
  4. 订阅某个主题的特定partition, assign(partition);
  5. 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;

消息获取

  1. 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
  2. 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
  3. 拉模式: client主动发起消息请求, 从server端拉取消息;
  4. 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)

内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。

位移提交

位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
请添加图片描述

  1. 消息是按照partition存储的;
  2. 消息写入partition时, offset单调递增;
  3. 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;

因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;

提交方式说明优缺点
自动提交(默认方式) Kafka Client周期性地提交偏移量优点是简单, 确定是重复消费和丢失风险
手动提交由用户主动提交偏移量优点是可细粒度管理, 缺点是相对复杂

自动提交

自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
在这里插入图片描述

  1. consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
  2. 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  3. 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
  4. 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;

手动提交

也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |

指定offset

消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?

  1. 由参数auto.offset.reset设定默认行为
    | 参数值 | 行为 |
    |----|----|
    | earliest | 从分区第一条消息的offset开始 |
    | latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
    | none | 程序逻辑自定义, 如果未设置则抛出异常 |

  2. 程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。

  3. 由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。

再平衡

再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。

Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。

消费者拦截器

Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。

拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。

相关文章:

消息消费过程

前言 本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交&#xff0c;分区再平衡和消费者拦截器等内容。 消费者与消费组 Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式…...

使用Lychee搭建个人图片存储系统并进行远程访问设置实现公网访问本地私人图床

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站&#xff0c;可以看做是云存储的一部分&#xff0c;既可…...

12-2- DCGAN -简单网络-卷积网络

功能 随机噪声→生成器→MINIST图像。 训练方法 0 损失函数:gan的优化目标是一个对抗损失,是二分类问题,用BCELoss 1 判别器的训练,首先固定生成器参数不变,其次判别器应当将真实图像判别为1,生成图像判别为0 loss=loss(real_out, 1)+loss(fake_out, 0) 2 生成器的…...

Redis持久化策略之RDB与AOF

文章目录 1.RDB1)基本介绍2)自动触发3)手动触发4)RDB文件5)优点缺点 2.AOF1)基本介绍2)使用方式3)工作流程4)重写机制5)AOF文件6)优点缺点 3.RDB AOF 我们都知道&#xff0c;redis 是一个基于内存的数据库。基于内存的好处是访问速度快&#xff0c;缺点是“不持久”——当数据…...

Python学习笔记--初识 Python 正则表达式

初识 Python 正则表达式 正则表达式是一个特殊的字符序列,用于判断一个字符串是否与我们所设定的字符序列是否匹配,也就是说检查一个字符串是否与某种模式匹配。 Python 自 1.5 版本起增加了re 模块,它提供 Perl 风格的正则表达式模式。re 模块使 Python 语言拥有全部的正…...

webAPP基础学习

###视觉基础 part-I ####1.面试中常见的像素问题 >什么是像素? *1.什么是px? px-虚拟像素,css像素的单位 px是一个相对单位,相对于设备像素而言 >相对性 a.相对于同一个设备,css像素的可变的 css像素物理像素>会受到缩放的影响 css像素缩放倍数*单个物理像…...

RIP路由信息协议

RIP路由信息协议(Routing Information Protocol) 最先得到广泛应用的协议&#xff0c;最大优点是简单要求网络中的每个路由器都要维护一张表&#xff0c;表中记录了从它自己到其他每一个目的网络的距离RIP是应用层协议&#xff0c;它在传输层使用UDP&#xff0c;RIP报文作为UD…...

kubernetes 高可用集群

目录 一、haproxy负载均衡 二、pacemaker高可用 三、部署control-plane 四、部署worker node 实验环境 主机名 IP 角色 docker 192.168.67.10 harbor k8s1 192.168.67.11 control-plane k8s2 192.168.67.12 control-plane k8s3 192.168.67.13 control-plane k8s…...

java实现插入排序

图解 以下是Java实现插入排序的代码&#xff1a; public class InsertionSort {public static void main(String[] args) {int[] arr {5, 2, 4, 6, 1, 3};insertionSort(arr);System.out.println(Arrays.toString(arr)); // output: [1, 2, 3, 4, 5, 6]}public static void i…...

深度学习之基于YoloV5血红细胞检测识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习已经在许多领域中得到了广泛的应用&#xff0c;包括医疗健康领域。其中&#xff0c;YOLO&#xff08;You O…...

8、可视化高斯滤波器并完成高斯滤波

本节一起绘制一个可视化的高斯滤波器,同时对一个彩色图像增加高斯噪声,最后通过一个高斯滤波器对图像进行降噪处理。 就像上节说的那样,滤波不是学习重点,下面通过实操了解下原理即可。 可视化高斯滤波器 高斯滤波器符合高斯分布,并且是二维高斯分布,这一点在上一节高斯…...

Linux MMC子系统 - 5.eMMC 5.1工作模式-引导模式

By: Ailson Jack Date: 2023.11.19 个人博客&#xff1a;http://www.only2fire.com/ 本文在我博客的地址是&#xff1a;http://www.only2fire.com/archives/164.html&#xff0c;排版更好&#xff0c;便于学习&#xff0c;也可以去我博客逛逛&#xff0c;兴许有你想要的内容呢。…...

2342. 数位和相等数对的最大和 --力扣 --JAVA

题目 给你一个下标从 0 开始的数组 nums &#xff0c;数组中的元素都是 正 整数。请你选出两个下标 i 和 j&#xff08;i ! j&#xff09;&#xff0c;且 nums[i] 的数位和 与 nums[j] 的数位和相等。 请你找出所有满足条件的下标 i 和 j &#xff0c;找出并返回 nums[i] num…...

linux如何重置root密码

目录 当我们想要重置root管理员密码时&#xff0c;我们可以有两种方法进行&#xff1a; 方法一、init方法 1、重启系统&#xff0c;在下图所示界面按e键 2、随后进入以下界面&#xff0c;、将ro修改为rw&#xff0c;在行末尾添加init/bin/sh。​编辑 3、随后按Ctrlx启动到s…...

Java 类之 java.util.Properties

Java 类之 java.util.Properties 文章目录 Java 类之 java.util.Properties一、简介二、主要功能1、存储键值对2、读取文件与属性代码示例运行结果截图 3、设置属性并保存文件代码示例结果截图 4、遍历属性代码示例运行结果 关联博客&#xff1a;《基于 Java 列举和说明常用的外…...

我遇到的bug(活动)

目录 方向一&#xff1a;身为程序员遇到过的奔溃瞬间 方向二&#xff1a;如何解决遇到的奔溃瞬间 方向三&#xff1a;在解决完后获得的收获和体会 方向一&#xff1a;身为程序员遇到过的奔溃瞬间 在一个项目中&#xff0c;我负责实现一个复杂的图像处理算法。经过几天的努力…...

MIB 6.1810实验Xv6 and Unix utilities(3)pingpong

Mit6.S081-实验1-Xv6 and Unix utilities-pingpong问题_Isana_Yashiro的博客-CSDN博客 Write a user-level program that uses xv6 system calls to ping-pong a byte between two processes over a pair of pipes, one for each direction. The parent should send a byte to…...

压力测试总共需要几个步骤?思路总结篇

在运维工作中&#xff0c;压力测试是一项很重要的工作。比如在一个网站上线之前&#xff0c;能承受多大访问量、在大访问量情况下性能怎样&#xff0c;这些数据指标好坏将会直接影响用户体验。今天我们就来深入了解下压力测试&#xff01; 1、首先&#xff0c;什么是压力测试&…...

03_面向对象高级_多态

多态 1. 什么是多态&#xff1f; “多态” 是在 “继承” 的基础上实现的一种现象&#xff0c;具体表现为&#xff1a;对象多态、行为多态。 public class HelloWorld {public static void main(String[] args) {// 1. 对象多态Human h1 new Student();Human h2 new Teach…...

【Kingbase FlySync】界面化管控平台:2.配置数据库同步之KES>KES

【Kingbase FlySync】界面化管控平台:3.配置数据库同步之KES->KES 部署KES数据库到KES数据库同步服务1.登录KFS管理平台2.开始配置数据节点信息(1)配置node1数据节点(2)配置node2数据节点 3.KFS拓扑图配置4.开始部署5.启动同步程序并查验是否运行正常 测试同步1.从node1数据…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

3.3.1_1 检错编码(奇偶校验码)

从这节课开始&#xff0c;我们会探讨数据链路层的差错控制功能&#xff0c;差错控制功能的主要目标是要发现并且解决一个帧内部的位错误&#xff0c;我们需要使用特殊的编码技术去发现帧内部的位错误&#xff0c;当我们发现位错误之后&#xff0c;通常来说有两种解决方案。第一…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...