# [Airflow] 안녕, 에어플로우! with BashOperator

Part 1. 실습으로 익히는 에어플로우 기본

# 1. 목적

이번 포스트에서는 하루에 한번씩 "안녕, 에어플로우!" 라고 인사하는 단순한 워크플로우를 만들어보겠습니다. 에어플로우(Airflow)에서 말하는 워크플로우는 하나의 DAG로 정의됩니다. 또한 DAG은 한 개 이상의 작업들(tasks)로 이루어집니다.

conda venv

DAG의 특징의 자세한 이야기는 뒤에 Part 3에서 할 예정이지만, 간단하게 용어설명 정도 하고 넘어가겠습니다.

DAG 이란 Directed Acyclic Graph의 약자로, "방향성 비순환 그래프"로 번역합니다. 다시 풀어 얘기하면

  • **그래프(graph)**는 노드(node, 꼭지점)와 엣지(edge, 변)으로 구성되는 구조를 말합니다.
  • 방향성(directed) 노드와 노드간의 엣지가 단방향으로 연결된다. (화살표로 연결된다)
  • 비순환(acyclic): 한번 지나간 노드는 다시 지나가지 않는다

즉, 작업간의 의존성 혹은 선후 관계가 생긴다고 할 수 있습니다.

conda venv

작업(task)는 그래프를 구성하는 노드라고 생각할 수 있습니다. 에어플로우는 여러 형태의 작업을 지원하는데, 우리는 가장 기본이 되는 BashOperator로 시작해보죠.

# 2. BashOperator

# 2.1 환경변수

우선 앞으로 자주 언급할 환경변수를 정의하겠습니다. 사용하시는 쉘에 맞게 .bashrc 또는 .zshrc 파일 마지막에 다음 줄을 추가합니다.

export AIRFLOW_HOME=$HOME/airflow
export PYTHONPATH=$AIRFLOW_HOME/config:$PYTHONPATH

추가한 내용이 적용되도록 source ~/.bashrc 명령을 실행합니다.

# 2.2 DAG을 위한 디렉토리

우선 AIRFLOW_HOME 디렉토리 아래에 dags 디렉토리를 만듭니다. 이 곳이 앞으로 만들 모든 DAG의 소스코드를 저장하는 곳입니다.

$ cd $AIRFLOW_HOME
$ mkdir dags
$ ls
airflow.cfg
airflow.db
dags
logs
unittests.cfg

dags 디렉토리에 우리의 첫 번째 DAG을 만들겠습니다. 즐겨쓰는 편집기를 이용해서 dags/hello_airflow.py 파일을 만드세요.

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 1)
}

dag = DAG(
    dag_id='hello_airflow',
    default_args=args,
    schedule_interval="@once")

# Bash Operator
cmd = 'echo "Hello, Airflow"'
BashOperator(task_id='t1', bash_command=cmd, dag=dag)

위의 소스코드에서 중요한 DAG과 작업(task)부터 살펴보겠습니다. 10행에서 DAG 클래스의 인스턴스 만듭니다. 매개변수는 세 개입니다.

  • dag_id: DAG를 구별하는 유일한 이름. 여기서는 `hello_airflow1
  • default_args: 각종 설정 데이터
  • schedule_interval: DAG 실행 간격. 예를들어 1시간, 하루, 일주일 등.

dags 디렉토리의 같은 dag_id의 DAG 이 여러 개가 있다면, 에어플로우는 그 중 마지막에 읽은 단 하나 밖에 인식하지 못하기 때문에 주의가 필요합니다

default_args는 매우 많은 설정을 가지고 있습니다. 뒤에서 설정파일과 함께 설명하겠습니다. 마지막으로 schedule_interval="@once" 는 한 번만 쓰겠다는 의미입니다. 일회성 DAG이나 테스트 할 때 주로 사용하는 설정입니다. 좀 더 실용적인 사용 예는 뒤에서 소개하겠습니다.

위의 소스코드에서 마지막 줄의 BashOperator 가 바로 작업입니다. 이름처럼 bash 명령을 실행하는 operator 입니다. 에어플로우가 동자하는 환경에서 실행가능한 모든 실행파일을 실행할 수 있다고 생각하시면 됩니다. 예를들어 ls, echo, mkdir, awk, df, jq 같은 명령들이 되겠죠.

우선 매개변수부터 설명하겠습니다.

  • task_id: 작업을 구별하는 유일한 이름. 여기서는 t1
  • bash_command: 실행할 bash 명령어. 문자열 형태.
  • dag: 작업이 속하는 DAG

16행의 문자열을 작업환경에서 실행하면 echo 명령어가 뒤에 나오는 문자열을 그대로 화면에 출력함을 확인할 수 있습니다.

$ echo "Hello, Airflow"
Hello, Airflow

마지막으로 라이브러리 import 문을 살펴보면, 1행은 BashOperator를 위해서, 2행은 DAG 클래스를 위해서, 3행은 DAGdefault_args에 들어가는 datetime 클래스를 사용하기 위해서 정의합니다.

이처럼 hello_airflow DAG은 하나의 작업(task)으로 구성되는 가장 단순한 DAG입니다.

conda venv

그럼 이제 실행해보겠습니다.

$ airflow test hello_airflow t1 20181101
...
[2018-11-26 16:10:13,503] {bash_operator.py:97} INFO - Running command: echo "Hello, Airflow"
[2018-11-26 16:10:13,509] {bash_operator.py:106} INFO - Output:
[2018-11-26 16:10:13,513] {bash_operator.py:110} INFO - Hello, Airflow
[2018-11-26 16:10:13,513] {bash_operator.py:114} INFO - Command exited with return code 0

테스트 명령을 조금 구체적으로 살펴보겠습니다.

airflow 명령은 많은 하위명령(subcommand)가 있습니다. airflow 만 입력하면 전체 하위명령을 확인할 수 있습니다.

$ airflow
usage: airflow [-h]
               {backfill,list_tasks,clear,pause,unpause,trigger_dag,delete_dag,pool,variables,kerberos,render,run,initdb,list_dags,dag_state,task_failed_deps,task_state,serve_logs,test,webserver,resetdb,upgradedb,scheduler,worker,flower,version,connections,create_user}

test 하위명령은 특정한 작업을 실행하는 명령이며, 뒤에서 다룰 의존성에 관계없이 무조건 실행하기 때문에 개발하며 동작을 확인할 때 편리한 명령입니다. airflow test 만 입력하면, dag_idtask_id, execution_date가 꼭 필요한 매개변수임을 알 수 있습니다.

$ airflow test
[2018-11-21 15:18:38,017] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow test [-h] [-sd SUBDIR] [-dr] [-tp TASK_PARAMS]
                    dag_id task_id execution_date

실행 방법이나 결과는 airflow testairflow run 이 동일합니다. 하지만, airflow run 은 실행결과를 데이터베이스에 남기고, airflow test는 남기지 않는 차이가 있습니다.

위에서 설명한 것 처럼 우리의 예제에서 dag_idhello_airflow, task_idt1 입니다. 그럼 execution_date는 어떤 값을 넣어야 할까요? execution_date는 다양한 날짜-시간 형태를 받아들입니다. 나중에 자세히 언급할 start_date 항목과 깊은 관계가 있지만, 지금은 형태에만 주목하도록 하겠습니다.

제가 실행할 때 사용한 20181101 값을 보시면, yyyyMMdd 형태인 것을 쉽게 알아차리실 수 있을 것입니다. 그 밖에도 2018-11-01 도 사용할 수 있습니다. 또한 시간도 필요할때가 있는데, 그럴 때는 20181101-1430 또는 2018-11-01T14:30 형태를 사용할 수 있습니다. 시간이 생략된 경우 기본 값은 0시 입니다.

# 3. 마크로 (macro)

에어플로우의 강력한 기능 중 하나가 마크로 입니다. 이번에도 간단한 소스코드와 함께 알아보시죠.

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 1)
}

dag = DAG(
    dag_id='hello_today',
    default_args=args,
    schedule_interval="@once")

# Bash Operator
cmd = 'echo "Hello! Today is {{ ds }}"'
BashOperator(task_id='t1', bash_command=cmd, dag=dag)

첫 번째 예제와 다른 부분을 찾으셨습니까? 그렇습니다. 11행에서 dag_idhello_today로 바뀌었고, 16행의 명령에 {{ ds }} 라는 생소한 문자열이 생겼습니다. 실행해보고 결과부터 확인해죠.

(batch) $ airflow test hello_today t1 20181101
[2018-11-21 15:31:26,361] {bash_operator.py:97} INFO - Running command: echo "Hello! Today is 2018-11-01"
[2018-11-21 15:31:26,370] {bash_operator.py:106} INFO - Output:
[2018-11-21 15:31:26,374] {bash_operator.py:110} INFO - Hello! Today is 2018-11-01
[2018-11-21 15:31:26,374] {bash_operator.py:114} INFO - Command exited with return code 0

위에 보시는 바와 같이 {{ ds }} 가 있던 자리에 2018-11-01 이라는 문자열이 출력됨을 확인하실 수 있습니다. 혹시, "하지만 오늘은 2018-11-01 이 아니야"라고 생각하시고 계신 분이 계신가요? 여기에 에어플로우의 작업을 실행할 때 항상 "시간"이 필요한 이유가 있습니다. 에어플로우는 항상 특정 시점을 기준으로 동작합니다. 우리가 2018-11-01오늘 이라고 알려줬기 때문에 Hello! Today is 2018-11-01 라고 출력한 것입니다. 마찬가지로 다른 시간을 주면, 다른 시간을 오늘 로 인식합니다.

(batch) $ airflow test hello_today t1 20181105
...
[2018-11-21 15:31:26,374] {bash_operator.py:110} INFO - Hello! Today is 2018-11-05
...

다만, 에어플로우는 과거 만 다룹니다. 미래 시간은 사용할 수 없습니다.

(batch) $ airflow test hello_today t1 20281105
[2018-11-26 16:18:51,774] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-11-26 16:18:51,920] {models.py:271} INFO - Filling up the DagBag from /Users/{USER}/airflow/dags
[2018-11-26 16:18:51,977] {models.py:1355} INFO - Dependencies not met for <TaskInstance: hello_today.t1 2028-11-05T00:00:00+00:00 [None]>, dependency 'Execution Date' FAILED: Execution date 2028-11-05T00:00:00+00:00 is in the future (the current date is 2018-11-26T07:18:51.977695+00:00).

ds 이외에도 공식적으로 지원하는 매크로는 아주 많습니다. 공식문서를 한번 살펴보세요. 나중에 예제를 통해 몇 가지 매크로에 대해 더 알아보고, "Part 4"에서 자신이 원하는대로 매크로를 확장하는 방법도 다루도록 하겠습니다.

# 4. 디스크 공간 알림 예제

작지만 유용한 예제를 하나 소개해드리겠습니다. 디스크 공간 알림 예제인데요, 하는 일은 특정 디렉토리의 크기가 1GB 이상인 경우 알림 메시지를 보내는 것입니다.

# 4.1 디스크 공간 검사하기

여기서는 아래 리눅스 명령들을 사용해보겠습니다.

  • 디렉토리의 크기를 알아내는 du 명령
  • 값을 정렬하는 sort 명령
  • 간단한 계산과 원하는 출력을 해주는 awk 명령

저는 맥OS를 사용하는데, 어떤 프로그램이 캐시를 많이 사용하는지 알려주는 알림을 받고 싶다고 가정하겠습니다. 경계값(threshold)을 1GB 라고 할 때, 아래 명령으로 캐시를 많이 사용하는 프로그램 목록을 얻을 수 있습니다.

$ THRESHOLD_VALUE=$((1024**2))
$ du ~/Library/Caches/* | sort -rn | awk -v TVAL=$THRESHOLD_VALUE '{ if($1==$1+0 && $1>TVAL) {split($2,u,"//"); print $1 /1024**2 " GB " u[1] } }'
4.2501 GB /Users/{USER}/Library/Caches/Homebrew
2.37842 GB /Users/{USER}/Library/Caches/Homebrew/Cask
...
1.16296 GB /Users/{USER}/Library/Caches/Google/Chrome/Default/Cache

이제 매일 오전 9시에 나에게 알림을 주도록 에어플로우 DAG으로 변환해보겠습니다.

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 1, 9, 0, 0)
}

dag = DAG(
    dag_id='mon_disk',
    default_args=args,
    schedule_interval=timedelta(days=1))

# Bash Operator
cmd = '''
THRESHOLD_VALUE=$((1024**2))
du ~/Library/Caches/* | sort -rn | awk -v TVAL=$THRESHOLD_VALUE '{ if($1==$1+0 && $1>TVAL) {split($2,u,"//"); print $1 /1024**2 " GB " u[1] } }'
'''
BashOperator(task_id='t1', bash_command=cmd, dag=dag)

이전 예제와 다른 부분은 start_date 에 시간(9시)까지 설정한 것과, DAG 초기화 할 때 schedule_interval@once 대신 timedelta(days=1) 값을 넣은 것입니다. 에어플로우 환경 설정이 완성되면, 이 DAG 은 매일 실행되며, 실행 시간은 오전 9시가 됩니다.

앞에서와 마찬가지 방법으로 테스트 해보겠습니다.

(batch) $ airflow test mon_disk t1 20181101
...
Output:
4.2501 GB /Users/{USER}/Library/Caches/Homebrew
2.37842 GB /Users/{USER}/Library/Caches/Homebrew/Cask
...
1.16378 GB /Users/{USER}/Library/Caches/Google/Chrome/Default/Cache
Command exited with return code 0

그런데, 뭔가 부족하지 않나요? 알림 메시지는 어떻게 보낼까요?

# 4.2 라인 노티파이(LINE notify) 추가하기

라인 메신져에는 LINE notify (opens new window)라는 API를 제공합니다. Node.js와 Python으로 LINE Notify 사용해보기(1) – 기본 (opens new window) 글을 참고하시면 쉽게 토근을 발급받을 수 있습니다.

위의 블로그 포스트에서는 파이선을 사용했지만, curl 명령을 통해서도 쉽게 메시지를 보낼 수 있습니다. 아래 예제는 TOKEN 값만 여러분의 토근으로 적어주시면 잘 동작합니다.

$ notify() {
    # kyryu token
    TOKEN="## use your token ##"

    MESSAGE="$1"
    echo ${MESSAGE}
    curl -X POST -H "Authorization: Bearer ${TOKEN}" -F "message=${MESSAGE}" https://notify-api.line.me/api/notify
}
$ MSG="New message"
$ notify "${MSG}"
New message
{"status":200,"message":"ok"}%                                                                                                                                                         (batch)

conda venv

이를 그대로 에어플로우 작업으로 바꿔보겠습니다.

TOKEN = "## use your token ##"
message = "Hello, World @ {{ ds }}"
cmd_notify = f"""
curl -sX POST -H "Authorization: Bearer {TOKEN}" -F "message='{message}'" https://notify-api.line.me/api/notify
"""

BashOperator(task_id='t2', bash_command=cmd_notify, dag=dag)

위의 코드 역시 테스트해보면 잘 동작합니다.

$ airflow test mon_disk t2 20181101

conda venv

이제 코드를 완성해보겠습니다. 지금까지 두 개로 만든 작업을

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 1)
}

dag = DAG(
    dag_id='mon_disk',
    default_args=args,
    schedule_interval=timedelta(days=1))

# Disk usage + LINE notify
cmd_notify = """
THRESHOLD_VALUE=$((1024**2))
RESULT=$(du ~/Library/Caches/* | sort -rn | awk -v TVAL=$THRESHOLD_VALUE '{ if($1==$1+0 && $1>TVAL) {split($2,u,"//"); print $1 /1024**2 " GB " u[1] } }')

TOKEN="## use your token ##"
curl -sX POST -H "Authorization: Bearer ${TOKEN}" -F "message='${RESULT}'" https://notify-api.line.me/api/notify
"""
BashOperator(task_id='check_usage', bash_command=cmd_notify, dag=dag)
airflow test mon_disk check_usage 20181101

conda venv

지금까지 BashOperator를 이용해서 단순하지만, 매우 유용한 디스크 알림예제를 만들어 봤습니다. 이를 응용하여 CPU, 힙메모리 사용률, 로그 파일 확인 등을 주기적으로 살펴보며 알림을 주는 간단한 모니터링 시스템으로 사용할 수도 있을 것입니다.

위의 예제가 잘 동작하기는 하나 배시 스크립트에 익숙하지 않으면 사용하기 어렵겠다는 생각을 하셨을 수도 있겠습니다. 하지만 에어플로우의 강력함은 이제부터 시작입니다. 다음 포스트에서 PythonOperator 를 사용해서 더 구조적이고 더 강력한 DAG을 만들어봅시다.

Last Updated: 3/23/2020, 11:10:33 PM