当前位置: 首页 > news >正文

SpringBoot Kafka消费者 多kafka配置

一、配置文件

xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit: falseoffset-reset: earliestrecords: 10session-timeout: 150000poll-interval: 360000request-timeout: 60000

二、KafkaConfig

package com.xxxxxx.xxxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${xxxxxx.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${xxxxxx.kafka.consumer.poll-timeout}")private Integer pollTimeout;@Value("${xxxxxx.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${xxxxxx.kafka.consumer.value-deserializer}")private String valueDeserializer;@Value("${xxxxxx.kafka.consumer.auto-commit}")private String autoCommit;@Value("${xxxxxx.kafka.consumer.offset-reset}")private String offsetReset;@Value("${xxxxxx.kafka.consumer.records}")private Integer records;@Value("${xxxxxx.kafka.consumer.session-timeout}")private Integer sessionTimeout;@Value("${xxxxxx.kafka.consumer.poll-interval}")private Integer pollInterval;@Value("${xxxxxx.kafka.consumer.request-timeout}")private Integer requestTimeout;@Bean(name = "ixxxxxxKafkaListenerContainerFactory")public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//并发数量factory.setConcurrency(3);//设置在消费者中等待记录的最大阻塞时间。factory.getContainerProperties().setPollTimeout(pollTimeout);//ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map consumerConfigs() {Map props = new HashMap<>();//Kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//消费者组,只要group.id相同,就属于同一个消费者组//props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交offset,默认为true,设置为falseprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//key反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);//value反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);//一次消费信息条数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);//earliest:第一次从头开始消费,之后按照offset开始消费;latest:只消费自己启动之后的消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//session超时时间props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//消费者轮询获取消息的最大时间间隔,超过此时间未获取消息,组将重新平衡,以便将分区重新分配给另一个成员props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);//客户端发起请求后,等待响应的最大时间。如果超时之前未收到响应,客户端会在必要时重新发起请求props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);return props;}
}

三、消费者

@KafkaListener(containerFactory = "xxxxxxEnergyKafkaListenerContainerFactory",id = "itsId",idIsGroup = false,groupId = "itsGroupId",topics = "itsTopic")public void consumerUser(@Payload String data,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment ack,Consumer<?, ?> consumer){try{}catch (Exception e){}ack.acknowledge();}

相关文章:

SpringBoot Kafka消费者 多kafka配置

一、配置文件 xxxxxx:kafka:bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092consumer:poll-timeout: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer…...

git 标签相关命令

要在本地仓库中添加标签&#xff0c;你可以使用以下命令&#xff1a; git tag <tag_name> 这将在当前所处的提交上创建一个轻量级标签&#xff08;lightweight tag&#xff09;。如果你想要创建一个带有附注信息的标签&#xff0c;可以使用 -a 选项&#xff1a; git t…...

我在Vscode学OpenCV 图像运算(权重、逻辑运算、掩码、位分解、数字水印)

文章目录 权重 _ 要求两幅图像是相同大小的。[ 1 ] 以数据说话&#xff08; 1&#xff09; 最终&#xff1a;&#xff08; 2 &#xff09;gamma _输出图像的标量值 [ 2 ] 图像的展现力gamma并不等同于增加曝光度&#xff08; 1 &#xff09;gamma100&#xff08; 2 &#xff09…...

【 Docker: 数据卷挂载】

背景 Docker只提供了容器运行的必备依赖&#xff0c;但是一些编辑等操作的依赖是不支持的&#xff0c;如vi操作容器内部文件、将静态资源拷贝到容器内来等。 docker pull nginx docker run -d -p 81:80 --namemynginx -v D:/docker/nginx/www:/usr/share/nginx/www -v D:/dock…...

windows上的静态链接和动态链接的区别与作用(笔记)

C源代码文件经过预编译、编译和汇编后输出的目标文件的后缀和操作系统是有关系的。不同的操作系统使用不同的命名约定和文件格式来表示目标文件。常见的目标文件后缀包括&#xff1a; - Windows系统&#xff1a;.obj、.lib、.dll - Linux系统&#xff1a;.o、.a、.so - macOS系…...

MySQL和Postgresql数据库备份和恢复

MySQL和Postgresql数据库备份和恢复 一、MySQL数据库备份 备份单个数据库 $ mysqldump -uroot -p bdname > dbname.sql备份多个数据库 $ mysqldump -uroot -p --databases dbname1 dbname2 ... > dbname.sql # 备份所有数据库 $ mysqldump -uroot -p --all-databases…...

使用MCU上的I2C总线进行传感器应用

使用MCU上的I2C总线进行传感器应用是嵌入式系统开发中常见的任务&#xff0c;本文将介绍在MCU上实现I2C总线传感器应用的相关技术和流程。 首先&#xff0c;I2C&#xff08;Inter-Integrated Circuit&#xff09;总线是一种常用的串行通信协议&#xff0c;用于连接多个设备&am…...

汽车标定技术(七)--基于模型开发如何生成完整的A2L文件(2)

目录 1. 自定义ASAP2文件 2. asap2userlib.tlc需要修改的部分 3. 标定量观测量地址替换 3.1 由elf文件替换 3.2 由map文件替换 3.3 正则表达式&#xff08;含asap2post.m修改方法&#xff09; 4.小结 书接上文汽车标定技术(五)--基于模型开发如何生成完整的A2L文件(1)-C…...

ZZ308 物联网应用与服务赛题第E套

2023年全国职业院校技能大赛 中职组 物联网应用与服务 任 务 书 &#xff08;E卷&#xff09; 赛位号&#xff1a;______________ 竞赛须知 一、注意事项 1.检查硬件设备、电脑设备是否正常。检查竞赛所需的各项设备、软件和竞赛材料等&#xff1b; 2.竞赛任务中所使用的…...

web相关框架

web相关框架 web 后端开发框架 expressKoaHapiNest web 前端UI库&#xff08;组件库&#xff09; Naive-UiAnt Design VueElement plus 后端页面&#xff0c;前端框架(一个开箱即用前端框架) naive-ui-admingin-vue-adminvue-vben-adminvue-pure-adminvue3-antd-admin 无…...

安装dubbo-admin报错node版本和test错误

✅作者简介&#xff1a;CSDN内容合伙人、信息安全专业在校大学生&#x1f3c6; &#x1f525;系列专栏 &#xff1a;dubbo-admin安装 &#x1f4c3;新人博主 &#xff1a;欢迎点赞收藏关注&#xff0c;会回访&#xff01; &#x1f4ac;舞台再大&#xff0c;你不上台&#xff0…...

HTML使用canvas绘制海报(网络图片)

生成前&#xff1a; 生成后&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>媒体参会嘉宾邀请函生成链接</title><link rel"stylesheet" href"https://cdn.jsdelivr.net/npm/vant2.10…...

20道高频JavaScript面试题快问快答

※其他的快问快答&#xff0c;看这里&#xff01; 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…...

【STM32】HAL库UART含校验位的串口通信配置BUG避坑

【STM32】HAL库UART含校验位的串口通信配置BUG避坑 文章目录 UART协议校验位HAL库配置含校验位的串口配置BUG避坑附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏定义总线函…...

Python实用技巧:将 Excel转为PDF

将Excel文件转换为PDF可以方便储存表格数据&#xff0c;此外在打印或共享文档时也能确保表格样式布局等在不同设备和操作系统上保持一致。今天给大家分享一个使用第三方Python库Spire.XLS for Python 实现Excel转PDF的简单方法。 实现步骤 首先&#xff0c;通过pip命令来安装依…...

【面经】讲一下你对jvm和jmm的了解

JVM JVM是Java虚拟机&#xff0c;是Java程序的执行环境。它是一种虚拟的计算机&#xff0c;通过在实际的计算机上仿真模拟各种计算机功能来实现. JVM是Java程序运行的核心&#xff0c;可以将Java字节码转换为可执行的机器码&#xff0c;提供了跨平台性、优秀的垃圾回收器&…...

《网络协议》03. 传输层(TCP UDP)

title: 《网络协议》03. 传输层&#xff08;TCP & UDP&#xff09; date: 2022-09-04 22:37:11 updated: 2023-11-08 15:58:52 categories: 学习记录&#xff1a;网络协议 excerpt: 传输层、UDP、TCP&#xff08;可靠传输&#xff0c;流量控制&#xff0c;拥塞控制&#xf…...

ZooKeeper调优

服务器硬件配置 建议 Zookeeper 的服务器最好专属(或是资源隔离的)。磁盘由于 Zookeeper 的数据写入磁盘,强烈建议要使用 SSD。 Linux操作系统优化 Zookeeper 的性能会很明显受到交换分区的影响。建议部署 Zookeeper 的服务器关闭交换分区功能或是通过内核参数调整,减少…...

改进YOLOv5:结合ICCV2023|动态蛇形卷积,构建不规则目标识别网络

🔥🔥🔥 提升多尺度、不规则目标检测,创新提升 🔥🔥🔥 🔥🔥🔥 捕捉图像特征和处理复杂图像特征 🔥🔥🔥 👉👉👉: 本专栏包含大量的新设计的创新想法,包含详细的代码和说明,具备有效的创新组合,可以有效应用到改进创新当中 👉👉👉: �…...

开发知识点-NodeJs-npm/Pnpm/Vite/Yarn包管理器

包管理器 vue-cli-service 不是内部或外部命令&#xff0c;也不是可运行的程序npm 全局变量pnpmPnpm介绍ViteYarn ‘vue-cli-service’ 不是内部或外部命令&#xff0c;也不是可运行的程序 yarn yarn add vue-amap yarn add vue-amap ant-design-vue npm 全局变量 换主机 新…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

定时器任务——若依源码分析

分析util包下面的工具类schedule utils&#xff1a; ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类&#xff0c;封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz&#xff0c;先构建任务的 JobD…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...