spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题
直接主菜:
spring cloud stream 架构
中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件
springcloudstream已自己集成了kafka、rabbitmq ,其他厂商也集成了一些。在官网有说明 https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/index.html
但是有时候还需自己实现,官方也给出了响应步骤
https://docs.spring.io/spring-cloud-stream/docs/3.2.7/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-custom-binder-impl
自定义实现
定义xxBinder
cp了一网友的项目,我换成了maven,
https://github.com/yangyongdehao30/spring-cloud-stream-binder-mqtt/tree/yangyongdehao30-maven
具体实现如下:
设置config类
import com.sheunglaili.binder.mqtt.MqttMessageChannelBinder;
import com.sheunglaili.binder.mqtt.MqttProvisioningProvider;
import com.sheunglaili.binder.mqtt.properties.MqttBinderConfigurationProperties;
import com.sheunglaili.binder.mqtt.properties.MqttBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.config.BindingHandlerAdvise;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;/*** Mqtt binder configuration class* @author Alex , Li Sheung Lai*/
@Configuration
@EnableConfigurationProperties({MqttExtendedBindingProperties.class})
public class MqttBinderConfiguration {@Autowiredprivate MqttExtendedBindingProperties mqttExtendedBindingProperties;@Beanpublic MqttBinderConfigurationProperties configurationProperties(){return new MqttBinderConfigurationProperties();}@Beanpublic MqttProvisioningProvider provisioningProvider(MqttBinderConfigurationProperties configurationProperties){return new MqttProvisioningProvider();}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttBinderConfigurationProperties configurationProperties) {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(configurationProperties.getUrl());options.setUserName(configurationProperties.getUsername());options.setPassword(configurationProperties.getPassword().toCharArray());options.setCleanSession(configurationProperties.isCleanSession());options.setConnectionTimeout(configurationProperties.getConnectionTimeout());options.setKeepAliveInterval(configurationProperties.getKeepAliveInterval());DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "file")) {factory.setPersistence(new MqttDefaultFilePersistence(configurationProperties.getPersistenceDirectory()));}else if (ObjectUtils.nullSafeEquals(configurationProperties.getPersistence(), "memory")) {factory.setPersistence(new MemoryPersistence());}return factory;}@Beanpublic MqttMessageChannelBinder mqttMessageChannelBinder(MqttPahoClientFactory mqttPahoClientFactory,MqttProvisioningProvider provisioningProvider){MqttMessageChannelBinder mqttMessageChannelBinder = new MqttMessageChannelBinder(mqttPahoClientFactory,provisioningProvider);return mqttMessageChannelBinder;}
配置properties
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.config.BinderProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.util.Assert;
import org.springframework.validation.annotation.Validated;import javax.validation.constraints.Size;/*** Configuration properties for the Mqtt binder . The properties in the class* are prefixed with <b>spring.cloud.stream.mqtt.binder</b>* @author Alex , Li Sheung Lai*/
@Data
@Validated
@ConfigurationProperties(prefix = "spring.cloud.stream.mqtt")
public class MqttBinderConfigurationProperties {/*** location of the mqtt broker(s) (comma-delimited list)*/@Size(min = 1)private String[] url = new String[] { "tcp://localhost:1883" };/*** the username to use when connecting to the broker*/private String username = "guest";/*** the password to use when connecting to the broker*/private String password = "guest";/*** whether the client and server should remember state across restarts and reconnects*/private boolean cleanSession = true;/*** the connection timeout in seconds*/private int connectionTimeout = 30;/*** the ping interval in seconds*/private int keepAliveInterval = 60;/*** 'memory' or 'file'*/private String persistence = "memory";/*** Persistence directory*/private String persistenceDirectory = "/tmp/paho";public MqttBinderConfigurationProperties() {}public String[] getUrl() {return url;}public void setUrl(String[] url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public boolean isCleanSession() {return cleanSession;}public void setCleanSession(boolean cleanSession) {this.cleanSession = cleanSession;}public int getConnectionTimeout() {return connectionTimeout;}public void setConnectionTimeout(int connectionTimeout) {this.connectionTimeout = connectionTimeout;}public int getKeepAliveInterval() {return keepAliveInterval;}public void setKeepAliveInterval(int keepAliveInterval) {this.keepAliveInterval = keepAliveInterval;}public String getPersistence() {return persistence;}public void setPersistence(String persistence) {this.persistence = persistence;}public String getPersistenceDirectory() {return persistenceDirectory;}public void setPersistenceDirectory(String persistenceDirectory) {this.persistenceDirectory = persistenceDirectory;}
//注,和本properties同文件夹的还有几个类,具体在 git中 ,可下载拷贝实现一个channel binder
import com.sheunglaili.binder.mqtt.properties.MqttExtendedBindingProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;public class MqttMessageChannelBinderextends AbstractMessageChannelBinder<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>, MqttProvisioningProvider>implements ExtendedPropertiesBinder<MessageChannel, MqttSourceProperties, MqttSinkProperties> {private MqttExtendedBindingProperties extendedBindingProperties = new MqttExtendedBindingProperties();private MqttPahoClientFactory mqttPahoClientFactory;public void setMqttPahoClientFactory(MqttPahoClientFactory mqttPahoClientFactory) {this.mqttPahoClientFactory = mqttPahoClientFactory;}public MqttMessageChannelBinder(MqttPahoClientFactory factory,MqttProvisioningProvider provisioningProvider) {super(BinderHeaders.STANDARD_HEADERS, provisioningProvider);this.mqttPahoClientFactory = factory;}@Overrideprotected MessageHandler createProducerMessageHandler(ProducerDestination destination,ExtendedProducerProperties<MqttSinkProperties> producerProperties,MessageChannel errorChannel) throws Exception {MqttSinkProperties sinkProperties = producerProperties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sinkProperties.getQos(),sinkProperties.isRetained(),sinkProperties.getCharset());MqttPahoMessageHandler handler = new MqttPahoMessageHandler(sinkProperties.getClientId(),this.mqttPahoClientFactory);handler.setAsync(sinkProperties.isAsync());handler.setDefaultTopic(sinkProperties.getTopic());handler.setConverter(converter);return handler;}@Overrideprotected MessageProducer createConsumerEndpoint(ConsumerDestination destination,String group,ExtendedConsumerProperties<MqttSourceProperties> properties) throws Exception {MqttSourceProperties sourceProperties = properties.getExtension();DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(sourceProperties.getCharset());converter.setPayloadAsBytes(sourceProperties.isBinary());MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(sourceProperties.getClientId(),this.mqttPahoClientFactory,sourceProperties.getTopics());adapter.setBeanFactory(this.getBeanFactory());adapter.setQos(sourceProperties.getQos());adapter.setConverter(converter);adapter.setOutputChannelName(destination.getName());return adapter;}public void setExtendedBindingProperties(MqttExtendedBindingProperties extendedBindingProperties) {this.extendedBindingProperties = extendedBindingProperties;}@Overridepublic MqttSourceProperties getExtendedConsumerProperties(String channelName) {return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);}@Overridepublic MqttSinkProperties getExtendedProducerProperties(String channelName) {return this.extendedBindingProperties.getExtendedProducerProperties(channelName);}@Overridepublic String getDefaultsPrefix() {return this.extendedBindingProperties.getDefaultsPrefix();}@Overridepublic Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {return this.extendedBindingProperties.getExtendedPropertiesEntryClass();}}
实现一个Provider
import com.sheunglaili.binder.mqtt.properties.MqttSinkProperties;
import com.sheunglaili.binder.mqtt.properties.MqttSourceProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;public class MqttProvisioningProvider implementsProvisioningProvider<ExtendedConsumerProperties<MqttSourceProperties>, ExtendedProducerProperties<MqttSinkProperties>> {@Overridepublic ProducerDestination provisionProducerDestination(String name,ExtendedProducerProperties<MqttSinkProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@Overridepublic ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<MqttSourceProperties> properties) throws ProvisioningException {return new MqttTopicDestination(name);}@RequiredArgsConstructorprivate class MqttTopicDestination implements ProducerDestination , ConsumerDestination{private final String destination;@Overridepublic String getName() {return this.destination.trim();}@Overridepublic String getNameForPartition(int partition) {throw new UnsupportedOperationException("Partitioning is not implemented for mqtt");}}
}
配置 spring.binders
mqtt:\
com.sheunglaili.binder.mqtt.config.MqttBinderConfiguration配置如下:
spring.cloud.stream.binders.mqtt1.type=mqtt
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.url=tcp://localhost:1883
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.username=admin
spring.cloud.stream.binders.mqtt1.environment.spring.cloud.stream.mqtt.password=admin记得,不要扫描到BinderConfiguration,xxBinderConfiguration 是在binderService动态配置的,具体构建Binder在这,如果扫描到BinderConfiguration类,此处binders.size就不是0了

相关文章:
spring cloud stream 自定义binder
背景xxx,关键字 binder stream ,解决多中间件通信及切换问题直接主菜:spring cloud stream 架构中间件 --- binder --- channel --- sink --- (处理)---source ---channel ---binder ---中间件 springcloudstream已自己集成了kafk…...
计算机网络之HTTP协议
目录 一、HTTP的含义 1.1 理解超文本 1.2 理解应用层协议 1.3 理解HTTP协议的工作过程 二、HTTP协议格式 2.1 抓包工具的使用 2.2 理解协议格式 2.2.1 请求协议格式 2.2.2. 响应格式请求 一、HTTP的含义 HTTP(全称为“超文本传输协议”)&#x…...
如何挖掘专利创新点?
“无意中发现了一个巨牛的人工智能教程,忍不住分享一下给大家。教程不仅是零基础,通俗易懂,而且非常风趣幽默,像看小说一样!觉得太牛了,所以分享给大家。点这里可以跳转到教程。” 对于广大的软件工程师来说…...
虚函数和纯虚函数
多态(polymorphism)是面向对象编程语言的一大特点,而虚函数是实现多态的机制。其核心理念就是通过基类访问派生类定义的函数。多态性使得程序调用的函数是在运行时动态确定的,而不是在编译时静态确定的。使用一个基类类型的指针或…...
Framework源码面试——Handler与事件传递机制面试集合
Handler面试题 Handler的作用: 当我们需要在子线程处理耗时的操作(例如访问网络,数据库的操作),而当耗时的操作完成后,需要更新UI,这就需要使用Handler来处理,因为子线程不能做更新…...
iOS开发-bugly符号表自动上传发布自动化shell
这里介绍的是通过build得到的app文件和dSYM文件来打包分发和符号表上传。 通过Archive方式打包和获得符号表的方式以后再说。 一:bugly工具jar包准备 bugly符号表工具下载地址:(下载完成后放入项目目录下,如不想加入git可通过gitIgnore忽略…...
MySQL OCP888题解046-哪些语句会被记录到binlog
文章目录1、原题1.1、英文原题1.2、中文翻译1.3、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3.1、知识点1:binlog_format选项3.2、知识点2:Performance Schema(性能模式)4、总结1、原题 1.1、英文原题 You enable binary logging on MySQL S…...
【前端学习】D5:CSS进阶
文章目录前言系列文章目录1 精灵图Sprites1.1 为什么需要精灵图?1.2 精灵图的使用2 字体图标iconfont2.1 字体图标的产生2.2 字体图标的优点2.3 字体文件格式2.4 字体图标的使用2.5 字体图标的引入2.6 字体图标的追加3 CSS三角3.1 普通三角3.2 案例4 CSS用户界面样式…...
【bioinfo】融合检测软件FusionMap分析流程和报告结果
文章目录写在前面FusionMap融合检测原理FusionMap与其他软比较FusionMap分析流程FusionMap结果文件说明FusionMap mono CUP设置图片来源: https://en.wikipedia.org/wiki/Fusion_gene写在前面 下面主要内容是关于RNA-seq数据分析融合,用到软件是FusionMap 【Fusion…...
C++基础了解-17-C++日期 时间
C日期 & 时间 一、C日期 & 时间 C 标准库没有提供所谓的日期类型。C 继承了 C 语言用于日期和时间操作的结构和函数。为了使用日期和时间相关的函数和结构,需要在 C 程序中引用 头文件。 有四个与时间相关的类型:clock_t、time_t、size_t 和 …...
MOV压敏电阻的几种电路元件功能及不同优势讲解
压敏电阻,通常是电路为防护浪涌冲击电压而使用的一种电子元器件,相比其他的浪涌保护器来说,也有那么几个不一样的优势,那么,具体有哪些?以及关于它的作用,你都知道吗?以下优恩小编为…...
uniapp+uniCloud实战项目报修小程序开发
前言 本项目基于 uniapp uniCloud 云开发,简单易用,逻辑主要是云数据库的增删查改,页面大部分自写,部分使用uniUI, uView 组件库。大家可用于学习或者二次开发,有什么不懂的地方可联系 wechat:MrYe443。用…...
演唱会的火车票没了?Python实现12306查票以及zidong购票....
嗨害大家好!我是小熊猫~ 不知道大家抢到演唱会的门票没有呢? 不管抢到没有,火车票也是很重要的哇 24小时抢票不间断的那种喔~ ~ ~ 不然可就要走路去了喔~ 准备工作 环境 Python 3.8Pycharm 插件 谷歌浏览器驱动 模块 需要安装的第三方模块&am…...
Linux发行版本与发行版的简单的介绍
Linux linux下有很多发行的版本,或者称之为魔改版本。以下介绍一些常见的版本,以避免名词的混淆。 linux是提供了一个内核,就像是谷歌的内核一样,QQ浏览器就是使用的谷歌的内核,也算是一个发行版本。 Ubuntu&#x…...
前后端分离项目学习-vue+springboot 博客
前后端分离项目 文章总体分为2大部分,Java后端接口和vue前端页面 项目演示:www.markerhub.com:8084/blogs Java后端接口开发 1、前言 从零开始搭建一个项目骨架,最好选择合适,熟悉的技术,并且在未来易拓展…...
关于指针运算的一道题
目录 刚看到这道题的时候我也和大多数小白一样感到无从下手,但是在我写这篇博客的前几分钟开始我对这道题有了一点点的理解。所以我就想着趁热打铁,写一篇博客来记录一下我的想法。 题目如下: 画图: 逐一解答: 题一…...
【论文简述】Learning Optical Flow with Kernel Patch Attention(CVPR 2022)
一、论文简述 1. 第一作者:Ao Luo 2. 发表年份:2022 3. 发表期刊:CVPR 4. 关键词:光流、局部注意力、空间关联、上下文关联 5. 探索动机:现有方法主要将光流估计视为特征匹配任务,即学习在特征空间中将…...
Java学习-MySQL-列的数据类型
Java学习-MySQL-列的数据类型 数值 tinyint - 1个字节smallint - 2个字节mediumint - 3个字节int - 4个字节bigint - 8个字节float - 4个字节double - 8个字节decimal - 字符串形式的浮点数 字符串 char - 0~255varchar - 可变字符串 0~65535tinytext - 微型文本 2^8-1text…...
终端配色-Docker容器终端
20230309 - 0. 引言 平时使用SSH,通常都是使用securecrt来用,毕竟也算是之前windows下一种使用的工具,在mac下使用还算方便;进入终端后,可以通过调整配色来调整编程环境。平时经常使用屎黄色的那种配色,毕…...
SQL基础培训04-插入数据
知识点: 假设有订单表 CREATE TABLE SEOrder ( FID int identity(...
FVCOM-FABM耦合器实战:手把手教你配置ERSEM生态模型(附避坑指南)
FVCOM-FABM耦合器实战:手把手教你配置ERSEM生态模型(附避坑指南) 当海洋生态建模遇上高性能计算,FVCOM-FABM-ERSEM的组合正在成为水生生态系统模拟的黄金标准。这套工具链能够精确模拟从营养盐循环到浮游生物动态的复杂过程&#…...
别再傻傻分不清!一张图看懂PMOS、NMOS、CMOS在电路设计中的关键区别与选型
电子工程师必读:PMOS、NMOS与CMOS的实战选型指南 在电路设计的世界里,MOS管就像乐高积木中的基础模块,而PMOS、NMOS和CMOS则是三种最常用的"积木类型"。许多初学者在面对原理图上那些看似相似的符号时,常常感到困惑&…...
模型预测控制与神经控制屏障函数的融合应用
1. 项目概述:当模型预测控制遇上神经控制屏障函数在自动驾驶和机器人控制领域,模型预测控制(MPC)因其优秀的实时优化能力而广受青睐。但从业者都知道一个"公开的秘密"——传统MPC就像个近视的导航员,只能确保…...
Speechless:一键永久保存你的微博记忆,免费导出高质量PDF
Speechless:一键永久保存你的微博记忆,免费导出高质量PDF 【免费下载链接】Speechless 把新浪微博的内容,导出成 PDF 文件进行备份的 Chrome Extension。 项目地址: https://gitcode.com/gh_mirrors/sp/Speechless 在数字记忆日益珍贵…...
“房东“骗完租客,转头问AI“会被抓吗“?警方:这就来告诉你答案
一场堪称"教科书级"的黑色幽默2026年5月,杭州上城区发生了一起让人哭笑不得的案件。一个骗子刚刚诈骗完租客,转头打开AI,小心翼翼地问了一句:"我朋友骗了人,会被抓吗?"然后——警察破门…...
Piccolo-FIM:DRAM细粒度访问优化技术解析
1. 现代DRAM架构的细粒度访问挑战在传统DRAM架构中,数据访问的最小单位通常是一个完整的行(Row),这种粗粒度的访问机制在处理图计算等不规则访问模式时暴露出了明显的效率问题。当需要随机访问内存中的离散数据时,系统…...
Ai小程序入门00-初识AI编程(小白入门:不懂代码也能做小程序?AI编程到底怎么玩)
Ai小程序入门00-初识AI编程(小白入门:不懂代码也能做小程序?AI编程到底怎么玩) 📌 文章简介:很多人都有一个"做个小程序赚钱"或"实现自己创意"的梦想,但往往被复杂的代码、繁琐的环境配置劝退。如今,AI 编程工具(如 Cursor、Claude 等)彻底改变…...
图片怎么去水印?2026图片去水印方法实测 + 好用工具推荐
图片怎么去水印?2026图片去水印方法实测 好用工具推荐 前言 日常刷图、做设计、整理相册,总免不了碰到这个问题:图片上有水印,该怎么去掉?无论是摄影平台的版权标识、相机自动打上的日期戳、App 角标,还是…...
为什么92%的Discord AI机器人3天内被封禁?ChatGPT合规集成的4个硬性红线,开发者必查
更多请点击: https://intelliparadigm.com 第一章:为什么92%的Discord AI机器人3天内被封禁?ChatGPT合规集成的4个硬性红线,开发者必查 Discord 并非开放沙盒——其 API 政策与《Developer Terms of Service》明确禁止未经用户明…...
三步轻松上手:BilldDesk Pro开源远程桌面控制工具完整指南
三步轻松上手:BilldDesk Pro开源远程桌面控制工具完整指南 【免费下载链接】billd-desk 基于Vue3 WebRTC Nodejs Flutter搭建的远程桌面控制、游戏串流 项目地址: https://gitcode.com/gh_mirrors/bi/billd-desk 如果你正在寻找一款功能强大且完全免费的跨…...
