spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题
直接主菜:
spring cloud stream 架构
中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件
springcloudstream已自己集成了kafka、rabbitmq ,其他厂商也集成了一些。在官网有说明 https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html
但是有时候还需自己实现,官方也给出了响应步骤
https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl
自定义实现
定义xxBinder
cp了一网友的项目,我换成了maven,
https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven
具体实现如下:
设置config类
import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;/*** Mqtt binder configuration class* @author Alex , Li Sheung Lai*/
@Configuration
@EnableConfigurationProperties({MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {@Autowiredprivate MqttExtendedBindingProperties mqttExtendedBindingProperties;@Beanpublic MqttBinderConfigurationProperties configurationProperties(){return new MqttBinderConfigurationProperties();}@Beanpublic MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){return new MqttProvisioningProvider();}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(configurationProperties.getUrl());options.setUserName(configurationProperties.getUsername());options.setPassword(configurationProperties.getPassword().toCharArray());options.setCleanSession(configurationProperties.isCleanSession());options.setConnectionTimeout(configurationProperties.getConnectionTimeout());options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));}else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {factory.setPersistence(new MemoryPersistence());}return factory;}@Beanpublic MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,MqttProvisioningProvider provisioningProvider){MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);return mqttMessageChannelBinder;}
配置properties
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;import javax.validation.constraints.Size;/*** Configuration properties for the Mqtt binder . The properties in the class* are prefixed with <b>spring.cloud.stream.mqtt.binder</b>* @author Alex , Li Sheung Lai*/
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {/*** location of the mqtt broker(s) (comma-delimited list)*/@Size(min = 1)private String[] url = new String[] { "tcp://localhost:1883" };/*** the username to use when connecting to the broker*/private String username = "guest";/*** the password to use when connecting to the broker*/private String password = "guest";/*** whether the client and server should remember state across restarts and reconnects*/private boolean cleanSession = true;/*** the connection timeout in seconds*/private int connectionTimeout = 30;/*** the ping interval in seconds*/private int keepAliveInterval = 60;/*** 'memory' or 'file'*/private String persistence = "memory";/*** Persistence directory*/private String persistenceDirectory = "/tmp/paho";public MqttBinderConfigurationProperties() {}public String[] getUrl() {return url;}public void setUrl(String[] url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public boolean isCleanSession() {return cleanSession;}public void setCleanSession(boolean cleanSession) {this.cleanSession = cleanSession;}public int getConnectionTimeout() {return connectionTimeout;}public void setConnectionTimeout(int connectionTimeout) {this.connectionTimeout = connectionTimeout;}public int getKeepAliveInterval() {return keepAliveInterval;}public void setKeepAliveInterval(int keepAliveInterval) {this.keepAliveInterval = keepAliveInterval;}public String getPersistence() {return persistence;}public void setPersistence(String persistence) {this.persistence = persistence;}public String getPersistenceDirectory() {return persistenceDirectory;}public void setPersistenceDirectory(String persistenceDirectory) {this.persistenceDirectory = persistenceDirectory;}
//注,和本properties同文件夹的还有几个类,具体在 git中 ,可下载拷贝
实现一个channel binder
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;public class MqttMessageChannelBinderextends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();private MqttPahoClientFactory mqttPahoClientFactory;public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {this.mqttPahoClientFactory = mqttPahoClientFactory;}public MqttMessageChannelBinder(MqttPahoClientFactory factory,MqttProvisioningProvider provisioningProvider) {super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);this.mqttPahoClientFactory = factory;}@Overrideprotected MessageHandler createProducerMessageHandler(ProducerDestination destination,ExtendedProducerProperties<MqttSinkProperties> producerProperties,MessageChannel errorChannel) throws Exception {MqttSinkProperties sinkProperties = producerProperties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sinkProperties.getQos(),sinkProperties.isRetained(),sinkProperties.getCharset());MqttPahoMessageHandler handler = new MqttPahoMessageHandler(sinkProperties.getClientId(),this.mqttPahoClientFactory);handler.setAsync(sinkProperties.isAsync());handler.setDefaultTopic(sinkProperties.getTopic());handler.setConverter(converter);return handler;}@Overrideprotected MessageProducer createConsumerEndpoint(ConsumerDestination destination,String group,ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {MqttSourceProperties sourceProperties = properties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sourceProperties.getCharset());converter.setPayloadAsBytes(sourceProperties.isBinary());MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(sourceProperties.getClientId(),this.mqttPahoClientFactory,sourceProperties.getTopics());adapter.setBeanFactory(this.getBeanFactory());adapter.setQos(sourceProperties.getQos());adapter.setConverter(converter);adapter.setOutputChannelName(destination.getName());return adapter;}public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {this.extendedBindingProperties = extendedBindingProperties;}@Overridepublic MqttSourceProperties getExtendedConsumerProperties(String channelName) {return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);}@Overridepublic MqttSinkProperties getExtendedProducerProperties(String channelName) {return this.extendedBindingProperties.getExtendedProducerProperties(channelName);}@Overridepublic String getDefaultsPrefix() {return this.extendedBindingProperties.getDefaultsPrefix();}@Overridepublic Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {return this.extendedBindingProperties.getExtendedPropertiesEntryClass();}}
实现一个Provider
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;public class MqttProvisioningProvider implementsProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {@Overridepublic ProducerDestination provisionProducerDestination(String name,ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@Overridepublic ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@RequiredArgsConstructorprivate class MqttTopicDestination implements ProducerDestination , ConsumerDestination{private final String destination;@Overridepublic String getName() {return this.destination.trim();}@Overridepublic String getNameForPartition(int partition) {throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");}}
}
配置 spring.binders
mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration
配置如下:
spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin
记得,不要扫描到BinderConfiguration,xxBinderConfiguration 是在binderService动态配置的,具体构建Binder在这,如果扫描到BinderConfiguration类,此处binders.size就不是0了

相关文章:

spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题直接主菜:spring cloud stream 架构中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件 springcloudstream已自己集成了kafk…...

计算机网络之HTTP协议
目录 一、HTTP的含义 1.1 理解超文本 1.2 理解应用层协议 1.3 理解HTTP协议的工作过程 二、HTTP协议格式 2.1 抓包工具的使用 2.2 理解协议格式 2.2.1 请求协议格式 2.2.2. 响应格式请求 一、HTTP的含义 HTTP(全称为“超文本传输协议”)&#x…...

如何挖掘专利创新点?
“无意中发现了一个巨牛的人工智能教程,忍不住分享一下给大家。教程不仅是零基础,通俗易懂,而且非常风趣幽默,像看小说一样!觉得太牛了,所以分享给大家。点这里可以跳转到教程。” 对于广大的软件工程师来说…...
虚函数和纯虚函数
多态(polymorphism)是面向对象编程语言的一大特点,而虚函数是实现多态的机制。其核心理念就是通过基类访问派生类定义的函数。多态性使得程序调用的函数是在运行时动态确定的,而不是在编译时静态确定的。使用一个基类类型的指针或…...

Framework源码面试——Handler与事件传递机制面试集合
Handler面试题 Handler的作用: 当我们需要在子线程处理耗时的操作(例如访问网络,数据库的操作),而当耗时的操作完成后,需要更新UI,这就需要使用Handler来处理,因为子线程不能做更新…...

iOS开发-bugly符号表自动上传发布自动化shell
这里介绍的是通过build得到的app文件和dSYM文件来打包分发和符号表上传。 通过Archive方式打包和获得符号表的方式以后再说。 一:bugly工具jar包准备 bugly符号表工具下载地址:(下载完成后放入项目目录下,如不想加入git可通过gitIgnore忽略…...
MySQL OCP888题解046-哪些语句会被记录到binlog
文章目录1、原题1.1、英文原题1.2、中文翻译1.3、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3.1、知识点1:binlog_format选项3.2、知识点2:Performance Schema(性能模式)4、总结1、原题 1.1、英文原题 You enable binary logging on MySQL S…...

【前端学习】D5:CSS进阶
文章目录前言系列文章目录1 精灵图Sprites1.1 为什么需要精灵图?1.2 精灵图的使用2 字体图标iconfont2.1 字体图标的产生2.2 字体图标的优点2.3 字体文件格式2.4 字体图标的使用2.5 字体图标的引入2.6 字体图标的追加3 CSS三角3.1 普通三角3.2 案例4 CSS用户界面样式…...

【bioinfo】融合检测软件FusionMap分析流程和报告结果
文章目录写在前面FusionMap融合检测原理FusionMap与其他软比较FusionMap分析流程FusionMap结果文件说明FusionMap mono CUP设置图片来源: https://en.wikipedia.org/wiki/Fusion_gene写在前面 下面主要内容是关于RNA-seq数据分析融合,用到软件是FusionMap 【Fusion…...

C++基础了解-17-C++日期 时间
C日期 & 时间 一、C日期 & 时间 C 标准库没有提供所谓的日期类型。C 继承了 C 语言用于日期和时间操作的结构和函数。为了使用日期和时间相关的函数和结构,需要在 C 程序中引用 头文件。 有四个与时间相关的类型:clock_t、time_t、size_t 和 …...

MOV压敏电阻的几种电路元件功能及不同优势讲解
压敏电阻,通常是电路为防护浪涌冲击电压而使用的一种电子元器件,相比其他的浪涌保护器来说,也有那么几个不一样的优势,那么,具体有哪些?以及关于它的作用,你都知道吗?以下优恩小编为…...

uniapp+uniCloud实战项目报修小程序开发
前言 本项目基于 uniapp uniCloud 云开发,简单易用,逻辑主要是云数据库的增删查改,页面大部分自写,部分使用uniUI, uView 组件库。大家可用于学习或者二次开发,有什么不懂的地方可联系 wechat:MrYe443。用…...

演唱会的火车票没了?Python实现12306查票以及zidong购票....
嗨害大家好!我是小熊猫~ 不知道大家抢到演唱会的门票没有呢? 不管抢到没有,火车票也是很重要的哇 24小时抢票不间断的那种喔~ ~ ~ 不然可就要走路去了喔~ 准备工作 环境 Python 3.8Pycharm 插件 谷歌浏览器驱动 模块 需要安装的第三方模块&am…...

Linux发行版本与发行版的简单的介绍
Linux linux下有很多发行的版本,或者称之为魔改版本。以下介绍一些常见的版本,以避免名词的混淆。 linux是提供了一个内核,就像是谷歌的内核一样,QQ浏览器就是使用的谷歌的内核,也算是一个发行版本。 Ubuntu&#x…...

前后端分离项目学习-vue+springboot 博客
前后端分离项目 文章总体分为2大部分,Java后端接口和vue前端页面 项目演示:www.markerhub.com:8084/blogs Java后端接口开发 1、前言 从零开始搭建一个项目骨架,最好选择合适,熟悉的技术,并且在未来易拓展…...

关于指针运算的一道题
目录 刚看到这道题的时候我也和大多数小白一样感到无从下手,但是在我写这篇博客的前几分钟开始我对这道题有了一点点的理解。所以我就想着趁热打铁,写一篇博客来记录一下我的想法。 题目如下: 画图: 逐一解答: 题一…...

【论文简述】Learning Optical Flow with Kernel Patch Attention(CVPR 2022)
一、论文简述 1. 第一作者:Ao Luo 2. 发表年份:2022 3. 发表期刊:CVPR 4. 关键词:光流、局部注意力、空间关联、上下文关联 5. 探索动机:现有方法主要将光流估计视为特征匹配任务,即学习在特征空间中将…...
Java学习-MySQL-列的数据类型
Java学习-MySQL-列的数据类型 数值 tinyint - 1个字节smallint - 2个字节mediumint - 3个字节int - 4个字节bigint - 8个字节float - 4个字节double - 8个字节decimal - 字符串形式的浮点数 字符串 char - 0~255varchar - 可变字符串 0~65535tinytext - 微型文本 2^8-1text…...

终端配色-Docker容器终端
20230309 - 0. 引言 平时使用SSH,通常都是使用securecrt来用,毕竟也算是之前windows下一种使用的工具,在mac下使用还算方便;进入终端后,可以通过调整配色来调整编程环境。平时经常使用屎黄色的那种配色,毕…...
SQL基础培训04-插入数据
知识点: 假设有订单表 CREATE TABLE SEOrder ( FID int identity(...

黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 ,你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
CppCon 2015 学习:Time Programming Fundamentals
Civil Time 公历时间 特点: 共 6 个字段: Year(年)Month(月)Day(日)Hour(小时)Minute(分钟)Second(秒) 表示…...

倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...