DAG 작성

mydailylogs
|2023. 7. 9. 17:06

💡 이번 글에서는 DAG가 어떻게 구성되어 있는지를 중점으로 기본적인 Airflow의 사용방법을 소개합니다.



데이터 셋

본격적으로 실습에 들어가기 앞서 이번 실습에 사용할 데이터 셋을 소개합니다.

하단의 링크에서는 우주와 관련된 다양한 데이터를 오픈 API 형태로 제공합니다. 하루 요청 제한(시간 당 15 call) 하에서 누구나 사용가능하며 로켓의 발사, 엔진의 테스트, 우주 비행사의 기록 등과 같은 신기한 정보들을 쉽게 제공합니다.

https://thespacedevs.com/llapi

 

TheSpaceDevs - Home

A group of space enthusiast developers working on services to improve accessibility of spaceflight information.

thespacedevs.com

 

Example을 한번 들어가보시면 로켓의 카운트 다운이 보입니다. 마음이 왠지 모르게 두근두근해집니다 !


아래의 링크는 해당 API에 대한 문서인데요. 다양한 REST API 리소스를 제공하니 한번 살펴보시고 원하는 데이터를 살펴보시면 좋을 것 같습니다. 해당 글에서는 발사가 임박한 로켓의 정보를 활용하고자 합니다. 다음의 URL을 통해서 기본적인 요청을 수행하며 테스트를 먼저 진행해보겠습니다.

 

$ curl -L "https://ll.thespacedevs.com/2.2.0/launch/upcoming"
{
  "count": 299,
  "next": "https://ll.thespacedevs.com/2.2.0/launch/upcoming/?limit=10&offset=10",
  "previous": null,
  "results": [
    {
      "id": "5d3e11d7-5d13-4c8c-b94a-c73da97c39a5",
      "url": "https://ll.thespacedevs.com/2.2.0/launch/5d3e11d7-5d13-4c8c-b94a-c73da97c39a5/",
      "slug": "falcon-9-block-5-starlink-group-5-13",
      "name": "Falcon 9 Block 5 | Starlink Group 5-13",
      "status": {
        "id": 3,
        "name": "Launch Successful",
        "abbrev": "Success",
        "description": "The launch vehicle successfully inserted its payload(s) into the target orbit(s)."
      },
...
			"webcast_live": false,
      "image": "https://spacelaunchnow-prod-east.nyc3.digitaloceanspaces.com/media/launch_images/falcon2520925_image_20230522091938.png",
...


JSON을 살펴보시면 로켓의 ID와 이름, 이미지 URL을 통해 로켓 발사 정보가 주어집니다.

REST API 콜 관련해서 파라미터가 궁금하시다면 아래 문서를 확인해보시길 바랍니다.

https://github.com/TheSpaceDevs/Tutorials/blob/main/faqs/faq_LL2.md

참고) "next": "https://ll.thespacedevs.com/2.2.0/launch/upcoming/?limit=10&offset=10" 처럼 filter를 통해 paginate가 가능합니다. 명시하지 않았다면 기본적으로 첫 페이지에서부터 10개의 로켓 정보를 가져옵니다. limit=<number>를 늘려주면 최대 100개의 로켓 정보까지 가져올 수 있으며, offset=<number>를 통해 pagination이 가능하다고 하니 참고하시기 바랍니다.


사실 위의 curl을 수행해보셨다면 의문이 드실 수 있습니다. “저렇게 간단하게 데이터를 뽑을 수 있다면 bash script를 사용해서 가져오면 될 껄 굳이 Airflow까지 써야하나 …” (넵 바로 제가 들었던 첫 번째 의문입니다.)

물론 Bash 스크립트 형태만으로 데이터를 가져와도 좋겠지만, 가장 먼저 발생할 문제는 이전 글에 명시하였던 Incremental update시에 발생하는 문제에 대해 능동적인 대처가 어렵다는 점일 것이다. 주기적으로 실행하는 추출 작업이 어느날 갑자기 문제가 생겼는데 중간에 문제가 된 부분만을 다시 추출해야한다면? 기존 스크립트의 추출로는 절대 쉽지 않다.

Airflow는 태스크를 DAG 형태로 제공하며 파이프라인을 세분화합니다. 문제가 발생한 태스크가 보다 명확해집니다. 또한 이를 시각적으로 확인까지 가능합니다. 또한 태스크 형태로 제공하다보니 의존성이 없다면 병렬화까지도 훨씬 용이해질 것입니다.

물론 Airflow가 무조건 옳을 수는 없겠습니다. 위에서 휘황찬란하게 서술했지만 현업에서 사용하게 되면 Airflow만큼 데이터 엔지니어를 괴롭히는 서비스도 없다고 합니다. 그럼에도 불구하고 Airflow는 기존 방식에 비해서는 확실한 개선이라고 말할 수 있겠습니다.

 

혹여 CeleryExecutor를 활용하신다면…

Airflow가 CeleryExecutor를 사용하고 있다면 저장되는 위치가 호스트 머신이 아니라 Airflow의 컨테이너들이 공유하는 볼륨이이기에 확인이 참 어렵습니다. 예를 들어 특정 컨테이너 (Airflow-trigger-1)에서 폴더를 만들어도 이는 개별 컨테이너의 변경 사항일 뿐이며 전체 Airflow 클러스터에는 반영이 되지 않습니다. 예를 들어 분명 데이터를 API에서 받아서 /tmp에 저장을 했지만, 도무지 확인할 방법이 없습니다. 특정 컨테이너의 터미널에 접근하여 ls를 수행해볼 수는 있겠지만, CeleryExecutor에서 사용하는 볼륨은 특정 컨테이너에 종속되는 볼륨이 아니기에 분명 Web UI에서는 파이프라인이 성공적으로 실행되었지만 아무리 찾아봐도 데이터를 찾을 수가 없습니다.

우리가 DAG를 작성하며 “/tmp/data”라는 경로에 데이터를 저장하고 싶다면 특정 컨테이너에 접근하여 “/tmp/data”라는 폴더를 만드는게 일반적인 리눅스 파일시스템의 접근이지만 CeleryExecutor는 컨테이너 환경 하에서 분산 처리라는 특수한 상황을 가정하기에 접근이 조금 달라야만 합니다.

가장 간단한 해결을 데이터를 저장하고자 하는 호스트 파일시스템의 경로를 Airflow 클러스터에서 사용하는 볼륨에 마운트시켜주는 것입니다. 혹여 Docker-compose로 빌드를 진행하신 상황에서 CeleryExecutor를 사용하고 있는 환경을 사용 중이신 경우, airflow 컨테이너들이 공유하는 x-airflow-common라는 Placeholder 아래의 volume이라는 필드를 통해 호스트 볼륨을 airflow 클러스터에 마운트할 수 있습니다.

version: '3.8'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.2}
  ...
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /mnt/c/Users/admin/data/:/data/ # 여기!


별 다른 점은 없고, 직전의 정의에서 가장 아래줄의 형태로 호스트 볼륨을 airflow-common 내부의 볼륨에 마운트해주시면 됩니다. 제 경우에는 윈도우 환경의 wsl 파일 시스템을 사용하고 있으니, 혹여 다른 설정이 필요하신 분들께서는 호스트 파일시스템의 권한만 잘 확인해주신 후 마운트해주신다면 이후 문제 없으시리라 생각합니다.

Docker-compose.yaml을 찾고 계신분이 있다면 다음 사이트를 참고해보세요.

https://airflow.apache.org/docs/apache-airflow/2.6.2/docker-compose.yaml


Airflow에서 공식적으로 제공하는 yaml 파일입니다. 다른 것은 없고 볼륨 쪽에서 마운트 부분의 경우에는 기본 정의에는 없는 사항이니 상황에 맞게 추가해주시면 됩니다. (부디 제 설명이 직관적으로 잘 전달되었길 …!)

 

DAG 작성

본격적으로 Airflow를 통해 로켓 발사 데이터셋에 대한 파이프라인을 구성해보겠습니다.

파이프라인의 목표는 최대한 간단하게 설정해보겠습니다. 바로 로켓의 이미지를 제 호스트 컴퓨터에 저장하는 것입니다.

본격적으로 작성하기 앞서 제 환경을 다시 한번 설명드리면

1. 저는 Airflow를 Docker compose를 통해 컨테이너 환경에서 실행 중에 있습니다.

2. 저는 Docker를 Window 10의 wsl 환경 아래에서 실행 중에 있습니다.

3. 제 Airflow 클러스터는 CeleryExecutor를 기반으로 실행되고 있으며, 별도의 볼륨 마운트를 통해 호스트의 경로와 Airflow 클러스터 내부의 경로를 마운트해놓은 상황입니다.

앞서 API 호출을 수행한 결과를 바탕으로 Image 필드의 정보를 바탕으로 데이터를 호스트 머신에 저장해보겠습니다.

앞서 설명하였듯, DAG는 여러 태스크로 구성되며 여느 아키텍처가 그렇듯 태스크를 어떻게 쪼개는 것이 정답인지는 대부분 명확하지 않습니다. 다양한 DAG 작성 경험을 통해 각 시나리오 상에서 발생할 수 있는 장단점을 익혀가면 OK라고 생각합니다.

해당 글에서는 세 태스크로 나뉜 DAG를 작성하고자 합니다.

 

 

Task 1: curl 을 통해 로켓 API를 통해 데이터를 가져와 내부에 저장합니다.


본격적으로 각 태스크들을 작성하기에 앞서서 DAG를 정의해줍니다. 앞으로 정의할 태스크들은 해당 DAG에 속할 예정입니다.

dag = DAG(
    dag_id="download_rocket_launches", # DAG 구분자
    start_date=airflow.utils.dates.days_ago(14), # DAG의 처음 실행 시작 날짜
    schedule_interval=None, # DAG 실행주기
)


BashOperator를 통해 앞서 수행했던 curl 명령어를 별도의 태스크를 통해 구성합니다. Operator는 태스크를 사용하기 쉽도록 미리 구성된 템플릿이라고 생각해주시면 됩니다. (Operator 이야기를 하다보면 끝이 나지 않을 것 같아 추후 다른 글로 찾아오도록하겠습니다)

download_launches=BashOperator(
    task_id="download_launches", # task 구분자
    bash_command="curl -o /data/launches.json -L 'https://ll.thespacedevs.com/2.2.0/launch/upcoming'", 
    dag=dag, # 속하는 DAG
)


parameter 자체가 상당히 직관적이기 때문에 별도의 설명없이도 코드 내부의 주석을 통해서도 무리없이 이해하실 수 있으리라 생각합니다.

여기까지 태스크가 수행이 된다면 /data 경로 바로 아래에 launches.json이라는 파일이 생성되어야 합니다. 좀 더 아래쪽에서 파이프라인을 실행해보며 정말 생성이 된 건지 확인해보겠습니다.

 

 

Task 2: 내부 Image라는 필드를 통해 해당 URL에서 이미지를 다운받아 호스트에 저장합니다.

여기까지 왔다면 각 Image의 URL 정보를 가져올 수 있게 되었으니 실제 주소에 가서 이미지를 다운받아야겠죠. 앞선 작업과는 달리 JSON 필드에 접근하여 각 주소로부터 API 요청을 보내는 작업이 필요합니다. 작업이 조금 더 세밀하니 이번에는 파이썬 코드를 통해 태스크를 표현합니다.

def _get_pictures():
    pathlib.Path("/data/images").mkdir(parents=True, exist_ok=True)

    with open("/data/launches.json") as f:
        launches=json.load(f)
        image_urls=[launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response=requests.get(image_url)
                image_filename=image_url.split("/")[-1]
                target_file=f"/data/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


먼저 pathlib.Path는 해당 경로에 폴더가 존재하는 지를 확인하고 만약 해당 경로에 폴더가 없다면 mkdir을 수행하여 경로를 생성해줍니다.

 pathlib.Path("/data/images").mkdir(parents=True, exist_ok=True)


그 다음에는 앞서 저장한 json 파일에 접근하여 데이터를 읽어옵니다. 이때 앞서 테스트 호출을 수행하면서 보셨듯 results의 하위 필드에 위치한 image 정보를 가져와야 합니다. 이미지의 이름의 경우에는 URL의 맨 마지막 부분을 잘라오며 정의합니다.

마지막에는 데이터를 저장해야겠죠. 데이터를 쓸때는 “wb (write binary)” 권한을 부여하는데 이는 현재 저장하고자 하는 데이터가 이미지(Non-text) 데이터이기에 Binary 형태로의 Write를 의미한다고 보시면 됩니다.

또 중요한 점은 Data Pipeline의 경우 에러 처리에 대해서 상당히 민감해야 한다는 점입니다. 해당 글에서는 간단히 try-catch를 사용하며 URL 구성이 이상하다던지, URL에 접근했는데 정작 이미지가 없다던지 등의 에러를 만났을 때 로그에 표시해줄 뿐입니다. 향후 에러 처리에 대해서도 별도의 글로 다뤄보도록 하겠습니다. (어째 써야할 글이 점점 늘어나는 것 같네요)


마지막으로 태스크의 동작 정의가 끝났다면, DAG에 등록해주며 마무리합니다. 직전의 BashOperator와 달리 파이썬 함수를 태스크로 옮기기 위해서는 PythonOperator를 사용합니다.

get_pictures=PythonOperator(
    task_id="get_pictures",
    python_callable=_get_pictures, # 등록할 Python 함수
    dag=dag,
)

 

 

Task 3: 실제로 잘 저장되었는지 확인합니다.

이후에는 간단하게 데이터 저장 경로에 접근하여 몇 개의 파일이 저장되었는지를 확인합니다.

notify=BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /data/images | wc -l) images."',
    dag=dag,
)


명령어는 중간의 파이프(’|’) 를 통해 ls의 결과를 wc에 넘기고 있습니다. 혹여 wc -l이 생소하시다면, wordcount -line이 줄여져있다고 기억하시면 바로 이해가 되실 것 같습니다.

여기까지 왔다면 DAG 내의 태스크 간의 실행 순서를 명시해줘야 합니다. 매우 직관적인 형태임을 확인하실 수 있습니다.

download_launches >> get_pictures >> notify


이렇게 정의한 DAG를 정리해보면 다음과 같습니다. 경로 쪽에서만 잘 신경써주시면 딱히 문제가 될 부분이 없는 코드입니다.

import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches=BashOperator(
    task_id="download_launches",
    bash_command="curl -o /data/launches.json -L 'https://ll.thespacedevs.com/2.2.0/launch/upcoming'",
    dag=dag,
)

def _get_pictures():
    pathlib.Path("/data/images").mkdir(parents=True, exist_ok=True)

    with open("/data/launches.json") as f:
        launches=json.load(f)
        image_urls=[launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response=requests.get(image_url)
                image_filename=image_url.split("/")[-1]
                target_file=f"/data/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures=PythonOperator(
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)

notify=BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /data/images | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify

 


DAG 실행

무사히 Docker compose를 통해 Airflow가 띄워졌다면, http://localhost:8080/home에서 Airflow의 Web UI에 접근할 수 있습니다.

참고로 Airflow 공식 문서에서 제공하는 정의의 경우 ID와 PW가 airflow로 동일합니다.

로그인에 성공하셨다면 DAGs라는 제목과 함께 DAG들의 목록을 확인하실 수 있습니다.


지금 제 환경에서는 위에서 정의한 DAG가 실행이 완료되어 있는 상태인데요. 아마 방금 실행을 하셨다면 아직 DAG를 제출하지 않았으므로 인식이 안되는게 맞습니다.

docker compose up을 해주셨던 저장소를 확인해보시면 dags라는 폴더가 생성되는 것을 확인할 수 있습니다. 이름에서 알 수 있듯 DAG를 제출하는 폴더이고 아래 그림처럼 파이썬 코드 형태로 DAG 정의를 제출해주시면 됩니다.


여기까지 DAG의 추가까지  완료되었다면 UI에서 실행해주시면 됩니다.

실행하는 방법은 여러가지인데, 일단 해당 DAG로 들어가서 우측 상단에 있는 시작 버튼을 눌러주겠습니다.


만약 실행해서 문제가 없다면 제 실행처럼 초록색으로 완료가 되어야 합니다.

혹여 문제가 생겼을 시 로그를 추적해볼 수 있을텐데, 각 테스크 실행에 대한 로그는 제 화면 기준 초록색의 긴 사각형 아래의 개별 태스크를 의미하는 정사각형들을 눌러주셔야 합니다. 빨간색으로 표시되거나 주황색으로 표시되어 정상 실행이 되지 않은 태스크를 눌러주시면 되겠습니다. (저는 정상 실행이기에 빨간색이나 주황색은 없습니다.)


그러면 Details, Graph 옆에 친절하게 Logs를 제공하는 페이지 링크가 존재합니다.

제 경우 문제 없이 실행되어 이러한 메세지를 받을 수 있었습니다.


마지막으로 실제 호스트 PC의 경로에도 데이터가 저장된 것을 확인하였습니다.

 


마치며

저 또한 이제 막 Airflow를 공부하는 입장에서 거창한 형용사들로 글을 꾸며보았는데, 사실 그리 어려운 내용은 아닌지라 혹여 이해가 안되시는 부분이 있다면 제 글솜씨가 아쉬운 탓이라고 양해해주시면 감사하겠습니다 ㅠㅠ 아무쪼록 이해가 안되시는 부분이 있다면 최대한 제가 아는 선에서 도움드리겠습니다. 피드백도 환영합니다. 언제든 답글 주세요!

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow의 도입 배경  (0) 2023.06.28
Airflow의 DAG  (0) 2023.06.28