Spring Boot配置多个Kafka数据源
一、配置文件
application.properties配置文件如下
#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true
二、pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
三、生产者、消费者配置
1.第一个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}
2.第二个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}
四.生产者
@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}
五.消费者
@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}
备注:
生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:
Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客
相关文章:
Spring Boot配置多个Kafka数据源
一、配置文件 application.properties配置文件如下 #kafka多数据源配置 #kafka数据源一,日志审计推送 spring.kafka.one.bootstrap-servers172.19.12.109:32182 spring.kafka.one.producer.retries0 spring.kafka.one.producer.properties.max.block.ms5000 #kafk…...
Learning Open-World Object Proposals without Learning to Classify(论文解析)
Learning Open-World Object Proposals without Learning to Classify 摘要1 介绍2 相关工作3 方法3.1 基线3.2 基于纯定位的对象性3.3. 对象定位网络 (OLN)4 实验4.1跨类泛化4.2.开放世界类不可知检测4.3更多的跨数据集泛化4.3.1 Objects365 泛化4.3.2 EpicKitchens 的泛化4.4…...
前端在项目中添加自己的功能页面
1.src—>mock–>sideMenue:边表(sidemenue)的子功能的添加:左边功能框中的显示 在相应的父功能添加子功能 id号不能和他人的一样,casecode:就是路由名字 title:中文名称 2.前后端接口(后端程序员给),定义好接口名称 src—>moudles—…...
数据库MySQL(二):DDL数据定义语言
数据定义语言(Data Definition Language,DDL) 该语言主要用于定义数据库对象,操作对象为数据库、表或字段。 数据库操作 # 查询所有数据库 SHOW DATABASES;# 查询当前数据库 SELECT DATABASE(); # 创建数据库 CREATE DATABASE […...
Spring FactoryBean 源码讲解
Spring FactoryBean 源码讲解 什么是Spring FactoryBean Spring FactoryBean是一个特殊的Bean,它实现了FactoryBean接口并重写了其getObject()方法,用于生产其他Bean的实例。在Spring容器启动时,会自动调用FactoryBean的getObject()方法来获…...
【C语言】零碎知识点|细节
除法运算符(/)的使用规则 在C语言中,除法运算符(/)的使用规则如下: 当两个整数相除时,结果也是一个整数。例如,如果A和B都是整数,那么A / B的结果也是一个整数。这意味着,除法运算的结果会忽略小数部分。例如,10 / 3 的结果是3,而不是3.3333。 当一个整数和一个浮点…...
电影评分数据分析案例-Spark SQL
# cording:utf8from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, StringType, StructType import pyspark.sql.functions as Fif __name__ __main__:# 0.构建执行环境入口对象SparkSessionspark SparkSession.builder.\appName(movie_demo)…...
vue如何使用冻结对象提升代码效率及其原理解析
先给大家伙整个实际工作中一定会碰到的问题 如下vue dome ,它的代码非常简单功能也1非常简单,就是一个按钮,点击后会显示有多少条数据 来看看源码, html部分就是一个按钮绑定了一个loadData事件,然后在p标签内展示了这个myData这个数据的长度 <template><div id&quo…...
基于深度学习网络的手势识别算法matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 clc; clear; close all; warning off; addpath(genpath(pwd)); rng(default)load gnet.mat[Pr…...
[论文笔记] 多语言模型中的负干扰研究结果和元学习算法
On Negative Interference in Multilingual Models: Findings and A Meta-Learning Treatment 多语言模型中的负干扰:研究结果和元学习解决办法 概述: 训练语料库大小(训练数据大小和 负干扰 无关)。 语言亲缘关系/语系 和 负干扰 有关。添加相似的语言并不能减轻负面干扰。…...
【OpenVINO】行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human-下篇
行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human 4. 配置 PP-Human_Fall_Detection 项目4.1 环境配置4.2 创建 AlxBoard_deploy_yolov8 项目4.3 添加项目源码4.4 添加 OpenVINO C# API4.5 添加 OpenCvSharp 5. 测试 PP-Human_Fall_Detection 项目5.1 创建视频读取器5.2 行人…...
运行报错(三)git bash报错fatal: detected dubious ownership in repository at
报错现象 在运行git 命令时,出现报错 “fatal: detected dubious ownership in repository at” 报错原因 文件夹的所有者和现在的用户不一致 栗子: 文件夹的所有者是root,而当前用户是admin 解决方案 方法一、 将文件夹的所有者替换成ad…...
nvm 的安装及使用
文章目录 一、nvm是什么?二、下载nvm三、在cmd控制台进行操作1、nvm 查询版本号2、查询可以下载的node版本3、安装指定版本4、查看已经安装的node版本5、切换node版本(如果失败那就用管理员身份打开cmd进行切换) 一、nvm是什么? nvm是一个node的版本管理…...
xcode Simulator 安装
xcode Simulator 安装 参考文档 xcode又又又升级了,升级完成之后不下载最新的 iOS 17 Simulator就不能编译运行了,只能静静的等他下载。但是离谱的是这个居然没有断点续下,每次都要重新下载,眼睁睁的看着下载了4个G然后断掉了从…...
【Maven教程】(八):使用 Nexus 创建私服 ~
Maven 使用 Nexus 创建私服 1️⃣ Nexus简介2️⃣ 安装 Nexus2.1 下载 Nexus2.2 Bundle 方式安装 Nexus2.3 WAR 方式安装 Nexus2.4 登录 Nexus 3️⃣ Nexus 的仓库与仓库组3.1 Nexus 内置的仓库3.2 Nexus 仓库分类的概念3.3 创建 Nexus 宿主仓库3.4 创建 Nexus 代理仓库3.5 创…...
螺旋矩阵[中等]
优质博文:IT-BLOG-CN 一、题目 给你一个m行n列的矩阵matrix,请按照顺时针螺旋顺序,返回矩阵中的所有元素。 示例 1: 输入:matrix [[1,2,3],[4,5,6],[7,8,9]] 输出:[1,2,3,6,9,8,7,4,5] 示例 2…...
babel6使用ES2020最新js语法
babel6使用ES2020最新js语法 Babel 6 原本是不支持 ES2020 语法,因为它是在 Babel 7 中引入的。如果您想使用 ES2020 语法,您需要将 Babel 6 升级到 Babel 7 或更高版本(推荐),当然也可以在bebel6中安装支持某个语法的plugin,比如你想使用 ES2020 中的可…...
【iOS】简单的网络请求
应iOS小组要求,仿写知乎日报需要实现网络请求并解析JSON格式数据,这篇文章仅对基本的网络请求和iOS中的JSON解析作以记录,还涉及到RunLoop的一点小插曲,具体请求过程和原理以后会详细学习!🙏 基本网络流程简…...
Vulnhub系列靶机---mhz_cxf: c1f
靶机文档::mhz_cxf: c1f 下载地址:Download (Mirror): 网卡配置 靶机开机后按住shift,出现界面如图,按e键进入安全模式: 找到ro,删除该行后边内容,并将ro 。。。修改为:…...
SDRAM与DRAM
SDRAM(同步动态随机存取内存)和DRAM(动态随机存取内存)都是RAM的一种类型,但是它们工作的方式有所不同。 DRAM:DRAM是最基础的动态随机存取内存,它的工作方式是总线在内存中读取或写入数据的速度…...
Lingbot-Depth-Pretrain-VitL-14模型数据处理流水线优化:Python入门到实战
Lingbot-Depth-Pretrain-VitL-14模型数据处理流水线优化:Python入门到实战 你是不是刚学Python,觉得语法都会了,但一碰到真实项目,比如要处理图片、喂给AI模型,就有点无从下手?别担心,这种感觉…...
【JavaSE-网络部分06】TCP 纯高性能优化机制:延迟应答・捎带应答【传输层】
上一期咱们把TCP稳如泰山的三大核心机制——滑动窗口、流量控制、拥塞控制彻底盘明白了📚。 这三者强强联手,既守住了可靠传输的底线,又大幅提升传输效率,让数据既稳又快地跑在网络里。 但TCP对性能的“抠搜”可不止于此…...
Open Interpreter实时代码预览:沙箱模式部署详细说明
Open Interpreter实时代码预览:沙箱模式部署详细说明 1. 项目概述 Open Interpreter 是一个让人眼前一亮的开源工具,它能让你用平常说话的方式告诉AI要做什么,然后AI就会在你的电脑上直接写代码、运行代码,甚至帮你修改代码。想…...
大模型优化:CUDA调度波次(Wave)中的负载均衡与资源利用
1. 理解CUDA调度波次(Wave)的基本概念 当你第一次听到"CUDA调度波次"这个词时,可能会觉得有点抽象。其实它就像餐厅里服务员上菜的过程。想象一下,一个餐厅有4个厨师(相当于GPU的SM),…...
万字干货 | OpenClaw 进阶玩法大全:技能 / 多 Agent / 省钱 / 安全,+ 实战技巧一次学会
1.概述在人工智能快速发展的今天,AI不再仅仅是回答问题的聊天机器人,而是正在演变为能够主动完成复杂任务的智能代理。OpenAI的Codex CLI就是这一趋势的典型代表——一个跨平台的本地软件代理,能够在用户的机器上安全高效地生成高质量的软件变…...
C语言实现进程调度算法:优先级与时间片轮转
1. 项目概述在嵌入式系统和操作系统开发中,进程调度是一个核心概念。今天我要分享的是如何在C语言中实现一个简单的程序调度机制,重点讲解高优先数调度算法和先来先服务算法的实现。这个项目非常适合想要深入理解操作系统底层原理的开发者,特…...
深入解析Dify中的RAG内容检索:Rerank模型与权重计算的实战对比
1. RAG内容检索的核心挑战与Rerank的价值 当你用Dify搭建一个智能问答系统时,最头疼的问题往往是:明明数据库里有正确答案,但系统总是返回一堆不相关的文档。这就像在图书馆用关键词搜索书籍,结果管理员给你搬来了整个书架——这时…...
基于双向反激变换器的SOC估算与主动均衡策略仿真研究——复现硕士论文并拓展六节电池模型与均衡策略分析
基于双向反激变换器的SOC估算与主动均衡仿真 可以 [1]复现硕士论文:《锂离子电池SOC估算与主动均衡策略研究_王昊》 [2]六节电池模型:使用Simmulink搭建了六节电池主动均衡仿真 [3]均衡策略:选择了电压、SOC及其分阶段使用作为主动均衡变量&a…...
第25课:让 Qt 从 GPIO 子系统一路进阶到平台驱动与设备树控制
本节路线图 为什么这一课要把三种GP → 先从GPIO子系统开始: → 再进一步:平台驱动让LE 小猫提醒 这节有分区、烧录或删除类操作,先确认盘符和路径,再按回车。 猫头鹰提示 编译前先对齐目标架构和工具链名字,别让主机程序和板卡程序搞混。 上一课我们已经把 Qt 和字符驱动…...
如何从零搭建Cubli_Mini:开源自平衡机器人完整制作指南
如何从零搭建Cubli_Mini:开源自平衡机器人完整制作指南 【免费下载链接】Cubli_Mini 项目地址: https://gitcode.com/gh_mirrors/cu/Cubli_Mini Cubli_Mini是一款令人惊叹的开源自平衡立方体机器人项目,它通过三个正交安装的飞轮实现姿态控制&am…...
