Airflow

Code snippet#
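
A daily DAG with two PythonOperator tasks: pull data out of HDFS, then load it into MySQL. Shared settings (owner, retries, alert mails, pool, queue) live in default_args so both tasks pick them up.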

from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'ABC',
    'start_date': airflow.utils.dates.days_ago(1),
    'depends_on_past': False,
    # alerting: mail the owner on failure and on every retry
    'email': ['abc@xxx.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    # retry up to 3 times, 5 minutes apart
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # concurrency pool and scheduling priority
    'pool': 'data_hadoop_pool',
    'priority_weight': 900,
    # worker queue the tasks are routed to (CeleryExecutor)
    'queue': '66.66.0.66:8080'
}

dag = DAG(
    dag_id='daily',
    default_args=default_args,
    # run once a day at 13:00 (UTC by default)
    schedule_interval='0 13 * * *')

def fetch_data_from_hdfs_function(ds, **kwargs):
    pass  # placeholder: real extraction logic goes here

def push_data_to_mysql_function(ds, **kwargs):
    pass  # placeholder: real load logic goes here

fetch_data_from_hdfs = PythonOperator(
    task_id='fetch_data_from_hdfs',
    provide_context=True,
    python_callable=fetch_data_from_hdfs_function,
    dag=dag)

push_data_to_mysql = PythonOperator(
    task_id='push_data_to_mysql',
    provide_context=True,
    python_callable=push_data_to_mysql_function,
    dag=dag)

fetch_data_from_hdfs >> push_data_to_mysql
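
The two callables above are stubs. A minimal sketch of how they could hand the data along through XCom; kwargs['ti'] is the task instance injected by provide_context=True, while read_hdfs_partition and write_to_mysql are hypothetical helpers standing in for the real I/O:

def fetch_data_from_hdfs_function(ds, **kwargs):
    # read the partition for the execution date (hypothetical helper)
    rows = read_hdfs_partition(date=ds)
    # hand the rows to the downstream task through XCom
    kwargs['ti'].xcom_push(key='rows', value=rows)

def push_data_to_mysql_function(ds, **kwargs):
    # pull what the upstream task pushed, then load it (hypothetical helper)
    rows = kwargs['ti'].xcom_pull(task_ids='fetch_data_from_hdfs', key='rows')
    write_to_mysql(rows)

Note that XCom is meant for small payloads; for real volumes the two tasks would typically share an HDFS path or a staging table instead.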

Update#
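
Any key in default_args can be overridden per task by passing it straight to the operator constructor; the task-level value wins.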

# default parameters: everything is inherited from default_args
fetch_data_from_hdfs = PythonOperator(
    task_id='fetch_data_from_hdfs',
    provide_context=True,
    python_callable=fetch_data_from_hdfs_function,
    dag=dag)

# override parameters: task-level arguments win over default_args
push_data_to_mysql = PythonOperator(
    task_id='push_data_to_mysql',
    queue='77.66.0.66:8080',  # overrides the default queue
    pool='data_mysql_pool',   # overrides the default pool
    provide_context=True,
    python_callable=push_data_to_mysql_function,
    dag=dag)
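
Anything not passed explicitly stays inherited. A quick sanity check against the two operators above:

# task-level kwargs win over default_args
assert push_data_to_mysql.pool == 'data_mysql_pool'
assert push_data_to_mysql.queue == '77.66.0.66:8080'
# nothing overridden here, so the defaults apply
assert fetch_data_from_hdfs.pool == 'data_hadoop_pool'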

Decouple#
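
Keep the business logic in its own module (xx below) so it can run and be tested outside Airflow; the PythonOperator callable only wraps it and turns a failure into an AirflowException.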

from airflow.exceptions import AirflowException
# the job itself lives in module xx; alias it so the name does not
# clash with the operator defined below
from xx import fetch_data_from_hdfs as fetch_data_from_hdfs_job

def fetch_data_from_hdfs_function(ds, **kwargs):
    # run the decoupled job and turn a falsy result into a task failure
    if not fetch_data_from_hdfs_job(ds):
        raise AirflowException('run fail: fetch_data_from_hdfs')

fetch_data_from_hdfs = PythonOperator(
    task_id='fetch_data_from_hdfs',
    provide_context=True,
    python_callable=fetch_data_from_hdfs_function,
    dag=dag)
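
For completeness, a sketch of how the decoupled side might look; the module name xx comes from the snippet above, everything inside it is an assumption:

# xx.py -- hypothetical layout of the decoupled module
def fetch_data_from_hdfs(ds):
    # pure business logic: no Airflow imports, returns True on success
    try:
        rows = read_hdfs_partition(date=ds)  # hypothetical reader
        return bool(rows)
    except IOError:
        return False

Because the job is a plain function, it can be exercised from a shell or a unit test without standing up Airflow at all.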