当前位置: 首页 > news >正文

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记owner(str):任务的所有者,建议使用linux用户名email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'email':'kettle_test1@163.com', #pwd:kettle123456'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_cmd',default_args=default_args,schedule_interval=timedelta(minutes=1)
)t1=BashOperator(task_id='print_date',bash_command='date',dag = dag
)t2=BashOperator(task_id='print_helloworld',bash_command='echo "hello world!"',dag=dag
)t3=BashOperator(task_id='tempplated',bash_command="""{% for i in range(5) %}echo "{{ ds }}"echo "{{ params.name}}"echo "{{ params.age}}"{% endfor %}""",params={'name':'wangwu','age':10},dag=dag
)t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


相关文章:

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹…...

IJ中配置TortoiseSVN插件:

文章目录 一、报错情况:二、配置TortoiseSVN插件: 一、报错情况: 由于公司电脑加密,TortoiseSVN菜单没有提交和更新按钮,所以需要使用IJ的SVN进行代码相关操作 二、配置TortoiseSVN插件: 需要设置一个svn.…...

个人实现在线支付,一种另类的在线支付解决方案

Hi, I’m Shendi 个人实现在线支付,一种另类的在线支付解决方案 个人实现在线支付的方式 对于在线支付,最多的是接入微信与支付宝。但都需要营业执照,不适用于个人。 当然,可以去办理一个个体工商户,但对我这种小额收…...

浅谈智能安全配电装置应用在银行配电系统中

【摘要】银行是国家重点安全保护部分,关系到社会资金的稳定,也是消防重点单位。消防安全是银行工作的重要组成部分。在银行配电系统中应用智能安全配电装置,可以提高银行的智能控制水平,有效预防电气火灾。 【关键词】银行&#…...

macOS下如何使用Flask进行开发

👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是全栈工…...

记一次服务器配置文件获取OSS

一、漏洞原因 由于网站登录口未做双因子校验,导致可以通过暴力破解获取管理员账号,成功进入系统;未对上传的格式和内容进行校验,可以任意文件上传获取服务器权限;由于服务器上配置信息,可以进一步获取数据库权限和OSS管理权限。二、漏洞成果 弱口令获取网站的管理员权限通…...

合众汽车选用风河Wind River Linux系统

导读合众新能源汽车股份有限公司近日选择了Wind River Linux 用于开发合众智能安全汽车平台。 合众智能安全汽车平台(Hozon Automo-tive Intelligent Security Vehicle Plat-form)是一个面向高性能服务网关及车辆控制调度的硬件与软件框架,将于2024年中开始投入量产…...

PTA平台-2023年软件设计综合实践_5(指针及引用)

第一题 6-1 调和平均 - C/C 指针及引用 函数hmean()用于计算整数x和y的调和平均数,结果应保存在指针r所指向的浮点数对象中。当xy等于0时,函数返回0表示无法计算,否则返回1。数学上,两个数x和y的调和平均数 z 2xy/(xy) 。 直接…...

智慧卫生间

智慧卫生间 获取ApiKey/SecretKey获取Access_token获取卫生间实时数据返回说明 获取ApiKey/SecretKey ApiKey/SecretKey采用 线下获取的方式,手动分配。 获取Access_token 向授权服务地址http://xxxxxx:12345/token(示意)发送post请求,并在data中带上…...

Cadence virtuoso drc lvs pex 无法输入

问题描述:在PEX中的PEX options中 Ground node name 无法输入内容。 在save runset的时候也出现无法输入名称的情况 解决办法: copy一个.bashrc文件到自己的工作目录下 打开.bashrc文件 在.bashrc中加一行代码:unset XMODIFIERS 在终端sour…...

反序列化漏洞(2), 分析调用链, 编写POC

反序列化漏洞(2), 反序列化调用链分析 一, 编写php漏洞脚本 http://192.168.112.200/security/unserial/ustest.php <?php class Tiger{public $string;protected $var;public function __toString(){return $this->string;}public function boss($value){eval($valu…...

Pytorch reshape用法

这里-1是指未设定行数&#xff0c;程序自动计算&#xff0c;所以这里-1表示任一正整数 example reshape(-1, 1) 表示&#xff08;任意行&#xff0c;1列&#xff09;&#xff0c;4行4列变为16行1列reshape(1, -1) 表示&#xff08;1行&#xff0c;任意列&#xff09;&#xf…...

Latex 辅助写作工具

语法修改 https://app.grammarly.com/润色 文心一言、ChatGPTlatex 编辑公式 https://www.latexlive.comlatex 编辑表格 https://www.tablesgenerator.comlatex 图片转公式 https://www.tablesgenerator.com...

frp新版本frp_0.52.3设置

服务端 frps.toml cp /root/frp/frpc /usr/bin #bindPort 7000 bindPort 7000# 如果指定了“oidc”&#xff0c;将使用 OIDC 设置颁发 OIDC&#xff08;开放 ID 连接&#xff09;令牌。默认情况下&#xff0c;此值为“令牌”。auth.method “token” auth.method "…...

100G.的DDoS高防够用吗?

很多人以为100G的DDoS防御已经足够了&#xff0c;但殊不知DDoS攻击大小也是需要分行业类型的&#xff0c;比如游戏、金融、影视、电商甚至ZF或者行业龙头等等行业类型&#xff0c;都是大型DDoS攻击的重灾区&#xff0c;别说100G防御&#xff0c;就算300G防御服务器也不一定够用…...

【django+vue】项目搭建、解决跨域访问

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 【djangovue】项目搭建、解决跨域访问 djangovue介绍vue环境准备vue框架搭建1.创建vue项目2.配置vue项目3.进入项目目录4.运行项目5.项目文件讲解6.vue的扩展库或者插件 django环境准备django框架搭建1.使用conda…...

【数据库】数据库连接池导致系统吞吐量上不去-复盘

在实际的开发中&#xff0c;我们会使用数据库连接池&#xff0c;但是如果不能很好的理解其中的含义&#xff0c;那么就可以出现生产事故。 HikariPool-1 - Connection is not available, request timed out after 30001ms.当系统的调用量上去&#xff0c;就出现大量这样的连接…...

华纳云:租用的服务器连接超时怎么办?

服务器连接超时可能由多种原因引起&#xff0c;解决问题的方法取决于具体的情况。以下是一些常见的原因和相应的解决方法&#xff1a; 网络问题&#xff1a; 检查本地网络&#xff1a; 确保本地网络连接正常&#xff0c;尝试访问其他网站或服务&#xff0c;检查是否存在网络问题…...

基于MS16F3211芯片的触摸控制灯的状态变化和亮度控制(11.17,PWM)

紧接上文&#xff0c;基本的控制逻辑并不难写&#xff0c;难的是是、如何输出自己想要频率的PWM波在对应的端口 阅读文档定时器与PWM相关的寄存器&#xff0c;因为之前玩的STM32&#xff0c;所以看起来还是有点困难&#xff0c;准备边看边记录。 如果想要实现在长按时改变PWM…...

编译buildroot出错,这个怎么解决呢,感谢

编译buildroot出错,这个怎么解决呢,感谢 发表于 2019-5-22 20:24:25 浏览:8025 | 回复:5 打印 只看该作者 [复制链接]楼主 g++: internal compiler error: 已杀死 (program cc1plus) Please submit a full bug report, with preprocessed source if appro…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

如何更改默认 Crontab 编辑器 ?

在 Linux 领域中&#xff0c;crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用&#xff0c;用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益&#xff0c;允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

C++实现分布式网络通信框架RPC(2)——rpc发布端

有了上篇文章的项目的基本知识的了解&#xff0c;现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...