
Machine Learning Model Deployment: ONNX, TorchServe, and FastAPI Serving

Deploy machine learning models in production — ONNX export, TorchServe configuration, FastAPI serving with Pydantic, batching, GPU inference, and monitoring model performance.

Viprasol Tech Team
April 20, 2026
13 min read

Training a model is 20% of the work. Getting it into production — reliably, with acceptable latency, at scale — is the other 80%. ML model deployment has more failure modes than most engineers expect: model drift, preprocessing inconsistencies between training and serving, GPU memory exhaustion under concurrent load, and cold starts that exceed the SLA.

This guide covers three proven deployment patterns with working implementations.


The Serving Options

| Approach | Latency | Throughput | Ops Complexity | Best For |
|---|---|---|---|---|
| FastAPI + Python | Medium (10–100ms) | Medium | Low | Prototypes, small scale |
| ONNX Runtime | Low (1–30ms) | High | Low–Medium | CPU inference, cross-framework |
| TorchServe | Low–Medium | High | Medium | PyTorch production serving |
| Triton Inference Server | Very Low | Very High | High | GPU inference at scale |
| AWS SageMaker | Medium | High | Low (managed) | Fully managed, AWS-native |
| BentoML | Low–Medium | High | Low | Multi-model, multi-framework |

Pattern 1: FastAPI Model Serving

The simplest production setup — a FastAPI app that loads a model at startup and serves predictions.

# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
import numpy as np
import joblib
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Fraud Detection API", version="1.0.0")

# Model loaded once at startup — not per request
model = None
scaler = None

@app.on_event("startup")
async def load_model():
    global model, scaler
    logger.info("Loading model...")
    model = joblib.load("models/fraud_detector_v2.pkl")
    scaler = joblib.load("models/feature_scaler_v2.pkl")
    logger.info("Model loaded successfully")

class TransactionFeatures(BaseModel):
    amount: float = Field(..., gt=0, description="Transaction amount in cents")
    merchant_category: int = Field(..., ge=0, le=999)
    hour_of_day: int = Field(..., ge=0, le=23)
    day_of_week: int = Field(..., ge=0, le=6)
    user_avg_transaction: float = Field(..., gt=0)
    transactions_last_hour: int = Field(..., ge=0)
    is_international: bool
    card_present: bool

class PredictionResponse(BaseModel):
    fraud_probability: float
    is_fraud: bool
    confidence: str  # "high" | "medium" | "low"
    model_version: str
    latency_ms: float

@app.post("/predict", response_model=PredictionResponse)
async def predict_fraud(transaction: TransactionFeatures):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    start = time.time()

    # Feature engineering — must match training pipeline exactly
    features = np.array([[
        transaction.amount,
        transaction.merchant_category,
        transaction.hour_of_day,
        transaction.day_of_week,
        transaction.user_avg_transaction,
        transaction.transactions_last_hour,
        int(transaction.is_international),
        int(transaction.card_present),
        transaction.amount / transaction.user_avg_transaction,  # Derived feature
        transaction.transactions_last_hour * transaction.amount,  # Interaction
    ]])

    # Scale features (same scaler as training)
    features_scaled = scaler.transform(features)

    # Predict
    fraud_prob = float(model.predict_proba(features_scaled)[0][1])
    is_fraud = fraud_prob > 0.7  # Threshold tuned for business requirements

    latency_ms = (time.time() - start) * 1000

    # Log for monitoring
    logger.info({
        "event": "prediction",
        "fraud_prob": fraud_prob,
        "is_fraud": is_fraud,
        "amount": transaction.amount,
        "latency_ms": latency_ms,
    })

    confidence = (
        "high" if fraud_prob > 0.9 or fraud_prob < 0.1
        else "medium" if fraud_prob > 0.7 or fraud_prob < 0.3
        else "low"
    )

    return PredictionResponse(
        fraud_probability=round(fraud_prob, 4),
        is_fraud=is_fraud,
        confidence=confidence,
        model_version="fraud-detector-v2.1.0",
        latency_ms=round(latency_ms, 2),
    )

@app.get("/health")
async def health():
    return {
        "status": "healthy" if model is not None else "loading",
        "model_loaded": model is not None,
    }
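Those Field constraints are the first line of defense: FastAPI rejects invalid payloads with a 422 before they ever reach the model. A minimal sketch of the same validation outside the app (only two of the fields shown; the values are illustrative):

```python
from pydantic import BaseModel, Field, ValidationError

class TransactionFeatures(BaseModel):
    amount: float = Field(..., gt=0)            # must be positive
    hour_of_day: int = Field(..., ge=0, le=23)  # valid hour of day

# A payload that violates both constraints never reaches inference code
try:
    TransactionFeatures(amount=-500, hour_of_day=25)
except ValidationError as exc:
    print(f"rejected with {len(exc.errors())} validation errors")
```

The same models double as machine-readable documentation in the OpenAPI schema FastAPI generates.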

Docker container:

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first (cached unless requirements.txt changes)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts
COPY models/ models/

# Copy application code
COPY main.py .

# Health check (python:3.11-slim ships without curl, so use the stdlib)
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

EXPOSE 8000

# Use gunicorn with uvicorn workers for production
CMD ["gunicorn", "main:app", \
     "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "30"]

Pattern 2: ONNX Runtime for Low-Latency CPU Inference

ONNX (Open Neural Network Exchange) is a standard interchange format for ML models. Export your PyTorch, scikit-learn, or XGBoost model to ONNX, then serve it with ONNX Runtime, which is often 2–5× faster than eager Python inference for many model types.

Export PyTorch model to ONNX:

# export_onnx.py
import torch
import torch.onnx
from model import SentimentClassifier  # Your model class

# Load trained model
model = SentimentClassifier(vocab_size=30000, embed_dim=128, hidden_dim=256)
model.load_state_dict(torch.load("checkpoints/sentiment_v3.pt"))
model.eval()

# Dummy input matching training shape
dummy_input = torch.zeros(1, 128, dtype=torch.long)  # batch=1, seq_len=128

# Export
torch.onnx.export(
    model,
    dummy_input,
    "models/sentiment_v3.onnx",
    export_params=True,
    opset_version=17,
    do_constant_folding=True,
    input_names=["input_ids"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch_size", 1: "sequence_length"},
        "logits": {0: "batch_size"},
    },
)

print("Model exported to ONNX")

# Verify export
import onnx
onnx_model = onnx.load("models/sentiment_v3.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model verified")

ONNX Runtime serving:

# onnx_server.py
from fastapi import FastAPI
from pydantic import BaseModel
import onnxruntime as ort
import numpy as np
from transformers import AutoTokenizer
import asyncio
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

# ONNX Runtime session with optimization
session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
session_options.intra_op_num_threads = 4

# CPU provider with thread config
providers = [
    ('CPUExecutionProvider', {
        'arena_extend_strategy': 'kNextPowerOfTwo',
    })
]

session = ort.InferenceSession(
    "models/sentiment_v3.onnx",
    sess_options=session_options,
    providers=providers,
)

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
executor = ThreadPoolExecutor(max_workers=8)

class TextInput(BaseModel):
    text: str

def run_inference(input_ids: np.ndarray) -> np.ndarray:
    """CPU-bound inference — runs in thread pool"""
    outputs = session.run(
        ["logits"],
        {"input_ids": input_ids}
    )
    return outputs[0]

@app.post("/classify")
async def classify_sentiment(body: TextInput):
    # Tokenize
    encoded = tokenizer(
        body.text,
        max_length=128,
        padding="max_length",
        truncation=True,
        return_tensors="np",
    )
    input_ids = encoded["input_ids"].astype(np.int64)

    # Run inference in thread pool (non-blocking)
    loop = asyncio.get_running_loop()
    logits = await loop.run_in_executor(executor, run_inference, input_ids)

    # Softmax (subtract the max for numerical stability)
    exps = np.exp(logits - logits.max(axis=1, keepdims=True))
    probs = exps / exps.sum(axis=1, keepdims=True)
    label_idx = int(np.argmax(probs[0]))
    labels = ["negative", "neutral", "positive"]

    return {
        "label": labels[label_idx],
        "confidence": round(float(probs[0][label_idx]), 4),
        "probabilities": {
            label: round(float(prob), 4)
            for label, prob in zip(labels, probs[0])
        },
    }

Pattern 3: Batching for Throughput

Individual predictions are inefficient at high throughput. Batching collects multiple requests and processes them together.
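The payoff is easy to quantify. Model one worker issuing back-to-back inference calls, where each forward pass has a fixed overhead plus a small per-item cost (the 5 ms and 0.5 ms figures below are illustrative assumptions, not measurements):

```python
def throughput_rps(batch_size: int,
                   overhead_ms: float = 5.0,     # fixed cost per forward pass (assumed)
                   per_item_ms: float = 0.5) -> float:  # marginal cost per item (assumed)
    """Sustained requests/second for one worker running batched inference."""
    batch_latency_ms = overhead_ms + per_item_ms * batch_size
    return batch_size / (batch_latency_ms / 1000.0)

for b in (1, 8, 32):
    print(f"batch={b:>2}: ~{throughput_rps(b):.0f} req/s")
```

With these numbers a batch of 32 serves roughly 8× the traffic of unbatched calls, at the cost of up to max_wait_ms of added latency per request.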

# batch_server.py
import asyncio

import numpy as np

class BatchPredictor:
    def __init__(self, model_fn, max_batch_size: int = 32, max_wait_ms: float = 10.0):
        self.model_fn = model_fn
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: asyncio.Queue = asyncio.Queue()
        self.running = False

    async def start(self):
        self.running = True
        # Keep a reference so the background task is not garbage-collected
        self._task = asyncio.create_task(self._batch_processor())

    async def _batch_processor(self):
        while self.running:
            batch = []
            futures = []

            # Wait for first item
            try:
                item, future = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1
                )
                batch.append(item)
                futures.append(future)
            except asyncio.TimeoutError:
                continue

            # Collect more items (up to max_batch_size) within max_wait_ms
            deadline = asyncio.get_event_loop().time() + self.max_wait_ms / 1000
            while len(batch) < self.max_batch_size:
                remaining = deadline - asyncio.get_event_loop().time()
                if remaining <= 0:
                    break
                try:
                    item, future = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=remaining
                    )
                    batch.append(item)
                    futures.append(future)
                except asyncio.TimeoutError:
                    break

            # Process batch
            if batch:
                try:
                    batch_input = np.stack(batch)
                    results = await asyncio.get_event_loop().run_in_executor(
                        None, self.model_fn, batch_input
                    )
                    for future, result in zip(futures, results):
                        future.set_result(result)
                except Exception as e:
                    for future in futures:
                        future.set_exception(e)

    async def predict(self, input_data: np.ndarray) -> np.ndarray:
        future = asyncio.get_event_loop().create_future()
        await self.queue.put((input_data, future))
        return await future

Model Versioning and A/B Testing

# model_registry.py
import hashlib
import json
from dataclasses import dataclass

import redis

@dataclass
class ModelVersion:
    name: str
    version: str
    weight: int  # Traffic weight (0-100)
    path: str

class ModelRegistry:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.models: dict[str, object] = {}

    async def get_model_for_request(self, user_id: str) -> str:
        """Route user to model version based on weights"""
        config = self.redis.get("model:fraud_detector:config")
        versions = json.loads(config)

        # Consistent hash — same user always goes to same version.
        # Built-in hash() is salted per process (PYTHONHASHSEED), so a
        # stable digest is required for sticky routing.
        bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
        cumulative = 0
        for version in versions:
            cumulative += version["weight"]
            if bucket < cumulative:
                return version["name"]

        return versions[-1]["name"]  # Fallback
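Note that the bucket must be stable across processes and restarts, and Python's built-in hash() is salted per interpreter. A self-contained sketch of weighted, sticky routing using hashlib (the version names and weights here are illustrative):

```python
import hashlib

def stable_bucket(user_id: str) -> int:
    """Deterministic 0-99 bucket; hashlib output never changes between runs."""
    return int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100

def pick_version(user_id: str, versions: list) -> str:
    cumulative = 0
    for v in versions:
        cumulative += v["weight"]
        if stable_bucket(user_id) < cumulative:
            return v["name"]
    return versions[-1]["name"]  # Fallback if weights sum below 100

versions = [
    {"name": "fraud-v2.0", "weight": 90},
    {"name": "fraud-v2.1-canary", "weight": 10},
]

# The traffic split converges on the configured weights
counts = {"fraud-v2.0": 0, "fraud-v2.1-canary": 0}
for i in range(10_000):
    counts[pick_version(f"user-{i}", versions)] += 1
```

Because the bucket is a pure function of user_id, the same user sees the same model version on every request, from every replica.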

Monitoring Model Performance

# monitoring.py
import prometheus_client as prom

# Metrics
prediction_latency = prom.Histogram(
    'model_prediction_latency_seconds',
    'Model prediction latency',
    ['model_version', 'endpoint'],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

prediction_count = prom.Counter(
    'model_predictions_total',
    'Total predictions made',
    ['model_version', 'result_label']
)

model_confidence = prom.Histogram(
    'model_prediction_confidence',
    'Distribution of model confidence scores',
    ['model_version'],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

# Alert when p95 confidence drops below 0.7 (model drift signal)
# Alert when prediction latency p99 > 200ms
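The drift alert can also be approximated in-process. A minimal sketch of a rolling-window confidence signal (using a window mean rather than a true p95; the window size and threshold are illustrative):

```python
from collections import deque

class ConfidenceDriftMonitor:
    """Flag when mean confidence over the last `window` predictions sinks
    below `threshold` - a cheap early signal of model drift."""

    def __init__(self, window: int = 1000, threshold: float = 0.7):
        self.scores: deque = deque(maxlen=window)
        self.threshold = threshold

    def record(self, confidence: float) -> None:
        self.scores.append(confidence)

    def drifting(self) -> bool:
        if len(self.scores) < self.scores.maxlen:
            return False  # don't alert until the window is full
        return sum(self.scores) / len(self.scores) < self.threshold
```

Call record() after each prediction and page someone when drifting() flips to True; in practice you would feed the same scores to the Prometheus histogram above and alert there instead.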

Deployment Cost Reference

| Infrastructure | vCPU | RAM | GPU | Monthly Cost | Throughput |
|---|---|---|---|---|---|
| ECS Fargate (CPU only) | 2 | 4GB | - | $60–120 | 100–500 req/s |
| EC2 c7g.xlarge (CPU) | 4 | 8GB | - | $120–180 | 500–2K req/s |
| EC2 g4dn.xlarge (GPU) | 4 | 16GB | T4 | $380–530 | 5K–20K req/s |
| EC2 g5.xlarge (GPU) | 4 | 16GB | A10G | $600–900 | 10K–50K req/s |
| SageMaker ml.m5.xlarge | 4 | 16GB | - | $200–300 | 500–1K req/s |

For most inference workloads under 1K req/s, ECS Fargate with ONNX Runtime is the best cost/ops balance. Above 1K req/s on deep learning models, GPU instances pay for themselves.
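A quick sanity check on that break-even claim, using mid-range figures from the table above and assuming fully utilized, sustained load (real traffic is bursty, so treat this as an upper bound on efficiency):

```python
SECONDS_PER_MONTH = 30 * 24 * 3600  # 2,592,000

def cost_per_million_requests(monthly_usd: float, sustained_rps: float) -> float:
    """Dollars per million requests at a sustained request rate."""
    requests = sustained_rps * SECONDS_PER_MONTH
    return monthly_usd / (requests / 1_000_000)

fargate = cost_per_million_requests(90, 300)    # Fargate mid-range figures
g4dn = cost_per_million_requests(455, 10_000)   # g4dn.xlarge mid-range figures
print(f"Fargate: ${fargate:.3f}/M req, g4dn: ${g4dn:.3f}/M req")
```

At sustained high throughput the GPU box is several times cheaper per request, which is the whole argument for it; at low utilization the fixed monthly cost dominates and CPU wins.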


Working With Viprasol

We deploy ML models to production as part of AI/ML engineering engagements — model export, serving infrastructure, batching, monitoring, and CI/CD pipelines for model updates. Our work includes the full MLOps stack, not just the training side.

Talk to our AI/ML team about deploying your models.



About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
