当前位置: 首页 > news >正文

中间件-------RabbitMQ

同步和异步

异步调用

MQ

MQ优势:①服务解耦   ②异步调用   ③流量削峰

结构

消息模型

 RabbitMQ入门案例,实现消息发送和消息接收

生产者:
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.136.132");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}
消费者:
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.136.132");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

SpringAMQP

引入依赖

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

普通队列 

 第一步:publisher服务配置文件,发消息

spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPI@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){String queueName = "simple.queue";String message = "hello SpringAMQP";//使用API传入队列名和消息即可直接发送rabbitTemplate.convertAndSend(queueName,message);}}

第二步:Consumer服务配置信息监听消息

spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//定义一个监听类去监听消息
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void ListenSimpleQueue(String msg){System.out.println("msg = " + msg);}
}


Work Queue队列

多个消费者绑定到同一个队列,可以通过prefetch来控制消费者消息预取的数量

第一步: 生产者发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPI@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void test01() throws InterruptedException {String queueName = "simple.queue";String message = "hello SpringAMQP--";for (int i = 0; i < 50; i++) {//使用API传入队列名和消息即可直接发送rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}}

第二步:消费者设置多个监听消息

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void ListenWorkQueue(String msg) throws InterruptedException {System.out.println("消费者一接收到消息---- = " + msg + LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void ListenWorkQueue01(String msg) throws InterruptedException {System.out.println("消费者二接收到消息---- = " + msg + LocalDateTime.now());Thread.sleep(200);}
}

 第三步:消费者可通过prehtch设置消息预取数量

spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1


发布-订阅模型

Fanout广播交换机 --->多个队列收到交换机的消息

第一步:Consumer声明交换机,队列并进行绑定。
@Configuration
public class FanoutConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1到交换机上@Beanpublic Binding fanoutBanding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2到交换机上@Beanpublic Binding fanoutBanding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
 第二步:Consumer进行监听消息
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void ListenSimpleQueue1(String msg){System.out.println("消费者接收到fanout.queue1的消息 = " + msg);}@RabbitListener(queues = "fanout.queue2")public void ListenSimpleQueue2(String msg){System.out.println("消费者接收到fanout.queue2的消息 = " + msg);}
}
第三步:Publisher向交换机发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPI@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testExchange() {//声明交换机名称String exchangeName = "itcast.fanout";//消息String message = "Hello Everyone";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}
}

Direct路由交换机 --->将消息发给指定key的队列

第一步:在Listener中声明队列,交换机以及key
@Component
public class SpringRabbitListener {//声明队列1,交换机以及队列1的bindingKey@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "derict.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息 = " + msg);}//声明队列2,交换机以及队列2的bindingKey@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "derict.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息 = " + msg);}
}
第二步:向指定key的队列发送消息
    @Testpublic void testDirect() {//声明交换机名称String exchangeName = "itcast.direct";//消息String message = "Hello Blue!!";//发送消息,指定交换机,队列以及要发送的keyrabbitTemplate.convertAndSend(exchangeName,"red",message);}

Topic主题交换机 ---->key必须是多个单词列表,统一主题,支持通配符

 第一步:在Listener中声明队列,交换机以及通配符key
@Component
public class SpringRabbitListener {//声明队列2的交换机,队列以及通配符key@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息 = " + msg);}//声明队列2的交换机,队列以及通配符key@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息 = " + msg);}}
第二步:向主题通配符发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPI@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testTopic() {//声明交换机名称String exchangeName = "itcast.topic";//消息String message = "Hello China!!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
}

消息转换器

 RabbitMQ发的消息体都是Object类型,所有还可以发送对象数据。而且默认的消息转换器是MessageConverter实现的,当使用的是Map数据类型时,就会序列化成很多字节,所以推荐使用JSON的序列化和反序列化,直接修改默认的MessageConverter的类型

        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}

对于RabbitMQ高级部分:死信队列,延迟队列,发布确认,幂等性,优先,惰性队列等有时间再学

相关文章:

中间件-------RabbitMQ

同步和异步 异步调用 MQ MQ优势&#xff1a;①服务解耦 ②异步调用 ③流量削峰 结构 消息模型 RabbitMQ入门案例&#xff0c;实现消息发送和消息接收 生产者&#xff1a; public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutExce…...

flink Data Source数据源

flink Data Source数据源 Source 并行度 非并行&#xff1a;并行度只能为1 并行 基于集合的Source fromElements package com.pxj.sx.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.ap…...

网络七层模型与云计算中的网络服务

网络七层模型&#xff0c;也称为OSI&#xff08;Open System Interconnection&#xff09;模型&#xff0c;是由国际标准化组织&#xff08;ISO&#xff09;制定的一个概念性框架&#xff0c;用于描述网络通信过程中信息是如何被封装、传输和解封装的。这一模型将复杂的网络通信…...

word一按空格就换行怎么办?word文本之间添加空格就换行怎么办?

如上图&#xff0c;无法在Connection和con之间添加空格&#xff0c;一按空格就会自动换行。 第一步&#xff1a;选中文本&#xff0c;打开段落。 第二步&#xff1a;点击中文版式&#xff0c;勾选允许西文在单词中间换行。 确定之后就解决一按空格就自动换行啦&#xff01;...

Python 遍历字典的方法,你都掌握了吗

Python中的字典是一种非常灵活的数据结构&#xff0c;它允许通过键来存储和访问值。在处理字典时&#xff0c;经常需要遍历字典中的元素&#xff0c;以下是几种常见的遍历字典的方法。 1. 使用 for 循环直接遍历字典的键 字典的键是唯一的&#xff0c;可以直接通过 for 循环来…...

MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端

↑ 关注“少安事务所”公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ MySQL 8.4.0 LTS 已经发布 &#xff0c;作为发版模型变更后的第一个长期支持版本&#xff0c;注定要承担未来生产环境的重任&#xff0c;那么这个版本都有哪些新特性、变更&#xff0c;接下来少…...

LeetCode 124 —— 二叉树中的最大路径和

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 二叉树的问题首先我们要想想是否能用递归来解决&#xff0c;本题也不例外&#xff0c;而递归的关键是找到子问题。 我们首先来看看一棵最简单的树&#xff0c;也就是示例 1。这样的一棵树总共有六条路径&#xf…...

美甲店会员预约系统管理小程序的作用是什么

女性爱美体现在方方面面&#xff0c;美丽好看的指甲也不能少&#xff0c;市场中美甲店、小摊不少&#xff0c;也跑出了不少连锁品牌&#xff0c;70后到00后&#xff0c;每个层级都有不少潜在客户&#xff0c;商家需要获取和完善转化路径&#xff0c;不断提高品牌影响力与自身内…...

..堆..

堆 堆是完全二叉树&#xff0c;即除了最后一列之外&#xff0c;上面的每一层都是满的&#xff08;左右严格对称且每个节点都满子节点&#xff09; 最后一列从左向右排序。 默认大根堆&#xff1a;每一个节点都大于其左右儿子&#xff0c;根节点就是整个数据结构的最大值 pr…...

【LLM多模态】综述Visual Instruction Tuning towards General-Purpose Multimodal Model

note 文章目录 note论文1. 论文试图解决什么问题2. 这是否是一个新的问题3. 这篇文章要验证一个什么科学假设4. 有哪些相关研究&#xff1f;如何归类&#xff1f;谁是这一课题在领域内值得关注的研究员&#xff1f;5. 论文中提到的解决方案之关键是什么&#xff1f;6. 论文中的…...

探索Linux中的神奇工具:重定向符的妙用

探索Linux中的神奇工具&#xff1a;重定向符的妙用 在Linux系统中&#xff0c;重定向符是一个强大的工具&#xff0c;用于控制命令的输入和输出&#xff0c;实现数据流的定向。本文将详细介绍重定向符的基本用法和一些实用技巧&#xff0c;帮助读者更好地理解和运用这个功能。…...

Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job

Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job 此文档从 Kubernetes 官网摘录 中文地址 英文地址 Job 会创建一个或者多个 Pod&#xff0c;并将继续重试 Pod 的执行&#xff0c;直到指定数量的 Pod 成功终止。 随着 Pod 成功结束&#xff0c;Job 跟踪记录成功完成的…...

办公自动化-Python如何提取Word标题并保存到Excel中?

办公自动化-Python如何提取Word标题并保存到Excel中&#xff1f; 应用场景需求分析实现思路实现过程安装依赖库打开需求文件获取word中所有标题去除不需要的标题创建工作簿和工作表分割标题功能名称存入测试对象GN-TC需求标识符存入测试项标识存入需求标识符 完整源码实现效果学…...

基于Java、SpringBoot和uniapp在线考试系统安卓APP和微信小程序

摘要 基于Java、SpringBoot和uniapp的在线考试系统安卓APP微信小程序是一种结合了现代Web开发技术和移动应用技术的解决方案&#xff0c;旨在为教育机构提供一个方便、高效和灵活的在线考试平台。该系统采用Java语言进行后端开发&#xff0c;使用SpringBoot框架简化企业级应用…...

抖音a-bogus加密解析(三)

要补的环境我给提示&#xff0c;大家自行操作&#xff0c;出了问题就是因为缺环境&#xff0c;没补好 window global; // reading _u未定义 window.requestAnimationFrame function () {} // XMLHttpRequest 未定义 window.XMLHttpRequest function () {} window.onwheelx …...

IS-IS DIS

原理概述 OSPF 协议支持4种网络类型&#xff0c; IS-IS 协议只支持两种网络类型&#xff0c;即广播网络和点到点网络。与 OSPF 协议相同&#xff0c; IS-IS 协议在广播网络中会将网络视为一个伪节点( Pseudonode &#xff0c;简称 PSN )&#xff0c;并选举出一台 DIS ( Designa…...

random和range

含义&#xff1a; random(1&#xff0c;10) 不包含10&#xff0c;用于生成随机数。它可以生成浮点数或整数&#xff0c;取决于具体的使用方式。 range(0&#xff0c;1) 不包含1&#xff0c;用于生成一个整数序列。它可以生成一个指定范围内的连续整数序列。 区别在于&#x…...

研二学妹面试字节,竟倒在了ThreadLocal上,这是不要应届生还是不要女生啊?

一、写在开头 今天和一个之前研二的学妹聊天&#xff0c;聊及她上周面试字节的情况&#xff0c;着实感受到了Java后端现在找工作的压力啊&#xff0c;记得在18&#xff0c;19年的时候&#xff0c;研究生计算机专业的学生&#xff0c;背背八股文找个Java开发工作毫无问题&#x…...

Golang:gammazero/deque是一个快速环形缓冲区deque(双端队列)实现

gammazero/deque是一个快速环形缓冲区deque&#xff08;双端队列&#xff09;实现。 文档 https://github.com/gammazero/deque 安装 go get github.com/gammazero/deque代码示例 先入先出队列 package mainimport ("fmt""github.com/gammazero/deque&quo…...

C++ 时间处理-统计函数运行时间

1. 关键词2. 问题3. 解决思路4. 代码实现 4.1. timecount.h4.2. timecount.cpp 5. 测试代码6. 运行结果7. 源码地址 1. 关键词 C 时间处理 统计函数运行时间 跨平台 2. 问题 C如何简单便捷地实现“函数运行时间的统计”功能&#xff1f; 3. 解决思路 类的构造函数&#x…...

第三节课总结

一、计算机中的单位1、比特位&#xff08;bit&#xff09;&#xff1a;一个比特位只能放一个二进制数据&#xff0c;要么0要么12.字节&#xff08;byte&#xff09;&#xff1a;一个字节 8个比特位1024byte 1KB1024KB 1MB1024MB 1GB1024GB 1T1024TB 1PB3.每一种数据类型都可…...

源码级交付的低代码革命:基于 Spring Boot 的 AI 视频中台二次开发实战

引言&#xff1a;从“项目定制”到“产品化”的跨越之痛 作为一名在安防行业摸爬滚打多年的架构师&#xff0c;我深知行业内的一个悖论&#xff1a;客户想要的是“开箱即用”的成熟产品&#xff0c;而现实场景却要求“千企千面”的深度定制。传统的开发模式下&#xff0c;为了满…...

[Refactor]CPP Learn Data Day 信

一、什么是urllib3&#xff1f; urllib3 是一个用于处理 HTTP 请求和连接池的强大、用户友好的 Python 库。 它可以帮助你&#xff1a; 发送各种 HTTP 请求&#xff08;GET, POST, PUT, DELETE等&#xff09;。 管理连接池&#xff0c;提高网络请求效率。 处理重试和重定向。 支…...

ESP-IDF项目中的CMakeLists.txt配置:如何高效管理.c和.h文件

1. 为什么需要高效管理.c和.h文件 在ESP-IDF项目中&#xff0c;随着功能模块不断增加&#xff0c;代码文件会越来越多。想象一下&#xff0c;如果你的项目里有几十个.c文件和对应的.h文件&#xff0c;每次新增或修改文件都要手动调整编译配置&#xff0c;那简直是场噩梦。我刚开…...

8大网盘直链解析工具技术解析:本地化安全下载的终极解决方案

8大网盘直链解析工具技术解析&#xff1a;本地化安全下载的终极解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 &#xff0c;支持 百度网盘 / 阿里云盘 / 中国移动云盘 …...

基于STM32与物联网平台的智能外卖柜系统开发实战

1. 项目背景与需求分析 最近两年&#xff0c;外卖柜突然成了写字楼和社区的标配。作为嵌入式开发者&#xff0c;我注意到传统外卖柜存在几个痛点&#xff1a;取件流程繁琐&#xff08;得输一长串密码&#xff09;、安全性存疑&#xff08;密码容易被偷看&#xff09;、管理不便…...

【AI智能体实战】Dify与MCP服务深度集成:从零构建企业级智能问答系统

1. 为什么选择DifyMCP搭建企业级问答系统 最近两年&#xff0c;企业知识库智能化改造的需求呈现爆发式增长。我经手过的十几个项目中&#xff0c;客户普遍反映传统问答系统存在三个痛点&#xff1a;第一是模型效果不稳定&#xff0c;简单问题能回答但复杂业务逻辑就出错&#x…...

破解重庆企业数据治理困局:基于本地化定制的大数据平台如何构建统一主数据标准

引言 在数字化转型浪潮席卷全国的背景下&#xff0c;重庆作为西部重要的制造业与商贸枢纽&#xff0c;正加速推进“智造重镇”和“智慧名城”建设。然而&#xff0c;众多中大型企业在迈向数据驱动的过程中&#xff0c;普遍面临数据孤岛林立、标准不一、质量低下、合规风险高等核…...

双膜储气柜的选择指南建议

Q1: 如何从公开信息初步判断双膜气柜可靠性与工艺适应性&#xff1f;A1: 可交叉验证以下核心维度&#xff1a;工艺细节&#xff1a;查看是否采用多次焊接成型、全密封处理&#xff0c;是否有泄漏监测、主动泄压等安全设计&#xff1b;环境适配&#xff1a;耐温范围、防冻设计、…...

5分钟搞定:用mkcert为Vue/Uniapp项目快速配置本地HTTPS(附常见问题排查)

前端开发者必备&#xff1a;5分钟为Vue/Uniapp项目配置本地HTTPS全指南 现代前端开发中&#xff0c;越来越多的浏览器API要求运行在HTTPS环境下才能正常工作&#xff0c;比如摄像头访问、地理位置获取、Service Worker等。这给本地开发带来了不小的挑战——我们既需要HTTPS环境…...