ML 파이프라인과 실험 추적(ML Pipelines)

모델은 제품이 아닙니다. 파이프라인이 제품입니다. 파이프라인은 원시 데이터부터 배포된 예측까지의 모든 단계이며, 모든 단계는 재현 가능해야 합니다.

유형: Build 언어: Python 선수 지식: Phase 2, Lesson 12 (하이퍼파라미터 튜닝) 소요 시간: 약 120분

학습 목표

  • 결측값 대체(imputation), 스케일링(scaling), 인코딩(encoding), 모델 학습(model training)을 하나의 재현 가능한 객체로 연결하는 ML 파이프라인(pipeline)을 처음부터 만듭니다.
  • 데이터 누수(data leakage) 상황을 식별하고, 파이프라인이 변환기(transformer)를 학습 데이터(training data)에만 적합(fit)시킴으로써 이를 어떻게 방지하는지 설명합니다.
  • 숫자형(numeric) 특성과 범주형(categorical) 특성에 서로 다른 전처리를 적용하는 ColumnTransformer를 구성합니다.
  • 파이프라인 직렬화(serialization)를 구현하고, 적합된 같은 파이프라인이 학습과 운영 추론(inference)에서 동일한 결과를 만드는지 보입니다.

문제

데이터를 불러오고, 결측값을 중앙값(median)으로 채우고, 특성(feature)을 스케일링(scaling)하고, 모델을 학습하고, 정확도(accuracy)를 출력하는 노트북(notebook)이 있다고 가정해 봅시다. 잘 작동합니다. 그대로 배포합니다.

한 달 뒤 누군가 모델을 다시 학습했는데 결과가 달라집니다. 중앙값이 테스트 데이터(test data)를 포함한 전체 데이터셋(dataset)에서 계산되었습니다. 즉 데이터 누수입니다. 스케일링 파라미터(scaling parameter)가 저장되지 않아 추론(inference) 시점에 다른 통계가 사용되었습니다. 특성 공학(feature engineering) 코드가 학습(training)과 서빙(serving) 사이에 복사되어 있었고, 두 복사본이 서로 달라졌습니다. 운영(production) 환경에서 범주형 열(categorical column)에 학습 때 보지 못한 새 값이 들어왔습니다.

이것은 가상의 문제가 아닙니다. ML 시스템이 운영 환경에서 실패하는 가장 흔한 이유입니다. 파이프라인은 모든 변환 단계를 하나의 순서 있는 재현 가능한 객체로 묶어 이 문제들을 해결합니다.

사전 테스트

2문제 · 이 강의를 시작하기 전에 얼마나 알고 있는지 확인해보세요

1.ML 파이프라인(pipeline) 맥락에서 데이터 누수(data leakage)란 무엇인가요?

2.학습/테스트로 나누기 전에 전체 데이터셋에 스케일러를 적합(fit)하는 것이 누수인 이유는 무엇인가요?

0/2 답변 완료

개념

파이프라인이란 무엇인가

파이프라인은 데이터 변환(data transformation)의 순서 있는 시퀀스(sequence)와 그 뒤를 잇는 모델입니다. 각 단계는 이전 단계의 출력을 입력으로 받습니다. 전체 파이프라인은 학습 데이터에 한 번 적합됩니다. 추론 시점에는 같은 적합된 파이프라인이 새 데이터를 변환하고 예측을 만듭니다.

flowchart LR
    A[원시 데이터] --> B[결측값 대체]
    B --> C[숫자형 특성 스케일링]
    C --> D[범주형 특성 인코딩]
    D --> E[모델 학습]
    E --> F[예측]

파이프라인은 다음을 보장합니다.

  • 변환은 학습 데이터에만 적합됩니다. 즉 누수(leakage)가 없습니다.
  • 추론 시점에도 같은 변환이 적용됩니다.
  • 전체 객체를 하나의 산출물(artifact)로 직렬화(serialize)하여 배포할 수 있습니다.
  • 교차 검증(cross-validation)이 폴드(fold)마다 파이프라인을 적용해 미묘한 누수를 막습니다.

데이터 누수(Data Leakage): 조용한 실패 원인

데이터 누수는 테스트 세트(test set)나 미래 데이터의 정보가 학습 과정에 섞이는 일입니다. 파이프라인은 가장 흔한 형태의 누수를 막습니다.

누수 있는 잘못된 코드:

X = df.drop("target", axis=1)
y = df["target"]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test = X_scaled[:800], X_scaled[800:]
y_train, y_test = y[:800], y[800:]

스케일러(scaler)가 테스트 데이터를 봤습니다. 평균과 표준편차가 테스트 표본(sample)을 포함합니다. 이것은 정확도 추정치를 부풀립니다.

올바른 코드:

X_train, X_test = X[:800], X[800:]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

파이프라인을 쓰면 이 부분을 매번 직접 생각할 필요가 없습니다. 파이프라인이 자동으로 처리합니다.

sklearn 파이프라인(sklearn Pipeline)

sklearn의 Pipeline은 변환기와 추정기(estimator)를 연결합니다. .fit(), .predict(), .score()는 모든 단계를 순서대로 적용합니다.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])

pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)

pipe.fit(X_train, y_train)을 호출하면:

  1. 스케일러가 X_train에서 fit_transform을 호출합니다.
  2. 모델이 스케일링된 X_train에서 fit을 호출합니다.

pipe.predict(X_test)를 호출하면:

  1. 스케일러가 X_test에서 transform을 호출합니다. fit_transform이 아닙니다.
  2. 모델이 스케일링된 X_test에서 predict를 호출합니다.

스케일러는 적합 과정에서 테스트 데이터를 절대 보지 않습니다. 이것이 핵심입니다.

ColumnTransformer: 서로 다른 열(column)에 서로 다른 파이프라인 적용하기

실제 데이터셋에는 서로 다른 전처리가 필요한 숫자형 열과 범주형 열이 함께 있습니다. ColumnTransformer는 이 문제를 처리합니다.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

numeric_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="most_frequent")),
    ("encode", OneHotEncoder(handle_unknown="ignore")),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipe, ["age", "income", "score"]),
    ("cat", categorical_pipe, ["city", "gender", "plan"]),
])

full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", GradientBoostingClassifier()),
])

OneHotEncoderhandle_unknown="ignore"는 운영 환경에서 중요합니다. 새 범주, 예를 들어 모델이 한 번도 본 적 없는 도시 값이 들어오면 충돌(crash) 대신 0 벡터(zero vector)를 만듭니다.

실험 추적(Experiment Tracking)

파이프라인은 학습을 재현 가능하게 만들지만, 실험 사이에 무슨 일이 있었는지도 추적해야 합니다. 어떤 하이퍼파라미터(hyperparameter)를 썼는지, 어떤 데이터셋 버전이었는지, 지표(metric)는 어땠는지, 어떤 코드가 실행되었는지를 남겨야 합니다.

가장 흔한 오픈 소스(open-source) 해결책은 MLflow입니다.

import mlflow

with mlflow.start_run():
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("learning_rate", 0.1)

    pipe.fit(X_train, y_train)
    accuracy = pipe.score(X_test, y_test)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.sklearn.log_model(pipe, "model")

각 실행(run)은 파라미터, 지표, 산출물, 전체 모델과 함께 기록됩니다. 실행을 비교하고, 어떤 실험이든 재현하고, 어떤 모델 버전이든 배포할 수 있습니다.

Weights & Biases(wandb) 도 호스팅(hosted) 대시보드(dashboard)와 함께 같은 기능을 제공합니다.

import wandb

wandb.init(project="my-pipeline")
wandb.config.update({"max_depth": 5, "n_estimators": 100})

pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)

wandb.log({"accuracy": accuracy})

모델 버전 관리(Model Versioning)

실험 추적 이후에는 모델 버전을 관리해야 합니다. 어떤 모델이 운영 환경에 있나요? 어떤 모델이 스테이징(staging)인가요? 지난주 모델은 무엇이었나요?

MLflow 모델 레지스트리(Model Registry)는 다음을 제공합니다.

  • 버전 추적(Version tracking): 저장된 모든 모델에 버전 번호가 붙습니다.
  • 단계 전환(Stage transitions): "Staging", "Production", "Archived" 상태를 관리합니다.
  • 승인 워크플로(Approval workflow): 모델을 명시적으로 운영으로 승격해야 합니다.
  • 롤백(Rollback): 이전 버전으로 즉시 되돌릴 수 있습니다.

DVC를 이용한 데이터 버전 관리

코드는 git으로 버전 관리합니다. 데이터도 버전 관리해야 하지만 git은 큰 파일에 적합하지 않습니다. DVC(Data Version Control)가 이 문제를 해결합니다.

dvc init
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Track training data"
dvc push

DVC는 실제 데이터를 원격 저장소(remote storage; S3, GCS, Azure)에 저장하고, git에는 해시(hash)를 기록한 작은 .dvc 파일만 둡니다. 어떤 git 커밋(commit)을 체크아웃(checkout)하면 dvc checkout으로 그 커밋이 사용한 정확한 데이터를 복원할 수 있습니다.

즉 모든 git 커밋이 코드와 데이터를 함께 고정합니다. 완전한 재현성입니다.

재현 가능한 실험(Reproducible Experiments)

재현 가능한 실험에는 네 가지가 필요합니다.

  1. 고정된 난수 시드(random seed): numpy, random, 프레임워크(framework; torch, sklearn)의 시드를 설정합니다.
  2. 고정된 의존성(dependency): 정확한 버전이 들어 있는 requirements.txt 또는 잠금 파일(lock file)이 필요합니다.
  3. 버전 관리되는 데이터: DVC 같은 도구를 사용합니다.
  4. 설정 파일(Config file): 모든 하이퍼파라미터를 하드코딩(hardcoding)하지 않고 설정(config)에 둡니다.
import numpy as np
import random

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
    except ImportError:
        pass

노트북에서 프로덕션 파이프라인으로

flowchart TD
    A[Jupyter Notebook] --> B[함수 추출]
    B --> C[Pipeline 객체 구성]
    C --> D[하이퍼파라미터 config file 추가]
    D --> E[실험 추적 추가]
    E --> F[데이터 검증 추가]
    F --> G[테스트 추가]
    G --> H[배포 패키징]

    style A fill:#fdd,stroke:#333
    style H fill:#dfd,stroke:#333

일반적인 진행은 다음과 같습니다.

  1. 노트북 탐색(Notebook exploration): 빠른 실험, 시각화, 특성 아이디어 정리
  2. 함수 추출(Extract functions): 전처리, 특성 공학, 평가(evaluation)를 모듈(module)로 이동
  3. 파이프라인 구성(Build Pipeline): 변환들을 sklearn Pipeline 또는 커스텀 클래스(custom class)로 연결
  4. 설정 관리(Config management): 모든 하이퍼파라미터를 YAML/JSON 설정으로 이동
  5. 실험 추적(Experiment tracking): MLflow 또는 wandb 로깅(logging) 추가
  6. 데이터 검증(Data validation): 학습 전 스키마(schema), 분포(distribution), 결측값 패턴 확인
  7. 테스트(Tests): 변환기 단위 테스트(unit test)와 전체 파이프라인 통합 테스트(integration test)
  8. 배포(Deployment): 파이프라인을 직렬화하고 API(FastAPI, Flask)로 감싼 뒤 컨테이너화(containerize)

흔한 파이프라인 실수

실수왜 나쁜가해결
분할 전에 전체 데이터에 적합데이터 누수Pipelinecross_val_score 사용
특성 공학을 파이프라인 밖에서 수행학습과 서빙의 변환이 달라짐모든 변환을 Pipeline 안에 둠
알 수 없는 범주(unknown category) 미처리운영에서 새 값이 들어오면 충돌OneHotEncoder(handle_unknown="ignore")
하드코딩된 열 이름스키마 변경 시 깨짐설정의 열 이름 목록 사용
데이터 검증 없음잘못된 데이터로 조용히 틀린 예측예측 전 스키마 검증 추가
학습/서빙 편차(training/serving skew)운영에서 다른 특성을 봄학습과 추론에 하나의 Pipeline 객체 사용

만들어 보기

code/pipeline.py는 완전한 ML 파이프라인을 처음부터 만듭니다.

Step 1: 커스텀 변환기(Custom Transformer)

class CustomTransformer:
    def __init__(self):
        self.means = None
        self.stds = None

    def fit(self, X):
        self.means = np.mean(X, axis=0)
        self.stds = np.std(X, axis=0)
        self.stds[self.stds == 0] = 1.0
        return self

    def transform(self, X):
        return (X - self.means) / self.stds

    def fit_transform(self, X):
        return self.fit(X).transform(X)

Step 2: Pipeline 처음부터 구현하기

class PipelineFromScratch:
    def __init__(self, steps):
        self.steps = steps

    def fit(self, X, y=None):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.fit_transform(X_current)
        name, model = self.steps[-1]
        model.fit(X_current, y)
        return self

    def predict(self, X):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.transform(X_current)
        name, model = self.steps[-1]
        return model.predict(X_current)

Step 3: 파이프라인으로 교차 검증하기

코드는 파이프라인을 사용한 교차 검증이 데이터 누수를 어떻게 막는지 보여줍니다. 스케일러는 각 폴드의 학습 데이터에 따로 적합됩니다.

Step 4: sklearn으로 프로덕션 파이프라인 구성하기

ColumnTransformer, 여러 전처리 경로, 모델을 포함한 완전한 파이프라인을 만들고, 올바른 교차 검증과 실험 로깅을 함께 사용합니다.

사용하기

이 lesson에서 만든 파이프라인 기준은 다음 상황에 바로 적용합니다.

  • 노트북 실험을 반복 가능한 학습 스크립트(script)로 옮길 때
  • 전처리가 학습과 추론 사이에서 달라질 위험이 있을 때
  • 테스트 세트 정보가 전처리에 섞였는지 의심될 때
  • 모델 산출물을 저장하고 나중에 같은 방식으로 예측해야 할 때

산출물 만들기

이 lesson의 최종 산출물은 다음입니다.

  • outputs/prompt-ml-pipeline.md: ML 파이프라인을 만들고 디버깅(debugging)하기 위한 프롬프트(prompt)
  • code/pipeline.py: 처음부터 만든 파이프라인과 sklearn 파이프라인 예제

산출물은 데이터 누수 여부, 학습/서빙 편차 여부, 재현성 조건(시드, 의존성, 데이터 버전, 설정)을 점검할 수 있어야 합니다.

연습문제

  1. (쉬움) 숫자형 열 3개와 범주형 열 2개가 있는 데이터셋을 처리하는 파이프라인을 만듭니다. ColumnTransformer로 숫자형에는 중앙값 대체와 스케일링을, 범주형에는 최빈값 대체(most-frequent imputation)와 원-핫 인코딩(one-hot encoding)을 적용합니다. 5겹 교차 검증(5-fold cross-validation)으로 학습합니다.

  2. (중간) 의도적으로 데이터 누수를 넣습니다. 분할 전에 전체 데이터셋에 스케일러를 적합시킵니다. 누수가 있는 교차 검증 점수와 깨끗한 파이프라인 교차 검증 점수를 비교합니다. 차이는 얼마나 큰가요?

  3. (중간) joblib.dump로 파이프라인을 직렬화합니다. 별도 스크립트에서 로드(load)하고 예측을 실행합니다. 예측이 동일한지 확인합니다.

  4. (어려움) 가장 중요한 숫자형 열 두 개에 차수(degree) 2의 다항 특성(polynomial feature)을 만드는 커스텀 변환기를 파이프라인에 추가합니다. 파이프라인의 어느 위치에 들어가야 하나요?

  5. (어려움) 파이프라인에 MLflow 추적을 설정합니다. 서로 다른 하이퍼파라미터로 5번 실험합니다. MLflow UI(mlflow ui)로 실행을 비교하고 최선의 모델을 고릅니다.

핵심 용어

용어흔한 설명실제 의미
파이프라인(Pipeline)"변환 + 모델 체인"적합된 변환기들과 모델의 순서 있는 시퀀스를 하나의 단위로 적용해 누수를 막는 구조
데이터 누수(Data Leakage)"테스트 정보가 학습에 샘"학습 세트 밖의 정보를 모델 구축에 사용해 성능 추정치를 부풀리는 문제
ColumnTransformer"열별 다른 전처리"서로 다른 열 부분집합에 서로 다른 파이프라인을 적용하고 결과를 결합하는 도구
실험 추적(Experiment Tracking)"실행 로깅"모든 학습 실행의 파라미터, 지표, 산출물, 코드 버전을 기록하는 것
MLflow"모델 추적과 배포"실험 추적, 모델 레지스트리, 배포를 위한 오픈 소스 플랫폼
DVC"데이터용 git"큰 데이터 파일은 원격 저장소에 두고 git에는 해시를 저장하는 데이터 버전 관리 시스템
모델 레지스트리(Model Registry)"모델 버전 카탈로그"스테이징, 운영, 보관(archived) 같은 단계 레이블(stage label)과 함께 모델 버전을 관리하는 시스템
학습/서빙 편차(Training/Serving Skew)"노트북에서는 됐는데"학습과 추론에서 데이터 처리가 달라져 조용한 오류를 만드는 문제
재현성(Reproducibility)"같은 코드, 같은 결과"같은 코드, 데이터, 설정으로 동일한 결과를 다시 얻을 수 있는 능력

더 읽을거리

실습 코드

이 강의의 실습 코드 1개

pipeline
Code

산출물

이 강의에서 생성된 프롬프트, 스킬, 코드 산출물 1개

prompt-ml-pipeline

Build, debug, and deploy reproducible ML pipelines

Prompt

확인 문제

3문제 · 모두 맞추면 완료 표시가 가능합니다

1.sklearn에서 파이프라인 단계에 `fit_transform`을 호출하는 것과 `transform`을 호출하는 것의 차이는 무엇인가요?

2.실제 데이터셋에서 `ColumnTransformer`가 필요한 이유는 무엇인가요?

3.운영 모델이 학습 중 보지 못한 범주형 값인 `'new_category'`를 받았습니다. 파이프라인은 무엇을 처리해야 하나요?

0/3 답변 완료

추가 문제 풀기

AI가 강의 내용을 바탕으로 새로운 문제를 생성합니다