Airflow Operator 开发

Airflow 是 Airbnb 公司开源的任务调度系统, 通过使用 Python 开发 DAG, 非常方便的调度计算任务. 介绍一下在 Airflow 提供的 Operator 不满足需求的场景下, 如何自己开发 Operator.

0x00 DAG 的最基本执行单元: Operator

在 Airflow 的一个 DAG 中, 最基本的执行单元是 Operator. 例如如下示例 DAG 中, 使用的都是 BashOperator, 执行一个 bash 脚本.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Airflow 实现了很多 Operator(参见 Airflow 源代码), 涵盖了常用的功能, 例如执行 Hive 查询, 执行 bash 脚本等. 有几种特殊的 Operator:

  • XXXSensor 用作其他外界条件的 sensor, 实现也很简单, 在 Operator 的 execute 方法中进行 long poll, 直到 poke 方法返回 True 则完成执行.
# BaseSensorOperator 部分源码

def poke(self, context):
    '''
    Function that the sensors defined while deriving this class should
    override.
    '''
    raise AirflowException('Override me.')
def execute(self, context):
    started_at = datetime.now()
    while not self.poke(context):
        sleep(self.poke_interval)
        if (datetime.now() - started_at).seconds > self.timeout:
            raise AirflowSensorTimeout('Snap. Time is OUT.')
    logging.info("Success criteria met. Exiting.")
  • PythonOperator 用来执行 Python 函数, 这也是使用 Python 代码来定义 DAG 的好处
  • BranchPythonOperator 用来支持分支, 通过函数返回要执行的分支

Airflow Operator 相关 class 继承关系如下:

.
└── BaseOperator
    ├── BaseSensorOperator
    │   └── ...Sensor
    ├── PythonOperator
    │   ├── BranchPythonOperator
    │   └── ShortCircuitOperator
    └── ...Operator

0x01 Operator 开发

如果官方的 Operator 都不满足需求, 那么我们就要来开发一个 Operator. 开发 Operator 也很简单, 直接继承 BaseOperator并实现 execute 方法即可.

from airflow.models import BaseOperator

class DemoOperator(BaseOperator):

    def __init__(*args, **kwargs):
        super(DemoOperator, self).__init__(*args, **kwargs)
    
    def execute(self, context):
        print "hello"

除了 execute 方法必须实现外, 还有一个 hook 方法:

  • pre_execute: 在 execute 方法前调用, 实现点儿准备逻辑
  • post_execute: 在 execute 方法完成后调用, cleanup 一下
  • on_kill: 在 task 被 kill 的时候执行.

Operator 获取模板变量

Aiflow 是支持 Templating with Jinja 功能的, 具体来说就是 Operator 中支持模板语言 Jinja, 写一些 for 循环, 或者通过 {{param}} 语法替换一些变量等(例如 {{ds}} 被替换成执行任务的日期)

# 官方示例的 jinja 语句
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}

那么, 自己开发的 Operator 中如何使用这个功能呢?

其实也很简单, 在自己的 Operator 中添加属性 template_fields = (attributes_to_be_rendered_with_jinja). 在任务被执行前, Airflow 会自动渲染 template_fields 中的属性再执行任务.

# 节选自 Airflow 中 BashOperator 源码
class BashOperator(BaseOperator):
    # 这里定义需要被渲染的属性名称
    template_fields = ('bash_command', 'env')

Operator 部署

开发的 Operator 代码作为一个 Python 的 Package, 使用 distutil 打包安装到 Airflow 对应的服务器上即可.

0x02 Operator 跟其他系统交互

Airflow 考虑到为了跟外界环境隔离, 提出了 Connection 概念: 将配置信息放到 Airflow Server 本身配置, 在 DAG 中使用 connection_id 来引用. 例如, 我们配置一个 HiveServer 的 Connection, 使用 liulishuo_hiveserver1 作为 connection_id, 这样同一个 DAG 文件就可以在测试环境和生成环境调用对应环境中的 HiveServer 服务了. 总结来说, 这就是架构设计模式中的 External Configuration Store Pattern 的标准实现.

那么如果自己开发的 Operator 如何调用这些 Connection 呢? 这里 Airflow 又引入了一个 Hook 的概念:

Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.

我个人觉得这个概念的引入有点儿臃肿了, 没有任何意义. 开发者在 Airflow Web 中看到的也是 Connection 的配置, 那么开发 DAG 时第一个想到的就是去找 Connection 相关的 class, 再多一个 Hook的概念有点儿绕.

那么 Operator 中想使用对应的 Connection, 直接根据 connection_id 创建对应的 Hook 就好了(讲真, 真绕), 例如, 想使用 HiveServer2Connection, 创建一个 HiveServer2Hook 即可.

# Operator 调用 Connection 示例代码
class LiulishuoDemoOperator(BaseOperator):

    def __init__(self, hive_server2_connection_id, *args, **kwargs):
        super(LiulishuoDemoOperator, self).__init__(*args, **kwargs)
        self.hive_server2_connection_id = hive_server2_connection_id
    
    def execute(self, context):
        hive_server2 = HiveServer2Hook(self.hive_server2_connection_id)
        hive_serve2.get_records('SELECT * FROM testdb.table1 LIMIT 20')
        # ....

HiveServer2Hook 设计有还有一个贴心之处就是, 在创建 HiveServer2Hook 时根本不涉及真正连接 HiveServer2 的逻辑, 只有真正调用其get_records 等方法时才会真正去连接 HiveServer2, 这样就给单元测试 mock 带来很大的方便, 毕竟在 CI 环境中构建一个隔离的专门用于跑自己的 test-case 的 HiveServer2 也不是那么容易的.

def test_operator_with_mock(self):
    with mock.patch.object(HiveServer2Hook, 'get_records') as mock_get_records: 
        # 这里设置 mock 的返回值
        mock_get_records.return_value = [['Location: ', 's3://test-bucket/valid_table']]
        hive_server_id = 'test-hive-server'
        # 这里测试对应的 Operator 代码

0x03 总结

Airflow 通过精简的抽象, 将 DAG 开发简化到了会写 Python 基本就没问题的程度, 还是值得点赞的. 自己开发一个 Operator 也是很简单, 不过自己开发 Operator 也仅仅是技术选型的其中一个方案而已, 复杂逻辑也可以通过暴露一个 Restful API 的形式, 使用 Airflow 提供的 SimpleHttpOperator 调用对应的服务执行任务.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容