当前位置: 首页 > news >正文

IM系统设计之websocket消息转发

Websocket消息转发

项目地址:git@github.com:muyixiaoxi/Link.git

上周面试被面试官问到:“在分布式IM系统中,如何实现多个websocket集群之间的通信”。

我在思考了良久后回答:“不会”。

随着我的回答,我和面试官的故事也到此完结了…

为什么会出现websocket集群

在IM系统中,需要在服务端和客户端之间维持一个长连接,而这个长连接可以通过websocket实现。
但服务端能维持websocket的数量并不是无限的。

WebSocket的并发连接数受到多种因素的影响,其中最主要的瓶颈通常在于服务器资源。在传统的模型中,一台服务器上的最大WebSocket连接数受到操作系统中TCP/IP连接数的限制。在Linux系统中,每个IPv4地址允许的最大连接数为65535,这意味着如果每个连接都使用不同的IP地址,一台服务器最多只能维持65535个WebSocket连接。

当用户量很多时,一台websocket服务器远远是不够的,所以需要多台websocket服务器。

假如现在只有一台IM服务器(即 Websocket 服务器),用户A、用户B均在线,用户A向用户B发送一条消息
在这里插入图片描述
单台IM服务器发送消息大概流程如下:

  1. 客户端向IM服务端发送消息
  2. IM服务端收到消息判断用户B是否在线
    • 在线,Websocket转发
    • 离线,将消息存储到B的离线消息库
  3. 用户B立即了消息,或者在下次上线时收到了消息

因为现在只有一台IM服务器,所以直接可以判断用户B是否在线,并且转发。

假如现在我有多台IM服务器,重复上面的操作

如果恰巧A和B连接在一台IM服务器上,那么和上面的流程一样

假如现在A连接在IM1上,B连接在IM2上
在这里插入图片描述

其中红线的部分,就是我们要解决的部分

websocket集群通信

我查阅了网上的一些资料(这一块具体采用哪种技术栈实现,网上的资料很少),大概分为两种

方法一:互为客户端

分别将IM服务端与其他IM服务端连接起来,可以通过网络编程或者MQ来实现
在这里插入图片描述
优点:

  • 不需要额外的服务
  • 转发过程中,各个IM服务负载相对均匀

缺点:

  • 每增加一个IM服务端都需要其他服务端多维持一个连接或者MQ
  • 水平扩展有点繁琐

方法二:C/S

采用c/s架构,新建一个transmit服务,单独实现转发功能,可以通过网络编程或者MQ实现。
因为各个 IM 都与 transmit 连接,所以扩展只需要该配置文件的运行端口
在这里插入图片描述
优点:

  • 高可用,IM server扩展方便,只需要修改自己的运行端口

缺点:

  • 不同服务端之间的消息需要通过 transmit 转发,当海量消息时,对 transmit 压力比较大
代码实现

为了实现消息的时效性,以及高可用,我采用net包中的tcp实现了c/s架构
项目目录如下:

transmit:
│  client.go
│  main.go
│
├─common
│  └─proto
│          proto.go
│
└─typestypes.go
proto

使用 net 包的 tcp 可能会出现粘包现象,封装编码与解码方法从而避免粘包

package mainimport ("bufio""encoding/json""fmt""github.com/zeromicro/go-zero/core/logx""net""sync""transmit/common/proto""transmit/types"
)var Connects sync.Mapfunc main() {listenClient()
}// listenClient 监听
func listenClient() {lister, err := net.Listen("tcp", "127.0.0.1:8333")if err != nil {fmt.Println("net.Listen failed:", err)}for {conn, err := lister.Accept()if err != nil {continue}fmt.Println(conn.RemoteAddr().String())Connects.Swap(conn.RemoteAddr().String(), conn)go addReceiver(conn)}
}// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {fmt.Println("与客户端", conn.LocalAddr(), "断开连接")return}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,根据服务器进行转发for connect := range transmit.Users {transmitMessage(conn, connect, transmit)}}
}func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {c, ok := Connects.Load(ip)message := types.TransmitMap{Users: map[string][]uint64{},}if !ok {message = types.TransmitMap{Message: types.Message{Id:          "",From:        0,To:          0,Type:        100,ContentType: 0,Time:        "",Content:     "客户端离线",},}s, _ := json.Marshal(message)msg, _ := proto.Encode(string(s))conn.Write(msg)fmt.Println("客户端离线:", ip)logx.Error("connect ip offline:", ip)return}message.Users[ip] = transmit.Users[ip]message.Message = transmit.Messagej, _ := json.Marshal(message)msg, _ := proto.Encode(string(j))fmt.Println("ip:", ip, "msg:", string(msg))c.(net.Conn).Write(msg)
}
main

通过监听某个端口,让 IM server与其建立间接,实现转发功能

package mainimport ("bufio""encoding/json""fmt""github.com/zeromicro/go-zero/core/logx""net""sync""transmit/common/proto""transmit/types"
)var Connects sync.Mapfunc main() {listenClient()
}// listenClient 监听
func listenClient() {lister, err := net.Listen("tcp", "127.0.0.1:8333")if err != nil {fmt.Println("net.Listen failed:", err)}for {conn, err := lister.Accept()if err != nil {continue}fmt.Println(conn.RemoteAddr().String())Connects.Swap(conn.RemoteAddr().String(), conn)go addReceiver(conn)}
}// addReceiver 向连接添加接收器
func addReceiver(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {fmt.Println("与客户端", conn.LocalAddr(), "断开连接")return}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,根据服务器进行转发for connect := range transmit.Users {transmitMessage(conn, connect, transmit)}}
}func transmitMessage(conn net.Conn, ip string, transmit types.TransmitMap) {c, ok := Connects.Load(ip)message := types.TransmitMap{Users: map[string][]uint64{},}if !ok {message = types.TransmitMap{Message: types.Message{Id:          "",From:        0,To:          0,Type:        100,ContentType: 0,Time:        "",Content:     "客户端离线",},}s, _ := json.Marshal(message)msg, _ := proto.Encode(string(s))conn.Write(msg)fmt.Println("客户端离线:", ip)logx.Error("connect ip offline:", ip)return}message.Users[ip] = transmit.Users[ip]message.Message = transmit.Messagej, _ := json.Marshal(message)msg, _ := proto.Encode(string(j))fmt.Println("ip:", ip, "msg:", string(msg))c.(net.Conn).Write(msg)
}
client

模拟客户端,根据自己的项目拆分到 IM server中

package mainimport ("bufio""encoding/json""fmt""net""time""transmit/common/proto""transmit/types"
)func client() {conn, _ := InitConnect()go Consumer(conn)var ip stringmessage := types.Message{Id:          "123",From:        1,To:          2,Type:        1,ContentType: 1,Time:        "123",Content:     "你好",}for {fmt.Scan(&ip)users := map[string][]uint64{}users[ip] = []uint64{1, 2}time.Sleep(2 * time.Second)if err := Producer(conn, users, message); err != nil {fmt.Println("Producer(conn, ip, message) failed", err)}}}func InitConnect() (conn net.Conn, err error) {conn, err = net.Dial("tcp", "127.0.0.1:8333")fmt.Println(conn.LocalAddr())return
}func Producer(conn net.Conn, user map[string][]uint64, mes types.Message) (err error) {transmit := types.TransmitMap{Users:   user,Message: mes,}message, _ := json.Marshal(transmit)m, _ := proto.Encode(string(message))_, err = conn.Write(m)if err != nil {// 重试三次,一次休眠一秒for i := 0; i < 3 && err != nil; i++ {time.Sleep(1 * time.Second)_, err = conn.Write(m)}}return
}// Consumer 消费者 读消息
func Consumer(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {m, err := proto.Decode(reader)if err != nil {continue}transmit := types.TransmitMap{}json.Unmarshal([]byte(m), &transmit)// 读到消息后,进行转发for _, uIds := range transmit.Users {for _, id := range uIds {fmt.Println(id, transmit.Message)}}}
}
types

在转发群聊消息时,需要将 m 个用户转发到 n 个IM服务端上,如果单独发送需要多次发送,所以封装成 TransmitMap 进行转发。

type Message struct {Id          string `json:"id"`From        uint64 `json:"from,optional"`To          uint64 `json:"to"`Type        uint32 `json:"type"`ContentType uint32 `json:"contentType"`Time        string `json:"time"`Content     string `json:"content"`
}type TransmitMap struct {Users map[string][]uint64 `json:"users"`  // map[主机地址]用户集合Message
}

为什么这里要封装一个?

type TransmitMap struct {Users map[string][]uint64 `json:"users"`Message
}

比如用户A、B、C、D、E、F、G在同一个群聊里,各自连接到的 IM server 如图所示
在这里插入图片描述
如果群聊消息采用上面单聊的转发方式

  1. 用户A发送一条消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 遍历(B、C、D、E、F、G): 将在线用户消息转发消息到 transmit

如果不做任何操作的化,光过程 4 就需要转发5次消息

用户A发送一条群聊消息的过程

  1. 用户A发送一条群聊消息
  2. IM server1 收到群聊消息,查找群里的其他用户(B、C、D、E、F、G)
  3. 判断哪些用户在当前 IM servier 上,发现 B,直接转发
  4. 通过 redis 判断哪些用户在线并获取主机地址,将通过map将相同地址的用户分类 map[主机地址]用户集合,一块转发到 transmit
  5. transmit 以主机地址为组,将消息发送给 IM server2 和 IM server3
  6. IM server2 和 IM server3 收到消息后,将消息进行转发
  7. 离线用户同步离线消息库

这样的好处是,有效的减少群聊消息转发的次数。

ps:如果存在哪些不足,欢迎大家在评论区指正~

相关文章:

IM系统设计之websocket消息转发

Websocket消息转发 项目地址&#xff1a;gitgithub.com:muyixiaoxi/Link.git 上周面试被面试官问到&#xff1a;“在分布式IM系统中&#xff0c;如何实现多个websocket集群之间的通信”。 我在思考了良久后回答&#xff1a;“不会”。 随着我的回答&#xff0c;我和面试官的…...

关于vue 的生命周期的教程

Vue.js 是一款流行的前端框架&#xff0c;它提供了丰富的功能和便捷的开发式&#xff0c; 其中生命周期函数是 Vue 组件中非常重要的一部分。 本文将为您详细介绍 Vue 组件的生命周期函数及其执行顺序&#xff0c; 帮助您更好地理解和利用 Vue.js 框架。 什么是 Vue 生命周期 …...

STM32 CAN的工作模式

STM32 CAN的工作模式 正常模式 正常模式下就是一个正常的CAN节点&#xff0c;可以向总线发送数据和接收数据。 静默模式 静默模式下&#xff0c;它自己的输出端的逻辑0数据会直接传输到它自己的输入端&#xff0c;逻辑1可以被发送到总线&#xff0c;所以它不能向总线发送显性…...

Java中的常用类之Math类

Java中的Math类 一、Math类是什么&#xff1f;二、主要方法1.随机数2.绝对值3.向上取值4.向下取值5.四舍五入6.两个值中取大/小的 总结 一、Math类是什么&#xff1f; Math类是Java常用类的一种&#xff0c;主要方法针对于数学方面的运算&#xff0c;类中的所有方法都是static…...

Android冷启动优化

一、应用启动的三种状态 冷启动&#xff1a;系统不存在App进程&#xff08;APP首次启动或APP被完全杀死&#xff09;时启动APP&#xff0c;此时&#xff0c;APP的启动将经历两个阶段&#xff1a; 1、创建app进程&#xff1a;系统启动应用程序进程和虚拟机&#xff0c;创建app…...

jmeter之接口功能自动化

一、接口测试简述 接口&#xff1a;用来连接前端&#xff0c;后端还有移动端的程序模块。由于不同端的工作进度不一样&#xff0c;需要对最开始出来的接口进行接口测试。 接口分类&#xff1a;POST&#xff0c;GET&#xff0c;PUT&#xff0c;DELETE。 POST请求的数据是放在…...

【openGL4.x手册07】几何着色器

目录 一、说明二、关于几何着色器三、原始输入/输出规范3.1 实例 四、输入五、输出5.1 分层渲染 六、输出限制 一、说明 几何着色器对于渲染管线设计是一个新生事物&#xff1b;目前对应于几何着色器的资料不多&#xff0c;并且说法不一&#xff0c;因此如何用几何着色器&…...

鸿蒙OpenHarmony开发实战:【MiniCanvas】

介绍 基于OpenHarmony的Cavas组件封装了一版极简操作的MiniCanvas&#xff0c;屏蔽了原有Canvas内部复杂的调用流程&#xff0c;支持一个API就可以实现相应的绘制能力&#xff0c;该库还在继续完善中&#xff0c;也欢迎PR。 使用说明 添加MiniCanvas依赖 在项目entry目录执行…...

【JavaEE初阶系列】——单例模式 (“饿汉模式“和“懒汉模式“以及解决线程安全问题)

目录 &#x1f6a9;单例模式 &#x1f388;饿汉模式 &#x1f388;懒汉模式 ❗线程安全问题 &#x1f4dd;加锁 &#x1f4dd;执行效率提高 &#x1f4dd;指令重排序 &#x1f36d;总结 单例模式&#xff0c;非常经典的设计模式&#xff0c;也是一个重要的学科&#x…...

flutter-elinux的基本介绍及安装调试

搜集到两个很有用的网站&#xff1a; 1、flutter-elinux的基本介绍&#xff1a;https://juejin.cn/post/7257285697383612453 2、flutter-elinux的安装调试等&#xff1a;https://github.com/sony/flutter-elinux/wiki 其中&#xff0c;在flutter-elinux设置环境变量时&#…...

二分查找法总结

目录 1、思路讲解&#xff08;LC704&#xff09;2、代码思路讲解&#xff08;循环不变量&#xff09;&#xff08;1&#xff09; 左闭右闭&#xff08;2&#xff09;左闭右开&#xff08;3&#xff09;总结&#xff1a;左开右闭和左闭右开&#xff08;4&#xff09;复杂度分析 …...

Python工具-清理Unity(批量深度)清理U3D项目工程保留关键工程文件

前沿 1. Unity工程越来越多&#xff0c;很久不用的工程里存在了很多无用的大文件夹&#xff0c;极大的影响电脑容量。 2. 我电脑里面U3D工程只有17个&#xff0c;但容量就高达60GB&#xff0c;使用自己编写的工具清理后&#xff0c;减到了30GB多。清理了不是很重要的文件和文件…...

vue 安装脚手架报错 certificate has expired

vue 安装脚手架的时候报错&#xff0c;报错信息如下&#xff1a; 错误信息&#xff1a;npm ERR! request to https://registry.npm.taobao.org/vue%2fcli failed, reason: certificate has expired 翻译&#xff1a;npm ERR&#xff01;请求到https://registry.npm.taobao.org…...

使用 Python 快速开始机器学习

&#x1f517; 快速开始 PyTorch&#xff5c;使用 Python 建立深度学习模型 认识 PyTorch 1.1 Torch 与 PyTorch 1.2 安装 PyTorch 1.3 验证安装并查看 PyTorch 版本PyTorch 深度学习模型的建立范式 2.1 准备数据 2.2 定义模型 2.3 训练模型 2.4 评估模型 2.5 做出预测为预测任…...

CCDP.02.OS正确部署后的Dashboard摘图说明

前言 在部署成功OpenStack后&#xff0c;应该可以在浏览器打开Dashboard&#xff0c;并对计算资源&#xff08;这里主要是指VM&#xff09;进行管理&#xff0c;也可以在Dashboard上面查看OpenStack是否存在错误&#xff0c;下面&#xff0c;已针对检查的关键点&#xff0c;用红…...

【计算机视觉】Gaussian Splatting源码解读补充(二)

第一部分 本文是对学习笔记之——3D Gaussian Splatting源码解读的补充&#xff0c;并订正了一些错误。 目录 三、相机相关scene/cameras.py&#xff1a;class Camera 四、前向传播&#xff08;渲染&#xff09;&#xff1a;submodules/diff-gaussian-rasterization/cuda_rast…...

Java transient 关键字

Java字段不想序列化怎么办 在 Java 中&#xff0c;如果某个字段不想被序列化&#xff08;即不希望被写入到序列化的数据流中&#xff09;&#xff0c;可以使用 transient 关键字进行标记。通过在字段前加上 transient 关键字&#xff0c;可以告诉 Java 序列化机制忽略该字段&am…...

前端工程化(三)邂逅Webpack和打包过程

目录 Vue项目加载Webpack 安装Webpack的默认打包创建局部的 webpack Vue项目加载 JavaScript的打包&#xff1a;  将ES6转换成ES5的语法&#xff1b;  TypeScript的处理&#xff0c;将其转换成JavaScript&#xff1b; Css的处理&#xff1a;  CSS文件模块的加载、提取&a…...

Gradle v8.5 笔记 - 从入门到进阶(基于 Kotlin DSL)

目录 一、前置说明 二、Gradle 启动&#xff01; 2.1、安装 2.2、初始化项目 2.3、gradle 项目目录介绍 2.4、Gradle 项目下载慢&#xff1f;&#xff08;万能解决办法&#xff09; 2.5、Gradle 常用命令 2.6、项目构建流程 2.7、设置文件&#xff08;settings.gradle…...

Jmeter-基础元件使用(二)-属性及对数据库简单操作

一、Jmeter属性 当我们想要在不同线程组中使用某变量&#xff0c;就需要使用属&#xff0c;此时Jmeter属性的设置需要函数来进行set和get操作 1.创建set函数 2.然后采用Beanshell取样器进行函数执行 3.调用全局变量pro_id 4.将上面生成的函数字符串粘贴到另一个线程组即可…...

App无辜躺枪?手把手教你搞定腾讯手机管家误报导致的应用商店下架

当合规应用遭遇误报下架&#xff1a;开发者系统性应对指南运动健康类应用被标记为金融诈骗软件&#xff1f;社交工具因"病毒风险"被各大商店紧急下架&#xff1f;这类看似荒谬的误报事件&#xff0c;正在成为中小开发团队的"无妄之灾"。某知名运动App开发团…...

C++中显示与隐式加载dll的使用与区别

一、什么是 DLL&#xff1f;DLL&#xff08;Dynamic Link Library&#xff09; 是 Windows 下的动态链接库&#xff0c;包含可被多个程序共享的函数、资源或类。使用 DLL 可以实现代码复用、模块化设计和插件机制。在 C 中&#xff0c;调用 DLL 中的函数有两种主要方式&#xf…...

ThinkPad开机嘀嘀响或报2100/2110错误?可能是硬盘松了!自己动手检测与修复指南

ThinkPad开机嘀嘀响或报2100/2110错误&#xff1f;三步排查硬盘接触不良问题ThinkPad用户对那个标志性的开机"嘀嘀"声再熟悉不过——正常情况下它意味着系统自检通过。但当这个声音变成急促的报警音&#xff0c;伴随屏幕上出现"2100 Detection error"或&qu…...

Android 11开发避坑:为什么你的App获取的Wifi MAC地址总是变?手把手教你配置固定MAC

Android 11开发实战&#xff1a;彻底解决Wifi MAC地址随机化问题最近在开发一个设备管理系统时&#xff0c;遇到了一个棘手的问题&#xff1a;我们的App在Android 11设备上获取的Wifi MAC地址每次都不一样&#xff0c;导致基于MAC地址的设备识别功能完全失效。经过一周的深入研…...

全链路压测实战:双十一级别的流量,我是这样扛住的

作为一名在质量保障领域摸爬滚打多年的测试工程师&#xff0c;我深知传统的单接口压测在如今分布式架构下的无力感。当业务流量达到双十一这种脉冲式、高并发的级别时&#xff0c;任何一个非核心链路上的“短板”都可能引发系统性的雪崩。全链路压测不再是选择题&#xff0c;而…...

Claude端到端测试设计:从零搭建可审计、可回放、可量化的AI服务测试流水线(含开源Schema校验工具)

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;Claude端到端测试设计 端到端测试是验证Claude模型在真实用户交互链路中行为一致性的关键手段。它覆盖从原始提示输入、上下文管理、流式响应生成&#xff0c;到输出解析与业务校验的全路径&#xff0c;确保模…...

Sora 2 MOV导出画质崩坏真相:HDR10元数据丢失、BT.2020色域截断、帧率标志位误写——3大隐性缺陷紧急修复方案

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Sora 2 MOV导出画质崩坏的系统性认知 Sora 2 在生成高保真视频后&#xff0c;导出为 MOV 格式时频繁出现色度抽样失真、动态范围压缩、帧间伪影加剧等现象&#xff0c;其本质并非单一环节失效&#xff…...

3分钟告别英文恐惧:Android Studio中文界面轻松切换指南

3分钟告别英文恐惧&#xff1a;Android Studio中文界面轻松切换指南 【免费下载链接】AndroidStudioChineseLanguagePack AndroidStudio中文插件(官方修改版本&#xff09; 项目地址: https://gitcode.com/gh_mirrors/an/AndroidStudioChineseLanguagePack 你是否曾经因…...

告别Appium!用Python+UIAutomator2搞定Android自动化测试(附完整环境搭建与实战代码)

PythonUIAutomator2&#xff1a;Android自动化测试的高效实践指南 在移动应用测试领域&#xff0c;效率与稳定性始终是工程师们追求的核心目标。传统方案如Appium虽然功能全面&#xff0c;但在执行速度和资源消耗方面往往难以满足高频测试需求。本文将带您探索基于Python和UIA…...

原来专业的赛事专用匹克球厂家有这么多门道?

引言在匹克球运动蓬勃发展的当下&#xff0c;专业赛事专用匹克球的选择至关重要。很多人可能不知道&#xff0c;看似普通的赛事专用匹克球背后&#xff0c;其实隐藏着诸多门道。接下来&#xff0c;我们就一起深入探究专业赛事专用匹克球厂家的秘密。核心技术与材料的门道专业赛…...