当前位置: 首页 > news >正文

SpringBoot Kafka消费者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}

三、消费者

@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}

相关文章:

SpringBoot Kafka消费者 多kafka配置

一、配置文件 xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer…...

git 标签相关命令

要在本地仓库中添加标签&#xff0c;你可以使用以下命令&#xff1a; git tag <tag_name> 这将在当前所处的提交上创建一个轻量级标签&#xff08;lightweight tag&#xff09;。如果你想要创建一个带有附注信息的标签&#xff0c;可以使用 -a 选项&#xff1a; git t…...

我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)

文章目录 权重 _ 要求两幅图像是相同大小的。[ 1 ] 以数据说话&#xff08; 1&#xff09; 最终&#xff1a;&#xff08; 2 &#xff09;gamma _输出图像的标量值 [ 2 ] 图像的展现力gamma并不等同于增加曝光度&#xff08; 1 &#xff09;gamma100&#xff08; 2 &#xff09…...

【 Docker: 数据卷挂载】

背景 Docker只提供了容器运行的必备依赖&#xff0c;但是一些编辑等操作的依赖是不支持的&#xff0c;如vi操作容器内部文件、将静态资源拷贝到容器内来等。 docker pull nginx docker run -d -p 81:80 --namemynginx -v D:/docker/nginx/www:/usr/share/nginx/www -v D:/dock…...

windows上的静态链接和动态链接的区别与作用(笔记)

C源代码文件经过预编译、编译和汇编后输出的目标文件的后缀和操作系统是有关系的。不同的操作系统使用不同的命名约定和文件格式来表示目标文件。常见的目标文件后缀包括&#xff1a; - Windows系统&#xff1a;.obj、.lib、.dll - Linux系统&#xff1a;.o、.a、.so - macOS系…...

MySQL和Postgresql数据库备份和恢复

MySQL和Postgresql数据库备份和恢复 一、MySQL数据库备份 备份单个数据库 $ mysqldump -uroot -p bdname > dbname.sql备份多个数据库 $ mysqldump -uroot -p --databases dbname1 dbname2 ... > dbname.sql # 备份所有数据库 $ mysqldump -uroot -p --all-databases…...

使用MCU上的I2C总线进行传感器应用

使用MCU上的I2C总线进行传感器应用是嵌入式系统开发中常见的任务&#xff0c;本文将介绍在MCU上实现I2C总线传感器应用的相关技术和流程。 首先&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff09;总线是一种常用的串行通信协议&#xff0c;用于连接多个设备&am…...

汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)

目录 1. 自定义ASAP2文件 2. asap2userlib.tlc需要修改的部分 3. 标定量观测量地址替换 3.1 由elf文件替换 3.2 由map文件替换 3.3 正则表达式&#xff08;含asap2post.m修改方法&#xff09; 4.小结 书接上文汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)-C…...

ZZ308 物联网应用与服务赛题第E套

2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 &#xff08;E卷&#xff09; 赛位号&#xff1a;______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等&#xff1b; 2.竞赛任务中所使用的…...

web相关框架

web相关框架 web 后端开发框架 expressKoaHapiNest web 前端UI库&#xff08;组件库&#xff09; Naive-UiAnt Design VueElement plus 后端页面&#xff0c;前端框架(一个开箱即用前端框架) naive-ui-admingin-vue-adminvue-vben-adminvue-pure-adminvue3-antd-admin 无…...

安装dubbo-admin报错node版本和test错误

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;dubbo-admin安装 &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xff0…...

HTML使用canvas绘制海报(网络图片)

生成前&#xff1a; 生成后&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>媒体参会嘉宾邀请函生成链接</title><link rel"stylesheet" href"https://cdn.jsdelivr.net/npm/vant2.10…...

20道高频JavaScript面试题快问快答

※其他的快问快答&#xff0c;看这里&#xff01; 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…...

【STM32】HAL库UART含校验位的串口通信配置BUG避坑

【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…...

Python实用技巧:将 Excel转为PDF

将Excel文件转换为PDF可以方便储存表格数据&#xff0c;此外在打印或共享文档时也能确保表格样式布局等在不同设备和操作系统上保持一致。今天给大家分享一个使用第三方Python库Spire.XLS for Python 实现Excel转PDF的简单方法。 实现步骤 首先&#xff0c;通过pip命令来安装依…...

【面经】讲一下你对jvm和jmm的了解

JVM JVM是Java虚拟机&#xff0c;是Java程序的执行环境。它是一种虚拟的计算机&#xff0c;通过在实际的计算机上仿真模拟各种计算机功能来实现. JVM是Java程序运行的核心&#xff0c;可以将Java字节码转换为可执行的机器码&#xff0c;提供了跨平台性、优秀的垃圾回收器&…...

《网络协议》03. 传输层(TCP UDP)

title: 《网络协议》03. 传输层&#xff08;TCP & UDP&#xff09; date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录&#xff1a;网络协议 excerpt: 传输层、UDP、TCP&#xff08;可靠传输&#xff0c;流量控制&#xff0c;拥塞控制&#xf…...

ZooKeeper调优

服务器硬件配置 建议 Zookeeper 的服务器最好专属(或是资源隔离的)。磁盘由于 Zookeeper 的数据写入磁盘,强烈建议要使用 SSD。 Linux操作系统优化 Zookeeper 的性能会很明显受到交换分区的影响。建议部署 Zookeeper 的服务器关闭交换分区功能或是通过内核参数调整,减少…...

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…...

开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器

包管理器 vue-cli-service 不是内部或外部命令&#xff0c;也不是可运行的程序npm 全局变量pnpmPnpm介绍ViteYarn ‘vue-cli-service’ 不是内部或外部命令&#xff0c;也不是可运行的程序 yarn yarn add vue-amap yarn add vue-amap ant-design-vue npm 全局变量 换主机 新…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

Python网页自动化Selenium中文文档

1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API&#xff0c;让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API&#xff0c;你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...

游戏开发中常见的战斗数值英文缩写对照表

游戏开发中常见的战斗数值英文缩写对照表 基础属性&#xff08;Basic Attributes&#xff09; 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...

Java多线程实现之Runnable接口深度解析

Java多线程实现之Runnable接口深度解析 一、Runnable接口概述1.1 接口定义1.2 与Thread类的关系1.3 使用Runnable接口的优势 二、Runnable接口的基本实现方式2.1 传统方式实现Runnable接口2.2 使用匿名内部类实现Runnable接口2.3 使用Lambda表达式实现Runnable接口 三、Runnabl…...

以太网PHY布局布线指南

1. 简介 对于以太网布局布线遵循以下准则很重要&#xff0c;因为这将有助于减少信号发射&#xff0c;最大程度地减少噪声&#xff0c;确保器件作用&#xff0c;最大程度地减少泄漏并提高信号质量。 2. PHY设计准则 2.1 DRC错误检查 首先检查DRC规则是否设置正确&#xff0c;然…...

宠物车载安全座椅市场报告:解读行业趋势与投资前景

一、什么是宠物车载安全座椅&#xff1f; 宠物车载安全座椅是一种专为宠物设计的车内固定装置&#xff0c;旨在保障宠物在乘车过程中的安全性与舒适性。它通常由高强度材料制成&#xff0c;具备良好的缓冲性能&#xff0c;并可通过安全带或ISOFIX接口固定于车内。 近年来&…...