大数据调度平台 Airflow(六):Airflow Operators 及案例
wxin55 2024-11-10 12:17 8 浏览 0 评论
Airflow Operators及案例
Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator
BaseOperator中常用参数如下:
task_id(str) : 唯一task_id标记
owner(str):任务的所有者,建议使用linux用户名
email(str or liststr):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。
email_on_retry(bool):当任务重试时是否发送电子邮件
email_on_failure(bool):当任务执行失败时是否发送电子邮件
retries(int):在任务失败之前应该重试的次数
retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象
start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。
end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。
depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。
dag(airflow.models.DAG):指定的dag。
execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。
trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。
一、BashOperator及调度Shell命令及脚本
BashOperator主要执行bash脚本或命令,BashOperator参数如下:
bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)
- BashOperator 调度Shell命令案例
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'email':'kettle_test1@163.com', #pwd:kettle123456
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_shell_cmd',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
t1=BashOperator(
task_id='print_date',
bash_command='date',
dag = dag
)
t2=BashOperator(
task_id='print_helloworld',
bash_command='echo "hello world!"',
dag=dag
)
t3=BashOperator(
task_id='tempplated',
bash_command="""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ params.name}}"
echo "{{ params.age}}"
{% endfor %}
""",
params={'name':'wangwu','age':10},
dag=dag
)
t1 >> t2 >> t3
注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。
在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5
此外,关于邮箱的信息如下:
邮箱1:kettle_test1@163.com password:kettle123456
邮箱2:kettle_test2@163.com password:kettle123456
163邮箱SMTP服务器地址: smtp.163.com 端口:25
配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:
kettle_test1@163.com FECJJVEPGPTZJYMQ
kettle_test2@163.com VIOFSYMFDIKKIUEA
- BashOperator 调度Shell脚本案例
准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,
BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。
first_shell.sh
#!/bin/bash
dt=$1
echo "==== execute first shell ===="
echo "---- first : time is ${dt}"
second_shell.sh
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
编写airflow python 配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_shell_sh',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=BashOperator(
task_id='first',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag = dag
)
second=BashOperator(
task_id='second',
#脚本路径建议写绝对路径
bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag=dag
)
first >> second
执行结果:
特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:
二、SSHOperator及调度远程Shell脚本
在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。SSHOperator使用ssh协议与远程主机通信,需要注意的是SSHOperator调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:
#Ubunto系统
. ~/.profile
#CentoOS或者RedHat系统
. ~/.bashrc
关于SSHOperator参数详解可以参照:
airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation
SSHOperator的常用参数如下:
ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。
remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。
- SSHOperator调度远程节点脚本案例
按照如下步骤来使用SSHOperator调度远程节点脚本:
1、安装“apache-airflow-providers-ssh ”provider package
首先停止airflow webserver与scheduler,在node4节点切换到python37环境,安装ssh Connection包。另外,关于Providers package安装方式可以参照如下官网地址:
https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh
#切换Python37环境
[root@node4 ~]# conda activate python37
#安装ssh provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1
#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler
2、配置SSH Connection连接
登录airflow webui ,选择“Admin”->“Connections”:
点击“+”添加连接,这里host连接的是node5节点:
3、准备远程执行脚本
在node5节点/root路径下创建first_shell.sh,内容如下:
#!/bin/bash
echo "==== execute first shell ===="
在node3节点/root路径下创建second_shell.sh,内容如下:
#!/bin/bash
echo "==== execute second shell ===="
4、编写DAG python配置文件
注意在本地开发工具编写python配置时,需要用到SSHOperator,需要在本地对应的python环境中安装对应的provider package。
C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-ssh==2.1.1
python配置文件:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
default_args = {
'owner':'lisi',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_remote_shell',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=SSHOperator(
task_id='first',
ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
command='sh /root/first_shell.sh ',
dag = dag
)
second=SSHOperator(
task_id='second',
ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
command='sh /root/second_shell.sh ',
remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
dag=dag
)
first >> second
5、调度python配置脚本
将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:
调度结果如下:
三、HiveOperator及调度HQL
可以通过HiveOperator直接操作Hive SQL ,HiveOperator的参数如下:
hql(str):需要执行的Hive SQL。
hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。
想要在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore:
#切换Python37环境
[root@node4 ~]# conda activate python37
#安装hive provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2
#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler
登录Airflow webui并设置Hive Metastore,登录后找到”Admin”->”Connections”,点击“+”新增配置:
- HiveOperator调度HQL案例
1、启动Hive,准备表
启动HDFS、Hive Metastore,在Hive中创建以下三张表:
create table person_info(id int,name string,age int) row format delimited fields terminated by '\t';
create table score_info(id int,name string,score int) row format delimited fields terminated by '\t';
向表 person_info加载如下数据:
1 zs 18
2 ls 19
3 ww 20
向表score_info加载如下数据:
1 zs 100
2 ls 200
3 ww 300
2、在node4节点配置Hive 客户端
由于Airflow 使用HiveOperator时需要在Airflow安装节点上有Hive客户端,所以需要在node4节点上配置Hive客户端。
将Hive安装包上传至node4 “/software”下解压,并配置Hive环境变量
#在/etc/profile文件最后配置Hive环境变量
export HIVE_HOME=/software/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin
#使环境变量生效
source /etc/profile
修改HIVE_HOME/conf/hive-site.xml ,写入如下内容:
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1:9083</value>
</property>
</configuration>
3、编写DAG python配置文件
注意在本地开发工具编写python配置时,需要用到HiveOperator,需要在本地对应的python环境中安装对应的provider package。
C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-apache-hive==2.0.2
注意:这里本地安装也有可能缺少对应的C++环境,我们也可以不安装,直接跳过也可以。
Python配置文件:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
default_args = {
'owner':'wangwu',
'start_date':datetime(2021, 9, 23),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_hive_sql',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=HiveOperator(
task_id='person_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select id,name,age from person_info',
dag = dag
)
second=HiveOperator(
task_id='score_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select id,name,score from score_info',
dag=dag
)
third=HiveOperator(
task_id='join_info',
hive_cli_conn_id="node1-hive-metastore",
hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',
dag=dag
)
first >> second >>third
4、调度python配置脚本
将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:
调度结果如下:
四、PythonOperator
PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
- PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
print(a)
print(b)
print("hello airflow1")
# 返回的值只会打印到日志中
return{"sss1":"xxx1"}
def print__hello2(random_base):
print(random_base)
print("hello airflow2")
# 返回的值只会打印到日志中
return{"sss2":"xxx2"}
default_args = {
'owner':'maliu',
'start_date':datetime(2021, 10, 1),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5) # 失败重试间隔
}
dag = DAG(
dag_id = 'execute_pythoncode',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=PythonOperator(
task_id='first',
#填写 print__hello1 方法时,不要加上“()”
python_callable=print__hello1,
# op_args 对应 print_hello1 方法中的a参数
op_args=[1,2,3,"hello","world"],
# op_kwargs 对应 print__hello1 方法中的b参数
op_kwargs={"id":"1","name":"zs","age":18},
dag = dag
)
second=PythonOperator(
task_id='second',
#填写 print__hello2 方法时,不要加上“()”
python_callable=print__hello2,
# random_base 参数对应 print_hello2 方法中参数“random_base”
op_kwargs={"random_base":random.randint(0,9)},
dag=dag
)
first >> second
相关推荐
- ES6中 Promise的使用场景?(es6promise用法例子)
-
一、介绍Promise,译为承诺,是异步编程的一种解决方案,比传统的解决方案(回调函数)更加合理和更加强大在以往我们如果处理多层异步操作,我们往往会像下面那样编写我们的代码doSomething(f...
- JavaScript 对 Promise 并发的处理方法
-
Promise对象代表一个未来的值,它有三种状态:pending待定,这是Promise的初始状态,它可能成功,也可能失败,前途未卜fulfilled已完成,这是一种成功的状态,此时可以获取...
- Promise的九大方法(promise的实例方法)
-
1、promise.resolv静态方法Promise.resolve(value)可以认为是newPromise方法的语法糖,比如Promise.resolve(42)可以认为是以下代码的语...
- 360前端一面~面试题解析(360前端开发面试题)
-
1.组件库按需加载怎么做的,具体打包配了什么-按需加载实现:借助打包工具(如Webpack的require.context或ES模块动态导入),在使用组件时才引入对应的代码。例如在V...
- 前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?
-
Promise的finally方法是一个非常有用的工具,它无论Promise是成功(fulfilled)还是失败(rejected)都会执行,且不改变Promise的最终结果。它的实现原...
- 最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式
-
看了全网手写Promise的,大部分对于新手还是比较难理解的,其中几个比较难的点:状态还未改变时通过发布订阅模式去收集事件实例化的时候通过调用构造函数里传出来的方法去修改类里面的状态,这个叫Re...
- 前端分享-Promise可以中途取消啦(promise可以取消吗)
-
传统Promise就像一台需要手动组装的设备,每次使用都要重新接线。而Promise.withResolvers的出现,相当于给开发者发了一个智能遥控器,可以随时随地控制异步操作。它解决了三大...
- 手写 Promise(手写输入法 中文)
-
前言都2020年了,Promise大家肯定都在用了,但是估计很多人对其原理还是一知半解,今天就让我们一起实现一个符合PromiseA+规范的Promise。附PromiseA+规范地址...
- 什么是 Promise.allSettled()!新手老手都要会?
-
Promise.allSettled()方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的pr...
- 前端面试-关于Promise解析与高频面试题示范
-
Promise是啥,直接上图:Promise就是处理异步函数的API,它可以包裹一个异步函数,在异步函数完成时抛出完成状态,让代码结束远古时无限回掉的窘境。配合async/await语法糖,可...
- 宇宙厂:为什么前端离不开 Promise.withResolvers() ?
-
大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.为什么需要Promise.with...
- Promise 新增了一个超实用的 API!
-
在JavaScript的世界里,Promise一直是处理异步操作的神器。而现在,随着ES2025的发布,Promise又迎来了一个超实用的新成员——Promise.try()!这个新方法简...
- 一次搞懂 Promise 异步处理(promise 异步顺序执行)
-
PromisePromise就像这个词的表面意识一样,表示一种承诺、许诺,会在后面给出一个结果,成功或者失败。现在已经成为了主流的异步编程的操作方式,写进了标准里面。状态Promise有且仅有...
- Promise 核心机制详解(promise机制的实现原理)
-
一、Promise的核心状态机Promise本质上是一个状态机,其行为由内部状态严格管控。每个Promise实例在创建时处于Pending(等待)状态,此时异步操作尚未完成。当异步操作成功...
- javascript——Promise(js实现promise)
-
1.PromiseES6开始支持,Promise对象用于一个异步操作的最终完成(包括成功和失败)及结果值的表示。简单说就是处理异步请求的。之所以叫Promise,就是我承诺,如果成功则怎么处理,失败怎...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- ES6中 Promise的使用场景?(es6promise用法例子)
- JavaScript 对 Promise 并发的处理方法
- Promise的九大方法(promise的实例方法)
- 360前端一面~面试题解析(360前端开发面试题)
- 前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?
- 最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式
- 前端分享-Promise可以中途取消啦(promise可以取消吗)
- 手写 Promise(手写输入法 中文)
- 什么是 Promise.allSettled()!新手老手都要会?
- 前端面试-关于Promise解析与高频面试题示范
- 标签列表
-
- hive行转列函数 (63)
- sourcemap文件是什么 (54)
- display none 隐藏后怎么显示 (56)
- 共享锁和排他锁的区别 (51)
- httpservletrequest 获取参数 (64)
- jstl包 (64)
- qsharedmemory (50)
- watch computed (53)
- java中switch (68)
- date.now (55)
- git-bash (56)
- 盒子垂直居中 (68)
- npm是什么命令 (62)
- python中+=代表什么 (70)
- fsimage (51)
- nginx break (61)
- mysql分区表的优缺点 (53)
- centos7切换到图形界面 (55)
- 前端深拷贝 (62)
- kmp模式匹配算法 (57)
- jsjson字符串转json对象 (53)
- jdbc connection (61)
- javascript字符串转换为数字 (54)
- mybatis 使用 (73)
- 安装mysql数据库 (55)