RabbitMQ实践——搭建多人聊天服务
大纲
- 用户登录
- 创建聊天室
- 监听Stream(聊天室)
- 发送消息
- 实验
- 登录
- Tom侧
- Jerry侧
- 创建聊天室
- Jerry侧
- Tom侧
- 进入聊天室
- Jerry侧
- Tom侧
- 发送消息
- Jerry发送消息
- Jerry侧聊天室
- Tom侧聊天室
- Tom发送消息
- Jerry侧聊天室
- Tom侧聊天室
- 代码工程
- 参考资料
在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。
如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。
用户登录
关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室
我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。
package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}
聊天室创建完毕后,会通知所有登录的用户。
@PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}
监听Stream(聊天室)
public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}
我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理
@GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}
发送消息
每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。
public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}
实验
登录
Tom侧
Jerry侧
创建聊天室
Jerry侧
Jerry申请创建一个聊天室
在管理后台,我们看到对应的交换器和Stream都创建出来了。
同时在刚才的登录接口界面,Jerry收到了通知
Tom侧
Tom也会收到通知
进入聊天室
Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。
Jerry侧
Tom侧
发送消息
Jerry发送消息
Jerry侧聊天室
Tom侧聊天室
Tom发送消息
Jerry侧聊天室
Tom侧聊天室
代码工程
https://github.com/f304646673/RabbitMQDemo
参考资料
- https://www.rabbitmq.com/docs/streams
相关文章:

RabbitMQ实践——搭建多人聊天服务
大纲 用户登录创建聊天室监听Stream(聊天室)发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——…...

git分布式版本控制系统
Git - Downloads (git-scm.com) gitee教程(超全,超详细,超长)-CSDN博客 Git教程 - 廖雪峰的官方网站 (liaoxuefeng.com) 所有的版本控制系统,其实只能跟踪文本文件改动,比如TXT文件,网页&…...

基于weixin小程序的民宿短租系统的设计与实现
管理员账户功能包括:系统首页,个人中心,房主管理,房间类型管理,用户管理,民宿信息管理,民宿预订管理,系统管理 小程序功能包括:系统首页,民宿信息,…...
2024-06-22力扣每日一题
链接: 2663. 字典序最小的美丽字符串 题意 略 解: 要求字符串内不存在任何长度为 2 或更长的回文子字符串,则在任意位置不存在aa或aba形式 由于要被给定字符串字典序大,且找到符合条件的字典序最小字符串,则竟可…...

S_LOVE多端恋爱小站小程序源码 uniapp多端
S_LOVE多端恋爱小站小程序源码,采用uniapp多端开发框架进行开发,目前已适配H5、微信小程序版本。 源码下载:https://download.csdn.net/download/m0_66047725/89421726 更多资源下载:关注我。...
如何避免MySQL的死锁或性能下降
1、按顺序访问数据 确保多个线程或事务在访问多个表或行时,按照相同的顺序进行。这可以避免循环等待和资源竞争,从而降低死锁的风险。 2、避免长时间持有锁 尽量缩短事务的执行时间,避免长时间持有锁。长时间持有锁会增加其他事务等待的时…...

《C语言》编译和链接
文章目录 一、翻译环境1、预处理2、编译3、汇编4、链接 二、运行环境 一、翻译环境 在使用编译器编写代码时,编写的代码是高级语言,机器无法直接识别和运行,在编译器内部会翻译成机器可执行的机器语言。 编译环境由编译和链接两大过程组成。 …...
group by和select的兼容性问题
group by和select的兼容性问题 在标准的SQL语法中,GROUP BY 和 SELECT 之间不存在兼容性问题,因为它们是 SQL 查询语句的基本组成部分,而且它们的使用方式是相互兼容的。 SELECT 子句和 GROUP BY 子句的关系: SELECT 子句&#…...
切面aspect处理fegin调用转本地调用
切面处理fegin调用转本地调用 问题:原fegin调用转本地调用详细描述方案代码实现总结问题:原fegin调用转本地调用 项目原来是微服务项目服务与服务之间是通过fegin进行交互的,但是现在微服务项目要重构为单体项目,原fegin调用的方法要给为本地调用 详细描述 zyy-aiot │ …...

Linux 磁盘挂载与分区
Linux 磁盘挂载与分区 vda1: 其中vd表示虚拟磁盘,a表示第一块磁盘,b表示第二块磁盘,1表示第一块磁盘的第一分区(显然两块磁盘都只有一个分区)图中可以看到,vda1磁盘只有一个分区,且全部挂载到根…...

Open3D 将ShapeNet数据集txt转pcd
目录 一、概述 二、代码实现 三、实现效果 一、概述 ShapeNet 数据集是一个广泛使用的三维物体数据集,主要用于计算机视觉、计算机图形学、机器人学和机器学习等领域的研究。它包含大量的三维物体模型,并附有丰富的标注信息。ShapeNet 数据集由普林斯…...

综合项目实战--jenkins节点模式
一、DevOps流程 DevOps是一种方法论,是一系列可以帮助开发者和运维人员在实现各自目标的前提下,向自己的客户或用户交付最大化价值及最高质量成果的基本原则和实践,能让开发、测试、运维效率协同工作的方法。 DevOps流程(自动化测试部分) DevOps完整流程 二、gitee+j…...

WhaleStudio 2.6重磅发布!调度模块WhaleScheduler更新78项核心功能
我们很高兴地宣布WhaleStudio 2.6版本的正式发布!新版本中包含了数据调度模块WhaleScheduler和数据集成模块WhaleTunnel的百余项核心功能更新,本文摘选了WhaleScheduler常用功能更新的概况,关于WhaleTunnel的更新详情将于近期发布,…...
笔记101:OSQP求解器的底层算法 -- ADMM算法
前言1:这篇博客仅限于介绍拉格朗日乘子法,KKT条件,ALM算法,ADMM算法等最优化方法的使用以及简版代码实现,但不会涉及具体的数学推导;不过在下面我会给出具体数学推导的相关文章和截图,供学有余力…...

Java银系统/超市收银系统/智慧新零售/ERP进销存管理/线上商城/h5/小程序
>>>系统简述: 神点收银系统支持B2B2C多商户模式,系统基于前后端分离的架构,后端采用Java SpringBoot Mysql Mybatis Plus,前端基于当前流行的Uniapp、Element UI,支持小程序、h5。架构包含:会员端…...

大学网页制作作品1
作品须知:1.该网页作品预计分为5个页面(其中1个登录页面,1个首页主页面,3个分页面),如需要可自行删改增加页面。(总共约800行html,1200行css,100行js) 2.此网页源代码只用于学习和模…...

【会议征稿,IEEE出版】第三届机器人、人工智能与智能控制国际会议(RAIIC 2024,7月5-7)
第三届机器人、人工智能与智能控制国际会议(RAIIC 2024)将于2024年7月5-7日中国绵阳举行。 RAIIC 2024是汇聚业界和学术界的顶级论坛,会议将邀请国内外著名专家就以传播机器人、人工智能与智能控制领域的技术进步、研究成果和应用做专题报告…...

离线部署OpenIM
目录 1.提取相关安装包和镜像 2.安装docker和docker-compose 3.依次导入镜像 4.解压安装包 5.执行安装命令 6.PC Web 验证 7.开放端口 7.1IM 端口 7.2Chat 端口 7.3 PC Web 及管理后台前端资源端口 “如果您在解决类似问题时也遇到了困难,希望我的经验分享…...

sql:between and日期毫秒精度过多导致的查询bug
复现 一般情况下,前端传的日期值大多都是yyyy-MM-dd HH:mm:ss(标准格式),比如2024-06-25 10:49:50,但是在测试环境,测试人员测出了一个带毫秒的日期:比如2024-06-25 10:49:50.9999999 这种情况下会出现查询bug SELEC…...

【日常记录】【JS】优雅检测用户是否在指定元素的外部点击
文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...

Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...