当前位置: 首页 > news >正文

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记owner(str):任务的所有者,建议使用linux用户名email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'email':'kettle_test1@163.com', #pwd:kettle123456'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_cmd',default_args=default_args,schedule_interval=timedelta(minutes=1)
)t1=BashOperator(task_id='print_date',bash_command='date',dag = dag
)t2=BashOperator(task_id='print_helloworld',bash_command='echo "hello world!"',dag=dag
)t3=BashOperator(task_id='tempplated',bash_command="""{% for i in range(5) %}echo "{{ ds }}"echo "{{ params.name}}"echo "{{ params.age}}"{% endfor %}""",params={'name':'wangwu','age':10},dag=dag
)t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


相关文章:

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹…...

IJ中配置TortoiseSVN插件:

文章目录 一、报错情况:二、配置TortoiseSVN插件: 一、报错情况: 由于公司电脑加密,TortoiseSVN菜单没有提交和更新按钮,所以需要使用IJ的SVN进行代码相关操作 二、配置TortoiseSVN插件: 需要设置一个svn.…...

个人实现在线支付,一种另类的在线支付解决方案

Hi, I’m Shendi 个人实现在线支付,一种另类的在线支付解决方案 个人实现在线支付的方式 对于在线支付,最多的是接入微信与支付宝。但都需要营业执照,不适用于个人。 当然,可以去办理一个个体工商户,但对我这种小额收…...

浅谈智能安全配电装置应用在银行配电系统中

【摘要】银行是国家重点安全保护部分,关系到社会资金的稳定,也是消防重点单位。消防安全是银行工作的重要组成部分。在银行配电系统中应用智能安全配电装置,可以提高银行的智能控制水平,有效预防电气火灾。 【关键词】银行&#…...

macOS下如何使用Flask进行开发

👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是全栈工…...

记一次服务器配置文件获取OSS

一、漏洞原因 由于网站登录口未做双因子校验,导致可以通过暴力破解获取管理员账号,成功进入系统;未对上传的格式和内容进行校验,可以任意文件上传获取服务器权限;由于服务器上配置信息,可以进一步获取数据库权限和OSS管理权限。二、漏洞成果 弱口令获取网站的管理员权限通…...

合众汽车选用风河Wind River Linux系统

导读合众新能源汽车股份有限公司近日选择了Wind River Linux 用于开发合众智能安全汽车平台。 合众智能安全汽车平台(Hozon Automo-tive Intelligent Security Vehicle Plat-form)是一个面向高性能服务网关及车辆控制调度的硬件与软件框架,将于2024年中开始投入量产…...

PTA平台-2023年软件设计综合实践_5(指针及引用)

第一题 6-1 调和平均 - C/C 指针及引用 函数hmean()用于计算整数x和y的调和平均数,结果应保存在指针r所指向的浮点数对象中。当xy等于0时,函数返回0表示无法计算,否则返回1。数学上,两个数x和y的调和平均数 z 2xy/(xy) 。 直接…...

智慧卫生间

智慧卫生间 获取ApiKey/SecretKey获取Access_token获取卫生间实时数据返回说明 获取ApiKey/SecretKey ApiKey/SecretKey采用 线下获取的方式,手动分配。 获取Access_token 向授权服务地址http://xxxxxx:12345/token(示意)发送post请求,并在data中带上…...

Cadence virtuoso drc lvs pex 无法输入

问题描述:在PEX中的PEX options中 Ground node name 无法输入内容。 在save runset的时候也出现无法输入名称的情况 解决办法: copy一个.bashrc文件到自己的工作目录下 打开.bashrc文件 在.bashrc中加一行代码:unset XMODIFIERS 在终端sour…...

反序列化漏洞(2), 分析调用链, 编写POC

反序列化漏洞(2), 反序列化调用链分析 一, 编写php漏洞脚本 http://192.168.112.200/security/unserial/ustest.php <?php class Tiger{public $string;protected $var;public function __toString(){return $this->string;}public function boss($value){eval($valu…...

Pytorch reshape用法

这里-1是指未设定行数&#xff0c;程序自动计算&#xff0c;所以这里-1表示任一正整数 example reshape(-1, 1) 表示&#xff08;任意行&#xff0c;1列&#xff09;&#xff0c;4行4列变为16行1列reshape(1, -1) 表示&#xff08;1行&#xff0c;任意列&#xff09;&#xf…...

Latex 辅助写作工具

语法修改 https://app.grammarly.com/润色 文心一言、ChatGPTlatex 编辑公式 https://www.latexlive.comlatex 编辑表格 https://www.tablesgenerator.comlatex 图片转公式 https://www.tablesgenerator.com...

frp新版本frp_0.52.3设置

服务端 frps.toml cp /root/frp/frpc /usr/bin #bindPort 7000 bindPort 7000# 如果指定了“oidc”&#xff0c;将使用 OIDC 设置颁发 OIDC&#xff08;开放 ID 连接&#xff09;令牌。默认情况下&#xff0c;此值为“令牌”。auth.method “token” auth.method "…...

100G.的DDoS高防够用吗?

很多人以为100G的DDoS防御已经足够了&#xff0c;但殊不知DDoS攻击大小也是需要分行业类型的&#xff0c;比如游戏、金融、影视、电商甚至ZF或者行业龙头等等行业类型&#xff0c;都是大型DDoS攻击的重灾区&#xff0c;别说100G防御&#xff0c;就算300G防御服务器也不一定够用…...

【django+vue】项目搭建、解决跨域访问

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 【djangovue】项目搭建、解决跨域访问 djangovue介绍vue环境准备vue框架搭建1.创建vue项目2.配置vue项目3.进入项目目录4.运行项目5.项目文件讲解6.vue的扩展库或者插件 django环境准备django框架搭建1.使用conda…...

【数据库】数据库连接池导致系统吞吐量上不去-复盘

在实际的开发中&#xff0c;我们会使用数据库连接池&#xff0c;但是如果不能很好的理解其中的含义&#xff0c;那么就可以出现生产事故。 HikariPool-1 - Connection is not available, request timed out after 30001ms.当系统的调用量上去&#xff0c;就出现大量这样的连接…...

华纳云:租用的服务器连接超时怎么办?

服务器连接超时可能由多种原因引起&#xff0c;解决问题的方法取决于具体的情况。以下是一些常见的原因和相应的解决方法&#xff1a; 网络问题&#xff1a; 检查本地网络&#xff1a; 确保本地网络连接正常&#xff0c;尝试访问其他网站或服务&#xff0c;检查是否存在网络问题…...

基于MS16F3211芯片的触摸控制灯的状态变化和亮度控制(11.17,PWM)

紧接上文&#xff0c;基本的控制逻辑并不难写&#xff0c;难的是是、如何输出自己想要频率的PWM波在对应的端口 阅读文档定时器与PWM相关的寄存器&#xff0c;因为之前玩的STM32&#xff0c;所以看起来还是有点困难&#xff0c;准备边看边记录。 如果想要实现在长按时改变PWM…...

编译buildroot出错,这个怎么解决呢,感谢

编译buildroot出错,这个怎么解决呢,感谢 发表于 2019-5-22 20:24:25 浏览:8025 | 回复:5 打印 只看该作者 [复制链接]楼主 g++: internal compiler error: 已杀死 (program cc1plus) Please submit a full bug report, with preprocessed source if appro…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

免费PDF转图片工具

免费PDF转图片工具 一款简单易用的PDF转图片工具&#xff0c;可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件&#xff0c;也不需要在线上传文件&#xff0c;保护您的隐私。 工具截图 主要特点 &#x1f680; 快速转换&#xff1a;本地转换&#xff0c;无需等待上…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...

比特币:固若金汤的数字堡垒与它的四道防线

第一道防线&#xff1a;机密信函——无法破解的哈希加密 将每一笔比特币交易比作一封在堡垒内部传递的机密信函。 解释“哈希”&#xff08;Hashing&#xff09;就是一种军事级的加密术&#xff08;SHA-256&#xff09;&#xff0c;能将信函内容&#xff08;交易细节&#xf…...

数据挖掘是什么?数据挖掘技术有哪些?

目录 一、数据挖掘是什么 二、常见的数据挖掘技术 1. 关联规则挖掘 2. 分类算法 3. 聚类分析 4. 回归分析 三、数据挖掘的应用领域 1. 商业领域 2. 医疗领域 3. 金融领域 4. 其他领域 四、数据挖掘面临的挑战和未来趋势 1. 面临的挑战 2. 未来趋势 五、总结 数据…...

vxe-table vue 表格复选框多选数据,实现快捷键 Shift 批量选择功能

vxe-table vue 表格复选框多选数据&#xff0c;实现快捷键 Shift 批量选择功能 查看官网&#xff1a;https://vxetable.cn 效果 代码 通过 checkbox-config.isShift 启用批量选中,启用后按住快捷键和鼠标批量选取 <template><div><vxe-grid v-bind"gri…...

Yolo11改进策略:Block改进|FCM,特征互补映射模块|AAAI 2025|即插即用

1 论文信息 FBRT-YOLO&#xff08;Faster and Better for Real-Time Aerial Image Detection&#xff09;是由北京理工大学团队提出的专用于航拍图像实时目标检测的创新框架&#xff0c;发表于AAAI 2025。论文针对航拍场景中小目标检测的核心难题展开研究&#xff0c;重点解决…...