【实战】Spring Cloud Stream 3.1+整合Kafka
文章目录
- 前言
- 新版版本优势
- 实战演示
- 增加maven依赖
- 增加applicaiton.yaml配置
- 新增Kafka通道消费者
- 新增发送消息的接口
- 实战测试
- postman发送一个正常的消息
- postman发送异常消息
前言
之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

新版版本优势
新版提倡用函数式进行发送和消费信息
定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +
在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。
比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:
spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目
此时消息生产者为:
@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}
此时消息消费者为:
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}
}
实战演示
我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:
正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费
本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。
增加maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
增加applicaiton.yaml配置
spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka: # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000 #3s 重连auto-rebalance-enabled: true #主题分区消费者组成员自动平衡auto-commit-offset: false #手动提交偏移量enable-dlq: true # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目
新增Kafka通道消费者
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}
新增发送消息的接口
@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}
实战测试
postman发送一个正常的消息

postman发送异常消息

相关文章:
【实战】Spring Cloud Stream 3.1+整合Kafka
文章目录 前言新版版本优势实战演示增加maven依赖增加applicaiton.yaml配置新增Kafka通道消费者新增发送消息的接口 实战测试postman发送一个正常的消息postman发送异常消息 前言 之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太…...
java之可变字符串之append方法
可变字符串如果要添加内容,需要用到append方法 语法格式如下 sbf.append(obj) 其中sbf是任意的可变字符串 obj是任意数据类型的对象 这个方法是将任意数据转换成字符串,然后添加到此序列中 public class Buffer {public static void main(String[]…...
[保姆级教程]uniapp自定义导航栏
文章目录 导文隐藏默认导航栏:全局隐藏当前页面隐藏 添加自定义导航栏视图:手写导航栏组件导航栏 导文 在 UniApp 中,自定义导航栏通常涉及到隐藏默认的导航栏,并在页面顶部添加自定义的视图组件来模拟导航栏的功能。 隐藏默认导航…...
项目训练营第二天
项目训练营第二天 用户登录逻辑 1、账户名不少于4位 2、密码不少于8位 3、数据库表中能够查询到账户、密码 4、密码查询时用同样加密脱敏处理手段处理后再和数据库中取出字段进行对比,如果账户名未查询到,直接返回null 5、后端设置相应的脱敏后用户的s…...
考研数学一有多难?130+背后的残酷真相
考研数学一很难 大家平时在网上上看到很多人说自己考了130,其实这些人只占参加考研数学人数的极少部分,有个数据可以展示出来考研数学到底有多难: 在几百万考研大军中,能考到120分以上的考生只有2%。绝大多数人的分数集中在30到…...
vue2脚手架笔记总结1
1.什么是组件 组件是实现局部代码和功能资源的集合 2.vue.config.js配置文件 使用vue inspect > output.js可以查看到Vue脚手架的默认配置,但是在这里面修改不会影响实际的配置,如果需要修改配置需要使用用vue.config.js文件,详情见:https://cli.vuej…...
校园巡礼:一周只上四天课,入学即发钱?深圳理工大学,开局即王炸
校园巡礼 | 一周只上四天课,入学即发钱?深圳理工大学,开局即王炸! 会议之眼 快讯 目前各省的高考成绩现已陆续揭晓,广东省教育考试院发布了2024年高考录取最低分数线,物理类本科线为442分,历史…...
免交互 实验
免交互 交互:我们发出指令控制程序的运行,程序在接收到指令之后按照指令的效果做出对应的反应。 免交互:间接的,通过第三方的方式把指令传送给程序,不用直接下达指令。 Here document 免交互 这是命令行格式&#…...
Sublime Text 设置
备份 {"font_size": 10,"index_files": true,"font_face": "Courier New","vintage_start_in_command_mode": false,"ignored_packages": ["Vintage"],"word_wrap": "false" }关闭…...
spire.Pdf 将pdf转成image
一、nuget安装 <ItemGroup><PackageReference Include"Spire.PDF" Version"10.6.7" /></ItemGroup> 二、直接上代码 using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System; using System.IO;namespace …...
仓颉编程语言 -- 初识(一)
官网 文档 原生智能化 内嵌AgentDSL的编程框架,自然语言&编程语言有机融合;多Agent协同,简化符号表达,模式自由组合,支持各类智能应用开发。 天生全场景 轻量化可缩放运行时,模块化分层设计…...
前端JS必用工具【js-tool-big-box】学习,数值型数组的正向排序和倒向排序
这一小节,我们说一下前端 js-tool-big-box 这个工具库,添加的数值型数组的正向排序和倒向排序。 以前呢,我们的数组需要排序的时候,都是在项目的utils目录里,写一段公共方法,弄个冒泡排序啦,弄…...
python web框架哪家强?Flask、Django、FastAPI对比
前言 当你掌握了python的基础知识,并且会用和HTML和CSS编写简单的静态网页。现在你只需再掌握一个python web框架的知识,就可以开始编写一个动态的网站了。目前市面比较流程的python web框架有三个flask、Django、FastAPI。接下来我们对比一下。他们三个…...
Mybatis plus:IService接口
一、介绍 在MybatisPlus框架中,IService接口扮演着重要的角色。作为一个通用的服务接口,IService定义了一系列方法,包括查询、插入、更新、删除等。这些方法的定义使得在服务层进行数据库操作变得更为便捷和高效。 IService 接口是一个泛型接…...
时序分析基本概念介绍——min pulse width 最小脉冲宽度
文章目录 前言一、什么是 min pulse width?二、为什么检查 min pulse width?三、如何设置 min pulse width约束?1. 在sdc里面定义2. library里面定义 四、如何检查 min pulse width?五、如何修复 min pulse width?总结…...
PHP原生代码生成pdf---解决中文乱码问题
github地址 尝试了使用composer下载FPDF或者FPDI,但是无法解决中文乱码问题。只有使用这个github上的中文包才可以,那俩没必要下。 直接上代码(这里并没有使用任何框架) require(./fpdf/chinese.php);//生成pdf$pdf new PDF_Chinese();$pdf->AddPage…...
智慧车库管理系统
摘 要 随着城市化进程的不断加快,私家车数量的快速增长给城市交通带来了巨大的挑战,停车问题成为城市交通管理中的一大难题。车辆停车时,在停车场寻找停车位耗时过久,不仅仅浪费用户的时间,还可能引起交通拥堵。城市停…...
每日新闻掌握【2024年6月26日 星期三】
2024年6月26日 星期三 农历五月廿一 大公司/大事件 OpenAI将终止对中国提供API服务 从6月24日晚间开始,已有多名用户收到了来自OpenAI的邮件。该邮件表示,“我们的数据显示您的组织来自OpenAI目前不支持的地区的API流量。”邮件进一步表示,…...
InVEST实践及在生态系统服务供需、固碳、城市热岛、论文写作等实际项目中应用
白老师(研究员):长期从事生态系统结构-格局-过程-功能-服务的变化与响应关系等研究工作,重点围绕生物多样性、生态系统服务与价值等,构建生物地球化学模型和评价指标体系,为城市、区域和自然保护区的可持续发展和生态环…...
慧科新闻搜索研究数据库的使用指南及个人获取途径
《慧科新闻搜索研究数据库》WiseSearch由慧科讯业有限公司出品。WiseSearch是具有新闻搜索/浏览、对比分析等功能的一站式新闻搜索平台;内容包括1200种报刊和8000 网站的新闻资讯,平面媒体涵盖全国综合大报、党委机关报、都市报、行业报刊媒体࿰…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
