当前位置: 首页 > article >正文

springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128  #rabbitmq的主机名port: 5672			   #端口,默认5672username: itheima      #rabbitmq的账号password: 123321	   #密码
server:port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}

消息发送

 public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}

相关文章:

springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。 整个项目结构如下&#xff1a; 1.maven依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version> </dependency>application.y…...

Linux——ext2文件系统(二)

Linux——ext2文件系统 ext2文件系统宏观认识一、磁盘分区与格式化二、块组&#xff08;Block Group&#xff09;结构三、文件系统特性 文件名与目录名与inode一、inode的作用原理二、文件与目录名与inode的关系 路径一&#xff0c;路径解析二&#xff0c;路径缓存三&#xff0…...

动手学深度学习-3.2 线性回归的从0开始

以下是代码的逐段解析及其实际作用&#xff1a; 1. 环境设置与库导入 %matplotlib inline import random import torch from d2l import torch as d2l作用&#xff1a; %matplotlib inline&#xff1a;在 Jupyter Notebook 中内嵌显示 matplotlib 图形。random&#xff1a;生成…...

“深度强化学习揭秘:掌握DQN与PPO算法的精髓“

深度Q网络&#xff08;Deep Q-Network&#xff0c;简称DQN&#xff09;是一种结合了Q学习和深度神经网络的强化学习算法。它使用神经网络来近似Q值函数&#xff0c;从而实现对复杂状态空间中的动作选择。DQN的核心思想是通过贝尔曼方程&#xff08;Bellman Equation&#xff09…...

如何让DeepSeek恢复联网功能?解决(由于技术原因,联网搜索暂不可用)

DeekSeek提示&#xff1a;&#xff08;由于技术原因&#xff0c;联网搜索暂不可用&#xff09; 众所周知&#xff0c;因为海外黑客的ddos攻击、僵尸网络攻击&#xff0c;deepseek的联网功能一直处于宕机阶段&#xff0c;但是很多问题不联网出来的结果都还是2023年的&#xff0c…...

Unity-编译构建Android的问题记录

文章目录 报错&#xff1a;AAPT2 aapt2-4.1.2-6503028-osx Daemon #0 Failed to shutdown within timeout报错信息解读&#xff1a;原因分析最终处理方法 报错&#xff1a;AAPT2 aapt2-4.1.2-6503028-osx Daemon #0 Failed to shutdown within timeout 报错信息解读&#xff1…...

python的ruff简单使用

Ruff 是一个用 Rust 编写的高性能 Python 静态分析工具和代码格式化工具。它旨在提供快速的代码检查和格式化功能&#xff0c;同时支持丰富的配置选项和与现有工具的兼容性。ruff是用rust实现的python Linter&Formatter。 安装&#xff1a; conda install -c conda-forge…...

Docker 部署 GLPI(IT 资产管理软件系统)

GLPI 简介 GLPI open source tool to manage Helpdesk and IT assets GLPI stands for Gestionnaire Libre de Parc Informatique&#xff08;法语 资讯设备自由软件 的缩写&#xff09; is a Free Asset and IT Management Software package, that provides ITIL Service De…...

【漫话机器学习系列】077.范数惩罚是如何起作用的(How Norm Penalties Work)

范数惩罚的作用与原理 范数惩罚&#xff08;Norm Penalty&#xff09; 是一种常用于机器学习模型中的正则化技术&#xff0c;它的主要目的是控制模型复杂度&#xff0c;防止过拟合。通过对模型的参数进行惩罚&#xff08;即在损失函数中加入惩罚项&#xff09;&#xff0c;使得…...

【C++ STL】vector容器详解:从入门到精通

【C STL】vector容器详解&#xff1a;从入门到精通 摘要&#xff1a;本文深入讲解C STL中vector容器的使用方法&#xff0c;涵盖常用函数、代码示例及注意事项&#xff0c;助你快速掌握动态数组的核心操作&#xff01; 一、vector概述 vector是C标准模板库&#xff08;STL&am…...

LLMs之OpenAI o系列:OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略

LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之o3&#xff1a;《Deliberative Alignment: Reasoning Enables Safer Language Models》翻译与解读 LLMs之OpenAI o系列&#xff1a;OpenAI o3-mini的简介、安…...

Notepad++消除生成bak文件

设置(T) ⇒ 首选项... ⇒ 备份 ⇒ 勾选 "禁用" 勾选禁用 就不会再生成bak文件了 notepad怎么修改字符集编码格式为gbk 如图所示...

后台管理系统通用页面抽离=>高阶组件+配置文件+hooks

目录结构 配置文件和通用页面组件 content.config.ts const contentConfig {pageName: "role",header: {title: "角色列表",btnText: "新建角色"},propsList: [{ type: "selection", label: "选择", width: "80px&q…...

Spring Boot项目如何使用MyBatis实现分页查询

写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油&#xff0c;冲鸭&#x…...

[Java]多态

1. 多态的基本概念 1.1 定义&#xff1a; 多态是指同一操作作用于不同的对象时&#xff0c;能够表现出不同的行为。多态通常通过以下两种方式实现&#xff1a; 方法重载&#xff08;Overloading&#xff09;方法重写&#xff08;Overriding&#xff09; 1.2 示例&#xff1…...

用Impala对存储在HDFS中的大规模数据集进行快速、实时的交互式SQL查询的具体步骤和关键代码

AWS EMR&#xff08;Elastic MapReduce&#xff09;中应用Impala的典型案例&#xff0c;主要体现在大型企业和数据密集型组织如何利用Impala对存储在Hadoop分布式文件系统&#xff08;HDFS&#xff09;中的大规模数据集进行快速、实时的交互式SQL查询。以下是一个具体的案例说明…...

Intellij 插件开发-快速开始

目录 一、开发环境搭建以及创建action1. 安装 Plugin DevKit 插件2. 新建idea插件项目3. 创建 Action4. 向新的 Action 表单注册 Action5. Enabling Internal Mode 二、插件实战开发[不推荐]UI Designer 基础JBPanel类&#xff08;JPanel面板&#xff09;需求&#xff1a;插件设…...

GIt使用笔记大全

Git 使用笔记大全 1. 安装 Git 在终端或命令提示符中&#xff0c;输入以下命令检查是否已安装 Git&#xff1a; git --version如果未安装&#xff0c;可以从 Git 官方网站 下载并安装适合你操作系统的版本。 2. 配置 Git 首次使用 Git 时&#xff0c;需要配置用户名和邮箱…...

语言月赛 202412【题目名没活了】题解(AC)

》》》点我查看「视频」详解》》》 [语言月赛 202412] 题目名没活了 题目描述 在 XCPC 竞赛里&#xff0c;会有若干道题目&#xff0c;一支队伍可以对每道题目提交若干次。我们称一支队伍对一道题目的一次提交是有效的&#xff0c;当且仅当&#xff1a; 在本次提交以前&…...

MySQL锁类型(详解)

锁的分类图&#xff0c;如下&#xff1a; 锁操作类型划分 读锁 : 也称为共享锁 、英文用S表示。针对同一份数据&#xff0c;多个事务的读操作可以同时进行而不会互相影响&#xff0c;相互不阻塞的。 写锁 : 也称为排他锁 、英文用X表示。当前写操作没有完成前&#xff0c;它会…...

面经--C语言——static,volatile,malloc,使用异或进行数据交换

文章目录 static静态变量和全局变量的区别volatile主要作用 malloc1. 内存分配器的作用2. 内存分配过程(1) 查找空闲内存块(2) 扩展堆空间(3) 元数据 3. 内存释放过程(1) 标记为可用(2) 合并相邻空闲块(3) 延迟释放 4. 内存管理策略(1) 分配缓存&#xff08;Allocation Caching…...

stm32小白成长为高手的学习步骤和方法

我们假定大家已经对STM32的书籍或者文档有一定的理解。如不理解&#xff0c;请立即阅读STM32的文档&#xff0c;以获取最基本的知识点。STM32单片机自学教程 这篇博文也是一篇不错的入门教程&#xff0c;初学者可以看看&#xff0c;讲的真心不错。 英文好的同学&#xf…...

OSCP - Proving Grounds - Roquefort

主要知识点 githook 注入Linux path覆盖 具体步骤 依旧是nmap扫描开始&#xff0c;3000端口不是很熟悉&#xff0c;先看一下 Nmap scan report for 192.168.54.67 Host is up (0.00083s latency). Not shown: 65530 filtered tcp ports (no-response) PORT STATE SERV…...

集合通讯概览

&#xff08;1&#xff09;通信的算法 是根据通讯的链路组成的 &#xff08;2&#xff09;因为通信链路 跟硬件强相关&#xff0c;所以每个CCL的库都不一样 芯片与芯片、不同U之间是怎么通信的&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 很重要…...

【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(二)

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;贪心算法篇–CSDN博客 文章目录 前言例题1.买卖股票的最佳时机2.买卖股票的最佳时机23.k次取…...

oracle: 表分区>>范围分区,列表分区,散列分区/哈希分区,间隔分区,参考分区,组合分区,子分区/复合分区/组合分区

分区表 是将一个逻辑上的大表按照特定的规则划分为多个物理上的子表&#xff0c;这些子表称为分区。 分区可以基于不同的维度&#xff0c;如时间、数值范围、字符串值等&#xff0c;将数据分散存储在不同的分区 中&#xff0c;以提高数据管理的效率和查询性能&#xff0c;同时…...

基于SpringBoot 前端接收中文显示解决方案

一. 问题 返回给前端的的中文值会变成“???” 二. 解决方案 1. 在application.yml修改字符编码 &#xff08;无效&#xff09; 在网上看到说修改servlet字符集编码&#xff0c;尝试了不行 server:port: 8083servlet:encoding:charset: UTF-8enabled: trueforce: true2. …...

java练习(5)

ps:题目来自力扣 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这…...

python算法和数据结构刷题[3]:哈希表、滑动窗口、双指针、回溯算法、贪心算法

回溯算法 「所有可能的结果」&#xff0c;而不是「结果的个数」&#xff0c;一般情况下&#xff0c;我们就知道需要暴力搜索所有的可行解了&#xff0c;可以用「回溯法」。 回溯算法关键在于:不合适就退回上一步。在回溯算法中&#xff0c;递归用于深入到所有可能的分支&…...

大数据数仓实战项目(离线数仓+实时数仓)1

目录 1.课程目标 2.电商行业与电商系统介绍 3.数仓项目整体技术架构介绍 4.数仓项目架构-kylin补充 5.数仓具体技术介绍与项目环境介绍 6.kettle的介绍与安装 7.kettle入门案例 8.kettle输入组件之JSON输入与表输入 9.kettle输入组件之生成记录组件 10.kettle输出组件…...