当前位置: 首页 > news >正文

开源模型应用落地-业务优化篇(三)

一、前言

    假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容?

    因为在每个IM服务中,我们用JVM缓存了用户与WS的通道的绑定关系,并且使用Redis队列进行解耦。那扩展了IM服务实例之后,如何确保Redis队列的消息能正常消费,即如何能找回对应的用户通道?别着急,接下来,我将给您做详细的解释。


二、术语

2.1.水平扩容

    是指通过增加系统中的资源实例数量来提高系统的处理能力和吞吐量。在计算机领域,水平扩容通常用于应对系统负载的增加或需要处理更多请求的情况。

2.2.无状态

    无状态(stateless)是指系统或组件在处理请求时不依赖于之前的请求或会话信息。换句话说,每个请求都是独立的,系统不会在不同的请求之间保存任何状态或上下文信息。

    在无状态系统中,每个请求被视为一个独立的事件,系统只关注当前请求所包含的信息和参数,而不依赖于之前的请求历史。这使得系统更加简单、可伸缩和易于管理。


三、前置条件

3.1. 已经完成前两篇的学习


四、技术实现

4.1、实现思路

    首先,IM服务是状态的(AI服务是无状态),每个实例中,会缓存用户与WebSocket通道之间的信息。那是否可以采用中间共享存储的方式,将状态信息保存至Redis或外部存储中?答案是:不行。WebScoket的通道信息,无法进行序列化。

    要实现IM服务水平扩容的方式有多种,但目前我们采用以下的方案:

  1.   每个IM服务保存对应的用户和WS通道的关系;
  2.   每个IM服务对应唯一一个redis队列;
  3.   前置的SLB(App入口)能根据用户标识(哈希)将请求转发至指定的IM服务;
  4.   当某一IM服务出现故障的时候,由App端发起重连,重新建立WebSocket连接。

4.2、调整配置文件

# 每个IM服务实例指定全局唯一的ID,例如:下面指定的node:0

ws:server:node: 0

PS:具体参数可以在外部指定,作为JVM的运行参数传入

4.3、调整业务逻辑处理类

# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式

# Redis中缓存的数据如下

4.4、调整任务处理类

# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式


五、测试

# 这次换一下测试方式,用离线页面的方式进行测试

5.1.  建立连接

5.2.  业务初始化

5.3.  业务对话


六、附带说明

6.1. BusinessHandler完整代码

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** @Description: 处理消息的handler*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class BusinessHandler extends AbstractBusinessLogicHandler<TextWebSocketFrame> {public static final String LINE_UP_QUEUE_NAME = "AI-REQ-QUEUE";private static final String LINE_UP_LOCK_NAME = "AI-REQ-LOCK";private static final int MAX_QUEUE_SIZE = 100;//    @Autowired
//    private TaskUtils taskExecuteUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("add client,channelId:{}", channelId);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("remove client,channelId:{}", channelId);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)throws Exception {// 获取客户端传输过来的消息String content = textWebSocketFrame.text();// 兼容在线测试if (StringUtils.equals(content, "PING")) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());return;}log.info("接收到客户端发送的信息: {}", content);Long userIdForReq;String msgType = "";String contents = "";try {ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);msgType = apiReqMessage.getMsgType();contents = apiReqMessage.getContents();userIdForReq = apiReqMessage.getUserId();// 用户身份标识校验if (null == userIdForReq || (long) userIdForReq <= 10000) {ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode())).respTime(String.valueOf(System.currentTimeMillis())).contents("用户身份标识有误!").msgType(String.valueOf(MsgType.SYSTEM.getCode())).build();buildResponseAndClose(channelHandlerContext, apiRespMessage);return;}if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {// 对用户输入的内容进行自定义违规词检测// 对用户输入的内容进行第三方在线违规词检测// 对用户输入的内容进行组装成Prompt// 对Prompt根据业务进行增强(完善prompt的内容)// 对history进行裁剪或总结(检测history是否操作模型支持的上下文长度,例如qwen-7b支持的上下文长度为8192)// ...//                通过线程池来处理
//                String messageId = apiReqMessage.getMessageId();
//                List<ChatContext> history = apiReqMessage.getHistory();
//                AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
//                taskExecuteUtils.execute(aiTaskReqMessage);//                通过队列来缓冲boolean flag = true;RLock lock = redissonClient.getLock(LINE_UP_LOCK_NAME);String queueName = LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();//尝试获取锁,最多等待3秒,锁的自动释放时间为10秒if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {try {if (redisUtils.queueSize(queueName) < MAX_QUEUE_SIZE) {redisUtils.queueAdd(queueName, content);log.info("当前线程为:{}, 添加请求至redis队列",Thread.currentThread().getName());} else {flag = false;}} catch (Throwable e) {log.error("系统处理异常", e);} finally {lock.unlock();}} else {flag = false;}if (!flag) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.SYSTEM.getCode())).contents("当前排队人数较多,请稍后再重试!").build());}} else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {//一、业务黑名单检测(多次违规,永久锁定)//二、账户锁定检测(临时锁定)//三、多设备登录检测//四、剩余对话次数检测//检测通过,绑定用户与channel之间关系addChannel(channelHandlerContext, userIdForReq);String respMessage = "用户标识: " + userIdForReq + " 登录成功";buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.INIT.getCode())).contents(respMessage).build());} else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());} else {log.info("用户标识: {}, 消息类型有误,不支持类型: {}", userIdForReq, msgType);}} catch (Exception e) {log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);// 异常返回return;}}}

6.2. TaskUtils完整代码

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class TaskUtils implements ApplicationRunner {private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);@Autowiredprivate AIChatUtils aiChatUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void run(ApplicationArguments args) throws Exception {while(true){String queueName = BusinessHandler.LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();
//             执行定时任务的逻辑String content = redisUtils.queuePoll(queueName);if(StringUtils.isNotEmpty(content) && StringUtils.isNoneBlank(content)){try{ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);String messageId = apiReqMessage.getMessageId();String contents = apiReqMessage.getContents();Long userIdForReq = apiReqMessage.getUserId();List<ChatContext> history = apiReqMessage.getHistory();AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();execute(aiTaskReqMessage);}catch (Throwable e){log.error("处理消息出现异常",e);//将请求再次返回去队列//将请求丢弃//其他处理?}}else{TimeUnit.SECONDS.sleep(1);}}}public void execute(AITaskReqMessage aiTaskReqMessage) {executorService.execute(() -> {Long userId = aiTaskReqMessage.getUserId();if (null == userId || (long) userId < 10000) {log.warn("用户身份标识有误!");return;}ChannelHandlerContext channelHandlerContext = BusinessHandler.getContextByUserId(userId);if (channelHandlerContext != null) {try {aiChatUtils.chatStream(aiTaskReqMessage);} catch (Throwable exception) {exception.printStackTrace();}}});}public static void destory() {executorService.shutdownNow();executorService = null;}}

相关文章:

开源模型应用落地-业务优化篇(三)

一、前言 假如您跟随我的脚步&#xff0c;学习到上一篇的内容&#xff0c;到这里&#xff0c;相信细心的您&#xff0c;已经发现了&#xff0c;在上一篇中遗留的问题。那就是IM服务过载的时候&#xff0c;如何进行水平扩容&#xff1f; 因为在每个IM服务中&#xff0c;我们用JV…...

基于SpringBoot+Vue实现的物流快递仓库管理系统

基于SpringBootVue实现的物流快递仓库管理系统 文章目录 基于SpringBootVue实现的物流快递仓库管理系统系统介绍技术选型成果展示账号地址及其他说明源码获取 系统介绍 系统演示 关注视频号【全栈小白】&#xff0c;观看演示视频 基于SpringBootVue实现的物流快递仓库管理系…...

编程笔记 html5cssjs 072 JavaScrip BigInt数据类型

编程笔记 html5&css&js 072 JavaScrip BigInt数据类型 一、BigInt 数据类型二、BigInt 的创建和使用三、BigInt 操作与方法三、示例小结 JavaScript BigInt 数据类型是一种内置的数据类型&#xff0c;用于表示大于 Number.MAX_SAFE_INTEGER&#xff08;即2^53 - 1&…...

matlab simulink 步进电机控制

1、内容简介 略 41-可以交流、咨询、答疑 2、内容说明 电动执行器定位控制在生产生活中具有广泛的应用&#xff0c;在使用搭载步进电机的电动执行器进行定位控制的时候&#xff0c;定位系统的定位精度和响应波形&#xff0c;会随着负载质量的变化而变化&#xff0c;这是由电…...

使用阿里云的IDaaS实现知行之桥EDI系统的单点登录

&#xff0c;在开始测试之前&#xff0c;需要确定用哪个信息作为“登陆用户的ID字段”。 这个字段用来在完成SSO登陆之后&#xff0c;用哪个信息将阿里云IDaaS的用户和知行之桥EDI系统的用户做对应。这里我们使用了 phonenumber 这个自定义属性。需要在阿里云做如下配置&#x…...

基于微服务的高考志愿智能辅助决策系统(附源码)

目录 一.引言 1、编写目的 2、系统功能概述 二.功能分析 三.微服务模块 1、微服务用户相关模块 &#xff08;1&#xff09;用户注册 &#xff08;2&#xff09;用户登录 &#xff08;3&#xff09;用户信息管理 &#xff08;4&#xff09;用户操作 2、微服务文件云存…...

LeetCode —— 137. 只出现一次的数字 II

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…...

pnpm、npm、yarn 包管理工具

1、npm 关键词&#xff1a;软件包管理器、命令行工具、一个社区和一个平台 npm&#xff08;Node Package Manager&#xff09;是一个用于Node.js环境的软件包管理器。它是一个命令行工具&#xff0c;用于安装、升级、删除和管理JavaScript软件包。npm最初是随同Node.js一起发布…...

微服务知识

1、概念 大型单体应用拆分成多个独立部署运行的微服务&#xff08;解决并发问题&#xff09;​​​​​​​ 2、特点 3、技术栈 4、微服务带来的问题 ​​​​​​​ 5、微服务的注册中心 服务注册与发现&#xff1a;微服务实例在启动时会向注册中心注册自己的信息&#xf…...

如何在微信搭建私域流量池?

A: ①给客户打标签 添加标签&#xff0c;多维度构建用户画像&#xff0c;精准发送消息。 ②群发信息 选择自定义时间&#xff0c;上传内容 (支持文字&#xff0c;图片) &#xff0c;一键群发 。 ③建立专属素材库 将常用的话术、图片与文件录入至素材库&#xff0c;员工可随…...

MySQL原理(三)锁定机制(1)综述

一、介绍&#xff1a; 1、锁的本质 业务场景中存在共享资源&#xff0c;多个进程或线程需要竞争获取并处理共享资源&#xff0c;为了保证公平、可靠、结果正确等业务逻辑&#xff0c;要把并发执行的问题变为串行&#xff0c;串行时引入第三方锁当成谁有权限来操作共享资源的判…...

Qt知识点总结

将枚举类型转换为字符串 这里使用的在网络编程中&#xff0c;获取socket状态并显示的时候&#xff0c;遇到的一个问题 #include <QMetaEnum>// 将枚举类型转换为字符串 QMetaEnum metaEnum QMetaEnum::fromType<QAbstractSocket::SocketState>(); const char *c…...

什么是系统工程(字幕)13

0 00:00:00,670 --> 00:00:01,582 如果不加图 1 00:00:01,582 --> 00:00:02,130 怎么加 2 00:00:02,130 --> 00:00:03,225 我们来看一下 3 00:00:03,225 --> 00:00:03,590 你看 4 00:00:03,980 --> 00:00:06,720 右键点这个&#xff0c;添加元素 5 00:00:0…...

qt学习:Table widget控件

目录 头文件 实战 重新配置ui界面 添加头文件 在构造函数中添加初始化 显示方法 该实例是在sqlite项目上添加qt学习&#xff1a;QTSQL连接sqlite数据库增删改查-CSDN博客 头文件 #include <QTableWidgetItem> 实战 重新配置ui界面 用法介绍&#xff0c;可以双击…...

Android --- Content Provider是使用示例,通俗易懂

当两个应用程序之间需要共享数据时&#xff0c;可以通过 Content Provider 来实现。在这个示例中&#xff0c;我们将创建一个简单的 Content Provider&#xff0c;让 App_B 暴露人口总数的数据&#xff0c;并由 App_A 来获取这个数据。 首先&#xff0c;我们来创建一个简单的示…...

02-opencv简单实例效果和基本介绍-上

机器视觉概述 机器视觉是人工智能正在快速发展的一个分支。简单说来,机器视觉就是用机器代替人眼来做测量和判断。机器视觉系统是通过机器视觉产品(即图像摄取装置,分CMOS和CCD两种)将被摄取目标转换成图像信号,传送给专用的图像处理系统,得到被摄目标的形态信息,根据像素…...

中科大计网学习记录笔记(一):Internet | 网络边缘

计算机网络 前言&#xff1a; 学习视频&#xff1a;中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版&#xff0c;James F.Kurose&#xff0c;Keith W.Ross&#xff09;》课程 该视频是B站非常著名的计网学习视频&#xff0c;但相信很多朋友和我一样在听完前面…...

Shell脚本——免交互

目录 一、Here Document免交互 1、免交互概述 2、语法格式 2.1示例&#xff1a;免交互方式实现对行数的统计&#xff0c;将要统计的内容置于标记EOF之间&#xff0c;直接将内容传给wc-l来统计 3、变量设定 ①变量图换成实际值 ②整行内容作为变量并输出结果 ③使输出内…...

【数据分享】1929-2023年全球站点的逐月最高气温数据(Shp\Excel\无需转发)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff0c;其中又以气温指标最为常用&#xff01;说到气温数据&#xff0c;最详细的气温数据是具体到气象监测站点的气温数据&#xff01; 之前我们分享过1929-2023年全球气象站…...

CentOS gui 图形界面显示文字乱码

一、现象 CentOS&#xff08;CentOS 7.5&#xff09;控制台下显示中文乱码&#xff1a; 或者通过X11 Forwarding远程显示CentOS的图形化程序文字乱码&#xff1a; 二、解决方法 安装中文语言包&#xff1a; yum install kde-l10n-Chinese 注&#xff1a;网上有些文章会推荐安…...

抖音内容下载终极指南:5分钟搞定批量下载与去水印

抖音内容下载终极指南&#xff1a;5分钟搞定批量下载与去水印 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. …...

跟着 MDN 学CSS day_9:(深入掌握CSS选择器核心技能测试)

在Web开发的学习路径中&#xff0c;CSS选择器是构建一切样式体系的基石。无论你是刚入门的新手&#xff0c;还是有一定经验的开发者&#xff0c;对选择器的理解深度直接决定了你能否高效、精准地控制页面元素的样式表现。MDN Web 文档提供了一套经典的"技能测试&#xff1…...

从‘相框’与‘相片’说起:彻底搞懂MFC文档/视图架构与消息路由(含实战避坑)

从相框到相片&#xff1a;深入解析MFC文档/视图架构的设计哲学与实战应用 在Windows桌面应用开发的历史长河中&#xff0c;MFC&#xff08;Microsoft Foundation Classes&#xff09;作为经典的C框架&#xff0c;其独特的文档/视图架构一直是开发者又爱又恨的设计。想象一下相框…...

《信息学奥赛一本通 编程启蒙C++版》适合小学生学习吗

‌适合小学生学习&#xff0c;尤其适合小学低年级作为C启蒙入门使用‌&#xff0c;可以按照以下方式安排阅读学习&#xff1a; 一、适配性说明 这本书是专门针对低龄学习者设计的C编程启蒙内容&#xff0c;整体难度较低、循序渐进&#xff1a; 1、对于小学1-4年级的孩子&#x…...

【电路板】基于matlab模拟电路板激光加工中的热分布【含Matlab源码 15559期】

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到海神之光博客之家&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49…...

Gemini深度研究模式到底有多强?3个颠覆性实验结果揭示它如何重构科研工作流

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;Gemini深度研究模式到底有多强&#xff1f;3个颠覆性实验结果揭示它如何重构科研工作流 Gemini深度研究模式并非简单增强版对话功能&#xff0c;而是一套面向复杂知识任务的原生协同推理架构。其核心突破在于支…...

AI Agent社交交互延迟超800ms?——用eBPF+LLM Token流控双引擎压测实录(性能提升4.8倍原始基线)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;AI Agent社交交互延迟超800ms&#xff1f;——用eBPFLLM Token流控双引擎压测实录&#xff08;性能提升4.8倍原始基线&#xff09; 当AI Agent在高并发社交场景中响应延迟突破800ms&#xff0c;用户会感…...

麻雀AI助手Akagi:免费实时分析工具,5分钟提升雀魂游戏水平 [特殊字符]️

麻雀AI助手Akagi&#xff1a;免费实时分析工具&#xff0c;5分钟提升雀魂游戏水平 &#x1f004;️ 【免费下载链接】Akagi 支持雀魂、天鳳、麻雀一番街、天月麻將&#xff0c;能夠使用自定義的AI模型實時分析對局並給出建議&#xff0c;內建Mortal AI作為示例。 Supports Majs…...

Vue-Tree-List:轻松构建优雅树形结构的Vue组件指南

Vue-Tree-List&#xff1a;轻松构建优雅树形结构的Vue组件指南 【免费下载链接】vue-tree-list &#x1f332;A vue component for tree structure 项目地址: https://gitcode.com/gh_mirrors/vu/vue-tree-list 你是否曾为在Vue项目中实现复杂的树形结构而感到头疼&…...

在Node.js服务中集成Taotoken实现智能问答与内容生成功能

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 在Node.js服务中集成Taotoken实现智能问答与内容生成功能 对于Node.js后端开发者而言&#xff0c;为应用添加智能问答或内容生成能…...