当前位置: 首页 > news >正文

【LangFuse】数据集与测试

1. 在线标注

在这里插入图片描述

2. 上传已有数据集

import json# 调整数据格式 {"input":{...},"expected_output":"label"}
data = []
with open('my_annotations.jsonl', 'r', encoding='utf-8') as fp:for line in fp:example = json.loads(line.strip())item = {"input": {"outlines": example["outlines"],"user_input": example["user_input"]},"expected_output": example["label"]}data.append(item)
#%%
from langfuse import Langfuse
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest
from tqdm import tqdm
import langfusedataset_name = "my-dataset"# 初始化客户端
langfuse=Langfuse()# 创建数据集,如果已存在不会重复创建
try:langfuse.create_dataset(name=dataset_name,# optional descriptiondescription="My first dataset",# optional metadatametadata={"author": "wzr","type": "demo"})
except:pass# 考虑演示运行速度,只上传前50条数据
for item in tqdm(data[:50]):langfuse.create_dataset_item(dataset_name="my-dataset",input=item["input"],expected_output=item["expected_output"])

tqdm

100%|██████████| 50/50 [00:11<00:00, 4.48it/s]

3. 定义评估函数

def simple_evaluation(output, expected_output):return output == expected_output

4.运行测试

Prompt 模板与 Chain(LCEL)

 import ChatOpenAI
from langchain_core.output_parsers import StrOutputParserneed_answer = PromptTemplate.from_template("""
*********
你是老师的助教,你的工作是从学员的课堂交流中选择出需要老师回答的问题,加以整理以交给老师回答。课程内容:
{outlines}
*********
学员输入:
{user_input}
*********
如果这是一个需要老师答疑的问题,回复Y,否则回复N。
只回复Y或N,不要回复其他内容。""")model = ChatOpenAI(temperature=0, seed=42)
parser = StrOutputParser()chain_v1 = (need_answer| model| parser

在LangFuse数据集测试效果:

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()
lock = threading.Lock()def run_evaluation(chain, dataset_name, run_name):dataset = langfuse.get_dataset(dataset_name)def process_item(item):with lock:# 注意:多线程调用此处要加锁,否则会有id冲突导致丢数据handler = item.get_langchain_handler(run_name=run_name)# Assuming chain.invoke is a synchronous functionoutput = chain.invoke(item.input, config={"callbacks": [handler]})# Assuming handler.root_span.score is a synchronous functionhandler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output))print('.', end='', flush=True)# for item in dataset.items:#    process_item(item)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
run_evaluation(chain_v1, "my-dataset", "v1-"+datetime.now().strftime("%d/%m/%Y %H:%M:%S"))# 保证全部数据同步到云端
langfuse_context.flush()

tqdm Python进度条

tqdm 是一个快速,可扩展的Python进度条,可以在 Python 长循环中添加一个进度提示信息,用户只需要封装任意的 iterable 在 tqdm(iterable) 函数下即可。

在你的代码中:

from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

你首先从 tqdm 模块导入了 tqdm 函数,然后在 with 语句中创建了一个 tqdm 对象。total=len(dataset.items) 设置了进度条的总长度,即数据集项目的总数。

for 循环中,executor.map(process_item, dataset.items) 是一个并行执行任务的操作,它会对 dataset.items 中的每一个元素调用 process_item 函数。每次任务完成时,pbar.update(1) 会更新进度条,表示有一个任务已经完成。

这样,当你的程序运行时,你会在控制台上看到一个进度条,它会显示已经完成的任务数量和总任务数量,以及进度条的百分比和估计的剩余时间。这对于长时间运行的任务非常有用,因为它可以让用户知道任务的进度和完成时间。

ThreadPoolExecutor

这段代码使用了Python的concurrent.futures.ThreadPoolExecutor类来创建一个线程池,并使用该线程池并行地执行任务。下面是对这段代码的详细解释:

from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  1. ThreadPoolExecutor(max_workers=4): 这里创建了一个ThreadPoolExecutor实例,它管理一个包含4个线程的线程池。max_workers参数定义了线程池中线程的最大数量。

  2. with … as executor: 使用with语句来管理线程池的生命周期。当with语句块开始时,线程池被创建,当with语句块结束时,线程池会被关闭。使用with语句可以确保线程池在不再需要时被正确关闭,释放资源。

  3. executor.map(process_item, dataset.items): map方法是ThreadPoolExecutor的一个方法,它接受一个函数和一个可迭代对象作为参数。在这个例子中,process_item函数会被应用到dataset.items中的每一个元素上。map方法会返回一个结果的迭代器,这些结果与输入的可迭代对象的元素一一对应。

这段代码的作用是创建一个包含4个线程的线程池,然后并行地对dataset.items中的每一个元素调用process_item函数。这是一种并行处理任务的方式,可以提高程序的效率,特别是在处理I/O密集型任务(如网络请求或文件读写)时。

这段代码实现了一个基于Langfuse的并行评估系统,主要用于对LangChain流程进行批量测试和监控。以下是对代码的逐层解析:


代码结构概览

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()  # 初始化Langfuse客户端
lock = threading.Lock()  # 全局线程锁def run_evaluation(chain, dataset_name, run_name):# 核心评估逻辑# ...

核心功能模块分解

1. 数据加载
dataset = langfuse.get_dataset(dataset_name)
  • 作用:从Langfuse平台加载预定义的测试数据集。
  • 数据格式:假设数据集包含多个items,每个item包含:
    {"input": "用户输入内容",  # 测试输入"expected_output": "期望输出"  # 预期结果
    }
    

2. 多线程任务分发
with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  • 技术选型:使用线程池(4线程)并行处理数据集。
  • 性能优势:适合处理I/O密集型任务(如网络请求到OpenAI接口)。
  • 注意事项:需确保chain.invoke和Langfuse操作是线程安全的。

3. 线程安全处理
def process_item(item):with lock:  # 加锁防止Handler ID冲突handler = item.get_langchain_handler(run_name=run_name)
  • 关键问题
    • Langfuse的Handler生成依赖上下文ID,多线程环境下可能产生ID冲突。
    • 不加锁会导致监控数据丢失或错乱。
  • 解决方式:通过全局锁(threading.Lock)序列化Handler的创建过程。

4. 监控埋点与评估
output = chain.invoke(item.input, config={"callbacks": [handler]})handler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output)
)
  • 埋点层次
    1. 全链路追踪:通过handler自动记录LangChain每一步的执行过程。
    2. 自定义评分:手动添加准确率(accuracy)评分。
  • 评估逻辑simple_evaluation函数(需自定义)对比输出与预期结果,例如:
    def simple_evaluation(output, expected):return 1 if output.strip() == expected.strip() else 0
    

监控数据示例

在Langfuse后台会生成如下结构的数据:

字段示例值说明
TraceLangChain Evaluation评估任务根节点
├─ Spanprompt_processingLangChain内部步骤
├─ Generationgpt-3.5-turboLLM调用记录
└─ Scoreaccuracy: 0.8自定义评分

关键设计模式

1. 生产者-消费者模式
  • 生产者:线程池分发数据项。
  • 消费者process_item函数处理单个数据项。
2. 回调机制

通过config={"callbacks": [handler]}将Langfuse监控注入LangChain执行过程。

3. 线程安全隔离
  • 全局锁:确保Handler创建原子性。
  • 线程局部存储:Langfuse SDK内部应使用线程局部变量存储上下文。

潜在优化点

1. 错误处理增强
try:output = chain.invoke(...)
except Exception as e:handler.trace.error(str(e))  # 记录异常
2. 动态线程数调整
max_workers = os.cpu_count() * 2  # 根据CPU核心数动态设置
3. 进度可视化
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

适用场景

  1. 批量模型测试:对比不同模型版本的效果。
  2. 回归测试:确保代码更新不破坏核心功能。
  3. 异常诊断:通过Trace分析失败案例的详细执行路径。

通过这种设计,开发者可以高效地验证LangChain应用的质量,并在生产环境中持续监控性能表现。

相关文章:

【LangFuse】数据集与测试

1. 在线标注 2. 上传已有数据集 import json# 调整数据格式 {"input":{...},"expected_output":"label"} data [] with open(my_annotations.jsonl, r, encodingutf-8) as fp:for line in fp:example json.loads(line.strip())item {"i…...

【Python】如何解决Jupyter Notebook修改外部模块后必须重启内核的问题?

“为什么我修改了Python模块的代码&#xff0c;Jupyter Notebook却看不到变化&#xff1f;” 一、问题现象&#xff1a;令人抓狂的开发体验 假设你正在开发一个图像处理项目&#xff0c;项目结构如下&#xff1a; project/ ├── utils/ │ └── image_processor.py └…...

Redis 篇

一、数据结构 二、持久化方式 Redis 提供了两种主要的持久化方式&#xff0c;分别是 RDB&#xff08;Redis Database&#xff09;和 AOF&#xff08;Append Only File&#xff09;&#xff0c;此外&#xff0c;还可以同时使用这两种方式以增强数据安全性&#xff0c;以下为你…...

React + TypeScript 实战指南:用类型守护你的组件

TypeScript 为 React 开发带来了强大的类型安全保障&#xff0c;这里解析常见的一些TS写法&#xff1a; 一、组件基础类型 1. 函数组件定义 // 显式声明 Props 类型并标注返回值 interface WelcomeProps {name: string;age?: number; // 可选属性 }const Welcome: React.FC…...

从零开始:Linux环境下如何制作静态库与动态库

个人主页&#xff1a;chian-ocean 文章专栏-Linux 前言 动静态库是编程中两种主要的库类型&#xff0c;它们用于帮助开发者复用已有的代码&#xff0c;而不需要每次都从头开始编写。它们的主要区别在于链接和加载的时机、方式以及使用场景 库 库就是一些已经写好并且经过测试…...

【智能体Agent】ReAct智能体的实现思路和关键技术

基于ReAct&#xff08;Reasoning Acting&#xff09;框架的自主智能体 import re from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory from langchain_core.language_models.chat_models import BaseChatM…...

Java进阶:Zookeeper相关笔记

概要总结&#xff1a; ●Zookeeper是一个开源的分布式协调服务&#xff0c;需要下载并部署在服务器上(使用cmd启动&#xff0c;windows与linux都可用)。 ●zookeeper一般用来实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。 ●有多台服…...

QT-绘画事件

实现颜色的随时调整&#xff0c;追加橡皮擦功能 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QColor> #include <QPoint> #include <QVector> #include <QMouseEvent> #include <QPainter> #include <Q…...

鸿蒙NEXT开发-端云一体化开发

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 目录 端云一体化开发基本概念 传统架构 端云一…...

大模型——股票分析AI工具开发教程

大模型——股票分析AI工具开发教程 在本教程中,我们将利用Google Gemini 2.0 Flash模型创建一个简单但有效的股票分析器。 你是否曾被大量的股票市场数据所淹没?希望有一个私人助理来筛选噪音并为您提供清晰、可操作的见解?好吧,你可以自己构建一个,而且由于 Python 的强…...

nexus 实现https 私有镜像搭建

1、安装nexus 1.1 安装JDK17 rpm -ivh jdk-17.0.13_linux-x64_bin.rpm 1.2 下载安装包解压到指定目录 tar zxvf nexus-3.77.2-02-unix.tar.gz -C /usr/local 2、运行nexus 默认8081端口 cd /usr/local/nexus-3.77.2-02 && bin/nexus start 3、配置nexus私有docker 镜…...

颈椎X光数据集(cervical spine X-ray dataset)

颈椎X光数据集&#xff08;cervical spine X-ray dataset&#xff09; 一.颈椎X光&#xff08;1248张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 二&#xff0e;颈椎X光&#xff08;1000张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 此数据…...

(动态规划 完全背包 零钱兑换)leetcode 322

本题为完全背包 与01背包的区别是 物品可以任意取 而01背包只能取一次 这就导致了状态转移方程的不同 1.当放不下:的时候 转移方程是一样的 取0到i-1 物品&#xff0c;背包容量为j的最优值 else 2.放得下:就是取 0到i-1 物品,背包容量为j的最优值和 “0到i的[j-w[i]]v…...

【AI大模型】DeepSeek + Kimi 高效制作PPT实战详解

目录 一、前言 二、传统 PPT 制作问题 2.1 传统方式制作 PPT 2.2 AI 大模型辅助制作 PPT 2.3 适用场景对比分析 2.4 最佳实践与推荐 三、DeepSeek Kimi 高效制作PPT操作实践 3.1 Kimi 简介 3.2 DeepSeek Kimi 制作PPT优势 3.2.1 DeepSeek 优势 3.2.2 Kimi 制作PPT优…...

Pytorch的一小步,昇腾芯片的一大步

Pytorch的一小步&#xff0c;昇腾芯片的一大步 相信在AI圈的人多多少少都看到了最近的信息&#xff1a;PyTorch最新2.1版本宣布支持华为昇腾芯片&#xff01; 1、 发生了什么事儿&#xff1f; 在2023年10月4日PyTorch 2.1版本的发布博客上&#xff0c;PyTorch介绍的beta版本…...

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装&#xff1b;只需暴露 19530&#xff08;gRPC&#xff09;与 9091&#xff08;HTTP/WebUI&#xff09;两个端口&#xff0c;即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...