当前位置: 首页 > news >正文

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录

          • Introduce
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduce

spring-cloud-starter-stream have a great change since version 4.x

most annotations like @EnableBinding @Input @Output @StreamListener were all removed

this blog is about stream-rocketmq, but also fit for stream-kafaka

just migrate dependency from rocketmq to kafaka

Project Structure
  • stream-binder-sender : rocket message sender
  • stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {repositories {gradlePluginPortal()google()mavenCentral()}
}dependencyResolutionManagement {repositoriesMode = RepositoriesMode.PREFER_SETTINGSrepositories {gradlePluginPortal()google()mavenCentral()}
}buildscript {repositories {gradlePluginPortal()google()mavenCentral()}
}plugins {id("org.jetbrains.kotlin.jvm") version "2.0.21" apply falseid("org.jetbrains.kotlin.kapt") version "2.0.21" apply falseid("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply falseid("org.springframework.boot") version "3.4.1" apply false
}include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {id("org.jetbrains.kotlin.jvm")id("org.jetbrains.kotlin.kapt")id("org.jetbrains.kotlin.plugin.spring")id("org.springframework.boot")
}java {toolchain {languageVersion = JavaLanguageVersion.of(17)}
}dependencies {val springBootVersion = "3.4.1"val springCloudVersion = "4.2.0"val springCloudAlibabaVersion = "2023.0.3.2"// commonsapi("io.github.hellogoogle2000:kotlin-commons:1.0.19")// kotlinapi("org.jetbrains.kotlin:kotlin-reflect:2.0.21")// springapi("org.springframework.boot:spring-boot-starter:$springBootVersion")api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")// spring cloud stream binderapi("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicSender-out is the name for customized output binding object

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderSenderApplicationfun main(args: Array<String>) {runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

the binding name for sending should be same with output name in properties

package x.spring.hello.controllerimport org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController@RestController
class MessageSendController {@Autowiredprivate lateinit var bridge: StreamBridge@GetMapping("send")fun send(): String {val payload = "config"val message = MessageBuilder.withPayload(payload).build()bridge.send("configTopicProducer-out", message)return "send successfully"}
}
Receiver Properties

plainTextConsumer is the name of message handler function

remember it and you should implement it by yourself

you can define multiple message handler functions, and split with ,

plainTextConsumer-in-0 is the name of input binding object

its format is constrained to format of <definition>-in-<index>

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderReceiverApplicationfun main(args: Array<String>) {runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

function name correspond to properties specified by spring.cloud.function.definition property

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Congratulations

now, you have get known about basic usage of message binder

do not modify demos above, it may cause a failure, and waste lots of time

try your own ways, let them run out first

let us try some advanced way, after achieve goals above

Automatically Send Message By Interval

register a supplier object to automatically generate heartbeat message

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier@Component
class MessageSupplierObject {@Beanfun heartPacketProducer(): Supplier<Message<String>> {return Supplier<Message<String>> {println("send heart packet message")val payload = "heart"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build()return@Supplier message}}
}

update properties of sender project, add a output binding object named heartPacketProducer

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

update properties of receiver project, add a input binding object named heartPacketConsumer

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

register message handler function for receiver project

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("heartPacketConsumer")fun heartPacketConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive heart packet message: $payload")}}@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Type Adapter for Payload

this enable your auto send and receive advanced object like class/json/xml

put this adapter file into both sender project and receiver object

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function@Component
class MessageModelAdapter {@Beanfun configModelConvertor1(): Function<ConfigModel, String> {return Function { it.toJson() }}@Beanfun configModelConvertor2(): Function<String, ConfigModel> {return Function { it.fromJson(ConfigModel::class.java) }}
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {val payload = ConfigModel()payload.username = "admin"payload.password = "123456"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()bridge.send("configModelProducer-out", message)return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {return Consumer<Message<ConfigModel>> { message ->val payload = message.payload.toJson()println("consumer receive config model message: $payload")}
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json

相关文章:

[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录 IntroduceProject StructureDeclare Plugins and ModulesApply Plugins and Add DependenciesSender PropertiesSender ApplicationSender ControllerReceiver PropertiesReceiver ApplicationReceiver Message HandlerCongratulationsAutomatically Send Message By …...

ubuntu 更新24LTS中断导致“系统出错且无法恢复,请联系系统管理员”

22LTS to 24LTS 更新过程中手jian把更新程序controlC导致的。 解决 目前企图完成更新来恢复&#xff0c;重启后有软件包冲突&#xff0c;sudo apt upgrade报冲突。无法进行。 将原来source.list重新 sudo dpkg --configure -a sudo apt install -f 这些都不管用。还是显示gno…...

力扣-链表-203 移除链表元素

思路1 处理头节点&#xff0c;然后遍历下一个节点&#xff0c;只有确保下一个节点不是要移除的节点时再跳到下一个节点 代码1 class Solution { public:ListNode* removeElements(ListNode* head, int val) {while(head ! nullptr && head->val val){head head…...

Unity中关于实现 管道水流+瀑布流动+大肠蠕动效果笔记

Unity中关于实现 管道水流瀑布流动大肠蠕动效果笔记 效果展示&#xff1a; 参考资料及链接&#xff1a; 1、如何在 Unity 中创建水效果 - 水弯曲教程 https://www.youtube.com/watch?v3CcWus6d_B8 关于补充个人技能中&#xff1a;顶点噪波影响网格着色器配合粒子实现水特效 …...

宏_wps_宏修改word中所有excel表格的格式_设置字体对齐格式_删除空行等

需求&#xff1a; 将word中所有excel表格的格式进行统一化&#xff0c;修改其中的数字类型为“宋体&#xff0c; 五号&#xff0c;右对齐&#xff0c; 不加粗&#xff0c;不倾斜”&#xff0c;其中的中文为“宋体&#xff0c; 五号&#xff0c; 不加粗&#xff0c;不倾斜” 数…...

Linux——网络(udp)

文章目录 目录 文章目录 前言 一、upd函数及接口介绍 1. 创建套接字 - socket 函数 2. 绑定地址和端口 - bind 函数 3. 发送数据 - sendto 函数 4. 接收数据 - recvfrom 函数 5. 关闭套接字 - close 函数 二、代码示例 1.服务端 2.客户端 总结 前言 Linux——网络基础&#xf…...

Oracle-Java JDBC 连接超时之后的认知纠正

背景 偶然读到熊老师的文章《老熊的三分地-JDBC中语句超时与事务》了解到&#xff1a;JAVA代码的最后正常断开数据库连接&#xff0c;在默认情况下&#xff0c;正常断开的数据库连接会自动提交没有提交的事务。   通过文章的测试JAVA程序&#xff0c;可以表明&#xff0c;JDB…...

自定义数据集使用框架的线性回归方法对其进行拟合

代码 import torch import numpy as np import torch.nn as nncriterion nn.MSELoss()data np.array([[-0.5, 7.7],[1.8, 98.5],[0.9, 57.8],[0.4, 39.2],[-1.4, -15.7],[-1.4, -37.3],[-1.8, -49.1],[1.5, 75.6],[0.4, 34.0],[0.8, 62.3]])x_data data[:, 0] y_data data…...

15天基础内容-5

day13 【String类、StringBuilder类】 主要内容 String类常用方法【重点】 String类案例【重点】 StringBuilder类【重点】 StringBuilder类常用方法【重点&#xff1a; append】 StringBuilder类案例【理解】 第一章String类 1.1 String类的判断方法 String类实现判断功能…...

82,【6】BUUCTF WEB .[CISCN2019 华东南赛区]Double Secret

进入靶场 提到了secret&#xff0c;那就访问 既然这样&#xff0c;那就传参看能不能报错 这个页面证明是有用的 传参长一点就会报错&#xff0c;传什么内容无所谓 所以网站是flask框架写的 有一个颜色深一点&#xff0c;点开看看 rc4加密url编码 import base64 from urllib…...

Android WebView 中网页被劫持的原因及解决方案

文章目录 一、原因分析二、解决方案一览三、解决方案代码案例3.1 使用 HTTPS3.2 验证 URL3.3 禁用 JavaScript3.4 使用安全的 WebView 设置3.5 监控网络请求3.6 使用安全的 DNS 四、案例深入分析4.1 问题4.2 分析 五、结论 在 Android 应用开发中&#xff0c;WebView 是一个常用…...

特朗普政府将开展新网络攻击

近日&#xff0c;特朗普政府已表态&#xff1a;减少物理战争&#xff0c;网络战将代替&#xff0c;以实现美国的全球优势。 特朗普也指示美国网络司令部可以在没有总统批准的情况下开展更广泛行动&#xff0c;尤其是应对一些突发事件&#xff0c;这其实成为了后续美国通过网络…...

快递代取项目Uniapp+若依后端管理

快递接单代取得uniappspringboot项目 实际效果图...

arcgis短整型变为长整型的处理方式

1.用QGIS的重构字段工具进行修改&#xff0c;亲测比arcgis的更改字段工具有用 2.更换低版本的arcgis10.2.2&#xff0c;亲测10.5和10.6都有这个毛病&#xff0c;虽然官方文档里面说的是10.6.1及以上 Arcgis10.2.2百度链接&#xff1a;https://pan.baidu.com/s/1HYTwgnBJsBug…...

06、Redis相关概念:缓存击穿、雪崩、穿透、预热、降级、一致性等

Redis相关概念&#xff1a;缓存击穿、雪崩、穿透、预热、降级、一致性等 Redis缓存雪崩、缓存击穿、缓存预热热点key、缓存降级、短链接、分布式锁秒杀、预减库存、 堆外缓存Redis架构设计、Redis动态刷新、Redis和DB双写一致性、过期删除策略、集群数据倾斜等一、缓存雪崩 缓…...

嵌入式基础 -- PCIe 控制器中断管理之MSI与MSI-X简介

PCIe 控制器中断管理技术文档 1. 背景 在现代计算机系统中&#xff0c;中断是设备与 CPU 通信的重要机制&#xff0c;PCIe 控制器提供了从传统线中断到基于消息的中断&#xff08;MSI/MSI-X&#xff09;的演进&#xff0c;以提升中断处理效率和可扩展性。x86 和 ARM 架构虽然…...

websocket实现

由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传. 文章目录 实现思路1.0 实现定义web与客户端通信数据类型和数据格式web端websocket实现web端对客户…...

unity学习20:time相关基础 Time.time 和 Time.deltaTime

目录 1 unity里的几种基本时间 1.1 time 相关测试脚本 1.2 游戏开始到现在所用的时间 Time.time 1.3 时间缩放值 Time.timeScale 1.4 固定时间间隔 Time.fixedDeltaTime 1.5 两次响应时间之间的间隔&#xff1a;Time.deltaTime 1.6 对应测试代码 1.7 需要关注的2个基本…...

【C++】特殊类设计、单例模式与类型转换

目录 一、设计一个类不能被拷贝 &#xff08;一&#xff09;C98 &#xff08;二&#xff09;C11 二、设计一个类只能在堆上创建对象 &#xff08;一&#xff09;将构造函数私有化&#xff0c;对外提供接口 &#xff08;二&#xff09;将析构函数私有化 三、设计一个类只…...

scratch七彩六边形 2024年12月scratch三级真题 中国电子学会 图形化编程 scratch三级真题和答案解析

目录 scratch七彩六边形 一、题目要求 1、准备工作 2、功能实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

Ubuntu系统复制(U盘-电脑硬盘)

所需环境 电脑自带硬盘&#xff1a;1块 (1T) U盘1&#xff1a;Ubuntu系统引导盘&#xff08;用于“U盘2”复制到“电脑自带硬盘”&#xff09; U盘2&#xff1a;Ubuntu系统盘&#xff08;1T&#xff0c;用于被复制&#xff09; &#xff01;&#xff01;&#xff01;建议“电脑…...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解

进来是需要留言的&#xff0c;先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码&#xff0c;输入的<>当成字符串处理回显到页面中&#xff0c;看来只是把用户输…...

React核心概念:State是什么?如何用useState管理组件自己的数据?

系列回顾&#xff1a; 在上一篇《React入门第一步》中&#xff0c;我们已经成功创建并运行了第一个React项目。我们学会了用Vite初始化项目&#xff0c;并修改了App.jsx组件&#xff0c;让页面显示出我们想要的文字。但是&#xff0c;那个页面是“死”的&#xff0c;它只是静态…...

EasyRTC音视频实时通话功能在WebRTC与智能硬件整合中的应用与优势

一、WebRTC与智能硬件整合趋势​ 随着物联网和实时通信需求的爆发式增长&#xff0c;WebRTC作为开源实时通信技术&#xff0c;为浏览器与移动应用提供免插件的音视频通信能力&#xff0c;在智能硬件领域的融合应用已成必然趋势。智能硬件不再局限于单一功能&#xff0c;对实时…...

C#最佳实践:为何优先使用as或is而非强制转换

C#最佳实践&#xff1a;为何优先使用as或is而非强制转换 在 C# 的编程世界里&#xff0c;类型转换是我们经常会遇到的操作。就像在现实生活中&#xff0c;我们可能需要把不同形状的物品重新整理归类一样&#xff0c;在代码里&#xff0c;我们也常常需要将一个数据类型转换为另…...