当前位置: 首页 > news >正文

基于springboot websocket和okhttp实现消息中转

1、业务介绍

消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图:
在这里插入图片描述
此例中我们定义中间服务A的端口为8082,消息源头服务B的端口为8081,方便阅读下面代码。
说明:此例子只实现了中间服务的转发,连接的关闭等其他逻辑并没有完善,如需要请自行完善;

2、中间服务实现

中间服务即为上图的中间服务A,由于中间服务既要发送(发给用户端)消息,又要接收(从消息源服务B接收)消息,故服务A分为服务端与客户端。
服务A的websocket服务端我们使用springboot websocket实现,客户端使用okhttp实现;会话缓存暂使用内存缓存(实际项目中可置于其他缓存中)
中间服务所需依赖为:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.2.2</version>
</dependency>

缓存类:

public class WSCache {//存储客户端session信息, {会话id:ws_session}public static Map<String, Session> clients = new ConcurrentHashMap<>();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();
}

自定义消息类:

@Accessors(chain = true)
@Data
public class MsgInfo {private String massage;//为userId,用于从缓存中获取对应用户的websocket sessionprivate String userKey;
}

2.1 中间服务A的客户端:

客户端也可以使用springboot websocket,当下我们选择使用okhttp实现。

@Slf4j
public class CommonWSClient extends WebSocketListener {/*** websocket连接建立** @param webSocket* @param response*/@Overridepublic void onOpen(WebSocket webSocket, Response response) {super.onOpen(webSocket, response);log.info("客户端连接建立:{}", response.body().string());}/*** 收到消息* @param webSocket* @param text*/@Overridepublic void onMessage(WebSocket webSocket, String text) {super.onMessage(webSocket, text);log.info("okhttp receive=>{}", text);//todo 收到源(8081)的消息,取到对应userId的消息,并将消息通过本地server发送给用户ObjectMapper mapper = new ObjectMapper();try {MsgInfo msgInfo = mapper.readValue(text, MsgInfo.class);Set<String> strings = WSCache.connection.get(msgInfo.getUserKey());if(!CollectionUtils.isEmpty(strings)){for (String sid : strings) {Session session = WSCache.clients.get(sid);session.getBasicRemote().sendText(msgInfo.getMassage());}}} catch (Exception e) {e.printStackTrace();//throw new RuntimeException(e);}}@Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {super.onMessage(webSocket, bytes);}@Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {super.onClosing(webSocket, code, reason);log.info("okhttp socket closing.");}@Overridepublic void onClosed(WebSocket webSocket, int code, String reason) {super.onClosed(webSocket, code, reason);log.info("okhttp socket closed.");}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {super.onFailure(webSocket, t, response);if (response == null) {log.error("okhttp onFailure, response is null.");return;}try {log.error("okhttp onFailure, code: {}, errmsg: {}", response.code(), response.body().string());} catch (IOException e) {log.warn("okhttp onFailure failed, error: {}", e.getMessage());}}}

2.2 中间服务A的服务端:

websocket服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {//会话idprivate String sid = null;//建立连接的用户idprivate String userId;/*** @description: 当与用户端连接成功时,执行该方法* @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解**/@OnOpenpublic String onOpen(Session session, @PathParam("userId") String userId){this.sid = UUID.randomUUID().toString();this.userId = userId;WSCache.clients.put(this.sid,session);//判断该用户是否存在会话信息,不存在则添加Set<String> clientSet = WSCache.connection.get(userId);if (CollectionUtils.isEmpty(clientSet)){clientSet = new HashSet<>();clientSet.add(this.sid);}else {clientSet.add(this.sid);}WSCache.connection.put(userId,clientSet);log.info("用户{}与本地(8082)server建立连接", this.userId);//todo 本地client与源server(8081)连接Request requestRemote = new Request.Builder().url("ws://127.0.0.1:8081/api/notice/" + userId).build();OkHttpClient webSocketClientRemote = new OkHttpClient.Builder().build();WebSocket localClientRemote = webSocketClientRemote.newWebSocket(requestRemote, new CommonWSClient());log.info("本地server创建本地client,且本地client与远程(8082)server连接成功");return userId + "与本地server连接";}/*** @description: 当连接失败时,执行该方法**/@OnClosepublic void onClose(){WSCache.clients.remove(this.sid);System.out.println(this.sid+"连接断开");}/*** @description: 当收到client发送的消息时,执行该方法**/@OnMessagepublic void onMessage(String message, Session session) {System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);}/*** @description: 当连接发生错误时,执行该方法**/@OnErrorpublic void onError(Throwable error){System.out.println("error--------系统错误");error.printStackTrace();}
}

websocket配置类:

@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}

3、消息源服务

消息源服务B只需要websocket服务用来发送消息即可,其实现与中间服务A的服务端相同。
服务:

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {//存储客户端session信息, {会话id:ws_session}public static Map<String, Session> clients = new ConcurrentHashMap<>();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();//会话idprivate String sid = null;//建立连接的用户idprivate String userId;/*** @description: 当与客户端的websocket连接成功时,执行该方法* @PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解**/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId){log.info("onOpen-->session.getRequestParameterMap():{}", session.getRequestParameterMap());this.sid = UUID.randomUUID().toString();this.userId = userId;clients.put(this.sid,session);//判断该用户是否存在会话信息,不存在则添加Set<String> clientSet = connection.get(userId);if (clientSet == null){clientSet = new HashSet<>();connection.put(userId,clientSet);}clientSet.add(this.sid);System.out.println(this.userId + "用户建立连接," + this.sid+"连接开启!");}/*** @description: 当连接失败时,执行该方法**/@OnClosepublic void onClose(){clients.remove(this.sid);System.out.println(this.sid+"连接断开");}/*** @description: 当收到客户端发送的消息时,执行该方法**/@OnMessagepublic void onMessage(String message, Session session) {System.out.println("-----------收到来自用户:" + this.userId + "的信息   " + message);//自定义消息实体MsgInfo msgInfo = new MsgInfo().setUserKey(this.userId).setMassage("服务端-" + System.currentTimeMillis() + ":已收到用户" +this.userId + "的信息: " + message);sendMessageByUserId(this.userId,  msgInfo);}/*** @description: 当连接发生错误时,执行该方法**/@OnErrorpublic void onError(Throwable error){System.out.println("error--------系统错误");error.printStackTrace();}/*** @description: 通过userId向用户发送信息* 该类定义成静态可以配合定时任务实现定时推送**/public static void sendMessageByUserId(String userId, MsgInfo msgInfo){if (!StringUtils.isEmpty(userId)) {Set<String> clientSet = connection.get(userId);//用户是否存在客户端连接if (Objects.nonNull(clientSet)) {Iterator<String> iterator = clientSet.iterator();while (iterator.hasNext()) {String sid = iterator.next();Session session = clients.get(sid);//向每个会话发送消息if (Objects.nonNull(session)){try {//同步发送数据,需要等上一个sendText发送完成才执行下一个发送ObjectMapper mapper = new ObjectMapper();session.getBasicRemote().sendText(mapper.writeValueAsString(msgInfo));} catch (Exception e) {e.printStackTrace();}}}}}}@Scheduled(cron = "0/10 * * * * ?")public void testSendMessageByCron(){log.info("-----------模拟消息开始发送--------------");//模拟两个用户100和200MsgInfo msg100 = new MsgInfo().setUserKey("100").setMassage("这是8081发给用户100的消息" + System.currentTimeMillis());sendMessageByUserId("100", msg100);MsgInfo msg200 = new MsgInfo().setUserKey("200").setMassage("这是8081发给用户200的消息" + System.currentTimeMillis());sendMessageByUserId("200", msg200);}
}

4、测试

我们使用: wss在线测试工具进行测试;
1、 打开两个该工具窗口,分别模拟用户100和用户200,这两个用户都连接中间服务A(端口8082的服务);
用户100
用户200
2、分别启动消息源服务B和中间服务A
此时在服务B控制台我们可以看到:
在这里插入图片描述
我们模拟的消息发送已经在给用户100和用户200发送,因为我们的用户100和用户200均没有与中间服务A建立连接,故此时测试界面看不到消息;
当我们在用户100的模拟界面点击“开启连接”后,可以在右侧看到发给用户100的模拟消息:
在这里插入图片描述

之后我们再打开用户200的连接:
在这里插入图片描述

好了,到这里就结束了,有任何问题请积极指出,此例子只是个例子,并未经受任何生产的测试,欢迎讨论沟通:)

相关文章:

基于springboot websocket和okhttp实现消息中转

1、业务介绍 消息源服务的消息不能直接推给用户侧&#xff0c;用户与中间服务建立websocket连接&#xff0c;中间服务再与源服务建立websocket连接&#xff0c;源服务的消息推给中间服务&#xff0c;中间服务再将消息推送给用户。流程如下图&#xff1a; 此例中我们定义中间服…...

@PostConstruct 注解的方法用于资源的初始化

PostConstruct 是 Java EE 5 引入的一个注解&#xff0c;主要用于依赖注入完成之后&#xff0c;需要执行的方法上。这个注解的方法会在依赖注入完成后自动被容器&#xff08;如 EJB 容器或 Spring 容器&#xff09;调用&#xff0c;并且只会被调用一次。 PostConstruct 注解的…...

(一)SvelteKit教程:hello world

&#xff08;一&#xff09;SvelteKit教程&#xff1a;hello world sveltekit 的官方教程&#xff0c;在这里&#xff1a;Creating a project • Docs • SvelteKitCreating a project • Docs • SvelteKit 我们可以按照如下的步骤来创建一个项目&#xff1a; npm create s…...

华为Atlas NPU ffmpeg 编译安装

处理器&#xff1a;鲲鹏920 NPU&#xff1a;昇腾 310P3 操作系统&#xff1a;Kylin Linux Advanced Server V10 CANN&#xff1a;Ascend-cann-toolkit_8.0.RC1_linux-aarch64.run FFmpeg&#xff1a;AscendFFmpegPlugin(不要用AscendFFmpeg) AscendFFmpegPlugin下载地址&…...

Python 虚拟环境 requirements.txt 文件生成 ;pipenv导出pip安装文件

搜索关键词: Python 虚拟环境Pipenv requirements.txt 文件生成;Pipenv 导出 pip requirements.txt安装文件 本文基于python版本 >3.9 文章内容有效日期2023年01月开始(因为此方法从这个时间开始是完全ok的) 上述为pipenv的演示版本 使用以下命令可精准生成requirement…...

Less与Sass的区别

1. 功能和工具&#xff1a; Sass&#xff1a;提供了更多的功能和内置方法&#xff0c;如条件语句、循环、数学函数等。Sass 也支持更复杂的操作和逻辑构建。 Less&#xff1a;功能也很强大&#xff0c;但相比之下&#xff0c;Sass 在功能上更为丰富和成熟。 2、编译环境&…...

力扣-2663

题目 如果一个字符串满足以下条件&#xff0c;则称其为 美丽字符串 &#xff1a; 它由英语小写字母表的前 k 个字母组成。它不包含任何长度为 2 或更长的回文子字符串。 给你一个长度为 n 的美丽字符串 s 和一个正整数 k 。 请你找出并返回一个长度为 n 的美丽字符串&#…...

CausalMMM:基于因果结构学习的营销组合建模

1. 摘要 在线广告中&#xff0c;营销组合建模&#xff08;Marketing Mix Modeling&#xff0c;MMM&#xff09; 被用于预测广告商家的总商品交易量&#xff08;GMV&#xff09;&#xff0c;并帮助决策者调整各种广告渠道的预算分配。传统的基于回归技术的MMM方法在复杂营销场景…...

编译 CUDA 程序的基本知识和步骤

基本工具 NVCC&#xff08;NVIDIA CUDA Compiler&#xff09;: nvcc 是 NVIDIA 提供的 CUDA 编译器&#xff0c;用于将 CUDA 源代码&#xff08;.cu 文件&#xff09;编译成可执行文件或库。它可以处理 CUDA 和主机代码&#xff08;例如 C&#xff09;的混合编译。nvcc 调用底层…...

[SAP ABAP] 排序内表数据

语法格式 整表排序 SORT <itab> [ASCENDING|DESCENDING]. 按指定字段排序 SORT <itab> BY f1 [ASCENDING|DESCENDING] f2 [ASCENDING|DESCENDING] ... fn [ASCENDING|DESCENDING].<itab>&#xff1a;代表内表 不指定排序方式则默认升序排序 示例1 结果显…...

【UML用户指南】-21-对基本行为建模-活动图

目录 1、概念 2、组成结构 2.1、动作 2.2、活动节点 2.3、控制流 2.4、分支 2.5、分岔和汇合 2.6、泳道 2.7、对象流 2.8、扩展区域 3、一般用法 3.1、对工作流建模 3.2、对操作建模 一个活动图从本质上说是一个流程图&#xff0c;展现从活动到活动的控制流 活动图…...

【web2】jquary,bootstrap,vue

文章目录 1.jquary&#xff1a;选择器1.1 jquery框架引入&#xff1a;$("mydiv") 当成id选择器1.2 jquery版本/对象&#xff1a;$(js对象) -> jquery对象1.3 jquery的页面加载事件&#xff1a;$ 想象成 window.onload 1.4 jquery的基本选择器&#xff1a;$()里内容…...

独角兽品牌獭崎酱酒:高性价比的酱香之选

在酱香型白酒领域中&#xff0c;獭崎酱酒以其独特的品牌定位和高性价比迅速崛起&#xff0c;成为市场上备受关注的独角兽品牌。作为贵州茅台镇的一款新秀酱香酒&#xff0c;獭崎酱酒不仅传承了百年酿造工艺&#xff0c;还以创新的商业模式和亲民的价格赢得了广大消费者的青睐。…...

java打印菱形和空心菱形

java打印菱形 菱形分上下两个部分。其中上部分同打印金字塔&#xff1b;下部分循环部分i是递减 &#xff08;ps:菱形层数只能为奇数&#xff09; import java.util.Scanner;public class Lingxing{public static void main(String[] args) {Scanner myScanner new Scanner(S…...

Day10 —— 大数据技术之Scala

Scala编程入门 Scala的概述什么是Scala&#xff1f;Scala的重要特点Scala的使用场景 Scala的安装Scala基础Scala总结 Scala的概述 什么是Scala&#xff1f; Scala是一种将面向对象和函数式编程结合在一起的高级语言&#xff0c;旨在以简洁、优雅和类型安全的方式表达通用编程…...

Linux应用系统快速部署:docker快速部署linux应用程序

目录 一、背景 &#xff08;一&#xff09;引入docker的起因 &#xff08;二&#xff09;docker介绍 &#xff08;三&#xff09;Docker部署的优势 1、轻量级和可移植性 2、快速部署和扩展 3、一致性 4、版本控制 5、安全性 6、资源隔离 7、简化团队协作 8、多容器…...

三目运算符中间的表达式可以省略吗(a?:c)?

熟悉C语言的童靴对三目运算符都非常熟悉&#xff0c;a? b : c; 如果a为true&#xff0c;则整个运算符的值为b,否则为c;那么问题来了&#xff0c;三目运算符中间的表达式可以省略吗?即a? : c; 1、linux内核中出现的省略情况 本人在阅读内核代码是发现了下面的代码: preferr…...

android 彩虹进度条自定义view实现

实现一个彩虹色进度条功能&#xff0c;不说明具体用途大家应该能猜到。想找别人造的轮子&#xff0c;但是没有合适的&#xff0c;所以决定自己实现一个。 相关知识 android 自定义view LinearGradient 线性渐变 实现步骤 自定义view 自定义一个TmcView类继承View 重写两…...

免费一年SSL证书申请——建议收藏

免费一年SSL证书申请——建议收藏 获取免费一年期SSL证书其实挺简单的 准备你的网站&#xff1a; 确保你的网站已经有了域名&#xff0c;而且这个域名已经指向你的服务器。还要检查你的服务器支持HTTPS&#xff0c;也就是443端口要打开&#xff0c;这是HTTPS默认用的。 验证域…...

【docker1】指令,docker-compose,Dockerfile

文章目录 1.pull/image&#xff0c;run/ps&#xff08;进程&#xff09;&#xff0c;exec/commit2.save/load&#xff1a;docker save 镜像id&#xff0c;不是容器id3.docker-compose&#xff1a;多容器&#xff1a;宿主机&#xff08;eth0网卡&#xff09;安装docker会生成一…...

PyTorch 2.8镜像实操手册:Git+vim+htop+screen开发运维一体化工作流

PyTorch 2.8镜像实操手册&#xff1a;Gitvimhtopscreen开发运维一体化工作流 1. 镜像概述与环境准备 PyTorch 2.8深度学习镜像是一个为专业开发者打造的全功能工作环境&#xff0c;基于RTX 4090D 24GB显卡和CUDA 12.4进行了深度优化。这个镜像不仅预装了最新版的PyTorch框架&…...

为什么你的Java车载服务在-40℃冷启动失败?温度敏感型ClassLoader加载异常的12小时紧急修复路径

第一章&#xff1a;为什么你的Java车载服务在-40℃冷启动失败&#xff1f;温度敏感型ClassLoader加载异常的12小时紧急修复路径低温环境并非仅影响硬件可靠性——JVM 的类加载机制在极端低温下会触发底层文件系统与内存映射的隐式行为偏移。某车规级 Java 服务在-40℃冷启动时反…...

FastAPI 2.0 + LLM流式输出全栈方案,含OpenAI兼容层、前端SSE重连策略、服务端背压控制(仅限内部技术白皮书级实录)

第一章&#xff1a;FastAPI 2.0 异步 AI 流式响应教程概览FastAPI 2.0 原生强化了对异步流式响应&#xff08;StreamingResponse&#xff09;的支持&#xff0c;为构建低延迟、高吞吐的 AI 接口&#xff08;如大语言模型推理、语音合成、实时图像生成&#xff09;提供了坚实基础…...

彩灯广告屏PLC控制S7-200程序:包含梯形图、接线图、原理图及IO分配与组态画面详解

彩灯广告屏的PLC控制S7-200程序 程序 我们主要的后发送的产品有&#xff0c;带解释的梯形图接线图原理图图纸&#xff0c;io分配&#xff0c;组态画面上周刚帮客户搞定了一套户外彩灯广告屏的PLC控制项目&#xff0c;用的还是经典的S7-200&#xff0c;本来以为老架构玩不出花…...

Linux远程连接工具评测与选型指南

1. Linux远程连接工具概述作为一名嵌入式Linux开发者&#xff0c;我每天都需要通过远程连接工具访问各种开发板和服务器。在多年的实践中&#xff0c;我尝试过市面上几乎所有主流的远程终端工具&#xff0c;深知每款工具的特点和适用场景。选择一款合适的远程连接工具&#xff…...

赛美特冲刺港股:年营收7亿,刚完成8亿融资,估值73亿

雷递网 雷建平 3月31日赛美特信息集团股份有限公司&#xff08;简称&#xff1a;“赛美特”&#xff09;日前更新招股书&#xff0c;准备在港交所上市。赛美特成立以来获得多次融资&#xff0c;其中&#xff0c;2023年4月完成2.33亿元融资&#xff0c;投后估值62.33亿&#xff…...

如何突破Cursor AI使用限制?解锁永久免费Pro功能的终极指南

如何突破Cursor AI使用限制&#xff1f;解锁永久免费Pro功能的终极指南 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached you…...

Phi-4-mini-reasoning效果对比:在GSM8K与AQuA数据集上的zero-shot推理表现

Phi-4-mini-reasoning效果对比&#xff1a;在GSM8K与AQuA数据集上的zero-shot推理表现 1. 模型介绍 Phi-4-mini-reasoning是一款专注于推理任务的文本生成模型&#xff0c;特别擅长处理需要多步逻辑分析和精确结论输出的任务场景。与通用对话模型不同&#xff0c;它被专门设计…...

学习网络安全至少需要什么配置的电脑?

很多同学对于学习 Web 渗透所需的电脑配置仍有疑问&#xff0c;所以老师结合自己的教学经验&#xff0c;总结了关于电脑配置要求的一些内容&#xff0c;遂成此文。当然&#xff0c;对于电脑配置的追求是无上限的&#xff0c;所以有条件的话最好还是搞一台配置强劲的电脑。 一、…...

【测试之道】第四篇:分层测试论 —— 金字塔、奖杯与蜂巢:构建你的质量防御阵型

专栏进度&#xff1a;04 / 10 (测试理论专题) 在不同的架构&#xff08;单体、微服务、前端驱动&#xff09;下&#xff0c;测试资源的分配比例是完全不同的。盲目套用模板是测试经理最容易犯的错误。 一、 经典模型&#xff1a;测试金字塔 (Testing Pyramid) 由 Mike Cohn 提出…...