当前位置: 首页 > news >正文

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录

          • Introduce
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduce

spring-cloud-starter-stream have a great change since version 4.x

most annotations like @EnableBinding @Input @Output @StreamListener were all removed

this blog is about stream-rocketmq, but also fit for stream-kafaka

just migrate dependency from rocketmq to kafaka

Project Structure
  • stream-binder-sender : rocket message sender
  • stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {repositories {gradlePluginPortal()google()mavenCentral()}
}dependencyResolutionManagement {repositoriesMode = RepositoriesMode.PREFER_SETTINGSrepositories {gradlePluginPortal()google()mavenCentral()}
}buildscript {repositories {gradlePluginPortal()google()mavenCentral()}
}plugins {id("org.jetbrains.kotlin.jvm") version "2.0.21" apply falseid("org.jetbrains.kotlin.kapt") version "2.0.21" apply falseid("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply falseid("org.springframework.boot") version "3.4.1" apply false
}include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {id("org.jetbrains.kotlin.jvm")id("org.jetbrains.kotlin.kapt")id("org.jetbrains.kotlin.plugin.spring")id("org.springframework.boot")
}java {toolchain {languageVersion = JavaLanguageVersion.of(17)}
}dependencies {val springBootVersion = "3.4.1"val springCloudVersion = "4.2.0"val springCloudAlibabaVersion = "2023.0.3.2"// commonsapi("io.github.hellogoogle2000:kotlin-commons:1.0.19")// kotlinapi("org.jetbrains.kotlin:kotlin-reflect:2.0.21")// springapi("org.springframework.boot:spring-boot-starter:$springBootVersion")api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")// spring cloud stream binderapi("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicSender-out is the name for customized output binding object

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderSenderApplicationfun main(args: Array<String>) {runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

the binding name for sending should be same with output name in properties

package x.spring.hello.controllerimport org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController@RestController
class MessageSendController {@Autowiredprivate lateinit var bridge: StreamBridge@GetMapping("send")fun send(): String {val payload = "config"val message = MessageBuilder.withPayload(payload).build()bridge.send("configTopicProducer-out", message)return "send successfully"}
}
Receiver Properties

plainTextConsumer is the name of message handler function

remember it and you should implement it by yourself

you can define multiple message handler functions, and split with ,

plainTextConsumer-in-0 is the name of input binding object

its format is constrained to format of <definition>-in-<index>

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderReceiverApplicationfun main(args: Array<String>) {runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

function name correspond to properties specified by spring.cloud.function.definition property

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Congratulations

now, you have get known about basic usage of message binder

do not modify demos above, it may cause a failure, and waste lots of time

try your own ways, let them run out first

let us try some advanced way, after achieve goals above

Automatically Send Message By Interval

register a supplier object to automatically generate heartbeat message

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier@Component
class MessageSupplierObject {@Beanfun heartPacketProducer(): Supplier<Message<String>> {return Supplier<Message<String>> {println("send heart packet message")val payload = "heart"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build()return@Supplier message}}
}

update properties of sender project, add a output binding object named heartPacketProducer

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

update properties of receiver project, add a input binding object named heartPacketConsumer

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

register message handler function for receiver project

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("heartPacketConsumer")fun heartPacketConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive heart packet message: $payload")}}@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Type Adapter for Payload

this enable your auto send and receive advanced object like class/json/xml

put this adapter file into both sender project and receiver object

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function@Component
class MessageModelAdapter {@Beanfun configModelConvertor1(): Function<ConfigModel, String> {return Function { it.toJson() }}@Beanfun configModelConvertor2(): Function<String, ConfigModel> {return Function { it.fromJson(ConfigModel::class.java) }}
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {val payload = ConfigModel()payload.username = "admin"payload.password = "123456"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()bridge.send("configModelProducer-out", message)return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {return Consumer<Message<ConfigModel>> { message ->val payload = message.payload.toJson()println("consumer receive config model message: $payload")}
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json

相关文章:

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录 IntroduceProject StructureDeclare Plugins and ModulesApply Plugins and Add DependenciesSender PropertiesSender ApplicationSender ControllerReceiver PropertiesReceiver ApplicationReceiver Message HandlerCongratulationsAutomatically Send Message By …...

ubuntu 更新24LTS中断导致“系统出错且无法恢复,请联系系统管理员”

22LTS to 24LTS 更新过程中手jian把更新程序controlC导致的。 解决 目前企图完成更新来恢复&#xff0c;重启后有软件包冲突&#xff0c;sudo apt upgrade报冲突。无法进行。 将原来source.list重新 sudo dpkg --configure -a sudo apt install -f 这些都不管用。还是显示gno…...

力扣-链表-203 移除链表元素

思路1 处理头节点&#xff0c;然后遍历下一个节点&#xff0c;只有确保下一个节点不是要移除的节点时再跳到下一个节点 代码1 class Solution { public:ListNode* removeElements(ListNode* head, int val) {while(head ! nullptr && head->val val){head head…...

Unity中关于实现 管道水流+瀑布流动+大肠蠕动效果笔记

Unity中关于实现 管道水流瀑布流动大肠蠕动效果笔记 效果展示&#xff1a; 参考资料及链接&#xff1a; 1、如何在 Unity 中创建水效果 - 水弯曲教程 https://www.youtube.com/watch?v3CcWus6d_B8 关于补充个人技能中&#xff1a;顶点噪波影响网格着色器配合粒子实现水特效 …...

宏_wps_宏修改word中所有excel表格的格式_设置字体对齐格式_删除空行等

需求&#xff1a; 将word中所有excel表格的格式进行统一化&#xff0c;修改其中的数字类型为“宋体&#xff0c; 五号&#xff0c;右对齐&#xff0c; 不加粗&#xff0c;不倾斜”&#xff0c;其中的中文为“宋体&#xff0c; 五号&#xff0c; 不加粗&#xff0c;不倾斜” 数…...

Linux——网络(udp)

文章目录 目录 文章目录 前言 一、upd函数及接口介绍 1. 创建套接字 - socket 函数 2. 绑定地址和端口 - bind 函数 3. 发送数据 - sendto 函数 4. 接收数据 - recvfrom 函数 5. 关闭套接字 - close 函数 二、代码示例 1.服务端 2.客户端 总结 前言 Linux——网络基础&#xf…...

Oracle-Java JDBC 连接超时之后的认知纠正

背景 偶然读到熊老师的文章《老熊的三分地-JDBC中语句超时与事务》了解到&#xff1a;JAVA代码的最后正常断开数据库连接&#xff0c;在默认情况下&#xff0c;正常断开的数据库连接会自动提交没有提交的事务。   通过文章的测试JAVA程序&#xff0c;可以表明&#xff0c;JDB…...

自定义数据集使用框架的线性回归方法对其进行拟合

代码 import torch import numpy as np import torch.nn as nncriterion nn.MSELoss()data np.array([[-0.5, 7.7],[1.8, 98.5],[0.9, 57.8],[0.4, 39.2],[-1.4, -15.7],[-1.4, -37.3],[-1.8, -49.1],[1.5, 75.6],[0.4, 34.0],[0.8, 62.3]])x_data data[:, 0] y_data data…...

15天基础内容-5

day13 【String类、StringBuilder类】 主要内容 String类常用方法【重点】 String类案例【重点】 StringBuilder类【重点】 StringBuilder类常用方法【重点&#xff1a; append】 StringBuilder类案例【理解】 第一章String类 1.1 String类的判断方法 String类实现判断功能…...

82,【6】BUUCTF WEB .[CISCN2019 华东南赛区]Double Secret

进入靶场 提到了secret&#xff0c;那就访问 既然这样&#xff0c;那就传参看能不能报错 这个页面证明是有用的 传参长一点就会报错&#xff0c;传什么内容无所谓 所以网站是flask框架写的 有一个颜色深一点&#xff0c;点开看看 rc4加密url编码 import base64 from urllib…...

Android WebView 中网页被劫持的原因及解决方案

文章目录 一、原因分析二、解决方案一览三、解决方案代码案例3.1 使用 HTTPS3.2 验证 URL3.3 禁用 JavaScript3.4 使用安全的 WebView 设置3.5 监控网络请求3.6 使用安全的 DNS 四、案例深入分析4.1 问题4.2 分析 五、结论 在 Android 应用开发中&#xff0c;WebView 是一个常用…...

特朗普政府将开展新网络攻击

近日&#xff0c;特朗普政府已表态&#xff1a;减少物理战争&#xff0c;网络战将代替&#xff0c;以实现美国的全球优势。 特朗普也指示美国网络司令部可以在没有总统批准的情况下开展更广泛行动&#xff0c;尤其是应对一些突发事件&#xff0c;这其实成为了后续美国通过网络…...

快递代取项目Uniapp+若依后端管理

快递接单代取得uniappspringboot项目 实际效果图...

arcgis短整型变为长整型的处理方式

1.用QGIS的重构字段工具进行修改&#xff0c;亲测比arcgis的更改字段工具有用 2.更换低版本的arcgis10.2.2&#xff0c;亲测10.5和10.6都有这个毛病&#xff0c;虽然官方文档里面说的是10.6.1及以上 Arcgis10.2.2百度链接&#xff1a;https://pan.baidu.com/s/1HYTwgnBJsBug…...

06、Redis相关概念:缓存击穿、雪崩、穿透、预热、降级、一致性等

Redis相关概念&#xff1a;缓存击穿、雪崩、穿透、预热、降级、一致性等 Redis缓存雪崩、缓存击穿、缓存预热热点key、缓存降级、短链接、分布式锁秒杀、预减库存、 堆外缓存Redis架构设计、Redis动态刷新、Redis和DB双写一致性、过期删除策略、集群数据倾斜等一、缓存雪崩 缓…...

嵌入式基础 -- PCIe 控制器中断管理之MSI与MSI-X简介

PCIe 控制器中断管理技术文档 1. 背景 在现代计算机系统中&#xff0c;中断是设备与 CPU 通信的重要机制&#xff0c;PCIe 控制器提供了从传统线中断到基于消息的中断&#xff08;MSI/MSI-X&#xff09;的演进&#xff0c;以提升中断处理效率和可扩展性。x86 和 ARM 架构虽然…...

websocket实现

由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传. 文章目录 实现思路1.0 实现定义web与客户端通信数据类型和数据格式web端websocket实现web端对客户…...

unity学习20:time相关基础 Time.time 和 Time.deltaTime

目录 1 unity里的几种基本时间 1.1 time 相关测试脚本 1.2 游戏开始到现在所用的时间 Time.time 1.3 时间缩放值 Time.timeScale 1.4 固定时间间隔 Time.fixedDeltaTime 1.5 两次响应时间之间的间隔&#xff1a;Time.deltaTime 1.6 对应测试代码 1.7 需要关注的2个基本…...

【C++】特殊类设计、单例模式与类型转换

目录 一、设计一个类不能被拷贝 &#xff08;一&#xff09;C98 &#xff08;二&#xff09;C11 二、设计一个类只能在堆上创建对象 &#xff08;一&#xff09;将构造函数私有化&#xff0c;对外提供接口 &#xff08;二&#xff09;将析构函数私有化 三、设计一个类只…...

scratch七彩六边形 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch七彩六边形 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…...

DYOR 世茂集团 00813.HK

文章目录1. 公司概况&#xff1a;老牌闽系房企的沉浮1.1 简介1.2 股权结构1.2 核心资质与定位2. 财务表现&#xff1a;2025年成功扭亏为盈2.1 2025年核心财务数据2.2 收入结构变化&#xff1a;多元化成效显现2.3 偿债能力与流动性2.4 估值与市场表现3. 债务重组&#xff1a;境外…...

水下珍品目标检测数据集海胆(sea urchin),海参(sea cucumber),扇贝(scallop)总计796张图像,图像大小是1920×1080数据集是YOLO格式和VOC格式可直接

水下珍品目标检测数据集 海胆(sea urchin)&#xff0c;海参(sea cucumber)&#xff0c;扇贝(scallop) 总计796张图像&#xff0c;图像大小是19201080 数据集是YOLO格式和VOC格式 可直接进行YOLO检测&#xff0c;目前yolov5检测map高达0.91 图像是原始图像&#xff0c;未做清晰化…...

手把手教你用Matlab实现Demosaic算法:从RAW图像到RGB的完整转换流程

从RAW到RGB&#xff1a;Matlab实战Demosaic算法全解析 当你第一次看到相机拍摄的RAW文件时&#xff0c;可能会困惑于那些看似杂乱无章的像素排列。这正是Demosaic算法大显身手的时刻——它能将这些单色数据转化为我们熟悉的彩色图像。作为图像信号处理(ISP)流水线中的关键环节&…...

如何用Depressurizer拯救混乱的Steam游戏库?3个高效管理技巧

如何用Depressurizer拯救混乱的Steam游戏库&#xff1f;3个高效管理技巧 【免费下载链接】Depressurizer A Steam library categorizing tool. 项目地址: https://gitcode.com/gh_mirrors/de/Depressurizer 你是否也曾面对Steam库里上百款游戏却找不到想玩的那一款&…...

一篇文章带你了解 HTTP协议 !!!

引言在 Web 开发体系中&#xff0c;HTTP 协议作为前后端数据交互的核心规范&#xff0c;定义了请求与响应的标准格式&#xff0c;是实现浏览器与服务器通信的基础&#xff1b;而 TCP 协议则为 HTTP 提供了可靠的传输保障&#xff0c;确保数据完整有序传输。本文带你了解一下HTT…...

MATLAB实战:小波多分辨率分解与重构在信号处理中的应用

1. 小波分析为何成为信号处理的瑞士军刀 第一次接触小波分析是在处理一组地震波数据时遇到的难题。传统傅里叶变换在处理这类具有突发性震动的信号时&#xff0c;就像用一把钝刀雕刻精细花纹——虽然能看出大致轮廓&#xff0c;但完全丢失了关键的局部特征。而小波分析的出现&a…...

用Chord做短视频分析:自动生成视频描述,提升内容运营效率

用Chord做短视频分析&#xff1a;自动生成视频描述&#xff0c;提升内容运营效率 1. 短视频运营的痛点&#xff1a;内容爆炸与人力瓶颈 每天打开短视频平台的后台&#xff0c;运营同学是不是经常感到一阵窒息&#xff1f;几十条、上百条视频素材堆在那里&#xff0c;等着你一…...

用 Laravel AI SDK 构建多智能体工作流

Anthropic 之前发布过一篇广泛传播的文章《Building Effective Agents》&#xff0c;系统总结了构建生产级 AI 系统时最实用的几种模式。这些模式的共同点是&#xff1a;都已经在真实场景中被反复验证&#xff0c;实践性强&#xff0c;而且采用范围很广。对 Laravel 开发者来说…...

Matlab含新能源(风电光伏)和多类型电动汽车配电网风险评估 软件:matpower+Mat...

Matlab含新能源&#xff08;风电光伏&#xff09;和多类型电动汽车配电网风险评估 软件&#xff1a;matpowerMatlab&#xff1a; 关键词&#xff1a;蒙特卡洛、时序、电网风险、风险评估、风光不确定性 介绍&#xff1a;由于电动汽车负荷与风电光伏出力的不确定性&#xff0c;造…...

CANoe.Diva CDD文件配置避坑指南:DTC导入、会话迁移与NRC设置详解

CANoe.Diva CDD文件高阶配置实战&#xff1a;从DTC陷阱到NRC优化的深度解析 当诊断测试用例在CANoe.Diva环境中频繁失败时&#xff0c;往往不是基础配置出错&#xff0c;而是那些隐藏在CDD文件深处的"高级选项"在作祟。本文将带您穿透表面配置&#xff0c;直击五个最…...