当前位置: 首页 > news >正文

Spring Boot配置多个Kafka数据源

一、配置文件

application.properties配置文件如下

#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true

二、pom依赖

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

三、生产者、消费者配置

1.第一个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}

2.第二个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

四.生产者

@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}

五.消费者

@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}

备注:

生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客

相关文章:

Spring Boot配置多个Kafka数据源

一、配置文件 application.properties配置文件如下 #kafka多数据源配置 #kafka数据源一&#xff0c;日志审计推送 spring.kafka.one.bootstrap-servers172.19.12.109:32182 spring.kafka.one.producer.retries0 spring.kafka.one.producer.properties.max.block.ms5000 #kafk…...

Learning Open-World Object Proposals without Learning to Classify(论文解析)

Learning Open-World Object Proposals without Learning to Classify 摘要1 介绍2 相关工作3 方法3.1 基线3.2 基于纯定位的对象性3.3. 对象定位网络 (OLN)4 实验4.1跨类泛化4.2.开放世界类不可知检测4.3更多的跨数据集泛化4.3.1 Objects365 泛化4.3.2 EpicKitchens 的泛化4.4…...

前端在项目中添加自己的功能页面

1.src—>mock–>sideMenue:边表(sidemenue)的子功能的添加&#xff1a;左边功能框中的显示 在相应的父功能添加子功能 id号不能和他人的一样&#xff0c;casecode:就是路由名字 title&#xff1a;中文名称 2.前后端接口(后端程序员给),定义好接口名称 src—>moudles—…...

数据库MySQL(二):DDL数据定义语言

数据定义语言&#xff08;Data Definition Language&#xff0c;DDL&#xff09; 该语言主要用于定义数据库对象&#xff0c;操作对象为数据库、表或字段。 数据库操作 # 查询所有数据库 SHOW DATABASES;# 查询当前数据库 SELECT DATABASE(); # 创建数据库 CREATE DATABASE […...

Spring FactoryBean 源码讲解

Spring FactoryBean 源码讲解 什么是Spring FactoryBean Spring FactoryBean是一个特殊的Bean&#xff0c;它实现了FactoryBean接口并重写了其getObject()方法&#xff0c;用于生产其他Bean的实例。在Spring容器启动时&#xff0c;会自动调用FactoryBean的getObject()方法来获…...

【C语言】零碎知识点|细节

除法运算符(/)的使用规则 在C语言中,除法运算符(/)的使用规则如下: 当两个整数相除时,结果也是一个整数。例如,如果A和B都是整数,那么A / B的结果也是一个整数。这意味着,除法运算的结果会忽略小数部分。例如,10 / 3 的结果是3,而不是3.3333。 当一个整数和一个浮点…...

电影评分数据分析案例-Spark SQL

# cording:utf8from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, StructType import pyspark.sql.functions as Fif __name__ __main__:# 0.构建执行环境入口对象SparkSessionspark SparkSession.builder.\appName(movie_demo)…...

vue如何使用冻结对象提升代码效率及其原理解析

先给大家伙整个实际工作中一定会碰到的问题 如下vue dome ,它的代码非常简单功能也1非常简单,就是一个按钮,点击后会显示有多少条数据 来看看源码, html部分就是一个按钮绑定了一个loadData事件,然后在p标签内展示了这个myData这个数据的长度 <template><div id&quo…...

基于深度学习网络的手势识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd)); rng(default)load gnet.mat[Pr…...

[论文笔记] 多语言模型中的负干扰研究结果和元学习算法

On Negative Interference in Multilingual Models: Findings and A Meta-Learning Treatment 多语言模型中的负干扰:研究结果和元学习解决办法 概述: 训练语料库大小(训练数据大小和 负干扰 无关)。 语言亲缘关系/语系 和 负干扰 有关。添加相似的语言并不能减轻负面干扰。…...

【OpenVINO】行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human-下篇

行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human 4. 配置 PP-Human_Fall_Detection 项目4.1 环境配置4.2 创建 AlxBoard_deploy_yolov8 项目4.3 添加项目源码4.4 添加 OpenVINO C# API4.5 添加 OpenCvSharp 5. 测试 PP-Human_Fall_Detection 项目5.1 创建视频读取器5.2 行人…...

运行报错(三)git bash报错fatal: detected dubious ownership in repository at

报错现象 在运行git 命令时&#xff0c;出现报错 “fatal: detected dubious ownership in repository at” 报错原因 文件夹的所有者和现在的用户不一致 栗子&#xff1a; 文件夹的所有者是root&#xff0c;而当前用户是admin 解决方案 方法一、 将文件夹的所有者替换成ad…...

nvm 的安装及使用

文章目录 一、nvm是什么&#xff1f;二、下载nvm三、在cmd控制台进行操作1、nvm 查询版本号2、查询可以下载的node版本3、安装指定版本4、查看已经安装的node版本5、切换node版本(如果失败那就用管理员身份打开cmd进行切换) 一、nvm是什么&#xff1f; nvm是一个node的版本管理…...

xcode Simulator 安装

xcode Simulator 安装 参考文档 xcode又又又升级了&#xff0c;升级完成之后不下载最新的 iOS 17 Simulator就不能编译运行了&#xff0c;只能静静的等他下载。但是离谱的是这个居然没有断点续下&#xff0c;每次都要重新下载&#xff0c;眼睁睁的看着下载了4个G然后断掉了从…...

【Maven教程】(八):使用 Nexus 创建私服 ~

Maven 使用 Nexus 创建私服 1️⃣ Nexus简介2️⃣ 安装 Nexus2.1 下载 Nexus2.2 Bundle 方式安装 Nexus2.3 WAR 方式安装 Nexus2.4 登录 Nexus 3️⃣ Nexus 的仓库与仓库组3.1 Nexus 内置的仓库3.2 Nexus 仓库分类的概念3.3 创建 Nexus 宿主仓库3.4 创建 Nexus 代理仓库3.5 创…...

螺旋矩阵[中等]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你一个m行n列的矩阵matrix&#xff0c;请按照顺时针螺旋顺序&#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5] 示例 2&#xf…...

babel6使用ES2020最新js语法

babel6使用ES2020最新js语法 Babel 6 原本是不支持 ES2020 语法&#xff0c;因为它是在 Babel 7 中引入的。如果您想使用 ES2020 语法&#xff0c;您需要将 Babel 6 升级到 Babel 7 或更高版本(推荐),当然也可以在bebel6中安装支持某个语法的plugin,比如你想使用 ES2020 中的可…...

【iOS】简单的网络请求

应iOS小组要求&#xff0c;仿写知乎日报需要实现网络请求并解析JSON格式数据&#xff0c;这篇文章仅对基本的网络请求和iOS中的JSON解析作以记录&#xff0c;还涉及到RunLoop的一点小插曲&#xff0c;具体请求过程和原理以后会详细学习&#xff01;&#x1f64f; 基本网络流程简…...

Vulnhub系列靶机---mhz_cxf: c1f

靶机文档&#xff1a;&#xff1a;mhz_cxf: c1f 下载地址&#xff1a;Download (Mirror): 网卡配置 靶机开机后按住shift&#xff0c;出现界面如图&#xff0c;按e键进入安全模式&#xff1a; 找到ro&#xff0c;删除该行后边内容&#xff0c;并将ro 。。。修改为&#xff1a…...

SDRAM与DRAM

SDRAM&#xff08;同步动态随机存取内存&#xff09;和DRAM&#xff08;动态随机存取内存&#xff09;都是RAM的一种类型&#xff0c;但是它们工作的方式有所不同。 DRAM&#xff1a;DRAM是最基础的动态随机存取内存&#xff0c;它的工作方式是总线在内存中读取或写入数据的速度…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

手机平板能效生态设计指令EU 2023/1670标准解读

手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读&#xff0c;综合法规核心要求、最新修正及企业合规要点&#xff1a; 一、法规背景与目标 生效与强制时间 发布于2023年8月31日&#xff08;OJ公报&…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

基于鸿蒙(HarmonyOS5)的打车小程序

1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...

书籍“之“字形打印矩阵(8)0609

题目 给定一个矩阵matrix&#xff0c;按照"之"字形的方式打印这个矩阵&#xff0c;例如&#xff1a; 1 2 3 4 5 6 7 8 9 10 11 12 ”之“字形打印的结果为&#xff1a;1&#xff0c;…...

高效的后台管理系统——可进行二次开发

随着互联网技术的迅猛发展&#xff0c;企业的数字化管理变得愈加重要。后台管理系统作为数据存储与业务管理的核心&#xff0c;成为了现代企业不可或缺的一部分。今天我们要介绍的是一款名为 若依后台管理框架 的系统&#xff0c;它不仅支持跨平台应用&#xff0c;还能提供丰富…...