[Airflow] 최소 개발 환경 구축하기

v.1.10.0 버전 기준으로 글을 작성하는 도중 v1.10.1 이 릴리즈 되어서 관련된 부분을 업데이트 했습니다.

Part 1. 실습으로 익히는 에어플로우 기본

실습으로 시리즈를 시작하겠습니다. 아파치 에어플로우(Apache Airflow)는 파이선으로 만든 프로젝트입니다. 파이선은 여러 다른 언어로 만든 프로그램들을 실행하기 편하기 때문에 워크플로우를 만드는 언어로 적합하다고 할 수 있겠습니다.

여기서는 파이선 버전 3.6을 사용하겠습니다. 특정 버전을 사용하는 이유가 있습니다. 오래되었지만 호환성이 좋은 2.7버전을 사용하면 설치나 사용에 문제가 발생하는 패키지가 없어서 개발이 수월합니다. 하지만, 곧 공식 지원이 끝길 예정이니 미리 3.x 버전에 익숙해지는 것이 좋습니다. (파이선2의 은퇴까지 남은 시간은?) 현 시점의 가장 최신의 버전 3.7은 아직 호환이 안되는 패키지들이 많아서 사용이 어렵습니다.

1. 가상환경 만들기

파이선 가상환경을 미니콘다 로 선택하겠습니다. 콘다의 설치가 필요하거나 콘다에 익숙하지 않으시거나 아나콘다 는 알지만 미니콘다 는 처음 들어본다 하시는 분은 저의 이전 포스트 파이선 가상환경 콘다 시작하기를 참고하세요. 아나콘다가 설치되신 분들은 미니콘다와 실행환경의 차이는 없으므로 그대로 따라오시면 됩니다.

가상환경의 이름은 무엇이든 상관없지만, 저는 batch라는 이름으로 만들겠습니다. 파이선 패키지 버전을 직접 지정해줘서 - python=3.6 - 이 가상환경이 파이선 3.6 버전으로 실행 되게 합니다.

$ conda create -n batch python=3.6 -y

설치를 정상적으로 마치면, 새로운 가상환경에 들어가서 원하는 파이선 버전이 설치되었나 확인힙니다.

$ source activate batch
(batch) $ python --version
Python 3.6.7 :: Anaconda, Inc.

참고로 가상환경을 빠져나오는 명령은 source deactivate 입니다.

(batch) $ source deactivate
$

2. 에어플로우 패키지 설치

일단, 위에서 만든 가상환경 batch를 사용하고 있는지 확인해봅시다. 설치된 가상환경들의 목록을 확인하는 명령은 conda env list 또는 conda info --env 을 사용할 수 있습니다. * 표시가 된 항목이 현재 사용 중인 가상환경입니다. batch 항목에 * 표시가 되어있는지 확인하고 다른 환경에 표시되어있다면 다시 source activate batch 명령으로 올바른 환경으로 이동합니다.

$ conda env list
# conda environments:
#
base                  *  /Users/${USER}/miniconda3
batch                    /Users/${USER}/miniconda3/envs/batch
$ source activate batch
(batch) $

이제 에어플로우를 설치할 차례입니다. 글 쓰는 시점의 에어플로우 최신버전은 1.10.0 이며 pipconda로 모두 설치할 수 있는데, 각각 주의할 점이 있습니다. conda로 설치할 때는, 꼭 채널을 conda-forge로 지정해줘야 합니다. 여기서 패키지 이름은 airflow-with-kubernetes로 했는데, 이는 잡다한 warning 메시지를 표시하지 않기 위함이지, kubernetes를 바로 사용하기 위함이 아닙니다. warning 메지지를 신경쓰시지 않으시면 airflow 패키지도 괜찮습니다.

conda install -c conda-forge airflow-with-kubernetes

만일, pip로 설치한다면 패키지 이름을 apache-airflow로 해줘야 합니다. 만일 airflow 만 입력하면, 조금 오래된 1.8.0 버전의 패키지가 설치됩니다. pip 설치 후에도 kubernetes 관련 warning 이 발생하는데, pip install airflow['kubernetes'] 명령으로 메시지를 없앨 수 있습니다.

pip install apache-airflow

이때, 아래와 같은 오류를 만나면

    RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE

아래와 같이 환경변수와 함께 설치합니다.

SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow

3. 에어플로우 기본 설정

에어플로우 패키지 설치가 잘 되었나 확인하기 전에 중요한 환경변수 한 가지를 알려드리겠습니다. 바로 AIRFLOW_HOME 입니다. AIRFLOW_HOME은 에어플로우 관련 모든 중요한 파일들이 저장되는 곳이며, 설정하지 않은 경우 기본 값은 홈디렉터리 아래 airflow 라는 디렉토리입니다. 만일 다른 위치를 원한다면 아래와 같이 환경변수를 설정하고, .bashrc.zshrc 파일 같은 쉘 설정파일에도 추가하도록 합시다. 저는 기본 값으로 진행하겠습니다.

export AIRFLOW_HOME=__SOMEWHERE_YOU_WANT_TO_USE__

에어플로우의 모든 명령행(command line) 명령은 하나의 airflow 명령으로 실행할 수 있어서 직관적입니다. 가장 먼저 버전을 확인해보겠습니다.

$ airflow version
...
  File "/Users/{USER}/miniconda3/envs/batch/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/Users/{USER}/miniconda3/envs/batch/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: log [SQL: 'INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (?, ?, ?, ?, ?, ?, ?)'] [parameters: ('2018-11-26 06:53:55.958480', None, None, 'cli_version', None, '{USER}', '{"host_name": "w022581806312m.local", "full_command": "[\'/Users/{USER}/miniconda3/envs/batch/bin/airflow\', \'version\']"}')]
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.1

무언가 많은 에러와 함께 마지막에 v1.10.0 을 확인할 수 있습니다. 무엇이 문제일까요? 설치가 잘못된 것일까요? 에어플로우는 워크플로우 관련된 모든 실행기록을 데이터베이스에 저장합니다. 데이터베이스는 sqlite3, MySQL, PostgresqlSQLAlchemy를 지원하는 다양한 제품을 사용할 수 있습니다. 이번 장에서는 에어플로우의 기본 데이터베이스 값이면서 가장 단순한 sqlite3을 이용한 방법을 설명합니다.

원래는 설정 파일을 변경해야하지만, 기본 값이기 때문에 설정 파일 변경없이 airflow initdb 명령으로 데이터베이스를 초기화합니다.

(batch) $ airflow initdb
[2018-11-16 00:00:24,557] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////Users/${USER}/airflow/airflow.db
[2018-11-16 00:00:24,906] {db.py:338} INFO - Creating tables
...
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
Done.

각자의 환경에 따라서 조금 다른 메시지가 표시되겠지만, 두 번째 줄에서 sqlite3이 사용하는 데이터베이스 파일의 위치와 - DB: sqlite:////Users/${USER}/airflow/airflow.db - 세 번째 줄의 Creating tables 같은 메시지로 데이터베이스 작업이 실행되었음을 알 수 있습니다.

데이터베이스 파일인 airflow.db 파일은 위에서 언급한 AIRFLOW_HOME 디렉토리에 위치합니다.

(batch) $ cd airflow

(batch) ~/airflow $ ls
airflow.cfg    airflow.db    logs    unittests.cfg

참고로, AIRFLOW_HOMEairflow.cfg 파일은 에어플로우 설정파일이고, logs 디렉토리는 우리가 워크플로우를 실행했을 때 실행 결과가 남는 곳입니다.

이제 airflow version을 다시 하면 깔끔한 메시지를 확인하실 수 있습니다.

(batch) ~/airflow $ airflow version
[2018-11-16 00:06:39,429] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.1
(batch) ~/airflow $ grep 'sql_alchemy_conn' airflow.cfg
sql_alchemy_conn = sqlite:////Users/${USER}/airflow/airflow.db

4. 에어플로우 웹 UI 확인

이제 본격적으로 시작할 준비가 거의 끝났습니다. 다음 포스트에서 사용할 명령어 두 가지만 미리보고 마치도록 하겠습니다. 우선 에어플로우가 관리하고 있는 DAG를 확인하는 airflow list_dags 명령입니다. 아직 시작도 안했는데 example_ 로 시작하는 많은 예제 DAG이 있습니다. 이 DAG의 소스코드는 웹 UI 에서 확인할 수 있지만, 앞으로 여러분들이 만들 DAG과는 달리 AIRFLOW_HOME에서는 발견할 수 없습니다. 나중에 설정에서 예제 DAG들을 없애는 방법을 소개하겠습니다.

(batch) ~/airflow $ airflow list_dags
[2018-11-16 00:10:37,439] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-11-16 00:10:37,643] {models.py:258} INFO - Filling up the DagBag from /Users/${USER}/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
...
tutorial

마지막으로 웹 UI를 실행해보겠습니다. 앞에서 이야기한데로 에어플로우는 필요한 모든 명령을 airflow 명령어로 수행할 수 있습니다. airflow webserver 명령을 실행합니다.

(batch) ~/airflow $ airflow webserver
[2018-11-16 00:15:42,549] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/

[2018-11-16 00:15:43,715] {models.py:258} INFO - Filling up the DagBag from /Users/${USER}/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2018-11-16 00:15:44 +0900] [87615] [INFO] Starting gunicorn 19.9.0
[2018-11-16 00:15:44 +0900] [87615] [INFO] Listening at: http://0.0.0.0:8080 (87615)
[2018-11-16 00:15:44 +0900] [87615] [INFO] Using worker: sync
[2018-11-16 00:15:44 +0900] [87621] [INFO] Booting worker with pid: 87621
[2018-11-16 00:15:44 +0900] [87622] [INFO] Booting worker with pid: 87622
...

웹 서버가 시작했습니다. 웹 브라우져를 열고 http://0.0.0.0:8080 주소를 열어보세요. 아래와 같은 화면을 확인하실 수 있습니다.

이제 에어플로우를 실습하기 위한 모든 준비를 마쳤습니다. 다음 장부터는 실제로 동작하는 예제와 함께 에어플로우 실습을 하겠습니다.

5. 명령 요약

혹시 따라하시다가 막혀서 다시 해보시는 분을 위해 제가 사용한 명령들을 간략하게 적어놓았으니 참고하세요.

# 가상환경 만들기
conda create -n batch python=3.6 -y

# 가상환경 진입하기
source activate batch

# airflow 패키지 설치
conda install -c conda-forge airflow-with-kubernetes

# 데이터베이스 초기화
airflow initdb

# 버전 확인
airflow version

# 예제 DAG 확인
airflow list_dags

# 웹 UI 시작
airflow webserver

# 웹 UI 접속. 브라우저에서
http://0.0.0.0:8080