Kafka(四) Consumer消费者
一,基础知识
1,消费者与消费组
- 每个消费者都有对应的消费组,不同消费组之间互不影响。
- Partition的消息只能被一个消费组中的一个消费者所消费, 但Partition也可能被再平衡分配给新的消费者。
- 一个Topic的不同Partition会根据分配策略(消费者客户端参数partition.assignment strategy)分给不同消费者。

2,Kafka的消息模式
- 如果所有的消费者都属于同一消费组,那么所有的消息都会被均衡地投递给每个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式;
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式;
二,Client开发
1,消费逻辑需要具备以下几个步骤
- 配置消费者参数及创建消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
public class Consumer {private static final String BROKER_LIST = "localhost:9092";private static final String TOPIC = "TOPIC-A";private static final String GROUP_ID = "GROUP-A";private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);public static Properties initConfig() {Properties properties = new Properties();// 以下3个必须properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 客户端IDproperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "eris-kafka-consumer");// 消费组IDproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自动提交,默认为trueproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return properties;}public static void main(String[] args) {Properties properties = initConfig();KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));try {while (IS_RUNNING.get()) {// poll内部封装了消费位移提交、消费者协调器、组协调器、消费者的选举、分区分配与再均衡、心跳等// Duration用来控制在消费者的缓冲区里没有可用数据时阻塞等待的时间,0表示不等待直接返回ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {print("topic:" + r.topic() + ", patition:" + r.partition() + ", offset:" + r.offset());print("key:" + r.key() + ", value:" + r.value());}}} catch (WakeupException e) {// wakeup方法是KafkaConsumer中唯一可以从其他线程里安全调用的方法,调用wakeup后可以退出poll的逻辑,并抛出WakeupException。我们也不需处理WakeupException,它只是一种跳出循环的方式。} catch (Exception e) {e.printStackTrace();} finally {// maybe commit offset.kafkaConsumer.close();}}
}
2,subscribe有4个重载方法
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)// 在之后如果又创建了新主题,并且与正表达式相匹配,那么这个消费者也可以消费到新添加的Topic
public void subscribe (Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe (Pattern pattern)
3,assign订阅指定的分区
consumer.assign(Arrays.asList(new TopicPartition ("topic-demo", 0))) ;
4,取消订阅
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());
5,消息消费
public class ConsumerRecord<K, V> {private final String topic;private final int partition;pr vate final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;//省略若干方法
}
① 根据分区对当前批次消息分类:public List<ConsumerRecord<K, V> records(TopicPart tion partition)for (TopicPartition tp : records.partitions()) {for (ConsumerRecord<String, String> record : records.records(tp)) {System.out.println(record.partition() + ":" + record.value());}
}② 根据主题对当前批次消息分类:public Iterable<ConsumerRecord<K, V> records(String topic)// ConsumerRecords类中并没提供与partitions()类似的topics()方法来查看拉取的消息集中所含的主题列表。
for (String topic : Arrays.asList(TOPIC)) {for (ConsumerRecord<String, String> record : records.records(topic)) {System.out.println(record.topic() + ":" + record.value());}
}
6,反序列化
三,位移提交
1,消费位移提交

2,三个位移的关系


- lastConsumedOffset:当前消费到的位置,即poll拉到的该分区最后一条消息的offset
- committed offset:提交的消费位移
- position:下次拉取的位置
TopicPartition tp = new TopicPartition("topic", 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset = 0;while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 同步提交消费位移kafkaConsumer.commitSync();System.out.println("consumed off set is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = kafkaConsumer.position(tp);System.out.println("he offset of t he next record is " + posititon);
}输出结果:
consumed offset is 377
commited offset is 378
the offset of the next record is 378
3,消息丢失与重复消费
- 如果poll后立马提交位移,之后业务异常,再次拉取就从新位移开始,就丢失了数据。
- 如果poll后先处理数据,处理到一半异常了,或者最后提交位移异常,重新拉取会从之前的位移拉,就重复消费了。
4,自动提交位移原理
5,手动提交位移
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//do some logical processing kafkaConsumer.commitSync();
}1)commitSync会根据poll拉取的最新位移来进行提交(注意提交的值对应于图3-6 position的位置〉。2)可以使用带参方法,提交指定位移:commitSync(final Map<TopicPartition OffsetAndMetadata> offsets)3)没必要每条消息提交一次,可以改为批量提交。
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)1)异步提交在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的poll操作。2)提交也可能会异常,可引入重试机制。但重试可能出现问题,若第一次commitAsync失败在重试,第二次成功了,然后第一次重试也成功了,就会覆盖位移为之前。解决方案:可以提交时维护一个序号,如果发现过期的序号就不再重试。
try {while (IS_RUNNING.get()) {// poll records and do some logical processing .kafkaConsumer.commitAsync();}
} finally {try {kafkaConsumer.commitSync();} finally {kafkaConsumer.close();}
}
四,暂停或恢复消费
public void pause(Collection<TopicPartition> partitions)
public roid resume(Collection<TopicPartition> partitions)
五,指定位移消费
- 决定从何处消费;
- 找不到消费位移记录时;
- 位移越界时(seek);
- (默认值)auto.offset.reset=latest,从分区末尾开始消费;
- auto.offset.reset=earliest,从分区起始开始消费;

// partition:分区,offset:从哪个位置消费
public void seek(TopicPartition partition, long offset)
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如采不为空,则说明已经成功分配到了分区kafkaConsumer.poll(Duration.ofMillis(1000));assignment = kafkaConsumer.assignment();
}for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp, 10);
}
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//consume the record
}
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)给定待查分区和时间戳,返回大于等于该时间戳的第一条消息对应的offset和timestamp,对应于OffsetAndTimestamp中的offset、timestamp字段。
六,消费再均衡
注册:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) 和 subscribe(Patten pattern, ConsumerRebalanceListener listener)ConsumerRebalanceListener是一个接口,有2个方法。(1) void onPartitionsRevoked(Collection<TopicPartition> partitions)
再均衡开始之前和消费者停止读取消息之后被调用。(2) void onPartitionsAssigned(Collection<TopicPartition> partitions)
新分配分区之后和消费者开始拉取消费之前被调用 。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {kafkaConsumer.commitSync(currentOffsets);currentOffsets.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//do nothing .}
});
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//process the recordcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitAsync(currentOffsets, null);
kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// store offset in DB}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition tp : partitions) {// 从DB中读取消费位移kafkaConsumer.seek(tp, getOffsetFromDB(tp));}}
}
七,消费者拦截器
// poll方法返回之前调用,可以修改返回的消息内容、按照某种规则过滤消息等
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K , V> records);// 提交完消费位移之后调用,可以用来记录跟踪所提交的位移信息,比如当使用commitSync的无参方法时,我们不知道提交的消费位移,而onCommit方法却可以做到这一点
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);public void close();
八,消费者多线程模型




相关文章:

Kafka(四) Consumer消费者
一,基础知识 1,消费者与消费组 每个消费者都有对应的消费组,不同消费组之间互不影响。 Partition的消息只能被一个消费组中的一个消费者所消费, 但Partition也可能被再平衡分配给新的消费者。 一个Topic的不同Partition会根据分配…...

前端路由手写Hash和History两种模式
文章目录 1. Hash模式:简洁而广泛适用2. History模式:更自然的用户体验3. 结论 在现代Web开发中,单页面应用(Single Page Application,简称SPA)因其流畅的用户体验和高效的页面交互能力而备受青睐。前端路由…...

Redis的单线程讲解与指令学习
目录 一.Redis的命令 二.数据类型 三.Redis的key的过期策略如何实现? 四.Redis为什么是单线程的 五.String有关的命令 Redis的学习专栏:http://t.csdnimg.cn/a8cvV 一.Redis的命令 两个基本命令 在Redis当中,有两个基本命令࿱…...
为什么MySQL会选择B+树作为索引
为什么MySQL会选择B树作为索引 在数据库管理系统中,索引是提升查询效率的关键技术之一。MySQL作为广泛使用的关系型数据库管理系统,其核心存储引擎InnoDB选择B树作为其索引结构,这一选择背后蕴含了深刻的性能和存储效率考量。本文将简要介绍…...
k8s secret-从环境变量里去读和从yaml文件里读取secret有什么区别?
从环境变量和YAML文件中读取Kubernetes Secret的区别主要体现在使用方式、动态更新能力以及管理便捷性上。以下是详细的区别说明: 1. **使用方式**: - **环境变量方式**:Kubernetes允许将Secret作为环境变量注入到Pod的容器中。这种方式的好处…...
Springboot+Aop用注解实现阿里云短信验证码校验,校验通过自动删除验证码缓存
1.新建操作类型枚举(这里的IEnum是我自定义的http请求拦截接口,不需要的话可以不用实现) Getter AllArgsConstructor public enum OperationType implements IEnum<Integer> {/*** 注册*/SIGN_UP(0),/*** 密码登录*/LOGIN_BY_PWD(1),/…...

无线物联网新时代,RFID拣货标签跟随潮流
拣选技术的演变历程,本质上是从人力操作向自动化、智能化转型的持续进程。近期,“货寻人”技术成为众多企业热烈追捧的对象,它可以根据企业的特定需求,从众多拣选方案中选出最优解。那么,在采用“货到人”拣选技术时&a…...
Java8 根据List实体中一个字段去重取最大值,并且根据该字段进行排序
1、前言 某个功能要求需要对一个list对象里数据按照股票分组,并且取分组涨跌幅最大的,返回一个新的list对象,并且按照涨跌幅字段进行排序,这么一连串的要求,如果按照传统的写法,我们需要写一大坨的代码&am…...
微服务经纬:Eureka驱动的分布式服务网格配置全解
微服务经纬:Eureka驱动的分布式服务网格配置全解 在微服务架构的宏伟蓝图中,服务网格(Service Mesh)作为微服务间通信的独立层,承担着流量管理、服务发现、故障恢复等关键任务。Eureka,Netflix开源的服务发…...

关于前端数据库可视化库的选择,vue3+antd+g2plot录课计划
之前:antdv 现在:g2plot https://g2plot.antv.antgroup.com/manual/introduction 录课内容:快速入门 图表示例: 选择使用比较广泛的示例类型,录课顺序如下: 1、折线图2、面积图3、柱形图4、条形图5、饼…...

linux进行redis的安装并使用RDB进行数据迁移
现在有两台电脑,分别是A,B,现在我要把A电脑上的redis的数据迁移到B电脑上,B电脑上是没有安装redis的 1.找到A电脑的redis的版本 1.先启动A电脑的redis,一般来说,都是直接在linux的控制台输入:re…...
深入理解Scikit-learn:决策树与随机森林算法详解
用sklearn实现决策树与随机森林 1. 简介 决策树和随机森林是机器学习中的两种强大算法。决策树通过学习数据特征与标签之间的规则来进行预测,而随机森林则是由多棵决策树组成的集成算法,能有效提高模型的稳定性和准确性。 2. 安装sklearn 首先&#…...

AutoHotKey自动热键(十一)下载SciTE4AutoHotkey-Plus的中文增强版脚本编辑器
关于AutoHotkey的专用编辑器, SciTE4AutoHotkey是一个免费的基于 SciTE 的 AutoHotkey 脚本编辑器,除了 DBGp 支持, 它还为 AutoHotkey 提供了语法高亮, 调用提示, 参数信息和自动完成, 以及其他拥有的编辑特性和辅助工具.XDebugClient 是一个基于 .NET Framework 2.0 的简单开…...
Halcon与C++之间的数据转换
HALCON的HTuple类型(元组)功能很强大,可以表示INT、double、string等多种类型数据。当元组中只有一个成员时,HTuple也可表示原子类型 1. haclon -> C //HTuple转int HTuple hTuple 1; int data1 hTuple[0].I(); // data1 1//HTuple转do…...
MybatisPlus 一些技巧
查询简化 SimpleQuery 有工具类 com.baomidou.mybatisplus.extension.toolkit.SimpleQuery 对 selectList 查询后的结果进行了封装,使其可以通过 Stream 流的方式进行处理,从而简化了 API 的调用。 方法 list() 支持对一个列表提取某个字段ÿ…...
定制化服务发现:Eureka中服务实例偏好的高级配置
定制化服务发现:Eureka中服务实例偏好的高级配置 在微服务架构中,服务实例的智能管理和优化是保证系统高效运行的关键。Eureka作为Netflix开源的服务注册与发现框架,提供了丰富的配置选项来满足不同场景下的需求。服务实例偏好配置允许开发者…...

【实战场景】MongoDB迁移的那些事
【实战场景】MongoDB迁移的那些事 开篇词:干货篇【MongoDB迁移的方法】:1. 基于mongodump和mongorestore的迁移一、迁移前准备二、使用mongodump备份数据三、使用mongorestore还原数据四、注意事项 2. 基于MongoDB复制集的迁移一、迁移前准备二、配置新复…...

为什么要使用加密软件?
一、保护数据安全:加密软件通过复杂的加密算法对敏感数据进行加密处理,使得未经授权的人员即使获取了加密数据,也无法轻易解密和获取其中的内容。这极大地提高了数据在存储、传输和使用过程中的安全性。 二、遵守法律法规:在许多国…...
k8s学习笔记——dashboard安装
重装了k8s集群后,重新安装k8s的仪表板,发现与以前安装不一样的地方。主要是镜像下载的问题,由于网络安全以及国外网站封锁的原因,现在很多镜像按照官方提供的仓库地址都下拉不下来,导致安装失败。我查了好几天…...
AI艺术创作:掌握Midjourney和DALL-E的技巧与策略
AI艺术创作:掌握Midjourney和DALL-E的技巧与策略 AI艺术创作正逐渐成为艺术家和创意工作者们探索新表达方式的重要工具。Midjourney和DALL-E是两款领先的AI绘画工具,它们各有独特的功能和优势。本文将详细介绍如何掌握这两款工具的使用技巧,…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...
云原生周刊:k0s 成为 CNCF 沙箱项目
开源项目推荐 HAMi HAMi(原名 k8s‑vGPU‑scheduler)是一款 CNCF Sandbox 级别的开源 K8s 中间件,通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度,为容器提供统一接口,实现细粒度资源配额…...

uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...
用递归算法解锁「子集」问题 —— LeetCode 78题解析
文章目录 一、题目介绍二、递归思路详解:从决策树开始理解三、解法一:二叉决策树 DFS四、解法二:组合式回溯写法(推荐)五、解法对比 递归算法是编程中一种非常强大且常见的思想,它能够优雅地解决很多复杂的…...