RabbitMQ实践——搭建单人聊天服务
大纲
- 创建Core交换器
- 用户登录
- 发起聊天邀请
- 接受邀请
- 聊天
- 实验过程
- 总结
- 代码工程
经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。

该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。
创建Core交换器
package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}
用户登录
用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。
private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
Controller如下
package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}
发起聊天邀请
发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。
package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock(); @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}
接受邀请
被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。
public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}
聊天
聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。
public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}
Controller侧代码
package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}
实验过程
在Postman中,我们先让tom登录,然后jerry登录。


在后台,我们看到创建两个队列

以及Core交换器的绑定关系也被更新

Jerry向Tom发起聊天邀请

可以看到Tom收到了邀请

同时新增了两个队列

以及一个交换器


Tom通过下面请求接受邀请

Jerry收到Tom接受了邀请的通知

后面它们就可以聊天了


它们的聊天窗口都收到了消息


总结
本文主要使用的知识点:
- direct交换器以及其绑定规则
- fanout交换器
- 自动删除的交换器
- 自动删除的队列
- 只有一个消费者的队列
- WebFlux响应式编程
代码工程
https://github.com/f304646673/RabbitMQDemo
相关文章:
RabbitMQ实践——搭建单人聊天服务
大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱,下…...
GPT-5
欢迎来到 Papicatch的博客 文章目录 🍉技术突破预测 🍈算法进步 🍈理解力提升 🍈行业推动力 🍉人机协作的未来 🍈辅助决策 🍈增强创造力 🍈复杂任务中的角色 🍈人…...
Vip-智能预估+大数据标签+人群全选=用户分群!
Mobpush用户分群功能升级,创建推送入口vip用户可进入自有选择标签创建“用户分群”,相比于免费标签,“用户标签”维度更丰富。在应用基础属性上,增加“品牌”、“网络状态”、“运营商”,众所周知,不同厂商…...
SpringBoot异常处理机制之自定义404、500错误提示页面 - 518篇
历史文章(文章累计500) 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...
为什么选择Xinstall CPA结算系统?因为它能帮您解决这些痛点!
在App推广和运营的道路上,我们时常面临着各种挑战和痛点。其中,结算系统的复杂性和不透明性往往成为制约我们发展的瓶颈。然而,有了Xinstall CPA结算系统,这些问题将迎刃而解,让您的App推广之路更加顺畅和高效。 一、…...
2024年【建筑电工(建筑特殊工种)】模拟试题及建筑电工(建筑特殊工种)作业考试题库
题库来源:安全生产模拟考试一点通公众号小程序 2024年建筑电工(建筑特殊工种)模拟试题为正在备考建筑电工(建筑特殊工种)操作证的学员准备的理论考试专题,每个月更新的建筑电工(建筑特殊工种)作业考试题库祝您顺利通过建筑电工(建筑特殊工种)考试。 1、…...
解锁数字化转型的双引擎:MSP和CMP的力量
随着企业数字化转型的深入,云计算已经成为现代企业IT基础设施的重要组成部分。为了高效地管理和优化多云环境,企业通常会依赖管理服务提供商 (Managed Service Providers, MSP) 和云管理平台 (Cloud Management Platforms, CMP)。本文将探讨MSP和CMP的定…...
Pyecharts入门
数据可视化 Pyecharts简介 Apache ECharts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的认可。而 Python 是一门富有表达力的语言,很适合用于数据处理。当数据分析遇上数据可视化时&#…...
Socket编程详解(一)服务端与客户端的双向对话
目录 预备知识 视频教程 项目前准备知识点 1、服务器端程序的编写步骤 2、客户端程序编写步骤 代码部分 1、服务端FrmServer.cs文件 2、客户端FrmClient.cs文件 3、启动文件Program.cs 结果展示 预备知识 请查阅博客http://t.csdnimg.cn/jE4Tp 视频教程 链接&#…...
使用Python实现深度学习模型:强化学习与深度Q网络(DQN)
深度Q网络(Deep Q-Network,DQN)是结合深度学习与强化学习的一种方法,用于解决复杂的决策问题。本文将详细介绍如何使用Python实现DQN,主要包括以下几个方面: 强化学习简介DQN算法简介环境搭建DQN模型实现模型训练与评估1. 强化学习简介 强化学习是一种训练智能体(agent…...
Py-Spy、Scalene 和 VizTracer 的对比分析
在前几篇文章中,我们详细介绍了如何使用 py-spy、scalene 和 viztracer 进行性能分析和优化。今天,我们将对这三个性能分析工具进行详细对比,帮助你选择最适合你的工具。 工具简介 Py-Spy: 实时性能分析:Py-Spy 可以…...
软考架构师考试内容
软考系统架构设计师考试是中国计算机技术与软件专业技术资格(水平)考试(简称软考)中的一项高级资格考试,旨在评估考生是否具备系统架构设计的能力。根据提供的参考资料,考试内容主要包括以下几个方面&#…...
【MySQL基础篇】概述及SQL指令:DDL及DML
数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释:定义与基本概念: 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积,而是遵循一定的规则…...
计算机网络 —— 网络字节序
网络字节序 1、网络字节序 (Network Byte Order)和本机转换 1、大端、小端字节序 “大端” 和” 小端” 表示多字节值的哪一端存储在该值的起始地址处;小端存储在起始地址处,即是小端字节序;大端存储在起始地址处,即是大端字节…...
区块链不可能三角
区块链不可能三角:探索去中心化、安全与可扩展性的权衡 引言 区块链技术自诞生以来,以其去中心化、透明、安全等特点吸引了全球的关注,成为金融科技领域的重要革新力量。然而,随着区块链应用的日益广泛,一个核心问题…...
新手第一个漏洞复现:MS17-010(永恒之蓝)
文章目录 漏洞原理漏洞影响范围复现环境复现步骤 漏洞原理 漏洞出现在Windows SMB v1中的内核态函数srv!SrvOs2FeaListToNt在处理FEA(File Extended Attributes)转换时。该函数在将FEA list转换成NTFEA(Windows NT FEA)list前&am…...
代码随想录Day64
98.所有可达路径 题目:98. 所有可达路径 (kamacoder.com) 思路:果断放弃 答案 import java.util.*;public class Main {private static List<List<Integer>> adjList;private static List<List<Integer>> allPaths;private sta…...
Angular 指令
Angular 指令是 Angular 框架中的一项核心功能,它允许开发人员扩展 HTML 的功能,并创建可复用的组件和行为。以下是一些常见的 Angular 指令: 1. 组件指令 (Component Directives) 组件指令是最常用的一种指令,用于创建可复用的 U…...
移动端 UI 风格,书写华丽篇章
移动端 UI 风格,书写华丽篇章...
flutter开发实战-ListWheelScrollView与自定义TimePicker时间选择器
flutter开发实战-ListWheelScrollView与自定义TimePicker 最近在使用时间选择器的时候,需要自定义一个TimePicker效果,当然这里就使用了ListWheelScrollView。ListWheelScrollView与ListView类似,但ListWheelScrollView渲染效果类似滚筒效果…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
基于IDIG-GAN的小样本电机轴承故障诊断
目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) 梯度归一化(Gradient Normalization) (2) 判别器梯度间隙正则化(Discriminator Gradient Gap Regularization) (3) 自注意力机制(Self-Attention) 3. 完整损失函数 二…...
C++--string的模拟实现
一,引言 string的模拟实现是只对string对象中给的主要功能经行模拟实现,其目的是加强对string的底层了解,以便于在以后的学习或者工作中更加熟练的使用string。本文中的代码仅供参考并不唯一。 二,默认成员函数 string主要有三个成员变量,…...
【java面试】微服务篇
【java面试】微服务篇 一、总体框架二、Springcloud(一)Springcloud五大组件(二)服务注册和发现1、Eureka2、Nacos (三)负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...
PydanticAI快速入门示例
参考链接:https://ai.pydantic.dev/#why-use-pydanticai 示例代码 from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider# 配置使用阿里云通义千问模型 model OpenAIMode…...
