spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题
直接主菜:
spring cloud stream 架构
中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件
springcloudstream已自己集成了kafka、rabbitmq ,其他厂商也集成了一些。在官网有说明 https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html
但是有时候还需自己实现,官方也给出了响应步骤
https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl
自定义实现
定义xxBinder
cp了一网友的项目,我换成了maven,
https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven
具体实现如下:
设置config类
import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;/*** Mqtt binder configuration class* @author Alex , Li Sheung Lai*/
@Configuration
@EnableConfigurationProperties({MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {@Autowiredprivate MqttExtendedBindingProperties mqttExtendedBindingProperties;@Beanpublic MqttBinderConfigurationProperties configurationProperties(){return new MqttBinderConfigurationProperties();}@Beanpublic MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){return new MqttProvisioningProvider();}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(configurationProperties.getUrl());options.setUserName(configurationProperties.getUsername());options.setPassword(configurationProperties.getPassword().toCharArray());options.setCleanSession(configurationProperties.isCleanSession());options.setConnectionTimeout(configurationProperties.getConnectionTimeout());options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));}else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {factory.setPersistence(new MemoryPersistence());}return factory;}@Beanpublic MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,MqttProvisioningProvider provisioningProvider){MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);return mqttMessageChannelBinder;}
配置properties
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;import javax.validation.constraints.Size;/*** Configuration properties for the Mqtt binder . The properties in the class* are prefixed with <b>spring.cloud.stream.mqtt.binder</b>* @author Alex , Li Sheung Lai*/
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {/*** location of the mqtt broker(s) (comma-delimited list)*/@Size(min = 1)private String[] url = new String[] { "tcp://localhost:1883" };/*** the username to use when connecting to the broker*/private String username = "guest";/*** the password to use when connecting to the broker*/private String password = "guest";/*** whether the client and server should remember state across restarts and reconnects*/private boolean cleanSession = true;/*** the connection timeout in seconds*/private int connectionTimeout = 30;/*** the ping interval in seconds*/private int keepAliveInterval = 60;/*** 'memory' or 'file'*/private String persistence = "memory";/*** Persistence directory*/private String persistenceDirectory = "/tmp/paho";public MqttBinderConfigurationProperties() {}public String[] getUrl() {return url;}public void setUrl(String[] url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public boolean isCleanSession() {return cleanSession;}public void setCleanSession(boolean cleanSession) {this.cleanSession = cleanSession;}public int getConnectionTimeout() {return connectionTimeout;}public void setConnectionTimeout(int connectionTimeout) {this.connectionTimeout = connectionTimeout;}public int getKeepAliveInterval() {return keepAliveInterval;}public void setKeepAliveInterval(int keepAliveInterval) {this.keepAliveInterval = keepAliveInterval;}public String getPersistence() {return persistence;}public void setPersistence(String persistence) {this.persistence = persistence;}public String getPersistenceDirectory() {return persistenceDirectory;}public void setPersistenceDirectory(String persistenceDirectory) {this.persistenceDirectory = persistenceDirectory;}
//注,和本properties同文件夹的还有几个类,具体在 git中 ,可下载拷贝实现一个channel binder
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;public class MqttMessageChannelBinderextends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();private MqttPahoClientFactory mqttPahoClientFactory;public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {this.mqttPahoClientFactory = mqttPahoClientFactory;}public MqttMessageChannelBinder(MqttPahoClientFactory factory,MqttProvisioningProvider provisioningProvider) {super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);this.mqttPahoClientFactory = factory;}@Overrideprotected MessageHandler createProducerMessageHandler(ProducerDestination destination,ExtendedProducerProperties<MqttSinkProperties> producerProperties,MessageChannel errorChannel) throws Exception {MqttSinkProperties sinkProperties = producerProperties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sinkProperties.getQos(),sinkProperties.isRetained(),sinkProperties.getCharset());MqttPahoMessageHandler handler = new MqttPahoMessageHandler(sinkProperties.getClientId(),this.mqttPahoClientFactory);handler.setAsync(sinkProperties.isAsync());handler.setDefaultTopic(sinkProperties.getTopic());handler.setConverter(converter);return handler;}@Overrideprotected MessageProducer createConsumerEndpoint(ConsumerDestination destination,String group,ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {MqttSourceProperties sourceProperties = properties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sourceProperties.getCharset());converter.setPayloadAsBytes(sourceProperties.isBinary());MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(sourceProperties.getClientId(),this.mqttPahoClientFactory,sourceProperties.getTopics());adapter.setBeanFactory(this.getBeanFactory());adapter.setQos(sourceProperties.getQos());adapter.setConverter(converter);adapter.setOutputChannelName(destination.getName());return adapter;}public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {this.extendedBindingProperties = extendedBindingProperties;}@Overridepublic MqttSourceProperties getExtendedConsumerProperties(String channelName) {return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);}@Overridepublic MqttSinkProperties getExtendedProducerProperties(String channelName) {return this.extendedBindingProperties.getExtendedProducerProperties(channelName);}@Overridepublic String getDefaultsPrefix() {return this.extendedBindingProperties.getDefaultsPrefix();}@Overridepublic Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {return this.extendedBindingProperties.getExtendedPropertiesEntryClass();}}
实现一个Provider
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;public class MqttProvisioningProvider implementsProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {@Overridepublic ProducerDestination provisionProducerDestination(String name,ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@Overridepublic ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@RequiredArgsConstructorprivate class MqttTopicDestination implements ProducerDestination , ConsumerDestination{private final String destination;@Overridepublic String getName() {return this.destination.trim();}@Overridepublic String getNameForPartition(int partition) {throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");}}
}
配置 spring.binders
mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration配置如下:
spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin记得,不要扫描到BinderConfiguration,xxBinderConfiguration 是在binderService动态配置的,具体构建Binder在这,如果扫描到BinderConfiguration类,此处binders.size就不是0了

相关文章:
spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题直接主菜:spring cloud stream 架构中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件 springcloudstream已自己集成了kafk…...
计算机网络之HTTP协议
目录 一、HTTP的含义 1.1 理解超文本 1.2 理解应用层协议 1.3 理解HTTP协议的工作过程 二、HTTP协议格式 2.1 抓包工具的使用 2.2 理解协议格式 2.2.1 请求协议格式 2.2.2. 响应格式请求 一、HTTP的含义 HTTP(全称为“超文本传输协议”)&#x…...
如何挖掘专利创新点?
“无意中发现了一个巨牛的人工智能教程,忍不住分享一下给大家。教程不仅是零基础,通俗易懂,而且非常风趣幽默,像看小说一样!觉得太牛了,所以分享给大家。点这里可以跳转到教程。” 对于广大的软件工程师来说…...
虚函数和纯虚函数
多态(polymorphism)是面向对象编程语言的一大特点,而虚函数是实现多态的机制。其核心理念就是通过基类访问派生类定义的函数。多态性使得程序调用的函数是在运行时动态确定的,而不是在编译时静态确定的。使用一个基类类型的指针或…...
Framework源码面试——Handler与事件传递机制面试集合
Handler面试题 Handler的作用: 当我们需要在子线程处理耗时的操作(例如访问网络,数据库的操作),而当耗时的操作完成后,需要更新UI,这就需要使用Handler来处理,因为子线程不能做更新…...
iOS开发-bugly符号表自动上传发布自动化shell
这里介绍的是通过build得到的app文件和dSYM文件来打包分发和符号表上传。 通过Archive方式打包和获得符号表的方式以后再说。 一:bugly工具jar包准备 bugly符号表工具下载地址:(下载完成后放入项目目录下,如不想加入git可通过gitIgnore忽略…...
MySQL OCP888题解046-哪些语句会被记录到binlog
文章目录1、原题1.1、英文原题1.2、中文翻译1.3、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3.1、知识点1:binlog_format选项3.2、知识点2:Performance Schema(性能模式)4、总结1、原题 1.1、英文原题 You enable binary logging on MySQL S…...
【前端学习】D5:CSS进阶
文章目录前言系列文章目录1 精灵图Sprites1.1 为什么需要精灵图?1.2 精灵图的使用2 字体图标iconfont2.1 字体图标的产生2.2 字体图标的优点2.3 字体文件格式2.4 字体图标的使用2.5 字体图标的引入2.6 字体图标的追加3 CSS三角3.1 普通三角3.2 案例4 CSS用户界面样式…...
【bioinfo】融合检测软件FusionMap分析流程和报告结果
文章目录写在前面FusionMap融合检测原理FusionMap与其他软比较FusionMap分析流程FusionMap结果文件说明FusionMap mono CUP设置图片来源: https://en.wikipedia.org/wiki/Fusion_gene写在前面 下面主要内容是关于RNA-seq数据分析融合,用到软件是FusionMap 【Fusion…...
C++基础了解-17-C++日期 时间
C日期 & 时间 一、C日期 & 时间 C 标准库没有提供所谓的日期类型。C 继承了 C 语言用于日期和时间操作的结构和函数。为了使用日期和时间相关的函数和结构,需要在 C 程序中引用 头文件。 有四个与时间相关的类型:clock_t、time_t、size_t 和 …...
MOV压敏电阻的几种电路元件功能及不同优势讲解
压敏电阻,通常是电路为防护浪涌冲击电压而使用的一种电子元器件,相比其他的浪涌保护器来说,也有那么几个不一样的优势,那么,具体有哪些?以及关于它的作用,你都知道吗?以下优恩小编为…...
uniapp+uniCloud实战项目报修小程序开发
前言 本项目基于 uniapp uniCloud 云开发,简单易用,逻辑主要是云数据库的增删查改,页面大部分自写,部分使用uniUI, uView 组件库。大家可用于学习或者二次开发,有什么不懂的地方可联系 wechat:MrYe443。用…...
演唱会的火车票没了?Python实现12306查票以及zidong购票....
嗨害大家好!我是小熊猫~ 不知道大家抢到演唱会的门票没有呢? 不管抢到没有,火车票也是很重要的哇 24小时抢票不间断的那种喔~ ~ ~ 不然可就要走路去了喔~ 准备工作 环境 Python 3.8Pycharm 插件 谷歌浏览器驱动 模块 需要安装的第三方模块&am…...
Linux发行版本与发行版的简单的介绍
Linux linux下有很多发行的版本,或者称之为魔改版本。以下介绍一些常见的版本,以避免名词的混淆。 linux是提供了一个内核,就像是谷歌的内核一样,QQ浏览器就是使用的谷歌的内核,也算是一个发行版本。 Ubuntu&#x…...
前后端分离项目学习-vue+springboot 博客
前后端分离项目 文章总体分为2大部分,Java后端接口和vue前端页面 项目演示:www.markerhub.com:8084/blogs Java后端接口开发 1、前言 从零开始搭建一个项目骨架,最好选择合适,熟悉的技术,并且在未来易拓展…...
关于指针运算的一道题
目录 刚看到这道题的时候我也和大多数小白一样感到无从下手,但是在我写这篇博客的前几分钟开始我对这道题有了一点点的理解。所以我就想着趁热打铁,写一篇博客来记录一下我的想法。 题目如下: 画图: 逐一解答: 题一…...
【论文简述】Learning Optical Flow with Kernel Patch Attention(CVPR 2022)
一、论文简述 1. 第一作者:Ao Luo 2. 发表年份:2022 3. 发表期刊:CVPR 4. 关键词:光流、局部注意力、空间关联、上下文关联 5. 探索动机:现有方法主要将光流估计视为特征匹配任务,即学习在特征空间中将…...
Java学习-MySQL-列的数据类型
Java学习-MySQL-列的数据类型 数值 tinyint - 1个字节smallint - 2个字节mediumint - 3个字节int - 4个字节bigint - 8个字节float - 4个字节double - 8个字节decimal - 字符串形式的浮点数 字符串 char - 0~255varchar - 可变字符串 0~65535tinytext - 微型文本 2^8-1text…...
终端配色-Docker容器终端
20230309 - 0. 引言 平时使用SSH,通常都是使用securecrt来用,毕竟也算是之前windows下一种使用的工具,在mac下使用还算方便;进入终端后,可以通过调整配色来调整编程环境。平时经常使用屎黄色的那种配色,毕…...
SQL基础培训04-插入数据
知识点: 假设有订单表 CREATE TABLE SEOrder ( FID int identity(...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
