开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)
一、前言
使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。
FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。
总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。
本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回
二、术语
2.1.Tool
Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。
2.2.langchain预置的tools
https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools
基本这些工具能满足大部分需求,具体使用参见:
2.3.LangChain支持流式输出的方法
stream
:基本的流式传输方式,能逐步给出代理的动作和观察结果。astream
:异步的流式传输,用于异步处理需求的情况。astream_events
:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。
2.4.langchainhub
是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。
它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。
2.5.asyncio
是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。
三、前置条件
3.1. 创建虚拟环境&安装依赖
增加Google Search以及langchainhub的依赖包
conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub
3.2. 注册Google Search API账号
参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)
3.3. 生成Google Search API的KEY
四、技术实现
4.1. 使用Tool&流式输出
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:{tools}Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).Valid "action" values: "Final Answer" or {tool_names}Provide only ONE action per $JSON_BLOB, as shown:```{{"action": $TOOL_NAME,"action_input": $INPUT}}```Follow this format:Question: input question to answerThought: consider previous and subsequent stepsAction:```$JSON_BLOB```Observation: action result... (repeat Thought/Action/Observation N times)Thought: I know what to respondAction:```{{"action": "Final Answer","action_input": "Final response to human"}}Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}{agent_scratchpad}(reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
说明:
流式输出的数据结构为:
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
4.2. 通过langchainhub使用公共prompt
在4.1使用Tool&流式输出的代码基础上进行调整
# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"from langchain import hubllm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]prompt = hub.pull("hwchase17/structured-chat-agent")print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
4.3. 整合代码
在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整
import uvicorn
import osfrom typing import Annotated
from fastapi import (Depends,FastAPI,WebSocket,WebSocketException,WebSocketDisconnect,status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapperfrom langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"class ConnectionManager:def __init__(self):self.active_connections: list[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def send_personal_message(self, message: str, websocket: WebSocket):await websocket.send_text(message)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()app = FastAPI()async def authenticate(websocket: WebSocket,userid: str,secret: str,
):if userid is None or secret is None:raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)print(f'userid: {userid},secret: {secret}')if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:return 'pass'else:return 'fail'@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resultdef get_prompt():prompt = hub.pull("hwchase17/structured-chat-agent")return promptasync def chat(query):global llm,toolsagent = create_structured_chat_agent(llm, tools, get_prompt())agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)events = agent_executor.astream_events({"input": query}, version="v1")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)yield content@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):await manager.connect(websocket)try:while True:text = await websocket.receive_text()if 'fail' == permission:await manager.send_personal_message(f"authentication failed", websocket)else:if text is not None and len(text) > 0:async for msg in chat(text):await manager.send_personal_message(msg, websocket)except WebSocketDisconnect:manager.disconnect(websocket)print(f"Client #{userid} left the chat")await manager.broadcast(f"Client #{userid} left the chat")if __name__ == '__main__':tools = [search]llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)uvicorn.run(app, host='0.0.0.0',port=7777)
客户端:
<!DOCTYPE html>
<html><head><title>Chat</title></head><body><h1>WebSocket Chat</h1><form action="" onsubmit="sendMessage(event)"><label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label><label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label><br/><button onclick="connect(event)">Connect</button><hr><label>Message: <input type="text" id="messageText" autocomplete="off"/></label><button>Send</button></form><ul id='messages'></ul><script>var ws = null;function connect(event) {var userid = document.getElementById("userid")var secret = document.getElementById("secret")ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);ws.onmessage = function(event) {var messages = document.getElementById('messages')var message = document.createElement('li')var content = document.createTextNode(event.data)message.appendChild(content)messages.appendChild(message)};event.preventDefault()}function sendMessage(event) {var input = document.getElementById("messageText")ws.send(input.value)input.value = ''event.preventDefault()}</script></body>
</html>
调用结果:
用户输入:你好
不需要触发工具调用
模型输出:
用户输入:广州现在天气如何?
需要调用工具
模型输出:
```
Action:
```
{"action": "Final Answer","action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```
PS:
1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。
2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。
相关文章:

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)
一、前言 使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。 FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,F…...

独立开发者系列(17)——MYSQL的常见异常整理
虽然安装MYSQL到本地很简单,但是数据库报错还是经常出现,这个时候,需要我们进行逐步检查与修复。作为我们最常用的开发软件,无论切换php/go/python/node/java,数据库的身影都少不了,对于我们储存数据而言&a…...

【ajax实战02】数据管理网站—验证码登录
一:数据提交(提交手机验证码) 核心思路整理 利用form-serialize插件,收集对象形式的表单数据后,一并提交给服务器。后得到返回值,进一步操作 基地址: axios.defaults.baseURL http://geek.…...
人工智能在反无人机中的应用介绍
人工智能技术在无人机的发展中扮演着至关重要的角色,这一作用在反无人机技术领域同样显著。随着无人机技术的发展,飞行器具备了微小尺寸、高速机动性,以及可能采用的隐蔽或低空飞行轨迹等特性。这些特性使得传统的人工监视和控制手段面临着重…...

【力扣 - 每日一题】3115. 质数的最大距离(一次遍历、头尾遍历、空间换时间、埃式筛、欧拉筛、打表)Golang实现
原题链接 题目描述 给你一个整数数组 nums。 返回两个(不一定不同的)质数在 nums 中 下标 的 最大距离。 示例 1: 输入: nums [4,2,9,5,3] 输出: 3 解释: nums[1]、nums[3] 和 nums[4] 是质数。因此答…...

【Gin】项目搭建 一
环境准备 首先确保自己电脑安装了Golang 开始项目 1、初始化项目 mkdir gin-hello; # 创建文件夹 cd gin-hello; # 需要到刚创建的文件夹里操作 go mod init goserver; # 初始化项目,项目名称:goserver go get -u github.com/gin-gonic/gin; # 下载…...

C++ 和C#的差别
首先把眼睛瞪大,然后憋住一口气,读下去: 1、CPP 就是C plus plus的缩写,中国大陆的程序员圈子中通常被读做"C加加",而西方的程序员通常读做"C plus plus",它是一种使用非常广泛的计算…...

Vue2组件传值(通信)的方式
目录 1.父传后代 ( 后代拿到了父的数据 )1. 父组件引入子组件,绑定数据2. 子组件直接使用父组件的数据3. 依赖注入(使用 provide/inject API)1.在祖先组件中使用 provide2.在后代组件中使用 inject 2.后代传父 (父拿到了后代的数据)1. 子组件…...
【数据结构 - 时间复杂度和空间复杂度】
文章目录 <center>时间复杂度和空间复杂度算法的复杂度时间复杂度大O的渐进表示法常见时间复杂度计算举例 空间复杂度实例 时间复杂度和空间复杂度 算法的复杂度 算法在编写成可执行程序后,运行时需要耗费时间资源和空间(内存)资源 。因此衡量一个算法的好坏&…...
telegram支付
今天开始接入telegram支付,参考教程这个是telegram的官方说明,详细介绍了机器人支付API。 文章公开地址 新建机器人 因为支付是一个单独的系统,所以在做支付的时候单独创建了一个bot,没有用之前的bot了,特意这样将其分开。创建bot的方法和之前不变,这里不过多介绍。 获…...
elasticsearch-6.8.23的集群搭建过程
三个节点的 ElasticSearch 集群搭建步骤 准备三台机器:28.104.87.98、28.104.87.100、28.104.87.101 和 ElasticSearch 的安装包 elasticsearch-6.8.23.tar.gz ----------------------------- 28.104.87.98,使用 root 用户操作 ----------------------…...
javascript输出语法
javascript输出有三种方式 一种是弹窗输出,就是网页弹出一个对话框,弹出输出内容 语法是aler(内容) 示例代码如下 <body> <script> alert(你好); </script> </body> 这段代码运行后网页会出现一个对话框,弹出你…...

仓库管理系统26--权限设置
原创不易,打字不易,截图不易,多多点赞,送人玫瑰,留有余香,财务自由明日实现 1、权限概述 在应用软件中,通常将软件的功能分为若干个子程序,通过主程序调用。那么,通过…...

d3dx9_43.dll丢失怎么解决?d3dx9_43.dll怎么安装详细教程
在使用计算机中,如果遇到d3dx9_43.dll丢失或许找不到d3dx9_43.dll无法运行打开软件怎么办?这个是非常常见问题,下面我详细介绍一下d3dx9_43.dll是什么文件与d3dx9_43.dll的各种问题以及d3dx9_43.dll丢失的多个解决方法! 一、d3dx9…...
[C++] 退出清理函数解读(exit、_exit、abort、atexit)
说明:在C中,exit、_exit(或_Exit)、abort和atexit是用于控制程序退出和清理的标准库函数。下面是对这些函数的详细解读: exit 函数原型:void exit(int status);作用:exit函数用于正常退出程序…...
代码随想录(回溯)
组合(Leetcode77) 思路 用递归每次遍历从1-n得数,然后list来记录是不是组合到k个了,然后这个每次for循环的开始不能和上一个值的开始重复,所以设置个遍历开始索引startindex class Solution {static List<List<…...

编译原理1
NFA&DFA 在正规式的等价证明可以借助正规集,也可以通过有限自动机DFA来证明等价,以下例题是针对DFA证明正规式的等价,主要步骤是①NFA;②状态转换表; ③状态转换矩阵; ④化简DFA; 文法和语…...
【信息系统项目管理师知识点速记】组织通用管理:流程管理
23.2 流程管理 通过流程视角能够真正看清楚组织系统的本质与内在联系,理顺流程能够理顺整个组织系统。流程是组织运行体系的框架基础,流程框架的质量影响和决定了整个组织运行体系的质量。把流程作为组织运行体系的主线,配备满足流程运作需要的资源,并构建与流程框架相匹配…...
前端 JS 经典:箭头函数的意义
箭头函数是为了消除函数的二义性。 1. 二义性 函数的二义性指函数有不同的两种用法,就造成了二义性,函数的两种用法:1. 指令序列。2. 构造器 1.1 指令序列 就是调用函数,相当于将函数内部的代码再从头执行一次。 1.2 构造器 …...
Java List操作详解及常用方法
Java List操作详解及常用方法 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 什么是Java List? Java中的List是一种动态数组,它允许存…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...

自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...