SpringBoot Kafka消费者 多kafka配置
一、配置文件
xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000
二、KafkaConfig
package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}
三、消费者
@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}
相关文章:
SpringBoot Kafka消费者 多kafka配置
一、配置文件 xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer…...
git 标签相关命令
要在本地仓库中添加标签,你可以使用以下命令: git tag <tag_name> 这将在当前所处的提交上创建一个轻量级标签(lightweight tag)。如果你想要创建一个带有附注信息的标签,可以使用 -a 选项: git t…...
我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)
文章目录 权重 _ 要求两幅图像是相同大小的。[ 1 ] 以数据说话( 1) 最终:( 2 )gamma _输出图像的标量值 [ 2 ] 图像的展现力gamma并不等同于增加曝光度( 1 )gamma100( 2 )…...
【 Docker: 数据卷挂载】
背景 Docker只提供了容器运行的必备依赖,但是一些编辑等操作的依赖是不支持的,如vi操作容器内部文件、将静态资源拷贝到容器内来等。 docker pull nginx docker run -d -p 81:80 --namemynginx -v D:/docker/nginx/www:/usr/share/nginx/www -v D:/dock…...
windows上的静态链接和动态链接的区别与作用(笔记)
C源代码文件经过预编译、编译和汇编后输出的目标文件的后缀和操作系统是有关系的。不同的操作系统使用不同的命名约定和文件格式来表示目标文件。常见的目标文件后缀包括: - Windows系统:.obj、.lib、.dll - Linux系统:.o、.a、.so - macOS系…...
MySQL和Postgresql数据库备份和恢复
MySQL和Postgresql数据库备份和恢复 一、MySQL数据库备份 备份单个数据库 $ mysqldump -uroot -p bdname > dbname.sql备份多个数据库 $ mysqldump -uroot -p --databases dbname1 dbname2 ... > dbname.sql # 备份所有数据库 $ mysqldump -uroot -p --all-databases…...
使用MCU上的I2C总线进行传感器应用
使用MCU上的I2C总线进行传感器应用是嵌入式系统开发中常见的任务,本文将介绍在MCU上实现I2C总线传感器应用的相关技术和流程。 首先,I2C(Inter-Integrated Circuit)总线是一种常用的串行通信协议,用于连接多个设备&am…...
汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)
目录 1. 自定义ASAP2文件 2. asap2userlib.tlc需要修改的部分 3. 标定量观测量地址替换 3.1 由elf文件替换 3.2 由map文件替换 3.3 正则表达式(含asap2post.m修改方法) 4.小结 书接上文汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)-C…...
ZZ308 物联网应用与服务赛题第E套
2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 (E卷) 赛位号:______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等; 2.竞赛任务中所使用的…...
web相关框架
web相关框架 web 后端开发框架 expressKoaHapiNest web 前端UI库(组件库) Naive-UiAnt Design VueElement plus 后端页面,前端框架(一个开箱即用前端框架) naive-ui-admingin-vue-adminvue-vben-adminvue-pure-adminvue3-antd-admin 无…...
安装dubbo-admin报错node版本和test错误
✅作者简介:CSDN内容合伙人、信息安全专业在校大学生🏆 🔥系列专栏 :dubbo-admin安装 📃新人博主 :欢迎点赞收藏关注,会回访! 💬舞台再大,你不上台࿰…...
HTML使用canvas绘制海报(网络图片)
生成前: 生成后: <!DOCTYPE html> <html><head><meta charset"utf-8"><title>媒体参会嘉宾邀请函生成链接</title><link rel"stylesheet" href"https://cdn.jsdelivr.net/npm/vant2.10…...
20道高频JavaScript面试题快问快答
※其他的快问快答,看这里! 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…...
【STM32】HAL库UART含校验位的串口通信配置BUG避坑
【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录:Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…...
Python实用技巧:将 Excel转为PDF
将Excel文件转换为PDF可以方便储存表格数据,此外在打印或共享文档时也能确保表格样式布局等在不同设备和操作系统上保持一致。今天给大家分享一个使用第三方Python库Spire.XLS for Python 实现Excel转PDF的简单方法。 实现步骤 首先,通过pip命令来安装依…...
【面经】讲一下你对jvm和jmm的了解
JVM JVM是Java虚拟机,是Java程序的执行环境。它是一种虚拟的计算机,通过在实际的计算机上仿真模拟各种计算机功能来实现. JVM是Java程序运行的核心,可以将Java字节码转换为可执行的机器码,提供了跨平台性、优秀的垃圾回收器&…...
《网络协议》03. 传输层(TCP UDP)
title: 《网络协议》03. 传输层(TCP & UDP) date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录:网络协议 excerpt: 传输层、UDP、TCP(可靠传输,流量控制,拥塞控制…...
ZooKeeper调优
服务器硬件配置 建议 Zookeeper 的服务器最好专属(或是资源隔离的)。磁盘由于 Zookeeper 的数据写入磁盘,强烈建议要使用 SSD。 Linux操作系统优化 Zookeeper 的性能会很明显受到交换分区的影响。建议部署 Zookeeper 的服务器关闭交换分区功能或是通过内核参数调整,减少…...
改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络
🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…...
开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器
包管理器 vue-cli-service 不是内部或外部命令,也不是可运行的程序npm 全局变量pnpmPnpm介绍ViteYarn ‘vue-cli-service’ 不是内部或外部命令,也不是可运行的程序 yarn yarn add vue-amap yarn add vue-amap ant-design-vue npm 全局变量 换主机 新…...
告别物理打印机:如何用Virtual-ZPL-Printer高效测试Zebra标签应用 [特殊字符]
告别物理打印机:如何用Virtual-ZPL-Printer高效测试Zebra标签应用 🚀 【免费下载链接】Virtual-ZPL-Printer An ethernet based virtual Zebra Label Printer that can be used to test applications that produce bar code labels. 项目地址: https:/…...
模拟消息队列的消费逻辑-Java
分享一个大牛的人工智能教程。零基础!通俗易懂!风趣幽默!希望你也加入到人工智能的队伍中来!请轻击人工智能教程https://www.captainai.net/troubleshooter 这是一个生产级消息队列消费逻辑模拟,重点突出&am…...
Bluesky 24小时全网瘫痪深度解析:伊朗API层DDoS攻击与去中心化平台的安全困局
前言 2026年4月15日深夜,一场突如其来的大规模网络攻击让全球增长最快的去中心化社交平台Bluesky陷入了成立以来最严重的服务危机。在短短24小时内,全球4370万用户无法刷新信息流、接收通知、发布内容或使用搜索功能,平台几乎完全瘫痪。此次攻…...
告别C盘红色警告!把WSL 2的虚拟硬盘迁移并扩容到其他盘(D/E盘教程)
彻底解放C盘空间:WSL 2虚拟硬盘迁移与智能扩容全攻略 每次打开Windows资源管理器,那个刺眼的红色警告条总让人心头一紧——C盘又满了。对于深度使用WSL 2的开发者和数据科学工作者来说,这个问题尤为棘手。默认安装在C盘的WSL 2虚拟硬盘(VHDX)…...
别再只看单个差异基因了!用R语言clusterProfiler包做ORA富集分析,给你的RNA-seq结果找个靠谱的‘解释’
从基因列表到生物学故事:用R语言解锁RNA-seq数据的通路级解读 第一次拿到RNA-seq差异分析结果时,看着Excel里那几百个"显著差异基因",我盯着屏幕发呆了半小时——这些基因到底说明了什么生物学问题?如果你也经历过这种&…...
手把手教你用Windows电脑+IPv6搭建个人网盘:可道云保姆级配置与防火墙避坑指南
零成本打造私有云盘:WindowsIPv6环境下的可道云全栈配置指南 家里那台闲置的Windows电脑,其实是一台被低估的数据中心。想象一下:不再受公有云限速困扰,所有文件触手可及,还能与团队成员实时协作——关键是完全免费。本…...
Moonlight Internet Hosting Tool:零配置实现远程游戏串流的终极解决方案
Moonlight Internet Hosting Tool:零配置实现远程游戏串流的终极解决方案 【免费下载链接】Internet-Hosting-Tool Enable Moonlight streaming from your PC over the Internet with no configuration required 项目地址: https://gitcode.com/gh_mirrors/in/Int…...
左值和右值:从根源理解 C++ 的引用与移动语义
在 C 里,“左值”和“右值”几乎是每一个进阶开发者绕不开的概念。它们看起来很基础——左值可以放在赋值号左边,右值只能放在右边——但这个朴素的定义在现代 C 中早已不够用了。C11 引入的右值引用、移动语义、完美转发,让这一对概念变得无…...
Keil5编译报错找不到ARM编译器?手把手教你安装AC5.06(附路径配置避坑指南)
Keil5编译报错找不到ARM编译器?手把手教你安装AC5.06(附路径配置避坑指南) 当你满怀期待地打开一个STM32项目准备大展身手时,Keil5突然弹出一个令人窒息的报错:"Target uses ARM-Compiler Default Compiler Versi…...
别再死记硬背了!用‘安检-修正-通知’三步法,轻松理解WPF依赖属性的PropertyChangedCallback、CoerceValueCallback和ValidateValueCallback
用机场安检流程秒懂WPF依赖属性的三大回调机制 想象你正推着行李走进机场,从值机柜台到登机口需要经过层层检查与调整——这与WPF依赖属性处理数据流的逻辑惊人地相似。本文将用"安检-修正-通知"的生活化模型,带您重新理解ValidateValueCallba…...
