基本命令
用户界面也有以下相关操作按钮
查看
1、列出现有所有的活动的DAGS airflow list_dags 2、列出 tutorial 的任务id airflow list_tasks tutorial 3、以树形图的形式列出 tutorial 的任务id airflow list_tasks tutorial --tree
测试
1、模拟2015-06-01 执行tutorial的print_date任务 airflow test tutorial print_date 2015-06-01
回填数据
如果希望新写的DAG执行过去一段时间的任务怎么办?
backfill 可以执行一个时间段内应该执行的所有任务airflow backfill tutorial -s 2018-06-01 -e 2015-08-01
重建元数据库
airflow resetdb [-h] [-y] -y --yes,不经过提示确认就重置,默认为False
更多命令 官方Command Line Interface
BaseOperator
官网例子,里面的各个属性有代表什么意思?
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
- retries (int) 重试几次才标记为失败
- retry_delay (timedelta) 两次重试间隔多长时间
- retry_exponential_backoff(bool)在重试延迟上运用算法增加等待时间
- max_retry_delay (timedelta) 重试之间的最大时间间隔
- start_time (datatime)确定第一个任务实例的execution_date,最佳做法是将start_date四舍五入到DAG的schedule_interval。
- end_time (datetime)如果指定,调度程序将不会超出此日期。
- depends_on_past (bool)设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
- wait_for_downstream (bool)TODO
- dag (DAG) 任务所附的dag的引用(如果有的话)
- parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。
- dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数
- max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量
- non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的
- 更多配置
airflow执行组件
- Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。
- Executor:现在的Executor有三种:
- SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段
- LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task
- CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。
- 在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor
- Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。
配置文件
- 配置元素优先级
环境变量
airflow.cfg中的配置
airflow.cfg中的命令
默认
数据库
- 官方推荐 MySQL or Postgres 两种数据库
- 本地配置好数据库后
在airflow.cfg 中配置“executor”为“LocalExecutor”,可以在本地并行化任务实例的执行程序。