当前位置: 首页 > article >正文

[拆解LangChain执行引擎]一个实例理解LangChain的几种流模式

invoke/ainvoke方法看起来是采用简单的请求/回复消息交换模式客户端需等待整个流程执行完毕后才能得到结果其实方法背后还是会调用stream/astream方法以流的方式进行交互。如果我们直接调用调用这两个方法并采用相应的流模式我们就能有效解决客户端长时间无响应的问题实时地得到对方的反馈。class Pregel( PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]): def stream( self, input: InputT | Command | None, config: RunnableConfig | None None, *, context: ContextT | None None, stream_mode: StreamMode | Sequence[StreamMode] | None None, print_mode: StreamMode | Sequence[StreamMode] (), output_keys: str | Sequence[str] | None None, interrupt_before: All | Sequence[str] | None None, interrupt_after: All | Sequence[str] | None None, durability: Durability | None None, subgraphs: bool False, debug: bool | None None, **kwargs: Unpack[DeprecatedKwargs], ) - Iterator[dict[str, Any] | Any] async def astream( self, input: InputT | Command | None, config: RunnableConfig | None None, *, context: ContextT | None None, stream_mode: StreamMode | Sequence[StreamMode] | None None, print_mode: StreamMode | Sequence[StreamMode] (), output_keys: str | Sequence[str] | None None, interrupt_before: All | Sequence[str] | None None, interrupt_after: All | Sequence[str] | None None, durability: Durability | None None, subgraphs: bool False, debug: bool | None None, **kwargs: Unpack[DeprecatedKwargs], ) - AsyncIterator[dict[str, Any] | Any]在stream/astream方法的众多参数中表示流模式的stream_mode参数最为重要其对应类型StreamMode以字符串字面量的形式定义了七个选项。流是Pregel引擎向调用者提供数据的基本工作方法它采用订阅发布的形式。Pregel对象发布的内容由对它订阅决定因为发布客户端不敢兴趣的内容不但毫无意义而且还会对影响造成极大的影响。StreamMode Literal[ values, updates, checkpoints, tasks, debug, messages, custom ]在调用stream/astream方法时我们可以根据需要指定一个或者多个流模式以StreamMode序列的形式。如果没有显式设置NonePregel对象自身的stream_mode字段会作为兜底该字典的默认值为“values。如果当前Pregel对象以子图的形式被调用会默认使用values模式subgraphs参数用于控制是否希望得到子图的输出。下面列出了七种流模式对应的输出内容values在每个Superstep结束后输出全部Channel的值updates针对每个Node输出由它更新的Channel值checkpoints在创建新的Checkpoint的时候输出与get_state方法返回值具有相同结构的内容tasks在Node任务开始和结束的时候输出任务ID、Node名称和其他相关信息debug可以简单认为是tasks checkpointsmessages输出语言模型产生的Token和相关元数据开发者在Node处理函数中利用StreamWriter自行输出的内容如果混合使用多种流模式stream/astream方法会返回一个字典自带的Key表示当前输出采用的流模式。对于单一模式的调用会直接返回输出的内容。如果采用custom模式Node处理方法可以利用SteamWriter向客户端实时输出自定义的内容。StreamWriter和静态上下文一样都属于当前Runtime的一部分后者可以利用注入Node处理函数的RunnableConfig提取。下面演示程序会使用所有的流模式我们在每个Node的处理函数中利用StreamWriter输出当前的Node名称。from langgraph.checkpoint.memory import InMemorySaver from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue, BinaryOperatorAggregate import operator from functools import partial from langchain_core.runnables import RunnableConfig from typing import Any,Sequence from langgraph.runtime import Runtime from langgraph.types import StreamWriter,StreamMode from collections import defaultdict def handle(node: str, inputs: dict[str, Any], config: RunnableConfig) - list[str]: runtime:Runtime config[configurable].get(__pregel_runtime) writer:StreamWriter runtime.stream_writer writer(fnode {node} is called.) return [node] foo (NodeBuilder() .subscribe_to(foo,read False) .do(partial(handle, foo)) .write_to(bartriggered by foo) ) bar1 (NodeBuilder() .subscribe_to(bar,read False) .do(partial(handle, bar1)) .write_to(output) ) bar2 (NodeBuilder() .subscribe_to(bar,read False) .do(partial(handle, bar2)) .write_to(output)) app Pregel( nodes{foo: foo, bar1: bar1, bar2: bar2}, channels{ foo: LastValue(str), bar: LastValue(str), output: BinaryOperatorAggregate(list, operator.add), }, input_channels[foo], output_channels[output], checkpointerInMemorySaver(), ) config{configurable: {thread_id: 123}} stream_mode: Sequence[StreamMode] [values, updates,checkpoints,tasks,debug,custom] result: defaultdict[str, list[str]] defaultdict(list) for (mode,chunk) in app.stream(input{foo: None}, stream_mode stream_mode, configconfig): result[mode].append(chunk) for mode,chunks in result.items(): index 1 for chunk in chunks: print(f{index}.[{mode}] {chunk}) index 1 print()创建的Pregel由节点foo、bar1和bar2构成。节点foo率先执行bar1和bar2随后并行执行。我们将调用stream方法收集到的内容根据流模式分组进行输出。我们来分析一下如下的输出结果由于三个Node都会涉及到针对Channel的更新所以会有三个updates模式的输出。整个流程涉及三个Supperstep两个values模式的输出的全量的状态对应于后两个Superstep。三个Node对应三个任务所以具有六个tasks模式的输出反映这三个任务的开始和结束。经历的三个Superstep对应三次Checkpoint的创建所以我们能看到三个checkpoints模式的输出。六个tasks加三个checkpoints所以有九个debug模式的输出。三个Node中针对StreamWriter的调用对应三个custom模式的输出。1.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, parent_config: None, values: {foo: None, output: []}, metadata: {source: input, step: -1, parents: {}}, next: [foo], tasks: [{id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, interrupts: (), state: None}]} 2.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, values: {foo: None, bar: triggered by foo, output: []}, metadata: {source: loop, step: 0, parents: {}}, next: [bar1, bar2], tasks: [{id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, interrupts: (), state: None}, {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, interrupts: (), state: None}]} 3.[checkpoints] {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9f3-6cbf-8001-e4219a2f4b58}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, values: {foo: None, bar: triggered by foo, output: [bar1, bar2]}, metadata: {source: loop, step: 1, parents: {}}, next: [], tasks: []} 1.[debug] {step: -1, timestamp: 2026-01-27T00:08:26.86591800:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, parent_config: None, values: {foo: None, output: []}, metadata: {source: input, step: -1, parents: {}}, next: [foo], tasks: [{id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, interrupts: (), state: None}]}} 2.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86593300:00, type: task, payload: {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, input: {}, triggers: (foo,)}} 3.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86651600:00, type: task_result, payload: {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, error: None, result: {bar: triggered by foo}, interrupts: []}} 4.[debug] {step: 0, timestamp: 2026-01-27T00:08:26.86732600:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ea-6dad-bfff-aaf158b4ced6}}, values: {foo: None, bar: triggered by foo, output: []}, metadata: {source: loop, step: 0, parents: {}}, next: [bar1, bar2], tasks: [{id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, interrupts: (), state: None}, {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, interrupts: (), state: None}]}} 5.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86733800:00, type: task, payload: {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, input: {}, triggers: (bar,)}} 6.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86734300:00, type: task, payload: {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, input: {}, triggers: (bar,)}} 7.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86804300:00, type: task_result, payload: {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, error: None, result: {output: [bar2]}, interrupts: []}} 8.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86811700:00, type: task_result, payload: {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, error: None, result: {output: [bar1]}, interrupts: []}} 9.[debug] {step: 1, timestamp: 2026-01-27T00:08:26.86853300:00, type: checkpoint, payload: {config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9f3-6cbf-8001-e4219a2f4b58}}, parent_config: {configurable: {checkpoint_ns: , thread_id: 123, checkpoint_id: 1f0fb144-d9ef-658f-8000-fe11b210fb05}}, values: {foo: None, bar: triggered by foo, output: [bar1, bar2]}, metadata: {source: loop, step: 1, parents: {}}, next: [], tasks: []}} 1.[tasks] {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, input: {}, triggers: (foo,)} 2.[tasks] {id: d40d3fbc-0f70-33fe-1b31-51a3e12846fe, name: foo, error: None, result: {bar: triggered by foo}, interrupts: []} 3.[tasks] {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, input: {}, triggers: (bar,)} 4.[tasks] {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, input: {}, triggers: (bar,)} 5.[tasks] {id: c4540722-5a7a-4df3-14e1-abe0fddc9d73, name: bar2, error: None, result: {output: [bar2]}, interrupts: []} 6.[tasks] {id: ca9f30fb-ebea-87bf-074c-9a397616125a, name: bar1, error: None, result: {output: [bar1]}, interrupts: []} 1.[custom] node foo is called. 2.[custom] node bar2 is called. 3.[custom] node bar1 is called. 1.[updates] {foo: {bar: triggered by foo}} 2.[updates] {bar2: {output: [bar2]}} 3.[updates] {bar1: {output: [bar1]}} 1.[values] {foo: None, bar: triggered by foo, output: []} 2.[values] {foo: None, bar: triggered by foo, output: [bar1, bar2]}

相关文章:

[拆解LangChain执行引擎]一个实例理解LangChain的几种流模式

invoke/ainvoke方法看起来是采用简单的请求/回复消息交换模式,客户端需等待整个流程执行完毕后才能得到结果,其实方法背后还是会调用stream/astream方法以流的方式进行交互。如果我们直接调用调用这两个方法,并采用相应的流模式,我…...

手把手教你修改Sense插件源码:解决Elasticsearch 6.0+的Content-Type报错

深度改造Sense插件:解决Elasticsearch 6.0的Content-Type兼容性问题 当Elasticsearch升级到6.0版本后,许多开发者发现原本运行良好的Sense插件突然开始报错。这个问题的核心在于HTTP请求头部的Content-Type规范变更,而旧版Sense插件并未及时适…...

万象熔炉 | Anything XL实操手册:负向提示词避坑与高质量出图技巧

万象熔炉 | Anything XL实操手册:负向提示词避坑与高质量出图技巧 1. 工具概览:Anything XL能为你做什么 万象熔炉 | Anything XL是一款基于Stable Diffusion XL开发的本地图像生成工具,专门为二次元和通用风格图像生成而优化。它最大的特点…...

OpenClaw会议纪要助手:千问3.5-35B-A3B-FP8实时转录录音与生成待办事项

OpenClaw会议纪要助手:千问3.5-35B-A3B-FP8实时转录录音与生成待办事项 1. 为什么需要自动化会议纪要 每次开完远程会议,最头疼的就是整理录音和待办事项。上周三的跨部门协调会上,我尝试用飞书妙记手动导出录音,再粘贴到ChatGP…...

bge-large-zh-v1.5在RAG中的应用:sglang快速部署,提升问答准确率

bge-large-zh-v1.5在RAG中的应用:sglang快速部署,提升问答准确率 1. bge-large-zh-v1.5模型概述 bge-large-zh-v1.5是一款专为中文优化的嵌入模型,在检索增强生成(RAG)系统中发挥着关键作用。该模型通过深度学习技术…...

TranslucentTB错误代码0x80070490:从现象到本质的解决之道

TranslucentTB错误代码0x80070490:从现象到本质的解决之道 【免费下载链接】TranslucentTB A lightweight utility that makes the Windows taskbar translucent/transparent. 项目地址: https://gitcode.com/gh_mirrors/tr/TranslucentTB 问题定位 今天收到…...

SDMatte环境部署避坑指南:从Python安装到模型推理全流程

SDMatte环境部署避坑指南:从Python安装到模型推理全流程 1. 前言:为什么要用SDMatte? 如果你正在寻找一个强大的图像抠图工具,SDMatte可能是你的理想选择。这个基于深度学习的模型能够精准地从复杂背景中分离出前景物体&#xf…...

碧蓝航线自动化工具:解放双手的智能管理完整方案

碧蓝航线自动化工具:解放双手的智能管理完整方案 【免费下载链接】AzurLaneAutoScript Azur Lane bot (CN/EN/JP/TW) 碧蓝航线脚本 | 无缝委托科研,全自动大世界 项目地址: https://gitcode.com/gh_mirrors/az/AzurLaneAutoScript 【玩家痛点】现…...

Pi0具身智能v1快速原型验证:用浏览器交互,迭代你的机器人UI/UX设计

Pi0具身智能v1快速原型验证:用浏览器交互,迭代你的机器人UI/UX设计 1. 为什么需要快速原型验证工具 在机器人开发领域,从算法设计到实际部署往往存在巨大鸿沟。传统开发流程中,工程师需要: 编写复杂的仿真环境代码等…...

腾讯混元翻译模型HY-MT1.5-1.8B保姆级部署指南:小白也能轻松搞定

腾讯混元翻译模型HY-MT1.5-1.8B保姆级部署指南:小白也能轻松搞定 1. 引言:为什么选择HY-MT1.5-1.8B? 在当今全球化交流日益频繁的背景下,机器翻译已成为跨语言沟通的重要工具。腾讯混元团队推出的HY-MT1.5-1.8B翻译模型&#xf…...

Qwen3-TTS多线程代码骨架:复用模型实例,节省80%显存

Qwen3-TTS多线程代码骨架:复用模型实例,节省80%显存 1. 多线程语音生成的必要性 语音合成技术在实际应用中往往面临批量处理的需求。以教育行业为例,一套完整的在线课程可能需要生成数百个不同语言、不同风格的语音片段。传统单线程处理方式…...

做seo优化需要多少预算_关键词seo优化排名价格是多少

做SEO优化需要多少预算_关键词SEO优化排名价格是多少 在当今的数字化时代,SEO优化成为了每个企业网站推广的重要手段。究竟需要多少预算来进行SEO优化,关键词SEO优化排名的价格又是多少呢?本文将详细分析这些问题,帮助你更好地理…...

COMSOL设计电极加热通道机关模型:探究内部热流场及电场分布

comsol设计电极加热通道机关模型,可以得加热通道内部热流场及电场分布,害,最近蹲在工位上啃Comsol的时候,突然被导师塞了个小活——要算个电极加热的金属通道里的热流和电场分布,本来想直接拿网上的模板改改交差&#…...

基于Vue3的前端界面开发:FLUX.1-dev图像生成平台搭建

基于Vue3的前端界面开发:FLUX.1-dev图像生成平台搭建 1. 引言 想象一下,你刚拿到一个强大的AI图像生成模型FLUX.1-dev,它能根据文字描述生成高质量图片,还能进行智能编辑。但如何让这个"大脑"拥有一个友好的"面孔…...

打卡信奥刷题(3062)用C++实现信奥题 P6862 [RC-03] 随机树生成器

P6862 [RC-03] 随机树生成器 题目描述 小 R 有一个随机树生成器&#xff0c;其工作原理如下&#xff1a; 输入 nnn&#xff0c;则对于每个 1<i≤n1<i\le n1<i≤n&#xff0c;随机选择一个 [1,i)[1,i)[1,i) 中的节点作为其父亲。返回这棵树。 给定 n,kn,kn,k&#xff0…...

Ollama一键部署EmbeddingGemma-300M:小白也能玩转文本向量化

Ollama一键部署EmbeddingGemma-300M&#xff1a;小白也能玩转文本向量化 想给你的应用加点“智能”吗&#xff1f;比如让用户用自然语言搜索文档&#xff0c;或者自动给文章分类&#xff0c;又或者做个简单的推荐系统&#xff1f;这些听起来高大上的功能&#xff0c;其实核心都…...

RePKG工具完全指南:Wallpaper Engine资源处理全流程解析

RePKG工具完全指南&#xff1a;Wallpaper Engine资源处理全流程解析 【免费下载链接】repkg Wallpaper engine PKG extractor/TEX to image converter 项目地址: https://gitcode.com/gh_mirrors/re/repkg 核心能力解析&#xff1a;从文件解析到格式转换 解析PKG文件结…...

如何用UABEA轻松探索和编辑Unity游戏资源:完整指南

如何用UABEA轻松探索和编辑Unity游戏资源&#xff1a;完整指南 【免费下载链接】UABEA c# uabe for newer versions of unity 项目地址: https://gitcode.com/gh_mirrors/ua/UABEA UABEA&#xff08;Unity Asset Bundle Extractor Avalonia&#xff09;是一款强大的跨平…...

Qwen2.5-72B-Instruct-GPTQ-Int4快速部署:无需conda环境的一键启动方案

Qwen2.5-72B-Instruct-GPTQ-Int4快速部署&#xff1a;无需conda环境的一键启动方案 1. 模型简介 Qwen2.5-72B-Instruct-GPTQ-Int4是通义千问大模型系列的最新成员&#xff0c;作为72B参数规模的指令调优模型&#xff0c;它带来了多项显著改进&#xff1a; 知识量与能力提升&…...

JetBrains IDE试用期到期影响开发?ide-eval-resetter让无缝开发体验成为现实

JetBrains IDE试用期到期影响开发&#xff1f;ide-eval-resetter让无缝开发体验成为现实 【免费下载链接】ide-eval-resetter 项目地址: https://gitcode.com/gh_mirrors/id/ide-eval-resetter 问题剖析&#xff1a;开发中断的隐形成本 在现代软件开发流程中&#xff…...

保姆级教程:Qwen3-TTS-Tokenizer-12Hz快速部署与音频处理实战

保姆级教程&#xff1a;Qwen3-TTS-Tokenizer-12Hz快速部署与音频处理实战 你是否曾为处理海量音频数据而头疼&#xff1f;无论是语音合成训练前的数据预处理&#xff0c;还是低带宽环境下的语音传输&#xff0c;传统的音频处理方法往往在效率和质量之间难以两全。今天&#xf…...

XUnity自动翻译器:打破语言壁垒的终极Unity游戏翻译解决方案

XUnity自动翻译器&#xff1a;打破语言壁垒的终极Unity游戏翻译解决方案 【免费下载链接】XUnity.AutoTranslator 项目地址: https://gitcode.com/gh_mirrors/xu/XUnity.AutoTranslator XUnity自动翻译器是一款开源的Unity游戏实时文本翻译插件&#xff0c;能够在游戏运…...

小米耳机协议逆向实战:如何用Wireshark分析蓝牙数据包(Redmi Buds 5为例)

小米耳机蓝牙协议逆向工程全解析&#xff1a;从数据捕获到模式控制 去年夏天&#xff0c;我在咖啡馆里第一次注意到这个问题——当我把Redmi Buds 5从手机切换到笔记本电脑时&#xff0c;那些在手机上轻松可调的降噪功能突然变得遥不可及。每次都需要笨拙地按压耳机物理按键来切…...

s2-pro开源语音模型入门:Fish Audio s2-pro架构特点与适用场景解析

s2-pro开源语音模型入门&#xff1a;Fish Audio s2-pro架构特点与适用场景解析 1. 专业级语音合成新选择 s2-pro是Fish Audio最新开源的专业级语音合成模型镜像&#xff0c;为开发者提供高质量的文本转语音(TTS)能力。与常规语音合成工具不同&#xff0c;s2-pro最突出的特点是…...

Kandinsky-5.0-I2V-Lite-5s多模型对比:与同类I2V模型的生成效果横向评测

Kandinsky-5.0-I2V-Lite-5s多模型对比&#xff1a;与同类I2V模型的生成效果横向评测 1. 开场白&#xff1a;为什么需要关注图像转视频技术 想象一下这样的场景&#xff1a;你手头有一张精美的产品静物照片&#xff0c;如果能让它动起来展示360度视角&#xff0c;转化率会不会…...

OpenClaw自动化报告:Phi-3-mini-128k-instruct数据分析与可视化

OpenClaw自动化报告&#xff1a;Phi-3-mini-128k-instruct数据分析与可视化 1. 为什么需要自动化数据分析 上周我接手了一个紧急任务&#xff1a;需要从300多份客户反馈的CSV文件中提取关键洞察&#xff0c;并制作成PPT向团队汇报。手动操作不仅耗时&#xff0c;还容易遗漏重…...

WuliArt Qwen-Image Turbo实战:快速生成赛博朋克壁纸,效果惊艳

WuliArt Qwen-Image Turbo实战&#xff1a;快速生成赛博朋克壁纸&#xff0c;效果惊艳 1. 引言&#xff1a;当AI绘画遇见赛博朋克 你是否曾经幻想过&#xff0c;只需输入一段文字描述&#xff0c;就能立即获得一张充满未来感的赛博朋克风格壁纸&#xff1f;过去&#xff0c;这…...

终极指南:使用BetterJoy让Switch手柄变身全能PC游戏控制器

终极指南&#xff1a;使用BetterJoy让Switch手柄变身全能PC游戏控制器 【免费下载链接】BetterJoy Allows the Nintendo Switch Pro Controller, Joycons and SNES controller to be used with CEMU, Citra, Dolphin, Yuzu and as generic XInput 项目地址: https://gitcode.…...

使用Matlab进行RVC变声效果的信号分析与可视化

使用Matlab进行RVC变声效果的信号分析与可视化 最近在研究语音转换技术&#xff0c;特别是RVC这类模型&#xff0c;发现大家讨论的焦点大多在模型架构、训练技巧或者最终听感上。作为一个有信号处理背景的工程师&#xff0c;我总觉得少了点什么——我们能不能“看见”声音的变…...

StructBERT中文相似度模型部署:支持多模型并行服务(BERT/RoBERTa/StructBERT)

StructBERT中文相似度模型部署&#xff1a;支持多模型并行服务&#xff08;BERT/RoBERTa/StructBERT&#xff09; 想快速搭建一个能理解中文句子相似度的AI服务吗&#xff1f;比如判断“今天天气真好”和“阳光明媚的一天”是不是一个意思&#xff0c;或者自动给用户提问匹配最…...