开源模型应用落地-业务优化篇(三)
一、前言
假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容?
因为在每个IM服务中,我们用JVM缓存了用户与WS的通道的绑定关系,并且使用Redis队列进行解耦。那扩展了IM服务实例之后,如何确保Redis队列的消息能正常消费,即如何能找回对应的用户通道?别着急,接下来,我将给您做详细的解释。
二、术语
2.1.水平扩容
是指通过增加系统中的资源实例数量来提高系统的处理能力和吞吐量。在计算机领域,水平扩容通常用于应对系统负载的增加或需要处理更多请求的情况。
2.2.无状态
无状态(stateless)是指系统或组件在处理请求时不依赖于之前的请求或会话信息。换句话说,每个请求都是独立的,系统不会在不同的请求之间保存任何状态或上下文信息。
在无状态系统中,每个请求被视为一个独立的事件,系统只关注当前请求所包含的信息和参数,而不依赖于之前的请求历史。这使得系统更加简单、可伸缩和易于管理。
三、前置条件
3.1. 已经完成前两篇的学习
四、技术实现
4.1、实现思路
首先,IM服务是有状态的(AI服务是无状态),每个实例中,会缓存用户与WebSocket通道之间的信息。那是否可以采用中间共享存储的方式,将状态信息保存至Redis或外部存储中?答案是:不行。WebScoket的通道信息,无法进行序列化。
要实现IM服务水平扩容的方式有多种,但目前我们采用以下的方案:
- 每个IM服务保存对应的用户和WS通道的关系;
- 每个IM服务对应唯一一个redis队列;
- 前置的SLB(App入口)能根据用户标识(哈希)将请求转发至指定的IM服务;
- 当某一IM服务出现故障的时候,由App端发起重连,重新建立WebSocket连接。
4.2、调整配置文件
# 每个IM服务实例指定全局唯一的ID,例如:下面指定的node:0
ws:server:node: 0
PS:具体参数可以在外部指定,作为JVM的运行参数传入
4.3、调整业务逻辑处理类
# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式
# Redis中缓存的数据如下
4.4、调整任务处理类
# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式
五、测试
# 这次换一下测试方式,用离线页面的方式进行测试
5.1. 建立连接
5.2. 业务初始化
5.3. 业务对话
六、附带说明
6.1. BusinessHandler完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** @Description: 处理消息的handler*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class BusinessHandler extends AbstractBusinessLogicHandler<TextWebSocketFrame> {public static final String LINE_UP_QUEUE_NAME = "AI-REQ-QUEUE";private static final String LINE_UP_LOCK_NAME = "AI-REQ-LOCK";private static final int MAX_QUEUE_SIZE = 100;// @Autowired
// private TaskUtils taskExecuteUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("add client,channelId:{}", channelId);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("remove client,channelId:{}", channelId);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)throws Exception {// 获取客户端传输过来的消息String content = textWebSocketFrame.text();// 兼容在线测试if (StringUtils.equals(content, "PING")) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());return;}log.info("接收到客户端发送的信息: {}", content);Long userIdForReq;String msgType = "";String contents = "";try {ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);msgType = apiReqMessage.getMsgType();contents = apiReqMessage.getContents();userIdForReq = apiReqMessage.getUserId();// 用户身份标识校验if (null == userIdForReq || (long) userIdForReq <= 10000) {ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode())).respTime(String.valueOf(System.currentTimeMillis())).contents("用户身份标识有误!").msgType(String.valueOf(MsgType.SYSTEM.getCode())).build();buildResponseAndClose(channelHandlerContext, apiRespMessage);return;}if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {// 对用户输入的内容进行自定义违规词检测// 对用户输入的内容进行第三方在线违规词检测// 对用户输入的内容进行组装成Prompt// 对Prompt根据业务进行增强(完善prompt的内容)// 对history进行裁剪或总结(检测history是否操作模型支持的上下文长度,例如qwen-7b支持的上下文长度为8192)// ...// 通过线程池来处理
// String messageId = apiReqMessage.getMessageId();
// List<ChatContext> history = apiReqMessage.getHistory();
// AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
// taskExecuteUtils.execute(aiTaskReqMessage);// 通过队列来缓冲boolean flag = true;RLock lock = redissonClient.getLock(LINE_UP_LOCK_NAME);String queueName = LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();//尝试获取锁,最多等待3秒,锁的自动释放时间为10秒if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {try {if (redisUtils.queueSize(queueName) < MAX_QUEUE_SIZE) {redisUtils.queueAdd(queueName, content);log.info("当前线程为:{}, 添加请求至redis队列",Thread.currentThread().getName());} else {flag = false;}} catch (Throwable e) {log.error("系统处理异常", e);} finally {lock.unlock();}} else {flag = false;}if (!flag) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.SYSTEM.getCode())).contents("当前排队人数较多,请稍后再重试!").build());}} else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {//一、业务黑名单检测(多次违规,永久锁定)//二、账户锁定检测(临时锁定)//三、多设备登录检测//四、剩余对话次数检测//检测通过,绑定用户与channel之间关系addChannel(channelHandlerContext, userIdForReq);String respMessage = "用户标识: " + userIdForReq + " 登录成功";buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.INIT.getCode())).contents(respMessage).build());} else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());} else {log.info("用户标识: {}, 消息类型有误,不支持类型: {}", userIdForReq, msgType);}} catch (Exception e) {log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);// 异常返回return;}}}
6.2. TaskUtils完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class TaskUtils implements ApplicationRunner {private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);@Autowiredprivate AIChatUtils aiChatUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void run(ApplicationArguments args) throws Exception {while(true){String queueName = BusinessHandler.LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();
// 执行定时任务的逻辑String content = redisUtils.queuePoll(queueName);if(StringUtils.isNotEmpty(content) && StringUtils.isNoneBlank(content)){try{ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);String messageId = apiReqMessage.getMessageId();String contents = apiReqMessage.getContents();Long userIdForReq = apiReqMessage.getUserId();List<ChatContext> history = apiReqMessage.getHistory();AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();execute(aiTaskReqMessage);}catch (Throwable e){log.error("处理消息出现异常",e);//将请求再次返回去队列//将请求丢弃//其他处理?}}else{TimeUnit.SECONDS.sleep(1);}}}public void execute(AITaskReqMessage aiTaskReqMessage) {executorService.execute(() -> {Long userId = aiTaskReqMessage.getUserId();if (null == userId || (long) userId < 10000) {log.warn("用户身份标识有误!");return;}ChannelHandlerContext channelHandlerContext = BusinessHandler.getContextByUserId(userId);if (channelHandlerContext != null) {try {aiChatUtils.chatStream(aiTaskReqMessage);} catch (Throwable exception) {exception.printStackTrace();}}});}public static void destory() {executorService.shutdownNow();executorService = null;}}
相关文章:

开源模型应用落地-业务优化篇(三)
一、前言 假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容? 因为在每个IM服务中,我们用JV…...

基于SpringBoot+Vue实现的物流快递仓库管理系统
基于SpringBootVue实现的物流快递仓库管理系统 文章目录 基于SpringBootVue实现的物流快递仓库管理系统系统介绍技术选型成果展示账号地址及其他说明源码获取 系统介绍 系统演示 关注视频号【全栈小白】,观看演示视频 基于SpringBootVue实现的物流快递仓库管理系…...
编程笔记 html5cssjs 072 JavaScrip BigInt数据类型
编程笔记 html5&css&js 072 JavaScrip BigInt数据类型 一、BigInt 数据类型二、BigInt 的创建和使用三、BigInt 操作与方法三、示例小结 JavaScript BigInt 数据类型是一种内置的数据类型,用于表示大于 Number.MAX_SAFE_INTEGER(即2^53 - 1&…...

matlab simulink 步进电机控制
1、内容简介 略 41-可以交流、咨询、答疑 2、内容说明 电动执行器定位控制在生产生活中具有广泛的应用,在使用搭载步进电机的电动执行器进行定位控制的时候,定位系统的定位精度和响应波形,会随着负载质量的变化而变化,这是由电…...

使用阿里云的IDaaS实现知行之桥EDI系统的单点登录
,在开始测试之前,需要确定用哪个信息作为“登陆用户的ID字段”。 这个字段用来在完成SSO登陆之后,用哪个信息将阿里云IDaaS的用户和知行之桥EDI系统的用户做对应。这里我们使用了 phonenumber 这个自定义属性。需要在阿里云做如下配置&#x…...

基于微服务的高考志愿智能辅助决策系统(附源码)
目录 一.引言 1、编写目的 2、系统功能概述 二.功能分析 三.微服务模块 1、微服务用户相关模块 (1)用户注册 (2)用户登录 (3)用户信息管理 (4)用户操作 2、微服务文件云存…...

LeetCode —— 137. 只出现一次的数字 II
😶🌫️😶🌫️😶🌫️😶🌫️Take your time ! 😶🌫️😶🌫️😶🌫️😶🌫️…...
pnpm、npm、yarn 包管理工具
1、npm 关键词:软件包管理器、命令行工具、一个社区和一个平台 npm(Node Package Manager)是一个用于Node.js环境的软件包管理器。它是一个命令行工具,用于安装、升级、删除和管理JavaScript软件包。npm最初是随同Node.js一起发布…...

微服务知识
1、概念 大型单体应用拆分成多个独立部署运行的微服务(解决并发问题) 2、特点 3、技术栈 4、微服务带来的问题 5、微服务的注册中心 服务注册与发现:微服务实例在启动时会向注册中心注册自己的信息…...

如何在微信搭建私域流量池?
A: ①给客户打标签 添加标签,多维度构建用户画像,精准发送消息。 ②群发信息 选择自定义时间,上传内容 (支持文字,图片) ,一键群发 。 ③建立专属素材库 将常用的话术、图片与文件录入至素材库,员工可随…...

MySQL原理(三)锁定机制(1)综述
一、介绍: 1、锁的本质 业务场景中存在共享资源,多个进程或线程需要竞争获取并处理共享资源,为了保证公平、可靠、结果正确等业务逻辑,要把并发执行的问题变为串行,串行时引入第三方锁当成谁有权限来操作共享资源的判…...
Qt知识点总结
将枚举类型转换为字符串 这里使用的在网络编程中,获取socket状态并显示的时候,遇到的一个问题 #include <QMetaEnum>// 将枚举类型转换为字符串 QMetaEnum metaEnum QMetaEnum::fromType<QAbstractSocket::SocketState>(); const char *c…...
什么是系统工程(字幕)13
0 00:00:00,670 --> 00:00:01,582 如果不加图 1 00:00:01,582 --> 00:00:02,130 怎么加 2 00:00:02,130 --> 00:00:03,225 我们来看一下 3 00:00:03,225 --> 00:00:03,590 你看 4 00:00:03,980 --> 00:00:06,720 右键点这个,添加元素 5 00:00:0…...

qt学习:Table widget控件
目录 头文件 实战 重新配置ui界面 添加头文件 在构造函数中添加初始化 显示方法 该实例是在sqlite项目上添加qt学习:QTSQL连接sqlite数据库增删改查-CSDN博客 头文件 #include <QTableWidgetItem> 实战 重新配置ui界面 用法介绍,可以双击…...
Android --- Content Provider是使用示例,通俗易懂
当两个应用程序之间需要共享数据时,可以通过 Content Provider 来实现。在这个示例中,我们将创建一个简单的 Content Provider,让 App_B 暴露人口总数的数据,并由 App_A 来获取这个数据。 首先,我们来创建一个简单的示…...

02-opencv简单实例效果和基本介绍-上
机器视觉概述 机器视觉是人工智能正在快速发展的一个分支。简单说来,机器视觉就是用机器代替人眼来做测量和判断。机器视觉系统是通过机器视觉产品(即图像摄取装置,分CMOS和CCD两种)将被摄取目标转换成图像信号,传送给专用的图像处理系统,得到被摄目标的形态信息,根据像素…...

中科大计网学习记录笔记(一):Internet | 网络边缘
计算机网络 前言: 学习视频:中科大郑烇、杨坚全套《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》课程 该视频是B站非常著名的计网学习视频,但相信很多朋友和我一样在听完前面…...

Shell脚本——免交互
目录 一、Here Document免交互 1、免交互概述 2、语法格式 2.1示例:免交互方式实现对行数的统计,将要统计的内容置于标记EOF之间,直接将内容传给wc-l来统计 3、变量设定 ①变量图换成实际值 ②整行内容作为变量并输出结果 ③使输出内…...

【数据分享】1929-2023年全球站点的逐月最高气温数据(Shp\Excel\无需转发)
气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据! 之前我们分享过1929-2023年全球气象站…...

CentOS gui 图形界面显示文字乱码
一、现象 CentOS(CentOS 7.5)控制台下显示中文乱码: 或者通过X11 Forwarding远程显示CentOS的图形化程序文字乱码: 二、解决方法 安装中文语言包: yum install kde-l10n-Chinese 注:网上有些文章会推荐安…...

龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...

相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...