springboot使用rabbitmq
使用springboot创建rabbitMQ的链接。
整个项目结构如下:

1.maven依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>
application.yaml的配置如下
spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128 #rabbitmq的主机名port: 5672 #端口,默认5672username: itheima #rabbitmq的账号password: 123321 #密码
server:port: 8081 #项目启动端口
2.创建rabbitMQ配置类 – RabbitConfig。
@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}
3.创建RabbitMQ客户端类,主要是用来发送消息用的。
@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}
4.创建接收消息类 —RabbitMqServer
@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}
5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。
@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}
6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq
简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦,削峰减流,异步消息。
rabbitmq主要构造有,producter,consumer,exchange,queue组成
1.直连交换机(direct_exchange)。
刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。
2.广播交换机(fanout_exchange).
顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息
@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}
编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)
@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}
结果:控制台收到消息了!!!

3.主题交换机(topic_exchange)
topic_exchange和direct_exchange很像,topic有通配符。direct没有。
-
china.news 代表只关心中国新闻
-
china.weather 代表只关心中国天气
-
japan.news 代表只关心日本的新闻
-
japan.weather 代表只关心日本的天气
controller接口
@PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}利用postman测试。
1.msg: “祖国75岁生日快乐”,type:“china.news”

预测:queue1,queue4会接收到消息。

2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

预测:queue2,queue4会接收到消息。

3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

预测:queue2,queue3会接收到消息

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。
/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}
消息发送
public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
消息接收
@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}
相关文章:
springboot使用rabbitmq
使用springboot创建rabbitMQ的链接。 整个项目结构如下: 1.maven依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version> </dependency>application.y…...
Linux——ext2文件系统(二)
Linux——ext2文件系统 ext2文件系统宏观认识一、磁盘分区与格式化二、块组(Block Group)结构三、文件系统特性 文件名与目录名与inode一、inode的作用原理二、文件与目录名与inode的关系 路径一,路径解析二,路径缓存三࿰…...
动手学深度学习-3.2 线性回归的从0开始
以下是代码的逐段解析及其实际作用: 1. 环境设置与库导入 %matplotlib inline import random import torch from d2l import torch as d2l作用: %matplotlib inline:在 Jupyter Notebook 中内嵌显示 matplotlib 图形。random:生成…...
“深度强化学习揭秘:掌握DQN与PPO算法的精髓“
深度Q网络(Deep Q-Network,简称DQN)是一种结合了Q学习和深度神经网络的强化学习算法。它使用神经网络来近似Q值函数,从而实现对复杂状态空间中的动作选择。DQN的核心思想是通过贝尔曼方程(Bellman Equation)…...
如何让DeepSeek恢复联网功能?解决(由于技术原因,联网搜索暂不可用)
DeekSeek提示:(由于技术原因,联网搜索暂不可用) 众所周知,因为海外黑客的ddos攻击、僵尸网络攻击,deepseek的联网功能一直处于宕机阶段,但是很多问题不联网出来的结果都还是2023年的,…...
Unity-编译构建Android的问题记录
文章目录 报错:AAPT2 aapt2-4.1.2-6503028-osx Daemon #0 Failed to shutdown within timeout报错信息解读:原因分析最终处理方法 报错:AAPT2 aapt2-4.1.2-6503028-osx Daemon #0 Failed to shutdown within timeout 报错信息解读࿱…...
python的ruff简单使用
Ruff 是一个用 Rust 编写的高性能 Python 静态分析工具和代码格式化工具。它旨在提供快速的代码检查和格式化功能,同时支持丰富的配置选项和与现有工具的兼容性。ruff是用rust实现的python Linter&Formatter。 安装: conda install -c conda-forge…...
Docker 部署 GLPI(IT 资产管理软件系统)
GLPI 简介 GLPI open source tool to manage Helpdesk and IT assets GLPI stands for Gestionnaire Libre de Parc Informatique(法语 资讯设备自由软件 的缩写) is a Free Asset and IT Management Software package, that provides ITIL Service De…...
【漫话机器学习系列】077.范数惩罚是如何起作用的(How Norm Penalties Work)
范数惩罚的作用与原理 范数惩罚(Norm Penalty) 是一种常用于机器学习模型中的正则化技术,它的主要目的是控制模型复杂度,防止过拟合。通过对模型的参数进行惩罚(即在损失函数中加入惩罚项),使得…...
【C++ STL】vector容器详解:从入门到精通
【C STL】vector容器详解:从入门到精通 摘要:本文深入讲解C STL中vector容器的使用方法,涵盖常用函数、代码示例及注意事项,助你快速掌握动态数组的核心操作! 一、vector概述 vector是C标准模板库(STL&am…...
LLMs之OpenAI o系列:OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略
LLMs之OpenAI o系列:OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之o3:《Deliberative Alignment: Reasoning Enables Safer Language Models》翻译与解读 LLMs之OpenAI o系列:OpenAI o3-mini的简介、安…...
Notepad++消除生成bak文件
设置(T) ⇒ 首选项... ⇒ 备份 ⇒ 勾选 "禁用" 勾选禁用 就不会再生成bak文件了 notepad怎么修改字符集编码格式为gbk 如图所示...
后台管理系统通用页面抽离=>高阶组件+配置文件+hooks
目录结构 配置文件和通用页面组件 content.config.ts const contentConfig {pageName: "role",header: {title: "角色列表",btnText: "新建角色"},propsList: [{ type: "selection", label: "选择", width: "80px&q…...
Spring Boot项目如何使用MyBatis实现分页查询
写在前面:大家好!我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正,感谢大家的不吝赐教。我的唯一博客更新地址是:https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油,冲鸭&#x…...
[Java]多态
1. 多态的基本概念 1.1 定义: 多态是指同一操作作用于不同的对象时,能够表现出不同的行为。多态通常通过以下两种方式实现: 方法重载(Overloading)方法重写(Overriding) 1.2 示例࿱…...
用Impala对存储在HDFS中的大规模数据集进行快速、实时的交互式SQL查询的具体步骤和关键代码
AWS EMR(Elastic MapReduce)中应用Impala的典型案例,主要体现在大型企业和数据密集型组织如何利用Impala对存储在Hadoop分布式文件系统(HDFS)中的大规模数据集进行快速、实时的交互式SQL查询。以下是一个具体的案例说明…...
Intellij 插件开发-快速开始
目录 一、开发环境搭建以及创建action1. 安装 Plugin DevKit 插件2. 新建idea插件项目3. 创建 Action4. 向新的 Action 表单注册 Action5. Enabling Internal Mode 二、插件实战开发[不推荐]UI Designer 基础JBPanel类(JPanel面板)需求:插件设…...
GIt使用笔记大全
Git 使用笔记大全 1. 安装 Git 在终端或命令提示符中,输入以下命令检查是否已安装 Git: git --version如果未安装,可以从 Git 官方网站 下载并安装适合你操作系统的版本。 2. 配置 Git 首次使用 Git 时,需要配置用户名和邮箱…...
语言月赛 202412【题目名没活了】题解(AC)
》》》点我查看「视频」详解》》》 [语言月赛 202412] 题目名没活了 题目描述 在 XCPC 竞赛里,会有若干道题目,一支队伍可以对每道题目提交若干次。我们称一支队伍对一道题目的一次提交是有效的,当且仅当: 在本次提交以前&…...
MySQL锁类型(详解)
锁的分类图,如下: 锁操作类型划分 读锁 : 也称为共享锁 、英文用S表示。针对同一份数据,多个事务的读操作可以同时进行而不会互相影响,相互不阻塞的。 写锁 : 也称为排他锁 、英文用X表示。当前写操作没有完成前,它会…...
面经--C语言——static,volatile,malloc,使用异或进行数据交换
文章目录 static静态变量和全局变量的区别volatile主要作用 malloc1. 内存分配器的作用2. 内存分配过程(1) 查找空闲内存块(2) 扩展堆空间(3) 元数据 3. 内存释放过程(1) 标记为可用(2) 合并相邻空闲块(3) 延迟释放 4. 内存管理策略(1) 分配缓存(Allocation Caching…...
stm32小白成长为高手的学习步骤和方法
我们假定大家已经对STM32的书籍或者文档有一定的理解。如不理解,请立即阅读STM32的文档,以获取最基本的知识点。STM32单片机自学教程 这篇博文也是一篇不错的入门教程,初学者可以看看,讲的真心不错。 英文好的同学…...
OSCP - Proving Grounds - Roquefort
主要知识点 githook 注入Linux path覆盖 具体步骤 依旧是nmap扫描开始,3000端口不是很熟悉,先看一下 Nmap scan report for 192.168.54.67 Host is up (0.00083s latency). Not shown: 65530 filtered tcp ports (no-response) PORT STATE SERV…...
集合通讯概览
(1)通信的算法 是根据通讯的链路组成的 (2)因为通信链路 跟硬件强相关,所以每个CCL的库都不一样 芯片与芯片、不同U之间是怎么通信的!!!!!! 很重要…...
【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(二)
✨感谢您阅读本篇文章,文章内容是个人学习笔记的整理,如果哪里有误的话还请您指正噢✨ ✨ 个人主页:余辉zmh–CSDN博客 ✨ 文章所属专栏:贪心算法篇–CSDN博客 文章目录 前言例题1.买卖股票的最佳时机2.买卖股票的最佳时机23.k次取…...
oracle: 表分区>>范围分区,列表分区,散列分区/哈希分区,间隔分区,参考分区,组合分区,子分区/复合分区/组合分区
分区表 是将一个逻辑上的大表按照特定的规则划分为多个物理上的子表,这些子表称为分区。 分区可以基于不同的维度,如时间、数值范围、字符串值等,将数据分散存储在不同的分区 中,以提高数据管理的效率和查询性能,同时…...
基于SpringBoot 前端接收中文显示解决方案
一. 问题 返回给前端的的中文值会变成“???” 二. 解决方案 1. 在application.yml修改字符编码 (无效) 在网上看到说修改servlet字符集编码,尝试了不行 server:port: 8083servlet:encoding:charset: UTF-8enabled: trueforce: true2. …...
java练习(5)
ps:题目来自力扣 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外,这…...
python算法和数据结构刷题[3]:哈希表、滑动窗口、双指针、回溯算法、贪心算法
回溯算法 「所有可能的结果」,而不是「结果的个数」,一般情况下,我们就知道需要暴力搜索所有的可行解了,可以用「回溯法」。 回溯算法关键在于:不合适就退回上一步。在回溯算法中,递归用于深入到所有可能的分支&…...
大数据数仓实战项目(离线数仓+实时数仓)1
目录 1.课程目标 2.电商行业与电商系统介绍 3.数仓项目整体技术架构介绍 4.数仓项目架构-kylin补充 5.数仓具体技术介绍与项目环境介绍 6.kettle的介绍与安装 7.kettle入门案例 8.kettle输入组件之JSON输入与表输入 9.kettle输入组件之生成记录组件 10.kettle输出组件…...

