@KafkaListener指定kafka集群
基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:
依赖项(其实spring-kafka包含了kafka-clients)
<!-- spring-kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>
配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》
生产代码
@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}
消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例
@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true); // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}
消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群
@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/************************* 接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/************************* 处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}
相关文章:
@KafkaListener指定kafka集群
基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerC…...
什么是算法的空间复杂度?
一、问题 常常⽤算法的空间复杂度来评价算法的性能,那么什么是算法的空间复杂度呢? 二、解答 算法的空间复杂度是指在算法的执⾏过程中,需要的辅助空间数量。 辅助空间数量指的不是程序指令、常数、指针等所需要的存储空间,也不是…...
WebDav协议相关软件@简单配置局域网内的http和WebDav服务器和传输系统
文章目录 相关软件windows自带第三方软件 chfs(CuteHttpFileServer)下载软件GUI方案 补充命令行方案命令行程序定位简单创建服务站点使用配置文件配置细节 使用软连接或符号链接等手段将向共享站点的根目录添加文件开机自启服务包装nssm包装使用powershell包装 服务启动chfs服务…...
自定义数据实现SA3D
SA3D:Segment Anything in 3D with NeRFs 实现了3D目标分割 原理是利用SAM(segment anything) 模型和Nerf分割渲染3D目标, SAM只能分块,是没有语义标签的,如何做到语义连续? SA3D中用了self-prompt, 根据前一帧的mask…...
设计模式基础概念:探索设计模式的魅力
设计模式是软件开发中的一种指导性概念,它提供了一套被广泛接受的解决方案,用于常见的设计问题。设计模式有助于提高软件的可重用性、可扩展性和可维护性,并促进团队之间的沟通。 以下是一些常见的设计模式: 创建型模式࿱…...
【Leetcode】2182. 构造限制重复的字符串
文章目录 题目思路代码 题目 2182. 构造限制重复的字符串 问题:给你一个字符串 s 和一个整数 repeatLimit ,用 s 中的字符构造一个新字符串 repeatLimitedString ,使任何字母 连续 出现的次数都不超过 repeatLimit 次。你不必使用 s 中的全…...
Kubernetes(K8S)云服务器实操TKE
一、 Kubernetes(K8S)简介 Kubernetes源于希腊语,意为舵手,因为首尾字母中间正好有8个字母,简称为K8S。Kubernetes是当今最流行的开源容器管理平台,是 Google 发起并维护的基于 Docker 的开源容器集群管理系统。它是大名鼎鼎的Google Borg的开源版本。 K8s构建在 Docker …...
设置弹窗随鼠标位置移动
1.这是要移动的弹窗,隐藏显示逻辑、样式、展示内容自己写,主要就是动态设置弹窗的style,floatLeft和floatTop都是Vue中的data双向绑定数据; <div id"box" v-show"hasMove" :style"{ left: floatLeft…...
Spring Boot实现数据加密脱敏:注解 + 反射 + AOP
文章目录 1. 引言2. 数据加密和脱敏的需求3. Spring Boot项目初始化4. 敏感数据加密注解设计5. 实现加密和脱敏的工具类6. 实体类和加密脱敏注解的使用7. 利用AOP实现加密和脱敏8. 完善AOP切面9. 测试10. 拓展功能与未来展望10.1 加密算法的选择10.2 动态注解配置 11. 总结 &am…...
jmeter和meterSphere如何使用第三方jar包
工具引用jar包语言都是beanshell 问题起因:metersphere 接口自动化实现过程中,如何实现字符串加密且加密方法依赖第三方库; 使用语言:beanshell脚本语言,java语言 使用工具:idea jmeter metersphere 1.首…...
API对象上千个,有啥关联性,kubectl-tree一键搞定
关注【云原生百宝箱】公众号,获取更多云原生消息 "kubectl-tree 是一款强大的 kubectl 插件,通过 ownerReferences 实现 Kubernetes 对象之间的所有权关系探索。相较于 kubectl lineage,它不仅更全面理解 API 对象的逻辑关系,…...
java自定义工具类在List快速查找相同字段值对象
根据对象某一字段名,获取字段值,将List转换为Map中包含list,Key为字段值,Value为相同字段值的对象list,快速定位具有相同字段值的对象,转换之后便于在Map中根据字段值快速查找相同字段值的对象 //List转Map…...
codeforces Hello 2024 - C - Grouping Increases --- 题解
目录 Grouping Increases 题目描述: 思路解析: 代码实现: Grouping Increases 题目描述: 给你一个大小为n的数组a,你可以把数组a划分为两个子序列s和t,a中元素,要么在子序列s中,…...
STM32H5培训(一)总览
文章目录 1. 前言2. STM32H5系列MCU的特点和新功能包括性能提升、新外设和安全功能等3. STM32H5系列型号之间的区别和关键资源对比4. 性能和功能亮点6. 开发生态参考: 1. 前言 本篇主要介绍STM32H5系列MCU的特点和新功能,包括全新的M33内核、250M主频处…...
亚马逊云科技 WAF 部署小指南(五):在客户端集成 Amazon WAF SDK 抵御 DDoS 攻击...
方案介绍 在 WAF 部署小指南(一)中,我们了解了 Amazon WAF 的原理,并通过创建 WEB ACL 和托管规则防护常见的攻击。也了解了通过创建自定义规则在 HTTP 请求到达应用之前判断是阻断还是允许该请求。在 Amazon WAF 自定义规则中&am…...
高光谱分类论文解读分享之基于多模态融合Transformer的遥感图像分类方法
IEEE TGRS 2023:基于多模态融合Transformer的遥感图像分类方法 题目 Multimodal Fusion Transformer for Remote Sensing Image Classification 作者 Swalpa Kumar Roy , Student Member, IEEE, Ankur Deria , Danfeng Hong , Senior Member, IEEE, Behnood Ras…...
Trans论文复现:基于数据驱动的新能源充电站两阶段规划方法程序代码!
适用平台:MatlabYalmipCplex/Gurobi; 文章提出了一种电动汽车充电站的两阶段规划方法,第一阶段通过蒙特卡洛法模拟充电车辆需求和电池充放电数据来确定充电站位置;第二阶段通过数据驱动的分布鲁棒优化方法优化充电站的新能源和电池…...
将抖音视频转成MP3并下载
这篇是在上一篇的基础上写的,这篇负责抖音作者详情页的视频转声音提取,这篇需要用到后端。 本地启动后端后,在控制台输入对应代码,即可实现hover在封面上,按d一键下载音频 控制台代码 // 获取作者的视频列表var liEle…...
C程序训练:与输入有关的错误
在录入程序时有时稍不注意就可能录入错误的字符导致程序运行结果出现错误,下面举例说明。 下面程序的运行结果是错的,但程序又没有错,到底问题出现在哪呢? #include <stdio.h> int main() {FILE *fp;int i, k, n;fpfopen(…...
制作 CentOS Stream9 的U盘系统启动盘
一、简述 注:请勿用于商用,如有版权纠纷,于博主无任何关系。(仅用于学习研究使用) 由于CentOs Linux 7和CentOs Stream8终止日期是2024年,需要将系统升级到最新版本的CentOs Stream9,下面是刻录系统盘的操…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
