当前位置: 首页 > news >正文

RabbitMQ实践——搭建多人聊天服务

大纲

  • 用户登录
  • 创建聊天室
  • 监听Stream(聊天室)
  • 发送消息
  • 实验
    • 登录
      • Tom侧
      • Jerry侧
    • 创建聊天室
      • Jerry侧
      • Tom侧
    • 进入聊天室
      • Jerry侧
      • Tom侧
    • 发送消息
      • Jerry发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
      • Tom发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
  • 代码工程
  • 参考资料

在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。
在这里插入图片描述
如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。
在这里插入图片描述

用户登录

关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。

创建聊天室

我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。

package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}

聊天室创建完毕后,会通知所有登录的用户。

    @PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}

监听Stream(聊天室)

    public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}

我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理

    @GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}

发送消息

每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。

    public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}

实验

登录

Tom侧

在这里插入图片描述

Jerry侧

在这里插入图片描述

创建聊天室

Jerry侧

Jerry申请创建一个聊天室
在这里插入图片描述
在管理后台,我们看到对应的交换器和Stream都创建出来了。
在这里插入图片描述
在这里插入图片描述
同时在刚才的登录接口界面,Jerry收到了通知
在这里插入图片描述

Tom侧

Tom也会收到通知
在这里插入图片描述

进入聊天室

Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。

Jerry侧

在这里插入图片描述

Tom侧

在这里插入图片描述

发送消息

Jerry发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

Tom发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

代码工程

https://github.com/f304646673/RabbitMQDemo

参考资料

  • https://www.rabbitmq.com/docs/streams

相关文章:

RabbitMQ实践——搭建多人聊天服务

大纲 用户登录创建聊天室监听Stream&#xff08;聊天室&#xff09;发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——…...

git分布式版本控制系统

Git - Downloads (git-scm.com) gitee教程&#xff08;超全&#xff0c;超详细&#xff0c;超长&#xff09;-CSDN博客 Git教程 - 廖雪峰的官方网站 (liaoxuefeng.com) 所有的版本控制系统&#xff0c;其实只能跟踪文本文件改动&#xff0c;比如TXT文件&#xff0c;网页&…...

基于weixin小程序的民宿短租系统的设计与实现

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;房主管理&#xff0c;房间类型管理&#xff0c;用户管理&#xff0c;民宿信息管理&#xff0c;民宿预订管理&#xff0c;系统管理 小程序功能包括&#xff1a;系统首页&#xff0c;民宿信息&#xff0c…...

2024-06-22力扣每日一题

链接&#xff1a; 2663. 字典序最小的美丽字符串 题意 略 解&#xff1a; 要求字符串内不存在任何长度为 2 或更长的回文子字符串&#xff0c;则在任意位置不存在aa或aba形式 由于要被给定字符串字典序大&#xff0c;且找到符合条件的字典序最小字符串&#xff0c;则竟可…...

S_LOVE多端恋爱小站小程序源码 uniapp多端

S_LOVE多端恋爱小站小程序源码&#xff0c;采用uniapp多端开发框架进行开发&#xff0c;目前已适配H5、微信小程序版本。 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/89421726 更多资源下载&#xff1a;关注我。...

如何避免MySQL的死锁或性能下降

1、按顺序访问数据 确保多个线程或事务在访问多个表或行时&#xff0c;按照相同的顺序进行。这可以避免循环等待和资源竞争&#xff0c;从而降低死锁的风险。 2、避免长时间持有锁 尽量缩短事务的执行时间&#xff0c;避免长时间持有锁。长时间持有锁会增加其他事务等待的时…...

《C语言》编译和链接

文章目录 一、翻译环境1、预处理2、编译3、汇编4、链接 二、运行环境 一、翻译环境 在使用编译器编写代码时&#xff0c;编写的代码是高级语言&#xff0c;机器无法直接识别和运行&#xff0c;在编译器内部会翻译成机器可执行的机器语言。 编译环境由编译和链接两大过程组成。 …...

group by和select的兼容性问题

group by和select的兼容性问题 在标准的SQL语法中&#xff0c;GROUP BY 和 SELECT 之间不存在兼容性问题&#xff0c;因为它们是 SQL 查询语句的基本组成部分&#xff0c;而且它们的使用方式是相互兼容的。 SELECT 子句和 GROUP BY 子句的关系&#xff1a; SELECT 子句&#…...

切面aspect处理fegin调用转本地调用

切面处理fegin调用转本地调用 问题:原fegin调用转本地调用详细描述方案代码实现总结问题:原fegin调用转本地调用 项目原来是微服务项目服务与服务之间是通过fegin进行交互的,但是现在微服务项目要重构为单体项目,原fegin调用的方法要给为本地调用 详细描述 zyy-aiot │ …...

Linux 磁盘挂载与分区

Linux 磁盘挂载与分区 vda1: 其中vd表示虚拟磁盘&#xff0c;a表示第一块磁盘&#xff0c;b表示第二块磁盘&#xff0c;1表示第一块磁盘的第一分区&#xff08;显然两块磁盘都只有一个分区&#xff09;图中可以看到&#xff0c;vda1磁盘只有一个分区&#xff0c;且全部挂载到根…...

Open3D 将ShapeNet数据集txt转pcd

目录 一、概述 二、代码实现 三、实现效果 一、概述 ShapeNet 数据集是一个广泛使用的三维物体数据集&#xff0c;主要用于计算机视觉、计算机图形学、机器人学和机器学习等领域的研究。它包含大量的三维物体模型&#xff0c;并附有丰富的标注信息。ShapeNet 数据集由普林斯…...

综合项目实战--jenkins节点模式

一、DevOps流程 DevOps是一种方法论,是一系列可以帮助开发者和运维人员在实现各自目标的前提下,向自己的客户或用户交付最大化价值及最高质量成果的基本原则和实践,能让开发、测试、运维效率协同工作的方法。 DevOps流程(自动化测试部分) DevOps完整流程 二、gitee+j…...

WhaleStudio 2.6重磅发布!调度模块WhaleScheduler更新78项核心功能

我们很高兴地宣布WhaleStudio 2.6版本的正式发布&#xff01;新版本中包含了数据调度模块WhaleScheduler和数据集成模块WhaleTunnel的百余项核心功能更新&#xff0c;本文摘选了WhaleScheduler常用功能更新的概况&#xff0c;关于WhaleTunnel的更新详情将于近期发布&#xff0c…...

笔记101:OSQP求解器的底层算法 -- ADMM算法

前言1&#xff1a;这篇博客仅限于介绍拉格朗日乘子法&#xff0c;KKT条件&#xff0c;ALM算法&#xff0c;ADMM算法等最优化方法的使用以及简版代码实现&#xff0c;但不会涉及具体的数学推导&#xff1b;不过在下面我会给出具体数学推导的相关文章和截图&#xff0c;供学有余力…...

Java银系统/超市收银系统/智慧新零售/ERP进销存管理/线上商城/h5/小程序

>>>系统简述&#xff1a; 神点收银系统支持B2B2C多商户模式&#xff0c;系统基于前后端分离的架构&#xff0c;后端采用Java SpringBoot Mysql Mybatis Plus&#xff0c;前端基于当前流行的Uniapp、Element UI&#xff0c;支持小程序、h5。架构包含&#xff1a;会员端…...

大学网页制作作品1

作品须知&#xff1a;1.该网页作品预计分为5个页面&#xff08;其中1个登录页面&#xff0c;1个首页主页面&#xff0c;3个分页面&#xff09;&#xff0c;如需要可自行删改增加页面。&#xff08;总共约800行html,1200行css,100行js&#xff09; 2.此网页源代码只用于学习和模…...

【会议征稿,IEEE出版】第三届机器人、人工智能与智能控制国际会议(RAIIC 2024,7月5-7)

第三届机器人、人工智能与智能控制国际会议&#xff08;RAIIC 2024&#xff09;将于2024年7月5-7日中国绵阳举行。 RAIIC 2024是汇聚业界和学术界的顶级论坛&#xff0c;会议将邀请国内外著名专家就以传播机器人、人工智能与智能控制领域的技术进步、研究成果和应用做专题报告…...

离线部署OpenIM

目录 1.提取相关安装包和镜像 2.安装docker和docker-compose 3.依次导入镜像 4.解压安装包 5.执行安装命令 6.PC Web 验证 7.开放端口 7.1IM 端口 7.2Chat 端口 7.3 PC Web 及管理后台前端资源端口 “如果您在解决类似问题时也遇到了困难&#xff0c;希望我的经验分享…...

sql:between and日期毫秒精度过多导致的查询bug

复现 一般情况下&#xff0c;前端传的日期值大多都是yyyy-MM-dd HH:mm:ss(标准格式)&#xff0c;比如2024-06-25 10:49:50&#xff0c;但是在测试环境&#xff0c;测试人员测出了一个带毫秒的日期&#xff1a;比如2024-06-25 10:49:50.9999999 这种情况下会出现查询bug SELEC…...

【日常记录】【JS】优雅检测用户是否在指定元素的外部点击

文章目录 1、界面基本布局2、代码实现3、参考链接 1、界面基本布局 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0">…...

51单片机入门-直流电机(十五)

目录&#xff1a;1.直流电机驱动&#xff08;PWM&#xff09;2.LED呼吸灯&直流电机调速1.直流电机驱动&#xff08;PWM&#xff09;让他转的快一些让他转2us停1us2.LED呼吸灯&直流电机调速点亮一个LED&#xff1a;在循环里&#xff1a;点亮熄灭显示暗一些&#xff1a;让…...

保姆级教程:用Proteus 8.13和STM32F103C8T6复刻一个烟雾报警器仿真(附源码调试心得)

从零到一&#xff1a;Proteus与STM32烟雾报警器仿真全流程实战指南 第一次打开Proteus时&#xff0c;那个蓝色界面和密密麻麻的元件库让我既兴奋又茫然。作为一个刚接触嵌入式仿真的电子爱好者&#xff0c;我原本以为有了开源文件和代码就能轻松复现一个烟雾报警器仿真项目&…...

Qwen-Ranker Pro快速部署:Windows WSL2环境下Streamlit兼容性方案

Qwen-Ranker Pro快速部署&#xff1a;Windows WSL2环境下Streamlit兼容性方案 1. 环境准备与系统要求 在Windows WSL2环境中部署Qwen-Ranker Pro需要确保系统满足以下基本要求&#xff1a; 硬件要求&#xff1a; 内存&#xff1a;至少8GB RAM&#xff08;推荐16GB以上&…...

Notepad--:国产跨平台文本编辑器的终极指南与快速上手

Notepad--&#xff1a;国产跨平台文本编辑器的终极指南与快速上手 【免费下载链接】notepad-- 一个支持windows/linux/mac的文本编辑器&#xff0c;目标是做中国人自己的编辑器&#xff0c;来自中国。 项目地址: https://gitcode.com/GitHub_Trending/no/notepad-- Note…...

视频高清低延时直播/音视频点播/云点播/云直播EasyDSS在校园教育/K12教育等各场景中的应用介绍

在线教育的核心竞争力&#xff0c;归根结底在于教学体验的优劣&#xff0c;而视频技术作为线上教学的核心载体&#xff0c;直接决定了教学体验的上限。随着在线教育行业的快速迭代&#xff0c;学员对线上课堂的要求愈发严苛&#xff1a;不仅需要高清流畅、稳定无卡顿的音视频传…...

C++ 安全子集:探讨在关键任务系统中限制部分 C++ 特性(如 RTTI)的必要性

尊敬的各位专家、各位同仁&#xff0c;大家好。今天&#xff0c;我们齐聚一堂&#xff0c;共同探讨一个在软件工程领域&#xff0c;尤其是在关键任务系统&#xff08;Critical Mission Systems&#xff09;开发中至关重要的话题&#xff1a;C 安全子集——在严苛环境下限制部分…...

OpenCV实战:从相机响应函数(CRF)到HDR图像合成的完整流程解析

1. 相机响应函数(CRF)基础解析 第一次听说相机响应函数(CRF)时&#xff0c;我也是一头雾水。简单来说&#xff0c;CRF就是描述相机如何把真实世界的光线强度(L)转换成图像像素值(B)的数学关系。想象一下&#xff0c;你拿着手机对着同一个场景拍三张照片&#xff1a;一张很暗、一…...

NoSleep防休眠工具:系统唤醒与持续运行的高效解决方案

NoSleep防休眠工具&#xff1a;系统唤醒与持续运行的高效解决方案 【免费下载链接】NoSleep Lightweight Windows utility to prevent screen locking 项目地址: https://gitcode.com/gh_mirrors/nos/NoSleep 在数字化工作环境中&#xff0c;电脑意外休眠往往导致工作中…...

手把手教你用UML用例图梳理业务流程(附真实项目案例)

实战指南&#xff1a;用UML用例图重构电商订单系统业务流程 1. 为什么用例图是需求分析的基石 在软件开发的混沌初期&#xff0c;当产品经理、开发者和业务方还在用各自的语言描述需求时&#xff0c;UML用例图就像一盏明灯&#xff0c;它能跨越专业术语的鸿沟&#xff0c;用可视…...

如何用Mermaid Live Editor 5分钟创建专业图表

如何用Mermaid Live Editor 5分钟创建专业图表 【免费下载链接】mermaid-live-editor Edit, preview and share mermaid charts/diagrams. New implementation of the live editor. 项目地址: https://gitcode.com/GitHub_Trending/me/mermaid-live-editor Mermaid Live…...