当前位置: 首页 > news >正文

SpringBoot Kafka消费者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}

三、消费者

@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}

相关文章:

SpringBoot Kafka消费者 多kafka配置

一、配置文件 xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer…...

git 标签相关命令

要在本地仓库中添加标签&#xff0c;你可以使用以下命令&#xff1a; git tag <tag_name> 这将在当前所处的提交上创建一个轻量级标签&#xff08;lightweight tag&#xff09;。如果你想要创建一个带有附注信息的标签&#xff0c;可以使用 -a 选项&#xff1a; git t…...

我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)

文章目录 权重 _ 要求两幅图像是相同大小的。[ 1 ] 以数据说话&#xff08; 1&#xff09; 最终&#xff1a;&#xff08; 2 &#xff09;gamma _输出图像的标量值 [ 2 ] 图像的展现力gamma并不等同于增加曝光度&#xff08; 1 &#xff09;gamma100&#xff08; 2 &#xff09…...

【 Docker: 数据卷挂载】

背景 Docker只提供了容器运行的必备依赖&#xff0c;但是一些编辑等操作的依赖是不支持的&#xff0c;如vi操作容器内部文件、将静态资源拷贝到容器内来等。 docker pull nginx docker run -d -p 81:80 --namemynginx -v D:/docker/nginx/www:/usr/share/nginx/www -v D:/dock…...

windows上的静态链接和动态链接的区别与作用(笔记)

C源代码文件经过预编译、编译和汇编后输出的目标文件的后缀和操作系统是有关系的。不同的操作系统使用不同的命名约定和文件格式来表示目标文件。常见的目标文件后缀包括&#xff1a; - Windows系统&#xff1a;.obj、.lib、.dll - Linux系统&#xff1a;.o、.a、.so - macOS系…...

MySQL和Postgresql数据库备份和恢复

MySQL和Postgresql数据库备份和恢复 一、MySQL数据库备份 备份单个数据库 $ mysqldump -uroot -p bdname > dbname.sql备份多个数据库 $ mysqldump -uroot -p --databases dbname1 dbname2 ... > dbname.sql # 备份所有数据库 $ mysqldump -uroot -p --all-databases…...

使用MCU上的I2C总线进行传感器应用

使用MCU上的I2C总线进行传感器应用是嵌入式系统开发中常见的任务&#xff0c;本文将介绍在MCU上实现I2C总线传感器应用的相关技术和流程。 首先&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff09;总线是一种常用的串行通信协议&#xff0c;用于连接多个设备&am…...

汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)

目录 1. 自定义ASAP2文件 2. asap2userlib.tlc需要修改的部分 3. 标定量观测量地址替换 3.1 由elf文件替换 3.2 由map文件替换 3.3 正则表达式&#xff08;含asap2post.m修改方法&#xff09; 4.小结 书接上文汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)-C…...

ZZ308 物联网应用与服务赛题第E套

2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 &#xff08;E卷&#xff09; 赛位号&#xff1a;______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等&#xff1b; 2.竞赛任务中所使用的…...

web相关框架

web相关框架 web 后端开发框架 expressKoaHapiNest web 前端UI库&#xff08;组件库&#xff09; Naive-UiAnt Design VueElement plus 后端页面&#xff0c;前端框架(一个开箱即用前端框架) naive-ui-admingin-vue-adminvue-vben-adminvue-pure-adminvue3-antd-admin 无…...

安装dubbo-admin报错node版本和test错误

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;dubbo-admin安装 &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xff0…...

HTML使用canvas绘制海报(网络图片)

生成前&#xff1a; 生成后&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>媒体参会嘉宾邀请函生成链接</title><link rel"stylesheet" href"https://cdn.jsdelivr.net/npm/vant2.10…...

20道高频JavaScript面试题快问快答

※其他的快问快答&#xff0c;看这里&#xff01; 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…...

【STM32】HAL库UART含校验位的串口通信配置BUG避坑

【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…...

Python实用技巧:将 Excel转为PDF

将Excel文件转换为PDF可以方便储存表格数据&#xff0c;此外在打印或共享文档时也能确保表格样式布局等在不同设备和操作系统上保持一致。今天给大家分享一个使用第三方Python库Spire.XLS for Python 实现Excel转PDF的简单方法。 实现步骤 首先&#xff0c;通过pip命令来安装依…...

【面经】讲一下你对jvm和jmm的了解

JVM JVM是Java虚拟机&#xff0c;是Java程序的执行环境。它是一种虚拟的计算机&#xff0c;通过在实际的计算机上仿真模拟各种计算机功能来实现. JVM是Java程序运行的核心&#xff0c;可以将Java字节码转换为可执行的机器码&#xff0c;提供了跨平台性、优秀的垃圾回收器&…...

《网络协议》03. 传输层(TCP UDP)

title: 《网络协议》03. 传输层&#xff08;TCP & UDP&#xff09; date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录&#xff1a;网络协议 excerpt: 传输层、UDP、TCP&#xff08;可靠传输&#xff0c;流量控制&#xff0c;拥塞控制&#xf…...

ZooKeeper调优

服务器硬件配置 建议 Zookeeper 的服务器最好专属(或是资源隔离的)。磁盘由于 Zookeeper 的数据写入磁盘,强烈建议要使用 SSD。 Linux操作系统优化 Zookeeper 的性能会很明显受到交换分区的影响。建议部署 Zookeeper 的服务器关闭交换分区功能或是通过内核参数调整,减少…...

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…...

开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器

包管理器 vue-cli-service 不是内部或外部命令&#xff0c;也不是可运行的程序npm 全局变量pnpmPnpm介绍ViteYarn ‘vue-cli-service’ 不是内部或外部命令&#xff0c;也不是可运行的程序 yarn yarn add vue-amap yarn add vue-amap ant-design-vue npm 全局变量 换主机 新…...

Mac上好用的翻译软件推荐 兼容m

Mac翻译软件可以用在学习&#xff0c;工作&#xff0c;生活当中&#xff0c;一款好用的翻译软件&#xff0c;具有翻译准确&#xff0c;翻译快速等基本特点&#xff0c;能够帮您提高工作效率。Mac上有什么好用的翻译软件呢&#xff1f;今天小编为大家整理了6款好用的Mac翻译软件…...

软件下载网站

1.qt 下载官网 Index of /new_archive/qt 2.qt-vs 插件下载 Index of /official_releases/vsaddin...

java获取近期视频流关键帧与截图

1、背景 最近在做视频转发的开发时&#xff0c;遇到一个问题&#xff0c;前端订阅播放h264视频流时&#xff0c;有时会出现一段时间黑屏&#xff0c;经过测试发现是没有收到关键帧&#xff0c;只有第一帧是关键帧才能保证后续播放正常。所以后端需要实现一个功能&#xff0c;就…...

arcgis 批量删除Table中的某些Field

当shp或者table文件较少时&#xff0c;可以手动删除每个文件中的某些字段&#xff0c;当文件较多时&#xff0c;就需要使用arcpy或者model进行处理。...

工厂设备扫码使用售卖联网开发需要怎么开发开源代码?

我们将详细介绍如何使用开源代码开发一套用于工厂设备联网统计的系统。我们将详细讨论所需硬件组件的选择、开源框架和库的使用、软件开发流程以及最后的集成和部署。在这个过程中&#xff0c;我们将提供实用的操作步骤和指导&#xff0c;帮助你更容易地完成这个复杂的任务。 …...

软考高级之132个工具和技术

分类 工具与技术 描述 数据收集 头脑风暴 在短时间内获得大量创意&#xff0c;适用于团队环境&#xff0c;需要引导者引导&#xff08;过程中可以天马行空&#xff0c;不要打断&#xff09; 包括&#xff1a;头脑风暴、头脑写作 头脑写作&#xff1a;在开始小组创意讨论之…...

算法通过村第十八关-回溯|白银笔记|经典问题

文章目录 前言组合总和问题分割回文串子集问题排序问题字母大小写全排列单词搜索总结 前言 提示&#xff1a;我不愿再给你写信了。因为我终于感到&#xff0c;我们的全部通信知识一个大大的幻影&#xff0c;我们每个人知识再给自己写信。 --安德烈纪德 回溯主要解决一些暴力枚举…...

vue2 集成 - 超图 - SuperMap iClient3D for WebGL 及常用方法

文章目录 1:下载SuperMap iClient3D for WebGL2:格式化项目中所用的依赖包3:vue2 项目引入4:vue2 页面使用常见方法4.1 创建三维场景,引入在线地图资源,定位到指定位置4.2 坐标拾取4.3 用户输入事件4.4 拾取实体4.5 实体改变监听事件4.6 双击全屏4.7 相机移动事件4.8 添加…...

应用程序服务器/事件驱动编程/CommonJS介绍

目录 应用程序服务器事件驱动编程CommonJS &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是我进步的财富&#xff01; 应用程序服务器 应用程序服务器是一种用…...

第二十九章 目标检测中的测试模型评价指标(车道线感知)

前言 近期参与到了手写AI的车道线检测的学习中去&#xff0c;以此系列笔记记录学习与思考的全过程。车道线检测系列会持续更新&#xff0c;力求完整精炼&#xff0c;引人启示。所需前期知识&#xff0c;可以结合手写AI进行系统的学习。 介绍 自动驾驶的一大前提是保证人的安全…...