当前位置: 首页 > news >正文

springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128  #rabbitmq的主机名port: 5672			   #端口,默认5672username: itheima      #rabbitmq的账号password: 123321	   #密码
server:port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}

消息发送

 public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}

相关文章:

springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。 整个项目结构如下&#xff1a; 1.maven依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version> </dependency>application.y…...

【微服务与分布式实践】探索 Eureka

服务注册中心 心跳检测机制&#xff1a;剔除失效服务自我保护机制 统计心跳失败的比例在15分钟之内是否低于85%&#xff0c;如果出现低于的情况&#xff0c;Eureka Server会将当前的实例注册信息保护起来&#xff0c;让这些实例不会过期。当节点在短时间内丢失过多的心跳时&am…...

Day48:获取字典键的值

在 Python 中&#xff0c;字典是一种无序的集合类型&#xff0c;它以键-值对的形式存储数据。字典的每个元素都有一个唯一的键&#xff0c;并且每个键都对应一个值。获取字典中的值是字典操作的常见任务&#xff0c;今天我们将学习如何从字典中获取键对应的值。 1. 使用方括号…...

Java锁自定义实现到aqs的理解

专栏系列文章地址&#xff1a;https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标&#xff1a; 理解锁&#xff0c;能自定义实现锁通过自定义锁的实现复习Thread和Object的相关方法开始尝试理解Aqs, 这样后续基于Aqs的的各种实现将能更好的理解 目录 锁的…...

仿真设计|基于51单片机的温度与烟雾报警系统

目录 具体实现功能 设计介绍 51单片机简介 资料内容 仿真实现&#xff08;protues8.7&#xff09; 程序&#xff08;Keil5&#xff09; 全部内容 资料获取 具体实现功能 &#xff08;1&#xff09;LCD1602实时监测及显示温度值和烟雾浓度值&#xff1b; &#xff08;2…...

文件读写操作

写入文本文件 #include <iostream> #include <fstream>//ofstream类需要包含的头文件 using namespace std;void test01() {//1、包含头文件 fstream//2、创建流对象ofstream fout;/*3、指定打开方式&#xff1a;1.ios::out、ios::trunc 清除文件内容后打开2.ios:…...

【后端开发】字节跳动青训营Cloudwego脚手架

Cloudwego脚手架使用 cwgo脚手架 cwgo脚手架 安装的命令&#xff1a; GOPROXYhttps://goproxy.cn/,direct go install github.com/cloudwego/cwgolatest依赖thriftgo的安装&#xff1a; go install github.com/cloudwego/thriftgolatest编辑echo.thrift文件用于生成项目&…...

SQL UCASE() 函数详解

SQL UCASE() 函数详解 在SQL中&#xff0c;UCASE() 函数是一个非常有用的字符串处理函数&#xff0c;它可以将字符串中的所有小写字母转换为大写字母。本文将详细介绍UCASE() 函数的用法、语法、示例以及其在实际应用中的优势。 一、UCASE() 函数简介 UCASE() 函数是SQL标准…...

99.23 金融难点通俗解释:小卖部经营比喻PPI(生产者物价指数)vsCPI(消费者物价指数)

目录 0. 承前1. 简述&#xff1a;价格指数对比2. 比喻&#xff1a;两大指数对比2.1 简单对比2.2 生动比喻 3. 实际应用3.1 价格传导现象 4. 总结5. 有趣的对比6. 数据获取实现代码7. 数据可视化实现代码 0. 承前 本文主旨&#xff1a; 本文使用小卖部比喻PPI和CPI&#xff0c;…...

【Elasticsearch】match_bool_prefix 查询 vs match_phrase_prefix 查询

Match Bool Prefix Query vs. Match Phrase Prefix Query 在 Elasticsearch 中&#xff0c;match_bool_prefix 查询和 match_phrase_prefix 查询虽然都支持前缀匹配&#xff0c;但它们的行为和用途有所不同。以下是它们之间的主要区别&#xff1a; 1. match_bool_prefix 查询…...

H. Mad City

题目链接&#xff1a;Problem - H - Codeforces 题目大意&#xff1a;给定一个带环的图&#xff0c; 以及a, b两点 判断再图上不断的移动&#xff0c; b想不与a相遇&#xff0c; a想捉到b, 并且二者只能移动一步。 若b跑不掉 NO 否则YES. 具体题目看链接 输入&#xff1a; …...

【图床配置】PicGO+Gitee方案

【图床配置】PicGOGitee方案 文章目录 【图床配置】PicGOGitee方案为啥要用图床图床是什么配置步骤下载安装PicGoPicGo配置创建Gitee仓库Typora中的设置 为啥要用图床 在Markdown中&#xff0c;图片默认是以路径的形式存在的&#xff0c;类似这样 可以看到这是本地路径&#x…...

《程序人生》工作2年感悟

一些杂七杂八的感悟&#xff1a; 1.把事做好比什么都重要&#xff0c; 先树立量良好的形象&#xff0c;再横向发展。 2.职场就是人情世故&#xff0c;但也不要被人情世故绑架。 3.要常怀感恩的心&#xff0c;要记住帮助过你的人&#xff0c;愿意和你分享的人&#xff0c;有能力…...

当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib)

当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib) 当当网近30日热销书籍官网写在前面 实验目的:实现当当网近30日热销图书的数据采集与可视化分析。 电脑系统:Windows 使用软件:Visual Studio Code Python版本:python 3.12.4 技术需求:scrapy、…...

unity学习25:用 transform 进行旋转和移动,简单的太阳地球月亮模型,以及父子级关系

目录 备注内容 1游戏物体的父子级关系 1.1 父子物体 1.2 坐标关系 1.3 父子物体实际是用 每个gameobject的tranform来关联的 2 获取gameObject的静态数据 2.1 具体命令 2.2 具体代码 2.3 输出结果 3 获取gameObject 的方向 3.1 游戏里默认的3个方向 3.2 获取方向代…...

【项目集成Husky】

项目集成Husky 安装初始化 Husky在.husky → pre-commit文件中添加想要执行的命令 安装 使用 Husky 可以帮助你在 Git 钩子中运行脚本&#xff0c;例如在提交代码前运行测试或格式化代码pnpm add --save-dev husky初始化 Husky npx husky init这会在项目根目录下创建一个 .hu…...

基于Spring Security 6的OAuth2 系列之七 - 授权服务器--自定义数据库客户端信息

之所以想写这一系列&#xff0c;是因为之前工作过程中使用Spring Security OAuth2搭建了网关和授权服务器&#xff0c;但当时基于spring-boot 2.3.x&#xff0c;其默认的Spring Security是5.3.x。之后新项目升级到了spring-boot 3.3.0&#xff0c;结果一看Spring Security也升级…...

【Matlab高端绘图SCI绘图模板】第006期 对比绘柱状图 (只需替换数据)

1. 简介 柱状图作为科研论文中常用的实验结果对比图&#xff0c;本文采用了3组实验对比的效果展示图&#xff0c;代码已调试好&#xff0c;只需替换数据即可生成相关柱状图&#xff0c;为科研加分。通过获得Nature配色的柱状图&#xff0c;让你的论文看起来档次更高&#xff0…...

Java 大视界 -- Java 大数据在生物信息学中的应用与挑战(67)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…...

.NET Core 中依赖注入的使用

ASP.NET Core中服务注入的地方 在ASP.NET Core项目中一般不需要自己创建ServiceCollection、IServiceProvider。在Program.cs的builder.Build()之前向builder.Services中注入。在Controller中可以通过构造方法注入服务。 低使用频率的服务 把Action用到的服务通过Action的参…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...

华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)

题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...