RabbitMQ 客户端 连接、发送、接收处理消息
RabbitMQ 客户端 连接、发送、接收处理消息
一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样
RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端
而是所有客户端想要向 RabbitMQ服务发送消息,
第一步:创建一个链接 RabbitMQ 服务的连接
需要传入 RabbitMQ服务地址、用户名、密码,然后在连接代码中传入一个 queue 的字符串作为 标志
连接成功后,RabbitMQ服务上就可以看到这个链接了
如下图,可以看到有一个 Name = queueL1 的连接,后边有链接状态、消息数
Ready 和 Total 都是 0
向 RabbitMQ 发送消息的:
(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 通过 发送消息接口向 RabbitMQ 服务 发消息
(3) RabbitMQ 服务接收到消息,只是按照连接的 queue 分别把消息放在自己名字的 queue 下, RabbitMQ 服务只是存着客户端发送的消息,服务什么都不处理
向 RabbitMQ 服务发送几条消息
下图可以看到 queueL1 的队列已经接收了 5 条消息,这五条消息如果没有客户端接收处理,就一直在这存着
接收 RabbitMQ 服务消息:
(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 注册接收消息接口,在 RabbitMQ 中叫 消费消息,可以标记消费消息后是否将 RabbitMQ 的数据删除
(3) 如果 RabbitMQ 服务收到消息,就转发给 注册接收消息接口的 连接,如果接收的连接标记了 AutoDelete,那么发送给客户端后,RabbitMQ 就会将消息从消息队列中删除
注册接收消息,我的客户端就会收到 RabbitMQ 发送过来的消息,消息中包含发送上来的消息内容,还有发送消息的 queue 名字
此时再看,就会发现 Ready 和 Total 又变成 0 了
为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说
是因为 RabbitMQ 发送消息就仅仅是发消息,发送完就不管了
而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息,它不管是谁发的消息,只要是发送的 RabbitMQ 服务的消息,它都能接收,
(3.1) 比如我创建了 一个 连接,queue名为 xxxA,
它发送了消息 “Hello World”,
xxxA 连接自己又注册了 消费消息(接收消息),那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息
(3.2) 我又创建了 新的连接,queue 名还是 xxxA
那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld
二. 客户端连接服务器
- 实例化一个 连接 RabbitMQ 服务的客户端连接
实例化需要传入 服务地址、端口、用户名、密码
using RabbitMQ.Client;
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using UnityEngine;
using System.Text;
using RabbitMQ.Client.Events;namespace Network
{/// <summary>/// RabbitMQ 创建一个链接/// 供 RabbitMQReceive、RabbitMQSend 使用/// </summary>public class RabbitMQConnect{private RabbitMQConnectData connectData;private ConnectionFactory factory;private IChannel channel;private IConnection connection;private NetWorkState state;private Action<string, byte[]> receivedCallBack;private const int TimeOut = 10; //连接超时 10 秒private bool dispose = false;public RabbitMQConnect(RabbitMQConnectData connectData){this.connectData = connectData;State = NetWorkState.Disconnected;dispose = false;}public string Queue{get { return connectData.queue; }}public NetWorkState State{get { return state; }private set { state = value; }}public IChannel Channel{get { return channel; }}/// <summary>/// 网络是否连接中/// </summary>public bool IsConnect{get{if (null == channel || null == connection){return false;}return channel.IsOpen && connection.IsOpen;}}public async Task StartConnect(){if (State == NetWorkState.Connecting){await Task.Delay(TimeOut * 1000);}if (State == NetWorkState.Connected){return;}// 创建连接工厂// 如果初始化失败,不会启动恢复连接//factory = new ConnectionFactory()//{// HostName = hostName, // 替换为你的 RabbitMQ 服务器地址// UserName = userName, // 替换为用户名// Password = password // 替换为密码//};string url = $"amqp://{connectData.userName}:{connectData.password}@{connectData.hostName}:{connectData.port}"; //string.Format("amqp://unity:unity@139.9.137.14:5672");factory = new ConnectionFactory(){Uri = new Uri(url)};// 自动恢复连接factory.AutomaticRecoveryEnabled = true;// 如果由于异常导致恢复失败(例如RabbitMQ节点仍然不可达),它将在固定的时间间隔(默认为5秒)后重试。间隔时间可配置如下// Connection.CloseAsync 关闭的连接不会启动自动恢复连接factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);factory.TopologyRecoveryEnabled = true;while (State != NetWorkState.Connected){if (!dispose){await Connect();}}await Task.Delay(1);if (!string.IsNullOrEmpty(connectData.receiveQueeu)){await BasicConsumer();}}private async Task Connect(){try{State = NetWorkState.Connecting;// 异步创建连接connection = await factory.CreateConnectionAsync();channel = await connection.CreateChannelAsync();// 声明队列QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: connectData.queue,durable: false,exclusive: false,autoDelete: false,arguments: null);/*autoDelete = true:没有消费者时队列自动删除,通常用于临时或一次性的队列。autoDelete = false:队列不会自动删除,通常用于需要长期存在的队列。选择是否设置 autoDelete = true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的,那么使用 autoDelete = true 会更适合;如果队列是长期需要使用的,则设置为 autoDelete = false 会更为合适 */State = NetWorkState.Connected;// 设置消费者的预取计数为10,允许同时处理10条消息await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);Debug.Log("RabbitMQ Connect Success");GameNotifycation.GetInstance().Notify<NetWorkState>(ENUM_MSG_TYPE.MSG_NETWORK_STATE_CHANGE, State);}catch (BrokerUnreachableException e){await Task.Delay(5000);State = NetWorkState.ConnectFailed;Debug.LogError("ConnectError:" + e.ToString());// apply retry logic}await Task.Delay(1);}/// <summary>/// 发送消息/// exchange: 要发布消息的交换机名称。/// routingKey: 路由键,决定消息应该路由到哪个队列。/// mandatory: 如果设置为 true,RabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息,RabbitMQ 会触发 basic.return。/// immediate: 如果设置为 true,RabbitMQ 会在消息无法立即被消费时丢弃消息。/// basicProperties: 消息的属性,类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。/// body: 消息体的字节数组。/// /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功,Task 会正常完成,不会抛出异常。你可以通过异常处理来捕获潜在的错误。/// </summary>/// <param name="msg"></param>public async Task SendAsync(string message){if (!IsConnect){UnityEngine.Debug.Log("Send not IsConnect");await StartConnect();}try{IChannel channel = Channel;var body = Encoding.UTF8.GetBytes(message);var props = new BasicProperties();props.ContentType = "text/plain";props.DeliveryMode = DeliveryModes.Transient;await channel.BasicPublishAsync(exchange: "",routingKey: Queue,mandatory: false,basicProperties: props,body: body).ConfigureAwait(false);//Debug.Log($"[x] Sent: Complete");}catch (Exception ex){UnityEngine.Debug.LogError($"Error publishing message: {ex.Message}");}}/// <summary>/// 设置接收消息回调/// </summary>/// <param name="receivedCallBack"></param>public void SetReceive(Action<string, byte[]> receivedCallBack){this.receivedCallBack = receivedCallBack;}/// <summary>/// 创建异步消费者/// </summary>/// <returns></returns>public async Task<string> BasicConsumer(){if (!IsConnect){await StartConnect();}var consumer = new AsyncEventingBasicConsumer(Channel);// 处理消息的异步回调逻辑consumer.ReceivedAsync += ReceivedAsync;// 开始消费string result = await Channel.BasicConsumeAsync(queue: connectData.receiveQueeu, // 指定消费者要监听的队列名称autoAck: false, // 决定是否自动确认消息。如果 true,消息在交付时会自动确认。如果 false,则需要手动调用 BasicAck 确认消息consumer: consumer); // 指定消息的处理方式,通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息/*autoAck = true:消息一旦传递给消费者,RabbitMQ 就认为该消息被成功处理,无需再确认。autoAck = false:消费者需要显式地调用 channel.BasicAck 来确认消息的处理,通常用于消息处理失败时能够重试消息。*/return result;}/// <summary>/// 异步接收消息/// 如果 Channel.BasicConsumeAsync 方法中 autoAck 设置为 true,那么 channel.BasicAckAsync 调用是不允许的/// 想在 Channel.BasicConsumeAsync 消费消息收到消息时 调用 channel.BasicAckAsync,必须将 Channel.BasicConsumeAsync 方法中 autoAck 设置为 false/// </summary>/// <param name="sender"></param>/// <param name="eventArgs"></param>/// <returns></returns>private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs){try{//Debug.Log("ReceivedAsync");AsyncEventingBasicConsumer consumer = sender as AsyncEventingBasicConsumer;string queue = consumer.Channel.CurrentQueue;var body = eventArgs.Body.ToArray();receivedCallBack?.Invoke(queue, body);// 模拟异步任务处理(比如访问数据库或调用其他服务)await channel.BasicAckAsync(eventArgs.DeliveryTag, false);}catch (Exception ex){Debug.LogError($"Error processing message: {ex.Message}");// 如果处理失败,可以拒绝并重新入队(可选)//await Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: true);}await Task.Delay(1);}/// <summary>/// 关闭连接/// </summary>public async void Dispose(){dispose = true;// 先关闭通道、再关闭连接if (channel != null) // 通道关闭{await channel.CloseAsync();channel = null;}if (connection != null) // 连接关闭{UnityEngine.Debug.Log("ConnectDispose");await connection.CloseAsync();connection = null;}await Task.Delay(1);}}
}
RibbitMQ 服务通过 queue 来区分每一个连接的客户端,代码部分如下
QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: queue,durable: false,exclusive: false,autoDelete: false,arguments: null);
-
客户端实例
-
测试用例
using UnityEngine;
using Network;
using LitJson;
using System.Text;
using System.Collections;
using System.Collections.Generic;public class RabbitMQDemo : MonoBehaviour
{// 客户端private RabbitMQConnect rabbitMQConnect;private Queue<string> receiveQueue = new Queue<string>();void Start(){RabbitMQConnectData connectData = new RabbitMQConnectData();connectData.queue = "TestA";connectData.receiveQueeu = "TestA";connectData.hostName = "XXX.XXX.XXX.XXX";connectData.port = "5672";connectData.userName = "unity";connectData.password = "unity";// 实例化rabbitMQConnect = new RabbitMQConnect(connectData);rabbitMQConnect.SetReceive(Receive);StartConnect();}private async void StartConnect(){await rabbitMQConnect.StartConnect();}private async void Send(string meg){await rabbitMQConnect.SendAsync(meg);}private void Receive(string queue, byte[] byteData){var json = Encoding.UTF8.GetString(byteData);UnityEngine.Debug.Log($"[x] ReceivedAsync: {json}");receiveQueue.Enqueue(netWorkData);}private int number = 1000;// Update is called once per framevoid Update(){if (Input.GetKeyDown(KeyCode.A)){++number;Send("Hello RabbitMQ:" + number);}DispatchMessage();}private void DispatchMessage(){if (receiveQueue.Count <= 0){return;}string json = receiveQueue.Dequeue();}private void OnDestroy(){Debug.LogError("OnDestroy");rabbitMQConnect.Dispose();}
}
/// <summary>/// 网络连接状态/// </summary>public enum NetWorkState{// init/// <summary>/// 关闭/断开连接/// </summary>Closed,// client/// <summary>/// 已经建立连接/// </summary>Connected,/// <summary>/// 正在请求连接/// </summary>Connecting,/// <summary>/// 连接失败/// </summary>ConnectFailed,// both/// <summary>/// 连接超时/// </summary>Timeout,/// <summary>/// 断开连接/// </summary>Disconnected,}
扩展
可以在 网页上 Overview 页面,找到 Ports and contexts 部分
可以看到每种协议对应的端口是不一样的
每种协议都有一种独立的连接方式
需要根据自己选择的协议拼接路径
比如 我上面代码使用的 http 方式
string localHost = "localhost"; // ip如 xxx.xxx.xxx.xxxstring userName = "用户名";string password = "密码";// 创建连接工厂// 如果初始化失败,不会启动恢复连接factory = new ConnectionFactory(){HostName = hostName, // 替换为你的 RabbitMQ 服务器地址UserName = userName, // 替换为用户名Password = password // 替换为密码};
amqp 协议连接方式如下
string url = $"amqp://{userName}:{password}@{hostName}:{port}"; factory = new ConnectionFactory(){Uri = new Uri(url)};
相关文章:

RabbitMQ 客户端 连接、发送、接收处理消息
RabbitMQ 客户端 连接、发送、接收处理消息 一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样 RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端 而是所有客户端想要向 RabbitMQ服务发送消息, 第一步&a…...

Java Web 3 Axios Vue组件库
一 Ajax 1 同步 异步 2 原生Ajax 比较繁琐 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Documen…...

双目相机的标定,视差图,深度图,点云生成思路与实现。
该文档记录从双目相机标定到点云生成的所有过程,同时会附上代码。 代码直接能跑。https://github.com/stu-yzZ/stereoCamera 目录 大致思路如下: 一、相机标定 1、相机参数介绍 2、单目相机标定 3、双目相机标定 二、图片畸变矫正 三、极线矫正…...

【H2O2|全栈】MySQL的基本操作(三)
目录 前言 开篇语 准备工作 案例准备 多表查询 笛卡尔积 等值连接 外连接 内连接 自连接 子查询 存在和所有 含于 分页查询 建表语句 结束语 前言 开篇语 本篇继续讲解MySQL的一些基础的操作——数据字段的查询中的多表查询和分页查询,与单表查询…...
2、C++命名空间
命名空间 命名空间是一种用来避免命名冲突的机制; 原理是将一个全局的作用域分成一个个命名空间,每个命名空间是个单独的作用域,从而有效避免命名冲突。 注意:命名空间定义在全局 命名空间定义格式 使用: …...

Elemenu-UI时间日期单个组件,限制当前日期之后的时间
element的时间日期组件, type"datetime" ,当你设置了:picker-options"pickerOptions"之后 pickerOptions: { disabledDate(time) { return time.getTime() > Date.now(); }, }, 会发现,他只会限制日期,但不…...
flutter修改状态栏学习
在flutter中如何动态更改状态栏的颜色和风格。 前置知识点学习 AnnotatedRegion AnnotatedRegion 是 Flutter 中的一个小部件,用于在特定区域中提供元数据(metadata)以影响某些系统级的行为或外观。它通常用于改变系统 UI 的外观ÿ…...

解决Unity编辑器Inspector视图中文注释乱码
1.问题介绍 新创建一个脚本,用VS打开编辑,增加一行中文注释保存,在Unity中找到该脚本并选中,Inspector视图中预览的显示内容,该中文注释显示为乱码,如下图所示: 2.图示解决步骤 按上述步骤操作…...
关于csgo的游戏作弊与封禁
关于csgo的游戏作弊与封禁 一.关于作弊 什么叫作弊? 1.换肤,换库存 2.各种参(回溯,自瞄,透视,急停,连跳,假身,子弹跟踪等) 3.某一部分更改游戏内存&…...

严格单元测试造就安全软件
在信息技术迅速发展的今天,软件在各个行业中扮演着至关重要的角色,尤其是在汽车行业,其中软件的可靠性和安全性直接影响到人们的生命安全。软件缺陷所带来的潜在风险不容小觑,尤其在涉及到自动驾驶和车辆控制等关键系统时…...
ubuntu 根分区逻辑卷扩容
1、虚拟机关机通过管理界面给磁盘扩容。 rootcurtis:/home/curtis/git_code# pvdisplay--- Physical volume ---PV Name /dev/vda3VG Name ubuntu-vgPV Size <239.00 GiB / not usable 0Allocatable yes (but full)PE…...
如何查看电脑生产日期
查看电脑的生产日期通常可以通过以下方法实现,具体方式取决于操作系统和电脑类型: 方法 1:检查电脑 BIOS 生产日期通常记录在 BIOS 中。可以通过以下步骤查看: 重启电脑并进入 BIOS: 启动时按下特定的键(…...
MAC M1 mysql 8.0 如何修改root用户密码
关闭mysql服务 使用brew方式安装,可以通过一下命令关闭 brew services stop mysql使用安装包安装的方式 可以选择🍎->系统偏好设置->最下方单机MySQL图标->stop mysql server 启动 MySQL 到安全模式 sudo mysqld_safe --skip-grant-tables …...

漫画之家系统:Spring Boot框架下的漫画版权保护
摘 要 随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各样的管理系统应运而生,各行各业相继进入信息管理时代&a…...

在 MacOS 上为 LM Studio 更换镜像源
在 MacOS 之中使用 LM Studio 部署本地 LLM时,用户可能会遇到无法下载模型的问题。 一般的解决方法是在 huggingface.co 或者国内的镜像站 hf-mirror.com 的项目介绍卡页面下载模型后拖入 LM Studio 的模型文件夹。这样无法利用 LM Studio 本身的搜索功能。 本文将…...

Nginx配置https(Ubuntu、Debian、Linux、麒麟)
Ubuntu操作系统,Debian系统底层是Ubuntu,差异不大 ubuntu 安装nginx 1.安装依赖 sudo apt-get update sudo apt-get install gcc sudo apt-get install libpcre3 libpcre3-dev sudo apt-get install zlib1g zlib1g-dev sudo apt-get install openssl lib…...

「Mac畅玩鸿蒙与硬件40」UI互动应用篇17 - 照片墙布局
本篇将带你实现一个简单的照片墙布局应用,通过展示多张图片组成照片墙效果,用户可以点击图片查看其状态变化。 关键词 UI互动应用照片墙布局Grid 布局动态图片加载用户交互 一、功能说明 照片墙布局应用的特点: 动态加载多张图片组成网格布…...

VMware Workstation 安装Ubuntu 系统(图文步骤)
之前一直在讲Ubuntu Linux的用户和组 链接: Linux专栏 今天来讲讲Ubuntu 系统基础的安装步骤!!! 废话少说,马上开始! 文章目录 前言准备安装环境先下载Ubuntu 镜像 详细安装步骤如下新建虚拟机默认使用 15.…...

mybatis用pagehelper 然后用CountJSqlParser45,发现自己手写的mapper查询效率很慢
如题 效率慢疑惑 效率慢 分页查询,发现效率很慢,然后发现是比较复杂的sql,CountJSqlParser45它不会帮忙优化掉,就是select多少字段它count的时候也还是这么多字段 框架里的用法是这样的 所以去看了CountJSqlParser45里面的代码,发现如果有group之类的,它就不帮忙把count优化…...

【优选算法 二分查找】二分查找入门详解:二分查找 & 在排序数组中查找元素的第一个和最后一个位置
二分查找 题目描述 题目解析 暴力解法 我们可以从左往右遍历一次数组,如果存在 target 则返回数组的下标,否则返回 -1; 时间复杂度 O(N),因为没有利用数组有序的特点,每次比较只能舍弃一个要比较的数&…...

【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...