springboot使用rabbitmq
使用springboot创建rabbitMQ的链接。
整个项目结构如下:

1.maven依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>
application.yaml的配置如下
spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128 #rabbitmq的主机名port: 5672 #端口,默认5672username: itheima #rabbitmq的账号password: 123321 #密码
server:port: 8081 #项目启动端口
2.创建rabbitMQ配置类 – RabbitConfig。
@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}
3.创建RabbitMQ客户端类,主要是用来发送消息用的。
@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}
4.创建接收消息类 —RabbitMqServer
@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}
5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。
@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}
6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq
简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦,削峰减流,异步消息。
rabbitmq主要构造有,producter,consumer,exchange,queue组成
1.直连交换机(direct_exchange)。
刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。
2.广播交换机(fanout_exchange).
顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息
@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}
编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)
@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}
结果:控制台收到消息了!!!

3.主题交换机(topic_exchange)
topic_exchange和direct_exchange很像,topic有通配符。direct没有。
-
china.news 代表只关心中国新闻
-
china.weather 代表只关心中国天气
-
japan.news 代表只关心日本的新闻
-
japan.weather 代表只关心日本的天气
controller接口
@PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}利用postman测试。
1.msg: “祖国75岁生日快乐”,type:“china.news”

预测:queue1,queue4会接收到消息。

2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

预测:queue2,queue4会接收到消息。

3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

预测:queue2,queue3会接收到消息

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。
/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}
消息发送
public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
消息接收
@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}
相关文章:
springboot使用rabbitmq
使用springboot创建rabbitMQ的链接。 整个项目结构如下: 1.maven依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version> </dependency>application.y…...
【微服务与分布式实践】探索 Eureka
服务注册中心 心跳检测机制:剔除失效服务自我保护机制 统计心跳失败的比例在15分钟之内是否低于85%,如果出现低于的情况,Eureka Server会将当前的实例注册信息保护起来,让这些实例不会过期。当节点在短时间内丢失过多的心跳时&am…...
Day48:获取字典键的值
在 Python 中,字典是一种无序的集合类型,它以键-值对的形式存储数据。字典的每个元素都有一个唯一的键,并且每个键都对应一个值。获取字典中的值是字典操作的常见任务,今天我们将学习如何从字典中获取键对应的值。 1. 使用方括号…...
Java锁自定义实现到aqs的理解
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标: 理解锁,能自定义实现锁通过自定义锁的实现复习Thread和Object的相关方法开始尝试理解Aqs, 这样后续基于Aqs的的各种实现将能更好的理解 目录 锁的…...
仿真设计|基于51单片机的温度与烟雾报警系统
目录 具体实现功能 设计介绍 51单片机简介 资料内容 仿真实现(protues8.7) 程序(Keil5) 全部内容 资料获取 具体实现功能 (1)LCD1602实时监测及显示温度值和烟雾浓度值; (2…...
文件读写操作
写入文本文件 #include <iostream> #include <fstream>//ofstream类需要包含的头文件 using namespace std;void test01() {//1、包含头文件 fstream//2、创建流对象ofstream fout;/*3、指定打开方式:1.ios::out、ios::trunc 清除文件内容后打开2.ios:…...
【后端开发】字节跳动青训营Cloudwego脚手架
Cloudwego脚手架使用 cwgo脚手架 cwgo脚手架 安装的命令: GOPROXYhttps://goproxy.cn/,direct go install github.com/cloudwego/cwgolatest依赖thriftgo的安装: go install github.com/cloudwego/thriftgolatest编辑echo.thrift文件用于生成项目&…...
SQL UCASE() 函数详解
SQL UCASE() 函数详解 在SQL中,UCASE() 函数是一个非常有用的字符串处理函数,它可以将字符串中的所有小写字母转换为大写字母。本文将详细介绍UCASE() 函数的用法、语法、示例以及其在实际应用中的优势。 一、UCASE() 函数简介 UCASE() 函数是SQL标准…...
99.23 金融难点通俗解释:小卖部经营比喻PPI(生产者物价指数)vsCPI(消费者物价指数)
目录 0. 承前1. 简述:价格指数对比2. 比喻:两大指数对比2.1 简单对比2.2 生动比喻 3. 实际应用3.1 价格传导现象 4. 总结5. 有趣的对比6. 数据获取实现代码7. 数据可视化实现代码 0. 承前 本文主旨: 本文使用小卖部比喻PPI和CPI,…...
【Elasticsearch】match_bool_prefix 查询 vs match_phrase_prefix 查询
Match Bool Prefix Query vs. Match Phrase Prefix Query 在 Elasticsearch 中,match_bool_prefix 查询和 match_phrase_prefix 查询虽然都支持前缀匹配,但它们的行为和用途有所不同。以下是它们之间的主要区别: 1. match_bool_prefix 查询…...
H. Mad City
题目链接:Problem - H - Codeforces 题目大意:给定一个带环的图, 以及a, b两点 判断再图上不断的移动, b想不与a相遇, a想捉到b, 并且二者只能移动一步。 若b跑不掉 NO 否则YES. 具体题目看链接 输入: …...
【图床配置】PicGO+Gitee方案
【图床配置】PicGOGitee方案 文章目录 【图床配置】PicGOGitee方案为啥要用图床图床是什么配置步骤下载安装PicGoPicGo配置创建Gitee仓库Typora中的设置 为啥要用图床 在Markdown中,图片默认是以路径的形式存在的,类似这样 可以看到这是本地路径&#x…...
《程序人生》工作2年感悟
一些杂七杂八的感悟: 1.把事做好比什么都重要, 先树立量良好的形象,再横向发展。 2.职场就是人情世故,但也不要被人情世故绑架。 3.要常怀感恩的心,要记住帮助过你的人,愿意和你分享的人,有能力…...
当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib)
当当网近30日热销图书的数据采集与可视化分析(scrapy+openpyxl+matplotlib) 当当网近30日热销书籍官网写在前面 实验目的:实现当当网近30日热销图书的数据采集与可视化分析。 电脑系统:Windows 使用软件:Visual Studio Code Python版本:python 3.12.4 技术需求:scrapy、…...
unity学习25:用 transform 进行旋转和移动,简单的太阳地球月亮模型,以及父子级关系
目录 备注内容 1游戏物体的父子级关系 1.1 父子物体 1.2 坐标关系 1.3 父子物体实际是用 每个gameobject的tranform来关联的 2 获取gameObject的静态数据 2.1 具体命令 2.2 具体代码 2.3 输出结果 3 获取gameObject 的方向 3.1 游戏里默认的3个方向 3.2 获取方向代…...
【项目集成Husky】
项目集成Husky 安装初始化 Husky在.husky → pre-commit文件中添加想要执行的命令 安装 使用 Husky 可以帮助你在 Git 钩子中运行脚本,例如在提交代码前运行测试或格式化代码pnpm add --save-dev husky初始化 Husky npx husky init这会在项目根目录下创建一个 .hu…...
基于Spring Security 6的OAuth2 系列之七 - 授权服务器--自定义数据库客户端信息
之所以想写这一系列,是因为之前工作过程中使用Spring Security OAuth2搭建了网关和授权服务器,但当时基于spring-boot 2.3.x,其默认的Spring Security是5.3.x。之后新项目升级到了spring-boot 3.3.0,结果一看Spring Security也升级…...
【Matlab高端绘图SCI绘图模板】第006期 对比绘柱状图 (只需替换数据)
1. 简介 柱状图作为科研论文中常用的实验结果对比图,本文采用了3组实验对比的效果展示图,代码已调试好,只需替换数据即可生成相关柱状图,为科研加分。通过获得Nature配色的柱状图,让你的论文看起来档次更高࿰…...
Java 大视界 -- Java 大数据在生物信息学中的应用与挑战(67)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
.NET Core 中依赖注入的使用
ASP.NET Core中服务注入的地方 在ASP.NET Core项目中一般不需要自己创建ServiceCollection、IServiceProvider。在Program.cs的builder.Build()之前向builder.Services中注入。在Controller中可以通过构造方法注入服务。 低使用频率的服务 把Action用到的服务通过Action的参…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...
Bean 作用域有哪些?如何答出技术深度?
导语: Spring 面试绕不开 Bean 的作用域问题,这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开,结合典型面试题及实战场景,帮你厘清重点,打破模板式回答,…...
Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...
Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...

