Spring Boot配置多个Kafka数据源
一、配置文件
application.properties配置文件如下
#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true
二、pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
三、生产者、消费者配置
1.第一个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}
2.第二个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}
四.生产者
@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}
五.消费者
@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}
备注:
生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:
Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客
相关文章:
Spring Boot配置多个Kafka数据源
一、配置文件 application.properties配置文件如下 #kafka多数据源配置 #kafka数据源一,日志审计推送 spring.kafka.one.bootstrap-servers172.19.12.109:32182 spring.kafka.one.producer.retries0 spring.kafka.one.producer.properties.max.block.ms5000 #kafk…...
Learning Open-World Object Proposals without Learning to Classify(论文解析)
Learning Open-World Object Proposals without Learning to Classify 摘要1 介绍2 相关工作3 方法3.1 基线3.2 基于纯定位的对象性3.3. 对象定位网络 (OLN)4 实验4.1跨类泛化4.2.开放世界类不可知检测4.3更多的跨数据集泛化4.3.1 Objects365 泛化4.3.2 EpicKitchens 的泛化4.4…...
前端在项目中添加自己的功能页面
1.src—>mock–>sideMenue:边表(sidemenue)的子功能的添加:左边功能框中的显示 在相应的父功能添加子功能 id号不能和他人的一样,casecode:就是路由名字 title:中文名称 2.前后端接口(后端程序员给),定义好接口名称 src—>moudles—…...
数据库MySQL(二):DDL数据定义语言
数据定义语言(Data Definition Language,DDL) 该语言主要用于定义数据库对象,操作对象为数据库、表或字段。 数据库操作 # 查询所有数据库 SHOW DATABASES;# 查询当前数据库 SELECT DATABASE(); # 创建数据库 CREATE DATABASE […...

Spring FactoryBean 源码讲解
Spring FactoryBean 源码讲解 什么是Spring FactoryBean Spring FactoryBean是一个特殊的Bean,它实现了FactoryBean接口并重写了其getObject()方法,用于生产其他Bean的实例。在Spring容器启动时,会自动调用FactoryBean的getObject()方法来获…...
【C语言】零碎知识点|细节
除法运算符(/)的使用规则 在C语言中,除法运算符(/)的使用规则如下: 当两个整数相除时,结果也是一个整数。例如,如果A和B都是整数,那么A / B的结果也是一个整数。这意味着,除法运算的结果会忽略小数部分。例如,10 / 3 的结果是3,而不是3.3333。 当一个整数和一个浮点…...

电影评分数据分析案例-Spark SQL
# cording:utf8from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, StructType import pyspark.sql.functions as Fif __name__ __main__:# 0.构建执行环境入口对象SparkSessionspark SparkSession.builder.\appName(movie_demo)…...

vue如何使用冻结对象提升代码效率及其原理解析
先给大家伙整个实际工作中一定会碰到的问题 如下vue dome ,它的代码非常简单功能也1非常简单,就是一个按钮,点击后会显示有多少条数据 来看看源码, html部分就是一个按钮绑定了一个loadData事件,然后在p标签内展示了这个myData这个数据的长度 <template><div id&quo…...

基于深度学习网络的手势识别算法matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd)); rng(default)load gnet.mat[Pr…...
[论文笔记] 多语言模型中的负干扰研究结果和元学习算法
On Negative Interference in Multilingual Models: Findings and A Meta-Learning Treatment 多语言模型中的负干扰:研究结果和元学习解决办法 概述: 训练语料库大小(训练数据大小和 负干扰 无关)。 语言亲缘关系/语系 和 负干扰 有关。添加相似的语言并不能减轻负面干扰。…...

【OpenVINO】行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human-下篇
行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human 4. 配置 PP-Human_Fall_Detection 项目4.1 环境配置4.2 创建 AlxBoard_deploy_yolov8 项目4.3 添加项目源码4.4 添加 OpenVINO C# API4.5 添加 OpenCvSharp 5. 测试 PP-Human_Fall_Detection 项目5.1 创建视频读取器5.2 行人…...

运行报错(三)git bash报错fatal: detected dubious ownership in repository at
报错现象 在运行git 命令时,出现报错 “fatal: detected dubious ownership in repository at” 报错原因 文件夹的所有者和现在的用户不一致 栗子: 文件夹的所有者是root,而当前用户是admin 解决方案 方法一、 将文件夹的所有者替换成ad…...

nvm 的安装及使用
文章目录 一、nvm是什么?二、下载nvm三、在cmd控制台进行操作1、nvm 查询版本号2、查询可以下载的node版本3、安装指定版本4、查看已经安装的node版本5、切换node版本(如果失败那就用管理员身份打开cmd进行切换) 一、nvm是什么? nvm是一个node的版本管理…...
xcode Simulator 安装
xcode Simulator 安装 参考文档 xcode又又又升级了,升级完成之后不下载最新的 iOS 17 Simulator就不能编译运行了,只能静静的等他下载。但是离谱的是这个居然没有断点续下,每次都要重新下载,眼睁睁的看着下载了4个G然后断掉了从…...

【Maven教程】(八):使用 Nexus 创建私服 ~
Maven 使用 Nexus 创建私服 1️⃣ Nexus简介2️⃣ 安装 Nexus2.1 下载 Nexus2.2 Bundle 方式安装 Nexus2.3 WAR 方式安装 Nexus2.4 登录 Nexus 3️⃣ Nexus 的仓库与仓库组3.1 Nexus 内置的仓库3.2 Nexus 仓库分类的概念3.3 创建 Nexus 宿主仓库3.4 创建 Nexus 代理仓库3.5 创…...

螺旋矩阵[中等]
优质博文:IT-BLOG-CN 一、题目 给你一个m行n列的矩阵matrix,请按照顺时针螺旋顺序,返回矩阵中的所有元素。 示例 1: 输入:matrix [[1,2,3],[4,5,6],[7,8,9]] 输出:[1,2,3,6,9,8,7,4,5] 示例 2…...

babel6使用ES2020最新js语法
babel6使用ES2020最新js语法 Babel 6 原本是不支持 ES2020 语法,因为它是在 Babel 7 中引入的。如果您想使用 ES2020 语法,您需要将 Babel 6 升级到 Babel 7 或更高版本(推荐),当然也可以在bebel6中安装支持某个语法的plugin,比如你想使用 ES2020 中的可…...

【iOS】简单的网络请求
应iOS小组要求,仿写知乎日报需要实现网络请求并解析JSON格式数据,这篇文章仅对基本的网络请求和iOS中的JSON解析作以记录,还涉及到RunLoop的一点小插曲,具体请求过程和原理以后会详细学习!🙏 基本网络流程简…...

Vulnhub系列靶机---mhz_cxf: c1f
靶机文档::mhz_cxf: c1f 下载地址:Download (Mirror): 网卡配置 靶机开机后按住shift,出现界面如图,按e键进入安全模式: 找到ro,删除该行后边内容,并将ro 。。。修改为:…...
SDRAM与DRAM
SDRAM(同步动态随机存取内存)和DRAM(动态随机存取内存)都是RAM的一种类型,但是它们工作的方式有所不同。 DRAM:DRAM是最基础的动态随机存取内存,它的工作方式是总线在内存中读取或写入数据的速度…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...

uniapp微信小程序视频实时流+pc端预览方案
方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度WebSocket图片帧定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐RTMP推流TRTC/即构SDK推流❌ 付费方案 (部分有免费额度&#x…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...

PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...