Websocket集群解决方案以及实战(附图文源码)
最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。
所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发起请求,服务接收后返回消息。websocket建立起连接之后,客户端和服务端都能主动向对方发送消息。
websocket在单机模式下进行消息的发送和接收:
用户A和用户B和web服务器建立连接之后,用户A发送一条消息到服务器,服务器再推送给用户B,在单机系统上所有的用户都和同一个服务器建立连接,所有的session都存储在同一个服务器中。
单个服务器是无法支撑几万人同时连接同一个服务器,需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器,这就和单体服务器类似,能成功接收到消息:
但负载均衡使用轮询的算法,无法保证消息发送方和接收方处于同一个服务器,当发送方和接收方不是在同一个服务器时,接收方是无法接受到消息的:
websocket集群问题解决思路
客户端和服务端每次建立连接时候,会创建有状态的会话session,服务器的保存维持连接的session。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。
要解决集群的问题,应该考虑session共享的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。
方案一:session 共享(不可行)
和websocket类似的http是如何解决集群问题的?解决方案之一就是共享session,客户端登录服务端之后,将session信息存储在Redis数据库中,连接其他服务器时,从Redis获取session,实际就是将session信息存储在Redis中,实现redis的共享。
session可以被共享的前提是可以被序列化,而websocket的session是无法被序列化的,http的session记录的是请求的数据,而websocket的session对应的是连接,连接到不同的服务器,session也不同,无法被序列化。
方案二:ip hash(不可行)
http不使用session共享,就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器,客户端的session都保存在服务器上,而后续请求都是请求该服务器,都能获取到session,就不存在分布式session问题了。
websocket相对http来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:
解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。
方案三:广播模式
将消息的发送方和接收方都处于同一个服务器下才能发送消息,那么可以转换一下思路,可以将消息以消息广播的方式通知给所有的服务器,可以使用消息中间件发布订阅模式,消息脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似广播一样,只要订阅了消息,都能接收到消息的通知:
发布者发布消息到消息中间件,消息中间件再将发送给所有订阅者:
广播模式的实现
搭建单机 websocket
参考以前写的websocket单机搭建 文章,先搭建单机websocket实现消息的推送。
- 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 创建 ServerEndpointExporter 的 bean 实例
ServerEndpointExporter 的 bean 实例自动注册 @ServerEndpoint 注解声明的 websocket endpoint,使用springboot自带tomcat启动需要该配置,使用独立 tomcat 则不需要该配置。
@Configuration
public class WebSocketConfig {//tomcat启动无需该配置@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
- 创建服务端点 ServerEndpoint 和 客户端端
服务端点
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();private Session session;@OnOpenpublic void onOpen(Session session) throws SocketException {this.session = session;webSocketSet.put(this.session.getId(),this);log.info("【websocket】有新的连接,总数:{}",webSocketSet.size());}@OnClosepublic void onClose(){String id = this.session.getId();if (id != null){webSocketSet.remove(id);log.info("【websocket】连接断开:总数:{}",webSocketSet.size());}}@OnMessagepublic void onMessage(String message){if (!message.equals("ping")){log.info("【wesocket】收到客户端发送的消息,message={}",message);sendMessage(message);}}/*** 发送消息* @param message* @return*/public void sendMessage(String message){for (WebSocket webSocket : webSocketSet.values()) {webSocket.session.getAsyncRemote().sendText(message);}log.info("【wesocket】发送消息,message={}", message);}}
客户端点
<div><input type="text" name="message" id="message"><button id="sendBtn">发送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">var ws = new WebSocket("ws://127.0.0.1:8080/message");ws.onopen = function(evt) {console.log("Connection open ...");};ws.onmessage = function(evt) {console.log( "Received Message: " + evt.data);var p = $("<p>"+evt.data+"</p>")$("#content").prepend(p);$("#message").val("");};ws.onclose = function(evt) {console.log("Connection closed.");};$("#sendBtn").click(function(){var aa = $("#message").val();ws.send(aa);})</script>
服务端和客户端中的OnOpen、onclose、onmessage都是一一对应的。
服务启动后,客户端ws.onopen调用服务端的@OnOpen注解的方法,储存客户端的session信息,握手建立连接。
客户端调用ws.send发送消息,对应服务端的@OnMessage注解下面的方法接收消息。
服务端调用session.getAsyncRemote().sendText发送消息,对应的客户端ws.onmessage接收消息。
添加 controller
@GetMapping({"","index.html"})
public ModelAndView index() {ModelAndView view = new ModelAndView("index");return view;
}
效果展示
打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息。
添加 RabbitMQ 中间件
这里使用比较常用的RabbitMQ作为消息中间件,而RabbitMQ支持发布订阅模式:
添加消息订阅
交换机使用扇形交换机,消息分发给每一条绑定该交换机的队列。以服务器所在的IP + 端口作为唯一标识作为队列的命名,启动一个服务,使用队列绑定交换机,实现消息的订阅:
@Configuration
public class RabbitConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");}@Beanpublic Queue psQueue() throws SocketException {// ip + 端口 为队列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();return new Queue("ps_" + ip);}@Beanpublic Binding routingFirstBinding() throws SocketException {return BindingBuilder.bind(psQueue()).to(fanoutExchange());}
}
修改服务端点 ServerEndpoint
在WebSocket添加消息的接收方法,@RabbitListener 接收消息,队列名称使用常量命名,动态队列名称使用 #{name},其中的name是Queue的bean 名称:
@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {System.out.println(message);sendMessage(message);
}
然后再调用sendMessage方法发送给所在连接的客户端。
修改消息发送
在WebSocket类的onMessage方法将消息发送改成RabbitMQ方式发送:
@OnMessage
public void onMessage(String message){if (!message.equals("ping")){log.info("【wesocket】收到客户端发送的消息,message={}",message);//sendMessage(message);if (rabbitTemplate == null) {rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");}rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);}
}
消息通知流程如下所示:
启动两个实例,模拟集群环境
打开idea的Edit Configurations:
点击左上角的COPY,然后添加端口server.port=8081:
启动两个服务,端口分别是8080和8081。在启动8081端口的服务,将前端连接端口改成8081:
var ws = new WebSocket("ws://127.0.0.1:8081/message");
效果展示
相关文章:

Websocket集群解决方案以及实战(附图文源码)
最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。 所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发…...
科技的成就(五十一)
397、初等数论的不可解问题 1936 年 4 月,邱奇证明判定性问题不可解。33 岁的邱奇发表论文《初等数论的不可解问题》,运用λ演算给出了判定性问题一个否定的答案。λ演算是一套从数学逻辑中发展起来的形式系统,采用变量绑定和替换,…...

Tomcat8 任意写文件PUT方法 (CVE-2017-12615)
Tomcat 任意写文件PUT方法 (CVE-2017-12615) 文章目录 Tomcat 任意写文件PUT方法 (CVE-2017-12615)1 在线漏洞解读:2 版本影响3 环境搭建4 漏洞复现4.1 访问4.2 POC攻击点4.2.1 直接发送以下数据包,然后shell将被写入Web根目录。4.2.2 访问是否通,可以访…...
SAP服务器修改主机名操作手册
1、业务背景 SAP服务器P2V:虚拟化后的服务器主机名(或叫计算机名、设备名,hostname,下文同)会和原参照克隆的服务器主机名一样,若两台服务器处于同一网域,会出现域冲突,导致以下事故发生 (1)、使得原服务器出现掉域情况(DEV->CLN->PRD后台服务器访问失效) …...

【大数据】Doris 构建实时数仓落地方案详解(一):实时数据仓库概述
本系列包含: Doris 构建实时数仓落地方案详解(一):实时数据仓库概述Doris 构建实时数仓落地方案详解(二):Doris 核心功能解读Doris 构建实时数仓落地方案详解(三)&#…...

C++ list容器的实现及讲解
所需要的基础知识 对C类的基本了解 默认构造函数 操作符重载 this指针 引用 模板等知识具有一定的了解,阅读该文章会很轻松。 链表节点 template<class T>struct list_node{T _data;list_node<T>* _next;list_node<T>* _prev;list_node(const T&…...

前端项目练习(练习-002-NodeJS项目初始化)
首先,创建一个web-002项目,内容和web-001一样。 下一步,规范一下项目结构,将html,js,css三个文件放到 src/view目录下面: 由于html引入css和js时,使用的是相对路径,所以…...

C++QT day11
绘制时钟 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent>//绘制事件类 #include <QDebug>//信息调试类 #include <QPainter>//画家类 #include <QTimer>//定时器类 #include <QTime> #include &…...

Stable DIffusion 炫酷应用 | AI嵌入艺术字+光影光效
目录 1 生成AI艺术字基本流程 1.1 生成黑白图 1.2 启用ControlNet 参数设置 1.3 选择大模型 写提示词 2 不同效果组合 2.1 更改提示词 2.2 更改ControlNet 2.2.1 更改模型或者预处理器 2.2.2 更改参数 3. 其他应用 3.1 AI光影字 本节需要用到ControlNet,可…...

C#通过重写Panel改变边框颜色与宽度的方法
在C#中,Panel控件是一个容器控件,用于在窗体或用户控件中创建一个可用于容纳其他控件的面板。Panel提供了一种将相关控件组合在一起并进行布局的方式。以下是Panel控件的详细使用方法: 在窗体上放置 Panel 控件: 在 Visual Studio 的窗体设计器中,从工具箱中拖动并放置一…...

Vue2+ElementUI 静态首页案例
源码 <template><div class"app-container home"><el-row type"flex" justify"space-around" class"row-bg"><el-card class"box-card cardDiv1"><el-col :span"5"><div clas…...

Linux的socket通信
关于套接字通信定义如下: 套接字对应程序猿来说就是一套网络通信的接口,使用这套接口就可以完成网络通信。网络通信的主体主要分为两部分:客户端和服务器端。在客户端和服务器通信的时候需要频繁提到三个概念:IP、端口、通信数据&…...
MySQL学习大纲
了解 MySQL 的基础知识和命令是使用此数据库的前提。以下是一些必须了解的 MySQL 概念和命令,包括基础的 CRUD(创建,读取,更新,删除)操作,以及一些高级功能: 1. 安装和启动 命令su…...

【Ambari】银河麒麟V10 ARM64架构_安装Ambari2.7.6HDP3.3.1(HiDataPlus)
🍁 博主 "开着拖拉机回家"带您 Go to New World.✨🍁 🦄 个人主页——🎐开着拖拉机回家_大数据运维-CSDN博客 🎐✨🍁 🪁🍁 希望本文能够给您带来一定的帮助🌸文…...

驱动开发练习,platform实现如下功能
实验要求 驱动代码 #include <linux/init.h> #include <linux/module.h> #include <linux/platform_device.h> #include <linux/mod_devicetable.h> #include <linux/of_gpio.h> #include <linux/unistd.h> #include <linux/interrupt…...
QT之QString的用法介绍
QT之QString的用法介绍 成员函数常见用法 成员函数 1)QString &append(const QString &str) 将 str 字符串追加到当前字符串末尾,并返回修改后的 QString 对象的引用。 2)QString &prepend(const QString &str) 将 str 字符…...

基于Java+SpringBoot+Vue3+Uniapp前后端分离考试学习一体机设计与实现2.0版本(视频讲解,已发布上线)
博主介绍:✌全网粉丝4W,全栈开发工程师,从事多年软件开发,在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战,博主也曾写过优秀论文,查重率极低,在这方面有丰富的经验…...

springboot 获取参数
1.获取简单参数 2.实体对象参数...

【笔记】离线Ubuntu20.04+mysql 5.7.36 + xtrabackup定时增量备份脚本
一、环境 ● Ubuntu版本查看 lsb_release -a● mysql 版本查看 mysql --version我的是ubuntu 20.04,mysql是5.7.36,所以要用 install_percona-xtrabackup-24 二、原理 备份 通过ubuntu自带的定时器运行增量备份脚本备份文件可以存储在映射后的其他…...
树哈希与换根dp:CF763D
采用的树哈希函数是: d p x w x ∑ y ∈ x d p y 2 w x 2 \Large dp_xw_x\times \sum_{y\in x}dp_y^2w_x^2 dpxwxy∈x∑dpy2wx2 发现从 x x x 到 y y y 时只有 x x x 与 y y y 的哈希值会变化,分别维护即可 #include<bits/stdc.h&…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...

若依登录用户名和密码加密
/*** 获取公钥:前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...

WebRTC调研
WebRTC是什么,为什么,如何使用 WebRTC有什么优势 WebRTC Architecture Amazon KVS WebRTC 其它厂商WebRTC 海康门禁WebRTC 海康门禁其他界面整理 威视通WebRTC 局域网 Google浏览器 Microsoft Edge 公网 RTSP RTMP NVR ONVIF SIP SRT WebRTC协…...