当前位置: 首页 > article >正文

AI Agents系列之构建多智能体系统

🧠 向所有学习者致敬!

“学习不是装满一桶水,而是点燃一把火。” —— 叶芝


我的博客主页: https://lizheng.blog.csdn.net

🌐 欢迎点击加入AI人工智能社区!

🚀 让我们一起努力,共创AI未来! 🚀

智能体是一个使用大型语言模型(LLM)来决定应用程序控制流程的系统。随着这些系统的开发,它们可能会随着时间的推移变得越来越复杂,从而难以管理和扩展。例如,你可能会遇到以下问题:

  • 智能体拥有太多工具,却不知道该调用哪个工具,或者何时调用。
  • 上下文变得过于复杂,单个智能体难以跟踪。
  • 系统需要多个专业领域(例如规划者、研究者、数学专家等)。

为了解决这些问题,你可以考虑将应用程序拆分为多个较小的独立智能体,并将它们组合成一个多智能体系统。这些独立智能体可以简单到只是一个提示和一个LLM调用,也可以复杂到一个ReAct智能体(以及更多)!

文章目录

      • 🧠 向所有学习者致敬!
      • 🌐 欢迎[点击加入AI人工智能社区](https://bbs.csdn.net/forums/b8786ecbbd20451bbd20268ed52c0aad?joinKey=bngoppzm57nz-0m89lk4op0-1-315248b33aafff0ea7b)!
      • 1.1 单智能体架构与多智能体架构
    • 2\. 多智能体架构
      • 2.1 多智能体系统中的模式
      • **2.1.1 并行**
      • **2.1.2 顺序**
      • **2.1.3 循环**
      • **2.1.4 路由器**
      • **2.1.5 聚合器(或合成器)**
      • **2.1.6 网络(或水平)**
      • **2.1.7 交接**
      • **2.1.8 监督者**
      • **2.1.9 监督者(工具调用)**
      • **2.1.10 层级(或垂直)**
      • **2.1.11 自定义多智能体工作流**
    • 3\. 智能体之间的通信
      • 3.1 图状态与工具调用
      • 3.2 不同的状态模式
      • 3.3 共享消息列表
    • 4\. 结论

在这里插入图片描述
随着智能体框架的发展,许多公司开始构建自己的多智能体系统,并寻找一种能够解决所有智能体任务的万能解决方案。两年前,研究人员设计了一个名为 ChatDev的多智能体协作系统。ChatDev就像一家虚拟软件公司,通过各种具有不同角色的智能体(如首席执行官、首席产品官、艺术设计师、编码员、审查员、测试员等)来运营,就像一家普通的软件工程公司一样。

None

图片来源:ChatDev

所有这些智能体共同努力并相互沟通,成功创建了一款视频游戏,他们的努力取得了成功。在这一成就之后,许多人相信任何软件工程任务都可以通过这种多智能体架构来解决,其中每个AI都有一个独特的角色。然而,现实世界的实验表明,并非每个问题都可以用相同的架构来解决。在某些情况下,更简单的架构可能会提供更有效、更经济的解决方案。

1.1 单智能体架构与多智能体架构

一开始,单智能体方法似乎很有道理(即一个AI智能体能够完成从浏览器导航到文件操作的所有任务)。然而,随着时间的推移,随着任务变得越来越复杂,工具数量不断增加,我们的单智能体方法将开始面临挑战。

None

单智能体 | 图片来源:Weaviate

我们会注意到当智能体开始出现异常行为时,这可能是由于以下原因导致的:

  • 工具过多:智能体在使用哪些工具以及何时使用方面变得困惑。
  • 上下文过于复杂:智能体的上下文窗口越来越大,包含的工具也越来越多。
  • 错误过多:由于职责过于广泛,智能体开始产生次优或错误的结果。

当我们开始自动化多个不同的子任务(如数据提取或报告生成)时,可能就是时候开始分离职责了。通过使用多个AI智能体,每个智能体专注于自己的领域和工具集,我们可以提高解决方案的清晰度和质量。这不仅可以让智能体变得更加高效,还能简化智能体本身的开发过程。

None

多智能体 | 图片来源:Weaviate

2. 多智能体架构

正如你所看到的,单智能体和多智能体架构都有各自的优缺点。单智能体架构适用于任务简单明了、定义明确且没有特定资源限制的情况。另一方面,多智能体架构在用例复杂且动态、需要更多专业知识和协作,或者有可扩展性和适应性要求时会更有帮助。

2.1 多智能体系统中的模式

在多智能体系统中,有几种连接智能体的方式:

2.1.1 并行

多个智能体同时处理任务的不同部分。

None

图片来源:Weaviate

示例:我们希望同时使用3个智能体对给定文本进行总结翻译情感分析

None

图由作者提供

代码:

from typing import Dict, Any, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import re
import time# 定义状态
class AgentState(TypedDict):text: strsummary: strtranslation: strsentiment: strsummary_time: floattranslation_time: floatsentiment_time: float# 总结智能体
def summarize_agent(state: AgentState) -> Dict[str, Any]:print("总结智能体:运行中")start_time = time.time()try:text = state["text"]if not text.strip():return {"summary": "未提供用于总结的文本。","summary_time": 0.0}time.sleep(2)sentences = re.split(r'(?<=[.!?]) +', text.strip())scored_sentences = [(s, len(s.split())) for s in sentences if s]top_sentences = [s for s, _ in sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:2]]summary = " ".join(top_sentences) if top_sentences else "文本太短,无法总结。"processing_time = time.time() - start_timeprint(f"总结智能体:完成,耗时 {processing_time:.2f} 秒")return {"summary": summary,"summary_time": processing_time}except Exception as e:return {"summary": f"总结时出错:{str(e)}","summary_time": 0.0}# 翻译智能体
def translate_agent(state: AgentState) -> Dict[str, Any]:print("翻译智能体:运行中")start_time = time.time()try:text = state["text"]if not text.strip():return {"translation": "未提供用于翻译的文本。","translation_time": 0.0}time.sleep(3)translation = ("El nuevo parque en la ciudad es una maravillosa adición. ""Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. ""Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña.")processing_time = time.time() - start_timeprint(f"翻译智能体:完成,耗时 {processing_time:.2f} 秒")return {"translation": translation,"translation_time": processing_time}except Exception as e:return {"translation": f"翻译时出错:{str(e)}","translation_time": 0.0}# 情感分析智能体
def sentiment_agent(state: AgentState) -> Dict[str, Any]:print("情感分析智能体:运行中")start_time = time.time()try:text = state["text"]if not text.strip():return {"sentiment": "未提供用于情感分析的文本。","sentiment_time": 0.0}time.sleep(1.5)blob = TextBlob(text)polarity = blob.sentiment.polaritysubjectivity = blob.sentiment.subjectivitysentiment = "Positive" if polarity > 0 else "Negative" if polarity < 0 else "Neutral"result = f"{sentiment} (Polarity: {polarity:.2f}, Subjectivity: {subjectivity:.2f})"processing_time = time.time() - start_timeprint(f"情感分析智能体:完成,耗时 {processing_time:.2f} 秒")return {"sentiment": result,"sentiment_time": processing_time}except Exception as e:return {"sentiment": f"情感分析时出错:{str(e)}","sentiment_time": 0.0}# 合并并行结果
def join_parallel_results(state: AgentState) -> AgentState:return state# 构建图
def build_parallel_graph() -> StateGraph:workflow = StateGraph(AgentState)# 定义并行分支parallel_branches = {"summarize_node": summarize_agent,"translate_node": translate_agent,"sentiment_node": sentiment_agent}# 添加并行处理节点for name, agent in parallel_branches.items():workflow.add_node(name, agent)# 添加分支和合并节点workflow.add_node("branch", lambda state: state)  # 简化的分支函数workflow.add_node("join", join_parallel_results)# 设置入口点workflow.set_entry_point("branch")# 添加并行执行的边for name in parallel_branches:workflow.add_edge("branch", name)workflow.add_edge(name, "join")workflow.add_edge("join", END)return workflow.compile()# 主函数
def main():text = ("The new park in the city is a wonderful addition. Families are enjoying the open spaces, ""and children love the playground. However, some people think the parking area is too small.")initial_state: AgentState = {"text": text,"summary": "","translation": "","sentiment": "","summary_time": 0.0,"translation_time": 0.0,"sentiment_time": 0.0}print("\n构建新图...")app = build_parallel_graph()print("\n开始并行处理...")start_time = time.time()config = RunnableConfig(parallel=True)result = app.invoke(initial_state, config=config)total_time = time.time() - start_timeprint("\n=== 并行任务结果 ===")print(f"输入文本:\n{text}\n")print(f"总结:\n{result['summary']}\n")print(f"翻译(西班牙语):\n{result['translation']}\n")print(f"情感分析:\n{result['sentiment']}\n")print("\n=== 处理时间 ===")processing_times = {"summary": result["summary_time"],"translation": result["translation_time"],"sentiment": result["sentiment_time"]}for agent, time_taken in processing_times.items():print(f"{agent.capitalize()}{time_taken:.2f} 秒")print(f"\n总墙钟时间:{total_time:.2f} 秒")print(f"各处理时间之和:{sum(processing_times.values()):.2f} 秒")print(f"并行处理节省的时间:{sum(processing_times.values()) - total_time:.2f} 秒")if __name__ == "__main__":main()

输出:

构建新图...开始并行处理...
情感分析智能体:运行中
总结智能体:运行中
翻译智能体:运行中
情感分析智能体:完成,耗时 1.50 秒
总结智能体:完成,耗时 2.00 秒
翻译智能体:完成,耗时 3.00=== 并行任务结果 ===
输入文本:
The new park in the city is a wonderful addition. Families are enjoying the open spaces, and children love the playground. However, some people think the parking area is too small.总结:
Families are enjoying the open spaces, and children love the playground. The new park in the city is a wonderful addition.翻译(西班牙语):
El nuevo parque en la ciudad es una maravillosa adición. Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña.情感分析:
Positive (Polarity: 0.31, Subjectivity: 0.59)=== 处理时间 ===
Summary: 2.00 秒
Translation: 3.00 秒
Sentiment: 1.50 秒总墙钟时间:3.01 秒
各处理时间之和:6.50 秒
并行处理节省的时间:3.50 秒
  • 并行性:总结、翻译和情感分析这三个任务同时运行,减少了总处理时间。
  • 独立性:每个智能体独立处理输入文本,在执行过程中不需要智能体之间的通信。
  • 协调性:队列确保结果被安全收集并按顺序显示。
  • 实际用例:总结、翻译和情感分析是常见的自然语言处理任务,从较大的文本中受益于并行处理。

2.1.2 顺序

任务按顺序处理,一个智能体的输出成为下一个智能体的输入。

None

图片来源:Weaviate

示例:多步骤审批。

代码:

from typing import Dict
from langgraph.graph import StateGraph, MessagesState, END
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import HumanMessage, AIMessage
import json# 智能体1:团队负责人
def team_lead_agent(state: MessagesState, config: RunnableConfig) -> Dict:print("智能体(团队负责人):开始审批")messages = state["messages"]proposal = json.loads(messages[0].content)title = proposal.get("title", "")amount = proposal.get("amount", 0.0)if not title or amount <= 0:status = "Rejected"comment = "团队负责人:由于缺少标题或金额无效,提案被拒绝。"goto = ENDelse:status = "Approved by Team Lead"comment = "团队负责人:提案完整且已批准。"goto = "dept_manager"print(f"智能体(团队负责人):审批完成 - {status}")messages.append(AIMessage(content=json.dumps({"status": status, "comment": comment}),additional_kwargs={"agent": "team_lead", "goto": goto}))return {"messages": messages}# 智能体2:部门经理
def dept_manager_agent(state: MessagesState, config: RunnableConfig) -> Dict:print("智能体(部门经理):开始审批")messages = state["messages"]team_lead_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "team_lead"), None)proposal = json.loads(messages[0].content)amount = proposal.get("amount", 0.0)if json.loads(team_lead_msg.content)["status"] != "Approved by Team Lead":status = "Rejected"comment = "部门经理:由于团队负责人拒绝,跳过。"goto = ENDelif amount > 100000:status = "Rejected"comment = "部门经理:预算超出限制。"goto = ENDelse:status = "Approved by Department Manager"comment = "部门经理:预算在限制范围内。"goto = "finance_director"print(f"智能体(部门经理):审批完成 - {status}")messages.append(AIMessage(content=json.dumps({"status": status, "comment": comment}),additional_kwargs={"agent": "dept_manager", "goto": goto}))return {"messages": messages}# 智能体3:财务总监
def finance_director_agent(state: MessagesState, config: RunnableConfig) -> Dict:print("智能体(财务总监):开始审批")messages = state["messages"]dept_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "dept_manager"), None)proposal = json.loads(messages[0].content)amount = proposal.get("amount", 0.0)if json.loads(dept_msg.content)["status"] != "Approved by Department Manager":status = "Rejected"comment = "财务总监:由于部门经理拒绝,跳过。"elif amount > 50000:status = "Rejected"comment = "财务总监:预算不足。"else:status = "Approved"comment = "财务总监:已批准且可行。"print(f"智能体(财务总监):审批完成 - {status}")messages.append(AIMessage(content=json.dumps({"status": status, "comment": comment}),additional_kwargs={"agent": "finance_director", "goto": END}))return {"messages": messages}# 路由函数
def route_step(state: MessagesState) -> str:for msg in reversed(state["messages"]):goto = msg.additional_kwargs.get("goto")if goto:print(f"路由:智能体 {msg.additional_kwargs.get('agent')} 设置 goto 为 {goto}")return gotoreturn END# 构建 LangGraph
builder = StateGraph(MessagesState)
builder.add_node("team_lead", team_lead_agent)
builder.add_node("dept_manager", dept_manager_agent)
builder.add_node("finance_director", finance_director_agent)builder.set_entry_point("team_lead")builder.add_conditional_edges("team_lead", route_step, {"dept_manager": "dept_manager",END: END
})
builder.add_conditional_edges("dept_manager", route_step, {"finance_director": "finance_director",END: END
})
builder.add_conditional_edges("finance_director", route_step, {END: END
})workflow = builder.compile()# 主运行器
def main():initial_state = {"messages": [HumanMessage(content=json.dumps({"title": "New Equipment Purchase","amount": 40000.0,"department": "Engineering"}))]}result = workflow.invoke(initial_state)messages = result["messages"]proposal = json.loads(messages[0].content)print("\n=== 审批结果 ===")print(f"提案标题:{proposal['title']}")final_status = "Unknown"comments = []for msg in messages[1:]:if isinstance(msg, AIMessage):try:data = json.loads(msg.content)if "status" in data:final_status = data["status"]if "comment" in data:comments.append(data["comment"])except Exception:continueprint(f"最终状态:{final_status}")print("评论:")for comment in comments:print(f"  - {comment}")if __name__ == "__main__":main()

输出(金额 = $40,000):

智能体(团队负责人):开始审批
智能体(团队负责人):审批完成 - 已由团队负责人批准
路由:智能体 team_lead 设置 goto 为 dept_manager
智能体(部门经理):开始审批
智能体(部门经理):审批完成 - 已由部门经理批准
路由:智能体 dept_manager 设置 goto 为 finance_director
智能体(财务总监):开始审批
智能体(财务总监):审批完成 - 已批准
路由:智能体 finance_director 设置 goto 为 __end__=== 审批结果 ===
提案标题:New Equipment Purchase
最终状态:Approved
评论:- 团队负责人:提案完整且已批准。- 部门经理:预算在限制范围内。- 财务总监:已批准且可行。
  • 顺序执行
  • 智能体按顺序运行:团队负责人 → 部门经理 → 财务总监。
  • 如果任何智能体拒绝,循环将中断,跳过剩余的智能体。
  • 每个智能体修改共享的 Proposal 对象,更新状态和评论。
  • 协调性
  • 结果存储在列表中,但 Proposal 对象在智能体之间传递状态。
  • 不使用多处理,确保单线程、有序的工作流。

2.1.3 循环

智能体在迭代周期中运行,根据其他智能体的反馈不断改进其输出。

None

图片来源:Weaviate

示例:评估用例,如代码编写和代码测试。

None

图由作者提供

代码:

from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import textwrap# 状态用于跟踪工作流
class EvaluationState(Dict[str, Any]):code: str = ""feedback: str = ""passed: bool = Falseiteration: int = 0max_iterations: int = 3history: List[Dict] = []def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.setdefault("code", "")self.setdefault("feedback", "")self.setdefault("passed", False)self.setdefault("iteration", 0)self.setdefault("max_iterations", 3)self.setdefault("history", [])# 智能体1:代码编写者
def code_writer_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:print(f"迭代 {state['iteration'] + 1} - 代码编写者:生成代码")print(f"迭代 {state['iteration'] + 1} - 代码编写者:收到反馈:{state['feedback']}")iteration = state["iteration"] + 1feedback = state["feedback"]if iteration == 1:# 初始尝试:基本阶乘(有漏洞,未处理零或负数)code = textwrap.dedent("""def factorial(n):result = 1for i in range(1, n + 1):result *= ireturn result""")writer_feedback = "初始代码已生成。"elif "factorial(0)" in feedback.lower():# 修复零的情况code = textwrap.dedent("""def factorial(n):if n == 0:return 1result = 1for i in range(1, n + 1):result *= ireturn result""")writer_feedback = "修复了 n=0 的处理。"elif "factorial(-1)" in feedback.lower() or "negative" in feedback.lower():# 修复负数输入code = textwrap.dedent("""def factorial(n):if n < 0:raise ValueError("阶乘对负数未定义")if n == 0:return 1result = 1for i in range(1, n + 1):result *= ireturn result""")writer_feedback = "添加了对负数输入的错误处理。"else:code = state["code"]writer_feedback = "未发现进一步改进。"print(f"迭代 {iteration} - 代码编写者:代码已生成")return {"code": code,"feedback": writer_feedback,"iteration": iteration}# 智能体2:代码测试者
def code_tester_agent(state: EvaluationState, config: RunnableConfig) -> Dict[str, Any]:print(f"迭代 {state['iteration']} - 代码测试者:测试代码")code = state["code"]try:# 定义测试用例test_cases = [(0, 1),      # factorial(0) = 1(1, 1),      # factorial(1) = 1(5, 120),    # factorial(5) = 120(-1, None),  # 应该引发 ValueError]# 在安全命名空间中执行代码namespace = {}exec(code, namespace)factorial = namespace.get('factorial')if not callable(factorial):return {"passed": False, "feedback": "未找到阶乘函数。"}feedback_parts = []passed = True# 运行所有测试用例并收集所有失败情况for input_val, expected in test_cases:try:result = factorial(input_val)if expected is None:  # 期望出错passed = Falsefeedback_parts.append(f"测试失败:factorial({input_val}) 应该引发错误。")elif result != expected:passed = Falsefeedback_parts.append(f"测试失败:factorial({input_val}) 返回 {result},期望 {expected}。")except ValueError as ve:if expected is not None:passed = Falsefeedback_parts.append(f"测试失败:factorial({input_val}) 意外引发 ValueError:{str(ve)}")except Exception as e:passed = Falsefeedback_parts.append(f"测试失败:factorial({input_val}) 导致错误:{str(e)}")feedback = "所有测试通过!" if passed else "\n".join(feedback_parts)print(f"迭代 {state['iteration']} - 代码测试者:测试完成 - {'通过' if passed else '失败'}")# 在历史记录中记录此次尝试history = state["history"]history.append({"iteration": state["iteration"],"code": code,"feedback": feedback,"passed": passed})return {"passed": passed,"feedback": feedback,"history": history}except Exception as e:print(f"迭代 {state['iteration']} - 代码测试者:失败")return {"passed": False, "feedback": f"测试时出错:{str(e)}"}# 条件边以决定是否继续循环
def should_continue(state: EvaluationState) -> str:if state["passed"] or state["iteration"] >= state["max_iterations"]:print(f"迭代 {state['iteration']} - {'循环停止:测试通过' if state['passed'] else '循环停止:达到最大迭代次数'}")return "end"print(f"迭代 {state['iteration']} - 循环继续:测试失败")return "code_writer"# 构建 LangGraph 工作流
workflow = StateGraph(EvaluationState)# 添加节点
workflow.add_node("code_writer", code_writer_agent)
workflow.add_node("code_tester", code_tester_agent)# 添加边
workflow.set_entry_point("code_writer")
workflow.add_edge("code_writer", "code_tester")
workflow.add_conditional_edges("code_tester",should_continue,{"code_writer": "code_writer","end": END}
)# 编译图
app = workflow.compile()# 运行工作流
def main():initial_state = EvaluationState()result = app.invoke(initial_state)# 显示结果print("\n=== 评估结果 ===")print(f"最终状态:{'通过' if result['passed'] else '失败'},经过 {result['iteration']} 次迭代")print(f"最终代码:\n{result['code']}")print(f"最终反馈:\n{result['feedback']}")print("\n迭代历史:")for attempt in result["history"]:print(f"迭代 {attempt['iteration']}:")print(f"  代码:\n{attempt['code']}")print(f"  反馈:{attempt['feedback']}")print(f"  通过:{attempt['passed']}\n")if __name__ == "__main__":main()

输出:

迭代 1 - 代码编写者:生成代码
迭代 1 - 代码编写者:收到反馈:
迭代 1 - 代码编写者:代码已生成
迭代 1 - 代码测试者:测试代码
迭代 1 - 代码测试者:测试完成 - 失败
迭代 1 - 循环继续:测试失败
迭代 2 - 代码编写者:生成代码
迭代 2 - 代码编写者:收到反馈:测试失败:factorial(-1) 应该引发错误。
迭代 2 - 代码编写者:代码已生成
迭代 2 - 代码测试者:测试代码
迭代 2 - 代码测试者:测试完成 - 通过
迭代 2 - 循环停止:测试通过=== 评估结果 ===
最终状态:通过,经过 2 次迭代
最终代码:def factorial(n):if n < 0:raise ValueError("阶乘对负数未定义")if n == 0:return 1result = 1for i in range(1, n + 1):result *= ireturn result最终反馈:
所有测试通过!迭代历史:
迭代 1:代码:def factorial(n):result = 1for i in range(1, n + 1):result *= ireturn result反馈:测试失败:factorial(-1) 应该引发错误。通过:False迭代 2:代码:def factorial(n):if n < 0:raise ValueError("阶乘对负数未定义")if n == 0:return 1result = 1for i in range(1, n + 1):result *= ireturn result反馈:所有测试通过!通过:True
  • 全面反馈:代码测试者现在报告所有测试失败情况,确保代码编写者能够逐步修复问题。
  • 正确处理反馈:代码编写者优先修复问题(先处理零的情况,然后是负数输入),确保逐步改进。
  • 循环终止:当测试通过时,循环正确退出,而不是不必要地运行所有 3 次迭代。

2.1.4 路由器

一个中央路由器根据任务或输入决定调用哪些智能体。

None

图片来源:Weaviate

示例:客户支持工单路由
在这里插入图片描述

代码:

from typing import Dict, Any, TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
import re
import time# 第一步:定义状态
# 状态保存工单信息和处理结果
class TicketState(TypedDict):ticket_text: str  # 工单内容category: str     # 确定的类别(计费、技术、一般或未知)resolution: str   # 支持团队提供的解决方案processing_time: float  # 处理工单所用时间# 第二步:定义路由器智能体
# 该智能体分析工单并确定其类别
def router_agent(state: TicketState) -> Dict[str, Any]:print("路由器智能体:分析工单...")start_time = time.time()ticket_text = state["ticket_text"].lower()# 简单的基于关键字的分类(可以用 LLM 或 ML 模型替换)if any(keyword in ticket_text for keyword in ["billing", "payment", "invoice", "charge"]):category = "Billing"elif any(keyword in ticket_text for keyword in ["technical", "bug", "error", "crash"]):category = "Technical"elif any(keyword in ticket_text for keyword in ["general", "question", "inquiry", "info"]):category = "General"else:category = "Unknown"processing_time = time.time() - start_timeprint(f"路由器智能体:在 {processing_time:.2f} 秒内将工单分类为 '{category}'")return {"category": category,"processing_time": processing_time}# 第三步:定义支持团队智能体
# 每个智能体处理特定类别的工单# 计费团队智能体
def billing_team_agent(state: TicketState) -> Dict[str, Any]:print("计费团队智能体:处理工单...")start_time = time.time()ticket_text = state["ticket_text"]resolution = f"计费团队:已审阅工单 '{ticket_text}'。请检查您的发票详情,或联系我们的计费部门以获取进一步帮助。"processing_time = time.time() - start_timetime.sleep(1)  # 模拟处理时间print(f"计费团队智能体:在 {processing_time:.2f} 秒内完成")return {"resolution": resolution,"processing_time": state["processing_time"] + processing_time}# 技术支持团队智能体
def technical_team_agent(state: TicketState) -> Dict[str, Any]:print("技术支持团队智能体:处理工单...")start_time = time.time()ticket_text = state["ticket_text"]resolution = f"技术支持团队:已审阅工单 '{ticket_text}'。请尝试重新启动您的设备,或提交详细的错误日志以便进一步调查。"processing_time = time.time() - start_timetime.sleep(1.5)  # 模拟处理时间print(f"技术支持团队智能体:在 {processing_time:.2f} 秒内完成")return {"resolution": resolution,"processing_time": state["processing_time"] + processing_time}# 一般支持团队智能体
def general_team_agent(state: TicketState) -> Dict[str, Any]:print("一般团队智能体:处理工单...")start_time = time.time()ticket_text = state["ticket_text"]resolution = f"一般团队:已审阅工单 '{ticket_text}'。有关更多信息,请参阅我们的常见问题解答,或通过电子邮件联系我们。"processing_time = time.time() - start_timetime.sleep(0.8)  # 模拟处理时间print(f"一般团队智能体:在 {processing_time:.2f} 秒内完成")return {"resolution": resolution,"processing_time": state["processing_time"] + processing_time}# 手动审核智能体(用于未知类别)
def manual_review_agent(state: TicketState) -> Dict[str, Any]:print("手动审核智能体:处理工单...")start_time = time.time()ticket_text = state["ticket_text"]resolution = f"手动审核:工单 '{ticket_text}' 无法分类。标记为人工审核。请手动分配给适当的团队。"processing_time = time.time() - start_timetime.sleep(0.5)  # 模拟处理时间print(f"手动审核智能体:在 {processing_time:.2f} 秒内完成")return {"resolution": resolution,"processing_time": state["processing_time"] + processing_time}# 第四步:定义路由器函数
# 该函数根据工单类别确定下一个节点
def route_ticket(state: TicketState) -> Literal["billing_team", "technical_team", "general_team", "manual_review"]:category = state["category"]print(f"路由:工单类别为 '{category}'")if category == "Billing":return "billing_team"elif category == "Technical":return "technical_team"elif category == "General":return "general_team"else:return "manual_review"# 第五步:构建带有路由器模式的图
def build_router_graph() -> StateGraph:workflow = StateGraph(TicketState)# 添加节点workflow.add_node("router", router_agent)  # 入口点:对工单进行分类workflow.add_node("billing_team", billing_team_agent)  # 处理计费工单workflow.add_node("technical_team", technical_team_agent)  # 处理技术工单workflow.add_node("general_team", general_team_agent)  # 处理一般咨询workflow.add_node("manual_review", manual_review_agent)  # 处理未分类工单# 设置入口点workflow.set_entry_point("router")# 添加条件边以进行路由workflow.add_conditional_edges("router",route_ticket,  # 路由器函数,用于确定下一个节点{"billing_team": "billing_team","technical_team": "technical_team","general_team": "general_team","manual_review": "manual_review"})# 从每个团队添加边到 ENDworkflow.add_edge("billing_team", END)workflow.add_edge("technical_team", END)workflow.add_edge("general_team", END)workflow.add_edge("manual_review", END)return workflow.compile()# 第六步:运行工作流
def main():# 不同工单类别的测试用例test_tickets = ["我对我的最后一张发票有计费问题。似乎我被多收费了。","我的应用程序不断出现技术错误。请帮忙!","我对您的服务有一个一般性问题。能否提供更多信息?","我需要帮助解决一个与计费或技术问题无关的问题。"]for ticket_text in test_tickets:# 初始化每个工单的状态initial_state: TicketState = {"ticket_text": ticket_text,"category": "","resolution": "","processing_time": 0.0}print(f"\n=== 正在处理工单:'{ticket_text}' ===")app = build_router_graph()start_time = time.time()result = app.invoke(initial_state, config=RunnableConfig())total_time = time.time() - start_timeprint("\n=== 工单结果 ===")print(f"类别:{result['category']}")print(f"解决方案:{result['resolution']}")print(f"总处理时间:{result['processing_time']:.2f} 秒")print(f"总运行时间:{total_time:.2f} 秒")print("-" * 50)if __name__ == "__main__":main()

输出:

=== 正在处理工单:'我对我的最后一张发票有计费问题。似乎我被多收费了。' ===
路由器智能体:分析工单...
路由器智能体:在 0.00 秒内将工单分类为 'Billing'
路由:工单类别为 'Billing'
计费团队智能体:处理工单...
计费团队智能体:在 0.00 秒内完成=== 工单结果 ===
类别:Billing
解决方案:计费团队:已审阅工单 '我对我的最后一张发票有计费问题。似乎我被多收费了。'。请检查您的发票详情,或联系我们的计费部门以获取进一步帮助。
总处理时间:0.00 秒
总运行时间:1.03 秒
--------------------------------------------------=== 正在处理工单:'我的应用程序不断出现技术错误。请帮忙!' ===
路由器智能体:分析工单...
路由器智能体:在 0.00 秒内将工单分类为 'Technical'
路由:工单类别为 'Technical'
技术支持团队智能体:处理工单...
技术支持团队智能体:在 0.00 秒内完成=== 工单结果 ===
类别:Technical
解决方案:技术支持团队:已审阅工单 '我的应用程序不断出现技术错误。请帮忙!'。请尝试重新启动您的设备,或提交详细的错误日志以便进一步调查。
总处理时间:0.00 秒
总运行时间:1.50 秒
--------------------------------------------------=== 正在处理工单:'我对您的服务有一个一般性问题。能否提供更多信息?' ===
路由器智能体:分析工单...
路由器智能体:在 0.00 秒内将工单分类为 'General'
路由:工单类别为 'General'
一般团队智能体:处理工单...
一般团队智能体:在 0.00 秒内完成=== 工单结果 ===
类别:General
解决方案:一般团队:已审阅工单 '我对您的服务有一个一般性问题。能否提供更多信息?'。有关更多信息,请参阅我们的常见问题解答,或通过电子邮件联系我们。
总处理时间:0.00 秒
总运行时间:0.80 秒
--------------------------------------------------=== 正在处理工单:'我需要帮助解决一个与计费或技术问题无关的问题。' ===
路由器智能体:分析工单...
路由器智能体:在 0.00 秒内将工单分类为 'Billing'
路由:工单类别为 'Billing'
计费团队智能体:处理工单...
计费团队智能体:在 0.00 秒内完成=== 工单结果 ===
类别:Billing
解决方案:计费团队:已审阅工单 '我需要帮助解决一个与计费或技术问题无关的问题。'。请检查您的发票详情,或联系我们的计费部门以获取进一步帮助。
总处理时间:0.00 秒
总运行时间:1.00 秒
--------------------------------------------------
  • 动态路由:路由器智能体确定工单类别,route_ticket 函数使用 add_conditional_edges 指导工作流流向适当的节点。
  • 基于条件的流程:与并行模式不同(多个节点同时运行),路由器模式仅根据条件(类别)执行一个路径。
  • 可扩展性:通过扩展节点并更新 route_ticket 函数以处理新类别,可以轻松添加更多支持团队。

2.1.5 聚合器(或合成器)

智能体贡献输出,这些输出由聚合器智能体收集并合成最终结果。

None

图片来源:Weaviate

示例:社交媒体情感分析聚合器

None

图由作者提供

代码:

from typing import Dict, Any, TypedDict, List
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableConfig
from textblob import TextBlob
import time
from typing_extensions import Annotated
from operator import add# 第一步:定义状态
class SocialMediaState(TypedDict):twitter_posts: List[str]instagram_posts: List[str]reddit_posts: List[str]twitter_sentiment: Dict[str, float]instagram_sentiment: Dict[str, float]reddit_sentiment: Dict[str, float]final_report: strprocessing_time: Annotated[float, add]# 第二步:定义帖子收集智能体
def collect_twitter_posts(state: SocialMediaState) -> Dict[str, Any]:print("推特智能体:收集帖子...")start_time = time.time()posts = ["Loving the new product from this brand! Amazing quality.","Terrible customer service from this brand. Very disappointed."]time.sleep(1)  # 模拟处理时间processing_time = time.time() - start_time  # 包括 time.sleep 在处理时间中print(f"推特智能体:在 {processing_time:.2f} 秒内完成")return {"twitter_posts": posts,"processing_time": processing_time}def collect_instagram_posts(state: SocialMediaState) -> Dict[str, Any]:print("Instagram 智能体:收集帖子...")start_time = time.time()posts = ["Beautiful design by this brand! #loveit","Not impressed with the latest release. Expected better."]time.sleep(1.2)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"Instagram 智能体:在 {processing_time:.2f} 秒内完成")return {"instagram_posts": posts,"processing_time": processing_time}def collect_reddit_posts(state: SocialMediaState) -> Dict[str, Any]:print("Reddit 智能体:收集帖子...")start_time = time.time()posts = ["This brand is awesome! Great value for money.","Had a bad experience with their support team. Not happy."]time.sleep(0.8)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"Reddit 智能体:在 {processing_time:.2f} 秒内完成")return {"reddit_posts": posts,"processing_time": processing_time}# 第三步:定义情感分析智能体
def analyze_twitter_sentiment(state: SocialMediaState) -> Dict[str, Any]:print("推特情感分析智能体:分析情感...")start_time = time.time()posts = state["twitter_posts"]polarities = [TextBlob(post).sentiment.polarity for post in posts]avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0time.sleep(0.5)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"推特情感分析智能体:在 {processing_time:.2f} 秒内完成")return {"twitter_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},"processing_time": processing_time}def analyze_instagram_sentiment(state: SocialMediaState) -> Dict[str, Any]:print("Instagram 情感分析智能体:分析情感...")start_time = time.time()posts = state["instagram_posts"]polarities = [TextBlob(post).sentiment.polarity for post in posts]avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0time.sleep(0.6)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"Instagram 情感分析智能体:在 {processing_time:.2f} 秒内完成")return {"instagram_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},"processing_time": processing_time}def analyze_reddit_sentiment(state: SocialMediaState) -> Dict[str, Any]:print("Reddit 情感分析智能体:分析情感...")start_time = time.time()posts = state["reddit_posts"]polarities = [TextBlob(post).sentiment.polarity for post in posts]avg_polarity = sum(polarities) / len(polarities) if polarities else 0.0time.sleep(0.4)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"Reddit 情感分析智能体:在 {processing_time:.2f} 秒内完成")return {"reddit_sentiment": {"average_polarity": avg_polarity, "num_posts": len(posts)},"processing_time": processing_time}# 第四步:定义聚合智能体
def aggregate_results(state: SocialMediaState) -> Dict[str, Any]:print("聚合智能体:生成最终报告...")start_time = time.time()twitter_sentiment = state["twitter_sentiment"]instagram_sentiment = state["instagram_sentiment"]reddit_sentiment = state["reddit_sentiment"]total_posts = (twitter_sentiment["num_posts"] +instagram_sentiment["num_posts"] +reddit_sentiment["num_posts"])weighted_polarity = (twitter_sentiment["average_polarity"] * twitter_sentiment["num_posts"] +instagram_sentiment["average_polarity"] * instagram_sentiment["num_posts"] +reddit_sentiment["average_polarity"] * reddit_sentiment["num_posts"]) / total_posts if total_posts > 0 else 0.0overall_sentiment = ("Positive" if weighted_polarity > 0 else"Negative" if weighted_polarity < 0 else "Neutral")report = (f"Overall Sentiment: {overall_sentiment} (Average Polarity: {weighted_polarity:.2f})\n"f"Twitter Sentiment: {twitter_sentiment['average_polarity']:.2f} (Posts: {twitter_sentiment['num_posts']})\n"f"Instagram Sentiment: {instagram_sentiment['average_polarity']:.2f} (Posts: {instagram_sentiment['num_posts']})\n"f"Reddit Sentiment: {reddit_sentiment['average_polarity']:.2f} (Posts: {reddit_sentiment['num_posts']})")time.sleep(0.3)  # 模拟处理时间processing_time = time.time() - start_timeprint(f"聚合智能体:在 {processing_time:.2f} 秒内完成")return {"final_report": report,"processing_time": processing_time}# 第五步:构建带有聚合器模式的图
def build_aggregator_graph() -> StateGraph:workflow = StateGraph(SocialMediaState)# 添加收集帖子的节点workflow.add_node("collect_twitter", collect_twitter_posts)workflow.add_node("collect_instagram", collect_instagram_posts)workflow.add_node("collect_reddit", collect_reddit_posts)# 添加情感分析节点workflow.add_node("analyze_twitter", analyze_twitter_sentiment)workflow.add_node("analyze_instagram", analyze_instagram_sentiment)workflow.add_node("analyze_reddit", analyze_reddit_sentiment)# 添加聚合节点workflow.add_node("aggregate", aggregate_results)# 添加一个分支节点以并行触发所有收集节点workflow.add_node("branch", lambda state: state)# 将入口点设置为分支节点workflow.set_entry_point("branch")# 添加从分支到收集节点的边(并行执行)workflow.add_edge("branch", "collect_twitter")workflow.add_edge("branch", "collect_instagram")workflow.add_edge("branch", "collect_reddit")# 添加从收集到情感分析的边workflow.add_edge("collect_twitter", "analyze_twitter")workflow.add_edge("collect_instagram", "analyze_instagram")workflow.add_edge("collect_reddit", "analyze_reddit")# 添加从情感分析到聚合器的边workflow.add_edge("analyze_twitter", "aggregate")workflow.add_edge("analyze_instagram", "aggregate")workflow.add_edge("analyze_reddit", "aggregate")# 添加从聚合器到 END 的边workflow.add_edge("aggregate", END)return workflow.compile()# 第六步:运行工作流
def main():initial_state: SocialMediaState = {"twitter_posts": [],"instagram_posts": [],"reddit_posts": [],"twitter_sentiment": {"average_polarity": 0.0, "num_posts": 0},"instagram_sentiment": {"average_polarity": 0.0, "num_posts": 0},"reddit_sentiment": {"average_polarity": 0.0, "num_posts": 0},"final_report": "","processing_time": 0.0}print("\n开始社交媒体情感分析...")app = build_aggregator_graph()start_time = time.time()config = RunnableConfig(parallel=True)result = app.invoke(initial_state, config=config)total_time = time.time() - start_timeprint("\n=== 情感分析结果 ===")print(result["final_report"])print(f"\n总处理时间:{result['processing_time']:.2f} 秒")print(f"总运行时间:{total_time:.2f} 秒")if __name__ == "__main__":main()

输出:

开始社交媒体情感分析...
Instagram 智能体:收集帖子...
Reddit 智能体:收集帖子...
推特智能体:收集帖子...
Reddit 智能体:在 0.80 秒内完成
推特智能体:在 1.00 秒内完成
Instagram 智能体:在 1.20 秒内完成
Instagram 情感分析智能体:分析情感...
Reddit 情感分析智能体:分析情感...
推特情感分析智能体:分析情感...
Reddit 情感分析智能体:在 0.40 秒内完成
推特情感分析智能体:在 0.50 秒内完成
Instagram 情感分析智能体:在 0.60 秒内完成
聚合智能体:生成最终报告...
聚合智能体:在 0.30 秒内完成=== 情感分析结果 ===
Overall Sentiment: Positive (Average Polarity: 0.15)
Twitter Sentiment: -0.27 (Posts: 2)
Instagram Sentiment: 0.55 (Posts: 2)
Reddit Sentiment: 0.18 (Posts: 2)总处理时间:4.80 秒
总运行时间:2.13 秒
  • 并行执行:收集和分析节点并行运行,减少了总运行时间(2.1 秒),与各个处理时间之和(3.8 秒)相比。
  • 聚合:聚合节点将情感分析结果合并为最终报告,计算整体情感,并按平台提供分解。

2.1.6 网络(或水平)

智能体以多对多的方式直接相互通信,形成一个去中心化的网络。

None

图片来源:Weaviate

这种架构适用于没有明确的智能体层级结构或智能体调用顺序的问题。

from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
model = ChatOpenAI()
def agent_1(state: MessagesState) -> Command[Literal["agent_2", "agent_3", END]]:# 你可以将相关部分的状态传递给 LLM(例如,state["messages"])# 以确定要调用的下一个智能体。一种常见模式是调用模型# 并要求其返回一个结构化输出(例如,强制其返回一个包含 "next_agent" 字段的输出)response = model.invoke(...)# 根据 LLM 的决策,路由到其中一个智能体或退出# 如果 LLM 返回 "__end__",图将结束执行return Command(goto=response["next_agent"],update={"messages": [response["content"]]},)
def agent_2(state: MessagesState) -> Command[Literal["agent_1", "agent_3", END]]:response = model.invoke(...)return Command(goto=response["next_agent"],update={"messages": [response["content"]]},)
def agent_3(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:...return Command(goto=response["next_agent"],update={"messages": [response["content"]]},)
builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_node(agent_3)
builder.add_edge(START, "agent_1")
network = builder.compile()

API 参考:ChatOpenAI | StateGraph | START | END

优点:分布式协作和群体驱动的决策制定。即使某些智能体失败,系统仍然可以正常运行。

缺点:管理智能体之间的通信可能会变得复杂。更多的通信可能会导致效率低下,并且智能体可能会重复工作。

2.1.7 交接

在多智能体架构中,智能体可以表示为图的节点。每个智能体节点执行其步骤,并决定是结束执行还是路由到另一个智能体,包括可能路由到自身(例如,在循环中运行)。多智能体交互中的一种常见模式是交接,其中一个智能体将控制权移交给另一个智能体。交接允许你指定:

  • 目标:目标智能体(例如,要导航到的节点名称)
  • 负载:要传递给该智能体的信息(例如,状态更新)

None

图片来源:aka.ms/ai-agents-beginners

要在 LangGraph 中实现交接,智能体节点可以返回一个 Command 对象,该对象允许你同时控制流程和状态更新:

def agent(state) -> Command[Literal["agent", "another_agent"]]:# 路由/停止的条件可以是任何内容,例如 LLM 工具调用 / 结构化输出等。goto = get_next_agent(...)  # 'agent' / 'another_agent'return Command(# 指定要调用的下一个智能体goto=goto,# 更新图状态update={"my_state_key": "my_state_value"})

在一个更复杂的场景中,每个智能体节点本身就是一个图(即,一个子图),一个子图中的节点可能希望导航到另一个智能体。例如,如果你有两个智能体,alicebob(父图中的子图节点),并且 alice 需要导航到 bob,你可以在 Command 对象中设置 graph=Command.PARENT

def some_node_inside_alice(state)return Command(goto="bob",update={"my_state_key": "my_state_value"},# 指定要导航到的图(默认为当前图)graph=Command.PARENT,)

注意

如果你需要为子图之间的通信使用 Command(graph=Command.PARENT) 提供可视化支持,你需要将它们包装在一个带有 Command 注解的节点函数中,例如,而不是这样:

builder.add_node(alice)

你需要这样做:

def call_alice(state) -> Command[Literal["bob"]]:return alice.invoke(state)builder.add_node("alice", call_alice)

交接作为工具

最常见的智能体类型之一是 ReAct 风格的工具调用智能体。对于这种类型的智能体,一个常见模式是将交接包装在一个工具调用中,例如:

def transfer_to_bob(state):"""移交到 bob。"""return Command(goto="bob",update={"my_state_key": "my_state_value"},graph=Command.PARENT,)

这是从工具中更新图状态的一个特殊情况,除了状态更新外,还包括控制流程。

重要

如果你希望使用返回 Command 的工具,你可以使用预构建的 create_react_agent /ToolNode 组件,或者自己实现一个工具执行节点,该节点收集工具返回的 Command 对象并返回一个列表,例如:

def call_tools(state):...commands = [tools_by_name[tool_call["name"]].invoke(tool_call) for tool_call in tool_calls]return commands

让我们更仔细地看看不同的多智能体架构。

2.1.8 监督者

在这种架构中,我们将智能体定义为节点,并添加一个监督者节点(LLM),该节点决定应该调用哪些智能体节点。我们使用 Command 根据监督者的决策将执行路由到适当的智能体节点。这种架构也非常适合并行运行多个智能体,或者使用Map-Reduce模式。

from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, ENDmodel = ChatOpenAI()def supervisor(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:# 你可以将相关部分的状态传递给 LLM(例如,state["messages"])# 以确定要调用的下一个智能体。一种常见模式是调用模型# 并要求其返回一个结构化输出(例如,强制其返回一个包含 "next_agent" 字段的输出)response = model.invoke(...)# 根据监督者的决策,路由到其中一个智能体或退出# 如果监督者返回 "__end__",图将结束执行return Command(goto=response["next_agent"])def agent_1(state: MessagesState) -> Command[Literal["supervisor"]]:# 你可以将相关部分的状态传递给 LLM(例如,state["messages"])# 并添加任何额外的逻辑(不同的模型、自定义提示、结构化输出等)response = model.invoke(...)return Command(goto="supervisor",update={"messages": [response]},)def agent_2(state: MessagesState) -> Command[Literal["supervisor"]]:response = model.invoke(...)return Command(goto="supervisor",update={"messages": [response]},)builder = StateGraph(MessagesState)
builder.add_node(supervisor)
builder.add_node(agent_1)
builder.add_node(agent_2)builder.add_edge(START, "supervisor")supervisor = builder.compile()

API 参考:ChatOpenAI | StateGraph | START | END

查看这个 教程,了解监督者多智能体架构的示例。

2.1.9 监督者(工具调用)

这是 监督者 架构的一个变体,我们将单独的智能体定义为 工具,并在监督者节点中使用工具调用 LLM。这可以实现为一个 ReAct-风格的智能体图,其中包含两个节点——一个 LLM 节点(监督者)和一个工具调用节点,用于执行工具(在这种情况下是智能体)。

from typing import Annotated
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import InjectedState, create_react_agentmodel = ChatOpenAI()# 这是将被调用为工具的智能体函数
# 注意,你可以通过 InjectedState 注解将状态传递给工具
def agent_1(state: Annotated[dict, InjectedState]):# 你可以将相关部分的状态传递给 LLM(例如,state["messages"])# 并添加任何额外的逻辑(不同的模型、自定义提示、结构化输出等)response = model.invoke(...)# 以字符串形式返回 LLM 响应(预期的工具响应格式)# 这将被预构建的 create_react_agent(监督者)自动转换为 ToolMessagereturn response.contentdef agent_2(state: Annotated[dict, InjectedState]):response = model.invoke(...)return response.contenttools = [agent_1, agent_2]
# 使用预构建的 ReAct 智能体图来构建带有工具调用的监督者
# 该图由一个工具调用 LLM 节点(即监督者)和一个工具执行节点组成
supervisor = create_react_agent(model, tools)

API 参考:ChatOpenAI | InjectedState | create_react_agent

2.1.10 层级(或垂直)

智能体以树状结构组织,高层智能体(监督者智能体)管理低层智能体。

None

图片来源:Weaviate

随着你向系统中添加更多智能体,监督者可能难以管理所有智能体。监督者可能开始对调用下一个智能体做出错误决策,上下文可能变得过于复杂,以至于单个监督者难以跟踪。换句话说,你最终会遇到最初促使你采用多智能体架构的相同问题。

为了解决这个问题,你可以设计一个分层的系统。例如,你可以创建由单独的监督者管理的独立、专业化的智能体团队,并由一个顶层监督者来管理这些团队。

from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
model = ChatOpenAI()# 定义团队 1(与上面的单监督者示例相同)def team_1_supervisor(state: MessagesState) -> Command[Literal["team_1_agent_1", "team_1_agent_2", END]]:response = model.invoke(...)return Command(goto=response["next_agent"])def team_1_agent_1(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:response = model.invoke(...)return Command(goto="team_1_supervisor", update={"messages": [response]})def team_1_agent_2(state: MessagesState) -> Command[Literal["team_1_supervisor"]]:response = model.invoke(...)return Command(goto="team_1_supervisor", update={"messages": [response]})team_1_builder = StateGraph(Team1State)
team_1_builder.add_node(team_1_supervisor)
team_1_builder.add_node(team_1_agent_1)
team_1_builder.add_node(team_1_agent_2)
team_1_builder.add_edge(START, "team_1_supervisor")
team_1_graph = team_1_builder.compile()# 定义团队 2(与上面的单监督者示例相同)
class Team2State(MessagesState):next: Literal["team_2_agent_1", "team_2_agent_2", "__end__"]def team_2_supervisor(state: Team2State):...def team_2_agent_1(state: Team2State):...def team_2_agent_2(state: Team2State):...team_2_builder = StateGraph(Team2State)
...
team_2_graph = team_2_builder.compile()# 定义顶层监督者builder = StateGraph(MessagesState)
def top_level_supervisor(state: MessagesState) -> Command[Literal["team_1_graph", "team_2_graph", END]]:# 你可以将相关部分的状态传递给 LLM(例如,state["messages"])# 以确定要调用的下一个团队。一种常见模式是调用模型# 并要求其返回一个结构化输出(例如,强制其返回一个包含 "next_team" 字段的输出)response = model.invoke(...)# 根据监督者的决策,路由到其中一个团队或退出# 如果监督者返回 "__end__",图将结束执行return Command(goto=response["next_team"])builder = StateGraph(MessagesState)
builder.add_node(top_level_supervisor)
builder.add_node("team_1_graph", team_1_graph)
builder.add_node("team_2_graph", team_2_graph)
builder.add_edge(START, "top_level_supervisor")
builder.add_edge("team_1_graph", "top_level_supervisor")
builder.add_edge("team_2_graph", "top_level_supervisor")
graph = builder.compile()

优点:在不同层级的智能体之间有明确的角色和职责划分。通信顺畅。适用于具有结构化决策流程的大型系统。

缺点:上层的失败可能会破坏整个系统。下层智能体的独立性有限。

API 参考:ChatOpenAI | StateGraph | START | END | Command

2.1.11 自定义多智能体工作流

每个智能体仅与部分智能体通信。流程的一部分是确定性的,只有部分智能体可以决定接下来调用哪些智能体。

在这种架构中,我们将各个智能体作为图节点添加,并提前定义智能体被调用的顺序,形成一个自定义工作流。在 LangGraph 中,可以通过以下两种方式定义工作流:

  • 显式控制流(普通边):LangGraph 允许你通过普通图边显式定义应用程序的控制流(即智能体之间的通信顺序)。这是上述架构中最确定性的变体 —— 我们可以提前知道下一个将被调用的智能体。
  • 动态控制流(Command):在 LangGraph 中,你可以允许 LLM 决定应用程序控制流的一部分。这可以通过使用Command来实现。一个特殊情况是带有工具调用的监督者架构。在这种情况下,为监督者智能体提供支持的工具调用 LLM 将决定工具(智能体)被调用的顺序。
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, STARTmodel = ChatOpenAI()def agent_1(state: MessagesState):response = model.invoke(...)return {"messages": [response]}def agent_2(state: MessagesState):response = model.invoke(...)return {"messages": [response]}builder = StateGraph(MessagesState)
builder.add_node(agent_1)
builder.add_node(agent_2)
# 显式定义流程
builder.add_edge(START, "agent_1")
builder.add_edge("agent_1", "agent_2")

API 参考:ChatOpenAI | StateGraph | START

3. 智能体之间的通信

构建多智能体系统时,最重要的是弄清楚智能体之间如何通信。有以下几点需要考虑:

3.1 图状态与工具调用

在智能体之间传递的“负载”是什么?在上述讨论的大多数架构中,智能体通过图状态进行通信。在带有工具调用的监督者的情况下,负载是工具调用参数。

None

图片来源:Langchain

图状态

为了通过图状态进行通信,各个智能体需要被定义为图节点。这些可以被添加为函数或整个子图。在图执行的每一步中,智能体节点接收当前图状态,执行智能体代码,然后将更新后的状态传递给下一个节点。

通常,智能体节点共享一个单一的状态模式。然而,你可能希望设计具有不同状态模式的智能体节点。

3.2 不同的状态模式

一个智能体可能需要拥有与其他智能体不同的状态模式。例如,一个搜索智能体可能只需要跟踪查询和检索到的文档。在 LangGraph 中,可以通过以下两种方式实现:

  • 定义具有独立状态模式的子图智能体。如果子图和父图之间没有共享状态键(通道),则需要添加输入/输出转换,以便父图知道如何与子图通信。
  • 定义具有私有输入状态模式的智能体节点函数,该模式与整体图状态模式不同。这允许传递仅用于执行该特定智能体的信息。

3.3 共享消息列表

智能体之间通信的最常见方式是通过共享状态通道,通常是一个消息列表。这假设状态中至少有一个通道(键)被智能体共享。当通过共享消息列表进行通信时,还有一个额外的考虑:智能体是共享完整的对话历史记录,还是只共享最终结果?

None

图片来源:Langchain

共享完整历史记录

智能体可以共享完整的思考过程(即“草稿”)与其他所有智能体。这个“草稿”通常看起来像一个消息列表。共享完整思考过程的好处是,它可能有助于其他智能体做出更好的决策,并提高整个系统的推理能力。缺点是,随着智能体数量和复杂性的增加,“草稿”会迅速增长,可能需要额外的内存管理策略。

仅共享最终结果

智能体可以有自己的私有“草稿”,并且只共享最终结果与其余智能体。这种方法可能更适合拥有许多智能体或智能体更复杂的系统。在这种情况下,你需要定义具有不同状态模式的智能体。

对于作为工具被调用的智能体,监督者根据工具模式确定输入。此外,LangGraph 允许在运行时将状态传递给工具,以便从属智能体在需要时可以访问父状态。

4. 结论

多智能体 LLM 系统提供了一个强大的范式,用于通过利用并行、顺序、路由器和聚合器工作流等多种架构模式来解决复杂任务,正如我们在本博客中所探讨的那样。

在这里插入图片描述

通过详细检查共享状态、消息列表和工具调用等通信机制,我们看到了智能体如何协作以实现无缝协调。

原作者:vipra_singh-https://medium.com/@vipra_singh/ai-agents-multi-agent-architectures

相关文章:

AI Agents系列之构建多智能体系统

&#x1f9e0; 向所有学习者致敬&#xff01; “学习不是装满一桶水&#xff0c;而是点燃一把火。” —— 叶芝 我的博客主页&#xff1a; https://lizheng.blog.csdn.net &#x1f310; 欢迎点击加入AI人工智能社区&#xff01; &#x1f680; 让我们一起努力&#xff0c;共创…...

OJ笔试强训_1至24天

OJ笔试强训 Day01 [NOIP2010]数字统计_牛客题霸_牛客网 点击消除_牛客题霸_牛客网 两个数组的交集_牛客题霸_牛客网 Day02 牛牛的快递_牛客题霸_牛客网 最小花费爬楼梯_牛客题霸_牛客网 数组中两个字符串的最小距离__牛客网 Day03 简写单词_牛客题霸_牛客网 dd爱框框_…...

3款顶流云电脑与传统电脑性能PK战:START云游戏/无影云/ToDesk云电脑谁更流畅?

这里写目录标题 一、前言二、本地机器配置环境三、START云游戏/无影云/ToDesk云电脑配置对比3.1 START云游戏3.2 无影云个人版3.3 ToDesk云电脑 四、本地电脑与云电脑性能实战4.1 游戏场景体验4.1.1 本地电脑测试4.1.2 云电脑测试英雄联盟黑神话悟空其他游戏 4.2 主流设计场景体…...

java IO/NIO/AIO

(✪▽✪)曼波~~~~&#xff01;让曼波用最可爱的赛马娘方式给你讲解吧&#xff01;(⁄ ⁄•⁄ω⁄•⁄ ⁄) &#x1f3a0;曼波思维导图大冲刺&#xff08;先看框架再看细节哦&#xff09;&#xff1a; &#x1f4da; 解释 Java 中 IO、NIO、AIO 的区别和适用场景&#xff1a; …...

java输出、输入语句

先创建一个用于测试的java 编写程序 #java.util使java标准库的一个包&#xff0c;这里拉取Scanner类 import java.util.Scanner;public class VariableTest {public static void main(String[] args) {#创建一个 Scanner 对象Scanner scanner new Scanner(System.in);System.…...

宏基因组产品升级——抗菌肽数据库APD

抗菌肽&#xff08;Antimicrobial Peptides&#xff0c;简称AMPs&#xff09;是一类存在于多种生物体中的天然分子。它们在抵御微生物感染中扮演着重要角色&#xff0c;发挥着先天免疫反应的作用。抗菌肽功能分类广泛&#xff0c;包括&#xff1a;抗菌&#xff0c;抗生物膜&…...

大数据面试问答-Spark

1. Spark 1.1 Spark定位 "Apache Spark是一个基于内存的分布式计算框架&#xff0c;旨在解决Hadoop MapReduce在迭代计算和实时处理上的性能瓶颈。 1.2 核心架构 Spark架构中有三个关键角色&#xff1a; Driver&#xff1a;解析代码生成DAG&#xff0c;协调任务调度&a…...

线程池七个参数的含义

Java中的线程池里七个参数的以及其各自的含义 面试题&#xff1a;说一下线程池七个参数的含义&#xff1f; 所谓的线程池的 7 大参数是指&#xff0c;在使用 ThreadPoolExecutor 创建线程池时所设置的 7 个参数&#xff0c;如以下源码所示&#xff1a; public ThreadPoolExe…...

Windows suwellofd 阅读器-v5.0.25.0320

Windows suwellofd 阅读器 链接&#xff1a;https://pan.xunlei.com/s/VOO7tUkTHHTTjSe39CeVkUHbA1?pwd3ibx# OFD(Open Fixed-layout Document) &#xff0c; 数科OFD阅读器支持国标版式、可信阅读、是电子发票、电子证照&#xff0c;电子病历等电子文件理想阅读工具。 多格…...

三大等待和三大切换

三大等待 1、三大等待&#xff1a;等待的方式有三种&#xff1a;强制等待&#xff0c;隐性等待&#xff0c;显性等待。 1、强制等待&#xff1a;time.sleep(2)&#xff0c;秒 优点&#xff1a;使用简单缺点&#xff1a;等待时间把握不准&#xff0c;容易造成时间浪费或者等待时…...

告别定时任务!用Dagster监听器实现秒级数据响应自动化

在数据管道开发中&#xff0c;我们经常面临需要根据外部事件触发计算任务的场景。传统基于时间的调度方式存在资源浪费和时效性不足的问题。本文将通过Dagster的**传感器&#xff08;Sensor&#xff09;**功能&#xff0c;演示如何构建事件驱动的数据处理流程。 场景模拟&…...

一文读懂WPF系列之MVVM

WPF MVVM 什么是MVVMWPF为何使用MVVM机制WPFMVVM 的实现手段 INotifyPropertyChanged​数据绑定的源端通知​​原理 PropertyChanged事件双向绑定的完整条件常见疑惑问题 什么是MVVM 翻译全称就是 model-view-viewmodel 3部分内容 以wpf的概念角度来解释就是 数据库数据源模型…...

【Unity】打包TextMeshPro的字体

前言 在Unity中&#xff0c;TextMeshPro与常规 Text 组件相比提供了更高级的文本呈现功能&#xff0c;TextMesh Pro 可以处理各种语言&#xff0c;包括中文。我们可以轻松地在 Unity 项目中使用中文&#xff0c;而不必担心字体和布局问题。TextMeshPro需要的字体资源就需要我们…...

51单片机实验五:A/D和D/A转换

一、实验环境与实验器材 环境&#xff1a;Keli&#xff0c;STC-ISP烧写软件,Proteus. 器材&#xff1a;TX-1C单片机&#xff08;STC89C52RC&#xff09;、电脑。 二、 实验内容及实验步骤 1.A/D转换 概念&#xff1a;模数转换是将连续的模拟信号转换为离散的数字信…...

使用VHD虚拟磁盘安装双系统,避免磁盘分区

前言 很多时候&#xff0c;我们对现在的操作系统不满意,就想要自己安装一个双系统 但是安装双系统又涉及到硬盘分区,非常复杂,容易造成数据问题 虚拟机的话有经常用的不爽,这里其实有一个介于虚拟机和双系统之间的解决方法,就是使用虚拟硬盘文件安装系统. 相当于系统在机上…...

Kafka消费者端重平衡流程

重平衡的完整流程需要消费者 端和协调者组件共同参与才能完成。我们先从消费者的视角来审视一下重平衡的流程。在消费者端&#xff0c;重平衡分为两个步骤&#xff1a;分别是加入组和等待领导者消费者&#xff08;Leader Consumer&#xff09;分配方案。这两个步骤分别对应两类…...

Django之modelform使用

Django新增修改数据功能优化 目录 1.新增数据功能优化 2.修改数据功能优化 在我们做数据优化处理之前, 我们先回顾下传统的写法, 是如何实现增加修改的。 我们需要在templates里面新建前端的页面, 需要有新增还要删除, 比如说员工数据的新增, 那需要有很多个输入框, 那html…...

云轴科技ZStack入选中国人工智能产业发展联盟《大模型应用交付供应商名录》

2025年4月8日至9日&#xff0c;中国人工智能产业发展联盟&#xff08;以下简称AIIA&#xff09;第十四次全体会议暨人工智能赋能新型工业化深度行&#xff08;南京站&#xff09;在南京召开。工业和信息化部科技司副司长杜广达&#xff0c;中国信息通信研究院院长、中国人工智能…...

写论文时降AIGC和降重的一些注意事项

‘ 写一些研究成果&#xff0c;英文不是很好&#xff0c;用有道翻译过来句子很简单&#xff0c;句型很单一。那么你会考虑用ai吗&#xff1f; 如果语句太正式&#xff0c;高级&#xff0c;会被误判成aigc &#xff0c;慎重选择ai润色。 有的话就算没有用ai生成&#xff0c;但…...

AI 编程工具—如何在 Cursor 中集成使用 MCP工具

AI 编程工具—如何在 Cursor 中集成使用 MCP工具 这里我们给出了常用的MCP 聚合工具,也就是我们可以在这些网站找MCP服务 这是一个MCP Server共享平台,用户可以在上面发布和下载MCP Server配置。在这里可以选择你需要的MCP 服务。 如果你不知道你的mcp 对应的server 名称也不…...

基础算法篇(5)(蓝桥杯常考点)—动态规划(C/C++)

文章目录 动态规划前言线性dp路径类dp经典线性dp背包问题分类01背包问题完全背包问题多重背包分组背包问题混合背包问题多维费用的背包问题区间dp 动态规划 前言 在竞赛中&#xff0c;如果遇到动态规划的题目&#xff0c;只要不是经典题型&#xff0c;那么大概率就是以压轴题的…...

MLLMS_KNOW尝鲜版

背景&#xff08;个人流水账&#xff0c;可毫不犹豫跳过&#xff09; 最近项目中有涉及到小物体检测的内容&#xff0c;昨天晚上讨论的时候有提出是否可以将关注区域放大的idea&#xff0c;不过后来没有就着这个东西深入&#xff0c;结果好巧不巧地&#xff0c;今天关注到这篇…...

《软件设计师》复习笔记(12.2)——成本管理、配置管理

目录 一、项目成本管理 1. 定义 2. 主要过程 3. 成本类型 4. 其他概念 真题示例&#xff1a; 二、软件配置管理 1. 定义 2. 主要活动 3. 配置项 4. 基线&#xff08;Baseline&#xff09; 5. 配置库类型 真题示例&#xff1a; 一、项目成本管理 1. 定义 在批准…...

《AI赋能职场:大模型高效应用课》第8课 AI辅助职场沟通与协作

【本课目标】 掌握AI辅助邮件、沟通话术的优化技巧。学习利用AI快速生成高效的会议纪要。通过实操演练&#xff0c;提升职场沟通效率与协作能力。 【准备工具】 DeepSeek大模型&#xff08;deepseek.com&#xff09;百度文心一言&#xff08;yiyan.baidu.com&#xff09; 一…...

Spring 中的 @Cacheable 缓存注解

1 什么是缓存 第一个问题&#xff0c;首先要搞明白什么是缓存&#xff0c;缓存的意义是什么。 对于普通业务&#xff0c;如果要查询一个数据&#xff0c;一般直接select数据库进行查找。但是在高流量的情况下&#xff0c;直接查找数据库就会成为性能的瓶颈。因为数据库查找的…...

settimeout和setinterval区别

1. setTimeout&#xff1a;单次延迟执行 语法&#xff1a; const timeoutId setTimeout(callback, delay, arg1, arg2, ...); 核心功能&#xff1a;在指定的 delay&#xff08;毫秒&#xff09;后&#xff0c;执行一次 callback 函数。 参数&#xff1a; callback&#x…...

UE5编辑器静止状态下(非 Play 模式)睫毛和眼睛的渲染是正常的,而在 Play 模式下出现模糊

这通常指向以下几个 运行时&#xff08;Runtime&#xff09; 特有的原因&#xff1a; 抗锯齿 (Anti-Aliasing) 方法&#xff0c;特别是 Temporal Anti-Aliasing (TAA): 这是最可能的原因。 UE5 默认启用的 TAA 通过混合多帧信息来平滑边缘和减少闪烁&#xff0c;尤其是在运动中…...

怎样选择适合网站的服务器带宽?

合适的服务器带宽对于网站的需求起着至关重要的作用&#xff0c;服务器带宽会直接影响到网站的访问速度和用户体验&#xff0c;本文将介绍一下企业该怎样选择适合网站需求的服务器带宽&#xff01; 不同类型的网站对于服务器带宽的需求也是不同的&#xff0c;小型博客网站的访问…...

Kaamel隐私与安全分析报告:Microsoft Recall功能评估与风险控制

本报告对Microsoft最新推出的Recall功能进行了全面隐私与安全分析。Recall是Windows 11 Copilot电脑的专属AI功能&#xff0c;允许用户以自然语言搜索曾在电脑上查看过的内容。该功能在初次发布时因严重隐私和安全问题而备受争议&#xff0c;后经微软全面重新设计。我们的分析表…...

linux 4.14内核jffs2文件系统不自动释放空间的bug

前段时间在做spi-nor flash项目的时候&#xff0c;使用jffs2文件系统&#xff0c;发现在4.14内核下存在无法释放空间的bug&#xff0c;后来进行了修复&#xff0c;修复后功能正常&#xff0c;现将修复patch公开&#xff0c;供后来者学习&#xff1a; diff --git a/fs/jffs2/ac…...