-->

How to work correctly airflow schedule_interval

2020-07-05 10:54发布

问题:

I want to try to use Airflow instead of Cron. But schedule_interval doesn't work as I expected.

I wrote the python code like below.
And in my understanding, Airflow should have ran on "2016/03/30 8:15:00" but it didn't work at that time.

If I changed it like this "'schedule_interval': timedelta(minutes = 5)", it worked correctly, I think.

The "notice_slack.sh" is just to call slack api to my channels.

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

I want to run some of my scripts at specific time every day like this cron setting.

15 08 * * * bash /tmp/notice_slack.sh

I have read the document Scheduling & Triggers, and I know it's a little bit different cron.
So I attempt to arrange at "start_date" and "schedule_interval" settings.

Does anyone know what should I do ?

airflow version

INFO - Using executor LocalExecutor

v1.7.0

amazon-linux-ami/2015.09-release-notes

回答1:

Airflow will start your DAG when the 2016/03/30 8:15:00 + schedule interval (daily) is passed. So your DAG will run on 2016/03/31 8:15:00.

You can check the Airflow FAQ



回答2:

Try this:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval.

schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule.

Simply configuring the schedule_interval and bash_command as the same in your cron setting is okay.



回答3:

you can try using crontab.guru if you are not really sure how to create the airflow cron expression



回答4:

With the example you've given @daily will run your job after it passes midnight. You might try changing it either to timedelta(days=1) which is relative to your fixed start_date that includes 08:15. Or you could use a cron spec for the schedule_interval='15 08 * * *' in which case any start date prior to 8:15 on the day BEFORE the day you wanted the first run would work.

Note that depends_on_past: False is already the default, and you may have confused its behavior with catchup=false in the DAG parameters, which would avoid making past runs for time between the start date and now where the DAG schedule interval would have run.



标签: airflow