当前位置: 首页 > news >正文

【LangFuse】数据集与测试

1. 在线标注

在这里插入图片描述

2. 上传已有数据集

import json# 调整数据格式 {"input":{...},"expected_output":"label"}
data = []
with open('my_annotations.jsonl', 'r', encoding='utf-8') as fp:for line in fp:example = json.loads(line.strip())item = {"input": {"outlines": example["outlines"],"user_input": example["user_input"]},"expected_output": example["label"]}data.append(item)
#%%
from langfuse import Langfuse
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest
from tqdm import tqdm
import langfusedataset_name = "my-dataset"# 初始化客户端
langfuse=Langfuse()# 创建数据集,如果已存在不会重复创建
try:langfuse.create_dataset(name=dataset_name,# optional descriptiondescription="My first dataset",# optional metadatametadata={"author": "wzr","type": "demo"})
except:pass# 考虑演示运行速度,只上传前50条数据
for item in tqdm(data[:50]):langfuse.create_dataset_item(dataset_name="my-dataset",input=item["input"],expected_output=item["expected_output"])

tqdm

100%|██████████| 50/50 [00:11<00:00, 4.48it/s]

3. 定义评估函数

def simple_evaluation(output, expected_output):return output == expected_output

4.运行测试

Prompt 模板与 Chain(LCEL)

 import ChatOpenAI
from langchain_core.output_parsers import StrOutputParserneed_answer = PromptTemplate.from_template("""
*********
你是老师的助教,你的工作是从学员的课堂交流中选择出需要老师回答的问题,加以整理以交给老师回答。课程内容:
{outlines}
*********
学员输入:
{user_input}
*********
如果这是一个需要老师答疑的问题,回复Y,否则回复N。
只回复Y或N,不要回复其他内容。""")model = ChatOpenAI(temperature=0, seed=42)
parser = StrOutputParser()chain_v1 = (need_answer| model| parser

在LangFuse数据集测试效果:

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()
lock = threading.Lock()def run_evaluation(chain, dataset_name, run_name):dataset = langfuse.get_dataset(dataset_name)def process_item(item):with lock:# 注意:多线程调用此处要加锁,否则会有id冲突导致丢数据handler = item.get_langchain_handler(run_name=run_name)# Assuming chain.invoke is a synchronous functionoutput = chain.invoke(item.input, config={"callbacks": [handler]})# Assuming handler.root_span.score is a synchronous functionhandler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output))print('.', end='', flush=True)# for item in dataset.items:#    process_item(item)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
run_evaluation(chain_v1, "my-dataset", "v1-"+datetime.now().strftime("%d/%m/%Y %H:%M:%S"))# 保证全部数据同步到云端
langfuse_context.flush()

tqdm Python进度条

tqdm 是一个快速,可扩展的Python进度条,可以在 Python 长循环中添加一个进度提示信息,用户只需要封装任意的 iterable 在 tqdm(iterable) 函数下即可。

在你的代码中:

from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

你首先从 tqdm 模块导入了 tqdm 函数,然后在 with 语句中创建了一个 tqdm 对象。total=len(dataset.items) 设置了进度条的总长度,即数据集项目的总数。

for 循环中,executor.map(process_item, dataset.items) 是一个并行执行任务的操作,它会对 dataset.items 中的每一个元素调用 process_item 函数。每次任务完成时,pbar.update(1) 会更新进度条,表示有一个任务已经完成。

这样,当你的程序运行时,你会在控制台上看到一个进度条,它会显示已经完成的任务数量和总任务数量,以及进度条的百分比和估计的剩余时间。这对于长时间运行的任务非常有用,因为它可以让用户知道任务的进度和完成时间。

ThreadPoolExecutor

这段代码使用了Python的concurrent.futures.ThreadPoolExecutor类来创建一个线程池,并使用该线程池并行地执行任务。下面是对这段代码的详细解释:

from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  1. ThreadPoolExecutor(max_workers=4): 这里创建了一个ThreadPoolExecutor实例,它管理一个包含4个线程的线程池。max_workers参数定义了线程池中线程的最大数量。

  2. with … as executor: 使用with语句来管理线程池的生命周期。当with语句块开始时,线程池被创建,当with语句块结束时,线程池会被关闭。使用with语句可以确保线程池在不再需要时被正确关闭,释放资源。

  3. executor.map(process_item, dataset.items): map方法是ThreadPoolExecutor的一个方法,它接受一个函数和一个可迭代对象作为参数。在这个例子中,process_item函数会被应用到dataset.items中的每一个元素上。map方法会返回一个结果的迭代器,这些结果与输入的可迭代对象的元素一一对应。

这段代码的作用是创建一个包含4个线程的线程池,然后并行地对dataset.items中的每一个元素调用process_item函数。这是一种并行处理任务的方式,可以提高程序的效率,特别是在处理I/O密集型任务(如网络请求或文件读写)时。

这段代码实现了一个基于Langfuse的并行评估系统,主要用于对LangChain流程进行批量测试和监控。以下是对代码的逐层解析:


代码结构概览

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()  # 初始化Langfuse客户端
lock = threading.Lock()  # 全局线程锁def run_evaluation(chain, dataset_name, run_name):# 核心评估逻辑# ...

核心功能模块分解

1. 数据加载
dataset = langfuse.get_dataset(dataset_name)
  • 作用:从Langfuse平台加载预定义的测试数据集。
  • 数据格式:假设数据集包含多个items,每个item包含:
    {"input": "用户输入内容",  # 测试输入"expected_output": "期望输出"  # 预期结果
    }
    

2. 多线程任务分发
with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  • 技术选型:使用线程池(4线程)并行处理数据集。
  • 性能优势:适合处理I/O密集型任务(如网络请求到OpenAI接口)。
  • 注意事项:需确保chain.invoke和Langfuse操作是线程安全的。

3. 线程安全处理
def process_item(item):with lock:  # 加锁防止Handler ID冲突handler = item.get_langchain_handler(run_name=run_name)
  • 关键问题
    • Langfuse的Handler生成依赖上下文ID,多线程环境下可能产生ID冲突。
    • 不加锁会导致监控数据丢失或错乱。
  • 解决方式:通过全局锁(threading.Lock)序列化Handler的创建过程。

4. 监控埋点与评估
output = chain.invoke(item.input, config={"callbacks": [handler]})handler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output)
)
  • 埋点层次
    1. 全链路追踪:通过handler自动记录LangChain每一步的执行过程。
    2. 自定义评分:手动添加准确率(accuracy)评分。
  • 评估逻辑simple_evaluation函数(需自定义)对比输出与预期结果,例如:
    def simple_evaluation(output, expected):return 1 if output.strip() == expected.strip() else 0
    

监控数据示例

在Langfuse后台会生成如下结构的数据:

字段示例值说明
TraceLangChain Evaluation评估任务根节点
├─ Spanprompt_processingLangChain内部步骤
├─ Generationgpt-3.5-turboLLM调用记录
└─ Scoreaccuracy: 0.8自定义评分

关键设计模式

1. 生产者-消费者模式
  • 生产者:线程池分发数据项。
  • 消费者process_item函数处理单个数据项。
2. 回调机制

通过config={"callbacks": [handler]}将Langfuse监控注入LangChain执行过程。

3. 线程安全隔离
  • 全局锁:确保Handler创建原子性。
  • 线程局部存储:Langfuse SDK内部应使用线程局部变量存储上下文。

潜在优化点

1. 错误处理增强
try:output = chain.invoke(...)
except Exception as e:handler.trace.error(str(e))  # 记录异常
2. 动态线程数调整
max_workers = os.cpu_count() * 2  # 根据CPU核心数动态设置
3. 进度可视化
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

适用场景

  1. 批量模型测试:对比不同模型版本的效果。
  2. 回归测试:确保代码更新不破坏核心功能。
  3. 异常诊断:通过Trace分析失败案例的详细执行路径。

通过这种设计,开发者可以高效地验证LangChain应用的质量,并在生产环境中持续监控性能表现。

相关文章:

【LangFuse】数据集与测试

1. 在线标注 2. 上传已有数据集 import json# 调整数据格式 {"input":{...},"expected_output":"label"} data [] with open(my_annotations.jsonl, r, encodingutf-8) as fp:for line in fp:example json.loads(line.strip())item {"i…...

【Python】如何解决Jupyter Notebook修改外部模块后必须重启内核的问题?

“为什么我修改了Python模块的代码&#xff0c;Jupyter Notebook却看不到变化&#xff1f;” 一、问题现象&#xff1a;令人抓狂的开发体验 假设你正在开发一个图像处理项目&#xff0c;项目结构如下&#xff1a; project/ ├── utils/ │ └── image_processor.py └…...

Redis 篇

一、数据结构 二、持久化方式 Redis 提供了两种主要的持久化方式&#xff0c;分别是 RDB&#xff08;Redis Database&#xff09;和 AOF&#xff08;Append Only File&#xff09;&#xff0c;此外&#xff0c;还可以同时使用这两种方式以增强数据安全性&#xff0c;以下为你…...

React + TypeScript 实战指南:用类型守护你的组件

TypeScript 为 React 开发带来了强大的类型安全保障&#xff0c;这里解析常见的一些TS写法&#xff1a; 一、组件基础类型 1. 函数组件定义 // 显式声明 Props 类型并标注返回值 interface WelcomeProps {name: string;age?: number; // 可选属性 }const Welcome: React.FC…...

从零开始:Linux环境下如何制作静态库与动态库

个人主页&#xff1a;chian-ocean 文章专栏-Linux 前言 动静态库是编程中两种主要的库类型&#xff0c;它们用于帮助开发者复用已有的代码&#xff0c;而不需要每次都从头开始编写。它们的主要区别在于链接和加载的时机、方式以及使用场景 库 库就是一些已经写好并且经过测试…...

【智能体Agent】ReAct智能体的实现思路和关键技术

基于ReAct&#xff08;Reasoning Acting&#xff09;框架的自主智能体 import re from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory from langchain_core.language_models.chat_models import BaseChatM…...

Java进阶:Zookeeper相关笔记

概要总结&#xff1a; ●Zookeeper是一个开源的分布式协调服务&#xff0c;需要下载并部署在服务器上(使用cmd启动&#xff0c;windows与linux都可用)。 ●zookeeper一般用来实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。 ●有多台服…...

QT-绘画事件

实现颜色的随时调整&#xff0c;追加橡皮擦功能 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QColor> #include <QPoint> #include <QVector> #include <QMouseEvent> #include <QPainter> #include <Q…...

鸿蒙NEXT开发-端云一体化开发

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 目录 端云一体化开发基本概念 传统架构 端云一…...

大模型——股票分析AI工具开发教程

大模型——股票分析AI工具开发教程 在本教程中,我们将利用Google Gemini 2.0 Flash模型创建一个简单但有效的股票分析器。 你是否曾被大量的股票市场数据所淹没?希望有一个私人助理来筛选噪音并为您提供清晰、可操作的见解?好吧,你可以自己构建一个,而且由于 Python 的强…...

nexus 实现https 私有镜像搭建

1、安装nexus 1.1 安装JDK17 rpm -ivh jdk-17.0.13_linux-x64_bin.rpm 1.2 下载安装包解压到指定目录 tar zxvf nexus-3.77.2-02-unix.tar.gz -C /usr/local 2、运行nexus 默认8081端口 cd /usr/local/nexus-3.77.2-02 && bin/nexus start 3、配置nexus私有docker 镜…...

颈椎X光数据集(cervical spine X-ray dataset)

颈椎X光数据集&#xff08;cervical spine X-ray dataset&#xff09; 一.颈椎X光&#xff08;1248张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 二&#xff0e;颈椎X光&#xff08;1000张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 此数据…...

(动态规划 完全背包 零钱兑换)leetcode 322

本题为完全背包 与01背包的区别是 物品可以任意取 而01背包只能取一次 这就导致了状态转移方程的不同 1.当放不下:的时候 转移方程是一样的 取0到i-1 物品&#xff0c;背包容量为j的最优值 else 2.放得下:就是取 0到i-1 物品,背包容量为j的最优值和 “0到i的[j-w[i]]v…...

【AI大模型】DeepSeek + Kimi 高效制作PPT实战详解

目录 一、前言 二、传统 PPT 制作问题 2.1 传统方式制作 PPT 2.2 AI 大模型辅助制作 PPT 2.3 适用场景对比分析 2.4 最佳实践与推荐 三、DeepSeek Kimi 高效制作PPT操作实践 3.1 Kimi 简介 3.2 DeepSeek Kimi 制作PPT优势 3.2.1 DeepSeek 优势 3.2.2 Kimi 制作PPT优…...

Pytorch的一小步,昇腾芯片的一大步

Pytorch的一小步&#xff0c;昇腾芯片的一大步 相信在AI圈的人多多少少都看到了最近的信息&#xff1a;PyTorch最新2.1版本宣布支持华为昇腾芯片&#xff01; 1、 发生了什么事儿&#xff1f; 在2023年10月4日PyTorch 2.1版本的发布博客上&#xff0c;PyTorch介绍的beta版本…...

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...

前端开发者常用网站

Can I use网站&#xff1a;一个查询网页技术兼容性的网站 一个查询网页技术兼容性的网站Can I use&#xff1a;Can I use... Support tables for HTML5, CSS3, etc (查询浏览器对HTML5的支持情况) 权威网站&#xff1a;MDN JavaScript权威网站&#xff1a;JavaScript | MDN...

基于单片机的宠物屋智能系统设计与实现(论文+源码)

本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢&#xff0c;连接红外测温传感器&#xff0c;可实时精准捕捉宠物体温变化&#xff0c;以便及时发现健康异常&#xff1b;水位检测传感器时刻监测饮用水余量&#xff0c;防止宠物…...