当前位置: 首页 > article >正文

dagster的etl实现

本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码

注意dagster版本

dagster, version 1.10.14

"""
实现dagster的etl实践案例。
"""from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List@op(out=DynamicOut(int))
def generate_tasks(context):"""生成动态任务,每个任务对应一个整数值"""context.log.info("开始生成动态任务...")for i in range(3):yield DynamicOutput(value=i, mapping_key=f"task_{i}")@op
def process_task(context, num: int) -> int:"""处理单个任务,将输入值乘以2"""context.log.info(f"处理任务 {num}")return num * 2@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:"""收集所有处理结果并返回列表"""context.log.info(f"收集到 {len(results)} 个处理结果")context.log.info(f"数据细节:{results}")return results@op(out = Out(int))
def summarize_results(context, results: list):"""汇总处理结果,计算总和"""total = sum(results)total = int(total)context.log.info(f"所有任务处理完成,总和为: {total}")yield AssetMaterialization(asset_key="final_result",description="所有任务处理结果的总和",metadata={"total": total})yield Output(total, output_name="result")@graph
def dynamic_pipeline():"""定义动态任务处理流程"""results = generate_tasks().map(process_task)collected = collect_results(results.collect())summarize_results(collected)# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():dynamic_pipeline()# 定义可执行实体
defs = Definitions(jobs=[run_dynamic_pipeline]
)# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":result = run_dynamic_pipeline.execute_in_process()print("执行结果:", result.success)

相关文章:

dagster的etl实现

本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks…...

python的漫画网站管理系统

目录 技术栈介绍具体实现截图![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/0ed2084038144499a162b3fb731a5f37.png)![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/a76a091066f74a80bf7ac1be489ae8a8.png)系统设计研究方法:设计步骤设计流程核…...

源码安装gperftools工具

源码安装gperftools工具 下载gperftools源码 https://github.com/gperftools/gperftools/releases/download/gperftools-2.16/gperftools-2.16.tar.gz 注:需要下载github上release版本,如果直接下载master分支上源码,将可能出现各种编译报错…...

QMK 宏(Macros)功能详解(实战部分)

QMK 宏(Macros)功能详解(实战部分) 一、宏的基本概念与作用 宏(Macros)是 QMK 固件中一项强大的功能,它允许您在按下单个按键时执行多个按键操作。通过宏,您可以: 输入常用短语或文本执行复杂的按键组合自动化重复性操作触发系统功能或快捷键🔔 安全提示:虽然可以…...

前端脚手架开发指南:提高开发效率的核心操作

前端脚手架通过自动化的方式可以提高开发效率并减少重复工作,而最强大的脚手架并不是现成的那些工具而是属于你自己团队量身定制的脚手架!本篇文章将带你了解脚手架开发的基本技巧,帮助你掌握如何构建适合自己需求的工具,并带着你…...

搜索引擎工作原理|倒排索引|query改写|CTR点击率预估|爬虫

写在前面 使用搜索引擎是我们经常做的事情,搜索引擎的实现原理。 什么是搜索引擎 搜索引擎是一种在线搜索工具,当用户在搜索框输入关键词时,搜索引擎就会将与该关键词相关的内容展示给用户。比较大型的搜索引擎有谷歌,百度&…...

Python实例题:Python自动工资条

目录 Python实例题 题目 python-automatic-payroll-slipPython 自动生成工资条脚本 代码解释 加载文件: 获取表头: 写入表头: 生成工资条: 保存文件: 运行思路 注意事项 Python实例题 题目 Python自动工资…...

Function Calling万字实战指南:打造高智能数据分析Agent平台

个人主页:Guiat 归属专栏:科学技术变革创新 文章目录 1. Function Calling:智能交互的新范式1.1 Function Calling 技术概述1.2 核心优势分析 2. 数据分析Agent平台架构设计2.1 系统架构概览2.2 核心组件解析2.2.1 函数注册中心2.2.2 Agent控…...

spark MySQL数据库配置

Spark 连接 MySQL 数据库的配置 要让 Spark 与 MySQL 数据库实现连接,需要进行以下配置步骤。下面为你提供详细的操作指南和示例代码: 1. 添加 MySQL JDBC 驱动依赖 你得把 MySQL 的 JDBC 驱动添加到 Spark 的类路径中。可以通过以下两种方式来完成&a…...

python四则运算计算器

python四则运算计算器 是谁说,python不好写计算器的,我亲自写个无ui的计算器功能,证明这是谣言 step1:C:\Users\wangrusheng\Downloads\num.txt 15 - 4 * 3 10 / 2(5 3) * 2 6 / 31/2 * 8 3/4 * 4 - 0.52.5 * (4 1.6) - 9 / 3-6 12 * (…...

线对板连接器的兼容性问题:为何老旧设计难以满足现代需求?

线对板连接器作为电子设备的核心纽带,正面临前所未有的兼容性挑战。某智能工厂升级生产线时发现,沿用十年的2.54毫米间距连接器,在接入新型工业相机时出现30%的信号丢包率,而切换至0.4毫米超密间距连接器后,数据传输速…...

AI517 AI本地部署 docker微调(失败)

本地部署AI 计划使用OLLAMA进行本地部署 修改DNS 访问github 刷新缓存 配置环境变量 OLLAMA安装成功 部署成功 计划使用docker进行微调 下载安装docker 虚拟化已开启 开启上面这些 准备下载ubuntu docker ragflow dify 用git去泡...

VR和眼动控制集群机器人的方法

西安建筑科技大学信息与控制工程学院雷小康老师团队联合西北工业大学航海学院彭星光老师团队,基于虚拟现实(VR)和眼动追踪技术实现了人-集群机器人高效、灵活的交互控制。相关研究论文“基于虚拟现实和眼动的人-集群机器人交互方法” 发表于信…...

python训练营打卡第26天

函数专题1:函数定义与参数 知识点回顾: 函数的定义变量作用域:局部变量和全局变量函数的参数类型:位置参数、默认参数、不定参数传递参数的手段:关键词参数传递参数的顺序:同时出现三种参数类型时 作业&…...

TiDB 中新 Hash Join 的设计与性能优化

原文来源: https://tidb.net/blog/11667c37 本文作者:徐飞 导读 在数据库管理系统(DBMS)中,连接操作(Join)是查询处理的核心环节之一,其性能直接影响到整个系统的响应速度和效率…...

1.共享内存(python共享内存实际案例,传输opencv frame)

主进程程序 send.py import cv2 import numpy as np from multiprocessing import shared_memory, resource_trackercap cv2.VideoCapture(0) if not cap.isOpened():print("无法打开 RTSP 流,请检查地址、网络连接或 GStreamer 配置。") else:# 创建共…...

网页常见水印实现方式

文章目录 1 明水印技术实现1.1 DOM覆盖方案1.2 Canvas动态渲染1.3 CSS伪元素方案2 暗水印技术解析2.1 空域LSB算法2.2 频域傅里叶变换3 防篡改机制设计3.1 MutationObserver防护3.2 Canvas指纹追踪4 前后端实现对比5 攻防博弈深度分析5.1 常见破解手段5.2 进阶防御策略6 选型近…...

oracle主备切换参考

主备正常切换操作参考:RAC两节点->单机 (rac和单机的操作区别:就是关闭其它节点,剩一个节点操作即可) 1.主库准备 检查状态 SQL> select inst_id,database_role,OPEN_MODE from gv$database; INST_ID DATA…...

Java大师成长计划之第25天:Spring生态与微服务架构之容错与断路器模式

📢 友情提示: 本文由银河易创AI(https://ai.eaigx.com)平台gpt-4-turbo模型辅助创作完成,旨在提供灵感参考与技术分享,文中关键数据、代码与结论建议通过官方渠道验证。 在微服务架构中,系统通常…...

【ARM】MDK如何将变量存储到指定内存地址

1、 文档目标 在嵌入式系统开发中,通过MDK(Microcontroller Development Kit)进行工程配置,将指定的变量存储到指定的内存地址上是一项非常重要的技术。这项操作不仅能够满足特定硬件架构的需求,还能优化系统的性能和…...

Unity3D仿星露谷物语开发44之收集农作物

1、目标 在土地中挖掘后,洒下种子后逐渐成长,然后使用篮子收集成熟后的农作物,工具栏中也会相应地增加该农作物。 2、修改CropStandard的参数 Assets -> Prefabs -> Crop下的CropStandard,修改其Box Collider 2D的Size(Y…...

langchain—chatchat

署部 下载项目 git clone --recursive https://github.com/chatchat-space/Langchain-Chatchat.git 进入目录 cd Langchain-Chatchat anaconda环境准备 创建python环境 conda create -n langchain_env python3.10 -y 激活环境 conda activate langchain_env 验证pyhton环境…...

经典算法 求C(N, K) % mod,保证mod是质数

求C(N, K) % mod,保证mod是质数 问题描述 给你三个整数N,K,mod保证mod是一个质数,求组合数C(N, K) % mod。 输入描述 输入有多组,输入第一行为两个整数T,mod。接下来2 - T 1行,每行输入N, K。 输出描…...

【LeetCode 热题 100】二叉树的最大深度 / 翻转二叉树 / 二叉树的直径 / 验证二叉搜索树

⭐️个人主页:小羊 ⭐️所属专栏:LeetCode 热题 100 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 二叉树的中序遍历二叉树的最大深度翻转二叉树对称二叉树二叉树的直径二叉树的层序遍历将有序数组转换为二叉搜索树验…...

关于软件测试开发的一些有趣的知识

文章目录 一、什么是测试?二、为什么要软件测试软件测试三、测试的岗位有哪些四 、软件测试和开发的区别五、走测试岗位为什么还要学开发。4、优秀的测试人员具备的素质我为什么走测试岗位 一、什么是测试? 其实这个问题说简单也不简单,说难…...

uni-app 开发HarmonyOS的鸿蒙影视项目分享:从实战案例到开源后台

最近,HBuilderX 新版本发布,带来了令人兴奋的消息——uni-app 现在支持 Harmony Next 平台的 App 开发。这对于开发者来说无疑是一个巨大的福音,意味着使用熟悉的 Vue 3 语法和开发框架,就可以为鸿蒙生态贡献自己的力量。 前言 作…...

售前工作.工作流程和工具

第一部分 售前解决方案及技术建议书的制作 售前解决方案编写的标准操作步骤SOP: 售前解决方案写作方法_哔哩哔哩_bilibili 第二部分 投标过程关键活动--商务标技术方案 1. 按项目管理--售前销售项目立项 销售活动和销售线索的跟踪流程和工具 1)拿到标书&#xff…...

GPU与NPU异构计算任务划分算法研究:基于强化学习的Transformer负载均衡实践

点击 “AladdinEdu,同学们用得起的【H卡】算力平台”,H卡级别算力,按量计费,灵活弹性,顶级配置,学生专属优惠。 引言 在边缘计算与AI推理场景中,GPU-NPU异构计算架构已成为突破算力瓶颈的关键技…...

学习ai课程大纲

以下是一个通用的 AI 课程大纲,涵盖从基础到进阶的核心内容,适用于大学课程或自学规划。你可以根据自身需求(如入门、进阶、专项方向)调整内容和深度。 人工智能(AI)课程大纲 第一部分:基础理论…...

基于CentOS7制作OpenSSL 1.1的RPM包

背景:CentOS7 已经不再维护了,有时候需要升级某些组件,网上却没有相关的资源了。尤其是制作OpenSSH 9.6 的RPM包,就会要求OpenSSL为1.1的版本。基于此,还是自己制作吧,以下是踩坑过程。 1、官网提供的源码包…...