springboot整合kafka多数据源
整合kafka多数据源
- 项目背景
- 依赖
- 配置
- 生产者
- 消费者
- 消息体
项目背景
在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。
依赖
implementation 'org.springframework.kafka:spring-kafka:2.8.2'
配置
单机的情况
如果是单机的kafka我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: server001.bbd:9092
在使用的时候直接注入,然后就可以使用里面的方法了
@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;
多数据源情况下
本篇文章主要讲的是在多数据源下的使用,和单机的有所不同,我也看了网上的一些博客,但是当我去按照网上的配置的时候,总是会报错 kafakTemplate
这个bean
找不到,所以没办法只有按照springboot自动配置里面的来改
package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {private final KafkaProperties properties;private final KafkaSecondProperties kafkaSecondProperties;public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {this.properties = properties;this.kafkaSecondProperties = kafkaSecondProperties;}@Bean("kafkaTemplate")@Primarypublic KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaSecondTemplate")public KafkaTemplate<?, ?> kafkaSecondTemplate(@Qualifier("kafkaSecondProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,@Qualifier("kafkaSecondProducerListener") ProducerListener<Object, Object> kafkaProducerListener,ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean("kafkaProducerListener")@Primarypublic ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaSecondProducerListener")public ProducerListener<Object, Object> kafkaSecondProducerListener() {return new LoggingProducerListener<>();}@Bean("kafkaConsumerFactory")@Primarypublic ConsumerFactory<Object, Object> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondConsumerFactory")public ConsumerFactory<Object, Object> kafkaSecondConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("zwKafkaContainerFactory")KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> zwKafkaContainerFactory(@Qualifier(value = "kafkaSecondConsumerFactory") ConsumerFactory<Object, Object> kafkaSecondConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Bean("kafkaProducerFactory")@Primarypublic ProducerFactory<Object, Object> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean("kafkaSecondProducerFactory")public ProducerFactory<Object, Object> kafkaSecondProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(this.kafkaSecondProperties.buildProducerProperties());String transactionIdPrefix = this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));return factory;}@Bean@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Bean@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean("kafkaAdmin")@Primarypublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}}
生产者
package com.ddb.zggz.event;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;@Component
@Slf4j
public class KafkaPushEvent {@Resourceprivate KafkaTemplate<String, String> kafkaSecondTemplate;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFuture<SendResult<String, String>> sendResultListenableFuture = null;if ("zw".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if ("net".equals(configuration.getEnvironment())){sendResultListenableFuture = kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture == null){throw new IllegalArgumentException("kakfa发送消息失败");}sendResultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka发送的message报错,发送数据:{}", param);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka发送的message成功,发送数据:{}", param);}});}}
消费者
package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.service.GzApprovalService;
import com.ddb.zggz.service.GzServiceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@Component
@Slf4j
public class SendMessageListener {@Autowiredprivate GzApprovalService gzApprovalService;@Autowiredprivate GzServiceService gzServiceService;@KafkaListener(topics = "${application.config.push-topic}", groupId = "zggz",containerFactory = "zwKafkaContainerFactory")@RetryableTopic(include = {Exception.class},backoff = @Backoff(delay = 3000, multiplier = 1.5, maxDelay = 15000))public void listen(ConsumerRecord<?, ?> consumerRecord) {String value = (String) consumerRecord.value();PushParam pushParam = JSONObject.parseObject(value, PushParam.class);//版本提审if ("version-approval".equals(pushParam.getEvent())) {ApprovalDTO approvalDTO = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if (pushParam.getEvent().equals("server-OffShelf-gzt")) {OffShelfParam offShelfParam = JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), "ZGGZ", offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}@DltHandlerpublic void processMessage(String message) {}
}
消息体
package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField;
import com.ddb.zggz.model.GzH5VersionManage;
import com.ddb.zggz.model.GzService;
import com.ddb.zggz.model.dto.ApprovalDTO;
import com.ddb.zggz.param.OffShelfParam;
import com.ddb.zggz.param.PublishParam;
import com.ddb.zggz.param.ReviewAndRollback;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;/*** @author bbd*/
@Data
public class PushParam implements Serializable {/*** 发送的消息数据*/private Object data;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)@JSONField(format = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime = LocalDateTime.now();/*** 事件名称,用于消费者处理相关业务*/private String event;/*** 保存版本参数*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam = new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent("save-version");return pushParam;}/*** 保存服务参数*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam = new PushParam();pushParam.setData(gzService);pushParam.setEvent("save-server");return pushParam;}
相关文章:

springboot整合kafka多数据源
整合kafka多数据源 项目背景依赖配置生产者消费者消息体 项目背景 在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafk…...

基于CentOS 7 配置nginx负载均衡
搭建负载均衡服务的需求如下: 1 ) 把单台计算机无法承受的大规模并发访问或数据流量分担到多台节点设备上,分别进行处理, 减少用户等待响应的时间, 提升用户体验。 2 ) 单个重负载的运算分担到多台节点设备上做并行处理ÿ…...
WordToPDF2.java
用Java将Word转PDF 本例子测试了spire.doc.free-3.9.0.jar的包 <dependency><groupId> e-iceblue </groupId><artifactId>spire.doc.free</artifactId><version>3.9.0</version></dependency> package word;import com.spire.…...

k8s服务注册发现
Service 是 将运行在一个或一组pod上的网络应用程序公开为网络服务的方法。 定义service前端为service名称、ip、端口等不变的部分,后端为符合标签选择的pod集合 注册 通过api server提交注册service请求到DNSservice随后得到clusterIP(虚拟ip地址&am…...

IK分词器升级,MySQL热更新助一臂之力
ik分词器采用MySQL热更新 官方所给的IK分词器只支持远程文本文件热更新,不支持采用MySQL热更新,没关系,这难不倒伟大的博主,给哈哈哈。今天就来和大家讲一下如何采用MySQL做热更新IK分词器的词库。 一、建立数据库表 CREATE…...

泛微 E-Office文件上传漏洞复现
声明 本文仅用于技术交流,请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,文章作者不为此承担任何责任。 文章作者拥有对此文章的修改和解释权。如欲转载或传播此文章,…...
bug的生命周期
bug的生命周期 bugbug的生命周期bug等级 bug 当且仅当规格说明书是存在的并且正确的,程序和规格说明书之间的不匹配才是错误当产品规格说明书没有提到时,以用户需求为准,当程序最终没有实现用户的合理预期的功能要求时,就是软件错…...
mysql分库分表相关
3小时快速上手sharding-jdbc 百亿级数据 分库分表 后面怎么分页查询? Java实战:教你如何进行数据库分库分表...

云原生k8s---资源限制、探针
目录 一:资源限制 1、资源限制原因 2、Pod 和 容器 的资源请求和限制 3、CPU 资源单位 4、内存 资源单位 5、事例 (1)事例一 (2)事例二 二:重启策略 1、重启策略模式 2、事例 三:探针…...

html2canvas生成图片地址Base64格式转成blob在转成file(二进制)可正常发送(保姆教程,复制粘贴可用)
开始: 最终结果: 1. html2canvas方法生成的图片地址已Base64编码形式放在img标签src中可直接展示生成的图片(注意页面标签获取位置,还有个setTimeout页面渲染需要时间) setTimeout(function () {var result {};v…...

将Linux上的cpolar内网穿透配置为开机自启动——“cpolar内网穿透”
将Linux上的cpolar内网穿透配置为开机自启动 文章目录 将Linux上的cpolar内网穿透配置为开机自启动前言一、进入命令行模式二、输入token码三、输入内网穿透命令 前言 我们将cpolar安装到了Ubuntu系统上,并通过web-UI界面对cpolar的功能有了初步了解。当然cpolar除…...

微信小程序data-item设置获取不到数据的问题
微信小程序data-item设置获取不到数据的问题 简单说明: 在微信小程序中,通过列表渲染使用wx:for根据数组中的每一项重复渲染组件。同时使用bindtap给每一项绑定点击事件clickItem,再通过data-item绑定数据。 **问题:**通过data-i…...

创建百度百科需要什么条件?
随着互联网的发展,人们越来越依赖于搜索引擎获取信息。百度作为中国最大的搜索引擎之一,旗下的百科词条已成为人们获取知识的重要来源。创建百度百科需要什么条件呢?接下来伯乐网络传媒就来给大家讲一讲。 首先,你需要有一个百度…...

【springboot启动报错】java: 错误: 无效的源发行版:17
报错截图 解决方案 第一步:编辑配置,改为想用的jdk版本 第二步:文件--->项目结构,改为对应的SDK 第三步:文件--->设置--->构建、执行、部署--->编译器--->Java编译器,修改目标字节码版本 第…...

无涯教程-Perl - setservent函数
描述 在第一次调用getservent之前,应先调用此函数。 STAYOPEN参数是可选的,在大多数系统上未使用。当getservent()检索服务数据库中下一行的信息时,然后setervent设置(或重置)枚举到主机条目集的开头。 语法 以下是此函数的简单语法- setservent STAYOPEN返回值 此函数不返…...

Java创建多线程的最全方法
Java创建多线程的最全方法 一、继承Thread,重写run方法二、实现Runnable接口,重写run方法三、使用匿名内部类创建 Thread 子类对象四、使用匿名内部类,实现Runnable接口五、实现Callable接口六、使用线程池创建线程 一、继承Thread࿰…...
02 qt基本控件及信号和槽
一 QString类 功能:显示一个字符串内容 主要接口函数 构造函数: QString(const char *str)QString(const QString &other)赋值运算符重载: QString &operator=(const QString &other)功能函数: 1&...

大数据校招学员实习面试分享
本文实习面试总结来自一位非科班(机械专业)出身的在校生。 作为一个大数据领域的校招实习生,我在这里想分享一下我的经验和教训,希望对大家有所帮助。 1 简历投递准备 在准备简历时,首先需要准确地把握自己的技能和…...

用于弥散加权MRI的关节各向异性维纳滤光片研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

谷粒商城第十一天-品牌管理中关联分类
目录 一、总述 二、前端部分 1. 调整查询调用 2. 关联分类 三、后端部分 四、总结 一、总述 之前是在商品的分类管理中直接使用的若依的逆向代码 有下面的几个问题: 1. 表格上面的参数填写之后,都是按照完全匹配进行搜索,没有模糊匹配…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
Cesium1.95中高性能加载1500个点
一、基本方式: 图标使用.png比.svg性能要好 <template><div id"cesiumContainer"></div><div class"toolbar"><button id"resetButton">重新生成点</button><span id"countDisplay&qu…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...

c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...

selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...