软件-Airflow进阶


基本命令


用户界面也有以下相关操作按钮

  • 查看

    1
    2
    3
    4
    5
    6
    7
    8
    1、列出现有所有的活动的DAGS
    airflow list_dags

    2、列出 tutorial 的任务id
    airflow list_tasks tutorial

    3、以树形图的形式列出 tutorial 的任务id
    airflow list_tasks tutorial --tree
  • 测试

    1
    2
    1、模拟2015-06-01 执行tutorial的print_date任务
    airflow test tutorial print_date 2015-06-01
  • 回填数据

    如果希望新写的DAG执行过去一段时间的任务怎么办?
    backfill 可以执行一个时间段内应该执行的所有任务

    1
    airflow backfill tutorial -s 2018-06-01 -e 2015-08-01
  • 重建元数据库

    1
    2
    3
    airflow resetdb [-h] [-y]

    -y --yes,不经过提示确认就重置,默认为False
更多命令 官方Command Line Interface

BaseOperator

官网例子,里面的各个属性有代表什么意思?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

  1. retries (int) 重试几次才标记为失败
  2. retry_delay (timedelta) 两次重试间隔多长时间
  3. retry_exponential_backoff(bool)在重试延迟上运用算法增加等待时间
  4. max_retry_delay (timedelta) 重试之间的最大时间间隔
  5. start_time (datatime)确定第一个任务实例的execution_date,最佳做法是将start_date四舍五入到DAG的schedule_interval。
  6. end_time (datetime)如果指定,调度程序将不会超出此日期。
  7. depends_on_past (bool)设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
  8. wait_for_downstream (bool)TODO
  9. dag (DAG) 任务所附的dag的引用(如果有的话)
  10. parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。
  1. dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数
  1. max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量
  1. non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的
  2. 更多配置

airflow执行组件

  • Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。
  • Executor:现在的Executor有三种:
  • SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段
  • LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task
  • CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。
  • 在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor
  • Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。

配置文件

  • 配置元素优先级

    环境变量
    airflow.cfg中的配置
    airflow.cfg中的命令
    默认

数据库

  • 官方推荐 MySQL or Postgres 两种数据库
  • 本地配置好数据库后

    在airflow.cfg 中配置“executor”为“LocalExecutor”,可以在本地并行化任务实例的执行程序。

感谢

http://wingerted.com


  目录