吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py
前提条件:
1、吴恩达MCP课程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件
{"mcpServers": {"filesystem": {"command": "npx","args": ["--y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}
代码
原课程用的anthropic的,下面改成openai,并用千问模型做测试
from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import osload_dotenv()class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}async def connect_to_server(self, server_name, server_config):try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()try:# List available toolsresponse = await session.list_tools()for tool in response.tools:self.sessions[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})# List available promptsprompts_response = await session.list_prompts()if prompts_response and prompts_response.prompts:for prompt in prompts_response.prompts:self.sessions[prompt.name] = sessionself.available_prompts.append({"name": prompt.name,"description": prompt.description,"arguments": prompt.arguments})# List available resourcesresources_response = await session.list_resources()if resources_response and resources_response.resources:for resource in resources_response.resources:resource_uri = str(resource.uri)self.sessions[resource_uri] = sessionexcept Exception as e:print(f"Error {e}")except Exception as e:print(f"Error connecting to {server_name}: {e}")async def connect_to_servers(self):try:with open("server_config_prompt_resource.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server config: {e}")raiseasync def process_query(self, query):messages = [{"role":"user", "content":query}]while True:response = self.client.chat.completions.create(model="qwen-turbo",tools=self.available_tools,messages=messages)message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)messages.append({"role": "assistant", "content": message.content})break# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 获取session并调用工具session = self.sessions.get(tool_name)if not session:print(f"Tool '{tool_name}' not found.")breakresult = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})else:breakasync def get_resource(self, resource_uri):session = self.sessions.get(resource_uri)# Fallback for papers URIs - try any papers resource sessionif not session and resource_uri.startswith("papers://"):for uri, sess in self.sessions.items():if uri.startswith("papers://"):session = sessbreakif not session:print(f"Resource '{resource_uri}' not found.")returntry:result = await session.read_resource(uri=resource_uri)if result and result.contents:print(f"\nResource: {resource_uri}")print("Content:")print(result.contents[0].text)else:print("No content available.")except Exception as e:print(f"Error: {e}")async def list_prompts(self):"""List all available prompts."""if not self.available_prompts:print("No prompts available.")returnprint("\nAvailable prompts:")for prompt in self.available_prompts:print(f"- {prompt['name']}: {prompt['description']}")if prompt['arguments']:print(f" Arguments:")for arg in prompt['arguments']:arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')print(f" - {arg_name}")async def execute_prompt(self, prompt_name, args):"""Execute a prompt with the given arguments."""session = self.sessions.get(prompt_name)if not session:print(f"Prompt '{prompt_name}' not found.")returntry:result = await session.get_prompt(prompt_name, arguments=args)if result and result.messages:prompt_content = result.messages[0].content# Extract text from content (handles different formats)if isinstance(prompt_content, str):text = prompt_contentelif hasattr(prompt_content, 'text'):text = prompt_content.textelse:# Handle list of content itemstext = " ".join(item.text if hasattr(item, 'text') else str(item) for item in prompt_content)print(f"\nExecuting prompt '{prompt_name}'...")await self.process_query(text)except Exception as e:print(f"Error: {e}")async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")print("Use @folders to see available topics")print("Use @<topic> to search papers in that topic")print("Use /prompts to list available prompts")print("Use /prompt <name> <arg1=value1> to execute a prompt")while True:try:query = input("\nQuery: ").strip()if not query:continueif query.lower() == 'quit':break# Check for @resource syntax firstif query.startswith('@'):# Remove @ sign topic = query[1:]if topic == "folders":resource_uri = "papers://folders"else:resource_uri = f"papers://{topic}"await self.get_resource(resource_uri)continue# Check for /command syntaxif query.startswith('/'):parts = query.split()command = parts[0].lower()if command == '/prompts':await self.list_prompts()elif command == '/prompt':if len(parts) < 2:print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")continueprompt_name = parts[1]args = {}# Parse argumentsfor arg in parts[2:]:if '=' in arg:key, value = arg.split('=', 1)args[key] = valueawait self.execute_prompt(prompt_name, args)else:print(f"Unknown command: {command}")continueawait self.process_query(query)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()if __name__ == "__main__":asyncio.run(main())
代码解释
这段代码实现了一个基于MCP(Model Context Protocol)的聊天机器人,能够连接到多个MCP服务器,使用它们提供的工具、提示和资源。下面是对代码的详细解释:
类结构与初始化
class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
AsyncExitStack
:用于管理多个异步上下文管理器,确保资源正确释放openai.OpenAI
:创建OpenAI客户端,使用环境变量中的API密钥和基础URLavailable_tools
:存储可用工具列表,用于OpenAI API的工具调用功能available_prompts
:存储可用提示列表,用于快速显示sessions
:字典,将工具名称、提示名称或资源URI映射到对应的MCP客户端会话
服务器连接
async def connect_to_server(self, server_name, server_config):# 创建服务器参数、建立连接并初始化会话# 获取可用工具、提示和资源
这个方法负责:
- 使用提供的配置创建
StdioServerParameters
- 通过
stdio_client
建立与服务器的连接 - 创建并初始化
ClientSession
- 获取服务器提供的工具、提示和资源
- 将它们添加到相应的列表和字典中
async def connect_to_servers(self):# 从配置文件加载服务器信息并连接到每个服务器
这个方法从server_config_prompt_resource.json
文件加载服务器配置,并为每个服务器调用connect_to_server
方法。
查询处理
async def process_query(self, query):# 处理用户查询,使用OpenAI API和可用工具
这个方法是聊天机器人的核心,它:
- 创建包含用户查询的消息列表
- 调用OpenAI API,传递消息和可用工具
- 处理API的响应:
- 如果有普通文本内容,打印并添加到消息历史
- 如果有工具调用,执行每个工具调用并将结果添加到消息历史
工具调用的处理流程:
- 从响应中提取工具ID、名称和参数
- 从
sessions
字典中获取对应的会话 - 调用工具并获取结果
- 将结果添加到消息历史中
资源和提示管理
async def get_resource(self, resource_uri):# 获取并显示指定URI的资源内容
这个方法用于获取和显示资源内容,特别是论文资源。它会:
- 从
sessions
字典中获取对应的会话 - 对于以
papers://
开头的URI,如果找不到对应会话,会尝试使用任何处理papers的会话 - 调用
read_resource
方法获取资源内容 - 打印资源内容
async def list_prompts(self):# 列出所有可用的提示
这个方法列出所有可用的提示及其描述和参数。
async def execute_prompt(self, prompt_name, args):# 执行指定的提示,并处理结果
这个方法执行指定的提示,并将结果传递给process_query
方法进行处理。它处理不同格式的提示内容,确保能够正确提取文本。
聊天循环
async def chat_loop(self):# 主聊天循环,处理用户输入
这个方法是聊天机器人的主循环,它:
- 打印欢迎信息和使用说明
- 循环接收用户输入
- 根据输入的不同格式执行不同操作:
- 如果输入是
quit
,退出循环 - 如果输入以
@
开头,调用get_resource
方法获取资源 - 如果输入以
/
开头,执行命令(如列出提示或执行提示) - 否则,调用
process_query
方法处理普通查询
- 如果输入是
资源清理
async def cleanup(self):# 清理资源
这个方法调用exit_stack.aclose()
来清理所有资源。
主函数
async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()
主函数创建MCP_ChatBot
实例,连接到服务器,运行聊天循环,并确保在结束时清理资源。
代码运行结果
uv run mcp_chatbot_prompt_resource.py
相关文章:

吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py
前提条件: 1、吴恩达MCP课程(5):research_server_prompt_resource.py 2、server_config_prompt_resource.json文件 {"mcpServers": {"filesystem": {"command": "npx","args"…...
关于DDOS
DDOS是一门没什么技术含量的东西,其本质而言是通过大量数据报文,发送到目标受害主机IP地址上,导致目标主机无法继续服务(俗称:拒绝服务) DDOS灰产人期望达成的预期目标,几乎都是只要把对面打到 …...
云服务器自带的防御可靠吗
最近有不少朋友问我,云服务器自带的防御靠不靠谱。就拿大厂的云服务器来说,很多都自带5G防御。但这5G防御能力,在如今的网络攻击环境下,真的有些不够看。 如今,网络攻击手段层出不穷,攻击流量更是越来越大。…...
Java详解LeetCode 热题 100(27):LeetCode 21. 合并两个有序链表(Merge Two Sorted Lists)详解
文章目录 1. 题目描述1.1 链表节点定义 2. 理解题目2.1 问题可视化2.2 核心挑战 3. 解法一:迭代法(哨兵节点)3.1 算法思路3.2 Java代码实现3.3 详细执行过程演示3.4 执行结果示例3.5 复杂度分析3.6 优缺点分析 4. 解法二:递归法4.…...

设计模式——抽象工厂设计模式(创建型)
摘要 抽象工厂设计模式是一种创建型设计模式,旨在提供一个接口,用于创建一系列相关或依赖的对象,无需指定具体类。它通过抽象工厂、具体工厂、抽象产品和具体产品等组件构建,相比工厂方法模式,能创建一个产品族。该模…...

基于LocalAI与cpolar技术协同的本地化AI模型部署与远程访问方案解析
文章目录 前言1. Docker部署2. 简单使用演示3. 安装cpolar内网穿透4. 配置公网地址5. 配置固定公网地址前言 各位极客朋友们!今天要向大家推荐一套创新性的本地部署方案——LocalAI技术架构。这款开源工具包能够将普通配置的笔记本电脑转化为具备强大算力的AI工作站,轻松实现…...
Linux 云服务器部署 Flask 项目(含后台运行与 systemd 开机自启)
一、准备工作 在开始正式部署之前,请确认以下前提条件已经准备好: 你有一台运行 Linux 系统(CentOS 或 Ubuntu)的服务器; 服务器有公网 IP,本例中使用:111.229.204.102; 你拥有该服务器的管理员权限(可以使用 sudo); 打算使用 Flask 构建一个简单的 Web 接口; 服务…...

霍尔效应传感器的革新突破:铟化铟晶体与结构演进驱动汽车点火系统升级
一、半导体材料革新:铟化铟晶体的电压放大机制 铟化铟(InSb)晶体因其独特的能带结构,成为提升霍尔电压的关键材料。相较于传统硅基材料,其载流子迁移率高出3-5倍,在相同磁场强度下可显著放大霍尔电压。其作…...

无法运用pytorch环境、改环境路径、隔离环境
一.未建虚拟环境时 1.创建新项目后,直接运行是这样的。 2.设置中Virtualenv找不到pytorch环境?因为此时没有创建新虚拟环境。 3.选择conda环境(全局环境)时,是可以下载环境的。 运行结果如下: 是全局环境…...

从0开始学vue:pnpm怎么安装
一、什么是 pnpm? pnpm(Performant npm)是新一代 JavaScript 包管理器,优势包括: 节省磁盘空间:通过硬链接和符号链接实现高效存储安装速度更快:比 npm/yarn 快 2-3 倍内置工作区支持…...
React从基础入门到高级实战:React 实战项目 - 项目二:电商平台前端
React 实战项目:电商平台前端 欢迎来到本 React 开发教程专栏的第 27 篇!在前 26 篇文章中,我们从 React 的基础概念逐步深入到高级技巧,涵盖了组件、状态、路由、性能优化和设计模式等核心知识。这一次,我们将通过一…...

Python 网络编程 -- WebSocket编程
作者主要是为了用python构建实时网络通信程序。 概念性的东西越简单越好理解,因此,下面我从晚上摘抄的概念 我的理解。 什么是网络通信? 更确切地说,网络通信是两台计算机上的两个进程之间的通信。比如,浏览器进程和新浪服务器上的某个Web服务进程在通…...
微信小程序动态组件加载的应用场景与实现方式
动态组件加载的应用场景与实现方式 你提供的代码展示了微信小程序中动态加载组件的方法,但这种方式在实际开发中需要注意使用场景和实现细节。下面我来详细说明如何应用: 应用场景 按需加载组件:在某些条件满足时才加载组件动态配置组件&a…...
人工智能在智能教育中的创新应用与未来趋势
随着人工智能(AI)技术的飞速发展,教育领域正经历着一场深刻的变革。智能教育通过引入AI、物联网(IoT)、大数据和云计算等前沿技术,正在实现教育的个性化、智能化和高效化。本文将探讨人工智能在智能教育中的…...

边缘计算应用实践心得
当数据中心的光纤开始承载不了爆炸式增长的物联网数据流时,边缘计算就像毛细血管般渗透进现代数字肌理的末梢。这种将算力下沉到数据源头的技术范式,本质上是对传统云计算中心化架构的叛逆与补充——在智能制造车间里,实时质检算法直接在工业…...

EXCEL如何快速批量给两字姓名中间加空格
EXCEL如何快速批量给姓名中间加空格 优点:不会导致排版混乱 缺点:无法输出在原有单元格上,若需要保留原始数据,可将公式结果复制后“选择性粘贴为值” 使用场景:在EXCEL中想要快速批量给两字姓名中间加入空格使姓名对…...
OD 算法题 B卷【BOSS的收入】
文章目录 BOSS的收入 BOSS的收入 一个公司只有一个boss,其有若干一级分销,一级分销又有若干二级分销,每个分销只有唯一的上级;每个月,下级分销需要将自己的总收入(自己的下级上交的)࿰…...
Linux共享内存原理及系统调用分析
shmget 是 System V 共享内存的核心系统调用之一,其权限位(shmflg 参数)决定了共享内存段的访问控制和创建行为。以下是权限位的详细解析: 权限位的组成 shmflg 参数由两部分组成: 权限标志(低 9 位&…...

Jenkins | Linux环境部署Jenkins与部署java项目
1. 部署jenkins 1.1 下载war包 依赖环境 jdk 11 下载地址: https://www.jenkins.io/ 依赖环境 1.2 启动服务 启动命令 需要注意使用jdk11以上的版本 直接启动 # httpPort 指定端口 #-Xms2048m -Xmx4096m 指定java 堆内存初始大小 与最大大小 /usr/java/jdk17/bin/java…...

react私有样式处理
react私有样式处理 Nav.jsx Menu.jsx vue中通过scoped来实现样式私有化。加上scoped,就属于当前组件的私有样式。 给视图中的元素都加了一个属性data-v-xxx,然后给这些样式都加上属性选择器。(deep就是不加属性也不加属性选择器) …...

UDP/TCP协议全解
目录 一. UDP协议 1.UDP协议概念 2.UDP数据报格式 3.UDP协议差错控制 二. TCP协议 1.TCP协议概念 2.三次握手与四次挥手 3.TCP报文段格式(重点) 4.流量控制 5.拥塞控制 一. UDP协议 1.UDP协议概念 当应用层的进程1要向进程2传输报文ÿ…...
nginx 服务启动失败问题记录
背景和问题 systemctl status nginx.service 查看报错信息,显示如下: nginx: [emerg] socket() [::]:80 failed (97: Address family not supported by protocol) nginx: configuration file /etc/nginx/nginx.conf test failed问题分析 这个错误通常…...

Duix.HeyGem:以“离线+开源”重构数字人创作生态
在AI技术快速演进的今天,虚拟数字人正从高成本、高门槛的专业领域走向大众化应用。Duix.HeyGem 数字人项目正是这一趋势下的杰出代表。该项目由一支拥有七年AI研发经验的团队打造,通过放弃传统3D建模路径,转向真人视频驱动的AI训练模型,成功实现了低成本、高质量、本地化的…...

ubuntu22.04安装megaton
前置 sudo apt-get install git cmake ninja-build generate-ninja安装devkitPro https://blog.csdn.net/qq_39942341/article/details/148388639?spm1001.2014.3001.5502 安装cargo https://blog.csdn.net/qq_39942341/article/details/148387783?spm1001.2014.3001.5501 …...
风机下引线断点检测算法实现
风机下引线断点检测算法实现 1. 算法原理 该检测系统基于时域反射法(TDR)原理: 在引线起点注入高压纳秒级脉冲脉冲沿引线传播,遇到阻抗不连续点(断点)产生反射采集反射信号并计算时间差通过小波变换进行信号去噪和特征提取根据传播速度计算断点位置:距离 = (传播速度 时间…...

Windows应用-GUID工具
下载本应用 我们在DirectShow和媒体基础程序的调试中,将会遇到大量的GUID,调试窗口大部分情况下只给出GUID字符串,此GUID代表什么,我们无从得知。这时,就需要此“GUID工具”,将GUID字符串翻译为GUID定义&am…...

vue+element-ui一个页面有多个子组件组成。子组件里面有各种表单,实现点击enter实现跳转到下一个表单元素的功能。
一个父组件里面是有各个子组件的form表单组成的。 我想实现点击enter。焦点直接跳转到下一个表单元素。 父组件就是由各个子组件构成 子组件就像下图一样的都有个el-form的表单。 enterToTab.js let enterToTab {}; (function() {// 返回随机数enterToTab.addEnterListener …...
Spring Boot 启动流程及配置类解析原理
Spring Boot 是一个基于 Spring 框架的开源框架,旨在简化 Spring 应用的配置和部署。通过提供约定优于配置的原则,Spring Boot 大大降低了 Java 企业级应用的开发复杂度。本文将详细介绍 Spring Boot 的启动流程及其配置类的解析原理,帮助开发…...

Vehicle HAL(5)--vhal 实现设置属性的流程
目录 1. ard11 vhal 设置属性的时序图 CarService > vhal > CarService 2. EmulatedVehicleHal::set(xxx) 的实现 本文介绍ard11的vhal属性设置流程图。 1. ard11 vhal 设置属性的时序图 CarService > vhal > CarService 2. EmulatedVehicleHal::set(xxx) 的实现…...

WebRTC中的几个Rtp*Sender
一、问题: webrtc当中有几个比较相似的类,看着都是发送RTP数据包的,分别是:RtpPacketToSend 和RtpSenderVideo还有RtpVideoSender以及RTPSender,这说明什么呢?首先,说明我会很多连词࿰…...