当前位置: 首页 > news >正文

RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/*** RPC服务端** @author yuanzhihao* @since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义一个rpc的队列String queueName = "test_rpc";channel.queueDeclare(queueName, false, false, false, null);Object monitor = new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 提供一个大小写转换的方法String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("toUpperCase(" + message + ")");response = toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听"test_rpc"队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端:

/*** RPC客户端** @author yuanzhihao* @since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");try ( Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列String queueName = "test_rpc";// 请求消息中需要带一个唯一标识ID String corrId = UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName = channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg = "hello rpc";// 发送请求消息channel.basicPublish("",queueName,properties,msg.getBytes());// 设置一个阻塞队列  等待服务端的响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag -> {});// 获取响应结果String take = response.take();System.out.println("rpc result is "+ take);channel.basicCancel(ctag);}}
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/*** 主配置类** @author yuanzhihao* @since 2021/1/9*/
@Configuration
public class RabbitMQConfig {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.1.108:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}// 声明队列@Beanpublic Queue rpcQueue() {return new Queue("test_rpc",false);}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener = message -> {String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Receive a message message is {}", receiveMsg);// 执行对应逻辑String responseMsg = toUpperCase(receiveMsg);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端我采用test单元测试的形式

/*** spring amqp rpc 测试类** @author yuanzhihao* @since 2021/1/9*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端@Testpublic void testRpcClient() {// 设置correlationIdString corrId = UUID.randomUUID().toString();String msg = "hello rpc";MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 "amq.rabbitmq.reply-to" 前缀Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));}
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

相关文章:

RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC 前言什么是RPCRabbitMQ如何实现RPCRPC简单示例通过Spring AMQP实现RPC 前言 这边参考了RabbitMQ的官网&#xff0c;想整理一篇关于RabbitMQ实现RPC调用的博客&#xff0c;打算把两种实现RPC调用的都整理一下&#xff0c;一个是使用官方提供的一个Java cli…...

Linux-基础知识(黑马学习笔记)

硬件和软件 我们所熟知的计算机是由&#xff1a;硬件和软件组成。 硬件&#xff1a;计算机系统中电子&#xff0c;机械和光电元件等组成的各种物理装置的总称。 软件&#xff1a;是用户和计算机硬件之间的接口和桥梁&#xff0c;用户通过软件与计算机进行交流。 而操作系统…...

SpringBoot项目启动报java.nio.charset.MalformedInputException Input length = 1解决方案

报错详情 SpringBoot启动报错java.nio.charset.MalformedInputException: Input length 1 报错原因 出现这个的原因&#xff0c;就是解析yml文件时&#xff0c;中文字符集不是utf-8的原因&#xff0c;这是maven在项目编译时&#xff0c;默认字符集编码是GBK。 解决方式 检…...

【Unity2019.4.35f1】配置JDK、NDK、SDK、Gradle

目录 JDK NDK SDK 环境变量 Gradle JDK JDK&#xff1a;jdk-1.8版本Java Downloads | Oracle 下载要登录&#xff0c;搜索JDK下载公用账号&#xff1a;Oracle官网 JDK下载 注册登录公共账号和密码_oracle下载账号-CSDN博客 路径&#xff1a;C:\Program Files\Java\jd…...

MySQL中的高级查询

通过条件查询可以查询到符合条件的数据&#xff0c;但如同要实现对字段的值进行计算、根据一个或多个字段对查询结果进行分组等操作时&#xff0c;就需要使用更高级的查询&#xff0c;MySQL提供了聚合函数、分组查询、排序查询、限量查询、内置函数以实现更复杂的查询需求。接下…...

leetcode383赎金信

用字符数组ch来记录magazine每个字母出现频率&#xff0c;用ransomNote的字母减去字符数组ch对应的字符出现频率&#xff0c;如果该字符对应的频率小于0&#xff0c;则不够&#xff0c;无法组成ransomNote&#xff01; class Solution { public:bool canConstruct(string rans…...

【Unity3D】ASE制作天空盒

找到官方shader并分析 下载对应资源包找到\DefaultResourcesExtra\Skybox-Cubed.shader找到\CGIncludes\UnityCG.cginc观察变量, 观察tag, 观察代码 需要注意的内容 ASE要处理的内容 核心修改 添加一个Custom Expression节点 code内容为: return DecodeHDR(In0, In1);outp…...

MyBatisPlus常用注解

目录 一、TableName 二、TableId 三、TableField 四、TableLogic 一、TableName 在使用MyBatis-Plus实现基本的CRUD时&#xff0c;我们并没有指定要操作的表&#xff0c;只是在Mapper接口继承BaseMapper时&#xff0c;设置了泛型User&#xff0c;而操作的表为user表 由此得出…...

Putty中运行matlab文件

首先使用命令 cd /home/ya/CodeTest/Matlab进入路径&#xff1a;到Matlab文件夹下 然后键入matlab&#xff0c;进入matlab环境&#xff0c;如果main.m文件在Matlab文件夹下&#xff0c;直接键入main即可运行该文件。细节代码如下&#xff1a; Unable to use key file "y…...

ES6 | (一)ES6 新特性(上) | 尚硅谷Web前端ES6教程

文章目录 &#x1f4da;ES6新特性&#x1f4da;let关键字&#x1f4da;const关键字&#x1f4da;变量的解构赋值&#x1f4da;模板字符串&#x1f4da;简化对象写法&#x1f4da;箭头函数&#x1f4da;函数参数默认值设定&#x1f4da;rest参数&#x1f4da;spread扩展运算符&a…...

生产环境下,应用模式部署flink任务,通过hdfs提交

前言 通过通过yarn.provided.lib.dirs配置选项指定位置&#xff0c;将flink的依赖上传到hdfs文件管理系统 1. 实践 &#xff08;1&#xff09;生产集群为cdh集群&#xff0c;从cm上下载配置文件&#xff0c;设置环境 export HADOOP_CONF_DIR/home/conf/auth export HADOOP_CL…...

【lesson59】线程池问题解答和读者写者问题

文章目录 线程池问题解答什么是单例模式什么是设计模式单例模式的特点饿汉和懒汉模式的理解STL中的容器是否是线程安全的?智能指针是否是线程安全的&#xff1f;其他常见的各种锁 读者写者问题 线程池问题解答 什么是单例模式 单例模式是一种 “经典的, 常用的, 常考的” 设…...

【LeetCode每日一题】单调栈316去除重复字母

题目&#xff1a;去除重复字母 给你一个字符串 s &#xff0c;请你去除字符串中重复的字母&#xff0c;使得每个字母只出现一次。需保证 返回结果的字典序最小&#xff08;要求不能打乱其他字符的相对位置&#xff09;。 示例 1&#xff1a; 输入&#xff1a;s “bcabc” 输…...

【Git】Gitbash使用ssh 上传本地项目到github

SSH Git上传项目到GitHub&#xff08;图文&#xff09;_git ssh上传github-CSDN博客 前提 ssh-keygen -t rsa -C “自己的github电子邮箱” 生成密钥&#xff0c;公钥保存到自己的github的ssh里 1.先创建一个仓库&#xff0c;复制ssh地址 git init git add . git commit -m …...

activeMq将mqtt发布订阅转成消息队列

1、activemq.xml置文件新增如下内容 2、mqttx测试发送&#xff1a; 主题&#xff08;配置的模糊匹配&#xff0c;为了并发&#xff09;&#xff1a;VirtualTopic/device/sendData/12312 3、mqtt接收的结果 4、程序处理 package comimport cn.hutool.core.date.DateUtil; imp…...

Go语言教程

一、引言 Go&#xff08;又称Golang&#xff09;是由Google开发的一种静态类型、编译型的开源编程语言。它旨在提供简单、快速和可靠的软件开发体验。Go语言结合了动态语言的开发效率和静态语言的安全性能&#xff0c;特别适用于网络编程、系统编程和并发编程。本教程将介绍Go…...

分布式锁的应用场景及实现

文章目录 分布式锁的应用场景及实现1. 应用场景2. 分布式锁原理3. 分布式锁的实现3.1 基于数据库 分布式锁的应用场景及实现 1. 应用场景 电商网站在进行秒杀、特价等大促活动时&#xff0c;面临访问量激增和高并发的挑战。由于活动商品通常是有限库存的&#xff0c;为了避免…...

嵌入式Linux中apt、apt-get命令用法汇总

在Linux环境开发过程中接触ubuntu虚拟机时&#xff0c;在安装软件或者更新软件时apt和apt-get命令使用相对较频繁&#xff0c;下面对这两个命令的用法进行汇总。 apt&#xff08;Advanced Package Tool&#xff09;和 apt-get 是用于在基于 Debian 的 Linux 发行版中进行软件包…...

Unity之ShaderGraph如何实现水面波浪

前言 这几天通过一个水的波浪数学公式,实现了一个波浪效果,感觉成就感满满,下面给大家分享一下 首先先给大家看一下公式; 把公式转为ShaderGraph 第一行公式:waveType = z*-1*Mathf.Cos(wave.WaveAngle/360*2*Mathf.PI)+x*Mathf.Sin(WaveAngle/360*-2*Mathf.PI) 转换…...

无线局域网(WLAN)简单概述

无线局域网 无线局域网概述 无限局域网&#xff08;Wireless Local Area Network&#xff0c;WLAN&#xff09;是一种短距离无线通信组网技术&#xff0c;它是以无线信道为传输媒质构成的计算机网络&#xff0c;通过无线电传播技术来实现在空间传输数据。 WLAN是传输范围在1…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...