当前位置: 首页 > news >正文

RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

相关文章:

RabbitMQ实践——搭建单人聊天服务

大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习&#xff0c;我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱&#xff0c;下…...

GPT-5

欢迎来到 Papicatch的博客 文章目录 &#x1f349;技术突破预测 &#x1f348;算法进步 &#x1f348;理解力提升 &#x1f348;行业推动力 &#x1f349;人机协作的未来 &#x1f348;辅助决策 &#x1f348;增强创造力 &#x1f348;复杂任务中的角色 &#x1f348;人…...

Vip-智能预估+大数据标签+人群全选=用户分群!

Mobpush用户分群功能升级&#xff0c;创建推送入口vip用户可进入自有选择标签创建“用户分群”&#xff0c;相比于免费标签&#xff0c;“用户标签”维度更丰富。在应用基础属性上&#xff0c;增加“品牌”、“网络状态”、“运营商”&#xff0c;众所周知&#xff0c;不同厂商…...

SpringBoot异常处理机制之自定义404、500错误提示页面 - 518篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…...

为什么选择Xinstall CPA结算系统?因为它能帮您解决这些痛点!

在App推广和运营的道路上&#xff0c;我们时常面临着各种挑战和痛点。其中&#xff0c;结算系统的复杂性和不透明性往往成为制约我们发展的瓶颈。然而&#xff0c;有了Xinstall CPA结算系统&#xff0c;这些问题将迎刃而解&#xff0c;让您的App推广之路更加顺畅和高效。 一、…...

2024年【建筑电工(建筑特殊工种)】模拟试题及建筑电工(建筑特殊工种)作业考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年建筑电工(建筑特殊工种)模拟试题为正在备考建筑电工(建筑特殊工种)操作证的学员准备的理论考试专题&#xff0c;每个月更新的建筑电工(建筑特殊工种)作业考试题库祝您顺利通过建筑电工(建筑特殊工种)考试。 1、…...

解锁数字化转型的双引擎:MSP和CMP的力量

随着企业数字化转型的深入&#xff0c;云计算已经成为现代企业IT基础设施的重要组成部分。为了高效地管理和优化多云环境&#xff0c;企业通常会依赖管理服务提供商 (Managed Service Providers, MSP) 和云管理平台 (Cloud Management Platforms, CMP)。本文将探讨MSP和CMP的定…...

Pyecharts入门

数据可视化 Pyecharts简介 Apache ECharts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#…...

Socket编程详解(一)服务端与客户端的双向对话

目录 预备知识 视频教程 项目前准备知识点 1、服务器端程序的编写步骤 2、客户端程序编写步骤 代码部分 1、服务端FrmServer.cs文件 2、客户端FrmClient.cs文件 3、启动文件Program.cs 结果展示 预备知识 请查阅博客http://t.csdnimg.cn/jE4Tp 视频教程 链接&#…...

使用Python实现深度学习模型:强化学习与深度Q网络(DQN)

深度Q网络(Deep Q-Network,DQN)是结合深度学习与强化学习的一种方法,用于解决复杂的决策问题。本文将详细介绍如何使用Python实现DQN,主要包括以下几个方面: 强化学习简介DQN算法简介环境搭建DQN模型实现模型训练与评估1. 强化学习简介 强化学习是一种训练智能体(agent…...

Py-Spy、Scalene 和 VizTracer 的对比分析

在前几篇文章中&#xff0c;我们详细介绍了如何使用 py-spy、scalene 和 viztracer 进行性能分析和优化。今天&#xff0c;我们将对这三个性能分析工具进行详细对比&#xff0c;帮助你选择最适合你的工具。 工具简介 Py-Spy&#xff1a; 实时性能分析&#xff1a;Py-Spy 可以…...

软考架构师考试内容

软考系统架构设计师考试是中国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff08;简称软考&#xff09;中的一项高级资格考试&#xff0c;旨在评估考生是否具备系统架构设计的能力。根据提供的参考资料&#xff0c;考试内容主要包括以下几个方面&#…...

【MySQL基础篇】概述及SQL指令:DDL及DML

数据库是一个按照数据结构来组织、存储和管理数据的仓库。以下是对数据库概念的详细解释&#xff1a;定义与基本概念&#xff1a; 数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。 数据库不仅仅是数据的简单堆积&#xff0c;而是遵循一定的规则…...

计算机网络 —— 网络字节序

网络字节序 1、网络字节序 (Network Byte Order)和本机转换 1、大端、小端字节序 “大端” 和” 小端” 表示多字节值的哪一端存储在该值的起始地址处&#xff1b;小端存储在起始地址处&#xff0c;即是小端字节序&#xff1b;大端存储在起始地址处&#xff0c;即是大端字节…...

区块链不可能三角

区块链不可能三角&#xff1a;探索去中心化、安全与可扩展性的权衡 引言 区块链技术自诞生以来&#xff0c;以其去中心化、透明、安全等特点吸引了全球的关注&#xff0c;成为金融科技领域的重要革新力量。然而&#xff0c;随着区块链应用的日益广泛&#xff0c;一个核心问题…...

新手第一个漏洞复现:MS17-010(永恒之蓝)

文章目录 漏洞原理漏洞影响范围复现环境复现步骤 漏洞原理 漏洞出现在Windows SMB v1中的内核态函数srv!SrvOs2FeaListToNt在处理FEA&#xff08;File Extended Attributes&#xff09;转换时。该函数在将FEA list转换成NTFEA&#xff08;Windows NT FEA&#xff09;list前&am…...

代码随想录Day64

98.所有可达路径 题目&#xff1a;98. 所有可达路径 (kamacoder.com) 思路&#xff1a;果断放弃 答案 import java.util.*;public class Main {private static List<List<Integer>> adjList;private static List<List<Integer>> allPaths;private sta…...

Angular 指令

Angular 指令是 Angular 框架中的一项核心功能&#xff0c;它允许开发人员扩展 HTML 的功能&#xff0c;并创建可复用的组件和行为。以下是一些常见的 Angular 指令&#xff1a; 1. 组件指令 (Component Directives) 组件指令是最常用的一种指令&#xff0c;用于创建可复用的 U…...

移动端 UI 风格,书写华丽篇章

移动端 UI 风格&#xff0c;书写华丽篇章...

flutter开发实战-ListWheelScrollView与自定义TimePicker时间选择器

flutter开发实战-ListWheelScrollView与自定义TimePicker 最近在使用时间选择器的时候&#xff0c;需要自定义一个TimePicker效果&#xff0c;当然这里就使用了ListWheelScrollView。ListWheelScrollView与ListView类似&#xff0c;但ListWheelScrollView渲染效果类似滚筒效果…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...