接口 V2 完善:分布式环境下的 WebSocket 实现与 Token 校验
🎯 本文档详细介绍了如何使用WebSocket协议优化客户端与服务端之间的通信,特别是在处理异步订单创建通知的场景中。通过引入WebSocket代替传统的HTTP请求-响应模式,实现了服务器主动向客户端推送数据的功能,极大地提高了实时性和效率。文中首先概述了WebSocket的优势,随后深入探讨了其在分布式系统中的具体实现,包括依赖管理、网关配置、WebSocket服务类的设计以及消息队列的使用等关键环节。特别地,针对分布式架构下WebSocket连接状态同步问题,提出了一种基于消息队列广播机制的解决方案,确保了系统的可扩展性和稳定性。同时,还强调了心跳检测机制的重要性,以维护连接的有效性。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)
文章目录
- 前言
- WebSocket 介绍
- 流程图
- 具体实现
- 依赖
- 网关配置
- WebSocket配置类
- WebSocket服务类
- MQ消费者
- 启动类
- 配置文件
- 注意事项
- 登录验证
- WebSocket 配置类
- token校验
- 分布式 WebSocket
- 心跳检测
前言
在时间段预定接口 V2 中,用户预定之后,会发送一个消息,让消息队列异步创建订单。此时客户端是无法知道服务端什么时候完成订单创建的,因此需要服务端告知客户端。但是以往都是客户端给服务端发 http 请求,但是服务端如何主动告知客户端呢?
这个时候就需要请出我们今天的主角 WebSocket 了
WebSocket 介绍
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务器直接向客户端推送数据而不必由客户端发起请求。这种特性让实时性要求较高的应用,如即时通讯工具、在线游戏以及实时交易系统等,能够更加高效地进行数据交互。通过WebSocket,开发者可以构建响应更快、性能更高的网络应用,同时减少不必要的网络开销和延迟。相比传统的HTTP请求-响应模式,WebSocket提供了更低的延迟和更高的效率,特别是在需要频繁更新数据的应用场景中表现出色。
因此使用了 WebSocket ,一旦客户端和服务端建立了连接,当订单创建成功之后,服务端直接别订单数据推送给客户端即可。
流程图
user1、user2 和 user3 分别发起 WebSocket 连接,首先经过网关,连接请求被分发到不同的服务中。WebSocket 服务接收到连接请求之后,对其进行登录校验,如果校验成功,将其 Session 信息存储在服务器的内存中,如果校验失败,直接关闭 Session 。其中 user1、user2 的Session信息被存储在 WebSocket 服务1 中,user3 的Session信息被存储在 WebSocket 服务2 中。
当用户预定时间段,生成订单之后,场馆服务向消息队列中发生订单数据。接着消息队列将订单数据广播到 WebSocket 服务1 和 WebSocket 服务2中。WebSocket 服务2 发现自己的内存中存有 user3 的Session,因此将订单数据通过该 Session 发送给 user3 。
暂时无法在飞书文档外展示此内容
具体实现
为了解耦 WebSocket 和其他服务,单独创建一个 WebSocket 服务。
依赖
<dependencies><dependency><groupId>com.vrs</groupId><artifactId>vrs-web</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.dam</groupId><artifactId>vrs-rocketmq</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.vrs</groupId><artifactId>vrs-common</artifactId></dependency><dependency><groupId>com.vrs</groupId><artifactId>vrs-idempotent</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- websocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
</dependencies>
网关配置
当访问 /websocket/**
路径时,将请求转化到 WebSocket 服务,注意,转发的时候添加了前缀ws:
- id: vrs-websocketuri: lb:ws://vrs-websocketpredicates:- Path=/websocket/**filters:- name: TokenValidateargs:whitePathList:- /websocket/**
【去除默认过滤器】
如果像这样全局配置了默认过滤器,DedupeResponseHeader
过滤器的作用是对指定的响应头(在这个例子中为Vary
、Access-Control-Allow-Origin
和Access-Control-Allow-Credentials
)进行去重。当有多个相同名称的响应头时,它会按照给定的策略保留其中的一个。这里的策略是RETAIN_FIRST
,意味着它将保留这些头部中第一次出现的那个,而删除后续出现的重复头部。
spring:cloud:gateway:default-filters:- DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST
发起 WebSocket 连接的时候,会报如下错误,这是因为修改了只读的请求头
java.lang.UnsupportedOperationException: nullat org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]*__checkpoint ⇢ HTTP GET "/websocket/admin?token=eyJhbGciOiJIUzUxMiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAA_6tWKi5NUrJScgwN8dANDXYNUtJRSq0oULIyNDe2NDMyNrYw0lEqLU4t8kwBilmYmZgZm5sbG5mbGViYGpgYQyX9EnNTgYYkpuRm5ilBhEIqC4BCRrUAvgeVqmEAAAA.e7wanr0gKu4FD-Y_afO2MEIECxZ6oMKGlf8zarZp-GOmzqL5n354gasKr7GKKs4H3Pq0CYJQECO_Rv9ixGsvZQ" [ExceptionHandlingWebHandler]
Original Stack Trace:at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]
因此需要将上述配置删除,如果还需要这些默认配置,可以到具体的路由下面设置,就像下面一样
spring:cloud:gateway:routes:- id: vrs-adminuri: lb://vrs-adminpredicates:- Path=/admin/**filters:- DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST- name: TokenValidateargs:whitePathList:- /admin/user/v1/login- /admin/user/v1/wechatLogin- ...
WebSocket配置类
配置类 WebSocketConfig
主要用于配置和初始化 WebSocket 服务器端点,并处理与 WebSocket 连接相关的操作,具体功能如下:
- Spring Bean 注册:通过
@Configuration
注解标明这是一个 Spring 配置类。在该类中定义了一个@Bean
方法serverEndpointExporter()
,它返回一个ServerEndpointExporter
实例。这个实例的作用是自动注册使用了@ServerEndpoint
注解声明的 WebSocket 端点对象到 Spring 容器中。 - 握手请求修改:
modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response)
方法重写了父类中的同名方法,用于在建立 WebSocket 连接前对握手请求进行自定义修改。在这个例子中,方法尝试从握手请求参数中获取名为 “token” 的参数,并将其存储在ServerEndpointConfig
对象的用户属性中(即sec.getUserProperties().put("token", token);
)。这使得后续逻辑可以通过访问端点配置对象来获取令牌信息。 - 端点实例化:
getEndpointInstance(Class<T> clazz)
方法重写了父类的方法,用于提供自定义逻辑来实例化被@ServerEndpoint
标注的 WebSocket 端点类。在这个实现中,它直接调用了父类的实现super.getEndpointInstance(clazz)
来创建端点实例。通常情况下,除非需要特别的实例化逻辑,否则可以直接使用父类的默认实现。
package com.vrs.config;import jakarta.websocket.HandshakeResponse;
import jakarta.websocket.server.HandshakeRequest;
import jakarta.websocket.server.ServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import java.util.List;
import java.util.Map;/*** @Author dam* @create 2025/1/24 15:25*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {/*** 这个bean会自动注册使用了@ServerEndpoint注解声明的对象** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}/*** 建立握手时,连接前的操作*/@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {// 获取请求参数Map<String, List<String>> parameterMap = request.getParameterMap();List<String> tokenList = parameterMap.get("token");if (tokenList != null && !tokenList.isEmpty()) {String token = tokenList.get(0);sec.getUserProperties().put("token", token);}}/*** 初始化端点对象,也就是被@ServerEndpoint所标注的对象*/@Overridepublic <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {return super.getEndpointInstance(clazz);}
}
WebSocket服务类
WebSocketServer
类是为实现实时通信而设计的,能够有效地管理多个客户端之间的双向通信以及保持这些通信的稳定性和可靠性。它通过 Spring 的 @Component
和 Jakarta WebSocket 的 @ServerEndpoint
注解被注册为一个 Spring Bean,并监听路径为 /websocket/{username}
的 WebSocket 请求。该类利用一个静态的 ConcurrentHashMap
来存储每个用户的会话 (Session
) 和最后一次活动时间,以跟踪在线用户和他们的活跃状态。它实现了以下关键功能:
- 连接管理:处理用户的连接建立 (
onOpen
) 和关闭 (onClose
) 事件,包括校验用户提供的 token 是否有效。 - 消息处理:接收来自客户端的消息 (
onMessage
) 并据此更新用户的最后活动时间,支持发送 PING/PONG 心跳消息来维持连接。 - 心跳检测:通过定时任务每30秒检查一次用户的心跳,若某用户超过60秒未活动,则自动断开其连接,确保资源的有效利用。
- 消息发送:提供了一个方法用于向特定用户发送消息。
package com.vrs.controller;import com.vrs.config.WebSocketConfig;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.utils.JwtUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** @Author dam* @create 2024/1/24 14:32*/
// 将WebSocketServer注册为spring的一个bean
@ServerEndpoint(value = "/websocket/{username}", configurator = WebSocketConfig.class)
@Component
@Slf4j(topic = "WebSocketServer")
public class WebSocketServer {/*** 心跳检查间隔时间(单位:秒)*/private static final int HEARTBEAT_INTERVAL = 30;/*** 心跳超时时间(单位:秒)*/private static final int HEARTBEAT_TIMEOUT = 60;/*** 记录当前在线连接的客户端的session*/private static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();/*** 记录用户最后一次活动时间*/private static final Map<String, Long> lastActivityTimeMap = new ConcurrentHashMap<>();/*** 直接通过 Autowired 注入的话,redisTemplate为null,因此使用这种引入方式*/private static StringRedisTemplate redisTemplate;@Autowiredpublic void setRabbitTemplate(StringRedisTemplate redisTemplate) {WebSocketServer.redisTemplate = redisTemplate;}/*** 定时任务线程池,用于心跳检查*/private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// 初始化心跳检查任务static {scheduler.scheduleAtFixedRate(WebSocketServer::checkHeartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/@OnOpenpublic void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {// 校验 token 是否有效String token = (String) config.getUserProperties().get("token");boolean validToken = validToken(token);if (!validToken) {try {session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));} catch (IOException e) {e.printStackTrace();}}// 如果用户已存在,关闭旧连接if (usernameAndSessionMap.containsKey(username)) {Session oldSession = usernameAndSessionMap.get(username);if (oldSession != null && oldSession.isOpen()) {try {oldSession.close();} catch (IOException e) {log.error("关闭旧连接时发生错误", e);}}}// 记录新连接usernameAndSessionMap.put(username, session);// 记录用户活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session, @PathParam("username") String username) throws IOException {try {if (session != null && session.isOpen()) {session.close();}} catch (IOException e) {log.error("关闭连接时发生错误", e);} finally {usernameAndSessionMap.remove(username);lastActivityTimeMap.remove(username);log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());}}/*** 发生错误的时候会调用这个方法*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("发生错误,原因:" + error.getMessage());error.printStackTrace();}/*** 收到客户端消息时调用*/@OnMessagepublic void onMessage(String message, Session session, @PathParam("username") String username) {// 更新用户最后一次活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());if ("PING".equals(message)) {log.debug("收到来自 {} 的心跳检测请求", username);} else {log.info("收到来自 {} 的消息: {}", username, message);}}/*** 服务端发送消息给客户端*/public void sendMessage(String toUsername, String message) {try {Session toSession = usernameAndSessionMap.get(toUsername);if (toSession != null && toSession.isOpen()) {toSession.getBasicRemote().sendText(message);} else {log.warn("用户 {} 的会话已关闭或不存在", toUsername);}} catch (Exception e) {log.error("服务端发送消息给客户端失败", e);}}/*** 关闭心跳检测超时的 session*/private static void checkHeartbeat() {long currentTime = System.currentTimeMillis();for (Map.Entry<String, Long> entry : lastActivityTimeMap.entrySet()) {String username = entry.getKey();long lastActivityTime = entry.getValue();if (currentTime - lastActivityTime > HEARTBEAT_TIMEOUT * 1000) {log.info("用户 {} 心跳超时,关闭连接", username);Session session = usernameAndSessionMap.get(username);if (session != null) {try {session.close();} catch (IOException e) {log.error("关闭连接时发生错误", e);}}usernameAndSessionMap.remove(username);lastActivityTimeMap.remove(username);}}}/*** 校验 token 有效** @param token* @return*/private boolean validToken(String token) {String userName = "";try {// 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效userName = JwtUtil.getUsername(token);} catch (Exception e) {return false;}if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&(redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {// --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验return true;}return false;}}
MQ消费者
package com.vrs.rocketMq.listener;import com.vrs.constant.RocketMqConstant;
import com.vrs.controller.WebSocketServer;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 执行预订流程 消费者** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG,// 需要使用广播模式messageModel = MessageModel.BROADCASTING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG
)
@RequiredArgsConstructor
public class WebSocketSendMessageListener implements RocketMQListener<MessageWrapper<WebsocketMqDTO>> {private final WebSocketServer webSocketServer;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@SneakyThrows@Overridepublic void onMessage(MessageWrapper<WebsocketMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] websocket发生消息给{}", messageWrapper.getMessage().getToUsername());webSocketServer.sendMessage(messageWrapper.getMessage().getToUsername(), messageWrapper.getMessage().getMessage());}
}
启动类
package com.vrs;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;/*** @Author dam* @create 2025/01/24 16:34*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class VrsWebSocketApplication {public static void main(String[] args) {SpringApplication.run(VrsWebSocketApplication.class, args);}
}
配置文件
server:port: 7054
spring:profiles:active: damapplication:name: vrs-websocketcloud:nacos:discovery:server-addr: 127.0.0.1:8848data:redis:host: 127.0.0.1port: 6379password: 12345678database: 0timeout: 1800000jedis:pool:max-active: 20 #最大连接数max-wait: -1 #最大阻塞等待时间(负数表示没限制)max-idle: 5 #最大空闲min-idle: 0 #最小空闲
rocketmq:# rocketMq的nameServer地址name-server: 127.0.0.1:9876producer:# 生产者组别group: vrs-websocket-group# 消息发送的超时时间send-message-timeout: 10000# 异步消息发送失败重试次数retry-times-when-send-async-failed: 1# 发送消息的最大大小,单位字节,这里等于4Mmax-message-size: 999999999
注意事项
登录验证
为了防止被人恶意发生大量 WebSocket 连接,占用服务器资源,因此在建立连接的时候,需要进行登录验证,用户登录了才可以建立 WebSocket 连接。
由于建立 WebSocket 连接时,无法像之前的 http 请求一样在请求头携带 token 信息,因此之前网关实现的登录校验机制不生效,需要我们针对 WebSocket 连接额外实现一套登录验证方式。
假设前端发起 WebSocket 连接的代码如下:
new WebSocket("ws://localhost:7049/websocket/admin?token=dahidaho");
WebSocket 配置类
在modifyHandshake
中,将客户端发起连接请求时的 token 设置到属性中,这样后面就可以将 token 获取出来进行校验,如果说校验不通过,就关闭 WebSokcet
连接
token校验
代码位于WebSocketServer
类中,当调用validToken
校验失败之后,通过session.close
来关闭连接
/*** 校验 token 有效** @param token* @return*/
private boolean validToken(String token) {String userName = "";try {// 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效userName = JwtUtil.getUsername(token);} catch (Exception e) {return false;}if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&(redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {// --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验return true;}return false;
}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/
@OnOpen
public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {// 校验 token 是否有效String token = (String) config.getUserProperties().get("token");boolean validToken = validToken(token);if (!validToken) {try {session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));} catch (IOException e) {e.printStackTrace();}}// 如果用户已存在,关闭旧连接if (usernameAndSessionMap.containsKey(username)) {Session oldSession = usernameAndSessionMap.get(username);if (oldSession != null && oldSession.isOpen()) {try {oldSession.close();} catch (IOException e) {log.error("关闭旧连接时发生错误", e);}}}// 记录新连接usernameAndSessionMap.put(username, session);// 记录用户活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
}
分布式 WebSocket
由于我们的项目是分布式架构的,如果vrs-websocket
启动多个服务的话,需要处理如下问题:
WebSocketServer
中的用户名及其对应的session信息usernameAndSessionMap
是存储在本地的,假设发起连接的时候,session被存储在机器 1 上面。后续服务端要通知客户端时,怎么知道当前用户的信息是存储在机器1、机器 2 还是机器 3 呢?
由于 Session 无法直接序列化存储到 Redis 中,为了解决这个问题,本文通过借助消息队列来解决。
服务端要发送消息给客户端时,先将消息发送至消息队列中,消息设置为广播模式。后续多台部署了vrs-websocket
的机器去消息队列中获取消息来消费,如果机器检查到了这条消息的接收者 session 就在机器上,则执行发送,否则直接 return 即可。
【消息生产者】
package com.vrs.rocketMq.producer;import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.UUID;/*** websocket发送消息 生产者** @Author dam* @create 2024/9/20 16:00*/
@Slf4j
@Component
public class WebsocketSendMessageProducer extends AbstractCommonSendProduceTemplate<WebsocketMqDTO> {@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(WebsocketMqDTO messageSendEvent) {return BaseSendExtendDTO.builder().eventName("执行时间段预定").topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG).sentTimeout(2000L).build();}@Overrideprotected Message<?> buildMessage(WebsocketMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, messageSendEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}
【消息消费者】
消费者的代码就在具体实现中,这里不重复放
【使用】
// 通过 websocket 发送消息,通知前端
websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());
心跳检测
用户建立 WebSocket 连接之后的 session 数据是存储在服务器本地的,随着连接数量的增加,session会占用大量的内存,心跳检测是为了定期清理那些无效的连接。
在WebSocketServer
中,通过定时任务每30秒检查一次客户端的心跳状态,记录每个用户的最后活动时间。如果当前时间与某用户最后活动时间之差超过60秒,则认为该用户心跳超时,服务端将关闭其WebSocket连接并清理相关记录。客户端需定期向服务端发送"PING"消息以维持连接活跃,确保不会因超时而被服务端断开。
相关文章:

接口 V2 完善:分布式环境下的 WebSocket 实现与 Token 校验
🎯 本文档详细介绍了如何使用WebSocket协议优化客户端与服务端之间的通信,特别是在处理异步订单创建通知的场景中。通过引入WebSocket代替传统的HTTP请求-响应模式,实现了服务器主动向客户端推送数据的功能,极大地提高了实时性和效…...

微前端架构在前端开发中的实践与挑战
随着单页面应用(SPA)和前端框架如 React、Vue、Angular 的快速发展,现代前端应用的复杂度日益提升。尤其是当应用规模逐渐增大时,单一的代码库往往难以应对不同团队的协作和版本管理问题。为了应对这一挑战,微前端架构…...

【自学嵌入式(6)天气时钟:软硬件准备、串口模块开发】
天气时钟:软硬件准备、串口模块开发 软硬件准备接线及模块划分ESP8266开发板引脚图软件准备 串口模块编写串口介绍Serial库介绍 近期跟着网上一些教学视频,编写了一个天气时钟,本篇及往后数篇都将围绕天气时钟的制作过程展开。本文先解决硬件…...

macbook安装go语言
通过brew来安装go语言 使用brew命令时,一般都会通过brew search看看有哪些版本 brew search go执行后,返回了一堆内容,最下方展示 If you meant "go" specifically: It was migrated from homebrew/cask to homebrew/core. Cas…...

代码随想录算法训练营第三十八天-动态规划-完全背包-322. 零钱兑换
太难了 但听了前面再听这道题感觉递推公式也不是不难理解 动规五部曲 dp[j]代表装满容量为j(也就是目标值)的背包最少物品数量递推公式:dp[j] std::min(dp[j], dp[j - coins[i]] 1)当使用coins[i]这张纸币时,要向前找到容量为…...

小阿卡纳牌
小阿卡纳牌 风:热湿 火:热干 水:冷湿 土:冷干 火风:温度相同,但是湿度不同,二人可能会在短期内十分热情,但是等待热情消退之后,会趋于平淡。 湿度相同、温度不同&#x…...

DDD 和 TDD
领域驱动设计(DDD) DDD 是一种软件开发方法,强调通过与领域专家的密切合作来构建一个反映业务逻辑的模型。其核心思想是将业务逻辑和技术实现紧密结合,以便更好地解决复杂的业务问题。 DDD 的关键概念: 1. 领域模型 …...

Java学习教程,从入门到精通,JDBC插入记录语法及案例(104)
JDBC插入记录语法及案例 一、JDBC插入记录语法 在JDBC中,插入记录主要通过执行SQL的INSERT语句来实现。其基本语法如下: INSERT INTO 表名 (列1, 列2, ..., 列n) VALUES (值1, 值2, ..., 值n);表名:需要插入记录的表的名称。列1, 列2, …,…...

Linux文件基本操作
Linux 的设计哲学 在 Linux 中,一切皆文件! 什么是文件? 文件是具有永久存储性,按特定字节顺序组成的命名数据集 文件可分为:文本文件,二进制文件 文本文件:每个文件存放一个 ASCII 码 存储…...

React 路由导航与传参详解
随着单页面应用(SPA)已经成为主流。React 作为最流行的前端框架之一,提供了强大的路由管理工具 react-router-dom,帮助开发者轻松实现页面导航和传参。本文将详细介绍如何使用 react-router-dom 构建路由导航、传参以及嵌套路由的…...

C#面试常考随笔6:ArrayList和 List的主要区别?
在 C# 中,ArrayList和List<T>(泛型列表)都可用于存储一组对象。推荐优先使用List<T>,因为它具有更好的类型安全性、性能和语法简洁性,并且提供了更丰富的功能。只有在需要与旧代码兼容或存储不同类型对象的…...

C#分页思路:双列表数据组合返回设计思路
一、应用场景 需要分页查询(并非全表查载入物理内存再筛选),返回列表1和列表2叠加的数据时 二、实现方式 列表1必查,列表2根据列表1的查询结果决定列表2的分页查询参数 三、示意图及其实现代码 1.示意图 黄色代表list1的数据&a…...

中科大:LLM检索偏好优化应对RAG知识冲突
📖标题:RPO: Retrieval Preference Optimization for Robust Retrieval-Augmented Generation 🌐来源:arXiv, 2501.13726 🌟摘要 🔸虽然检索增强生成(RAG)在利用外部知识方面表现出…...

知识库管理系统提升企业知识价值与工作效率的实践路径分析
内容概要 知识库管理系统在企业发展中的重要性日益凸显,尤其是在信息爆炸的时代。现代企业需要有效地管理和利用自身知识资产,以提升整体效率和竞争力。本文旨在探讨知识库管理系统的应用实践,围绕其在信息整理、知识共享及决策支持等方面的…...

中文输入法方案
使用了三年的自然码双拼,毫无疑问是推荐使用双拼输入法。 三年积累下来的习惯是: 1 自然码方案 2 空格出字 字母选字 直到如今,想要做出改变,是因为这样的方案带来的痛点: 1 使用空格出字就无法使用辅助码&#…...

《AI芯片:如何让硬件与AI计算需求完美契合》
在人工智能飞速发展的今天,AI芯片已成为推动这一领域前行的关键力量。从智能语音助手到自动驾驶汽车,从图像识别技术到复杂的自然语言处理,AI芯片的身影无处不在。它就像是人工智能的“超级大脑”,以强大的计算能力支撑着各种复杂…...

AlertDialog组件的功能与用法
文章目录 概念介绍使用方法示例代码 我们在上一章回中介绍了Dismissible Widget相关的内容,本章回中将介绍AlertDialog Widget.闲话休提,让我们一起Talk Flutter吧。 概念介绍 我们介绍的AlertDialog是指程序中弹出的确认窗口,其实我们在上一章回中删除…...

【Python百日进阶-Web开发-FastAPI】Day813 - FastAPI 响应模型
文章目录 一、返回与输入相同的数据二、添加输出模型三、在文档中查看四、响应模型编码参数4.1 使用 response_model_exclude_unset 参数4.1.1 默认值字段有实际值的数据4.1.2 具有与默认值相同值的数据4.2 response_model_include 和 response_model_exclude4.2.1 使用 list 而…...

洛谷U525376 信号干扰 (判断多个区间是否有重叠)
U525376信号干扰 题目描述 有 n n n 座信号塔,第 i i i 座信号塔的信号将覆盖区间 [ l i , r i ] [l_i,r_i] [li,ri]。 若某个点被超过一座信号塔的信号覆盖,则在该点会产生信号干扰。 对于信号塔区间 [ a , b ] [a,b] [a,b],若建…...

ESP32-S3模组上跑通esp32-camera(35)
接前一篇文章:ESP32-S3模组上跑通esp32-camera(34) 一、OV5640初始化 2. 相机初始化及图像传感器配置 上一回继续对reset函数的后一段代码进行解析。为了便于理解和回顾,再次贴出reset函数源码,在components\esp32-camera\sensors\ov5640.c中,如下: static int reset…...

Java进阶(二):Java设计模式
目录 设计模式 一.建模语言 二.类之间的关系 1.依赖关系 2.关联关系 3.聚合关系 4.组合关系 5.继承关系 6.实现关系 三.面向对象设计原则 单一职责原则 开闭原则 里氏替换原则 依赖倒置 接口隔离原则 迪米特原则 组合/聚合(关联关系)复用原则 四.23种设计模式…...

DeepSeek R1:中国AI黑马的崛起与挑战
文章目录 技术突破:从零开始的推理能力进化DeepSeek R1-Zero:纯RL训练的“自我觉醒”DeepSeek R1:冷启动与多阶段训练的平衡之道 实验验证:推理能力的全方位跃升基准测试:超越顶尖闭源模型蒸馏技术:小模型的…...

抗体人源化服务如何优化药物的分子结构【卡梅德生物】
抗体药物作为一种重要的生物制药产品,已在癌症、免疫疾病、传染病等领域展现出巨大的治疗潜力。然而,传统的抗体药物常常面临免疫原性高、稳定性差以及治疗靶向性不足等问题,这限制了其在临床应用中的效果和广泛性。为了克服这些问题…...

AndroidCompose Navigation导航精通2-过渡动画与路由切换
目录 前言路由切换NavControllerBackStackEntry过渡动画过渡原理缩放动画渐隐动画滑动动画动画过渡实战前言 在当今的移动应用开发中,导航是用户与应用交互的核心环节。随着 Android Compose 的兴起,它为开发者提供了一种全新的、声明式的方式来构建用户界面,同时也带来了更…...

基于微信小程序的社团活动助手php+论文源码调试讲解
4 系统设计 微信小程序社团微信小程序的设计方案比如功能框架的设计,比如数据库的设计的好坏也就决定了该系统在开发层面是否高效,以及在系统维护层面是否容易维护和升级,因为在系统实现阶段是需要考虑用户的所有需求,要是在设计…...

WebSocket 详解:全双工通信的实现与应用
目录 一、什么是 WebSocket?(简介) 二、为什么需要 WebSocket? 三、HTTP 与 WebSocket 的区别 WebSocket 的劣势 WebSocket 的常见应用场景 WebSocket 握手过程 WebSocket 事件处理和生命周期 一、什么是 WebSocket…...

漏洞修复:Apache Tomcat 安全漏洞(CVE-2024-50379) | Apache Tomcat 安全漏洞(CVE-2024-52318)
文章目录 引言I Apache Tomcat 安全漏洞(CVE-2024-50379)漏洞描述修复建议升级Tomcat教程II Apache Tomcat 安全漏洞(CVE-2024-52318)漏洞描述修复建议III 安全警告引言 解决方案:升级到最新版Tomcat https://blog.csdn.net/z929118967/article/details/142934649 service in…...

智慧园区系统分类及其在提升企业管理效率中的创新应用探讨
内容概要 智慧园区的概念已经逐渐深入人心,成为现代城市发展中不可或缺的一部分。随着信息技术的飞速发展和数字化转型的不断推进,一系列智慧园区管理系统应运而生。这些系统不仅帮助企业提高了管理效率,还在多个方面激发了创新。 首先&…...

29. 【.NET 8 实战--孢子记账--从单体到微服务】--项目发布
这是本专栏最后一篇文章了,在这片文章里我们不重点讲解如何配置服务器,重点讲如何发布服务,我们开始吧。 一、服务器配置 服务器配置包含:服务器的选择和项目运行环境的配置,下面我们分别来讲解一下。 在服务器选择上…...

Langchain+讯飞星火大模型Spark Max调用
1、安装langchain #安装langchain环境 pip install langchain0.3.3 openai -i https://mirrors.aliyun.com/pypi/simple #灵积模型服务 pip install dashscope -i https://mirrors.aliyun.com/pypi/simple #安装第三方集成,就是各种大语言模型 pip install langchain-comm…...