완전한 비전 파이프라인 구축

프로덕션 비전 시스템(production vision system)은 데이터 계약(data contract)으로 이어 붙인 모델과 규칙의 체인입니다. 필요한 조각은 이미 이 phase 안에 있습니다. 이 종합 프로젝트(Capstone)는 그 조각들을 엔드투엔드(end-to-end)로 연결합니다.

유형: Build 언어: Python 선수 지식: Phase 4 Lessons 01-15 예상 시간: 약 120분

학습 목표

  • 객체를 탐지하고 분류한 뒤 구조화된 JSON을 내보내는 프로덕션 비전 파이프라인(production vision pipeline)을 설계합니다. 모든 실패 경로(failure path)를 처리합니다.
  • 탐지기(detector: Mask R-CNN 또는 YOLO), 분류기(classifier: ConvNeXt-Tiny), 데이터 계약(data contract: Pydantic)을 하나의 서비스로 연결합니다.
  • 엔드투엔드 파이프라인을 벤치마킹(benchmark)하고 첫 번째 병목(bottleneck), 보통 전처리(preprocessing) 다음의 탐지기를 식별합니다.
  • 이미지 업로드를 받아 파이프라인을 실행하고 탐지(detection)와 분류(classification) 결과를 돌려주는 최소한의 FastAPI 서비스를 배포 가능한 형태로 만듭니다.

문제

개별 비전 모델(vision model)은 유용하지만, 실제 비전 제품은 여러 모델이 이어진 체인입니다. 소매 진열대 감사(retail shelf audit)는 탐지기, 제품 분류기, 가격 광학 문자 인식(Optical Character Recognition; OCR) 파이프라인이 연결된 구조입니다. 자율주행(autonomous driving)은 2D 탐지기, 3D 탐지기, 분할기(segmenter), 추적기(tracker), 계획기(planner)가 함께 동작합니다. 의료 사전 선별(medical pre-screen)은 분할기, 영역 분류기, 임상의용 사용자 인터페이스(User Interface; UI)가 결합된 형태입니다.

이 체인을 실제로 연결하는 작업이 머신러닝(Machine Learning; ML) 프로토타입(prototype)과 제품(product)을 가릅니다. 모델과 모델 사이의 모든 인터페이스(interface)는 새로운 버그가 자라는 자리입니다. 모든 좌표 변환(coordinate transform), 정규화(normalization), 마스크 크기 조정(mask resize)이 조용한 실패(silent failure) 후보입니다. 파이프라인은 가장 약한 인터페이스만큼만 단단합니다.

이 종합 프로젝트는 최소 실행 가능 파이프라인(minimum viable pipeline), 즉 탐지와 분류, 구조화된 출력(structured output), 서빙 계층(serving layer)을 갖춘 골격을 세웁니다. Phase 4의 나머지 모든 요소는 이 골격(skeleton)에 꽂을 수 있습니다. Mask R-CNN을 YOLOv8로 바꾸거나, 광학 문자 인식(OCR) 헤드를 더하거나, 분할 브랜치(segmentation branch)를 더하거나, 추적기를 추가할 수 있습니다. 아키텍처(architecture)는 안정적으로 유지되고, 조각은 언제든 갈아 끼울 수 있습니다(pluggable).

사전 테스트

2문제 · 이 강의를 시작하기 전에 얼마나 알고 있는지 확인해보세요

1.프로덕션 비전 파이프라인(production vision pipeline)이 모든 단계 경계(stage boundary)에서 Pydantic 모델 또는 그와 동등한 스키마(schema)를 사용하는 이유는 무엇인가요?

2.CPU 전용(CPU-only) 비전 파이프라인에서 가장 큰 지연 시간 블록(latency block)이 되는 경우가 많은 단계는 무엇인가요?

0/2 답변 완료

개념

파이프라인(Pipeline)

flowchart LR
    REQ["HTTP 요청<br/>+ 이미지 바이트"] --> LOAD["디코드<br/>+ 전처리"]
    LOAD --> DET["탐지기<br/>(YOLO / Mask R-CNN)"]
    DET --> CROP["각 탐지 영역<br/>자르기 + 크기 조정"]
    CROP --> CLS["분류기<br/>(ConvNeXt-Tiny)"]
    CLS --> AGG["탐지 결과 + 클래스<br/>집계"]
    AGG --> SCHEMA["Pydantic<br/>검증"]
    SCHEMA --> RESP["JSON 응답"]

    REQ -.->|오류| RESP

    style DET fill:#fef3c7,stroke:#d97706
    style CLS fill:#dbeafe,stroke:#2563eb
    style SCHEMA fill:#dcfce7,stroke:#16a34a

일곱 단계로 구성됩니다. 두 개의 모델 단계는 비용이 크고, 나머지 다섯 단계는 버그가 자리 잡는 곳입니다.

Pydantic으로 데이터 계약 만들기

모든 모델 경계(model boundary)는 타입이 있는 객체(typed object)로 표현됩니다. 이렇게 하면 조용한 실패가 큰 소리로 드러나는 실패(loud failure)로 바뀝니다.

Detection(
    box: tuple[float, float, float, float],   # (x1, y1, x2, y2), 절대 픽셀 좌표
    score: float,                              # [0, 1]
    class_id: int,                             # 탐지기의 라벨 맵(label map)에서 가져온 클래스 id
    mask: Optional[list[list[int]]],           # 있으면 RLE로 인코딩된 마스크
)

PipelineResult(
    image_id: str,
    detections: list[Detection],
    classifications: list[Classification],
    inference_ms: float,
)

탐지기가 (x1, y1, x2, y2)가 아니라 (cx, cy, w, h) 형식으로 박스를 반환하면 Pydantic 검증(validation)이 경계에서 곧바로 실패합니다. 그 덕분에 다운스트림(downstream)의 자르기 단계가 조용히 빈 영역을 반환하는 문제를 오래 디버깅(debugging)하지 않아도 됩니다.

지연 시간(Latency)이 어디서 쓰이는가

거의 모든 비전 파이프라인에서 다음 세 가지가 성립합니다.

  1. 전처리가 가장 큰 단일 비용 블록(block)이 되는 경우가 많습니다. JPEG 디코드, 색 공간 변환(colour space conversion), 크기 조정은 CPU 종속(CPU-bound) 작업이고, 의외로 쉽게 잊힙니다.
  2. 탐지기가 GPU 시간을 지배합니다. 일반적으로 GPU 시간의 70-90%가 탐지 순전파(detection forward pass)에 쓰입니다.
  3. 후처리(postprocessing: NMS, RLE 인코딩/디코딩)는 GPU에서는 싸고 CPU에서는 비쌉니다. 항상 실제 운영 환경에서 프로파일링(profile)해야 합니다.

이러한 분포를 파악하고 있어야 최적화(optimisation)가 막연한 작업이 아니라 우선순위 목록으로 정리됩니다.

실패 모드(Failure modes)

  • 빈 탐지 결과(empty detections) — 충돌(crash)시키지 말고 빈 목록을 반환합니다. 로그는 반드시 남깁니다.
  • 경계 밖 박스(out-of-bounds boxes) — 자르기 전에 이미지 크기로 클램프(clamp)합니다.
  • 너무 작은 크롭(tiny crops) — 분류기의 최소 입력 크기보다 작은 박스는 분류를 건너뜁니다.
  • 깨진 업로드(corrupt upload) — 500이 아니라 구체적인 에러 코드(specific error code)가 있는 400 응답을 돌려줍니다.
  • 모델 로드 실패(model load failure) — 첫 요청이 들어왔을 때가 아니라 서비스 시작(startup) 시점에서 실패하게 만듭니다.

프로덕션 파이프라인은 일반적인 try/except로 실패를 묻어 두지 않고 각 실패를 개별적으로 처리합니다. 모든 실패는 이름 있는 코드(named code)와 명확한 응답을 가집니다.

배칭(Batching)

프로덕션 서비스는 여러 클라이언트의 요청을 동시에 처리합니다. 요청을 가로질러 탐지와 분류를 배치 처리하면 처리량(throughput)이 크게 늘어납니다. 그 대가로 배치가 찰 때까지 기다리는 추가 지연 시간이 발생합니다. 일반적인 설정은 최대 20ms 동안 요청을 모아 배치로 처리한 뒤 응답을 다시 분배하는 방식입니다. torchservetriton은 이를 네이티브(native)로 지원하며, 부하 패턴이 예측 가능한 작은 서비스에서는 마이크로배처(micro-batcher)를 직접 구현하기도 합니다.

직접 만들기

Step 1: 데이터 계약(Data contracts)

from pydantic import BaseModel, Field
from typing import List, Optional, Tuple

class Detection(BaseModel):
    box: Tuple[float, float, float, float]
    score: float = Field(ge=0, le=1)
    class_id: int = Field(ge=0)
    mask_rle: Optional[str] = None


class Classification(BaseModel):
    detection_index: int
    class_id: int
    class_name: str
    score: float = Field(ge=0, le=1)


class PipelineResult(BaseModel):
    image_id: str
    detections: List[Detection]
    classifications: List[Classification]
    inference_ms: float

5초 만에 작성하는 코드가 진지한 파이프라인에서 한 시간짜리 디버깅을 줄여줍니다.

Step 2: 최소한의 파이프라인 클래스

import time
import numpy as np
import torch
from PIL import Image

class VisionPipeline:
    def __init__(self, detector, classifier, class_names,
                 device="cpu", min_crop=32):
        self.detector = detector.to(device).eval()
        self.classifier = classifier.to(device).eval()
        self.class_names = class_names
        self.device = device
        self.min_crop = min_crop

    def preprocess(self, image):
        """
        image: PIL.Image 또는 np.ndarray (H, W, 3) uint8
        반환값: device 위의 CHW float tensor
        """
        if isinstance(image, Image.Image):
            image = np.asarray(image.convert("RGB"))
        tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        return tensor.to(self.device)

    @torch.no_grad()
    def detect(self, image_tensor):
        return self.detector([image_tensor])[0]

    @torch.no_grad()
    def classify(self, crops):
        if len(crops) == 0:
            return []
        batch = torch.stack(crops).to(self.device)
        logits = self.classifier(batch)
        probs = logits.softmax(-1)
        scores, cls = probs.max(-1)
        return list(zip(cls.tolist(), scores.tolist()))

    def run(self, image, image_id="anonymous"):
        t0 = time.perf_counter()
        tensor = self.preprocess(image)
        det = self.detect(tensor)

        crops = []
        detections = []
        valid_indices = []
        for i, (box, score, cls) in enumerate(zip(det["boxes"], det["scores"], det["labels"])):
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            detections.append(Detection(
                box=(x1, y1, x2, y2),
                score=float(score),
                class_id=int(cls),
            ))
            if (x2 - x1) < self.min_crop or (y2 - y1) < self.min_crop:
                continue
            crop = tensor[:, y1:y2, x1:x2]
            crop = torch.nn.functional.interpolate(
                crop.unsqueeze(0),
                size=(224, 224),
                mode="bilinear",
                align_corners=False,
            )[0]
            crops.append(crop)
            valid_indices.append(i)

        class_preds = self.classify(crops)

        classifications = []
        for valid_idx, (cls_id, cls_score) in zip(valid_indices, class_preds):
            classifications.append(Classification(
                detection_index=valid_idx,
                class_id=int(cls_id),
                class_name=self.class_names[cls_id],
                score=float(cls_score),
            ))

        return PipelineResult(
            image_id=image_id,
            detections=detections,
            classifications=classifications,
            inference_ms=(time.perf_counter() - t0) * 1000,
        )

모든 인터페이스에 타입이 붙어 있고, 모든 실패 경로에는 구체적인 처리 결정이 들어 있습니다.

Step 3: 탐지기와 분류기 연결

from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models import convnext_tiny

# 학습 없이도 현실적인 파이프라인을 만들기 위해 ImageNet 사전학습 가중치를 사용합니다.
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
classifier = convnext_tiny(weights="DEFAULT")
class_names = [f"imagenet_class_{i}" for i in range(1000)]

pipe = VisionPipeline(detector, classifier, class_names)

# 합성 이미지로 동작 점검(smoke test)을 실행합니다.
test_image = (np.random.rand(400, 600, 3) * 255).astype(np.uint8)
result = pipe.run(test_image, image_id="demo")
print(result.model_dump_json(indent=2)[:500])

Step 4: FastAPI 서비스

from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO

app = FastAPI()
pipe = None  # 서비스 시작 시점에 초기화합니다.

@app.on_event("startup")
def load():
    global pipe
    detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT").eval()
    classifier = convnext_tiny(weights="DEFAULT").eval()
    pipe = VisionPipeline(detector, classifier, class_names=[f"c{i}" for i in range(1000)])

@app.post("/detect")
async def detect_endpoint(file: UploadFile):
    if file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
        raise HTTPException(status_code=400, detail="지원하지 않는 이미지 형식입니다")
    data = await file.read()
    try:
        img = Image.open(BytesIO(data)).convert("RGB")
    except Exception:
        raise HTTPException(status_code=400, detail="이미지를 디코딩할 수 없습니다")
    result = pipe.run(img, image_id=file.filename or "upload")
    return result.model_dump()

uvicorn main:app --host 0.0.0.0 --port 8000으로 실행합니다. curl -F 'file=@dog.jpg' http://localhost:8000/detect로 테스트합니다.

Step 5: 파이프라인 벤치마크

import time

def benchmark(pipe, num_runs=20, image_size=(400, 600)):
    img = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
    pipe.run(img)  # 워밍업(warm up)

    stages = {"전처리": [], "탐지": [], "분류": [], "전체": []}
    for _ in range(num_runs):
        t0 = time.perf_counter()
        tensor = pipe.preprocess(img)
        t1 = time.perf_counter()
        det = pipe.detect(tensor)
        t2 = time.perf_counter()
        crops = []
        for box in det["boxes"]:
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            if (x2 - x1) >= pipe.min_crop and (y2 - y1) >= pipe.min_crop:
                crop = tensor[:, y1:y2, x1:x2]
                crop = torch.nn.functional.interpolate(
                    crop.unsqueeze(0), size=(224, 224), mode="bilinear", align_corners=False
                )[0]
                crops.append(crop)
        pipe.classify(crops)
        t3 = time.perf_counter()
        stages["전처리"].append((t1 - t0) * 1000)
        stages["탐지"].append((t2 - t1) * 1000)
        stages["분류"].append((t3 - t2) * 1000)
        stages["전체"].append((t3 - t0) * 1000)

    for stage, times in stages.items():
        times.sort()
        print(f"{stage:12s}  p50={times[len(times)//2]:7.1f} ms  p95={times[int(len(times)*0.95)]:7.1f} ms")

CPU에서의 전형적인 결과는 전처리 약 3ms, 탐지 300-500ms, 분류 20-40ms, 전체 350-550ms 수준입니다. GPU에서는 탐지가 20-40ms로 줄어들고, 그만큼 전처리와 분류가 상대적으로 더 중요해집니다.

사용해보기

프로덕션 템플릿은 보통 같은 구조 위에 다음과 같은 요소를 더합니다.

  • 모델 버전 관리(model versioning) — 응답에 모델 이름과 가중치 해시(weights hash)를 항상 기록합니다.
  • 요청별 추적 식별자(per-request trace ID) — 모든 요청의 단계별 시간을 로그로 남겨, 느린 응답이 어느 단계에서 발생했는지 추적할 수 있게 합니다.
  • 폴백 경로(fallback path) — 분류기가 타임아웃(timeout)되더라도 전체 요청을 실패시키지 말고 분류가 빠진 탐지 결과를 반환합니다.
  • 안전 필터(safety filter) — 부적절 콘텐츠(NSFW; Not Safe For Work)와 개인 식별 정보(Personally Identifiable Information; PII) 필터는 분류 뒤, 응답이 서비스를 떠나기 전에 실행합니다.
  • 배치 엔드포인트(batch endpoint) — 대량 처리를 위해 이미지 URL 목록을 받는 /detect_batch를 제공합니다.

프로덕션 서빙(serving) 환경에서는 torchserve, Triton Inference Server, BentoML이 배치 처리, 버전 관리, 메트릭(metrics), 헬스 체크(health check)를 기본으로 제공합니다. FastAPI를 직접 실행하는 방식도 프로토타입과 소규모 제품에는 충분히 적합합니다.

산출물 만들기

이 lesson의 최종 산출물은 다음과 같습니다.

  • outputs/prompt-vision-service-shape-reviewer.md — 비전 서비스 코드에서 계약과 응답 형태(response shape) 위반을 검토하고, 가장 먼저 터질 버그를 짚어 주는 프롬프트입니다.
  • outputs/skill-pipeline-budget-planner.md — 목표 지연 시간과 처리량을 입력으로 받아 파이프라인 단계별로 시간 예산(time budget)을 배정하고, 어느 단계가 예산을 가장 먼저 넘길지 표시하는 skill입니다.

연습문제

  1. (쉬움) 임의의 공개 데이터셋(open dataset)에서 이미지 10장에 파이프라인을 실행합니다. 단계별 평균 시간과 이미지당 탐지 개수 분포를 보고합니다.
  2. (중간) Detection에 마스크 출력 필드를 추가하고 RLE로 인코딩(encode)합니다. 객체가 10개 들어 있는 이미지에서도 JSON 응답이 1MB 아래로 유지되는지 확인합니다.
  3. (어려움) 분류기 앞에 마이크로배처를 추가합니다. 최대 10ms 동안 크롭을 모으고, 한 번의 GPU 호출로 모두 분류한 뒤 요청별로 결과를 돌려줍니다. 초당 5건의 동시 요청(concurrent request) 부하에서 처리량 증가와 추가 지연 시간을 측정합니다.

핵심 용어

용어흔한 설명실제 의미
파이프라인(pipeline)"시스템(system)"전처리, 추론(inference), 후처리 단계를 순서대로 잇는 체인이며, 각 단계 쌍 사이에 타입이 있는 인터페이스가 있다.
데이터 계약(data contract)"스키마(schema)"각 단계의 입출력이 따라야 하는 Pydantic 또는 dataclass 정의이다. 경계에서 통합 버그(integration bug)를 잡는다.
전처리(preprocessing)"모델 앞 단계"디코딩, 색 변환, 크기 조정, 정규화를 가리킨다. 보통 CPU 시간을 가장 많이 쓰는 지점이다.
후처리(postprocessing)"모델 뒤 단계"NMS, 마스크 크기 조정, 임계값 처리(threshold), RLE 인코딩을 묶어 부르는 말이다. GPU에서는 싸고 CPU에서는 비싸다.
마이크로배처(microbatcher)"모았다가 한 번에 순전파"여러 요청을 일정 시간 동안 모은 뒤 한 번의 배치 순전파(batched forward pass)를 실행하는 집계기(aggregator)이다.
추적 식별자(trace ID)"요청 ID(request id)"느린 요청을 엔드투엔드로 추적하기 위해 모든 단계에 로그로 남기는 요청별 식별자이다.
실패 코드(failure code)"이름 있는 에러"일반적인 500 대신 실패 종류마다 부여하는 구체적인 에러 코드이다. 클라이언트의 재시도 로직(retry logic)을 가능하게 한다.
헬스 체크(health check)"준비 상태 프로브(readiness probe)"서비스가 응답 가능한 상태인지 알려주는 가벼운 엔드포인트(endpoint)이다. 로드 밸런서(load balancer)가 사용한다.

더 읽을거리

실습 코드

이 강의의 실습 코드 1개

main
Code

산출물

이 강의에서 생성된 프롬프트, 스킬, 코드 산출물 2개

skill-pipeline-budget-planner

Given target latency and throughput, assign a time budget to every pipeline stage and flag which stage will miss its budget first

Skill
prompt-vision-service-shape-reviewer

Review a vision service's code for contract/response shape violations and name the first breaking bug

Prompt

확인 문제

3문제 · 모두 맞추면 완료 표시가 가능합니다

1.탐지 box 크기가 3x4 픽셀일 때 파이프라인 분류기(classifier)가 충돌(crash)합니다. 올바른 수정은 무엇인가요?

2.분류기에 배치 처리(batching)를 추가했습니다. 최대 10ms 기다렸다가 대기 중인 crop을 모두 배치로 묶어 한 번의 GPU 순전파(forward)를 실행합니다. 예상 효과는 무엇인가요?

3.사용자가 깨진 JPEG를 업로드했을 때 파이프라인 응답이 500 error입니다. 어떻게 고쳐야 하나요?

0/3 답변 완료