[Airflow] 안녕, 에어플로우! with PythonOperator

PythonOperator는 어떤 환경에서도 가장 기본이 되는 오퍼레이터입니다. 파이선 함수를 태스크로 사용합니다.

$ cd $AIRFLOW_HOME
$ mkdir dags
$ ls
airflow.cfg
airflow.db
dags
logs
unittests.cfg

dags 디렉토리에 우리의 첫 번째 DAG을 만들겠습니다. 즐겨쓰는 편집기를 이용해서 dags/hello_airflow.py 파일을 만드세요.

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 1)
}

dag = DAG(
    dag_id='mon_disk',
    default_args=args,
    schedule_interval=timedelta(days=1))

def task

# Disk usage + LINE notify
cmd_notify = """
THRESHOLD_VALUE=$((1024**2))
RESULT=$(du ~/Library/Caches/* | sort -rn | awk -v TVAL=$THRESHOLD_VALUE '{ if($1==$1+0 && $1>TVAL) {split($2,u,"//"); print $1 /1024**2 " GB " u[1] } }')

TOKEN="## use your token ##"
curl -sX POST -H "Authorization: Bearer ${TOKEN}" -F "message='${RESULT}'" https://notify-api.line.me/api/notify
"""
BashOperator(task_id='check_usage', bash_command=cmd_notify, dag=dag)