当前位置: 首页 > news >正文

MindSpore Serving基于昇腾910B实现大模型部署

一、Why MindSpore Serving

大模型时代,作为一个开发人员更多的是关注一个大模型如何训练好、如何调整模型参数、如何才能得到一个更高的模型精度。而作为一个整体项目,只有项目落地才能有其真正的价值。那么如何才能够使得大模型实现落地?如何才能使大模型项目中的文件以app的形式呈现给用户?

解决这个问题的一个组件就是Serving(服务),它主要解决的问题有:

  • 模型如何提交给服务;
  • 服务如何部署;
  • 服务如何呈现给用户;
  • 如何应用各种复杂场景等待

MindSpore Serving就是为了实现将大模型部署到生产环境而产生的。

MindSpore Serving是一个轻量级、高性能的服务模块,旨在帮助MindSpore开发者在生产环境中高效部署在线推理服务。当用户使用MindSpore完成模型训练后,导出MindIR,即可使用MindSpore Serving创建该大模型的推理服务。

MindSpore Serving实现的是一个模型服务化的部署,也就是说模型以线上的形式部署在服务器和云上,客户通过浏览器或者客户端去访问这个服务,将需要进行推理的输入内容发送给服务器,然后服务器将推理的结果返回给用户。

二、Component

MindSpore Serving由三部分组成,分别是客户端(Client)、Master和Worker。

  • 客户端是用户节点,提供了gRPC和RESTful的访问。

  • Master是一个管理节点,管理所有Worker的信息,包括Worker有哪些模型的信息;Master也是一个分化节点,接收到了客户端的请求之后,会根据请求的内容,结合当前管理的Worker节点的信息进行分发,将请求分发给不同的Worker执行。

  • Worker是一个执行节点,会执行加载、模型的更新,在接收到Master转发的请求之后,会将请求进行组装和拆分,然后做前处理、推理和后处理,执行完之后将结果返回给Master,Master再将结果返回给客户端。

三、Features

1.简单易用:
对客户端提供了gRPC和RESTful的服务,同时又提供了服务的拉起、服务的部署和客户端的访问,提供了简单的python接口,通过python接口,用户可以很方便的定制和访问部署服务,只需要一行命令就能够完成一件事。

2.提供定制化的服务:
对于模型来说输入和输出一般是固定的,而对于用户来说输入和输出可能是多变的,这就需要一个预处理模块,将模型的输入转为一个模型可以识别的输入。同时还需要一个后处理模块,给用户提供定制化的服务,针对模型可以定制方法classifly_top,用户根据需要去写前处理和后处理的操作。对于客户端来说只要指定模型名和方法名就能实现推理的结果。

3.支持批处理:
主要是针对具有batchsize维度的文本来说。batchsize实现了文本的并行,在硬件资源足够的情况下,batchsize可以很大地提高性能。对于MindSpore Serving来说,用户一次性发送的请求是不确定的,因此Serving分割和组合一个或者多个请求以匹配用户模型的batchsize。例如batchsize=2,但是有三个请求发过来,这时候就会将两个请求合并处理,到后面再拆分,这样就实现了三个请求的并行,提高了效率。

  1. 高性能扩展:
    MindSpore Serving所使用的算子引擎框架是MindSpore框架,具有自动融合和自动并行的高性能,再加上MindSpore Serving本身具有一个高性能的底层通信能力,客户端可以进行多实例组装,模型支持批处理,多模型之间支持并发,预处理和后处理支持多线程的处理。客户端和Worker可以实现扩展的,因此它也实现了一个高扩展性。

四、Demo

基于昇腾910B3

start_agent.py
from agent.agent_multi_post_method import *
from multiprocessing import Queuefrom config.serving_config import AgentConfig, ModelNameif __name__ == "__main__":startup_queue = Queue(1024)startup_agents(AgentConfig.ctx_setting,AgentConfig.inc_setting,AgentConfig.post_model_setting,len(AgentConfig.AgentPorts),AgentConfig.prefill_model,AgentConfig.decode_model,AgentConfig.argmax_model,AgentConfig.topk_model,startup_queue)started_agents = 0while True:value = startup_queue.get()print("agent : %f started" % value)started_agents = started_agents + 1if started_agents >= len(AgentConfig.AgentPorts):print("all agents started")break# server_app_post.init_server_app()# server_app_post.warmup_model(ModelName)# server_app_post.run_server_app()
client/server_app_post.py
import asyncio
import json
import logging
import signal
import sys
import uuid
from multiprocessing import Processimport uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse, ServerSentEventfrom client.client_utils import ClientRequest, Parameters
from config.serving_config import SERVER_APP_HOST, SERVER_APP_PORT
from server.llm_server_post import LLMServerlogging.basicConfig(level=logging.DEBUG,filename='./output/server_app.log',filemode='w',format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')app = FastAPI()
llm_server = Noneasync def get_full_res(request, results):all_texts = ''async for result in results:prompt_ = result.promptanswer_texts = [output.text for output in result.outputs]text = answer_texts[0]if text is None:text = ""all_texts += textret = {"generated_text": all_texts,}yield (json.dumps(ret, ensure_ascii=False) + '\n').encode("utf-8")async def get_full_res_sse(request, results):all_texts = ''async for result in results:answer_texts = [output.text for output in result.outputs]text = answer_texts[0]if text is None:text = ""all_texts += textret = {"event": "message", "retry": 30000, "generated_text": all_texts}yield json.dumps(ret, ensure_ascii=False)async def get_stream_res(request, results):all_texts = ''index = 0async for result in results:prompt_ = result.promptanswer_texts = [output.text for output in result.outputs]text = answer_texts[0]if text is None:text = ""else:index += 1all_texts += textret = {"token": {"text": text,"index": index},}print(ret, index)yield ("data:" + json.dumps(ret, ensure_ascii=False) + '\n').encode("utf-8")print(all_texts)return_full_text = request.parameters.return_full_textif return_full_text:ret = {"generated_text": all_texts,}yield ("data:" + json.dumps(ret, ensure_ascii=False) + '\n').encode("utf-8")async def get_stream_res_sse(request, results):all_texts = ""index = 0async for result in results:answer_texts = [output.text for output in result.outputs]text = answer_texts[0]if text is None:text = ""else:index += 1all_texts += textret = {"event": "message", "retry": 30000, "data": text}yield json.dumps(ret, ensure_ascii=False)print(all_texts)if request.parameters.return_full_text:ret = {"event": "message", "retry": 30000, "data": all_texts}yield json.dumps(ret, ensure_ascii=False)def send_request(request: ClientRequest):print('request: ', request)request_id = str(uuid.uuid1())if request.parameters is None:request.parameters = Parameters()if request.parameters.do_sample is None:request.parameters.do_sample = Falseif request.parameters.top_k is None:request.parameters.top_k = 3if request.parameters.top_p is None:request.parameters.top_p = 1.0if request.parameters.temperature is None:request.parameters.temperature = 1.0if request.parameters.repetition_penalty is None:request.parameters.repetition_penalty = 1.0if request.parameters.max_new_tokens is None:request.parameters.max_new_tokens = 300if request.parameters.return_protocol is None:request.parameters.return_protocol = "sse"if request.parameters.top_k < 0:request.parameters.top_k = 0if request.parameters.top_p < 0.01:request.parameters.top_p = 0.01if request.parameters.top_p > 1.0:request.parameters.top_p = 1.0params = {"prompt": request.inputs,"do_sample": request.parameters.do_sample,"top_k": request.parameters.top_k,"top_p": request.parameters.top_p,"temperature": request.parameters.temperature,"repetition_penalty": request.parameters.repetition_penalty,"max_token_len": request.parameters.max_new_tokens}print('generate_answer...')global llm_serverresults = llm_server.generate_answer(request_id, **params)return results@app.post("/models/llama2")
async def async_generator(request: ClientRequest):results = send_request(request)if request.stream:if request.parameters.return_protocol == "sse":print('get_stream_res_sse...')return EventSourceResponse(get_stream_res_sse(request, results),media_type="text/event-stream",ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see this ping"}),ping=600)else:print('get_stream_res...')return StreamingResponse(get_stream_res(request, results))else:print('get_full_res...')return StreamingResponse(get_full_res(request, results))@app.post("/models/llama2/generate")
async def async_full_generator(request: ClientRequest):results = send_request(request)print('get_full_res...')return StreamingResponse(get_full_res(request, results))@app.post("/models/llama2/generate_stream")
async def async_stream_generator(request: ClientRequest):results = send_request(request)if request.parameters.return_protocol == "sse":print('get_stream_res_sse...')return EventSourceResponse(get_stream_res_sse(request, results),media_type="text/event-stream",ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see this ping"}),ping=600)else:print('get_stream_res...')return StreamingResponse(get_stream_res(request, results))def update_internlm_request(request: ClientRequest):if request.inputs:request.inputs = "<s><|User|>:{}<eoh>\n<|Bot|>:".format(request.inputs)@app.post("/models/internlm")
async def async_internlm_generator(request: ClientRequest):# update_internlm_request(request)return await async_generator(request)@app.post("/models/internlm/generate")
async def async_internlm_full_generator(request: ClientRequest):# update_internlm_request(request)return await async_full_generator(request)@app.post("/models/internlm/generate_stream")
async def async_internlm_stream_generator(request: ClientRequest):# update_internlm_request(request)return await async_stream_generator(request)def init_server_app():global llm_serverllm_server = LLMServer()print('init server app finish')async def warmup(request: ClientRequest):request.parameters = Parameters(max_new_tokens=3)results = send_request(request)print('warmup get_stream_res...')async for item in get_stream_res(request, results):print(item)def warmup_llama2():request = ClientRequest(inputs="test")asyncio.run(warmup(request))print('warmup llama2 finish')def warmup_internlm():request = ClientRequest(inputs="test")update_internlm_request(request)asyncio.run(warmup(request))print('warmup internlm finish')def run_server_app():print('server port is: ', SERVER_APP_PORT)uvicorn.run(app, host=SERVER_APP_HOST, port=SERVER_APP_PORT)WARMUP_MODEL_MAP = {"llama": warmup_llama2,"internlm": warmup_internlm,
}def warmup_model(model_name):model_prefix = model_name.split('_')[0]if model_prefix in WARMUP_MODEL_MAP.keys():func = WARMUP_MODEL_MAP[model_prefix]warmup_process = Process(target=func)warmup_process.start()warmup_process.join()print("mindspore serving is started.")else:print("model not support warmup : ", model_name)async def _get_batch_size():global llm_serverbatch_size = llm_server.get_bs_current()ret = {'event': "message", "retry": 30000, "data": batch_size}yield json.dumps(ret, ensure_ascii=False)async def _get_request_numbers():global llm_serverqueue_size = llm_server.get_queue_current()ret = {'event': "message", "retry": 30000, "data": queue_size}yield json.dumps(ret, ensure_ascii=False)@app.get("/serving/get_bs")
async def get_batch_size():return EventSourceResponse(_get_batch_size(),media_type="text/event-stream",ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see this ping"}),ping=600)@app.get("/serving/get_request_numbers")
async def get_request_numbers():return EventSourceResponse(_get_request_numbers(),media_type="text/event-stream",ping_message_factory=lambda: ServerSentEvent(**{"comment": "You can't see this ping"}),ping=600)def sig_term_handler(signal, frame):print("catch SIGTERM")global llm_serverllm_server.stop()print("----serving exit----")sys.exit(0)if __name__ == "__main__":signal.signal(signal.SIGTERM, sig_term_handler)init_server_app()# warmup_model(ModelName)run_server_app()

相关文章:

MindSpore Serving基于昇腾910B实现大模型部署

一、Why MindSpore Serving 大模型时代&#xff0c;作为一个开发人员更多的是关注一个大模型如何训练好、如何调整模型参数、如何才能得到一个更高的模型精度。而作为一个整体项目&#xff0c;只有项目落地才能有其真正的价值。那么如何才能够使得大模型实现落地&#xff1f;如…...

mysql原理--InnoDB的Buffer Pool

1.缓存的重要性 对于使用 InnoDB 作为存储引擎的表来说&#xff0c;不管是用于存储用户数据的索引&#xff08;包括聚簇索引和二级索引&#xff09;&#xff0c;还是各种系统数据&#xff0c;都是以 页 的形式存放在 表空间 中的&#xff0c;而所谓的 表空间 只不过是 InnoDB 对…...

Redis不同环境缓存同一条数据,数据内部值不同

背景 现实中&#xff0c;本地环境&#xff08;dev&#xff09;和开发环境&#xff08;feature&#xff09;会共同使用相同的中间件&#xff08;本篇拿Redis举例&#xff09;&#xff0c;对于不同环境中的&#xff0c;图片、视频、语音等资源类型的预览地址url&#xff0c;需要配…...

MySQL之导入、导出远程备份

一、Navicat工具导入、导出 1.1 导入 第一步&#xff1a; 右键&#xff0c;点击运行SQL文件 第二步&#xff1a; 选择要运行的SQL&#xff0c;点击开始 第三步&#xff1a; 关闭即可 1.2 导出 第一步&#xff1a; 右键选择&#xff0c;导出向导 第二步&#xff1a; 选择SQL脚…...

OpenGL学习笔记-Blending

混合方程中&#xff0c;Csource是片段着色器输出的颜色向量&#xff08;the color output of the fragment shader&#xff09;&#xff0c;其权重为Fsource。Cdestination是当前存储在color buffer中的颜色向量&#xff08;the color vector that is currently stored in the …...

支持 input 函数的在线 python 运行环境 - 基于队列

支持 input 函数的在线 python 运行环境 - 基于队列 思路两次用户输入三次用户输入 实现前端使用 vue element uiWindows 环境的执行器子进程需要执行的代码 代码仓库参考 本文提供了一种方式来实现支持 input 函数&#xff0c;即支持用户输的在线 python 运行环境。效果如下图…...

欧拉Euler release 21.10 (LTS-SP2)升级openssh至9版本记录

背景&#xff1a;安扫漏洞&#xff0c;需要对openssh经行升级 1.先查看升级前的openssh版本 2.避免升级失败断开远程登录&#xff0c;先开启telnt服务用于远程连接&#xff08;这步可查看其他博客&#xff09; 3.从欧拉官网下载rpm包&#xff0c;https://www.openeuler.org/zh…...

php 数组中的元素进行排列组合

需求背景&#xff1a;计算出数组[A,B,C,D]各种排列组合&#xff0c;希望得到的是数据如下图 直接上代码&#xff1a; private function finish_combination($array, &$groupResult [], $splite ,){$result [];$finish_result [];$this->diffArrayItems($array, $…...

Python从入门到网络爬虫(OS模块详解)

前言 本章介绍python自带模块os&#xff0c;os为操作系统 operating system 的简写&#xff0c;意为python与电脑的交互。os 模块提供了非常丰富的方法用来处理文件和目录。通过使用 os 模块&#xff0c;一方面可以方便地与操作系统进行交互&#xff0c;另一方面页可以极大增强…...

人机交互不是人机融合智能

一、人机交互和人机融合智能是两个不同的概念 人机交互是指人类与计算机之间的信息交流和操作方式&#xff0c;包括输入和输出界面、交互技术、用户体验等方面。人机交互的目标是提供用户友好的界面和自然的交互方式&#xff0c;使人类能够与计算机更加高效地进行沟通和协作。 …...

RabbitMQ解决消息丢失以及重复消费问题

文章目录 1、概念2、基于ACK/NACK机制2.1 基于Spring AMQP框架整合ACK/NACK机制2.2 测试消费失败1.02.3 测试结果1.02.4 测试MQ宕机2.5 测试结果2.0 3、RabbitMQ 如何实现幂等性设计3.1 幂等服务设计思路3.1.1 通过雪花算法生成分布式唯一ID3.1.2 通过枚举类&#xff0c;设计Me…...

docker 安装redis集群

一、准备6台机器 二、6台机器分别拉取镜像&#xff1a; docker pull redis三、6台机器分别建立挂载文件夹 mkdir -p /home/redis/data四、6台机器分别执行容器操作 docker run --restartalways -d --name redis-node-1 --net host --privilegedtrue -v /home/redis/data:/da…...

锂电池制造设备中分布式IO模块优势

在“碳达峰、碳中和”目标推动下&#xff0c;新能源汽车当下发展势头正盛&#xff0c;而纯电动车的核心部件则是&#xff1a;锂电池。动力型锂电池作为新能源汽车核心零部件&#xff0c;其发展与新能源汽车行业息息相关&#xff0c;迎来广阔的市场空间。 为何采用I/O模块&#…...

Android Room数据库升级Migration解决方案

一、介绍 Android Room 是 Android 官方提供的一个轻量级数据库框架&#xff0c;用于在 Android 应用程序中管理数据持久性。它简化了数据库访问&#xff0c;提供了更安全、更快速的数据存储方式&#xff0c;并使得数据操作更加便捷。 二、Room的特点(八股文可以参考) 以下是…...

离线安装docker和docker-compose

1.下载 docker Index of linux/static/stable/x86_64/ docker-compose Overview of installing Docker Compose | Docker Docs 2.docker /etc/systemd/system/docker.service [Unit] DescriptionDocker Application Container Engine Documentationhttps://docs.docker.…...

奇怪的事情记录:外置网卡和外置显示器不兼容

身为程序员&#xff0c;不应该对世界上的稀奇古怪的事情感到惊讶&#xff08;毕竟&#xff0c;大部分都是程序员自己搞出来的&#xff09;。 外置网卡和外置显示器不兼容 mbp2019intel版&#xff0c;win10&#xff0c;外接有线网卡&#xff0c;平时用得很好&#xff0c;接上外…...

【大数据进阶第三阶段之Hive学习笔记】Hive基础入门

目录 1、什么是Hive 2、Hive的优缺点 2.1、 优点 2.2、 缺点 2.2.1、Hive的HQL表达能力有限 2.2.2、Hive的效率比较低 3、Hive架构原理 3.1、用户接口&#xff1a;Client 3.2、元数据&#xff1a;Metastore 3.3、Hadoop 3.4、驱动器&#xff1a;Driver Hive运行机制…...

第三代量子计算机交付,中国芯片开辟新道路,光刻机难挡中国芯

日前安徽本源量子宣布第三代超导量子计算系统正式上线&#xff0c;这是中国最先进的量子计算机&#xff0c;计算量子比特已达到72个&#xff0c;在全球已居于较为领先的水平&#xff0c;这对于中国芯片在原来的硅基芯片受到光刻机阻碍无疑是巨大的鼓舞。 据悉本源量子的第一代、…...

react native中使用tailwind并配置自动补全

使用的第三方库是tailwind-react-native-classnames&#xff0c;同类的也有tailwind-rn&#xff0c;但是我更喜欢前者官方demo&#xff1a; import { View, Text } from react-native; import tw from twrnc;const MyComponent () > (<View style{twp-4 android:pt-2 b…...

数据分析——火车信息

任务目标 任务 1、整理火车发车信息数据&#xff0c;结果的表格形式为&#xff1a; 2、并输出最终的发车信息表 难点 1、多文件 一个文件夹&#xff0c;多个月的发车信息&#xff0c;一个excel&#xff0c;放一天的发车情况 2、数据表的格式特殊 如何分析表是一个难点 数…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

日常一水C

多态 言简意赅&#xff1a;就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过&#xff0c;当子类和父类的函数名相同时&#xff0c;会隐藏父类的同名函数转而调用子类的同名函数&#xff0c;如果要调用父类的同名函数&#xff0c;那么就需要对父类进行引用&#…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...

全面解析数据库:从基础概念到前沿应用​

在数字化时代&#xff0c;数据已成为企业和社会发展的核心资产&#xff0c;而数据库作为存储、管理和处理数据的关键工具&#xff0c;在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理&#xff0c;到社交网络的用户数据存储&#xff0c;再到金融行业的交易记录处理&a…...