RabbitMQ实践——搭建多人聊天服务
大纲
- 用户登录
- 创建聊天室
- 监听Stream(聊天室)
- 发送消息
- 实验
- 登录
- Tom侧
- Jerry侧
- 创建聊天室
- Jerry侧
- Tom侧
- 进入聊天室
- Jerry侧
- Tom侧
- 发送消息
- Jerry发送消息
- Jerry侧聊天室
- Tom侧聊天室
- Tom发送消息
- Jerry侧聊天室
- Tom侧聊天室
- 代码工程
- 参考资料
在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。

如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。

用户登录
关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室
我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。
package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}
聊天室创建完毕后,会通知所有登录的用户。
@PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}
监听Stream(聊天室)
public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}
我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理
@GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}
发送消息
每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。
public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}
实验
登录
Tom侧

Jerry侧

创建聊天室
Jerry侧
Jerry申请创建一个聊天室

在管理后台,我们看到对应的交换器和Stream都创建出来了。


同时在刚才的登录接口界面,Jerry收到了通知

Tom侧
Tom也会收到通知

进入聊天室
Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。
Jerry侧

Tom侧

发送消息
Jerry发送消息

Jerry侧聊天室

Tom侧聊天室

Tom发送消息

Jerry侧聊天室

Tom侧聊天室

代码工程
https://github.com/f304646673/RabbitMQDemo
参考资料
- https://www.rabbitmq.com/docs/streams
相关文章:
RabbitMQ实践——搭建多人聊天服务
大纲 用户登录创建聊天室监听Stream(聊天室)发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——…...
git分布式版本控制系统
Git - Downloads (git-scm.com) gitee教程(超全,超详细,超长)-CSDN博客 Git教程 - 廖雪峰的官方网站 (liaoxuefeng.com) 所有的版本控制系统,其实只能跟踪文本文件改动,比如TXT文件,网页&…...
基于weixin小程序的民宿短租系统的设计与实现
管理员账户功能包括:系统首页,个人中心,房主管理,房间类型管理,用户管理,民宿信息管理,民宿预订管理,系统管理 小程序功能包括:系统首页,民宿信息,…...
2024-06-22力扣每日一题
链接: 2663. 字典序最小的美丽字符串 题意 略 解: 要求字符串内不存在任何长度为 2 或更长的回文子字符串,则在任意位置不存在aa或aba形式 由于要被给定字符串字典序大,且找到符合条件的字典序最小字符串,则竟可…...
S_LOVE多端恋爱小站小程序源码 uniapp多端
S_LOVE多端恋爱小站小程序源码,采用uniapp多端开发框架进行开发,目前已适配H5、微信小程序版本。 源码下载:https://download.csdn.net/download/m0_66047725/89421726 更多资源下载:关注我。...
如何避免MySQL的死锁或性能下降
1、按顺序访问数据 确保多个线程或事务在访问多个表或行时,按照相同的顺序进行。这可以避免循环等待和资源竞争,从而降低死锁的风险。 2、避免长时间持有锁 尽量缩短事务的执行时间,避免长时间持有锁。长时间持有锁会增加其他事务等待的时…...
《C语言》编译和链接
文章目录 一、翻译环境1、预处理2、编译3、汇编4、链接 二、运行环境 一、翻译环境 在使用编译器编写代码时,编写的代码是高级语言,机器无法直接识别和运行,在编译器内部会翻译成机器可执行的机器语言。 编译环境由编译和链接两大过程组成。 …...
group by和select的兼容性问题
group by和select的兼容性问题 在标准的SQL语法中,GROUP BY 和 SELECT 之间不存在兼容性问题,因为它们是 SQL 查询语句的基本组成部分,而且它们的使用方式是相互兼容的。 SELECT 子句和 GROUP BY 子句的关系: SELECT 子句&#…...
切面aspect处理fegin调用转本地调用
切面处理fegin调用转本地调用 问题:原fegin调用转本地调用详细描述方案代码实现总结问题:原fegin调用转本地调用 项目原来是微服务项目服务与服务之间是通过fegin进行交互的,但是现在微服务项目要重构为单体项目,原fegin调用的方法要给为本地调用 详细描述 zyy-aiot │ …...
Linux 磁盘挂载与分区
Linux 磁盘挂载与分区 vda1: 其中vd表示虚拟磁盘,a表示第一块磁盘,b表示第二块磁盘,1表示第一块磁盘的第一分区(显然两块磁盘都只有一个分区)图中可以看到,vda1磁盘只有一个分区,且全部挂载到根…...
Open3D 将ShapeNet数据集txt转pcd
目录 一、概述 二、代码实现 三、实现效果 一、概述 ShapeNet 数据集是一个广泛使用的三维物体数据集,主要用于计算机视觉、计算机图形学、机器人学和机器学习等领域的研究。它包含大量的三维物体模型,并附有丰富的标注信息。ShapeNet 数据集由普林斯…...
综合项目实战--jenkins节点模式
一、DevOps流程 DevOps是一种方法论,是一系列可以帮助开发者和运维人员在实现各自目标的前提下,向自己的客户或用户交付最大化价值及最高质量成果的基本原则和实践,能让开发、测试、运维效率协同工作的方法。 DevOps流程(自动化测试部分) DevOps完整流程 二、gitee+j…...
WhaleStudio 2.6重磅发布!调度模块WhaleScheduler更新78项核心功能
我们很高兴地宣布WhaleStudio 2.6版本的正式发布!新版本中包含了数据调度模块WhaleScheduler和数据集成模块WhaleTunnel的百余项核心功能更新,本文摘选了WhaleScheduler常用功能更新的概况,关于WhaleTunnel的更新详情将于近期发布,…...
笔记101:OSQP求解器的底层算法 -- ADMM算法
前言1:这篇博客仅限于介绍拉格朗日乘子法,KKT条件,ALM算法,ADMM算法等最优化方法的使用以及简版代码实现,但不会涉及具体的数学推导;不过在下面我会给出具体数学推导的相关文章和截图,供学有余力…...
Java银系统/超市收银系统/智慧新零售/ERP进销存管理/线上商城/h5/小程序
>>>系统简述: 神点收银系统支持B2B2C多商户模式,系统基于前后端分离的架构,后端采用Java SpringBoot Mysql Mybatis Plus,前端基于当前流行的Uniapp、Element UI,支持小程序、h5。架构包含:会员端…...
大学网页制作作品1
作品须知:1.该网页作品预计分为5个页面(其中1个登录页面,1个首页主页面,3个分页面),如需要可自行删改增加页面。(总共约800行html,1200行css,100行js) 2.此网页源代码只用于学习和模…...
【会议征稿,IEEE出版】第三届机器人、人工智能与智能控制国际会议(RAIIC 2024,7月5-7)
第三届机器人、人工智能与智能控制国际会议(RAIIC 2024)将于2024年7月5-7日中国绵阳举行。 RAIIC 2024是汇聚业界和学术界的顶级论坛,会议将邀请国内外著名专家就以传播机器人、人工智能与智能控制领域的技术进步、研究成果和应用做专题报告…...
离线部署OpenIM
目录 1.提取相关安装包和镜像 2.安装docker和docker-compose 3.依次导入镜像 4.解压安装包 5.执行安装命令 6.PC Web 验证 7.开放端口 7.1IM 端口 7.2Chat 端口 7.3 PC Web 及管理后台前端资源端口 “如果您在解决类似问题时也遇到了困难,希望我的经验分享…...
sql:between and日期毫秒精度过多导致的查询bug
复现 一般情况下,前端传的日期值大多都是yyyy-MM-dd HH:mm:ss(标准格式),比如2024-06-25 10:49:50,但是在测试环境,测试人员测出了一个带毫秒的日期:比如2024-06-25 10:49:50.9999999 这种情况下会出现查询bug SELEC…...
【日常记录】【JS】优雅检测用户是否在指定元素的外部点击
文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
Spring Security 认证流程——补充
一、认证流程概述 Spring Security 的认证流程基于 过滤器链(Filter Chain),核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤: 用户提交登录请求拦…...
离线语音识别方案分析
随着人工智能技术的不断发展,语音识别技术也得到了广泛的应用,从智能家居到车载系统,语音识别正在改变我们与设备的交互方式。尤其是离线语音识别,由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力,广…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...
大数据治理的常见方式
大数据治理的常见方式 大数据治理是确保数据质量、安全性和可用性的系统性方法,以下是几种常见的治理方式: 1. 数据质量管理 核心方法: 数据校验:建立数据校验规则(格式、范围、一致性等)数据清洗&…...
