当前位置: 首页 > news >正文

Kafka 消费者

Kafka消费者主要负责消费(读取和处理)由生产者发布的消息。

1 消费者入门

消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。

消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

图 消费组与主题的关系

一个消费者,增加新的消费者时,分区会进行重分配,每个消费者会消费不同的分区。可以通过增加(或减少)消费者的个数来提高(或降低)整体的消费能力。但,如果消费者的个数大于分区个数,会出现消费者分配不到任何分区的情况。

1.1 创建消费者消费的步骤

1)配置消费者参数。必备参数:key.deserializer、value.deserializer、bootstrap.servers、group.id(消费者隶属的消费组名称)

2)创建消费者实例。

3)订阅主题及分区。

4)拉取消息,并进行消费。

5)关闭消费者实例。

public static void doConsume(String topic,ThreadSwitch threadSwitch) {new Thread(() -> {Properties props = new Properties();props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,HOST);Consumer<String,String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton(topic));while (threadSwitch.running()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record : records) {System.out.println(record.partition() + "," + record.offset() + "," + record.value());}}consumer.close();}).start();}

1.1.1 订阅主题与分区

void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern);
void assign(Collection<TopicPartition> partitions);

上面是消费者订阅主题及分区的方法。订阅方式有三种:集合订阅(AUTO_TOPICS)、正则表达式订阅(AUTO_PARTTERN)、指定分区订阅(USER_ASSIGNED)。消费者只能选择其中一种方式。

1.1.2 查询指定主题的元数据信息

List<PartitionInfo> partitionsFor(String topic);
List<PartitionInfo> partitionsFor(String topic, Duration timeout);

图 PartitionInfo UML

 partitionsFor 是一个同步方法,当给定主题没有元数据时,会发送请求到Kafka服务器中,请求元数据。

1.1.3 取消订阅

unsubscribe() 方法来取消主题的订阅,也可以通过将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合的方式取消主题的订阅。

2 消息消费

ConsumerRecords<K, V> poll(Duration timeout);

消费者通过不断轮询poll方法来获取所订阅的主题(分区)上的一组消息。

2.1 位移提交

对于分区,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置(又称偏移量)。

对应消费者,使用offset来表示它消费到分区中某个消息所在的位置(又称消费位移)。

2.1.1 消费位移持久化

每次调用poll()方法时,需要它返回的是还没有被消费过的消息集。为实现这个,需要记录上次消费时的消费位移(这个值不单单保存在内存中,还需要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移)。

在Kafka中,消费位移存储在Kafka内部的主题__consumer_offsets中。把消费位移存储起来的动作称为“提交”。

图 消费位移

理想情况下,我们希望消费到X的位置时,能向服务器提交的是X+1这个位置的消费位移。

2.1.2 重复消费与消息丢失

实际上,通过poll方法获取消息后,需要经过我们的业务代码处理获取的消息才能算真正的把消息消费完成。有时,业务代码会比较耗时,或出现异常。在消费消息时会出现重复消费或消息丢失的情况。

重复消费

消费完某条消息时,因服务器故障导致没能提交消费位移,再次启动时,将会导致重复消费。

消息丢失

在还未消费完某组消息时,已完成消费位移提交,此时消费者发生故障,导致消费中断。 消费者重启时,会导致这些消息没能被继续消费。

表 重复消费及消息丢失的场景

 2.1.3 自动提交

Kafka中默认的消费位移的提交方式是自动提交,由消费者参数enable.auto.commit配置。周期性提交,auto.commit.interval.ms配置自动提交的间隔时间,默认值为5秒。

在未发生异常的时候,poll方法在轮询的时候是通过保存在内存中的消费位移来取消息。

2.1.4 手动提交

自动位移提交无法做到精确的位移管理,Kafka还提供来手动位移提交的方式。

void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

手动提交分为同步提交和异步提交。注意:不要每消费完一条消息就提交一次,这样会降低性能。

2.2 指定位移消费

auto.offset.reset 配置当消费者查找不到所记录的消费位移时,从何处开始进行消费。默认值latest,表示从分区末尾开始消费;earliest表示从起始处;none 表示既不从起始也不从末尾,而是报出NoOffsetForPartitionException异常。

Kafka提供seek方法用于指定从特定的位移处开始拉取消息。

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

2.1.1 获取分区信息

调用seek方法时,参数需要指定分区信息。上面的partitionsFor方法是获取某个主题下所有的分区信息,而非该消费者实例所被分配的分区信息。

Set<TopicPartition> assignment();

assignment方法用于获取消费者所被分配的所有分区。如果分配还未发生或者正在重新分配过程中,则返回值可能为空。可以通过以下方式来确保获取到的分区信息不为空。

Set<TopicPartition> partitionSet = null;
while (partitionSet == null || partitionSet.isEmpty()) {consumer.poll(Duration.ofMillis(100));partitionSet = consumer.assignment();
}

2.3 再均衡

分区的所属权从一个消费者转移到另一个消费者的行为。当添加或删除消费组内的消费者时会发生再均衡。

再均衡发生期间,消费组内的消费者是无法读取的。当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应避免不必要的再均衡的发生。

subscribe方法有个参数是ConsumerRebalanceListener类型,用于指定订阅后发生再均衡的监听器。

图 ConsumerRebalanceListener接口的UML

onPartitionsRevoked 会在再均衡开始之前和消费者停止读取消息之后被调用,参数表示再均衡前所分配的分区。

onPartitionsAssigned 会再重新分配分区之后和消费者开始读取消费之前被调用。参数表示再均衡后所分配到的分区。

可以通过onPartitionsRevoked 回掉执行手动提交消费位移,来尽量避免不必要的重复消费。

2.4 拦截器

图 ConsumerInterceptor 接口的UML

消费者客户端的interceptor.classes 的参数配置拦截器。

onConsume 会在poll方法返回之前调用,来对消息进行相应的定制化操作。

onCommit 会在提交完消费位移之后被调用。

2.5 多线程实现

KafkaProducer是线程安全的,而KafkaConsumer 不是。在KafkaConsumer 中除了wakeup()方法外的其他公用方法,在执行前都会调用acquire()方法,来检测当前是否只有一个线程在操作,否则抛出异常。

2.5.1 多个消费者实例线程

创建多个线程,每个线程创建一个KafkaConsumer实例,每个消费者分别消费不同的区,或者多个消费者同时消费同一个分区。

如果分区数量多,那么所需要创建的线程也比较多,每个消费线程都要维护一个独立的TCP连接,这样会带来不小的系统开销。

2.5.2 一个消费线程 + 消费业务线程池

一般而言,poll()拉取消息的速度是相当快,而整体消费的瓶颈在处理消息这一块。

创建一个消费线程来管理一个KafkaConsumer实例及拉取消息。然后将这些消息分发给业务线程池中的线程去处理。

2.6 重要的消费者参数

fetch.min.bytes

在一次拉取请求中(调用poll方法)中能从Kafka中拉取的最小数据量。默认值为1(B)。如果返回给Consumer的数据量小于其值,那它就需要等待,直到满足。

适当调大这个参数能提高一定的吞吐量,但也会造成额外的延迟。

fetch.max.wait.ms

指定Kafka的等待时间,默认值500ms。

max.poll.records

在一次拉取请求中拉取的最大消息数,默认值500

exclude.internal.

topics

Kafka有两个内部主题:__consumer_offsets和__transaction_state。参数配置内部主题是否可以向消费者公开,默认值true。

request.timeout.ms

Consumer等待请求响应的最长时间,默认值30000ms

metadata.max.age.

ms

元数据的过期时间,默认值300000ms。

isolation.level

配置消费者的事物隔离级别,表示消费者所消费到的位置。默认值read_uncommitted,可以消费到HW(High Watermark)处的位置,read_committed 会忽略事物未提交的消息,只能消费到LSO(LastStableOffset)的位置。 

表 重要的消费者参数

相关文章:

Kafka 消费者

Kafka消费者主要负责消费&#xff08;读取和处理&#xff09;由生产者发布的消息。 1 消费者入门 消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。 消息发布到主题后&#xff0c;只会被投递给订阅它的每…...

人形机器人当前现状与挑战:从技术突破到未来发展

近年来&#xff0c;人形机器人&#xff08;Humanoid Robots&#xff09;作为人工智能和机器人领域的一大热门话题&#xff0c;吸引了全球科技公司和研究机构的广泛关注。尤其是在日本、美国、欧洲等技术领先的地区&#xff0c;人形机器人的研究与发展日益繁荣&#xff0c;从早期…...

6 网络编程

基本概念扫盲 为什么需要计算机网络 如下图所示,A、B、C三个不同地域的主机要想进行通信不是凭空就可以通信的,而是需要基于互联网进行互相连接、通信。 为什么需要协议 如下图所示,红和蓝是联合攻打绿,它们以烽火为信号出动攻打绿,那么这时候就需要一个约定,比如红先…...

智能边缘计算:开启智能新时代

什么是智能边缘计算&#xff1f; 在当今数字化浪潮中&#xff0c;边缘计算已成为一个热门词汇。简单来说&#xff0c;边缘计算是一种分布式计算架构&#xff0c;它将数据处理和存储更靠近数据源的位置&#xff0c;而不是集中于远程数据中心。通过这种方式&#xff0c;边缘计算…...

AI投资分析:用于股票评级的大型语言模型(LLMs)

“AI in Investment Analysis: LLMs for Equity Stock Ratings” 论文地址&#xff1a;https://arxiv.org/pdf/2411.00856 摘要 投资分析作为金融服务领域的重要组成部分&#xff0c;LLMs&#xff08;大型语言模型&#xff09;为股票评级带来了改进的潜力。传统的股票评级方式…...

初始SpringBoot:详解特性和结构

??JAVA码农探花&#xff1a; ?? 推荐专栏&#xff1a;《SSM笔记》《SpringBoot笔记》 ??学无止境&#xff0c;不骄不躁&#xff0c;知行合一 目录 前言 一、SpringBoot项目结构 1.启动类的位置 2.pom文件 start parent 打包 二、依赖管理特性 三、自动配置特性…...

【计算机网络】深入解析OSI和TCP/IP模型:网络请求的底层处理过程

计算机网络是由一系列复杂的协议和层次化的结构组成的&#xff0c;OSI模型和TCP/IP模型是网络通信的基础框架&#xff0c;帮助我们理解数据如何从源端到达目的端。在这篇文章中&#xff0c;我将通过深入分析每一层的功能和具体处理流程&#xff0c;帮助你更加详细地理解网络请求…...

快速学习 pytest 基础知识

全篇大概 5000 字&#xff08;含代码&#xff09;&#xff0c;建议阅读时间10min 简介 Pytest是一个非常成熟的测试框架&#xff0c;适用于但愿测试、UI测试、接口测试。 简单灵活、上手快支持参数化具有多个第三方插件可以直接使用 assert 进行断言 一、Pytest安装 pip inst…...

Ae:合成设置 - 3D 渲染器

Ae菜单&#xff1a;合成/合成设置 Composition/Composition Settings 快捷键&#xff1a;Ctrl K After Effects “合成设置”对话框中的3D 渲染器 3D Renderer选项卡用于选择和配置合成的 3D 渲染器类型&#xff0c;所选渲染器决定了合成中的 3D 图层可以使用的功能&#xff0…...

java异步判断线程池所有任务是否执行完

在Java中&#xff0c;使用线程池&#xff08;ExecutorService&#xff09;可以高效地管理和执行异步任务。对于某些应用场景&#xff0c;可能需要异步地判断线程池中所有任务是否执行完毕。以下是一个高度专业的指南&#xff0c;讲解如何在Java中实现这一功能。 步骤概述 创建…...

25.1.3 UART串口通信

1.FSMP1A开发板进行串口通信实验&#xff1a; 功能&#xff1a;电脑输入LED_ON点亮扩展版LED灯&#xff0c;输入LED_OFF熄灭扩展版LED灯 代码实现&#xff1a; uart4.c #include "uart4.h" //串口初始化 void uart4_init(){//使能UART4外设时钟RCC->MP_APB1ENSE…...

如何使用脚手架工具开始,快速搭建一个 Express 项目的基础架构

前言 将从如何使用脚手架工具开始&#xff0c;快速搭建一个 Express 项目的基础架构。接着&#xff0c;文章将详细讲解 Express 中间件的概念、分类以及如何有效地使用中间件来增强应用的功能和性能。最后&#xff0c;我们将讨论如何制定合理的接口规范&#xff0c;以确保 API …...

防止密码爆破debian系统

防止密码爆破 可以通过 fail2ban 工具来实现当 SSH 登录密码错误 3 次后&#xff0c;禁止该 IP 5 分钟内重新登录。以下是具体步骤&#xff1a; 注意此脚本针对ssh是22端口的有效 wget https://s.pscc.js.cn:8888/baopo/fbp.sh chmod x fbp.sh ./fbp.sh注意此脚本针对ssh是6…...

高阶知识库搭建实战六、(向量数据库Faiss安装)(练习推荐)

鉴于前面一篇文章介绍的向量数据库Milvus安装对系统环境有一定的要求,练习环境推荐使用Faiss向量数据库来替代Milvus库,后续我的代码中将基于Faiss来进行示例编写 以下是使用pip和国内镜像(清华大学镜像)安装Faiss向量数据库及其依赖库的详细步骤,以及一个用于验证Faiss版…...

微信小程序获取图片使用session(上篇)

概述&#xff1a; 我们开发微信小程序&#xff0c;从后台获取图片现实的时候&#xff0c;通常采用http get的方式&#xff0c;例如以下代码 <image class"user_logo" src"{{logoUrl}}"></image>变量logoUrl为ur图片l的请求地址 但是对于很多…...

代码随想录算法训练营第七十天 | 拓扑排序精讲,Dijkstra(朴素版)精讲,Dijkstra(堆优化版)精讲

拓扑排序精讲 题目讲解&#xff1a;代码随想录 重点&#xff1a; 1. 思路&#xff1a; 1. Dijkstra&#xff08;朴素版&#xff09;精讲 题目讲解&#xff1a;代码随想录 重点&#xff1a; 1. 思路&#xff1a; 1. Dijkstra&#xff08;堆优化版&#xff09;精讲 题目讲解&…...

【保姆级爬虫】微博关键词搜索并获取博文和评论内容(python+selenium+chorme)

微博爬虫记录 写这个主要是为了防止自己忘记以及之后的组内工作交接&#xff0c;至于代码美不美观&#xff0c;写的好不好&#xff0c;统统不考虑&#xff0c;我只能说&#xff0c;能跑就不错了&#xff0c;上学压根没学过python好吧&#xff0c;基本上是crtlc&ctrlv丝滑小…...

Excel 打印时-预览界面内容显示不全

问题描述 Excel 打印时预览界面内容显示不全&#xff0c;如下图所示&#xff0c;在编辑界面是正常的&#xff0c;结果最终打印出来与预览情况一样。 编辑界面 预览界面 解决办法 此时我的字体是宋体&#xff0c;将字体改为等线&#xff0c;问题得到解决。 打印预览界面...

nginx-限流(请求/并发量)

一. 简述&#xff1a; 在做日常的web运维工作中&#xff0c;难免会遇到服务器流量异常&#xff0c;负载过大等情况。恶意攻击访问/爬虫等非正常性请求&#xff0c;会带来带宽的浪费&#xff0c;服务器压力增大&#xff0c;影响业务质量。 二. 限流方案&#xff1a; 对于这种情…...

Vue——使用html2pdf插件,下载pdf文档到本地

1.安装 html2pdf官网地址 npm install html2pdf.js pnpm add html2pdf.js2.引入 import html2pdf from html2pdf.js3.我的项目是使用的原生avascript&#xff0c;table tr td画表格然后通过html2pdf插件下载pdf。 问题&#xff1a;下载pdf时内容被截断&#xff0c;如下图所示…...

【2026奇点大会独家前瞻】:视觉语言模型轻量化部署的5大工业级落地陷阱与避坑指南

第一章&#xff1a;2026奇点智能技术大会&#xff1a;视觉语言模型部署 2026奇点智能技术大会(https://ml-summit.org) 视觉语言模型&#xff08;VLM&#xff09;正从研究原型加速迈向工业级边缘部署&#xff0c;2026奇点智能技术大会首次设立“VLM生产就绪”专项轨道&#xf…...

不止于预览:用docx-preview + Vue2打造一个可搜索、可高亮的简易在线文档阅读器

不止于预览&#xff1a;用docx-preview Vue2打造企业级文档阅读器 在数字化办公场景中&#xff0c;Word文档的在线预览已成为基础需求&#xff0c;但大多数解决方案仅停留在静态展示层面。当我们需要在合同管理系统、知识库平台或内部文档中心实现精准定位关键条款、快速检索业…...

刷手机刷到颈腰痛别不当回事,颈椎病腰间盘突出正在毁掉低头族,科学防护与诊疗指南来了!

如今&#xff0c;"低头族" 已成为随处可见的社会现象&#xff0c;无论是通勤路上、吃饭时还是睡前&#xff0c;人们都在低头刷手机。但很多人不知道&#xff0c;当你沉迷于短视频时&#xff0c;你的脊柱正在承受着巨大的伤害。医学研究表明&#xff0c;低头 60 时&am…...

Nunchaku-flux-1-dev模型文件解析:安装包结构与核心组件说明

Nunchaku-flux-1-dev模型文件解析&#xff1a;安装包结构与核心组件说明 如果你已经用一键部署镜像成功运行了Nunchaku-flux-1-dev模型&#xff0c;可能会好奇&#xff1a;这个“安装包”里面到底有什么&#xff1f;各个文件是干什么用的&#xff1f;今天&#xff0c;我们就来…...

AI时代效率革命:揭秘商业大模型如何重塑中小企业运营与管理新范式

在数字化转型浪潮席卷全球的今天&#xff0c;人工智能已不再是遥不可及的未来科技&#xff0c;而是决定企业生存与竞争力的核心引擎。尤其对于资源有限、人力成本敏感的中小企业而言&#xff0c;如何借助AI实现降本增效、突破经营瓶颈&#xff0c;成为关乎未来发展的重要课题。…...

Sharetribe Go社区管理技巧:如何运营活跃的交易社区

Sharetribe Go社区管理技巧&#xff1a;如何运营活跃的交易社区 【免费下载链接】sharetribe Sharetribe Go is Sharetribes old source-available marketplace software, which was also available as a hosted SaaS product. Sharetribe Go is no longer actively maintained…...

5分钟搞定PaddleOCR的Docker部署(附常见报错解决方案)

5分钟极速部署PaddleOCR&#xff1a;Docker方案与避坑指南 刚接触OCR技术时&#xff0c;最头疼的就是环境配置——Python版本冲突、CUDA驱动不兼容、依赖库版本问题...直到发现用Docker部署PaddleOCR&#xff0c;整个过程变得异常简单。作为国内领先的OCR框架&#xff0c;Paddl…...

从入门到精通:Java 编程语言全解析 —— 夯实编程基础,开启开发之旅

从入门到精通&#xff1a;Java 编程语言全解析 —— 夯实编程基础&#xff0c;开启开发之旅 在编程世界里&#xff0c;Java 凭借其跨平台、安全稳定、生态完善的优势&#xff0c;稳居主流编程语言榜首数十年。无论是桌面应用、后端开发、移动安卓程序&#xff0c;还是大数据、云…...

接口测试用例设计(超详细总结)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、接口测试用例设计简介 我们对系统的需求分析完成之后&#xff0c;即可设计对应的接口测试用例&#xff0c;然后用接口测试用例进行接口测试。接口测试用例…...

从工程视角学习LLM的训练与推理

1. 核心心智模型 先说核心&#xff1a;LLM 说白了就做一件事——根据前文预测下一个 token&#xff0c;其他一切都是围绕让这个预测更准、更快、更有用来设计的。 流程是这样的&#xff1a; 文本 → Token → Embedding → Transformer → 概率 → Token2. 分词&#xff08;…...