Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
目录
1. BashOperator 调度Shell命令案例
2. BashOperator 调度Shell脚本案例
Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator
BaseOperator中常用参数如下:
task_id(str) : 唯一task_id标记owner(str):任务的所有者,建议使用linux用户名email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。
BashOperator主要执行bash脚本或命令,BashOperator参数如下:
bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)
1. BashOperator 调度Shell命令案例
from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'email':'kettle_test1@163.com', #pwd:kettle123456'retries': 1, # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_cmd',default_args=default_args,schedule_interval=timedelta(minutes=1)
)t1=BashOperator(task_id='print_date',bash_command='date',dag = dag
)t2=BashOperator(task_id='print_helloworld',bash_command='echo "hello world!"',dag=dag
)t3=BashOperator(task_id='tempplated',bash_command="""{% for i in range(5) %}echo "{{ ds }}"echo "{{ params.name}}"echo "{{ params.age}}"{% endfor %}""",params={'name':'wangwu','age':10},dag=dag
)t1 >> t2 >> t3
注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。
在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5
此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例
准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。
first_shell.sh
#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"
second_shell.sh
#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"
编写airflow python 配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1, # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second
执行结果:


特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


相关文章:
Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本
🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹…...
IJ中配置TortoiseSVN插件:
文章目录 一、报错情况:二、配置TortoiseSVN插件: 一、报错情况: 由于公司电脑加密,TortoiseSVN菜单没有提交和更新按钮,所以需要使用IJ的SVN进行代码相关操作 二、配置TortoiseSVN插件: 需要设置一个svn.…...
个人实现在线支付,一种另类的在线支付解决方案
Hi, I’m Shendi 个人实现在线支付,一种另类的在线支付解决方案 个人实现在线支付的方式 对于在线支付,最多的是接入微信与支付宝。但都需要营业执照,不适用于个人。 当然,可以去办理一个个体工商户,但对我这种小额收…...
浅谈智能安全配电装置应用在银行配电系统中
【摘要】银行是国家重点安全保护部分,关系到社会资金的稳定,也是消防重点单位。消防安全是银行工作的重要组成部分。在银行配电系统中应用智能安全配电装置,可以提高银行的智能控制水平,有效预防电气火灾。 【关键词】银行&#…...
macOS下如何使用Flask进行开发
👨🏻💻 热爱摄影的程序员 👨🏻🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻🏫 一位高冷无情的编码爱好者 大家好,我是全栈工…...
记一次服务器配置文件获取OSS
一、漏洞原因 由于网站登录口未做双因子校验,导致可以通过暴力破解获取管理员账号,成功进入系统;未对上传的格式和内容进行校验,可以任意文件上传获取服务器权限;由于服务器上配置信息,可以进一步获取数据库权限和OSS管理权限。二、漏洞成果 弱口令获取网站的管理员权限通…...
合众汽车选用风河Wind River Linux系统
导读合众新能源汽车股份有限公司近日选择了Wind River Linux 用于开发合众智能安全汽车平台。 合众智能安全汽车平台(Hozon Automo-tive Intelligent Security Vehicle Plat-form)是一个面向高性能服务网关及车辆控制调度的硬件与软件框架,将于2024年中开始投入量产…...
PTA平台-2023年软件设计综合实践_5(指针及引用)
第一题 6-1 调和平均 - C/C 指针及引用 函数hmean()用于计算整数x和y的调和平均数,结果应保存在指针r所指向的浮点数对象中。当xy等于0时,函数返回0表示无法计算,否则返回1。数学上,两个数x和y的调和平均数 z 2xy/(xy) 。 直接…...
智慧卫生间
智慧卫生间 获取ApiKey/SecretKey获取Access_token获取卫生间实时数据返回说明 获取ApiKey/SecretKey ApiKey/SecretKey采用 线下获取的方式,手动分配。 获取Access_token 向授权服务地址http://xxxxxx:12345/token(示意)发送post请求,并在data中带上…...
Cadence virtuoso drc lvs pex 无法输入
问题描述:在PEX中的PEX options中 Ground node name 无法输入内容。 在save runset的时候也出现无法输入名称的情况 解决办法: copy一个.bashrc文件到自己的工作目录下 打开.bashrc文件 在.bashrc中加一行代码:unset XMODIFIERS 在终端sour…...
反序列化漏洞(2), 分析调用链, 编写POC
反序列化漏洞(2), 反序列化调用链分析 一, 编写php漏洞脚本 http://192.168.112.200/security/unserial/ustest.php <?php class Tiger{public $string;protected $var;public function __toString(){return $this->string;}public function boss($value){eval($valu…...
Pytorch reshape用法
这里-1是指未设定行数,程序自动计算,所以这里-1表示任一正整数 example reshape(-1, 1) 表示(任意行,1列),4行4列变为16行1列reshape(1, -1) 表示(1行,任意列)…...
Latex 辅助写作工具
语法修改 https://app.grammarly.com/润色 文心一言、ChatGPTlatex 编辑公式 https://www.latexlive.comlatex 编辑表格 https://www.tablesgenerator.comlatex 图片转公式 https://www.tablesgenerator.com...
frp新版本frp_0.52.3设置
服务端 frps.toml cp /root/frp/frpc /usr/bin #bindPort 7000 bindPort 7000# 如果指定了“oidc”,将使用 OIDC 设置颁发 OIDC(开放 ID 连接)令牌。默认情况下,此值为“令牌”。auth.method “token” auth.method "…...
100G.的DDoS高防够用吗?
很多人以为100G的DDoS防御已经足够了,但殊不知DDoS攻击大小也是需要分行业类型的,比如游戏、金融、影视、电商甚至ZF或者行业龙头等等行业类型,都是大型DDoS攻击的重灾区,别说100G防御,就算300G防御服务器也不一定够用…...
【django+vue】项目搭建、解决跨域访问
笔记为自我总结整理的学习笔记,若有错误欢迎指出哟~ 【djangovue】项目搭建、解决跨域访问 djangovue介绍vue环境准备vue框架搭建1.创建vue项目2.配置vue项目3.进入项目目录4.运行项目5.项目文件讲解6.vue的扩展库或者插件 django环境准备django框架搭建1.使用conda…...
【数据库】数据库连接池导致系统吞吐量上不去-复盘
在实际的开发中,我们会使用数据库连接池,但是如果不能很好的理解其中的含义,那么就可以出现生产事故。 HikariPool-1 - Connection is not available, request timed out after 30001ms.当系统的调用量上去,就出现大量这样的连接…...
华纳云:租用的服务器连接超时怎么办?
服务器连接超时可能由多种原因引起,解决问题的方法取决于具体的情况。以下是一些常见的原因和相应的解决方法: 网络问题: 检查本地网络: 确保本地网络连接正常,尝试访问其他网站或服务,检查是否存在网络问题…...
基于MS16F3211芯片的触摸控制灯的状态变化和亮度控制(11.17,PWM)
紧接上文,基本的控制逻辑并不难写,难的是是、如何输出自己想要频率的PWM波在对应的端口 阅读文档定时器与PWM相关的寄存器,因为之前玩的STM32,所以看起来还是有点困难,准备边看边记录。 如果想要实现在长按时改变PWM…...
编译buildroot出错,这个怎么解决呢,感谢
编译buildroot出错,这个怎么解决呢,感谢 发表于 2019-5-22 20:24:25 浏览:8025 | 回复:5 打印 只看该作者 [复制链接]楼主 g++: internal compiler error: 已杀死 (program cc1plus) Please submit a full bug report, with preprocessed source if appro…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
