开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)
一、前言
使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。
FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。
总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。
本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回
二、术语
2.1.Tool
Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。
2.2.langchain预置的tools
https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools
基本这些工具能满足大部分需求,具体使用参见:
2.3.LangChain支持流式输出的方法
stream
:基本的流式传输方式,能逐步给出代理的动作和观察结果。astream
:异步的流式传输,用于异步处理需求的情况。astream_events
:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。
2.4.langchainhub
是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。
它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。
2.5.asyncio
是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。
三、前置条件
3.1. 创建虚拟环境&安装依赖
增加Google Search以及langchainhub的依赖包
conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub
3.2. 注册Google Search API账号
参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)
3.3. 生成Google Search API的KEY
四、技术实现
4.1. 使用Tool&流式输出
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:{tools}Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).Valid "action" values: "Final Answer" or {tool_names}Provide only ONE action per $JSON_BLOB, as shown:```{{"action": $TOOL_NAME,"action_input": $INPUT}}```Follow this format:Question: input question to answerThought: consider previous and subsequent stepsAction:```$JSON_BLOB```Observation: action result... (repeat Thought/Action/Observation N times)Thought: I know what to respondAction:```{{"action": "Final Answer","action_input": "Final response to human"}}Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}{agent_scratchpad}(reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
说明:
流式输出的数据结构为:
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
4.2. 通过langchainhub使用公共prompt
在4.1使用Tool&流式输出的代码基础上进行调整
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"from langchain import hubllm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]prompt = hub.pull("hwchase17/structured-chat-agent")print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
4.3. 整合代码
在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整
import uvicorn
import osfrom typing import Annotated
from fastapi import (Depends,FastAPI,WebSocket,WebSocketException,WebSocketDisconnect,status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapperfrom langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"class ConnectionManager:def __init__(self):self.active_connections: list[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def send_personal_message(self, message: str, websocket: WebSocket):await websocket.send_text(message)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()app = FastAPI()async def authenticate(websocket: WebSocket,userid: str,secret: str,
):if userid is None or secret is None:raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)print(f'userid: {userid},secret: {secret}')if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:return 'pass'else:return 'fail'@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resultdef get_prompt():prompt = hub.pull("hwchase17/structured-chat-agent")return promptasync def chat(query):global llm,toolsagent = create_structured_chat_agent(llm, tools, get_prompt())agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)events = agent_executor.astream_events({"input": query}, version="v1")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)yield content@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):await manager.connect(websocket)try:while True:text = await websocket.receive_text()if 'fail' == permission:await manager.send_personal_message(f"authentication failed", websocket)else:if text is not None and len(text) > 0:async for msg in chat(text):await manager.send_personal_message(msg, websocket)except WebSocketDisconnect:manager.disconnect(websocket)print(f"Client #{userid} left the chat")await manager.broadcast(f"Client #{userid} left the chat")if __name__ == '__main__':tools = [search]llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)uvicorn.run(app, host='0.0.0.0',port=7777)
客户端:
<!DOCTYPE html>
<html><head><title>Chat</title></head><body><h1>WebSocket Chat</h1><form action="" onsubmit="sendMessage(event)"><label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label><label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label><br/><button onclick="connect(event)">Connect</button><hr><label>Message: <input type="text" id="messageText" autocomplete="off"/></label><button>Send</button></form><ul id='messages'></ul><script>var ws = null;function connect(event) {var userid = document.getElementById("userid")var secret = document.getElementById("secret")ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);ws.onmessage = function(event) {var messages = document.getElementById('messages')var message = document.createElement('li')var content = document.createTextNode(event.data)message.appendChild(content)messages.appendChild(message)};event.preventDefault()}function sendMessage(event) {var input = document.getElementById("messageText")ws.send(input.value)input.value = ''event.preventDefault()}</script></body>
</html>
调用结果:
用户输入:你好
不需要触发工具调用
模型输出:
用户输入:广州现在天气如何?
需要调用工具
模型输出:
```
Action:
```
{"action": "Final Answer","action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```
PS:
1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。
2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。
相关文章:

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)
一、前言 使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。 FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,F…...

独立开发者系列(17)——MYSQL的常见异常整理
虽然安装MYSQL到本地很简单,但是数据库报错还是经常出现,这个时候,需要我们进行逐步检查与修复。作为我们最常用的开发软件,无论切换php/go/python/node/java,数据库的身影都少不了,对于我们储存数据而言&a…...

【ajax实战02】数据管理网站—验证码登录
一:数据提交(提交手机验证码) 核心思路整理 利用form-serialize插件,收集对象形式的表单数据后,一并提交给服务器。后得到返回值,进一步操作 基地址: axios.defaults.baseURL http://geek.…...
人工智能在反无人机中的应用介绍
人工智能技术在无人机的发展中扮演着至关重要的角色,这一作用在反无人机技术领域同样显著。随着无人机技术的发展,飞行器具备了微小尺寸、高速机动性,以及可能采用的隐蔽或低空飞行轨迹等特性。这些特性使得传统的人工监视和控制手段面临着重…...

【力扣 - 每日一题】3115. 质数的最大距离(一次遍历、头尾遍历、空间换时间、埃式筛、欧拉筛、打表)Golang实现
原题链接 题目描述 给你一个整数数组 nums。 返回两个(不一定不同的)质数在 nums 中 下标 的 最大距离。 示例 1: 输入: nums [4,2,9,5,3] 输出: 3 解释: nums[1]、nums[3] 和 nums[4] 是质数。因此答…...

【Gin】项目搭建 一
环境准备 首先确保自己电脑安装了Golang 开始项目 1、初始化项目 mkdir gin-hello; # 创建文件夹 cd gin-hello; # 需要到刚创建的文件夹里操作 go mod init goserver; # 初始化项目,项目名称:goserver go get -u github.com/gin-gonic/gin; # 下载…...

C++ 和C#的差别
首先把眼睛瞪大,然后憋住一口气,读下去: 1、CPP 就是C plus plus的缩写,中国大陆的程序员圈子中通常被读做"C加加",而西方的程序员通常读做"C plus plus",它是一种使用非常广泛的计算…...

Vue2组件传值(通信)的方式
目录 1.父传后代 ( 后代拿到了父的数据 )1. 父组件引入子组件,绑定数据2. 子组件直接使用父组件的数据3. 依赖注入(使用 provide/inject API)1.在祖先组件中使用 provide2.在后代组件中使用 inject 2.后代传父 (父拿到了后代的数据)1. 子组件…...
【数据结构 - 时间复杂度和空间复杂度】
文章目录 <center>时间复杂度和空间复杂度算法的复杂度时间复杂度大O的渐进表示法常见时间复杂度计算举例 空间复杂度实例 时间复杂度和空间复杂度 算法的复杂度 算法在编写成可执行程序后,运行时需要耗费时间资源和空间(内存)资源 。因此衡量一个算法的好坏&…...
telegram支付
今天开始接入telegram支付,参考教程这个是telegram的官方说明,详细介绍了机器人支付API。 文章公开地址 新建机器人 因为支付是一个单独的系统,所以在做支付的时候单独创建了一个bot,没有用之前的bot了,特意这样将其分开。创建bot的方法和之前不变,这里不过多介绍。 获…...
elasticsearch-6.8.23的集群搭建过程
三个节点的 ElasticSearch 集群搭建步骤 准备三台机器:28.104.87.98、28.104.87.100、28.104.87.101 和 ElasticSearch 的安装包 elasticsearch-6.8.23.tar.gz ----------------------------- 28.104.87.98,使用 root 用户操作 ----------------------…...
javascript输出语法
javascript输出有三种方式 一种是弹窗输出,就是网页弹出一个对话框,弹出输出内容 语法是aler(内容) 示例代码如下 <body> <script> alert(你好); </script> </body> 这段代码运行后网页会出现一个对话框,弹出你…...

仓库管理系统26--权限设置
原创不易,打字不易,截图不易,多多点赞,送人玫瑰,留有余香,财务自由明日实现 1、权限概述 在应用软件中,通常将软件的功能分为若干个子程序,通过主程序调用。那么,通过…...

d3dx9_43.dll丢失怎么解决?d3dx9_43.dll怎么安装详细教程
在使用计算机中,如果遇到d3dx9_43.dll丢失或许找不到d3dx9_43.dll无法运行打开软件怎么办?这个是非常常见问题,下面我详细介绍一下d3dx9_43.dll是什么文件与d3dx9_43.dll的各种问题以及d3dx9_43.dll丢失的多个解决方法! 一、d3dx9…...
[C++] 退出清理函数解读(exit、_exit、abort、atexit)
说明:在C中,exit、_exit(或_Exit)、abort和atexit是用于控制程序退出和清理的标准库函数。下面是对这些函数的详细解读: exit 函数原型:void exit(int status);作用:exit函数用于正常退出程序…...
代码随想录(回溯)
组合(Leetcode77) 思路 用递归每次遍历从1-n得数,然后list来记录是不是组合到k个了,然后这个每次for循环的开始不能和上一个值的开始重复,所以设置个遍历开始索引startindex class Solution {static List<List<…...

编译原理1
NFA&DFA 在正规式的等价证明可以借助正规集,也可以通过有限自动机DFA来证明等价,以下例题是针对DFA证明正规式的等价,主要步骤是①NFA;②状态转换表; ③状态转换矩阵; ④化简DFA; 文法和语…...
【信息系统项目管理师知识点速记】组织通用管理:流程管理
23.2 流程管理 通过流程视角能够真正看清楚组织系统的本质与内在联系,理顺流程能够理顺整个组织系统。流程是组织运行体系的框架基础,流程框架的质量影响和决定了整个组织运行体系的质量。把流程作为组织运行体系的主线,配备满足流程运作需要的资源,并构建与流程框架相匹配…...
前端 JS 经典:箭头函数的意义
箭头函数是为了消除函数的二义性。 1. 二义性 函数的二义性指函数有不同的两种用法,就造成了二义性,函数的两种用法:1. 指令序列。2. 构造器 1.1 指令序列 就是调用函数,相当于将函数内部的代码再从头执行一次。 1.2 构造器 …...
Java List操作详解及常用方法
Java List操作详解及常用方法 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 什么是Java List? Java中的List是一种动态数组,它允许存…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...

微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...

Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...