开源模型应用落地-业务优化篇(三)
一、前言
假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容?
因为在每个IM服务中,我们用JVM缓存了用户与WS的通道的绑定关系,并且使用Redis队列进行解耦。那扩展了IM服务实例之后,如何确保Redis队列的消息能正常消费,即如何能找回对应的用户通道?别着急,接下来,我将给您做详细的解释。
二、术语
2.1.水平扩容
是指通过增加系统中的资源实例数量来提高系统的处理能力和吞吐量。在计算机领域,水平扩容通常用于应对系统负载的增加或需要处理更多请求的情况。
2.2.无状态
无状态(stateless)是指系统或组件在处理请求时不依赖于之前的请求或会话信息。换句话说,每个请求都是独立的,系统不会在不同的请求之间保存任何状态或上下文信息。
在无状态系统中,每个请求被视为一个独立的事件,系统只关注当前请求所包含的信息和参数,而不依赖于之前的请求历史。这使得系统更加简单、可伸缩和易于管理。
三、前置条件
3.1. 已经完成前两篇的学习
四、技术实现
4.1、实现思路
首先,IM服务是有状态的(AI服务是无状态),每个实例中,会缓存用户与WebSocket通道之间的信息。那是否可以采用中间共享存储的方式,将状态信息保存至Redis或外部存储中?答案是:不行。WebScoket的通道信息,无法进行序列化。
要实现IM服务水平扩容的方式有多种,但目前我们采用以下的方案:

- 每个IM服务保存对应的用户和WS通道的关系;
- 每个IM服务对应唯一一个redis队列;
- 前置的SLB(App入口)能根据用户标识(哈希)将请求转发至指定的IM服务;
- 当某一IM服务出现故障的时候,由App端发起重连,重新建立WebSocket连接。
4.2、调整配置文件
# 每个IM服务实例指定全局唯一的ID,例如:下面指定的node:0
ws:server:node: 0
PS:具体参数可以在外部指定,作为JVM的运行参数传入
4.3、调整业务逻辑处理类
# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式

# Redis中缓存的数据如下

4.4、调整任务处理类
# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式

五、测试
# 这次换一下测试方式,用离线页面的方式进行测试
5.1. 建立连接

5.2. 业务初始化

5.3. 业务对话

六、附带说明
6.1. BusinessHandler完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** @Description: 处理消息的handler*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class BusinessHandler extends AbstractBusinessLogicHandler<TextWebSocketFrame> {public static final String LINE_UP_QUEUE_NAME = "AI-REQ-QUEUE";private static final String LINE_UP_LOCK_NAME = "AI-REQ-LOCK";private static final int MAX_QUEUE_SIZE = 100;// @Autowired
// private TaskUtils taskExecuteUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("add client,channelId:{}", channelId);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("remove client,channelId:{}", channelId);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)throws Exception {// 获取客户端传输过来的消息String content = textWebSocketFrame.text();// 兼容在线测试if (StringUtils.equals(content, "PING")) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());return;}log.info("接收到客户端发送的信息: {}", content);Long userIdForReq;String msgType = "";String contents = "";try {ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);msgType = apiReqMessage.getMsgType();contents = apiReqMessage.getContents();userIdForReq = apiReqMessage.getUserId();// 用户身份标识校验if (null == userIdForReq || (long) userIdForReq <= 10000) {ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode())).respTime(String.valueOf(System.currentTimeMillis())).contents("用户身份标识有误!").msgType(String.valueOf(MsgType.SYSTEM.getCode())).build();buildResponseAndClose(channelHandlerContext, apiRespMessage);return;}if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {// 对用户输入的内容进行自定义违规词检测// 对用户输入的内容进行第三方在线违规词检测// 对用户输入的内容进行组装成Prompt// 对Prompt根据业务进行增强(完善prompt的内容)// 对history进行裁剪或总结(检测history是否操作模型支持的上下文长度,例如qwen-7b支持的上下文长度为8192)// ...// 通过线程池来处理
// String messageId = apiReqMessage.getMessageId();
// List<ChatContext> history = apiReqMessage.getHistory();
// AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
// taskExecuteUtils.execute(aiTaskReqMessage);// 通过队列来缓冲boolean flag = true;RLock lock = redissonClient.getLock(LINE_UP_LOCK_NAME);String queueName = LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();//尝试获取锁,最多等待3秒,锁的自动释放时间为10秒if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {try {if (redisUtils.queueSize(queueName) < MAX_QUEUE_SIZE) {redisUtils.queueAdd(queueName, content);log.info("当前线程为:{}, 添加请求至redis队列",Thread.currentThread().getName());} else {flag = false;}} catch (Throwable e) {log.error("系统处理异常", e);} finally {lock.unlock();}} else {flag = false;}if (!flag) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.SYSTEM.getCode())).contents("当前排队人数较多,请稍后再重试!").build());}} else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {//一、业务黑名单检测(多次违规,永久锁定)//二、账户锁定检测(临时锁定)//三、多设备登录检测//四、剩余对话次数检测//检测通过,绑定用户与channel之间关系addChannel(channelHandlerContext, userIdForReq);String respMessage = "用户标识: " + userIdForReq + " 登录成功";buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.INIT.getCode())).contents(respMessage).build());} else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());} else {log.info("用户标识: {}, 消息类型有误,不支持类型: {}", userIdForReq, msgType);}} catch (Exception e) {log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);// 异常返回return;}}}
6.2. TaskUtils完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class TaskUtils implements ApplicationRunner {private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);@Autowiredprivate AIChatUtils aiChatUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void run(ApplicationArguments args) throws Exception {while(true){String queueName = BusinessHandler.LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();
// 执行定时任务的逻辑String content = redisUtils.queuePoll(queueName);if(StringUtils.isNotEmpty(content) && StringUtils.isNoneBlank(content)){try{ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);String messageId = apiReqMessage.getMessageId();String contents = apiReqMessage.getContents();Long userIdForReq = apiReqMessage.getUserId();List<ChatContext> history = apiReqMessage.getHistory();AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();execute(aiTaskReqMessage);}catch (Throwable e){log.error("处理消息出现异常",e);//将请求再次返回去队列//将请求丢弃//其他处理?}}else{TimeUnit.SECONDS.sleep(1);}}}public void execute(AITaskReqMessage aiTaskReqMessage) {executorService.execute(() -> {Long userId = aiTaskReqMessage.getUserId();if (null == userId || (long) userId < 10000) {log.warn("用户身份标识有误!");return;}ChannelHandlerContext channelHandlerContext = BusinessHandler.getContextByUserId(userId);if (channelHandlerContext != null) {try {aiChatUtils.chatStream(aiTaskReqMessage);} catch (Throwable exception) {exception.printStackTrace();}}});}public static void destory() {executorService.shutdownNow();executorService = null;}}
相关文章:
开源模型应用落地-业务优化篇(三)
一、前言 假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容? 因为在每个IM服务中,我们用JV…...
基于SpringBoot+Vue实现的物流快递仓库管理系统
基于SpringBootVue实现的物流快递仓库管理系统 文章目录 基于SpringBootVue实现的物流快递仓库管理系统系统介绍技术选型成果展示账号地址及其他说明源码获取 系统介绍 系统演示 关注视频号【全栈小白】,观看演示视频 基于SpringBootVue实现的物流快递仓库管理系…...
编程笔记 html5cssjs 072 JavaScrip BigInt数据类型
编程笔记 html5&css&js 072 JavaScrip BigInt数据类型 一、BigInt 数据类型二、BigInt 的创建和使用三、BigInt 操作与方法三、示例小结 JavaScript BigInt 数据类型是一种内置的数据类型,用于表示大于 Number.MAX_SAFE_INTEGER(即2^53 - 1&…...
matlab simulink 步进电机控制
1、内容简介 略 41-可以交流、咨询、答疑 2、内容说明 电动执行器定位控制在生产生活中具有广泛的应用,在使用搭载步进电机的电动执行器进行定位控制的时候,定位系统的定位精度和响应波形,会随着负载质量的变化而变化,这是由电…...
使用阿里云的IDaaS实现知行之桥EDI系统的单点登录
,在开始测试之前,需要确定用哪个信息作为“登陆用户的ID字段”。 这个字段用来在完成SSO登陆之后,用哪个信息将阿里云IDaaS的用户和知行之桥EDI系统的用户做对应。这里我们使用了 phonenumber 这个自定义属性。需要在阿里云做如下配置&#x…...
基于微服务的高考志愿智能辅助决策系统(附源码)
目录 一.引言 1、编写目的 2、系统功能概述 二.功能分析 三.微服务模块 1、微服务用户相关模块 (1)用户注册 (2)用户登录 (3)用户信息管理 (4)用户操作 2、微服务文件云存…...
LeetCode —— 137. 只出现一次的数字 II
😶🌫️😶🌫️😶🌫️😶🌫️Take your time ! 😶🌫️😶🌫️😶🌫️😶🌫️…...
pnpm、npm、yarn 包管理工具
1、npm 关键词:软件包管理器、命令行工具、一个社区和一个平台 npm(Node Package Manager)是一个用于Node.js环境的软件包管理器。它是一个命令行工具,用于安装、升级、删除和管理JavaScript软件包。npm最初是随同Node.js一起发布…...
微服务知识
1、概念 大型单体应用拆分成多个独立部署运行的微服务(解决并发问题) 2、特点 3、技术栈 4、微服务带来的问题 5、微服务的注册中心 服务注册与发现:微服务实例在启动时会向注册中心注册自己的信息…...
如何在微信搭建私域流量池?
A: ①给客户打标签 添加标签,多维度构建用户画像,精准发送消息。 ②群发信息 选择自定义时间,上传内容 (支持文字,图片) ,一键群发 。 ③建立专属素材库 将常用的话术、图片与文件录入至素材库,员工可随…...
MySQL原理(三)锁定机制(1)综述
一、介绍: 1、锁的本质 业务场景中存在共享资源,多个进程或线程需要竞争获取并处理共享资源,为了保证公平、可靠、结果正确等业务逻辑,要把并发执行的问题变为串行,串行时引入第三方锁当成谁有权限来操作共享资源的判…...
Qt知识点总结
将枚举类型转换为字符串 这里使用的在网络编程中,获取socket状态并显示的时候,遇到的一个问题 #include <QMetaEnum>// 将枚举类型转换为字符串 QMetaEnum metaEnum QMetaEnum::fromType<QAbstractSocket::SocketState>(); const char *c…...
什么是系统工程(字幕)13
0 00:00:00,670 --> 00:00:01,582 如果不加图 1 00:00:01,582 --> 00:00:02,130 怎么加 2 00:00:02,130 --> 00:00:03,225 我们来看一下 3 00:00:03,225 --> 00:00:03,590 你看 4 00:00:03,980 --> 00:00:06,720 右键点这个,添加元素 5 00:00:0…...
qt学习:Table widget控件
目录 头文件 实战 重新配置ui界面 添加头文件 在构造函数中添加初始化 显示方法 该实例是在sqlite项目上添加qt学习:QTSQL连接sqlite数据库增删改查-CSDN博客 头文件 #include <QTableWidgetItem> 实战 重新配置ui界面 用法介绍,可以双击…...
Android --- Content Provider是使用示例,通俗易懂
当两个应用程序之间需要共享数据时,可以通过 Content Provider 来实现。在这个示例中,我们将创建一个简单的 Content Provider,让 App_B 暴露人口总数的数据,并由 App_A 来获取这个数据。 首先,我们来创建一个简单的示…...
02-opencv简单实例效果和基本介绍-上
机器视觉概述 机器视觉是人工智能正在快速发展的一个分支。简单说来,机器视觉就是用机器代替人眼来做测量和判断。机器视觉系统是通过机器视觉产品(即图像摄取装置,分CMOS和CCD两种)将被摄取目标转换成图像信号,传送给专用的图像处理系统,得到被摄目标的形态信息,根据像素…...
中科大计网学习记录笔记(一):Internet | 网络边缘
计算机网络 前言: 学习视频:中科大郑烇、杨坚全套《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》课程 该视频是B站非常著名的计网学习视频,但相信很多朋友和我一样在听完前面…...
Shell脚本——免交互
目录 一、Here Document免交互 1、免交互概述 2、语法格式 2.1示例:免交互方式实现对行数的统计,将要统计的内容置于标记EOF之间,直接将内容传给wc-l来统计 3、变量设定 ①变量图换成实际值 ②整行内容作为变量并输出结果 ③使输出内…...
【数据分享】1929-2023年全球站点的逐月最高气温数据(Shp\Excel\无需转发)
气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据! 之前我们分享过1929-2023年全球气象站…...
CentOS gui 图形界面显示文字乱码
一、现象 CentOS(CentOS 7.5)控制台下显示中文乱码: 或者通过X11 Forwarding远程显示CentOS的图形化程序文字乱码: 二、解决方法 安装中文语言包: yum install kde-l10n-Chinese 注:网上有些文章会推荐安…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
