当前位置: 首页 > news >正文

RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

相关文章:

RabbitMQ实践——搭建单人聊天服务

大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习&#xff0c;我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱&#xff0c;下…...

GPT-5

欢迎来到 Papicatch的博客 文章目录 &#x1f349;技术突破预测 &#x1f348;算法进步 &#x1f348;理解力提升 &#x1f348;行业推动力 &#x1f349;人机协作的未来 &#x1f348;辅助决策 &#x1f348;增强创造力 &#x1f348;复杂任务中的角色 &#x1f348;人…...

Vip-智能预估+大数据标签+人群全选=用户分群!

Mobpush用户分群功能升级&#xff0c;创建推送入口vip用户可进入自有选择标签创建“用户分群”&#xff0c;相比于免费标签&#xff0c;“用户标签”维度更丰富。在应用基础属性上&#xff0c;增加“品牌”、“网络状态”、“运营商”&#xff0c;众所周知&#xff0c;不同厂商…...

SpringBoot异常处理机制之自定义404、500错误提示页面 - 518篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...

为什么选择Xinstall CPA结算系统?因为它能帮您解决这些痛点!

在App推广和运营的道路上&#xff0c;我们时常面临着各种挑战和痛点。其中&#xff0c;结算系统的复杂性和不透明性往往成为制约我们发展的瓶颈。然而&#xff0c;有了Xinstall CPA结算系统&#xff0c;这些问题将迎刃而解&#xff0c;让您的App推广之路更加顺畅和高效。 一、…...

2024年【建筑电工(建筑特殊工种)】模拟试题及建筑电工(建筑特殊工种)作业考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年建筑电工(建筑特殊工种)模拟试题为正在备考建筑电工(建筑特殊工种)操作证的学员准备的理论考试专题&#xff0c;每个月更新的建筑电工(建筑特殊工种)作业考试题库祝您顺利通过建筑电工(建筑特殊工种)考试。 1、…...

解锁数字化转型的双引擎:MSP和CMP的力量

随着企业数字化转型的深入&#xff0c;云计算已经成为现代企业IT基础设施的重要组成部分。为了高效地管理和优化多云环境&#xff0c;企业通常会依赖管理服务提供商 (Managed Service Providers, MSP) 和云管理平台 (Cloud Management Platforms, CMP)。本文将探讨MSP和CMP的定…...

Pyecharts入门

数据可视化 Pyecharts简介 Apache ECharts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#…...

Socket编程详解(一)服务端与客户端的双向对话

目录 预备知识 视频教程 项目前准备知识点 1、服务器端程序的编写步骤 2、客户端程序编写步骤 代码部分 1、服务端FrmServer.cs文件 2、客户端FrmClient.cs文件 3、启动文件Program.cs 结果展示 预备知识 请查阅博客http://t.csdnimg.cn/jE4Tp 视频教程 链接&#…...

使用Python实现深度学习模型:强化学习与深度Q网络(DQN)

深度Q网络(Deep Q-Network,DQN)是结合深度学习与强化学习的一种方法,用于解决复杂的决策问题。本文将详细介绍如何使用Python实现DQN,主要包括以下几个方面: 强化学习简介DQN算法简介环境搭建DQN模型实现模型训练与评估1. 强化学习简介 强化学习是一种训练智能体(agent…...

Py-Spy、Scalene 和 VizTracer 的对比分析

在前几篇文章中&#xff0c;我们详细介绍了如何使用 py-spy、scalene 和 viztracer 进行性能分析和优化。今天&#xff0c;我们将对这三个性能分析工具进行详细对比&#xff0c;帮助你选择最适合你的工具。 工具简介 Py-Spy&#xff1a; 实时性能分析&#xff1a;Py-Spy 可以…...

软考架构师考试内容

软考系统架构设计师考试是中国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff08;简称软考&#xff09;中的一项高级资格考试&#xff0c;旨在评估考生是否具备系统架构设计的能力。根据提供的参考资料&#xff0c;考试内容主要包括以下几个方面&#…...

【MySQL基础篇】概述及SQL指令:DDL及DML

数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释&#xff1a;定义与基本概念&#xff1a; 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积&#xff0c;而是遵循一定的规则…...

计算机网络 —— 网络字节序

网络字节序 1、网络字节序 (Network Byte Order)和本机转换 1、大端、小端字节序 “大端” 和” 小端” 表示多字节值的哪一端存储在该值的起始地址处&#xff1b;小端存储在起始地址处&#xff0c;即是小端字节序&#xff1b;大端存储在起始地址处&#xff0c;即是大端字节…...

区块链不可能三角

区块链不可能三角&#xff1a;探索去中心化、安全与可扩展性的权衡 引言 区块链技术自诞生以来&#xff0c;以其去中心化、透明、安全等特点吸引了全球的关注&#xff0c;成为金融科技领域的重要革新力量。然而&#xff0c;随着区块链应用的日益广泛&#xff0c;一个核心问题…...

新手第一个漏洞复现:MS17-010(永恒之蓝)

文章目录 漏洞原理漏洞影响范围复现环境复现步骤 漏洞原理 漏洞出现在Windows SMB v1中的内核态函数srv!SrvOs2FeaListToNt在处理FEA&#xff08;File Extended Attributes&#xff09;转换时。该函数在将FEA list转换成NTFEA&#xff08;Windows NT FEA&#xff09;list前&am…...

代码随想录Day64

98.所有可达路径 题目&#xff1a;98. 所有可达路径 (kamacoder.com) 思路&#xff1a;果断放弃 答案 import java.util.*;public class Main {private static List<List<Integer>> adjList;private static List<List<Integer>> allPaths;private sta…...

Angular 指令

Angular 指令是 Angular 框架中的一项核心功能&#xff0c;它允许开发人员扩展 HTML 的功能&#xff0c;并创建可复用的组件和行为。以下是一些常见的 Angular 指令&#xff1a; 1. 组件指令 (Component Directives) 组件指令是最常用的一种指令&#xff0c;用于创建可复用的 U…...

移动端 UI 风格,书写华丽篇章

移动端 UI 风格&#xff0c;书写华丽篇章...

flutter开发实战-ListWheelScrollView与自定义TimePicker时间选择器

flutter开发实战-ListWheelScrollView与自定义TimePicker 最近在使用时间选择器的时候&#xff0c;需要自定义一个TimePicker效果&#xff0c;当然这里就使用了ListWheelScrollView。ListWheelScrollView与ListView类似&#xff0c;但ListWheelScrollView渲染效果类似滚筒效果…...

Android 相机有线连接开发复盘:PTP/MTP 协议适配与稳定性实践

一、项目背景在做一个相机互联类 App 的过程中&#xff0c;我们需要在 Android 设备上通过 USB 有线方式​ 连接相机&#xff0c;实现&#xff1a;遥控拍摄实时获取照片稳定地进行文件同步最初评估时以为只要调用系统 API 就能跑起来&#xff0c;但实际开发中发现&#xff0c;标…...

3个核心功能让Notepad++成为你的Markdown高效编辑器

3个核心功能让Notepad成为你的Markdown高效编辑器 【免费下载链接】MarkdownViewerPlusPlus A Notepad Plugin to view a Markdown file rendered on-the-fly 项目地址: https://gitcode.com/gh_mirrors/ma/MarkdownViewerPlusPlus 你是否曾经在Notepad中编写Markdown文…...

告别轮询!用STM32CubeMX和DMA实现ADC多通道‘无感’采集与串口打印(附完整工程)

告别轮询&#xff01;STM32CubeMX与DMA实现ADC多通道无感采集实战指南 在嵌入式开发中&#xff0c;数据采集系统的效率往往决定了整个应用的性能上限。传统轮询方式不仅消耗大量CPU资源&#xff0c;还会引入不可预测的延迟。想象一下&#xff0c;当你需要同时监测多个环境传感器…...

甲级钢制隔热平开防火窗:技术参数、结构工艺与工程应用解析

一、产品概述甲级钢制隔热平开防火窗严格依照国家消防标准制造&#xff0c;采用加厚冷轧镀锌钢板打造框架&#xff0c;搭配防火填充材料、隔热防火玻璃与专用密封配件&#xff0c;防火隔热、密闭性强&#xff0c;耐用抗腐蚀。相较于低等级防火窗&#xff0c;本品耐火隔热性能更…...

AI人才缺口500万:2026年最值得入局的10个职业方向

人社部最新披露的数据让人心惊&#xff1a;我国人工智能相关人才缺口已突破500万&#xff0c;平均10个岗位在抢1个人。 2026年春天&#xff0c;一边是考公大军挤破脑袋&#xff0c;另一边是大厂拿着月薪6万的支票本愁得睡不着觉。智能体开发岗位需求暴涨455%&#xff0c;就连零…...

麒麟系统离线安装PostgreSQL?手把手教你用dnf和repotrack搞定所有依赖包

麒麟系统离线部署PostgreSQL全攻略&#xff1a;从依赖包下载到本地仓库构建 在政企级IT基础设施中&#xff0c;麒麟操作系统因其安全可控的特性成为关键业务系统的首选平台。当这些系统运行在物理隔离的内网环境时&#xff0c;如何解决软件依赖的"最后一公里"问题&am…...

LAV Filters终极指南:深度解析开源DirectShow解码器的架构原理与实战配置

LAV Filters终极指南&#xff1a;深度解析开源DirectShow解码器的架构原理与实战配置 【免费下载链接】LAVFilters LAV Filters - Open-Source DirectShow Media Splitter and Decoders 项目地址: https://gitcode.com/gh_mirrors/la/LAVFilters LAV Filters是一套基于F…...

手把手教你用Obsidian+Excalidraw画流程图,告别切换软件的麻烦

手把手教你用ObsidianExcalidraw画流程图&#xff0c;告别切换软件的麻烦 每次写技术文档时&#xff0c;最让我头疼的就是画流程图。原本思路清晰&#xff0c;一打开绘图软件就卡壳——要么是工具太复杂&#xff0c;要么是画完图还要导出再插入笔记&#xff0c;来回切换几次灵感…...

ETT数据集实战:如何用油温预测优化电网负载与设备维护策略

ETT数据集实战&#xff1a;如何用油温预测优化电网负载与设备维护策略 当一座城市的电网在盛夏午后突然崩溃&#xff0c;背后往往隐藏着变压器油温失控的连锁反应。去年某沿海城市电网的故障分析报告显示&#xff0c;超过60%的突发停电事件与变压器过热直接相关——这个数据让行…...

山海再赴,探索向新|2026 第二届搜狐极限探索者大会盛大启航!

2025年6月5日&#xff0c;由搜狐主办的首届搜狐极限探索者大会在北京盛大举行。大会以“致敬极限探索者”&#xff08;Salute to the Ultimate Explorers&#xff09;为主题&#xff0c;汇聚中国上百位各极限运动领域顶尖的探索者、企业及明星嘉宾&#xff0c;通过巅峰演讲、深…...