当前位置: 首页 > news >正文

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录

          • Introduce
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduce

spring-cloud-starter-stream have a great change since version 4.x

most annotations like @EnableBinding @Input @Output @StreamListener were all removed

this blog is about stream-rocketmq, but also fit for stream-kafaka

just migrate dependency from rocketmq to kafaka

Project Structure
  • stream-binder-sender : rocket message sender
  • stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {repositories {gradlePluginPortal()google()mavenCentral()}
}dependencyResolutionManagement {repositoriesMode = RepositoriesMode.PREFER_SETTINGSrepositories {gradlePluginPortal()google()mavenCentral()}
}buildscript {repositories {gradlePluginPortal()google()mavenCentral()}
}plugins {id("org.jetbrains.kotlin.jvm") version "2.0.21" apply falseid("org.jetbrains.kotlin.kapt") version "2.0.21" apply falseid("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply falseid("org.springframework.boot") version "3.4.1" apply false
}include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {id("org.jetbrains.kotlin.jvm")id("org.jetbrains.kotlin.kapt")id("org.jetbrains.kotlin.plugin.spring")id("org.springframework.boot")
}java {toolchain {languageVersion = JavaLanguageVersion.of(17)}
}dependencies {val springBootVersion = "3.4.1"val springCloudVersion = "4.2.0"val springCloudAlibabaVersion = "2023.0.3.2"// commonsapi("io.github.hellogoogle2000:kotlin-commons:1.0.19")// kotlinapi("org.jetbrains.kotlin:kotlin-reflect:2.0.21")// springapi("org.springframework.boot:spring-boot-starter:$springBootVersion")api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")// spring cloud stream binderapi("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicSender-out is the name for customized output binding object

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderSenderApplicationfun main(args: Array<String>) {runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

the binding name for sending should be same with output name in properties

package x.spring.hello.controllerimport org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController@RestController
class MessageSendController {@Autowiredprivate lateinit var bridge: StreamBridge@GetMapping("send")fun send(): String {val payload = "config"val message = MessageBuilder.withPayload(payload).build()bridge.send("configTopicProducer-out", message)return "send successfully"}
}
Receiver Properties

plainTextConsumer is the name of message handler function

remember it and you should implement it by yourself

you can define multiple message handler functions, and split with ,

plainTextConsumer-in-0 is the name of input binding object

its format is constrained to format of <definition>-in-<index>

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderReceiverApplicationfun main(args: Array<String>) {runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

function name correspond to properties specified by spring.cloud.function.definition property

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Congratulations

now, you have get known about basic usage of message binder

do not modify demos above, it may cause a failure, and waste lots of time

try your own ways, let them run out first

let us try some advanced way, after achieve goals above

Automatically Send Message By Interval

register a supplier object to automatically generate heartbeat message

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier@Component
class MessageSupplierObject {@Beanfun heartPacketProducer(): Supplier<Message<String>> {return Supplier<Message<String>> {println("send heart packet message")val payload = "heart"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build()return@Supplier message}}
}

update properties of sender project, add a output binding object named heartPacketProducer

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

update properties of receiver project, add a input binding object named heartPacketConsumer

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

register message handler function for receiver project

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("heartPacketConsumer")fun heartPacketConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive heart packet message: $payload")}}@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Type Adapter for Payload

this enable your auto send and receive advanced object like class/json/xml

put this adapter file into both sender project and receiver object

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function@Component
class MessageModelAdapter {@Beanfun configModelConvertor1(): Function<ConfigModel, String> {return Function { it.toJson() }}@Beanfun configModelConvertor2(): Function<String, ConfigModel> {return Function { it.fromJson(ConfigModel::class.java) }}
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {val payload = ConfigModel()payload.username = "admin"payload.password = "123456"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()bridge.send("configModelProducer-out", message)return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {return Consumer<Message<ConfigModel>> { message ->val payload = message.payload.toJson()println("consumer receive config model message: $payload")}
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json

相关文章:

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录 IntroduceProject StructureDeclare Plugins and ModulesApply Plugins and Add DependenciesSender PropertiesSender ApplicationSender ControllerReceiver PropertiesReceiver ApplicationReceiver Message HandlerCongratulationsAutomatically Send Message By …...

ubuntu 更新24LTS中断导致“系统出错且无法恢复,请联系系统管理员”

22LTS to 24LTS 更新过程中手jian把更新程序controlC导致的。 解决 目前企图完成更新来恢复&#xff0c;重启后有软件包冲突&#xff0c;sudo apt upgrade报冲突。无法进行。 将原来source.list重新 sudo dpkg --configure -a sudo apt install -f 这些都不管用。还是显示gno…...

力扣-链表-203 移除链表元素

思路1 处理头节点&#xff0c;然后遍历下一个节点&#xff0c;只有确保下一个节点不是要移除的节点时再跳到下一个节点 代码1 class Solution { public:ListNode* removeElements(ListNode* head, int val) {while(head ! nullptr && head->val val){head head…...

Unity中关于实现 管道水流+瀑布流动+大肠蠕动效果笔记

Unity中关于实现 管道水流瀑布流动大肠蠕动效果笔记 效果展示&#xff1a; 参考资料及链接&#xff1a; 1、如何在 Unity 中创建水效果 - 水弯曲教程 https://www.youtube.com/watch?v3CcWus6d_B8 关于补充个人技能中&#xff1a;顶点噪波影响网格着色器配合粒子实现水特效 …...

宏_wps_宏修改word中所有excel表格的格式_设置字体对齐格式_删除空行等

需求&#xff1a; 将word中所有excel表格的格式进行统一化&#xff0c;修改其中的数字类型为“宋体&#xff0c; 五号&#xff0c;右对齐&#xff0c; 不加粗&#xff0c;不倾斜”&#xff0c;其中的中文为“宋体&#xff0c; 五号&#xff0c; 不加粗&#xff0c;不倾斜” 数…...

Linux——网络(udp)

文章目录 目录 文章目录 前言 一、upd函数及接口介绍 1. 创建套接字 - socket 函数 2. 绑定地址和端口 - bind 函数 3. 发送数据 - sendto 函数 4. 接收数据 - recvfrom 函数 5. 关闭套接字 - close 函数 二、代码示例 1.服务端 2.客户端 总结 前言 Linux——网络基础&#xf…...

Oracle-Java JDBC 连接超时之后的认知纠正

背景 偶然读到熊老师的文章《老熊的三分地-JDBC中语句超时与事务》了解到&#xff1a;JAVA代码的最后正常断开数据库连接&#xff0c;在默认情况下&#xff0c;正常断开的数据库连接会自动提交没有提交的事务。   通过文章的测试JAVA程序&#xff0c;可以表明&#xff0c;JDB…...

自定义数据集使用框架的线性回归方法对其进行拟合

代码 import torch import numpy as np import torch.nn as nncriterion nn.MSELoss()data np.array([[-0.5, 7.7],[1.8, 98.5],[0.9, 57.8],[0.4, 39.2],[-1.4, -15.7],[-1.4, -37.3],[-1.8, -49.1],[1.5, 75.6],[0.4, 34.0],[0.8, 62.3]])x_data data[:, 0] y_data data…...

15天基础内容-5

day13 【String类、StringBuilder类】 主要内容 String类常用方法【重点】 String类案例【重点】 StringBuilder类【重点】 StringBuilder类常用方法【重点&#xff1a; append】 StringBuilder类案例【理解】 第一章String类 1.1 String类的判断方法 String类实现判断功能…...

82,【6】BUUCTF WEB .[CISCN2019 华东南赛区]Double Secret

进入靶场 提到了secret&#xff0c;那就访问 既然这样&#xff0c;那就传参看能不能报错 这个页面证明是有用的 传参长一点就会报错&#xff0c;传什么内容无所谓 所以网站是flask框架写的 有一个颜色深一点&#xff0c;点开看看 rc4加密url编码 import base64 from urllib…...

Android WebView 中网页被劫持的原因及解决方案

文章目录 一、原因分析二、解决方案一览三、解决方案代码案例3.1 使用 HTTPS3.2 验证 URL3.3 禁用 JavaScript3.4 使用安全的 WebView 设置3.5 监控网络请求3.6 使用安全的 DNS 四、案例深入分析4.1 问题4.2 分析 五、结论 在 Android 应用开发中&#xff0c;WebView 是一个常用…...

特朗普政府将开展新网络攻击

近日&#xff0c;特朗普政府已表态&#xff1a;减少物理战争&#xff0c;网络战将代替&#xff0c;以实现美国的全球优势。 特朗普也指示美国网络司令部可以在没有总统批准的情况下开展更广泛行动&#xff0c;尤其是应对一些突发事件&#xff0c;这其实成为了后续美国通过网络…...

快递代取项目Uniapp+若依后端管理

快递接单代取得uniappspringboot项目 实际效果图...

arcgis短整型变为长整型的处理方式

1.用QGIS的重构字段工具进行修改&#xff0c;亲测比arcgis的更改字段工具有用 2.更换低版本的arcgis10.2.2&#xff0c;亲测10.5和10.6都有这个毛病&#xff0c;虽然官方文档里面说的是10.6.1及以上 Arcgis10.2.2百度链接&#xff1a;https://pan.baidu.com/s/1HYTwgnBJsBug…...

06、Redis相关概念:缓存击穿、雪崩、穿透、预热、降级、一致性等

Redis相关概念&#xff1a;缓存击穿、雪崩、穿透、预热、降级、一致性等 Redis缓存雪崩、缓存击穿、缓存预热热点key、缓存降级、短链接、分布式锁秒杀、预减库存、 堆外缓存Redis架构设计、Redis动态刷新、Redis和DB双写一致性、过期删除策略、集群数据倾斜等一、缓存雪崩 缓…...

嵌入式基础 -- PCIe 控制器中断管理之MSI与MSI-X简介

PCIe 控制器中断管理技术文档 1. 背景 在现代计算机系统中&#xff0c;中断是设备与 CPU 通信的重要机制&#xff0c;PCIe 控制器提供了从传统线中断到基于消息的中断&#xff08;MSI/MSI-X&#xff09;的演进&#xff0c;以提升中断处理效率和可扩展性。x86 和 ARM 架构虽然…...

websocket实现

由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传. 文章目录 实现思路1.0 实现定义web与客户端通信数据类型和数据格式web端websocket实现web端对客户…...

unity学习20:time相关基础 Time.time 和 Time.deltaTime

目录 1 unity里的几种基本时间 1.1 time 相关测试脚本 1.2 游戏开始到现在所用的时间 Time.time 1.3 时间缩放值 Time.timeScale 1.4 固定时间间隔 Time.fixedDeltaTime 1.5 两次响应时间之间的间隔&#xff1a;Time.deltaTime 1.6 对应测试代码 1.7 需要关注的2个基本…...

【C++】特殊类设计、单例模式与类型转换

目录 一、设计一个类不能被拷贝 &#xff08;一&#xff09;C98 &#xff08;二&#xff09;C11 二、设计一个类只能在堆上创建对象 &#xff08;一&#xff09;将构造函数私有化&#xff0c;对外提供接口 &#xff08;二&#xff09;将析构函数私有化 三、设计一个类只…...

scratch七彩六边形 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch七彩六边形 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…...

代码随想录刷题day16|(哈希表篇)349.两个数组的交集

目录 一、哈希表理论基础 二、集合set在哈希法中的应用 三、相关算法题目 四、相关知识点 1.set集合特点和常用方法 1.1 set集合概述 1.2 set集合特点 1.3 常用方法 2.set集合转换成数组 法1&#xff1a;另新建一个数组 法2&#xff1a;将结果集合转为数组 ▲ 3.数组…...

Synology 群辉NAS安装(6)安装mssql

Synology 群辉NAS安装&#xff08;6&#xff09;安装mssql 写在前面mssql 2019:成功安装说明&#xff0c;这个最终成功了 mssql 2022没有成功1. pull image2.启动mssql docker container 远程连接 写在前面 mssq是一个重要节点。 这是因为我对mysql没有一丝好感。虽然接触了许…...

2025年美赛B题-结合Logistic阻滞增长模型和SIR传染病模型研究旅游可持续性-成品论文

模型设计思路与创新点&#xff1a; 建模的时候应该先确定我们需要建立什么类的模型&#xff1f;优化类还是统计类&#xff1f;这个题需要大量的数据分析&#xff0c;因此我们可以建立一个统计学模型。 统计学建模思路&#xff1a;观察规律&#xff0c;建立模型&#xff0c;参…...

Hook 函数

什么是hook函数&#xff1f; 在计算机编程中&#xff0c;hook函数是指在特定的事件发生时被调用的函数&#xff0c;用于在事件发生前或后进行一些特定的操作。通常&#xff0c;hook函数作为回调函数被注册到事件处理器中&#xff0c;当事件发生时&#xff0c;事件处理器会自动…...

蓝桥杯模拟算法:蛇形方阵

P5731 【深基5.习6】蛇形方阵 - 洛谷 | 计算机科学教育新生态 我们只要定义两个方向向量数组&#xff0c;这种问题就可以迎刃而解了 比如我们是4的话&#xff0c;我们从左向右开始存&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;4 到5的时候y就大于4了就是越界了&…...

DeepSeek-R1解读:纯强化学习,模型推理能力提升的新范式?

DeepSeek-R1解读&#xff1a;纯强化学习&#xff0c;模型推理能力提升的新范式&#xff1f; 1. Impressive Points2. 纯强化学习&#xff0c;LLM推理能力提升新范式&#xff1f;2.1 DeepSeek-R1-Zero2.2 DeepSeek-R1 3. 端侧模型能力提升&#xff1a;蒸馏>强化学习 1. Impre…...

深度解析:基于Vue 3的教育管理系统架构设计与优化实践

一、项目架构分析 1. 技术栈全景 项目采用 Vue 3 TypeScript Tailwind CSS 技术组合&#xff0c;体现了现代前端开发的三大趋势&#xff1a; 响应式编程&#xff1a;通过Vue 3的Composition API实现细粒度响应 类型安全&#xff1a;约60%的组件采用TypeScript编写 原子化…...

【PyTorch】3.张量类型转换

个人主页&#xff1a;Icomi 在深度学习蓬勃发展的当下&#xff0c;PyTorch 是不可或缺的工具。它作为强大的深度学习框架&#xff0c;为构建和训练神经网络提供了高效且灵活的平台。神经网络作为人工智能的核心技术&#xff0c;能够处理复杂的数据模式。通过 PyTorch&#xff0…...

Spring Boot整合JavaMail实现邮件发送

一. 发送邮件原理 发件人【设置授权码】 - SMTP协议【Simple Mail TransferProtocol - 是一种提供可靠且有效的电子邮件传输的协议】 - 收件人 二. 获取授权码 开通POP3/SMTP&#xff0c;获取授权码 授权码是QQ邮箱推出的&#xff0c;用于登录第三方客户端的专用密码。适用…...

字节跳动发布UI-TARS,超越GPT-4o和Claude,能接管电脑完成复杂任务

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...