当前位置: 首页 > news >正文

Spring Boot配置多个Kafka数据源

一、配置文件

application.properties配置文件如下

#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true

二、pom依赖

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

三、生产者、消费者配置

1.第一个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}

2.第二个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

四.生产者

@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}

五.消费者

@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}

备注:

生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客

相关文章:

Spring Boot配置多个Kafka数据源

一、配置文件 application.properties配置文件如下 #kafka多数据源配置 #kafka数据源一&#xff0c;日志审计推送 spring.kafka.one.bootstrap-servers172.19.12.109:32182 spring.kafka.one.producer.retries0 spring.kafka.one.producer.properties.max.block.ms5000 #kafk…...

Learning Open-World Object Proposals without Learning to Classify(论文解析)

Learning Open-World Object Proposals without Learning to Classify 摘要1 介绍2 相关工作3 方法3.1 基线3.2 基于纯定位的对象性3.3. 对象定位网络 (OLN)4 实验4.1跨类泛化4.2.开放世界类不可知检测4.3更多的跨数据集泛化4.3.1 Objects365 泛化4.3.2 EpicKitchens 的泛化4.4…...

前端在项目中添加自己的功能页面

1.src—>mock–>sideMenue:边表(sidemenue)的子功能的添加&#xff1a;左边功能框中的显示 在相应的父功能添加子功能 id号不能和他人的一样&#xff0c;casecode:就是路由名字 title&#xff1a;中文名称 2.前后端接口(后端程序员给),定义好接口名称 src—>moudles—…...

数据库MySQL(二):DDL数据定义语言

数据定义语言&#xff08;Data Definition Language&#xff0c;DDL&#xff09; 该语言主要用于定义数据库对象&#xff0c;操作对象为数据库、表或字段。 数据库操作 # 查询所有数据库 SHOW DATABASES;# 查询当前数据库 SELECT DATABASE(); # 创建数据库 CREATE DATABASE […...

Spring FactoryBean 源码讲解

Spring FactoryBean 源码讲解 什么是Spring FactoryBean Spring FactoryBean是一个特殊的Bean&#xff0c;它实现了FactoryBean接口并重写了其getObject()方法&#xff0c;用于生产其他Bean的实例。在Spring容器启动时&#xff0c;会自动调用FactoryBean的getObject()方法来获…...

【C语言】零碎知识点|细节

除法运算符(/)的使用规则 在C语言中,除法运算符(/)的使用规则如下: 当两个整数相除时,结果也是一个整数。例如,如果A和B都是整数,那么A / B的结果也是一个整数。这意味着,除法运算的结果会忽略小数部分。例如,10 / 3 的结果是3,而不是3.3333。 当一个整数和一个浮点…...

电影评分数据分析案例-Spark SQL

# cording:utf8from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, StructType import pyspark.sql.functions as Fif __name__ __main__:# 0.构建执行环境入口对象SparkSessionspark SparkSession.builder.\appName(movie_demo)…...

vue如何使用冻结对象提升代码效率及其原理解析

先给大家伙整个实际工作中一定会碰到的问题 如下vue dome ,它的代码非常简单功能也1非常简单,就是一个按钮,点击后会显示有多少条数据 来看看源码, html部分就是一个按钮绑定了一个loadData事件,然后在p标签内展示了这个myData这个数据的长度 <template><div id&quo…...

基于深度学习网络的手势识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd)); rng(default)load gnet.mat[Pr…...

[论文笔记] 多语言模型中的负干扰研究结果和元学习算法

On Negative Interference in Multilingual Models: Findings and A Meta-Learning Treatment 多语言模型中的负干扰:研究结果和元学习解决办法 概述: 训练语料库大小(训练数据大小和 负干扰 无关)。 语言亲缘关系/语系 和 负干扰 有关。添加相似的语言并不能减轻负面干扰。…...

【OpenVINO】行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human-下篇

行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human 4. 配置 PP-Human_Fall_Detection 项目4.1 环境配置4.2 创建 AlxBoard_deploy_yolov8 项目4.3 添加项目源码4.4 添加 OpenVINO C# API4.5 添加 OpenCvSharp 5. 测试 PP-Human_Fall_Detection 项目5.1 创建视频读取器5.2 行人…...

运行报错(三)git bash报错fatal: detected dubious ownership in repository at

报错现象 在运行git 命令时&#xff0c;出现报错 “fatal: detected dubious ownership in repository at” 报错原因 文件夹的所有者和现在的用户不一致 栗子&#xff1a; 文件夹的所有者是root&#xff0c;而当前用户是admin 解决方案 方法一、 将文件夹的所有者替换成ad…...

nvm 的安装及使用

文章目录 一、nvm是什么&#xff1f;二、下载nvm三、在cmd控制台进行操作1、nvm 查询版本号2、查询可以下载的node版本3、安装指定版本4、查看已经安装的node版本5、切换node版本(如果失败那就用管理员身份打开cmd进行切换) 一、nvm是什么&#xff1f; nvm是一个node的版本管理…...

xcode Simulator 安装

xcode Simulator 安装 参考文档 xcode又又又升级了&#xff0c;升级完成之后不下载最新的 iOS 17 Simulator就不能编译运行了&#xff0c;只能静静的等他下载。但是离谱的是这个居然没有断点续下&#xff0c;每次都要重新下载&#xff0c;眼睁睁的看着下载了4个G然后断掉了从…...

【Maven教程】(八):使用 Nexus 创建私服 ~

Maven 使用 Nexus 创建私服 1️⃣ Nexus简介2️⃣ 安装 Nexus2.1 下载 Nexus2.2 Bundle 方式安装 Nexus2.3 WAR 方式安装 Nexus2.4 登录 Nexus 3️⃣ Nexus 的仓库与仓库组3.1 Nexus 内置的仓库3.2 Nexus 仓库分类的概念3.3 创建 Nexus 宿主仓库3.4 创建 Nexus 代理仓库3.5 创…...

螺旋矩阵[中等]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你一个m行n列的矩阵matrix&#xff0c;请按照顺时针螺旋顺序&#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5] 示例 2&#xf…...

babel6使用ES2020最新js语法

babel6使用ES2020最新js语法 Babel 6 原本是不支持 ES2020 语法&#xff0c;因为它是在 Babel 7 中引入的。如果您想使用 ES2020 语法&#xff0c;您需要将 Babel 6 升级到 Babel 7 或更高版本(推荐),当然也可以在bebel6中安装支持某个语法的plugin,比如你想使用 ES2020 中的可…...

【iOS】简单的网络请求

应iOS小组要求&#xff0c;仿写知乎日报需要实现网络请求并解析JSON格式数据&#xff0c;这篇文章仅对基本的网络请求和iOS中的JSON解析作以记录&#xff0c;还涉及到RunLoop的一点小插曲&#xff0c;具体请求过程和原理以后会详细学习&#xff01;&#x1f64f; 基本网络流程简…...

Vulnhub系列靶机---mhz_cxf: c1f

靶机文档&#xff1a;&#xff1a;mhz_cxf: c1f 下载地址&#xff1a;Download (Mirror): 网卡配置 靶机开机后按住shift&#xff0c;出现界面如图&#xff0c;按e键进入安全模式&#xff1a; 找到ro&#xff0c;删除该行后边内容&#xff0c;并将ro 。。。修改为&#xff1a…...

SDRAM与DRAM

SDRAM&#xff08;同步动态随机存取内存&#xff09;和DRAM&#xff08;动态随机存取内存&#xff09;都是RAM的一种类型&#xff0c;但是它们工作的方式有所不同。 DRAM&#xff1a;DRAM是最基础的动态随机存取内存&#xff0c;它的工作方式是总线在内存中读取或写入数据的速度…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

Cesium1.95中高性能加载1500个点

一、基本方式&#xff1a; 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...