RabbitMQ实践——搭建多人聊天服务
大纲
- 用户登录
- 创建聊天室
- 监听Stream(聊天室)
- 发送消息
- 实验
- 登录
- Tom侧
- Jerry侧
- 创建聊天室
- Jerry侧
- Tom侧
- 进入聊天室
- Jerry侧
- Tom侧
- 发送消息
- Jerry发送消息
- Jerry侧聊天室
- Tom侧聊天室
- Tom发送消息
- Jerry侧聊天室
- Tom侧聊天室
- 代码工程
- 参考资料
在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。

如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。

用户登录
关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室
我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。
package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}
聊天室创建完毕后,会通知所有登录的用户。
@PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}
监听Stream(聊天室)
public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}
我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理
@GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}
发送消息
每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。
public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}
实验
登录
Tom侧

Jerry侧

创建聊天室
Jerry侧
Jerry申请创建一个聊天室

在管理后台,我们看到对应的交换器和Stream都创建出来了。


同时在刚才的登录接口界面,Jerry收到了通知

Tom侧
Tom也会收到通知

进入聊天室
Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。
Jerry侧

Tom侧

发送消息
Jerry发送消息

Jerry侧聊天室

Tom侧聊天室

Tom发送消息

Jerry侧聊天室

Tom侧聊天室

代码工程
https://github.com/f304646673/RabbitMQDemo
参考资料
- https://www.rabbitmq.com/docs/streams
相关文章:
RabbitMQ实践——搭建多人聊天服务
大纲 用户登录创建聊天室监听Stream(聊天室)发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——…...
git分布式版本控制系统
Git - Downloads (git-scm.com) gitee教程(超全,超详细,超长)-CSDN博客 Git教程 - 廖雪峰的官方网站 (liaoxuefeng.com) 所有的版本控制系统,其实只能跟踪文本文件改动,比如TXT文件,网页&…...
基于weixin小程序的民宿短租系统的设计与实现
管理员账户功能包括:系统首页,个人中心,房主管理,房间类型管理,用户管理,民宿信息管理,民宿预订管理,系统管理 小程序功能包括:系统首页,民宿信息,…...
2024-06-22力扣每日一题
链接: 2663. 字典序最小的美丽字符串 题意 略 解: 要求字符串内不存在任何长度为 2 或更长的回文子字符串,则在任意位置不存在aa或aba形式 由于要被给定字符串字典序大,且找到符合条件的字典序最小字符串,则竟可…...
S_LOVE多端恋爱小站小程序源码 uniapp多端
S_LOVE多端恋爱小站小程序源码,采用uniapp多端开发框架进行开发,目前已适配H5、微信小程序版本。 源码下载:https://download.csdn.net/download/m0_66047725/89421726 更多资源下载:关注我。...
如何避免MySQL的死锁或性能下降
1、按顺序访问数据 确保多个线程或事务在访问多个表或行时,按照相同的顺序进行。这可以避免循环等待和资源竞争,从而降低死锁的风险。 2、避免长时间持有锁 尽量缩短事务的执行时间,避免长时间持有锁。长时间持有锁会增加其他事务等待的时…...
《C语言》编译和链接
文章目录 一、翻译环境1、预处理2、编译3、汇编4、链接 二、运行环境 一、翻译环境 在使用编译器编写代码时,编写的代码是高级语言,机器无法直接识别和运行,在编译器内部会翻译成机器可执行的机器语言。 编译环境由编译和链接两大过程组成。 …...
group by和select的兼容性问题
group by和select的兼容性问题 在标准的SQL语法中,GROUP BY 和 SELECT 之间不存在兼容性问题,因为它们是 SQL 查询语句的基本组成部分,而且它们的使用方式是相互兼容的。 SELECT 子句和 GROUP BY 子句的关系: SELECT 子句&#…...
切面aspect处理fegin调用转本地调用
切面处理fegin调用转本地调用 问题:原fegin调用转本地调用详细描述方案代码实现总结问题:原fegin调用转本地调用 项目原来是微服务项目服务与服务之间是通过fegin进行交互的,但是现在微服务项目要重构为单体项目,原fegin调用的方法要给为本地调用 详细描述 zyy-aiot │ …...
Linux 磁盘挂载与分区
Linux 磁盘挂载与分区 vda1: 其中vd表示虚拟磁盘,a表示第一块磁盘,b表示第二块磁盘,1表示第一块磁盘的第一分区(显然两块磁盘都只有一个分区)图中可以看到,vda1磁盘只有一个分区,且全部挂载到根…...
Open3D 将ShapeNet数据集txt转pcd
目录 一、概述 二、代码实现 三、实现效果 一、概述 ShapeNet 数据集是一个广泛使用的三维物体数据集,主要用于计算机视觉、计算机图形学、机器人学和机器学习等领域的研究。它包含大量的三维物体模型,并附有丰富的标注信息。ShapeNet 数据集由普林斯…...
综合项目实战--jenkins节点模式
一、DevOps流程 DevOps是一种方法论,是一系列可以帮助开发者和运维人员在实现各自目标的前提下,向自己的客户或用户交付最大化价值及最高质量成果的基本原则和实践,能让开发、测试、运维效率协同工作的方法。 DevOps流程(自动化测试部分) DevOps完整流程 二、gitee+j…...
WhaleStudio 2.6重磅发布!调度模块WhaleScheduler更新78项核心功能
我们很高兴地宣布WhaleStudio 2.6版本的正式发布!新版本中包含了数据调度模块WhaleScheduler和数据集成模块WhaleTunnel的百余项核心功能更新,本文摘选了WhaleScheduler常用功能更新的概况,关于WhaleTunnel的更新详情将于近期发布,…...
笔记101:OSQP求解器的底层算法 -- ADMM算法
前言1:这篇博客仅限于介绍拉格朗日乘子法,KKT条件,ALM算法,ADMM算法等最优化方法的使用以及简版代码实现,但不会涉及具体的数学推导;不过在下面我会给出具体数学推导的相关文章和截图,供学有余力…...
Java银系统/超市收银系统/智慧新零售/ERP进销存管理/线上商城/h5/小程序
>>>系统简述: 神点收银系统支持B2B2C多商户模式,系统基于前后端分离的架构,后端采用Java SpringBoot Mysql Mybatis Plus,前端基于当前流行的Uniapp、Element UI,支持小程序、h5。架构包含:会员端…...
大学网页制作作品1
作品须知:1.该网页作品预计分为5个页面(其中1个登录页面,1个首页主页面,3个分页面),如需要可自行删改增加页面。(总共约800行html,1200行css,100行js) 2.此网页源代码只用于学习和模…...
【会议征稿,IEEE出版】第三届机器人、人工智能与智能控制国际会议(RAIIC 2024,7月5-7)
第三届机器人、人工智能与智能控制国际会议(RAIIC 2024)将于2024年7月5-7日中国绵阳举行。 RAIIC 2024是汇聚业界和学术界的顶级论坛,会议将邀请国内外著名专家就以传播机器人、人工智能与智能控制领域的技术进步、研究成果和应用做专题报告…...
离线部署OpenIM
目录 1.提取相关安装包和镜像 2.安装docker和docker-compose 3.依次导入镜像 4.解压安装包 5.执行安装命令 6.PC Web 验证 7.开放端口 7.1IM 端口 7.2Chat 端口 7.3 PC Web 及管理后台前端资源端口 “如果您在解决类似问题时也遇到了困难,希望我的经验分享…...
sql:between and日期毫秒精度过多导致的查询bug
复现 一般情况下,前端传的日期值大多都是yyyy-MM-dd HH:mm:ss(标准格式),比如2024-06-25 10:49:50,但是在测试环境,测试人员测出了一个带毫秒的日期:比如2024-06-25 10:49:50.9999999 这种情况下会出现查询bug SELEC…...
【日常记录】【JS】优雅检测用户是否在指定元素的外部点击
文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁
赛门铁克威胁猎手团队最新报告披露,数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据,严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能,但SEMR…...
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里
写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...
Python的__call__ 方法
在 Python 中,__call__ 是一个特殊的魔术方法(magic method),它允许一个类的实例像函数一样被调用。当你在一个对象后面加上 () 并执行时(例如 obj()),Python 会自动调用该对象的 __call__ 方法…...
