Airflow中的DAG使用比较多的是就是各种语言的算子。比如BashOperator、PythonOperator、DummyOperator等等。
1.空算子
一般还有空的算子,不做任何操作,一般开始和结束使用
@一般还有空的算子,不做任何操作,一般开始和结束使用
run_this_first = DummyOperator(
task_id='run_this_first ',
dag=dag,
)
run_this_last = DummyOperator(
task_id='run_this_last ',
dag=dag,
)
2.运行shell脚本的bash_Operator
templated_command :针对bash操作,用户可以动态生成脚本命令
bash_command:需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
2.1 运行shell脚本的bash_Operator
from airflow.operators.bash_operator import BashOperator
@针对bash操作,用户可以动态生成脚本命令
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator(
task_id='templated',
@需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
2.2 类似于if else 的判断的branch_Operator
from airflow.operators.python_operator import BranchPythonOperator
def compare(**kwargs){
@上从一个task任务接收到一个变量值result,如果result=1,就返回insert_oracle,执行task_id为insert_oracle的task
result =kwargs['t1'].xcom.pll(key='pushkey',task_ids='query_record')
if result ==1:
return 'isnert_oracle'
else:
return 'end'
}
branch_task=BranchPythonOperator(
task_id='branch_task_id',
python_callable=compare,
provide_context=true,
dag=dag)
)
2.3 执行hive的算子HiveOperator
实例一:
注意事项:在执行hive的时候,一定需要配置hive连接,同时hive_cli_conn_id的值去配置hive连接 中Conn id的值
from airflow.operators.hive_operator import HiveOperator
hive_task=HiveOperator(
task_id='hivetest',
hql=hivesql,
hive_cli_conn_id='hivetest',
dag=dag)
)
实例二:
TMP_XT_LOST=HiveOperator
(taskid='TMP_ALL_CUST_DAY_1',hiveconfs=hiveconfs,
hql=TMP_ALL_CUST_DAY_1_SQL,
dag=dag)
一个DAG.
task_id
:任务名称,有一个TMP_ALL_CUST_DAY_1的任务名称。
hiveconfs
:一个变量,在代码里面就表示一个日期。
hql
:执行的一段sql语句,TMP_ALL_CUST_DAY_1_SQL这个的sql语句(一般是如果存在表A删掉表A,重新创建表A,然后做对应的数据业务逻辑)
2.4 查询mysql或oracle或hive,可以使用hook来执行
查询mysql或oracle或hive,可以使用hook来执行
from airflow.operators.mysql_hook import MysqlHook
from airflow.operators.oracle_hook import OracleHook
sql ="""INSERT INTO random_table(onwer) VALUES (%(owner)s)"""
def mysql_hook(**kwargs):
mysql_hook = MySqlHook().get_hook(mysql_conn_id=mysql_default)
mysql_hook.run(sql=sql, autocommit=True, parameters={'owner':'abx'})
mysqlhookTask=PythonOperator(
task_id='mysqlhookTask_id',
python_callable=mysql_hook,
provide_context=true,
dag=dag)
)
sql ="""INSERT INTO random_table(onwer) VALUES (:phone)"""
def oracle_hook(**kwargs):
oracle_hook = OracleHook('oracle_sms')
oracle_hook.run(sql=sql, autocommit=True, parameters={'phone':'abx'})
oraclehookTask=PythonOperator(
task_id='oraclehookTask_id',
python_callable=oracle_hook,
provide_context=true,
dag=dag)
)