当前位置: 首页 > news >正文

数据分析ReAct工作流

让我用一个数据分析项目的例子来展示plan-and-execute框架的应用。这个例子会涉及数据处理、分析和可视化等任务。

from typing import List, Dict, Any
from dataclasses import dataclass
import json
from enum import Enum
import logging
from datetime import datetime# 任务状态枚举
class TaskStatus(Enum):PENDING = "pending"RUNNING = "running"COMPLETED = "completed"FAILED = "failed"# 任务优先级枚举
class TaskPriority(Enum):LOW = 1MEDIUM = 2HIGH = 3# 任务定义
@dataclass
class Task:id: strname: strdescription: strpriority: TaskPrioritydependencies: List[str]  # 依赖的任务ID列表status: TaskStatusresult: Any = Noneerror: str = None# 工作流执行器
class WorkflowExecutor:def __init__(self):self.tasks = {}self.logger = logging.getLogger(__name__)def add_task(self, task: Task):self.tasks[task.id] = taskdef get_ready_tasks(self) -> List[Task]:"""获取所有依赖已满足的待执行任务"""ready_tasks = []for task in self.tasks.values():if task.status == TaskStatus.PENDING:dependencies_met = all(self.tasks[dep_id].status == TaskStatus.COMPLETEDfor dep_id in task.dependencies)if dependencies_met:ready_tasks.append(task)return sorted(ready_tasks, key=lambda x: x.priority.value, reverse=True)def execute_task(self, task: Task):"""执行单个任务"""task.status = TaskStatus.RUNNINGtry:# 这里实现具体任务的执行逻辑result = self.task_handlers[task.id](task, {dep: self.tasks[dep].result for dep in task.dependencies})task.result = resulttask.status = TaskStatus.COMPLETEDexcept Exception as e:task.status = TaskStatus.FAILEDtask.error = str(e)self.logger.error(f"Task {task.id} failed: {e}")def execute_workflow(self):"""执行整个工作流"""while True:ready_tasks = self.get_ready_tasks()if not ready_tasks:breakfor task in ready_tasks:self.execute_task(task)# 检查是否所有任务都完成all_completed = all(task.status == TaskStatus.COMPLETED for task in self.tasks.values())return all_completed# 数据分析工作流示例
class DataAnalysisWorkflow:def __init__(self, data_path: str, output_path: str):self.data_path = data_pathself.output_path = output_pathself.executor = WorkflowExecutor()def plan_workflow(self):"""规划工作流程"""tasks = [Task(id="load_data",name="加载数据",description="从CSV文件加载数据",priority=TaskPriority.HIGH,dependencies=[],status=TaskStatus.PENDING),Task(id="clean_data",name="数据清洗",description="处理缺失值和异常值",priority=TaskPriority.HIGH,dependencies=["load_data"],status=TaskStatus.PENDING),Task(id="feature_engineering",name="特征工程",description="创建新特征",priority=TaskPriority.MEDIUM,dependencies=["clean_data"],status=TaskStatus.PENDING),Task(id="statistical_analysis",name="统计分析",description="计算基本统计指标",priority=TaskPriority.MEDIUM,dependencies=["clean_data"],status=TaskStatus.PENDING),Task(id="visualization",name="数据可视化",description="生成图表",priority=TaskPriority.MEDIUM,dependencies=["statistical_analysis"],status=TaskStatus.PENDING),Task(id="generate_report",name="生成报告",description="生成分析报告",priority=TaskPriority.LOW,dependencies=["visualization", "feature_engineering"],status=TaskStatus.PENDING)]for task in tasks:self.executor.add_task(task)def register_task_handlers(self):"""注册任务处理函数"""self.executor.task_handlers = {"load_data": self.load_data,"clean_data": self.clean_data,"feature_engineering": self.feature_engineering,"statistical_analysis": self.statistical_analysis,"visualization": self.visualization,"generate_report": self.generate_report}def load_data(self, task: Task, dependencies: Dict):import pandas as pddf = pd.read_csv(self.data_path)return dfdef clean_data(self, task: Task, dependencies: Dict):df = dependencies["load_data"]# 处理缺失值df = df.fillna(df.mean())# 处理异常值# ... 其他清洗逻辑return dfdef feature_engineering(self, task: Task, dependencies: Dict):df = dependencies["clean_data"]# 创建新特征# ... 特征工程逻辑return dfdef statistical_analysis(self, task: Task, dependencies: Dict):df = dependencies["clean_data"]stats = {"basic_stats": df.describe(),"correlations": df.corr(),# ... 其他统计分析}return statsdef visualization(self, task: Task, dependencies: Dict):import matplotlib.pyplot as pltstats = dependencies["statistical_analysis"]figures = []# 生成可视化# ... 可视化逻辑return figuresdef generate_report(self, task: Task, dependencies: Dict):figures = dependencies["visualization"]df_features = dependencies["feature_engineering"]report = {"timestamp": datetime.now().isoformat(),"statistics": str(dependencies["statistical_analysis"]),"features": df_features.columns.tolist(),"figures": [f.to_json() for f in figures]}# 保存报告with open(f"{self.output_path}/report.json", "w") as f:json.dump(report, f, indent=2)return reportdef run(self):"""运行完整的工作流"""self.plan_workflow()self.register_task_handlers()success = self.executor.execute_workflow()if success:final_report = self.executor.tasks["generate_report"].resultprint("工作流执行成功!")return final_reportelse:failed_tasks = [task for task in self.executor.tasks.values()if task.status == TaskStatus.FAILED]print("工作流执行失败。失败的任务:")for task in failed_tasks:print(f"- {task.name}: {task.error}")return None# 使用示例
def main():workflow = DataAnalysisWorkflow(data_path="data/sales_data.csv",output_path="output")result = workflow.run()if result:print("分析报告已生成:", result)else:print("工作流执行失败")if __name__ == "__main__":main()

这个例子展示了:

  1. 工作流框架的核心组件:
  • Task定义
  • 工作流执行器
  • 依赖管理
  • 状态追踪
  • 错误处理
  1. 实现的关键特性:
  • 自动任务规划
  • 依赖关系处理
  • 并行任务执行
  • 结果传递
  • 错误恢复
  1. 可以扩展的方向:
# 1. 添加任务重试机制
class RetryableExecutor(WorkflowExecutor):def execute_task(self, task: Task, max_retries: int = 3):retries = 0while retries < max_retries:try:super().execute_task(task)if task.status == TaskStatus.COMPLETED:breakexcept Exception as e:retries += 1self.logger.warning(f"Retry {retries}/{max_retries} for task {task.id}")# 2. 添加进度监控
class MonitoredWorkflow(DataAnalysisWorkflow):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.progress_callback = Nonedef set_progress_callback(self, callback):self.progress_callback = callbackdef update_progress(self, task: Task, status: str):if self.progress_callback:self.progress_callback(task, status)# 3. 添加中间结果缓存
class CachedExecutor(WorkflowExecutor):def __init__(self, cache_dir: str):super().__init__()self.cache_dir = cache_dirdef get_cached_result(self, task: Task):cache_path = f"{self.cache_dir}/{task.id}.cache"if os.path.exists(cache_path):return pickle.load(open(cache_path, "rb"))return Nonedef cache_result(self, task: Task):cache_path = f"{self.cache_dir}/{task.id}.cache"pickle.dump(task.result, open(cache_path, "wb"))
  1. 使用建议:
# 1. 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)# 2. 添加性能监控
from time import timeclass PerformanceMonitor:def __init__(self):self.task_times = {}def start_task(self, task_id: str):self.task_times[task_id] = {"start": time()}def end_task(self, task_id: str):self.task_times[task_id]["end"] = time()def get_task_duration(self, task_id: str):times = self.task_times[task_id]return times["end"] - times["start"]# 3. 实现优雅的终止
import signalclass GracefulWorkflow(DataAnalysisWorkflow):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.should_stop = Falsesignal.signal(signal.SIGINT, self.handle_interrupt)def handle_interrupt(self, signum, frame):print("\nReceived interrupt signal. Cleaning up...")self.should_stop = True

这个框架可以用于很多场景,比如:

  • 数据处理管道
  • ETL工作流
  • 机器学习实验
  • 报告生成系统
  • 自动化测试流程

关键是要根据具体需求调整任务定义和执行逻辑。

相关文章:

数据分析ReAct工作流

让我用一个数据分析项目的例子来展示plan-and-execute框架的应用。这个例子会涉及数据处理、分析和可视化等任务。 from typing import List, Dict, Any from dataclasses import dataclass import json from enum import Enum import logging from datetime import datetime#…...

Rust-AOP编程实战

文章本天成&#xff0c;妙手偶得之。粹然无疵瑕&#xff0c;岂复须人为&#xff1f;君看古彝器&#xff0c;巧拙两无施。汉最近先秦&#xff0c;固已殊淳漓。胡部何为者&#xff0c;豪竹杂哀丝。后夔不复作&#xff0c;千载谁与期&#xff1f; ——《文章》宋陆游 【哲理】文章…...

Flutter鸿蒙next 中的 Expanded 和 Flexible 使用技巧详解

在 Flutter 开发中&#xff0c;Expanded 和 Flexible 是两个非常常用的布局控件&#xff0c;它们可以帮助开发者更加灵活地管理 UI 布局的空间分配。虽然它们看起来非常相似&#xff0c;但它们的功能和使用场景有所不同。理解这两者的区别&#xff0c;能帮助你在构建复杂 UI 布…...

【微信小游戏学习心得】

这里是引用 微信小游戏学习心得 简介了解微信小游戏理解2d游戏原理数据驱动视图总结 简介 本人通过学习了解微信小游戏&#xff0c;学习微信小游戏&#xff0c;加深了对前端框架&#xff0c;vue和react基于数据驱动视图的理解&#xff0c;及浏览器文档模型和javaScript之间的关…...

Python | Leetcode Python题解之第539题最小时间差

题目&#xff1a; 题解&#xff1a; def getMinutes(t: str) -> int:return ((ord(t[0]) - ord(0)) * 10 ord(t[1]) - ord(0)) * 60 (ord(t[3]) - ord(0)) * 10 ord(t[4]) - ord(0)class Solution:def findMinDifference(self, timePoints: List[str]) -> int:n len…...

Zookeeper运维秘籍:四字命令基础、详解及业务应用全解析

文章目录 一、四字命令基础二、四字命令详解三、四字命令的开启与配置四、结合业务解读四字命令confconsenvi命令Stat命令MNTR命令ruok命令dump命令wchswchp ZooKeeper&#xff0c;作为一款分布式协调服务&#xff0c;提供了丰富的四字命令&#xff08;也称为四字短语&#xff…...

Error: `slot-scope` are deprecated报错解决

本人新手菜鸡&#xff0c;文章为自己遇到问题的记录&#xff0c;如有错误或不足还请大佬批评指正 问题描述 在Vue3环境下使用slot插槽&#xff0c;出现‘slot-scope’ are deprecated报错问题&#xff0c;经过查找发现&#xff0c;是因为在slot插槽使用中&#xff0c;vue2和vu…...

Excel(图例)中使用上标下标

单元格中 1、在Excel单元格中刷黑要设置成上标的字符&#xff0c;如m2中的2&#xff1b; 2、单击右键&#xff0c;在弹出的对话框中选择“设置单元格格式”&#xff1b; 3、在弹出的“设置单元格格式”对话框中选择上标&#xff08;或下标&#xff09;&#xff1b; 4、最后…...

熔断和降级

目录 隔离和降级 FeignClient整合Sentinel 通过Feign设置服务降级 1.创建类实现FallbackFactory接口&#xff0c;并让这个类和使用FeignClient的接口类绑定 2.让order-service服务的feign开启sentinel 3.测试&#xff0c;只开启order-service服务&#xff0c;而不开启user-…...

【学习笔记】Linux系统基础知识 6 —— su命令详解

提示&#xff1a;学习Linux系统基础命令 su 命令详解&#xff0c;包含通过 su 命令切换用户实例 一、前期准备 1.已经正确安装并成功进入Linux系统 说明&#xff1a;本实验采用的 Redhat 系统&#xff08;因系统不一致&#xff0c;可能部分显示存在差异&#xff09; 二、学…...

docker-compose命令介绍

docker-compose命令介绍 docker-compose1. docker-compose是什么2. Compose file format3. 命令3.1 服务相关命令upruncreatestartrestartdownstopkillrmpauseunpause 3.2 镜像相关命令3.3 查看相关命令 docker-compose 学了docker&#xff0c;然后就直接去学k8s了。恍恍惚惚几…...

Spring学习笔记_29——@Transactional

Transactional 1. 介绍 Transactional 是 Spring 框架提供的一个注解&#xff0c;用于声明方法或类级别的事务属性。 Spring事务&#xff1a;Spring学习笔记_28——事务-CSDN博客 当你在一个方法或类上使用 Transactional 注解时&#xff0c;Spring 会为该方法或类创建一个…...

github使用基础

要通过终端绑定GitHub账号并进行文件传输&#xff0c;你需要使用Git和SSH密钥来实现安全连接和操作。以下是一个基本流程&#xff1a; 设置GitHub和SSH 检查Git安装 通过终端输入以下命令查看是否安装Git&#xff1a; bash 复制代码 git --version配置Git用户名和邮箱 bash …...

Flink-Kafka-Connector

Apache Flink 是一个用于处理无界和有界数据的开源流处理框架。它支持高吞吐量、低延迟以及精确一次的状态一致性等特性。Flink 社区提供了丰富的连接器&#xff08;Connectors&#xff09;以方便与不同的数据源进行交互&#xff0c;其中就包括了 Apache Kafka 连接器。 Apach…...

远程终端vim里使用系统剪切板

1、本地通过终端远程linux server&#xff0c;由于不是桌面环境/GUI&#xff0c;终端vim里似乎没办法直接使用系统剪切板&#xff0c;即便已经是clipboard。 $ vim --version | grep clipboard clipboard keymap printer vertsplit eval …...

底层视角看C语言

文章目录 main函数很普通main函数之前调用了什么main函数和自定义函数的对比 变量名只为人而存在goto是循环的本质指针变量指针是一个特殊的数字汇编层面看指针 数组和指针数组越界问题低端地址越界高端地址越界 引用就是指针 main函数很普通 main函数是第一个被调用的函数吗&…...

【点云学习笔记】——分割任务学习

3D点云实例分割 vs 3D点云语义分割 1. 功能对比 代码1&#xff08;实例分割&#xff09;&#xff1a;用于3D点云中的实例分割任务&#xff0c;其目标是将点云中的物体分割成独立的实例。每个实例可能属于相同类别但需要被分开&#xff0c;比如在自动驾驶中的多个行人、汽车&am…...

Qt——窗口

一.窗口概述 Qt 窗口是通过 QMainWindow 类来实现的。 QMainWindow是一个为用户提供主窗口程序的类&#xff0c;继承QWidget类&#xff0c;并且提供一个预定义的布局。包含一个菜单栏&#xff08;menu bar&#xff09;&#xff0c;多个工具栏&#xff08;tool bars&#xff0…...

InfluxDB性能优化指南

1. 引言 1.1 InfluxDB的简介与发展背景 InfluxDB是一个开源的时间序列数据库&#xff08;TSDB&#xff09;&#xff0c;由InfluxData公司开发&#xff0c;专门用于处理高频率的数据写入和查询。其设计初衷是为物联网、应用程序监控、DevOps和实时分析等场景提供一个高效的存储…...

负载均衡式在线oj项目开发文档2(个人项目)

judge模块的框架 完成了网页渲染的功能之后&#xff0c;就需要判断用户提交的代码是否是正确的&#xff0c;当用户点击提交之后&#xff0c;就会交给路由模块的/judge模块&#xff0c;然后这个路由模块就需要去调用jude模块了&#xff0c;也就是需要一个新的jude模块&#xff…...

C# rtwpriv Wi-Fi定频工具

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录一、使用简介&#xff0c;说明#前言 对于无线产品&#xff0c;很多需要做CE,FCC,SRRC等认证&#xff0c;需要测试RF&#xff0c;像Realtek方案的Wi-Fi用到rtwpriv工具…...

Pixel Dream Workshop 对比测试:不同采样器与模型版本的出图效果

Pixel Dream Workshop 对比测试&#xff1a;不同采样器与模型版本的出图效果 1. 测试背景与目的 在AI绘画领域&#xff0c;采样器和模型版本的选择直接影响最终生成效果。本次测试旨在通过严谨的对比实验&#xff0c;帮助用户理解Pixel Dream Workshop中不同参数组合的实际表…...

避坑指南:StarRocks聚合模型排序键的5个常见错误配置(含性能对比测试)

StarRocks聚合模型排序键配置实战&#xff1a;从性能陷阱到最佳实践 当电商平台的UV统计查询从3秒延长到30秒&#xff0c;当数据仓库的存储空间以每天10%的速度膨胀&#xff0c;很多团队才意识到——聚合模型的排序键配置出了问题。作为StarRocks最核心的性能杠杆&#xff0c;排…...

VRChat玩家必看:用批处理脚本一键把缓存从C盘挪到E盘(附config.json手动修改方法)

VRChat玩家自救指南&#xff1a;彻底解决C盘缓存爆满的终极方案 每次打开VRChat看到C盘剩余空间像倒计时一样减少&#xff0c;是不是感觉血压都上来了&#xff1f;作为一款以用户生成内容为核心的游戏&#xff0c;VRChat会不断下载并缓存其他玩家创建的虚拟形象、世界场景等资源…...

NASA、ESA官方数据源直连失败?Python遥感API调用失效诊断手册(含12个HTTPS/Token/CRS认证报错速查表)

第一章&#xff1a;NASA、ESA官方遥感数据直连失效的典型现象与影响评估近年来&#xff0c;全球多个科研机构与商业遥感平台频繁报告无法稳定访问NASA Earthdata Login和ESA Copernicus Open Access Hub的API端点&#xff0c;表现为HTTP 503、401或连接超时等异常响应。此类直连…...

LaTeX algorithm2e避坑指南:为什么你的\tcp*注释后面总多个分号?

LaTeX algorithm2e避坑指南&#xff1a;为什么你的\tcp*注释后面总多个分号&#xff1f; 第一次在LaTeX中用algorithm2e写算法伪代码时&#xff0c;很多人会被\tcp*这个看似简单的注释命令坑到——明明只是想加个注释&#xff0c;结果代码末尾莫名其妙多出个分号&#xff0c;排…...

深入解析SD卡CMD指令集:从寄存器操作到数据传输实战

1. SD卡基础寄存器全解析 当你把一张SD卡插入读卡器时&#xff0c;系统瞬间就能识别出容量和型号&#xff0c;这个过程背后其实是SD卡内部寄存器的功劳。这些寄存器就像SD卡的"身份证"和"体检报告"&#xff0c;存储着所有关键信息。我刚开始接触嵌入式开发…...

3.28 学习笔记

3.28 学习笔记web金融项目实战1.对于需求分析仔细研读需求规格说明书&#xff0c;以及相关文档&#xff0c;理解项目的目标和流程2.对于编写测试点&#xff08;1&#xff09;进行界面检查&#xff08;2&#xff09;从正确的业务流程编写&#xff0c;执行&#xff0c;查看对应功…...

DLSS Swapper:轻松管理游戏超采样版本,释放显卡全部性能

DLSS Swapper&#xff1a;轻松管理游戏超采样版本&#xff0c;释放显卡全部性能 【免费下载链接】dlss-swapper 项目地址: https://gitcode.com/GitHub_Trending/dl/dlss-swapper 在追求极致游戏体验的今天&#xff0c;DLSS&#xff08;深度学习超采样&#xff09;技术…...

开源小模型怎么选?Qwen1.5-0.5B-Chat轻量化优势解析

开源小模型怎么选&#xff1f;Qwen1.5-0.5B-Chat轻量化优势解析 1. 为什么需要轻量级小模型&#xff1f; 当我们谈论AI大模型时&#xff0c;很多人首先想到的是那些需要高端GPU、动辄几十GB内存的庞然大物。但在实际应用中&#xff0c;特别是个人开发者、中小企业或者教育场景…...