Kafka(四) Consumer消费者
一,基础知识
1,消费者与消费组
- 每个消费者都有对应的消费组,不同消费组之间互不影响。
- Partition的消息只能被一个消费组中的一个消费者所消费, 但Partition也可能被再平衡分配给新的消费者。
- 一个Topic的不同Partition会根据分配策略(消费者客户端参数partition.assignment strategy)分给不同消费者。

2,Kafka的消息模式
- 如果所有的消费者都属于同一消费组,那么所有的消息都会被均衡地投递给每个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式;
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式;
二,Client开发
1,消费逻辑需要具备以下几个步骤
- 配置消费者参数及创建消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
public class Consumer {private static final String BROKER_LIST = "localhost:9092";private static final String TOPIC = "TOPIC-A";private static final String GROUP_ID = "GROUP-A";private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);public static Properties initConfig() {Properties properties = new Properties();// 以下3个必须properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 客户端IDproperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "eris-kafka-consumer");// 消费组IDproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自动提交,默认为trueproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return properties;}public static void main(String[] args) {Properties properties = initConfig();KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));try {while (IS_RUNNING.get()) {// poll内部封装了消费位移提交、消费者协调器、组协调器、消费者的选举、分区分配与再均衡、心跳等// Duration用来控制在消费者的缓冲区里没有可用数据时阻塞等待的时间,0表示不等待直接返回ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {print("topic:" + r.topic() + ", patition:" + r.partition() + ", offset:" + r.offset());print("key:" + r.key() + ", value:" + r.value());}}} catch (WakeupException e) {// wakeup方法是KafkaConsumer中唯一可以从其他线程里安全调用的方法,调用wakeup后可以退出poll的逻辑,并抛出WakeupException。我们也不需处理WakeupException,它只是一种跳出循环的方式。} catch (Exception e) {e.printStackTrace();} finally {// maybe commit offset.kafkaConsumer.close();}}
}
2,subscribe有4个重载方法
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)// 在之后如果又创建了新主题,并且与正表达式相匹配,那么这个消费者也可以消费到新添加的Topic
public void subscribe (Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe (Pattern pattern)
3,assign订阅指定的分区
consumer.assign(Arrays.asList(new TopicPartition ("topic-demo", 0))) ;
4,取消订阅
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());
5,消息消费
public class ConsumerRecord<K, V> {private final String topic;private final int partition;pr vate final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;//省略若干方法
}
① 根据分区对当前批次消息分类:public List<ConsumerRecord<K, V> records(TopicPart tion partition)for (TopicPartition tp : records.partitions()) {for (ConsumerRecord<String, String> record : records.records(tp)) {System.out.println(record.partition() + ":" + record.value());}
}② 根据主题对当前批次消息分类:public Iterable<ConsumerRecord<K, V> records(String topic)// ConsumerRecords类中并没提供与partitions()类似的topics()方法来查看拉取的消息集中所含的主题列表。
for (String topic : Arrays.asList(TOPIC)) {for (ConsumerRecord<String, String> record : records.records(topic)) {System.out.println(record.topic() + ":" + record.value());}
}
6,反序列化
三,位移提交
1,消费位移提交

2,三个位移的关系


- lastConsumedOffset:当前消费到的位置,即poll拉到的该分区最后一条消息的offset
- committed offset:提交的消费位移
- position:下次拉取的位置
TopicPartition tp = new TopicPartition("topic", 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset = 0;while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 同步提交消费位移kafkaConsumer.commitSync();System.out.println("consumed off set is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = kafkaConsumer.position(tp);System.out.println("he offset of t he next record is " + posititon);
}输出结果:
consumed offset is 377
commited offset is 378
the offset of the next record is 378
3,消息丢失与重复消费
- 如果poll后立马提交位移,之后业务异常,再次拉取就从新位移开始,就丢失了数据。
- 如果poll后先处理数据,处理到一半异常了,或者最后提交位移异常,重新拉取会从之前的位移拉,就重复消费了。
4,自动提交位移原理
5,手动提交位移
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//do some logical processing kafkaConsumer.commitSync();
}1)commitSync会根据poll拉取的最新位移来进行提交(注意提交的值对应于图3-6 position的位置〉。2)可以使用带参方法,提交指定位移:commitSync(final Map<TopicPartition OffsetAndMetadata> offsets)3)没必要每条消息提交一次,可以改为批量提交。
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)1)异步提交在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的poll操作。2)提交也可能会异常,可引入重试机制。但重试可能出现问题,若第一次commitAsync失败在重试,第二次成功了,然后第一次重试也成功了,就会覆盖位移为之前。解决方案:可以提交时维护一个序号,如果发现过期的序号就不再重试。
try {while (IS_RUNNING.get()) {// poll records and do some logical processing .kafkaConsumer.commitAsync();}
} finally {try {kafkaConsumer.commitSync();} finally {kafkaConsumer.close();}
}
四,暂停或恢复消费
public void pause(Collection<TopicPartition> partitions)
public roid resume(Collection<TopicPartition> partitions)
五,指定位移消费
- 决定从何处消费;
- 找不到消费位移记录时;
- 位移越界时(seek);
- (默认值)auto.offset.reset=latest,从分区末尾开始消费;
- auto.offset.reset=earliest,从分区起始开始消费;

// partition:分区,offset:从哪个位置消费
public void seek(TopicPartition partition, long offset)
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如采不为空,则说明已经成功分配到了分区kafkaConsumer.poll(Duration.ofMillis(1000));assignment = kafkaConsumer.assignment();
}for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp, 10);
}
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//consume the record
}
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)给定待查分区和时间戳,返回大于等于该时间戳的第一条消息对应的offset和timestamp,对应于OffsetAndTimestamp中的offset、timestamp字段。
六,消费再均衡
注册:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) 和 subscribe(Patten pattern, ConsumerRebalanceListener listener)ConsumerRebalanceListener是一个接口,有2个方法。(1) void onPartitionsRevoked(Collection<TopicPartition> partitions)
再均衡开始之前和消费者停止读取消息之后被调用。(2) void onPartitionsAssigned(Collection<TopicPartition> partitions)
新分配分区之后和消费者开始拉取消费之前被调用 。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {kafkaConsumer.commitSync(currentOffsets);currentOffsets.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//do nothing .}
});
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//process the recordcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitAsync(currentOffsets, null);
kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// store offset in DB}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition tp : partitions) {// 从DB中读取消费位移kafkaConsumer.seek(tp, getOffsetFromDB(tp));}}
}
七,消费者拦截器
// poll方法返回之前调用,可以修改返回的消息内容、按照某种规则过滤消息等
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K , V> records);// 提交完消费位移之后调用,可以用来记录跟踪所提交的位移信息,比如当使用commitSync的无参方法时,我们不知道提交的消费位移,而onCommit方法却可以做到这一点
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);public void close();
八,消费者多线程模型




相关文章:

Kafka(四) Consumer消费者
一,基础知识 1,消费者与消费组 每个消费者都有对应的消费组,不同消费组之间互不影响。 Partition的消息只能被一个消费组中的一个消费者所消费, 但Partition也可能被再平衡分配给新的消费者。 一个Topic的不同Partition会根据分配…...

前端路由手写Hash和History两种模式
文章目录 1. Hash模式:简洁而广泛适用2. History模式:更自然的用户体验3. 结论 在现代Web开发中,单页面应用(Single Page Application,简称SPA)因其流畅的用户体验和高效的页面交互能力而备受青睐。前端路由…...

Redis的单线程讲解与指令学习
目录 一.Redis的命令 二.数据类型 三.Redis的key的过期策略如何实现? 四.Redis为什么是单线程的 五.String有关的命令 Redis的学习专栏:http://t.csdnimg.cn/a8cvV 一.Redis的命令 两个基本命令 在Redis当中,有两个基本命令࿱…...

为什么MySQL会选择B+树作为索引
为什么MySQL会选择B树作为索引 在数据库管理系统中,索引是提升查询效率的关键技术之一。MySQL作为广泛使用的关系型数据库管理系统,其核心存储引擎InnoDB选择B树作为其索引结构,这一选择背后蕴含了深刻的性能和存储效率考量。本文将简要介绍…...

k8s secret-从环境变量里去读和从yaml文件里读取secret有什么区别?
从环境变量和YAML文件中读取Kubernetes Secret的区别主要体现在使用方式、动态更新能力以及管理便捷性上。以下是详细的区别说明: 1. **使用方式**: - **环境变量方式**:Kubernetes允许将Secret作为环境变量注入到Pod的容器中。这种方式的好处…...

Springboot+Aop用注解实现阿里云短信验证码校验,校验通过自动删除验证码缓存
1.新建操作类型枚举(这里的IEnum是我自定义的http请求拦截接口,不需要的话可以不用实现) Getter AllArgsConstructor public enum OperationType implements IEnum<Integer> {/*** 注册*/SIGN_UP(0),/*** 密码登录*/LOGIN_BY_PWD(1),/…...

无线物联网新时代,RFID拣货标签跟随潮流
拣选技术的演变历程,本质上是从人力操作向自动化、智能化转型的持续进程。近期,“货寻人”技术成为众多企业热烈追捧的对象,它可以根据企业的特定需求,从众多拣选方案中选出最优解。那么,在采用“货到人”拣选技术时&a…...

Java8 根据List实体中一个字段去重取最大值,并且根据该字段进行排序
1、前言 某个功能要求需要对一个list对象里数据按照股票分组,并且取分组涨跌幅最大的,返回一个新的list对象,并且按照涨跌幅字段进行排序,这么一连串的要求,如果按照传统的写法,我们需要写一大坨的代码&am…...

微服务经纬:Eureka驱动的分布式服务网格配置全解
微服务经纬:Eureka驱动的分布式服务网格配置全解 在微服务架构的宏伟蓝图中,服务网格(Service Mesh)作为微服务间通信的独立层,承担着流量管理、服务发现、故障恢复等关键任务。Eureka,Netflix开源的服务发…...

关于前端数据库可视化库的选择,vue3+antd+g2plot录课计划
之前:antdv 现在:g2plot https://g2plot.antv.antgroup.com/manual/introduction 录课内容:快速入门 图表示例: 选择使用比较广泛的示例类型,录课顺序如下: 1、折线图2、面积图3、柱形图4、条形图5、饼…...

linux进行redis的安装并使用RDB进行数据迁移
现在有两台电脑,分别是A,B,现在我要把A电脑上的redis的数据迁移到B电脑上,B电脑上是没有安装redis的 1.找到A电脑的redis的版本 1.先启动A电脑的redis,一般来说,都是直接在linux的控制台输入:re…...

深入理解Scikit-learn:决策树与随机森林算法详解
用sklearn实现决策树与随机森林 1. 简介 决策树和随机森林是机器学习中的两种强大算法。决策树通过学习数据特征与标签之间的规则来进行预测,而随机森林则是由多棵决策树组成的集成算法,能有效提高模型的稳定性和准确性。 2. 安装sklearn 首先&#…...

AutoHotKey自动热键(十一)下载SciTE4AutoHotkey-Plus的中文增强版脚本编辑器
关于AutoHotkey的专用编辑器, SciTE4AutoHotkey是一个免费的基于 SciTE 的 AutoHotkey 脚本编辑器,除了 DBGp 支持, 它还为 AutoHotkey 提供了语法高亮, 调用提示, 参数信息和自动完成, 以及其他拥有的编辑特性和辅助工具.XDebugClient 是一个基于 .NET Framework 2.0 的简单开…...

Halcon与C++之间的数据转换
HALCON的HTuple类型(元组)功能很强大,可以表示INT、double、string等多种类型数据。当元组中只有一个成员时,HTuple也可表示原子类型 1. haclon -> C //HTuple转int HTuple hTuple 1; int data1 hTuple[0].I(); // data1 1//HTuple转do…...

MybatisPlus 一些技巧
查询简化 SimpleQuery 有工具类 com.baomidou.mybatisplus.extension.toolkit.SimpleQuery 对 selectList 查询后的结果进行了封装,使其可以通过 Stream 流的方式进行处理,从而简化了 API 的调用。 方法 list() 支持对一个列表提取某个字段ÿ…...

定制化服务发现:Eureka中服务实例偏好的高级配置
定制化服务发现:Eureka中服务实例偏好的高级配置 在微服务架构中,服务实例的智能管理和优化是保证系统高效运行的关键。Eureka作为Netflix开源的服务注册与发现框架,提供了丰富的配置选项来满足不同场景下的需求。服务实例偏好配置允许开发者…...

【实战场景】MongoDB迁移的那些事
【实战场景】MongoDB迁移的那些事 开篇词:干货篇【MongoDB迁移的方法】:1. 基于mongodump和mongorestore的迁移一、迁移前准备二、使用mongodump备份数据三、使用mongorestore还原数据四、注意事项 2. 基于MongoDB复制集的迁移一、迁移前准备二、配置新复…...

为什么要使用加密软件?
一、保护数据安全:加密软件通过复杂的加密算法对敏感数据进行加密处理,使得未经授权的人员即使获取了加密数据,也无法轻易解密和获取其中的内容。这极大地提高了数据在存储、传输和使用过程中的安全性。 二、遵守法律法规:在许多国…...

k8s学习笔记——dashboard安装
重装了k8s集群后,重新安装k8s的仪表板,发现与以前安装不一样的地方。主要是镜像下载的问题,由于网络安全以及国外网站封锁的原因,现在很多镜像按照官方提供的仓库地址都下拉不下来,导致安装失败。我查了好几天…...

AI艺术创作:掌握Midjourney和DALL-E的技巧与策略
AI艺术创作:掌握Midjourney和DALL-E的技巧与策略 AI艺术创作正逐渐成为艺术家和创意工作者们探索新表达方式的重要工具。Midjourney和DALL-E是两款领先的AI绘画工具,它们各有独特的功能和优势。本文将详细介绍如何掌握这两款工具的使用技巧,…...

在Mac上免费恢复误删除的Word文档
Microsoft Word for Mac是一个有用的文字处理应用程序,它与Microsoft Office套件捆绑在一起。该软件的稳定版本包括 Word 2019、2016、2011 等。 Word for Mac 与 Apple Pages 兼容;这允许在不同的操作系统版本中使用Word文档,而不会遇到任何麻烦。 与…...

HarmonyOS 屏幕适配设计
1. armonyOS 屏幕适配设计 1.1. 像素单位 (1)px (Pixels) px代表屏幕上的像素点,是手机屏幕分辨率的单位,即屏幕物理像素单位。 (2)vp (Viewport Percentage) vp是视口百分比单位,基于…...

Netfilter之连接跟踪(Connection Tracking)和反向 SNAT(Reverse SNAT)
连接跟踪(Connection Tracking) 连接跟踪是 Netfilter 框架中的一个功能,用于跟踪网络连接的状态和元数据。它使防火墙能够识别和处理数据包属于哪个连接,并在双向通信中正确匹配请求和响应数据包。 工作原理 建立连接…...

Linux下使用vs code离线安装各种插件
Linux下使用vs code离线安装各种插件 (1)手动下载插件 插件市场 -> 搜索插件名 -> 右边栏 Download Extension (2)寻找安装目录 whereis code一般会出现两个目录,选择右边那个/usr/share/code code: /usr/b…...

【常见开源库的二次开发】基于openssl的加密与解密——Base58比特币钱包地址——算法分析(三)
目录: 目录: 一、base58(58进制) 1.1 什么是base58? 1.2 辗转相除法 1.3 base58输出字节数: 二、源码分析: 2.1源代码: 2.2 算法思路介绍: 2.2.1 Base58编码过程: 2.1.2 Base58解码过…...

Linux操作系统——数据库
数据库 sun solaris gnu 1、分类: 大型 中型 小型 ORACLE MYSQL/MSSQL SQLITE DBII powdb 关系型数据库 2、名词: DB 数据库 select update database DBMS 数据…...

【数据结构与算法】希尔排序:基于插入排序的高效排序算法
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《数据结构与算法》 期待您的关注 目录 一、引言 二、基本原理 三、实现步骤 四、C语言实现 五、性能分析 1. 时间复杂度…...

关于正点原子的alpha开发板的启动函数(汇编,自己的认识)
我傻逼了,这里的注释还是不要用; 全部换成 /* */ 这里就分为两块,一部分是复位中断部分,第二部分就是IRQ部分(中断部分最重要) 我就围绕着两部分来展开我的认识 首先声明全局 .global_start 在 ARM 架…...

Deep Layer Aggregation【方法部分解读】
摘要: 视觉识别需要跨越从低到高的层次、从小到大的尺度以及从精细到粗略的分辨率的丰富表示。即使卷积网络的特征层次很深,单独的一层信息也不足够:复合和聚合这些表示可以改进对“是什么”和“在哪里”的推断。架构上的努力正在探索网络骨干的许多维度,设计更深或更宽的架…...

大数据面试SQL题-笔记01【运算符、条件查询、语法顺序、表连接】
大数据面试SQL题复习思路一网打尽!(文档见评论区)_哔哩哔哩_bilibiliHive SQL 大厂必考常用窗口函数及相关面试题 大数据面试SQL题-笔记01【运算符、条件查询、语法顺序、表连接】大数据面试SQL题-笔记02【...】 目录 01、力扣网-sql题 1、高频SQL50题(…...