软件-Airflow进阶


基本命令


用户界面也有以下相关操作按钮

  • 查看

    1、列出现有所有的活动的DAGS
      airflow list_dags
    2、列出 tutorial 的任务id
      airflow list_tasks tutorial
    3、以树形图的形式列出 tutorial 的任务id
      airflow list_tasks tutorial --tree
  • 测试

    1、模拟2015-06-01 执行tutorial的print_date任务
      airflow test tutorial print_date 2015-06-01
  • 回填数据

    如果希望新写的DAG执行过去一段时间的任务怎么办?
    backfill 可以执行一个时间段内应该执行的所有任务

    airflow backfill tutorial -s 2018-06-01 -e 2015-08-01
  • 重建元数据库

    airflow resetdb [-h] [-y]
    -y --yes,不经过提示确认就重置,默认为False
更多命令 官方Command Line Interface

BaseOperator

官网例子,里面的各个属性有代表什么意思?

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
  1. retries (int) 重试几次才标记为失败
  2. retry_delay (timedelta) 两次重试间隔多长时间
  3. retry_exponential_backoff(bool)在重试延迟上运用算法增加等待时间
  4. max_retry_delay (timedelta) 重试之间的最大时间间隔
  5. start_time (datatime)确定第一个任务实例的execution_date,最佳做法是将start_date四舍五入到DAG的schedule_interval。
  6. end_time (datetime)如果指定,调度程序将不会超出此日期。
  7. depends_on_past (bool)设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
  8. wait_for_downstream (bool)TODO
  9. dag (DAG) 任务所附的dag的引用(如果有的话)
  10. parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。
  1. dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数
  1. max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量
  1. non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的
  2. 更多配置

airflow执行组件

  • Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。
  • Executor:现在的Executor有三种:
  • SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段
  • LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task
  • CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。
  • 在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor
  • Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。

配置文件

  • 配置元素优先级

    环境变量
    airflow.cfg中的配置
    airflow.cfg中的命令
    默认

数据库

  • 官方推荐 MySQL or Postgres 两种数据库
  • 本地配置好数据库后

    在airflow.cfg 中配置“executor”为“LocalExecutor”,可以在本地并行化任务实例的执行程序。

感谢

http://wingerted.com


评论
  目录