当前位置: 首页 > news >正文

【LangFuse】数据集与测试

1. 在线标注

在这里插入图片描述

2. 上传已有数据集

import json# 调整数据格式 {"input":{...},"expected_output":"label"}
data = []
with open('my_annotations.jsonl', 'r', encoding='utf-8') as fp:for line in fp:example = json.loads(line.strip())item = {"input": {"outlines": example["outlines"],"user_input": example["user_input"]},"expected_output": example["label"]}data.append(item)
#%%
from langfuse import Langfuse
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest
from tqdm import tqdm
import langfusedataset_name = "my-dataset"# 初始化客户端
langfuse=Langfuse()# 创建数据集,如果已存在不会重复创建
try:langfuse.create_dataset(name=dataset_name,# optional descriptiondescription="My first dataset",# optional metadatametadata={"author": "wzr","type": "demo"})
except:pass# 考虑演示运行速度,只上传前50条数据
for item in tqdm(data[:50]):langfuse.create_dataset_item(dataset_name="my-dataset",input=item["input"],expected_output=item["expected_output"])

tqdm

100%|██████████| 50/50 [00:11<00:00, 4.48it/s]

3. 定义评估函数

def simple_evaluation(output, expected_output):return output == expected_output

4.运行测试

Prompt 模板与 Chain(LCEL)

 import ChatOpenAI
from langchain_core.output_parsers import StrOutputParserneed_answer = PromptTemplate.from_template("""
*********
你是老师的助教,你的工作是从学员的课堂交流中选择出需要老师回答的问题,加以整理以交给老师回答。课程内容:
{outlines}
*********
学员输入:
{user_input}
*********
如果这是一个需要老师答疑的问题,回复Y,否则回复N。
只回复Y或N,不要回复其他内容。""")model = ChatOpenAI(temperature=0, seed=42)
parser = StrOutputParser()chain_v1 = (need_answer| model| parser

在LangFuse数据集测试效果:

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()
lock = threading.Lock()def run_evaluation(chain, dataset_name, run_name):dataset = langfuse.get_dataset(dataset_name)def process_item(item):with lock:# 注意:多线程调用此处要加锁,否则会有id冲突导致丢数据handler = item.get_langchain_handler(run_name=run_name)# Assuming chain.invoke is a synchronous functionoutput = chain.invoke(item.input, config={"callbacks": [handler]})# Assuming handler.root_span.score is a synchronous functionhandler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output))print('.', end='', flush=True)# for item in dataset.items:#    process_item(item)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
run_evaluation(chain_v1, "my-dataset", "v1-"+datetime.now().strftime("%d/%m/%Y %H:%M:%S"))# 保证全部数据同步到云端
langfuse_context.flush()

tqdm Python进度条

tqdm 是一个快速,可扩展的Python进度条,可以在 Python 长循环中添加一个进度提示信息,用户只需要封装任意的 iterable 在 tqdm(iterable) 函数下即可。

在你的代码中:

from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

你首先从 tqdm 模块导入了 tqdm 函数,然后在 with 语句中创建了一个 tqdm 对象。total=len(dataset.items) 设置了进度条的总长度,即数据集项目的总数。

for 循环中,executor.map(process_item, dataset.items) 是一个并行执行任务的操作,它会对 dataset.items 中的每一个元素调用 process_item 函数。每次任务完成时,pbar.update(1) 会更新进度条,表示有一个任务已经完成。

这样,当你的程序运行时,你会在控制台上看到一个进度条,它会显示已经完成的任务数量和总任务数量,以及进度条的百分比和估计的剩余时间。这对于长时间运行的任务非常有用,因为它可以让用户知道任务的进度和完成时间。

ThreadPoolExecutor

这段代码使用了Python的concurrent.futures.ThreadPoolExecutor类来创建一个线程池,并使用该线程池并行地执行任务。下面是对这段代码的详细解释:

from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  1. ThreadPoolExecutor(max_workers=4): 这里创建了一个ThreadPoolExecutor实例,它管理一个包含4个线程的线程池。max_workers参数定义了线程池中线程的最大数量。

  2. with … as executor: 使用with语句来管理线程池的生命周期。当with语句块开始时,线程池被创建,当with语句块结束时,线程池会被关闭。使用with语句可以确保线程池在不再需要时被正确关闭,释放资源。

  3. executor.map(process_item, dataset.items): map方法是ThreadPoolExecutor的一个方法,它接受一个函数和一个可迭代对象作为参数。在这个例子中,process_item函数会被应用到dataset.items中的每一个元素上。map方法会返回一个结果的迭代器,这些结果与输入的可迭代对象的元素一一对应。

这段代码的作用是创建一个包含4个线程的线程池,然后并行地对dataset.items中的每一个元素调用process_item函数。这是一种并行处理任务的方式,可以提高程序的效率,特别是在处理I/O密集型任务(如网络请求或文件读写)时。

这段代码实现了一个基于Langfuse的并行评估系统,主要用于对LangChain流程进行批量测试和监控。以下是对代码的逐层解析:


代码结构概览

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()  # 初始化Langfuse客户端
lock = threading.Lock()  # 全局线程锁def run_evaluation(chain, dataset_name, run_name):# 核心评估逻辑# ...

核心功能模块分解

1. 数据加载
dataset = langfuse.get_dataset(dataset_name)
  • 作用:从Langfuse平台加载预定义的测试数据集。
  • 数据格式:假设数据集包含多个items,每个item包含:
    {"input": "用户输入内容",  # 测试输入"expected_output": "期望输出"  # 预期结果
    }
    

2. 多线程任务分发
with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  • 技术选型:使用线程池(4线程)并行处理数据集。
  • 性能优势:适合处理I/O密集型任务(如网络请求到OpenAI接口)。
  • 注意事项:需确保chain.invoke和Langfuse操作是线程安全的。

3. 线程安全处理
def process_item(item):with lock:  # 加锁防止Handler ID冲突handler = item.get_langchain_handler(run_name=run_name)
  • 关键问题
    • Langfuse的Handler生成依赖上下文ID,多线程环境下可能产生ID冲突。
    • 不加锁会导致监控数据丢失或错乱。
  • 解决方式:通过全局锁(threading.Lock)序列化Handler的创建过程。

4. 监控埋点与评估
output = chain.invoke(item.input, config={"callbacks": [handler]})handler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output)
)
  • 埋点层次
    1. 全链路追踪:通过handler自动记录LangChain每一步的执行过程。
    2. 自定义评分:手动添加准确率(accuracy)评分。
  • 评估逻辑simple_evaluation函数(需自定义)对比输出与预期结果,例如:
    def simple_evaluation(output, expected):return 1 if output.strip() == expected.strip() else 0
    

监控数据示例

在Langfuse后台会生成如下结构的数据:

字段示例值说明
TraceLangChain Evaluation评估任务根节点
├─ Spanprompt_processingLangChain内部步骤
├─ Generationgpt-3.5-turboLLM调用记录
└─ Scoreaccuracy: 0.8自定义评分

关键设计模式

1. 生产者-消费者模式
  • 生产者:线程池分发数据项。
  • 消费者process_item函数处理单个数据项。
2. 回调机制

通过config={"callbacks": [handler]}将Langfuse监控注入LangChain执行过程。

3. 线程安全隔离
  • 全局锁:确保Handler创建原子性。
  • 线程局部存储:Langfuse SDK内部应使用线程局部变量存储上下文。

潜在优化点

1. 错误处理增强
try:output = chain.invoke(...)
except Exception as e:handler.trace.error(str(e))  # 记录异常
2. 动态线程数调整
max_workers = os.cpu_count() * 2  # 根据CPU核心数动态设置
3. 进度可视化
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

适用场景

  1. 批量模型测试:对比不同模型版本的效果。
  2. 回归测试:确保代码更新不破坏核心功能。
  3. 异常诊断:通过Trace分析失败案例的详细执行路径。

通过这种设计,开发者可以高效地验证LangChain应用的质量,并在生产环境中持续监控性能表现。

相关文章:

【LangFuse】数据集与测试

1. 在线标注 2. 上传已有数据集 import json# 调整数据格式 {"input":{...},"expected_output":"label"} data [] with open(my_annotations.jsonl, r, encodingutf-8) as fp:for line in fp:example json.loads(line.strip())item {"i…...

【Python】如何解决Jupyter Notebook修改外部模块后必须重启内核的问题?

“为什么我修改了Python模块的代码&#xff0c;Jupyter Notebook却看不到变化&#xff1f;” 一、问题现象&#xff1a;令人抓狂的开发体验 假设你正在开发一个图像处理项目&#xff0c;项目结构如下&#xff1a; project/ ├── utils/ │ └── image_processor.py └…...

Redis 篇

一、数据结构 二、持久化方式 Redis 提供了两种主要的持久化方式&#xff0c;分别是 RDB&#xff08;Redis Database&#xff09;和 AOF&#xff08;Append Only File&#xff09;&#xff0c;此外&#xff0c;还可以同时使用这两种方式以增强数据安全性&#xff0c;以下为你…...

React + TypeScript 实战指南:用类型守护你的组件

TypeScript 为 React 开发带来了强大的类型安全保障&#xff0c;这里解析常见的一些TS写法&#xff1a; 一、组件基础类型 1. 函数组件定义 // 显式声明 Props 类型并标注返回值 interface WelcomeProps {name: string;age?: number; // 可选属性 }const Welcome: React.FC…...

从零开始:Linux环境下如何制作静态库与动态库

个人主页&#xff1a;chian-ocean 文章专栏-Linux 前言 动静态库是编程中两种主要的库类型&#xff0c;它们用于帮助开发者复用已有的代码&#xff0c;而不需要每次都从头开始编写。它们的主要区别在于链接和加载的时机、方式以及使用场景 库 库就是一些已经写好并且经过测试…...

【智能体Agent】ReAct智能体的实现思路和关键技术

基于ReAct&#xff08;Reasoning Acting&#xff09;框架的自主智能体 import re from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory from langchain_core.language_models.chat_models import BaseChatM…...

Java进阶:Zookeeper相关笔记

概要总结&#xff1a; ●Zookeeper是一个开源的分布式协调服务&#xff0c;需要下载并部署在服务器上(使用cmd启动&#xff0c;windows与linux都可用)。 ●zookeeper一般用来实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。 ●有多台服…...

QT-绘画事件

实现颜色的随时调整&#xff0c;追加橡皮擦功能 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QColor> #include <QPoint> #include <QVector> #include <QMouseEvent> #include <QPainter> #include <Q…...

鸿蒙NEXT开发-端云一体化开发

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 目录 端云一体化开发基本概念 传统架构 端云一…...

大模型——股票分析AI工具开发教程

大模型——股票分析AI工具开发教程 在本教程中,我们将利用Google Gemini 2.0 Flash模型创建一个简单但有效的股票分析器。 你是否曾被大量的股票市场数据所淹没?希望有一个私人助理来筛选噪音并为您提供清晰、可操作的见解?好吧,你可以自己构建一个,而且由于 Python 的强…...

nexus 实现https 私有镜像搭建

1、安装nexus 1.1 安装JDK17 rpm -ivh jdk-17.0.13_linux-x64_bin.rpm 1.2 下载安装包解压到指定目录 tar zxvf nexus-3.77.2-02-unix.tar.gz -C /usr/local 2、运行nexus 默认8081端口 cd /usr/local/nexus-3.77.2-02 && bin/nexus start 3、配置nexus私有docker 镜…...

颈椎X光数据集(cervical spine X-ray dataset)

颈椎X光数据集&#xff08;cervical spine X-ray dataset&#xff09; 一.颈椎X光&#xff08;1248张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 二&#xff0e;颈椎X光&#xff08;1000张原始图像&#xff0c;无处理&#xff0c;jpg格式&#xff09; 此数据…...

(动态规划 完全背包 零钱兑换)leetcode 322

本题为完全背包 与01背包的区别是 物品可以任意取 而01背包只能取一次 这就导致了状态转移方程的不同 1.当放不下:的时候 转移方程是一样的 取0到i-1 物品&#xff0c;背包容量为j的最优值 else 2.放得下:就是取 0到i-1 物品,背包容量为j的最优值和 “0到i的[j-w[i]]v…...

【AI大模型】DeepSeek + Kimi 高效制作PPT实战详解

目录 一、前言 二、传统 PPT 制作问题 2.1 传统方式制作 PPT 2.2 AI 大模型辅助制作 PPT 2.3 适用场景对比分析 2.4 最佳实践与推荐 三、DeepSeek Kimi 高效制作PPT操作实践 3.1 Kimi 简介 3.2 DeepSeek Kimi 制作PPT优势 3.2.1 DeepSeek 优势 3.2.2 Kimi 制作PPT优…...

Pytorch的一小步,昇腾芯片的一大步

Pytorch的一小步&#xff0c;昇腾芯片的一大步 相信在AI圈的人多多少少都看到了最近的信息&#xff1a;PyTorch最新2.1版本宣布支持华为昇腾芯片&#xff01; 1、 发生了什么事儿&#xff1f; 在2023年10月4日PyTorch 2.1版本的发布博客上&#xff0c;PyTorch介绍的beta版本…...

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

xmind转换为markdown

文章目录 解锁思维导图新姿势&#xff1a;将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件&#xff08;ZIP处理&#xff09;2.解析JSON数据结构3&#xff1a;递归转换树形结构4&#xff1a;Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...

PydanticAI快速入门示例

参考链接&#xff1a;https://ai.pydantic.dev/#why-use-pydanticai 示例代码 from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider# 配置使用阿里云通义千问模型 model OpenAIMode…...