当前位置: 首页 > news >正文

SpringBoot 处理 @KafkaListener 消息

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");this.listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating = this.listener;while (delegating instanceof DelegatingMessageListener) {delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();}listenerType = ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();if (isRunning() && this.definedPartitions != null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error("Failed to set initial offsets", e);}}long lastReceive = System.currentTimeMillis();long lastAlertAt = lastReceive;while (isRunning()) {try {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused && isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused = true;if (this.logger.isDebugEnabled()) {this.logger.debug("Paused consumption from: " + this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll = System.currentTimeMillis();if (this.consumerPaused && !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug("Resuming consumption from: " + this.consumer.paused());}Set<TopicPartition> paused = this.consumer.paused();this.consumer.resume(paused);this.consumerPaused = false;publishConsumerResumedEvent(paused);}if (records != null && this.logger.isDebugEnabled()) {this.logger.debug("Received: " + records.count() + " records");if (records.count() > 0 && this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p -> records.records(p).stream())// map to same format as send metadata toString().map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));}}if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {lastReceive = System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() != null) {long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt = now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, we're stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager == null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info("Consumer stopped");}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null&& this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<return factory;
}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713

相关文章:

SpringBoot 处理 @KafkaListener 消息

消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息&#xff0c;并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理&#xff0c;相当于一个消费者&#xff1b; 看看其整体代码结构&#xff1a; 可以发现其入口方法为doStart(),…...

Spring Boot-API版本控制问题

在现代软件开发中&#xff0c;API&#xff08;应用程序接口&#xff09;版本控制是一项至关重要的技术。随着应用的不断迭代&#xff0c;API 的改动不可避免&#xff0c;如何在引入新版本的同时保证向后兼容&#xff0c;避免对现有用户的影响&#xff0c;是每个开发者需要考虑的…...

Git 提取和拉取的区别在哪

1. 提取&#xff08;Fetch&#xff09; 操作说明&#xff1a;Fetch 操作会从远程仓库下载最新的提交、分支信息等&#xff0c;但不会将这些更改合并到你当前的分支中。它只是将远程仓库的更新信息存储在本地&#xff0c;并不会自动修改你当前的工作区。 使用场景&#xff1a; …...

【数据结构与算法 | 每日一题 | 力扣篇】力扣2390, 2848

1. 力扣2390&#xff1a;从字符串中删除星号 1.1 题目&#xff1a; 给你一个包含若干星号 * 的字符串 s 。 在一步操作中&#xff0c;你可以&#xff1a; 选中 s 中的一个星号。移除星号 左侧 最近的那个 非星号 字符&#xff0c;并移除该星号自身。 返回移除 所有 星号之…...

破解信息架构实施的密码:常见挑战与最佳解决方案全指南

信息架构的成功实施是企业数字化转型的关键步骤&#xff0c;但在实际操作中&#xff0c;企业往往会遇到各种复杂的挑战。这些挑战包括 技术整合的难度、数据管理的复杂性、合规性要求的变化 以及 资源限制 等。《信息架构&#xff1a;商业智能&分析与元数据管理参考模型》为…...

CodeChef Starters 151 (Div.2) A~D

codechef是真敢给分&#xff0c;上把刚注册&#xff0c;这把就div2了&#xff0c;再加上一周没打过还是有点不适应的&#xff0c;好在最后还是能够顺利上分 今天的封面是P3R的设置菜单 我抠出来做我自己的游戏主页了&#xff08; A - Convert string 题意 在01串里面可以翻转…...

Redis学习——数据不一致怎么办?更新缓存失败了又怎么办?

文章目录 引言正文读写缓存的数据一致性只读缓存的数据一致性删除和修改数据不一致问题操作执行失败导致数据不一致解决办法 多线程访问导致数据不一致问题总结 总结参考信息 引言 最近面试快手的时候被问到了缓存不一致怎么解决&#xff1f;一开始还是很懵的&#xff0c;因为…...

跨境电商代购新纪元:一键解锁全球好物,系统流程全揭秘

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 在全球化日益加深的今天&#xff0c;跨境电商代购成为了连接消费者与世界各地优质商品的桥梁。本文将在CSDN平台上&#xff0c;深入剖析跨境电商代购系统的功能流程&#xff0c;带您一窥其背后的技术奥秘与…...

Mac 上终端使用 MySql 记录

文章目录 下载安装终端进入 MySql常用操作查看数据库选择一个数据库查看当前选择的数据库Navcat 打开提示报错参考文章 下载安装 先下载社区版的 MySql 安装的过程需要设置 root 的密码&#xff0c;这个是要进入数据库所设定的&#xff0c;所以要记住 终端进入 MySql 首先输…...

461. 汉明距离

一&#xff1a;题目&#xff1a; 两个整数之间的 汉明距离 指的是这两个数字对应二进制位不同的位置的数目。 给你两个整数 x 和 y&#xff0c;计算并返回它们之间的汉明距离。 示例 1&#xff1a; 输入&#xff1a;x 1, y 4 输出&#xff1a;2 解释&#xff1a; 1 (0 0…...

开发指南061-nexus权限管理

平台后台服务的核心是组件&#xff0c;管理组件的软件有&#xff1a; Apache的Archiva、JFrog的Artifactory、Sonatype的Nexus。 本平台选择nexus。nexus的权限模型是用户-角色-权限体系&#xff1a;通过组合权限定义角色&#xff0c;通过给用户赋角色来赋权限。有关nexus的权…...

Qt 弹出菜单右键菜单 QMenu 设置不同颜色的子项

概述 在Qt中&#xff0c;可以使用样式表&#xff08;StyleSheet&#xff09;来自定义 QMenu 的外观&#xff0c;包括其子项&#xff08;如菜单项QAction&#xff09;的颜色。但是&#xff0c;这通常可以设置 QMenu 的整体样式&#xff0c;而不能单独设置某个子项的颜色。不过&…...

Git换行符自动转换参数core.autocrlf的用法

core.autocrlf 是 Git 中用于控制换行符自动转换的配置选项。它有以下几个可能的值&#xff1a; 1. true 作用&#xff1a;在 checkin 时将 CRLF 转换为 LF&#xff0c;在 checkout 时将 LF 转换为 CRLF。适用场景&#xff1a;适用于 Windows 用户&#xff0c;希望在本地文件…...

C语言的结构体类型

在我们使用C语言进行编写代码时&#xff0c;常常会使用已经给定的类型来创建变量&#xff0c;比如int型&#xff0c;char型&#xff0c;double型等&#xff0c;而当我们想创建一些较为复杂的东西时&#xff0c;单单用一个类型变量是没办法做到的&#xff0c;比如我们想创建一个…...

illustrator 收集字体插件VBscript

这是早些年从俄罗斯网站上看到的一个收集字体插件,语言是用VBscript写的,能用,但个别字体不能收集完成,现在Adobe也在illustrator中加入了收集字体打包功能,所以这个也很少用啦。 使用方法: 下好插件,或把下面的代码存入到本地侯后缀名改为.vbs,然后把.ai文件往.vbs文…...

【LLM多模态】文生视频评测基准VBench

note VBench的16个维度自动化评估指标代码实践&#xff08;待完成&#xff09;16个维度的prompt举例人类偏好标注&#xff1a;计算VBench评估结果与人类偏好之间的相关性、用于DPO微调 文章目录 note一、相关背景二、VBench评测基准概述&#xff1a;论文如何解决这个问题&…...

通过覆写 url_for 将 flask 应用部署到子目录下

0. 缘起 最近用 flask 写了一个 web 应用&#xff0c;需要部署到服务器上。而服务器主域名已经被使用了&#xff0c;只能给主域名加个子目录进行部署&#xff0c;比如主域名 example.org &#xff0c;我需要在 example.org/flask 下部署。这时 flask 应用里的内部连接们就出现…...

攻防世界---->埃尔隆德32

做题笔记。 下载 查壳。 32ida 打开。 发现就一个判断。 跟进看看。 // 首次a20 int __cdecl sub_8048414(_BYTE *a1, int a2) {int result; // eaxswitch ( a2 ){case 0:if ( *a1 105 )goto LABEL_19;result 0;break;case 1:if ( *a1 101 ) // e…...

redis短信登录模型

基于Session实现登录 &#xff0c;...

【React】React18.2.0核心源码解读

前言 本文使用 React18.2.0 的源码&#xff0c;如果想回退到某一版本执行git checkout tags/v18.2.0即可。如果打开源码发现js文件报ts类型错误请看本人另一篇文章&#xff1a;VsCode查看React源码全是类型报错如何解决。 阅读源码的过程&#xff1a; 下载源码 观察 package…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

Java 加密常用的各种算法及其选择

在数字化时代&#xff0c;数据安全至关重要&#xff0c;Java 作为广泛应用的编程语言&#xff0c;提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景&#xff0c;有助于开发者在不同的业务需求中做出正确的选择。​ 一、对称加密算法…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)

漏洞概览 漏洞名称&#xff1a;Apache Flink REST API 任意文件读取漏洞CVE编号&#xff1a;CVE-2020-17519CVSS评分&#xff1a;7.5影响版本&#xff1a;Apache Flink 1.11.0、1.11.1、1.11.2修复版本&#xff1a;≥ 1.11.3 或 ≥ 1.12.0漏洞类型&#xff1a;路径遍历&#x…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案

在移动互联网营销竞争白热化的当下&#xff0c;推客小程序系统凭借其裂变传播、精准营销等特性&#xff0c;成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径&#xff0c;助力开发者打造具有市场竞争力的营销工具。​ 一、系统核心功能架构&…...