[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025
文章目录
- Introduce
- Project Structure
- Declare Plugins and Modules
- Apply Plugins and Add Dependencies
- Sender Properties
- Sender Application
- Sender Controller
- Receiver Properties
- Receiver Application
- Receiver Message Handler
- Congratulations
- Automatically Send Message By Interval
- Type Adapter for Payload
- Send Message Model as JSON
- Receive JSON as Message Model
Introduce
spring-cloud-starter-stream
have a great change since version 4.x
most annotations like @EnableBinding
@Input
@Output
@StreamListener
were all removed
this blog is about stream-rocketmq
, but also fit for stream-kafaka
just migrate dependency from rocketmq
to kafaka
Project Structure
- stream-binder-sender : rocket message sender
- stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {repositories {gradlePluginPortal()google()mavenCentral()}
}dependencyResolutionManagement {repositoriesMode = RepositoriesMode.PREFER_SETTINGSrepositories {gradlePluginPortal()google()mavenCentral()}
}buildscript {repositories {gradlePluginPortal()google()mavenCentral()}
}plugins {id("org.jetbrains.kotlin.jvm") version "2.0.21" apply falseid("org.jetbrains.kotlin.kapt") version "2.0.21" apply falseid("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply falseid("org.springframework.boot") version "3.4.1" apply false
}include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {id("org.jetbrains.kotlin.jvm")id("org.jetbrains.kotlin.kapt")id("org.jetbrains.kotlin.plugin.spring")id("org.springframework.boot")
}java {toolchain {languageVersion = JavaLanguageVersion.of(17)}
}dependencies {val springBootVersion = "3.4.1"val springCloudVersion = "4.2.0"val springCloudAlibabaVersion = "2023.0.3.2"// commonsapi("io.github.hellogoogle2000:kotlin-commons:1.0.19")// kotlinapi("org.jetbrains.kotlin:kotlin-reflect:2.0.21")// springapi("org.springframework.boot:spring-boot-starter:$springBootVersion")api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")// spring cloud stream binderapi("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties
configTopicSender-out
is the name for customized output binding object
# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderSenderApplicationfun main(args: Array<String>) {runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller
the binding name for sending should be same with output name in properties
package x.spring.hello.controllerimport org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController@RestController
class MessageSendController {@Autowiredprivate lateinit var bridge: StreamBridge@GetMapping("send")fun send(): String {val payload = "config"val message = MessageBuilder.withPayload(payload).build()bridge.send("configTopicProducer-out", message)return "send successfully"}
}
Receiver Properties
plainTextConsumer
is the name of message handler function
remember it and you should implement it by yourself
you can define multiple message handler functions, and split with ,
plainTextConsumer-in-0
is the name of input binding object
its format is constrained to format of <definition>-in-<index>
# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderReceiverApplicationfun main(args: Array<String>) {runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler
function name correspond to properties specified by spring.cloud.function.definition
property
package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Congratulations
now, you have get known about basic usage of message binder
do not modify demos above, it may cause a failure, and waste lots of time
try your own ways, let them run out first
let us try some advanced way, after achieve goals above
Automatically Send Message By Interval
register a supplier object to automatically generate heartbeat message
package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier@Component
class MessageSupplierObject {@Beanfun heartPacketProducer(): Supplier<Message<String>> {return Supplier<Message<String>> {println("send heart packet message")val payload = "heart"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build()return@Supplier message}}
}
update properties of sender project, add a output binding object named heartPacketProducer
# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
update properties of receiver project, add a input binding object named heartPacketConsumer
# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
register message handler function for receiver project
package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("heartPacketConsumer")fun heartPacketConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive heart packet message: $payload")}}@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Type Adapter for Payload
this enable your auto send and receive advanced object like class/json/xml
put this adapter file into both sender project and receiver object
package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function@Component
class MessageModelAdapter {@Beanfun configModelConvertor1(): Function<ConfigModel, String> {return Function { it.toJson() }}@Beanfun configModelConvertor2(): Function<String, ConfigModel> {return Function { it.fromJson(ConfigModel::class.java) }}
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {val payload = ConfigModel()payload.username = "admin"payload.password = "123456"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()bridge.send("configModelProducer-out", message)return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {return Consumer<Message<ConfigModel>> { message ->val payload = message.payload.toJson()println("consumer receive config model message: $payload")}
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json
相关文章:
[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025
文章目录 IntroduceProject StructureDeclare Plugins and ModulesApply Plugins and Add DependenciesSender PropertiesSender ApplicationSender ControllerReceiver PropertiesReceiver ApplicationReceiver Message HandlerCongratulationsAutomatically Send Message By …...

ubuntu 更新24LTS中断导致“系统出错且无法恢复,请联系系统管理员”
22LTS to 24LTS 更新过程中手jian把更新程序controlC导致的。 解决 目前企图完成更新来恢复,重启后有软件包冲突,sudo apt upgrade报冲突。无法进行。 将原来source.list重新 sudo dpkg --configure -a sudo apt install -f 这些都不管用。还是显示gno…...
力扣-链表-203 移除链表元素
思路1 处理头节点,然后遍历下一个节点,只有确保下一个节点不是要移除的节点时再跳到下一个节点 代码1 class Solution { public:ListNode* removeElements(ListNode* head, int val) {while(head ! nullptr && head->val val){head head…...

Unity中关于实现 管道水流+瀑布流动+大肠蠕动效果笔记
Unity中关于实现 管道水流瀑布流动大肠蠕动效果笔记 效果展示: 参考资料及链接: 1、如何在 Unity 中创建水效果 - 水弯曲教程 https://www.youtube.com/watch?v3CcWus6d_B8 关于补充个人技能中:顶点噪波影响网格着色器配合粒子实现水特效 …...

宏_wps_宏修改word中所有excel表格的格式_设置字体对齐格式_删除空行等
需求: 将word中所有excel表格的格式进行统一化,修改其中的数字类型为“宋体, 五号,右对齐, 不加粗,不倾斜”,其中的中文为“宋体, 五号, 不加粗,不倾斜” 数…...

Linux——网络(udp)
文章目录 目录 文章目录 前言 一、upd函数及接口介绍 1. 创建套接字 - socket 函数 2. 绑定地址和端口 - bind 函数 3. 发送数据 - sendto 函数 4. 接收数据 - recvfrom 函数 5. 关闭套接字 - close 函数 二、代码示例 1.服务端 2.客户端 总结 前言 Linux——网络基础…...

Oracle-Java JDBC 连接超时之后的认知纠正
背景 偶然读到熊老师的文章《老熊的三分地-JDBC中语句超时与事务》了解到:JAVA代码的最后正常断开数据库连接,在默认情况下,正常断开的数据库连接会自动提交没有提交的事务。 通过文章的测试JAVA程序,可以表明,JDB…...
自定义数据集使用框架的线性回归方法对其进行拟合
代码 import torch import numpy as np import torch.nn as nncriterion nn.MSELoss()data np.array([[-0.5, 7.7],[1.8, 98.5],[0.9, 57.8],[0.4, 39.2],[-1.4, -15.7],[-1.4, -37.3],[-1.8, -49.1],[1.5, 75.6],[0.4, 34.0],[0.8, 62.3]])x_data data[:, 0] y_data data…...

15天基础内容-5
day13 【String类、StringBuilder类】 主要内容 String类常用方法【重点】 String类案例【重点】 StringBuilder类【重点】 StringBuilder类常用方法【重点: append】 StringBuilder类案例【理解】 第一章String类 1.1 String类的判断方法 String类实现判断功能…...

82,【6】BUUCTF WEB .[CISCN2019 华东南赛区]Double Secret
进入靶场 提到了secret,那就访问 既然这样,那就传参看能不能报错 这个页面证明是有用的 传参长一点就会报错,传什么内容无所谓 所以网站是flask框架写的 有一个颜色深一点,点开看看 rc4加密url编码 import base64 from urllib…...

Android WebView 中网页被劫持的原因及解决方案
文章目录 一、原因分析二、解决方案一览三、解决方案代码案例3.1 使用 HTTPS3.2 验证 URL3.3 禁用 JavaScript3.4 使用安全的 WebView 设置3.5 监控网络请求3.6 使用安全的 DNS 四、案例深入分析4.1 问题4.2 分析 五、结论 在 Android 应用开发中,WebView 是一个常用…...
特朗普政府将开展新网络攻击
近日,特朗普政府已表态:减少物理战争,网络战将代替,以实现美国的全球优势。 特朗普也指示美国网络司令部可以在没有总统批准的情况下开展更广泛行动,尤其是应对一些突发事件,这其实成为了后续美国通过网络…...

快递代取项目Uniapp+若依后端管理
快递接单代取得uniappspringboot项目 实际效果图...

arcgis短整型变为长整型的处理方式
1.用QGIS的重构字段工具进行修改,亲测比arcgis的更改字段工具有用 2.更换低版本的arcgis10.2.2,亲测10.5和10.6都有这个毛病,虽然官方文档里面说的是10.6.1及以上 Arcgis10.2.2百度链接:https://pan.baidu.com/s/1HYTwgnBJsBug…...

06、Redis相关概念:缓存击穿、雪崩、穿透、预热、降级、一致性等
Redis相关概念:缓存击穿、雪崩、穿透、预热、降级、一致性等 Redis缓存雪崩、缓存击穿、缓存预热热点key、缓存降级、短链接、分布式锁秒杀、预减库存、 堆外缓存Redis架构设计、Redis动态刷新、Redis和DB双写一致性、过期删除策略、集群数据倾斜等一、缓存雪崩 缓…...
嵌入式基础 -- PCIe 控制器中断管理之MSI与MSI-X简介
PCIe 控制器中断管理技术文档 1. 背景 在现代计算机系统中,中断是设备与 CPU 通信的重要机制,PCIe 控制器提供了从传统线中断到基于消息的中断(MSI/MSI-X)的演进,以提升中断处理效率和可扩展性。x86 和 ARM 架构虽然…...

websocket实现
由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传. 文章目录 实现思路1.0 实现定义web与客户端通信数据类型和数据格式web端websocket实现web端对客户…...

unity学习20:time相关基础 Time.time 和 Time.deltaTime
目录 1 unity里的几种基本时间 1.1 time 相关测试脚本 1.2 游戏开始到现在所用的时间 Time.time 1.3 时间缩放值 Time.timeScale 1.4 固定时间间隔 Time.fixedDeltaTime 1.5 两次响应时间之间的间隔:Time.deltaTime 1.6 对应测试代码 1.7 需要关注的2个基本…...

【C++】特殊类设计、单例模式与类型转换
目录 一、设计一个类不能被拷贝 (一)C98 (二)C11 二、设计一个类只能在堆上创建对象 (一)将构造函数私有化,对外提供接口 (二)将析构函数私有化 三、设计一个类只…...

scratch七彩六边形 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析
目录 scratch七彩六边形 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...

python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
C++.OpenGL (20/64)混合(Blending)
混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...