消息消费过程
前言
本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。
消费者与消费组
Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式。从同一个消费组中的消费者来看, Kafka是单播(P2P)模式。
开发流程
- 配置consumer参数并创建consumer实例;
- 订阅主题;
- 拉取消息并消费;
- 提交消费偏移量;
- 关闭consumer;
class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();// bootstrap serverprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");// group.id, 如果当前consumer需要加入到某个group中, 否则自成一个groupprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");// 自动创建topic, 开发中可能consumer端的小伙伴先开始, 等不及生产端。props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "");// 自动提交offset设置, 样例中为手动提交// props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");// 自动提交offset的时间间隔// props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");// offset reset配置props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");// key和value的deserializer配置props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);boolean running = true;while(running) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records) {// 消费消息}// 消费成功提交offsetconsumer.commitSync();}consumer.close();}
}
主题与分区
每个Topic中的消息由若干个分区存储, 每个分区存储了整个Topic下消息的一部分。在消息消费阶段, 同一个partition会被分配给消费组中的某一个consumer。因此partion的数量决定了一个consumer group中consumer的上限。
例如, Topic test 有 3 个partition, 对应的consumer group test-group中有4个consumer(consumer-1, consumer-2, consumer-3, consumer-4), 那么其中的某个consumer会处于空闲状态, 因为没有partition可以被分配, 进而也就无消息可消费。
反序列化
consumer作为消息的消费方, 必须使用与producer serializer相兼容的deserializer, 这样才能正确解析出对应的消息, 进而做消息消费。可以配置消息的key和message的deserialzer。Kafka内置了基本数据类型的Deserializer, IntegerDeserializer。
interface Deserializer {T deserialize(String topic, byte[] data);void close();
}
主题订阅
- 订阅通过subscribe方法完成。如果订阅方法反复调用, 仅最后一次的调用生效。
- 订阅多个特定主题, subscribe(collection);
- 订阅某种模式的主题, subscribe(pattern);
- 订阅某个主题的特定partition, assign(partition);
- 无论是哪种订阅方式, 一个consumer只能使用其中的一种, 都可以通过unsubscribe来取消订阅;
消息获取
- 消息消费的前提是topic中的消息投递给consumer。总体来说消息投递有2种模式, 推模式和拉模式。
- 推模式: client建立到server的长链接, 当server中有消息产生时, 第一时间通过该长链接推送到client;
- 拉模式: client主动发起消息请求, 从server端拉取消息;
- 从代码来看, Kafka是拉模式。由于consumer无法预知, topic中是否有新消息, 因此无效请求是存在的。Kafka设计者也注意到了这点, 提供了如下方法, 加入了一个等待窗口。如果窗口内有新消息到达, 则立刻返回; 如果始终无消息到达, 则超时后返回。平衡消息消费的及时性, 无效请求数量, 和server端实现复杂性。
kafkaConsumer.poll(timeout, timeunit)
内部涉及消费者位移, 消费者协调器, 组协调器, 消费者选择具, 分区分配的分发和再分配, 消费者心跳等内容。
位移提交
位移是消息在存储中的位置说明。通常来说, 消费者继续消费尚未消费的消息。消息存储和消费的逻辑模型如下:
- 消息是按照partition存储的;
- 消息写入partition时, offset单调递增;
- 从partition消费时, 每个消费者维护自己的offset; 消费中断后恢复时, 从上次保存的offset位置开始继续消费;
因此消息是否已经被消费由offset决定, offset及其之前的消息是已消费的消息, offset之后是待消费消息。因此, 消费者完成某个分区的消费之后, 需要提交该offset给Kafka Server。提交方式有两种自动提交和手动提交;
提交方式 | 说明 | 优缺点 |
---|---|---|
自动提交 | (默认方式) Kafka Client周期性地提交偏移量 | 优点是简单, 确定是重复消费和丢失风险 |
手动提交 | 由用户主动提交偏移量 | 优点是可细粒度管理, 缺点是相对复杂 |
自动提交
自动提交是按照时间间隔提交, 如果在消息拉取和位移提交之间client崩溃, 对下一次消费的影响分三种场景讨论(如下图所示)。
- consumer thread中poll和消费是串行的, 但consumer thread和commit thread是并行的;
- 在poll和crash之间发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 在crash之后发生commit, 那么当client恢复后从x+7开始拉取消息, [x+3, x+6] 的消息丢失;
- 全程没有发生commit, 那么当client恢复后从x+1开始拉取消息, [x+1, x+3]的消息重复消费;
手动提交
也有两种模式, 同步提交和异步提交。前者在得到server确认之前所在线程会阻塞, 后者线程继续运行。是需要结合场景来选择。
| 提交方式 | 说明 | 优缺点 |
| 同步提交 | 针对当前拉取的一批消息, 统一提交 | 简单, 无法做细粒度控制 |
| 异步提交 | 基于回调通知结果 | 可以按分区提交, 指定offset参数提交 |
指定offset
消费消息需要从某个offset开始, 如果是首次消费又该从哪个位置开始呢?
-
由参数auto.offset.reset设定默认行为
| 参数值 | 行为 |
|----|----|
| earliest | 从分区第一条消息的offset开始 |
| latest | 从上次保存的offset开始, 首次消费时和earliest行为一致 |
| none | 程序逻辑自定义, 如果未设置则抛出异常 | -
程序通过seek方法指定offset位置, 如果指定offset越界也会触发auto.offset.reset行为; 由于offset是partition级别的概念, 因此seek的使用是面向partition, 这就意味着对同一个topic的多个partition来说, 可以seek不同的offset。此外seek方法也支持基于timestamp定位消息。站在更高的视角来看, seek提供了parttion级别的消息搜索能力。
-
由于seek的存在, 我们可以把offset存储在DB或者其他Kafka之外的地方, 并基于seek进行恢复。
再平衡
再平衡是把分区所有权从1个消费者转移到另一个消费者的行为, 它保障消费组的可用性和伸缩性。从可用性而言, 消费故障可以恢复。就伸缩性而言, 消费组内的消费者可以扩缩容。再平衡期间, 所有的消费者暂停消费, 直到再平衡结束。由于再平衡期间, 消费者的消费状态会丢失, 再平衡之后每个partition的offset以Kafka已持久保存的offset为准, 因此可能存在重复消费情况。
Kafka提供ConsumerRebalanceListener接口, 使得该过程可以被Consumer感知, 至于怎么处理则是应用需要解决的问题, Kafka也只能帮我们到这里。
消费者拦截器
Kafka提供了ConsumerInterceptor接口, 允许我们在poll方法返回前和commit方法调用后触发, 允许我们做一些定制化的工作, 比如消息过滤, 日志输出, 消息追踪等操作。从网络应用开发的角度来说, 这种是一种常见的实现方式, 比如Tomcat中的Filter。
拦截器通过interceptor.classes配置生效, 多个拦截器可以组合成为"拦截器Pipeline"。如果其中一个拦截器异常, 后续的拦截器从最近一次成功的拦截器继续执行, 因此需要提防副作用。
相关文章:

消息消费过程
前言 本文介绍下Kafka消费过程, 内容涉及消费与消费组, 主题与分区, 位移提交,分区再平衡和消费者拦截器等内容。 消费者与消费组 Kafka将消费者组织为消费组, 消息只会被投递给消费组中的1个消费者。因此, 从不同消费组中的消费者来看, Kafka是多播(Pub/Sub)模式…...

使用Lychee搭建个人图片存储系统并进行远程访问设置实现公网访问本地私人图床
文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站,可以看做是云存储的一部分,既可…...

12-2- DCGAN -简单网络-卷积网络
功能 随机噪声→生成器→MINIST图像。 训练方法 0 损失函数:gan的优化目标是一个对抗损失,是二分类问题,用BCELoss 1 判别器的训练,首先固定生成器参数不变,其次判别器应当将真实图像判别为1,生成图像判别为0 loss=loss(real_out, 1)+loss(fake_out, 0) 2 生成器的…...

Redis持久化策略之RDB与AOF
文章目录 1.RDB1)基本介绍2)自动触发3)手动触发4)RDB文件5)优点缺点 2.AOF1)基本介绍2)使用方式3)工作流程4)重写机制5)AOF文件6)优点缺点 3.RDB AOF 我们都知道,redis 是一个基于内存的数据库。基于内存的好处是访问速度快,缺点是“不持久”——当数据…...
Python学习笔记--初识 Python 正则表达式
初识 Python 正则表达式 正则表达式是一个特殊的字符序列,用于判断一个字符串是否与我们所设定的字符序列是否匹配,也就是说检查一个字符串是否与某种模式匹配。 Python 自 1.5 版本起增加了re 模块,它提供 Perl 风格的正则表达式模式。re 模块使 Python 语言拥有全部的正…...

webAPP基础学习
###视觉基础 part-I ####1.面试中常见的像素问题 >什么是像素? *1.什么是px? px-虚拟像素,css像素的单位 px是一个相对单位,相对于设备像素而言 >相对性 a.相对于同一个设备,css像素的可变的 css像素物理像素>会受到缩放的影响 css像素缩放倍数*单个物理像…...

RIP路由信息协议
RIP路由信息协议(Routing Information Protocol) 最先得到广泛应用的协议,最大优点是简单要求网络中的每个路由器都要维护一张表,表中记录了从它自己到其他每一个目的网络的距离RIP是应用层协议,它在传输层使用UDP,RIP报文作为UD…...

kubernetes 高可用集群
目录 一、haproxy负载均衡 二、pacemaker高可用 三、部署control-plane 四、部署worker node 实验环境 主机名 IP 角色 docker 192.168.67.10 harbor k8s1 192.168.67.11 control-plane k8s2 192.168.67.12 control-plane k8s3 192.168.67.13 control-plane k8s…...

java实现插入排序
图解 以下是Java实现插入排序的代码: public class InsertionSort {public static void main(String[] args) {int[] arr {5, 2, 4, 6, 1, 3};insertionSort(arr);System.out.println(Arrays.toString(arr)); // output: [1, 2, 3, 4, 5, 6]}public static void i…...

深度学习之基于YoloV5血红细胞检测识别系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 深度学习已经在许多领域中得到了广泛的应用,包括医疗健康领域。其中,YOLO(You O…...
8、可视化高斯滤波器并完成高斯滤波
本节一起绘制一个可视化的高斯滤波器,同时对一个彩色图像增加高斯噪声,最后通过一个高斯滤波器对图像进行降噪处理。 就像上节说的那样,滤波不是学习重点,下面通过实操了解下原理即可。 可视化高斯滤波器 高斯滤波器符合高斯分布,并且是二维高斯分布,这一点在上一节高斯…...

Linux MMC子系统 - 5.eMMC 5.1工作模式-引导模式
By: Ailson Jack Date: 2023.11.19 个人博客:http://www.only2fire.com/ 本文在我博客的地址是:http://www.only2fire.com/archives/164.html,排版更好,便于学习,也可以去我博客逛逛,兴许有你想要的内容呢。…...
2342. 数位和相等数对的最大和 --力扣 --JAVA
题目 给你一个下标从 0 开始的数组 nums ,数组中的元素都是 正 整数。请你选出两个下标 i 和 j(i ! j),且 nums[i] 的数位和 与 nums[j] 的数位和相等。 请你找出所有满足条件的下标 i 和 j ,找出并返回 nums[i] num…...

linux如何重置root密码
目录 当我们想要重置root管理员密码时,我们可以有两种方法进行: 方法一、init方法 1、重启系统,在下图所示界面按e键 2、随后进入以下界面,、将ro修改为rw,在行末尾添加init/bin/sh。编辑 3、随后按Ctrlx启动到s…...

Java 类之 java.util.Properties
Java 类之 java.util.Properties 文章目录 Java 类之 java.util.Properties一、简介二、主要功能1、存储键值对2、读取文件与属性代码示例运行结果截图 3、设置属性并保存文件代码示例结果截图 4、遍历属性代码示例运行结果 关联博客:《基于 Java 列举和说明常用的外…...
我遇到的bug(活动)
目录 方向一:身为程序员遇到过的奔溃瞬间 方向二:如何解决遇到的奔溃瞬间 方向三:在解决完后获得的收获和体会 方向一:身为程序员遇到过的奔溃瞬间 在一个项目中,我负责实现一个复杂的图像处理算法。经过几天的努力…...

MIB 6.1810实验Xv6 and Unix utilities(3)pingpong
Mit6.S081-实验1-Xv6 and Unix utilities-pingpong问题_Isana_Yashiro的博客-CSDN博客 Write a user-level program that uses xv6 system calls to ping-pong a byte between two processes over a pair of pipes, one for each direction. The parent should send a byte to…...
压力测试总共需要几个步骤?思路总结篇
在运维工作中,压力测试是一项很重要的工作。比如在一个网站上线之前,能承受多大访问量、在大访问量情况下性能怎样,这些数据指标好坏将会直接影响用户体验。今天我们就来深入了解下压力测试! 1、首先,什么是压力测试&…...
03_面向对象高级_多态
多态 1. 什么是多态? “多态” 是在 “继承” 的基础上实现的一种现象,具体表现为:对象多态、行为多态。 public class HelloWorld {public static void main(String[] args) {// 1. 对象多态Human h1 new Student();Human h2 new Teach…...

【Kingbase FlySync】界面化管控平台:2.配置数据库同步之KES>KES
【Kingbase FlySync】界面化管控平台:3.配置数据库同步之KES->KES 部署KES数据库到KES数据库同步服务1.登录KFS管理平台2.开始配置数据节点信息(1)配置node1数据节点(2)配置node2数据节点 3.KFS拓扑图配置4.开始部署5.启动同步程序并查验是否运行正常 测试同步1.从node1数据…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...
【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?
FTP(File Transfer Protocol)本身是一个基于 TCP 的协议,理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况,主要原因包括: ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...

基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...