SpringBoot Kafka消费者 多kafka配置
一、配置文件
xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000
二、KafkaConfig
package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}
三、消费者
@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}
相关文章:
SpringBoot Kafka消费者 多kafka配置
一、配置文件 xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer…...
git 标签相关命令
要在本地仓库中添加标签,你可以使用以下命令: git tag <tag_name> 这将在当前所处的提交上创建一个轻量级标签(lightweight tag)。如果你想要创建一个带有附注信息的标签,可以使用 -a 选项: git t…...

我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)
文章目录 权重 _ 要求两幅图像是相同大小的。[ 1 ] 以数据说话( 1) 最终:( 2 )gamma _输出图像的标量值 [ 2 ] 图像的展现力gamma并不等同于增加曝光度( 1 )gamma100( 2 )…...

【 Docker: 数据卷挂载】
背景 Docker只提供了容器运行的必备依赖,但是一些编辑等操作的依赖是不支持的,如vi操作容器内部文件、将静态资源拷贝到容器内来等。 docker pull nginx docker run -d -p 81:80 --namemynginx -v D:/docker/nginx/www:/usr/share/nginx/www -v D:/dock…...
windows上的静态链接和动态链接的区别与作用(笔记)
C源代码文件经过预编译、编译和汇编后输出的目标文件的后缀和操作系统是有关系的。不同的操作系统使用不同的命名约定和文件格式来表示目标文件。常见的目标文件后缀包括: - Windows系统:.obj、.lib、.dll - Linux系统:.o、.a、.so - macOS系…...
MySQL和Postgresql数据库备份和恢复
MySQL和Postgresql数据库备份和恢复 一、MySQL数据库备份 备份单个数据库 $ mysqldump -uroot -p bdname > dbname.sql备份多个数据库 $ mysqldump -uroot -p --databases dbname1 dbname2 ... > dbname.sql # 备份所有数据库 $ mysqldump -uroot -p --all-databases…...

使用MCU上的I2C总线进行传感器应用
使用MCU上的I2C总线进行传感器应用是嵌入式系统开发中常见的任务,本文将介绍在MCU上实现I2C总线传感器应用的相关技术和流程。 首先,I2C(Inter-Integrated Circuit)总线是一种常用的串行通信协议,用于连接多个设备&am…...

汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)
目录 1. 自定义ASAP2文件 2. asap2userlib.tlc需要修改的部分 3. 标定量观测量地址替换 3.1 由elf文件替换 3.2 由map文件替换 3.3 正则表达式(含asap2post.m修改方法) 4.小结 书接上文汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)-C…...

ZZ308 物联网应用与服务赛题第E套
2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 (E卷) 赛位号:______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等; 2.竞赛任务中所使用的…...
web相关框架
web相关框架 web 后端开发框架 expressKoaHapiNest web 前端UI库(组件库) Naive-UiAnt Design VueElement plus 后端页面,前端框架(一个开箱即用前端框架) naive-ui-admingin-vue-adminvue-vben-adminvue-pure-adminvue3-antd-admin 无…...

安装dubbo-admin报错node版本和test错误
✅作者简介:CSDN内容合伙人、信息安全专业在校大学生🏆 🔥系列专栏 :dubbo-admin安装 📃新人博主 :欢迎点赞收藏关注,会回访! 💬舞台再大,你不上台࿰…...

HTML使用canvas绘制海报(网络图片)
生成前: 生成后: <!DOCTYPE html> <html><head><meta charset"utf-8"><title>媒体参会嘉宾邀请函生成链接</title><link rel"stylesheet" href"https://cdn.jsdelivr.net/npm/vant2.10…...

20道高频JavaScript面试题快问快答
※其他的快问快答,看这里! 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…...

【STM32】HAL库UART含校验位的串口通信配置BUG避坑
【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录:Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…...

Python实用技巧:将 Excel转为PDF
将Excel文件转换为PDF可以方便储存表格数据,此外在打印或共享文档时也能确保表格样式布局等在不同设备和操作系统上保持一致。今天给大家分享一个使用第三方Python库Spire.XLS for Python 实现Excel转PDF的简单方法。 实现步骤 首先,通过pip命令来安装依…...
【面经】讲一下你对jvm和jmm的了解
JVM JVM是Java虚拟机,是Java程序的执行环境。它是一种虚拟的计算机,通过在实际的计算机上仿真模拟各种计算机功能来实现. JVM是Java程序运行的核心,可以将Java字节码转换为可执行的机器码,提供了跨平台性、优秀的垃圾回收器&…...

《网络协议》03. 传输层(TCP UDP)
title: 《网络协议》03. 传输层(TCP & UDP) date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录:网络协议 excerpt: 传输层、UDP、TCP(可靠传输,流量控制,拥塞控制…...
ZooKeeper调优
服务器硬件配置 建议 Zookeeper 的服务器最好专属(或是资源隔离的)。磁盘由于 Zookeeper 的数据写入磁盘,强烈建议要使用 SSD。 Linux操作系统优化 Zookeeper 的性能会很明显受到交换分区的影响。建议部署 Zookeeper 的服务器关闭交换分区功能或是通过内核参数调整,减少…...

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络
🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…...

开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器
包管理器 vue-cli-service 不是内部或外部命令,也不是可运行的程序npm 全局变量pnpmPnpm介绍ViteYarn ‘vue-cli-service’ 不是内部或外部命令,也不是可运行的程序 yarn yarn add vue-amap yarn add vue-amap ant-design-vue npm 全局变量 换主机 新…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...

selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...