데이터 파이프라인을 구성하다보면 근거 없는 자신감이 들 때가 있다. 그럴싸한 아키텍처를 설계하고 나면 이러이러한 서비스를 뚝딱뚝딱 이어붙이면 문제 없이 동작하겠지라는 생각이 사실 매번 드는 것 같다. 그러나 어디까지나 이상이고 현실은 다르다. 데이터 파이프라인은 많은 이유로 실패한다. 문제 상황도 아주 제각각인데, 데이터의 소스가 매우 다양하기에 발생하는 문제들, 오픈소스 호환성 때문에 발생하는 문제들, 데이터 파이프라인들 간에 의존도에 대한 이해가 부족하기에 발생하는 문제들 등 수많은 문제가 그렇다. 특히나 데이터 소스 간에 의존성이 발생하기 시작한다면 더욱 문제는 복잡해진다. 예를 들어 마케팅 채널 정보의 업데이트가 안된다면 다른 모든 정보들의 업데이트가 이뤄지지 않는 상황에서는 마케팅 채널 정보와 다른 정보들 간의 의존 관계가 형성되게 된다. 문제를 따라가다보면 쌓여있던 문제들이 왜 이제 왔냐며 나를 기다려왔다는 듯이 반겨준다. 물론 나는 반갑지 않다.

Airflow 서비스의 본질적인 필요성을 이해하기 위해서는 Incremental Update 환경에서의 Backfill에서 발생하는 여러 문제 상황들을 이해해야 한다. Incremental update는 데이터 저장소에 데이터를 업데이트할 경우 모든 데이터를 처음부터 다시 쓰는 것이 아니라 새롭게 추가된 부분만을 뒤에 이어붙이는 방식이다. Incremental update의 반대되는 개념으로는 Full Refresh가 있다. 이는 매번 소스의 내용을 전부 읽어오는 방식이다.

Full Refresh와 Incremental update에는 각각의 장단점이 있다. 먼저, Full Refresh를 통해 데이터를 업데이트해나간다면 데이터를 처음부터 읽어오기 때문에 매우 비효율적이다. 특히나 데이터가 커질 수록 Full Refresh를 통해 매번 데이터를 처음부터 써야하는 것은 불가능에 가깝게 들린다. 반면 Incremental update는 daily나 hourly 주기로 추가할 데이터만 쓰므로 비교적 효율적이다. 여기까지만 본다면 Incremental update 방식에는 아무런 문제가 없다.

다만 Inremental update는 유지보수가 매우 어렵다. 만약 소스 데이터 상에 문제가 생기거나, 중복 데이터가 쌓이게 되면 이를 일일히 확인해가며 데이터를 정기적으로 정제해주는 추가작업이 불가피하다. Full Refresh의 경우 매번 소스의 내용을 전부 읽어오므로 중복의 문제에서는 자유롭다. 혹여 데이터를 가져오는데 있어 중간에 문제가 발생하더라도 다시 처음부터 데이터를 추가하면 되므로 그리 큰 문제가 아닐 것이다. 그럼에도 데이터 엔지니어는 Incremental update를 지향해야 한다. Full Refresh는 매우 안전한 방법이지만 데이터의 크기가 커지는 환경을 고려하지 않기 때문이다. 

데이터가 작을 경우 가능하면 Full Refresh 방식의 데이터 업데이트를 진행한다. 만약 Incremental update만이 가능하다면 대상 데이터소스가 갖춰야할 필드의 조건이 몇가지 있다.

1. Created: 각 레코드가 처음 생성된 시점이 기록되어 있어야 한다. 이는 Incremental update에서 필수적이지는 않지만, 데이터의 수명 주기를 추적하거나 특정 시점 이후 생성된 레코드를 조회할 때 유용하다.

2. Modified: 각 레코드가 마지막에 수정된 시점을 나타낸다. 해당 필드가 있다면, 마지막 업데이트 이후  변경된 레코드만 선택하여 처리할 수 있다. 해당 필드를 통해 데이터 처리의 효율성을 높이고 불필요한 작업을 수행하지 않을 수 있다.

3. Deleted: 해당 필드는 레코드의 삭제 여부를 나타낸다. "Soft delete"라는 개념과 연관이 있는데, 이는 레코드를 데이터베이스에서 물리적으로 제거하는대신 레코드를 "Deleted"라고 표시함으로서 삭제하는 방법을 의미한다. 이렇게 하면, 삭제된 레코드와 그렇지 않은 레코드를 구분할 수 있으며 필요에 따라 레코드를 복구할 수도 있게 된다.

4. 만약 대상 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 한다.

데이터 파이프라인이라면, Incremental update라면 특히나 문제가 발생할 여지가 많으며 데이터가 중복되거나 누락되었을때 Backfillng하는 것이 데이터 엔지니어의 주요한 책무이다. 다시 말해 멱등성(Idempotency)를 보장하는 것이 파이프라인 구축의 가장 중요한 문제이다. 이를 달성하기 위해서는 결국 다음 요소들이 요구된다.

1.  실패한 데이터 파이프라인을 재실행하는데 용이해야 한다.

2.  과거 데이터를 채우는 과정(Backfill)이 쉬워야 한다.

이러한 문제에서 보다 자유롭기 위해서 Airflow는 도입되었다.

먼저 Airflow는 Backfill이 용이하다. 앞서 설명하였듯 DAG로 구성된 태스크들을 시각적으로 쉽게 확인할 수 있다. DAG가 중간에 실패하였다면, 어떤 태스크에서 문제가 발생하였는지를 쉽게 대시보드 형태로 모니터링 할 수 있다. 즉 실패한 데이터 파이프라인의 재실행을 용이하게 만들어주자는 것이 Airflow의 의의이다.

스크립트 기반의 다음의 작업을 수행하는 파이프라인이 있다고 하자.

from datetime import datetime, timedelta

# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

 

이는 매우 이상적인 파이프라인에서만 유효하다. 앞서 설명했듯 파이프라인은 문제가 발생할 수 밖에 없고 데이터 소스의 복잡성이 늘어날 수록, 운영 기간이 늘어날 수록 문제는 복잡해진다.  만약 올 초의 1월 1일 문제가 생겨서 해당 일자 정보에 대한 Backfill을 수행해야한다면 다음의 코드로 수행을 할 것이다.

간단해보이는 코드이지만, 만약 파이프라인이 많고 복잡해질 수록 이러한 형태의 관리는 실수의 가능성이 높을 수 밖에 없다. 특히 연도를 잘못 기입한 후 파이프라인을 실행해버리면 말그대로 대참사가 발생해버릴 수 있다.

from datetime import datetime, timedelta

# 코드를 직접 수정
yesterday = '2023-01-01'

sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

 

Airflow의 DAG에서는 catchup 파라미터가 True이면서 start_date와 end_date가 적절하게 설정된 경우 손쉽게 Backfill이 가능하다. Airflow에서는 날짜 별로 Backfill의 결과를 기록하고 성공 여부를 기록한다. 이를 통해 이후에도 쉽게 결과를 확인할 수 있게 된다. 해당 성공과 실패의 날짜들은 시스템에서 ETL의 인자로 제공된다. 때문에 데이터 엔지니어는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜('execution_date')를 사용만 하면 된다.

 

from airflow.operators.python_operator import PythonOperator

def sample_task(**context):
    # context로부터 실행 시점을 불러온다
    execution_date = context['execution_date']

    # 시스템에 의해 관리된 execution_date에 의해 어떤 데이터를 업데이트할지를 정할 수 있다.
    ...

task = PythonOperator(
    task_id='sample_task',
    python_callable=sample_task,
    provide_context=True,  # execution_date와 같은 context 변수를 넘길 때 필요하다.
    dag=my_dag,
)

 

파이썬의 datetime등을 통해 업데이트 대상을 선택하는 것은 안티 패턴이다. 대신 Airflow에서 지원하는 execution_date를 활용하여 레코드가 업데이트되는 날짜 혹은 시간을 알아낼 수 있도록 코드를 작성해야 한다.

 

물론 이 경우에도 수많은 하위 문제들이 발생한다. 

때문에 아무리 Airflow를 사용한다고 하더라도 데이터 파이프라인의 입력과 출력을 명확히 하고 이를 문서화하는 것이 중요하다. 특히 테이블이 늘어나고 데이터가 복잡해지는 경우 분석팀에서 데이터를 찾아내는 데 어려움을 겪을 수 있다. 소위 데이터 디스커버리 문제라고도 불리는데, 분석을 하고 싶어도 방대한 데이터에 압도되어 어디서부터 시작해야할지 모르는 경우가 종종 발생한다. 이때 잘 정리된 문서는 큰 도움이 된다.

또한 주기적으로 쓸모없는 데이터를 삭제하는 과정은 불가피하다. 아무리 잘 만든 파이프라인이라도 관리가 필요하다. 예를 들어 주기적으로 사용하지 않는 테이블과 파이프라인을 삭제한다거나 필수적으로 빠른 접근이 요구되는 데이터만을 DW(Hot storage)에 남기고 그렇지 않은 데이터를 DL(Cold Storage)에 옮기는 일은 인간의 개입이 필요할 가능성이 높다. 물론 이조차도 자동화할 수 있겠으나 이는 자동화에 따른 위험성을 고려해야 한다.

그 외에도 데이터 파이프라인의 사고가 발생하면 사고 리포트를 작성하여 이후의 비슷한 사고가 발생하였을 때 이를 방지하도록 한다거나 중요한 데이터 파이프라인의 경우에는 입력과 출력 데이터의 체크가 필요할 것이다. 간단하게는 입력 레코드의 수와 출력 레코드의 수를 체크하는 것에서부터 중복 레코드 체크를 수행한다거나 Summary 테이블을 만들고 PK가 존재한다면 정말로 PK의 Uniqueness가 보장이 되는지를 체크하는 것이 필요하다(Big query 같은 빅데이터 스토리지에서는 PK의 유일성이 보장되지 않는 것이 일반적). 

'Data Engineering > Airflow' 카테고리의 다른 글

DAG 작성  (0) 2023.07.09
Airflow의 DAG  (0) 2023.06.28