消息消费过程
前言
本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。
消费者与消费组
Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。
开发流程
- 配置consumer参数并创建consumer实例;
- 订阅主题;
- 拉取消息并消费;
- 提交消费偏移量;
- 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}
主题与分区
每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。
例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。
反序列化
consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。
interface Deserializer {T deserialize(String topic, byte[] data);void close();
}
主题订阅
- 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
- 订阅多个特定主题, subscribe(collection);
- 订阅某种模式的主题, subscribe(pattern);
- 订阅某个主题的特定partition, assign(partition);
- 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;
消息获取
- 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
- 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
- 拉模式: client主动发起消息请求, 从server端拉取消息;
- 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)
内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。
位移提交
位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:

- 消息是按照partition存储的;
- 消息写入partition时, offset单调递增;
- 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;
因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;
| 提交方式 | 说明 | 优缺点 |
|---|---|---|
| 自动提交 | (默认方式) Kafka Client周期性地提交偏移量 | 优点是简单, 确定是重复消费和丢失风险 |
| 手动提交 | 由用户主动提交偏移量 | 优点是可细粒度管理, 缺点是相对复杂 |
自动提交
自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。

- consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
- 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;
手动提交
也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |
指定offset
消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?
-
由参数auto.offset.reset设定默认行为
| 参数值 | 行为 |
|----|----|
| earliest | 从分区第一条消息的offset开始 |
| latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
| none | 程序逻辑自定义, 如果未设置则抛出异常 | -
程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。
-
由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。
再平衡
再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。
Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。
消费者拦截器
Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。
拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。
相关文章:
消息消费过程
前言 本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。 消费者与消费组 Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式…...
使用Lychee搭建个人图片存储系统并进行远程访问设置实现公网访问本地私人图床
文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站,可以看做是云存储的一部分,既可…...
12-2- DCGAN -简单网络-卷积网络
功能 随机噪声→生成器→MINIST图像。 训练方法 0 损失函数:gan的优化目标是一个对抗损失,是二分类问题,用BCELoss 1 判别器的训练,首先固定生成器参数不变,其次判别器应当将真实图像判别为1,生成图像判别为0 loss=loss(real_out, 1)+loss(fake_out, 0) 2 生成器的…...
Redis持久化策略之RDB与AOF
文章目录 1.RDB1)基本介绍2)自动触发3)手动触发4)RDB文件5)优点缺点 2.AOF1)基本介绍2)使用方式3)工作流程4)重写机制5)AOF文件6)优点缺点 3.RDB AOF 我们都知道,redis 是一个基于内存的数据库。基于内存的好处是访问速度快,缺点是“不持久”——当数据…...
Python学习笔记--初识 Python 正则表达式
初识 Python 正则表达式 正则表达式是一个特殊的字符序列,用于判断一个字符串是否与我们所设定的字符序列是否匹配,也就是说检查一个字符串是否与某种模式匹配。 Python 自 1.5 版本起增加了re 模块,它提供 Perl 风格的正则表达式模式。re 模块使 Python 语言拥有全部的正…...
webAPP基础学习
###视觉基础 part-I ####1.面试中常见的像素问题 >什么是像素? *1.什么是px? px-虚拟像素,css像素的单位 px是一个相对单位,相对于设备像素而言 >相对性 a.相对于同一个设备,css像素的可变的 css像素物理像素>会受到缩放的影响 css像素缩放倍数*单个物理像…...
RIP路由信息协议
RIP路由信息协议(Routing Information Protocol) 最先得到广泛应用的协议,最大优点是简单要求网络中的每个路由器都要维护一张表,表中记录了从它自己到其他每一个目的网络的距离RIP是应用层协议,它在传输层使用UDP,RIP报文作为UD…...
kubernetes 高可用集群
目录 一、haproxy负载均衡 二、pacemaker高可用 三、部署control-plane 四、部署worker node 实验环境 主机名 IP 角色 docker 192.168.67.10 harbor k8s1 192.168.67.11 control-plane k8s2 192.168.67.12 control-plane k8s3 192.168.67.13 control-plane k8s…...
java实现插入排序
图解 以下是Java实现插入排序的代码: public class InsertionSort {public static void main(String[] args) {int[] arr {5, 2, 4, 6, 1, 3};insertionSort(arr);System.out.println(Arrays.toString(arr)); // output: [1, 2, 3, 4, 5, 6]}public static void i…...
深度学习之基于YoloV5血红细胞检测识别系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习已经在许多领域中得到了广泛的应用,包括医疗健康领域。其中,YOLO(You O…...
8、可视化高斯滤波器并完成高斯滤波
本节一起绘制一个可视化的高斯滤波器,同时对一个彩色图像增加高斯噪声,最后通过一个高斯滤波器对图像进行降噪处理。 就像上节说的那样,滤波不是学习重点,下面通过实操了解下原理即可。 可视化高斯滤波器 高斯滤波器符合高斯分布,并且是二维高斯分布,这一点在上一节高斯…...
Linux MMC子系统 - 5.eMMC 5.1工作模式-引导模式
By: Ailson Jack Date: 2023.11.19 个人博客:http://www.only2fire.com/ 本文在我博客的地址是:http://www.only2fire.com/archives/164.html,排版更好,便于学习,也可以去我博客逛逛,兴许有你想要的内容呢。…...
2342. 数位和相等数对的最大和 --力扣 --JAVA
题目 给你一个下标从 0 开始的数组 nums ,数组中的元素都是 正 整数。请你选出两个下标 i 和 j(i ! j),且 nums[i] 的数位和 与 nums[j] 的数位和相等。 请你找出所有满足条件的下标 i 和 j ,找出并返回 nums[i] num…...
linux如何重置root密码
目录 当我们想要重置root管理员密码时,我们可以有两种方法进行: 方法一、init方法 1、重启系统,在下图所示界面按e键 2、随后进入以下界面,、将ro修改为rw,在行末尾添加init/bin/sh。编辑 3、随后按Ctrlx启动到s…...
Java 类之 java.util.Properties
Java 类之 java.util.Properties 文章目录 Java 类之 java.util.Properties一、简介二、主要功能1、存储键值对2、读取文件与属性代码示例运行结果截图 3、设置属性并保存文件代码示例结果截图 4、遍历属性代码示例运行结果 关联博客:《基于 Java 列举和说明常用的外…...
我遇到的bug(活动)
目录 方向一:身为程序员遇到过的奔溃瞬间 方向二:如何解决遇到的奔溃瞬间 方向三:在解决完后获得的收获和体会 方向一:身为程序员遇到过的奔溃瞬间 在一个项目中,我负责实现一个复杂的图像处理算法。经过几天的努力…...
MIB 6.1810实验Xv6 and Unix utilities(3)pingpong
Mit6.S081-实验1-Xv6 and Unix utilities-pingpong问题_Isana_Yashiro的博客-CSDN博客 Write a user-level program that uses xv6 system calls to ping-pong a byte between two processes over a pair of pipes, one for each direction. The parent should send a byte to…...
压力测试总共需要几个步骤?思路总结篇
在运维工作中,压力测试是一项很重要的工作。比如在一个网站上线之前,能承受多大访问量、在大访问量情况下性能怎样,这些数据指标好坏将会直接影响用户体验。今天我们就来深入了解下压力测试! 1、首先,什么是压力测试&…...
03_面向对象高级_多态
多态 1. 什么是多态? “多态” 是在 “继承” 的基础上实现的一种现象,具体表现为:对象多态、行为多态。 public class HelloWorld {public static void main(String[] args) {// 1. 对象多态Human h1 new Student();Human h2 new Teach…...
【Kingbase FlySync】界面化管控平台:2.配置数据库同步之KES>KES
【Kingbase FlySync】界面化管控平台:3.配置数据库同步之KES->KES 部署KES数据库到KES数据库同步服务1.登录KFS管理平台2.开始配置数据节点信息(1)配置node1数据节点(2)配置node2数据节点 3.KFS拓扑图配置4.开始部署5.启动同步程序并查验是否运行正常 测试同步1.从node1数据…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...
macOS 终端智能代理检测
🧠 终端智能代理检测:自动判断是否需要设置代理访问 GitHub 在开发中,使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新,例如: fatal: unable to access https://github.com/ohmyzsh/oh…...
如何在Windows本机安装Python并确保与Python.NET兼容
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...
基于小程序老人监护管理系统源码数据库文档
摘 要 近年来,随着我国人口老龄化问题日益严重,独居和居住养老机构的的老年人数量越来越多。而随着老年人数量的逐步增长,随之而来的是日益突出的老年人问题,尤其是老年人的健康问题,尤其是老年人产生健康问题后&…...
HTML版英语学习系统
HTML版英语学习系统 这是一个完全免费、无需安装、功能完整的英语学习工具,使用HTML CSS JavaScript实现。 功能 文本朗读练习 - 输入英文文章,系统朗读帮助练习听力和发音,适合跟读练习,模仿学习;实时词典查询 - 双…...
