ML 파이프라인과 실험 추적(ML Pipelines)
모델은 제품이 아닙니다. 파이프라인이 제품입니다. 파이프라인은 원시 데이터부터 배포된 예측까지의 모든 단계이며, 모든 단계는 재현 가능해야 합니다.
유형: Build
언어: Python
선수 지식: Phase 2, Lesson 12 (하이퍼파라미터 튜닝)
소요 시간: 약 120분
학습 목표
- 결측값 대체(imputation), 스케일링(scaling), 인코딩(encoding), 모델 학습(model training)을 하나의 재현 가능한 객체로 연결하는 ML 파이프라인(pipeline)을 처음부터 만듭니다.
- 데이터 누수(data leakage) 상황을 식별하고, 파이프라인이 변환기(transformer)를 학습 데이터(training data)에만 적합(fit)시킴으로써 이를 어떻게 방지하는지 설명합니다.
- 숫자형(numeric) 특성과 범주형(categorical) 특성에 서로 다른 전처리를 적용하는
ColumnTransformer를 구성합니다.
- 파이프라인 직렬화(serialization)를 구현하고, 적합된 같은 파이프라인이 학습과 운영 추론(inference)에서 동일한 결과를 만드는지 보입니다.
문제
데이터를 불러오고, 결측값을 중앙값(median)으로 채우고, 특성(feature)을 스케일링(scaling)하고, 모델을 학습하고, 정확도(accuracy)를 출력하는 노트북(notebook)이 있다고 가정해 봅시다. 잘 작동합니다. 그대로 배포합니다.
한 달 뒤 누군가 모델을 다시 학습했는데 결과가 달라집니다. 중앙값이 테스트 데이터(test data)를 포함한 전체 데이터셋(dataset)에서 계산되었습니다. 즉 데이터 누수입니다. 스케일링 파라미터(scaling parameter)가 저장되지 않아 추론(inference) 시점에 다른 통계가 사용되었습니다. 특성 공학(feature engineering) 코드가 학습(training)과 서빙(serving) 사이에 복사되어 있었고, 두 복사본이 서로 달라졌습니다. 운영(production) 환경에서 범주형 열(categorical column)에 학습 때 보지 못한 새 값이 들어왔습니다.
이것은 가상의 문제가 아닙니다. ML 시스템이 운영 환경에서 실패하는 가장 흔한 이유입니다. 파이프라인은 모든 변환 단계를 하나의 순서 있는 재현 가능한 객체로 묶어 이 문제들을 해결합니다.
개념
파이프라인이란 무엇인가
파이프라인은 데이터 변환(data transformation)의 순서 있는 시퀀스(sequence)와 그 뒤를 잇는 모델입니다. 각 단계는 이전 단계의 출력을 입력으로 받습니다. 전체 파이프라인은 학습 데이터에 한 번 적합됩니다. 추론 시점에는 같은 적합된 파이프라인이 새 데이터를 변환하고 예측을 만듭니다.
flowchart LR
A[원시 데이터] --> B[결측값 대체]
B --> C[숫자형 특성 스케일링]
C --> D[범주형 특성 인코딩]
D --> E[모델 학습]
E --> F[예측]
파이프라인은 다음을 보장합니다.
- 변환은 학습 데이터에만 적합됩니다. 즉 누수(leakage)가 없습니다.
- 추론 시점에도 같은 변환이 적용됩니다.
- 전체 객체를 하나의 산출물(artifact)로 직렬화(serialize)하여 배포할 수 있습니다.
- 교차 검증(cross-validation)이 폴드(fold)마다 파이프라인을 적용해 미묘한 누수를 막습니다.
데이터 누수(Data Leakage): 조용한 실패 원인
데이터 누수는 테스트 세트(test set)나 미래 데이터의 정보가 학습 과정에 섞이는 일입니다. 파이프라인은 가장 흔한 형태의 누수를 막습니다.
누수 있는 잘못된 코드:
X = df.drop("target", axis=1)
y = df["target"]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test = X_scaled[:800], X_scaled[800:]
y_train, y_test = y[:800], y[800:]
스케일러(scaler)가 테스트 데이터를 봤습니다. 평균과 표준편차가 테스트 표본(sample)을 포함합니다. 이것은 정확도 추정치를 부풀립니다.
올바른 코드:
X_train, X_test = X[:800], X[800:]
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
파이프라인을 쓰면 이 부분을 매번 직접 생각할 필요가 없습니다. 파이프라인이 자동으로 처리합니다.
sklearn 파이프라인(sklearn Pipeline)
sklearn의 Pipeline은 변환기와 추정기(estimator)를 연결합니다. .fit(), .predict(), .score()는 모든 단계를 순서대로 적용합니다.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([
("scaler", StandardScaler()),
("model", LogisticRegression()),
])
pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)
pipe.fit(X_train, y_train)을 호출하면:
- 스케일러가 X_train에서
fit_transform을 호출합니다.
- 모델이 스케일링된 X_train에서
fit을 호출합니다.
pipe.predict(X_test)를 호출하면:
- 스케일러가 X_test에서
transform을 호출합니다. fit_transform이 아닙니다.
- 모델이 스케일링된 X_test에서
predict를 호출합니다.
스케일러는 적합 과정에서 테스트 데이터를 절대 보지 않습니다. 이것이 핵심입니다.
실제 데이터셋에는 서로 다른 전처리가 필요한 숫자형 열과 범주형 열이 함께 있습니다. ColumnTransformer는 이 문제를 처리합니다.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
numeric_pipe = Pipeline([
("impute", SimpleImputer(strategy="median")),
("scale", StandardScaler()),
])
categorical_pipe = Pipeline([
("impute", SimpleImputer(strategy="most_frequent")),
("encode", OneHotEncoder(handle_unknown="ignore")),
])
preprocessor = ColumnTransformer([
("num", numeric_pipe, ["age", "income", "score"]),
("cat", categorical_pipe, ["city", "gender", "plan"]),
])
full_pipeline = Pipeline([
("preprocess", preprocessor),
("model", GradientBoostingClassifier()),
])
OneHotEncoder의 handle_unknown="ignore"는 운영 환경에서 중요합니다. 새 범주, 예를 들어 모델이 한 번도 본 적 없는 도시 값이 들어오면 충돌(crash) 대신 0 벡터(zero vector)를 만듭니다.
실험 추적(Experiment Tracking)
파이프라인은 학습을 재현 가능하게 만들지만, 실험 사이에 무슨 일이 있었는지도 추적해야 합니다. 어떤 하이퍼파라미터(hyperparameter)를 썼는지, 어떤 데이터셋 버전이었는지, 지표(metric)는 어땠는지, 어떤 코드가 실행되었는지를 남겨야 합니다.
가장 흔한 오픈 소스(open-source) 해결책은 MLflow입니다.
import mlflow
with mlflow.start_run():
mlflow.log_param("max_depth", 5)
mlflow.log_param("n_estimators", 100)
mlflow.log_param("learning_rate", 0.1)
pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
mlflow.sklearn.log_model(pipe, "model")
각 실행(run)은 파라미터, 지표, 산출물, 전체 모델과 함께 기록됩니다. 실행을 비교하고, 어떤 실험이든 재현하고, 어떤 모델 버전이든 배포할 수 있습니다.
Weights & Biases(wandb) 도 호스팅(hosted) 대시보드(dashboard)와 함께 같은 기능을 제공합니다.
import wandb
wandb.init(project="my-pipeline")
wandb.config.update({"max_depth": 5, "n_estimators": 100})
pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)
wandb.log({"accuracy": accuracy})
모델 버전 관리(Model Versioning)
실험 추적 이후에는 모델 버전을 관리해야 합니다. 어떤 모델이 운영 환경에 있나요? 어떤 모델이 스테이징(staging)인가요? 지난주 모델은 무엇이었나요?
MLflow 모델 레지스트리(Model Registry)는 다음을 제공합니다.
- 버전 추적(Version tracking): 저장된 모든 모델에 버전 번호가 붙습니다.
- 단계 전환(Stage transitions): "Staging", "Production", "Archived" 상태를 관리합니다.
- 승인 워크플로(Approval workflow): 모델을 명시적으로 운영으로 승격해야 합니다.
- 롤백(Rollback): 이전 버전으로 즉시 되돌릴 수 있습니다.
DVC를 이용한 데이터 버전 관리
코드는 git으로 버전 관리합니다. 데이터도 버전 관리해야 하지만 git은 큰 파일에 적합하지 않습니다. DVC(Data Version Control)가 이 문제를 해결합니다.
dvc init
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Track training data"
dvc push
DVC는 실제 데이터를 원격 저장소(remote storage; S3, GCS, Azure)에 저장하고, git에는 해시(hash)를 기록한 작은 .dvc 파일만 둡니다. 어떤 git 커밋(commit)을 체크아웃(checkout)하면 dvc checkout으로 그 커밋이 사용한 정확한 데이터를 복원할 수 있습니다.
즉 모든 git 커밋이 코드와 데이터를 함께 고정합니다. 완전한 재현성입니다.
재현 가능한 실험(Reproducible Experiments)
재현 가능한 실험에는 네 가지가 필요합니다.
- 고정된 난수 시드(random seed): numpy, random, 프레임워크(framework; torch, sklearn)의 시드를 설정합니다.
- 고정된 의존성(dependency): 정확한 버전이 들어 있는
requirements.txt 또는 잠금 파일(lock file)이 필요합니다.
- 버전 관리되는 데이터: DVC 같은 도구를 사용합니다.
- 설정 파일(Config file): 모든 하이퍼파라미터를 하드코딩(hardcoding)하지 않고 설정(config)에 둡니다.
import numpy as np
import random
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
try:
import torch
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
except ImportError:
pass
노트북에서 프로덕션 파이프라인으로
flowchart TD
A[Jupyter Notebook] --> B[함수 추출]
B --> C[Pipeline 객체 구성]
C --> D[하이퍼파라미터 config file 추가]
D --> E[실험 추적 추가]
E --> F[데이터 검증 추가]
F --> G[테스트 추가]
G --> H[배포 패키징]
style A fill:#fdd,stroke:#333
style H fill:#dfd,stroke:#333
일반적인 진행은 다음과 같습니다.
- 노트북 탐색(Notebook exploration): 빠른 실험, 시각화, 특성 아이디어 정리
- 함수 추출(Extract functions): 전처리, 특성 공학, 평가(evaluation)를 모듈(module)로 이동
- 파이프라인 구성(Build Pipeline): 변환들을 sklearn
Pipeline 또는 커스텀 클래스(custom class)로 연결
- 설정 관리(Config management): 모든 하이퍼파라미터를 YAML/JSON 설정으로 이동
- 실험 추적(Experiment tracking): MLflow 또는 wandb 로깅(logging) 추가
- 데이터 검증(Data validation): 학습 전 스키마(schema), 분포(distribution), 결측값 패턴 확인
- 테스트(Tests): 변환기 단위 테스트(unit test)와 전체 파이프라인 통합 테스트(integration test)
- 배포(Deployment): 파이프라인을 직렬화하고 API(FastAPI, Flask)로 감싼 뒤 컨테이너화(containerize)
흔한 파이프라인 실수
| 실수 | 왜 나쁜가 | 해결 |
|---|
| 분할 전에 전체 데이터에 적합 | 데이터 누수 | Pipeline과 cross_val_score 사용 |
| 특성 공학을 파이프라인 밖에서 수행 | 학습과 서빙의 변환이 달라짐 | 모든 변환을 Pipeline 안에 둠 |
| 알 수 없는 범주(unknown category) 미처리 | 운영에서 새 값이 들어오면 충돌 | OneHotEncoder(handle_unknown="ignore") |
| 하드코딩된 열 이름 | 스키마 변경 시 깨짐 | 설정의 열 이름 목록 사용 |
| 데이터 검증 없음 | 잘못된 데이터로 조용히 틀린 예측 | 예측 전 스키마 검증 추가 |
| 학습/서빙 편차(training/serving skew) | 운영에서 다른 특성을 봄 | 학습과 추론에 하나의 Pipeline 객체 사용 |
만들어 보기
code/pipeline.py는 완전한 ML 파이프라인을 처음부터 만듭니다.
class CustomTransformer:
def __init__(self):
self.means = None
self.stds = None
def fit(self, X):
self.means = np.mean(X, axis=0)
self.stds = np.std(X, axis=0)
self.stds[self.stds == 0] = 1.0
return self
def transform(self, X):
return (X - self.means) / self.stds
def fit_transform(self, X):
return self.fit(X).transform(X)
Step 2: Pipeline 처음부터 구현하기
class PipelineFromScratch:
def __init__(self, steps):
self.steps = steps
def fit(self, X, y=None):
X_current = X.copy()
for name, step in self.steps[:-1]:
X_current = step.fit_transform(X_current)
name, model = self.steps[-1]
model.fit(X_current, y)
return self
def predict(self, X):
X_current = X.copy()
for name, step in self.steps[:-1]:
X_current = step.transform(X_current)
name, model = self.steps[-1]
return model.predict(X_current)
Step 3: 파이프라인으로 교차 검증하기
코드는 파이프라인을 사용한 교차 검증이 데이터 누수를 어떻게 막는지 보여줍니다. 스케일러는 각 폴드의 학습 데이터에 따로 적합됩니다.
Step 4: sklearn으로 프로덕션 파이프라인 구성하기
ColumnTransformer, 여러 전처리 경로, 모델을 포함한 완전한 파이프라인을 만들고, 올바른 교차 검증과 실험 로깅을 함께 사용합니다.
사용하기
이 lesson에서 만든 파이프라인 기준은 다음 상황에 바로 적용합니다.
- 노트북 실험을 반복 가능한 학습 스크립트(script)로 옮길 때
- 전처리가 학습과 추론 사이에서 달라질 위험이 있을 때
- 테스트 세트 정보가 전처리에 섞였는지 의심될 때
- 모델 산출물을 저장하고 나중에 같은 방식으로 예측해야 할 때
산출물 만들기
이 lesson의 최종 산출물은 다음입니다.
outputs/prompt-ml-pipeline.md: ML 파이프라인을 만들고 디버깅(debugging)하기 위한 프롬프트(prompt)
code/pipeline.py: 처음부터 만든 파이프라인과 sklearn 파이프라인 예제
산출물은 데이터 누수 여부, 학습/서빙 편차 여부, 재현성 조건(시드, 의존성, 데이터 버전, 설정)을 점검할 수 있어야 합니다.
연습문제
-
(쉬움) 숫자형 열 3개와 범주형 열 2개가 있는 데이터셋을 처리하는 파이프라인을 만듭니다. ColumnTransformer로 숫자형에는 중앙값 대체와 스케일링을, 범주형에는 최빈값 대체(most-frequent imputation)와 원-핫 인코딩(one-hot encoding)을 적용합니다. 5겹 교차 검증(5-fold cross-validation)으로 학습합니다.
-
(중간) 의도적으로 데이터 누수를 넣습니다. 분할 전에 전체 데이터셋에 스케일러를 적합시킵니다. 누수가 있는 교차 검증 점수와 깨끗한 파이프라인 교차 검증 점수를 비교합니다. 차이는 얼마나 큰가요?
-
(중간) joblib.dump로 파이프라인을 직렬화합니다. 별도 스크립트에서 로드(load)하고 예측을 실행합니다. 예측이 동일한지 확인합니다.
-
(어려움) 가장 중요한 숫자형 열 두 개에 차수(degree) 2의 다항 특성(polynomial feature)을 만드는 커스텀 변환기를 파이프라인에 추가합니다. 파이프라인의 어느 위치에 들어가야 하나요?
-
(어려움) 파이프라인에 MLflow 추적을 설정합니다. 서로 다른 하이퍼파라미터로 5번 실험합니다. MLflow UI(mlflow ui)로 실행을 비교하고 최선의 모델을 고릅니다.
핵심 용어
| 용어 | 흔한 설명 | 실제 의미 |
|---|
| 파이프라인(Pipeline) | "변환 + 모델 체인" | 적합된 변환기들과 모델의 순서 있는 시퀀스를 하나의 단위로 적용해 누수를 막는 구조 |
| 데이터 누수(Data Leakage) | "테스트 정보가 학습에 샘" | 학습 세트 밖의 정보를 모델 구축에 사용해 성능 추정치를 부풀리는 문제 |
| ColumnTransformer | "열별 다른 전처리" | 서로 다른 열 부분집합에 서로 다른 파이프라인을 적용하고 결과를 결합하는 도구 |
| 실험 추적(Experiment Tracking) | "실행 로깅" | 모든 학습 실행의 파라미터, 지표, 산출물, 코드 버전을 기록하는 것 |
| MLflow | "모델 추적과 배포" | 실험 추적, 모델 레지스트리, 배포를 위한 오픈 소스 플랫폼 |
| DVC | "데이터용 git" | 큰 데이터 파일은 원격 저장소에 두고 git에는 해시를 저장하는 데이터 버전 관리 시스템 |
| 모델 레지스트리(Model Registry) | "모델 버전 카탈로그" | 스테이징, 운영, 보관(archived) 같은 단계 레이블(stage label)과 함께 모델 버전을 관리하는 시스템 |
| 학습/서빙 편차(Training/Serving Skew) | "노트북에서는 됐는데" | 학습과 추론에서 데이터 처리가 달라져 조용한 오류를 만드는 문제 |
| 재현성(Reproducibility) | "같은 코드, 같은 결과" | 같은 코드, 데이터, 설정으로 동일한 결과를 다시 얻을 수 있는 능력 |
더 읽을거리