[Kubeflow] pipeline

mydailylogs
|2023. 4. 7. 16:58

kubeflow란

💡 컨테이너 기반의 End-to-end ML 워크 플로우 관리 플랫폼


Kubeflow Pipeline의 구성

  • Experiment, Job, Run을 관리, 추적하고 실행할 수 있는 UI

  • 파이프라인의 단계별 실행 스케쥴링을 위한 엔진

  • Python에서 파이프라인을 정의, 구축하기 위한 SDK

  • SDK와 파이프라인의 실행을 위한 쥬피터 노트북


Pipeline의 지향점

  • 쉬운 파이프라인 구성

  • 쉬운 파이프라인 생성: Trial/ Experiement 컴포넌트들을 통해 다양한 기술 적용 가능

  • 쉬운 재사용: 컨테이너 기반 구조

💡 간단하게 말해서 ML Pipeline이란, 워크플로우 컴포넌트를 통해 작업을 그래프 형태로 결합시킨 것!


Pipeline의 실행 과정

  1. 파이프라인이 실행되면, 시스템은 각 단계에 맞는 쿠버네티스 파드를 실행시킨다.

  2. 해당 파드는 설정된 컨테이너를 실행시킨다.

  3. 마찬가지로 해당 컨테이너는 내부의 애플리케이션을 실행한다.

  4. 이후 스케쥴러에 따라, 순서대로 컨테이너들을 실행시키는 형태로 파이프라인이 진행된다.


Pipeline 수행 과정 이해

 

Python SDK: 파이프라인 DSL을 통해 컴포넌트를 작성

In Python, DSL stands for Domain Specific Language. A DSL is a programming language that is specialized for a particular domain, such as data analysis, scientific computing, or web development. Some popular DSLs in Python include NumPy for numerical computing, Pandas for data analysis, and Django for web development.

 

  • DSL Compiler: 파이썬 코드를 YAML(쿠버네티스 리소스 양식)로 변환한다.

  • Pipeline Service: 쿠버네티스 리소스 양식에서 파이프라인을 생성하기 위해 호출

  • Kubernetes resources: Pipeline Service가 쿠버네티스 API 서버를 호출하여, 파이프라인 실행을 위한 쿠버네티스 리소스를 생성

  • Orchestration controllers: 생성된 쿠버네티스 리소스에 정의된 파이프라인을 실행하기 위한 할당 가능한 파드를 지정하며 컨테이너를 실행

  • Artifact storage: 실행된 파드에선 Meta-data → mysql 서버에 담고, 그 외의 큰 사이즈의 정보 → Minio 서버나, 별도 지정된 클라우드 스토리지에 저장

  • Pipeline web server: 실행된 파이프라인은 웹 UI를 통해 모니터링 가능


Component ( From 모두의 MLOPS)

💡 컴포넌트는 Component ContentsComponent Wrapper로 구성되며, 하나의 컴포넌트는 Wrapper를 통해 파이프라인에 전달되며 정의된 컴퍼넌트 콘텐츠를 실행(execute)하고 아티팩트(artifacts)를 생산합니다.

 

[출처] 모두의 mlops


Component Content

컴포넌트는 총 3가지로 구성됩니다.


1. Environment

2. Python code with Config

3. Generates Artifacts

예시와 함께 살펴보면

import dill
import pandas as pd

from sklearn.svm import SVC

train_data = pd.read_csv(train_data_path)
train_target= pd.read_csv(train_target_path)

clf= SVC(
    kernel=kernel
)
clf.fit(train_data)

with open(model_path, mode="wb") as file_writer:
     dill.dump(clf, file_writer)


다음과 같이 컴포넌트 콘텐츠로 나눌 수 있습니다.

[출처] 모두의 mlops


Environment는 파이썬 코드에서 사용하는 패키지들을 import하는 부분입니다.

다음으로 Python Code w\ Config 에서는 주어진 Config를 이용해 실제로 학습을 수행합니다.

마지막으로 아티팩트를 저장하는 과정이 있습니다.

Component Wrapper

컴포넌트 래퍼는 컴포넌트 콘텐츠에 필요한 Config를 전달하고 실행시키는 작업을 합니다.

[출처] 모두의 mlops


Kubeflow에서는 컴포넌트 래퍼를 위의 train_svc_from_csv와 같이 함수의 형태로 정의합니다. 컴포넌트 래퍼가 콘텐츠를 감싸면 다음과 같이 됩니다.

[출처] 모두의 mlops

Artifacts

위의 설명에서 컴포넌트는 아티팩트(Artifacts)를 생성한다고 했습니다. 아티팩트란 evaluation result, log 등 어떤 형태로든 파일로 생성되는 것을 통틀어서 칭하는 용어입니다. 그 중 다음과 같은 값을 주로 작업하게 됩니다.

[출처] 모두의 mlops

  • Model: ML 모델
  • Data: 데이터는 전 처리된 피처, 모델의 예측 값 등을 포함
  • Metric
    • 동적 지표: 학습과 함께 변화하는 값 (Train loss)
    • 정적 지표: 최종적으로 모델을 평가하는 지표 (Accuracy)
  • etc
💡 "모델이란 파이썬 코드와 학습된 Weights와 Network 구조 그리고 이를 실행시키기 위한 환경이 모두 포함된 형태"
- 모두의 MLOps


다시 파이프라인으로 돌아오자면 …

[출처] 모두의 mlops

파이프라인은 컴포넌트의 집합과 컴포넌트를 실행시키는 순서도로 구성되어 있습니다. 이때 순서도는 방향이 없는 그래프(Undirected Graph)로 표현되며, 간단한 조건문을 포함할 수 있습니다.

 

Pipeline Config

[출처] 모두의 mlops

컴포넌트를 실행시키기 위해 Config가 필요합니다. 이때 파이프라인을 구성하는 컴포넌트의 Config들을 모아둔 것이 파이프라인 Config입니다.

Run

  • Config 까지 주어진다면 → 파이프라인 실행가능

  • 이때 실행된 파이프라인 명세를 Run이라고 부름 (명세의 인스턴스화)


Run은 파이프라인의 실행 단위로서 앞서 그래프 형태로 표현된 파이프라인의 명세를 순서대로 실행합니다.

파이프라인이 실행되면 각 컴포넌트가 아티팩트들을 생성합니다. Kubeflow pipeline에서는 Run 하나당 고유한 ID 를 생성하고, Run에서 생성되는 모든 아티팩트들을 저장합니다.


Recurring Run

  • 주기적으로 파이프라인을 실행하는 Run
  • 대표적으로 Cron 형태로 수행가능 → 배치성 작업이나 모니터링 작업에 적합
  • Run Trigger을 통해 관리


Run Trigger

Run의 실행여부를 결정하는데 활용 (일종의 Flag)

  • Periodic: 간격 기반의 스케쥴링 (Ex. 30분에 한번 실행)
  • Cron: cron 형태의 스케쥴링 (Cron 형태란)


Step

  • 파이프라인 상에서 하나의 컴포넌트의 실행
  • 중첩/분기되어 실행되기도 함


Experiment

  • 파이프라인을 실행하는 워크스페이스
  • 파이프라인 실행을 논리적으로 묶어주는 역할 (K8s의 NS와 유사)
  • 별도 지정하지 않는 한 default라는 Experiment (group)에서 Run이 실행


Pipeline SDK

💡 파이프라인 SDK는 파이썬 패키지로 구성되어 있으며, 크게 5가지의 영역으로 나뉘어져 있습니다.


kfp.compiler

  • 파이프라인 컴포넌트를 빌드하는 클래스와 메소들의 패키지


kfp.components

  • 파이프라인 컴포넌트들을 다루는 클래스와 메소드들의 패키지

kfp.dsl

  • domain-specific language

  • 파이썬 코드를 통해 파이프라인을 작성하기 위해 필요한 클래스와 메소드, 모듈들을 모아둔 패키지


kfp.Client


파이프라인 API와 통신하는 파이썬 클라이언트
라이브러리 패키지


kfp.containers


컨테이너 이미지관련 메소드와 클래스들의 패키지


SDK로 파이프라인 만들기


파이프라인은 DSL을 통해서 작성되며, 컴파일 과정을 거쳐 쿠버네티스 파이프라인 리소스로 변환되어 사용됩니다.


파이프라인은 2가지 방법으로 작성이 가능합니다.


ksp.dsl 내부의 ContainerOp
를 이용하여 생성


이때, 각 Step에서 필요한 테스크를 수행하는 애플리케이션이 미리 작성이 되어 있어야 하며, 해당 애플리케이션은 도커 이미지로 패키징되어 단독으로도 실행이 가능해야 합니다.


이때, 도커 이미지는 ContainerOp의 매개변수를 통해 지정됩니다.

classkfp.dsl.BaseOp(
        name: # 컴포넌트 이름
        image: # 컨테이너 이미지 이름
        Command: # StringOrStringList 컨테이너 실행 명령어
        arguments: # StringOrStringList 컨테이너 실행 인자값
        ...
)


이를 활용한 파이프라인 정의

from kfp import ds1

def echo_op():
    return ds1.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "Hello World"'],
    )

@dsl.pipeline(
    name='ContainerOp pipeine',
    description="..."
)
def hello_world_pipeline():
    echo_task = echo_op()


활용 예시 1 - 쥬피터 노트북

if __name__ == "__main__":
    kfp.compiler.Compiler().complie(hello_world_pipeline, 'containerop.pipeline.tar.gz')



활용 예시 2 - dsl-compile

$ dsl-compile --py container.py --output containerpop.pipeline.tar.gz


파이썬 함수를 통해 생성


생성 예시

import kfp.dsl as dsl

@dsl.pipeline (
    name='example_1',
    description='A simple pipeline'
)
def my_pipeline(a: int = 1 b: str = "default value"):
    print(a)
    print(b)


활용 예제

if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(my_pipeline, 'my_pipeline.pipeline.tar.gz')