Machine Learning Model Deployment: ONNX, TorchServe, and FastAPI Serving
Training a model is 20% of the work. Getting it into production — reliably, with acceptable latency, at scale — is the other 80%. ML model deployment has more failure modes than most engineers expect: model drift, preprocessing inconsistencies between training and serving, GPU memory exhaustion under concurrent load, and cold starts that exceed SLA.
This guide covers three proven deployment patterns with working implementations.
The Serving Options
| Approach | Latency | Throughput | Ops Complexity | Best For |
|---|---|---|---|---|
| FastAPI + Python | Medium (10–100ms) | Medium | Low | Prototypes, small scale |
| ONNX Runtime | Low (1–30ms) | High | Low–Medium | CPU inference, cross-framework |
| TorchServe | Low–Medium | High | Medium | PyTorch production serving |
| Triton Inference Server | Very Low | Very High | High | GPU inference at scale |
| AWS SageMaker | Medium | High | Low (managed) | Fully managed, AWS-native |
| BentoML | Low–Medium | High | Low | Multi-model, multi-framework |
Pattern 1: FastAPI Model Serving
The simplest production setup — a FastAPI app that loads a model at startup and serves predictions.
```python
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import numpy as np
import joblib
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Fraud Detection API", version="1.0.0")

# Model loaded once at startup, not per request
model = None
scaler = None


@app.on_event("startup")
async def load_model():
    global model, scaler
    logger.info("Loading model...")
    model = joblib.load("models/fraud_detector_v2.pkl")
    scaler = joblib.load("models/feature_scaler_v2.pkl")
    logger.info("Model loaded successfully")


class TransactionFeatures(BaseModel):
    amount: float = Field(..., gt=0, description="Transaction amount in cents")
    merchant_category: int = Field(..., ge=0, le=999)
    hour_of_day: int = Field(..., ge=0, le=23)
    day_of_week: int = Field(..., ge=0, le=6)
    user_avg_transaction: float = Field(..., gt=0)
    transactions_last_hour: int = Field(..., ge=0)
    is_international: bool
    card_present: bool


class PredictionResponse(BaseModel):
    fraud_probability: float
    is_fraud: bool
    confidence: str  # "high" | "medium" | "low"
    model_version: str
    latency_ms: float


@app.post("/predict", response_model=PredictionResponse)
async def predict_fraud(transaction: TransactionFeatures):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    start = time.perf_counter()  # monotonic clock, safe for latency measurement

    # Feature engineering -- must match the training pipeline exactly
    features = np.array([[
        transaction.amount,
        transaction.merchant_category,
        transaction.hour_of_day,
        transaction.day_of_week,
        transaction.user_avg_transaction,
        transaction.transactions_last_hour,
        int(transaction.is_international),
        int(transaction.card_present),
        transaction.amount / transaction.user_avg_transaction,  # Derived feature
        transaction.transactions_last_hour * transaction.amount,  # Interaction
    ]])

    # Scale features (same scaler as training)
    features_scaled = scaler.transform(features)

    # Predict
    fraud_prob = float(model.predict_proba(features_scaled)[0][1])
    is_fraud = fraud_prob > 0.7  # Threshold tuned for business requirements

    latency_ms = (time.perf_counter() - start) * 1000

    # Log for monitoring
    logger.info({
        "event": "prediction",
        "fraud_prob": fraud_prob,
        "is_fraud": is_fraud,
        "amount": transaction.amount,
        "latency_ms": latency_ms,
    })

    confidence = (
        "high" if fraud_prob > 0.9 or fraud_prob < 0.1
        else "medium" if fraud_prob > 0.7 or fraud_prob < 0.3
        else "low"
    )

    return PredictionResponse(
        fraud_probability=round(fraud_prob, 4),
        is_fraud=is_fraud,
        confidence=confidence,
        model_version="fraud-detector-v2.1.0",
        latency_ms=round(latency_ms, 2),
    )


@app.get("/health")
async def health():
    return {
        "status": "healthy" if model is not None else "loading",
        "model_loaded": model is not None,
    }
```
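The "must match the training pipeline exactly" comment marks the classic source of train/serve skew. One structural safeguard is to put feature construction in a single function that both the training job and the endpoint import. A minimal sketch; the `build_features` name is illustrative, and the column order mirrors the endpoint above:

```python
import numpy as np

def build_features(amount: float, merchant_category: int, hour_of_day: int,
                   day_of_week: int, user_avg_transaction: float,
                   transactions_last_hour: int, is_international: bool,
                   card_present: bool) -> np.ndarray:
    """Single source of truth for the feature vector.
    Import this from BOTH the training pipeline and the serving endpoint
    so the two can never drift apart."""
    return np.array([[
        amount,
        merchant_category,
        hour_of_day,
        day_of_week,
        user_avg_transaction,
        transactions_last_hour,
        int(is_international),
        int(card_present),
        amount / user_avg_transaction,          # derived feature
        transactions_last_hour * amount,        # interaction feature
    ]], dtype=np.float64)

row = build_features(5000.0, 411, 14, 2, 2500.0, 3, True, False)
print(row.shape)  # one row, ten columns
```

With this in place, the endpoint's inline `np.array([...])` collapses to a single `build_features(...)` call, and a unit test can pin the column order.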
Docker container:
```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# curl is needed for the HEALTHCHECK below; slim images do not include it
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies first (cached unless requirements.txt changes)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts
COPY models/ models/

# Copy application code
COPY main.py .

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

# Use gunicorn with uvicorn workers for production
CMD ["gunicorn", "main:app", \
     "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "30"]
```
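The image build expects a `requirements.txt` next to `main.py`. A plausible minimal set for this service is sketched below; it is unpinned on purpose, and in production you should pin exact versions from your own lockfile rather than copying these:

```text
fastapi
gunicorn
uvicorn
pydantic
numpy
scikit-learn
joblib
```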
Pattern 2: ONNX Runtime for Low-Latency CPU Inference
ONNX (Open Neural Network Exchange) is a standard format for ML models. Export your PyTorch/scikit-learn/XGBoost model to ONNX, then serve with ONNX Runtime — which is 2–5× faster than Python inference for many model types.
Export PyTorch model to ONNX:
```python
# export_onnx.py
import torch
import onnx

from model import SentimentClassifier  # Your model class

# Load trained model
model = SentimentClassifier(vocab_size=30000, embed_dim=128, hidden_dim=256)
model.load_state_dict(torch.load("checkpoints/sentiment_v3.pt"))
model.eval()

# Dummy input matching training shape
dummy_input = torch.zeros(1, 128, dtype=torch.long)  # batch=1, seq_len=128

# Export
torch.onnx.export(
    model,
    dummy_input,
    "models/sentiment_v3.onnx",
    export_params=True,
    opset_version=17,
    do_constant_folding=True,
    input_names=["input_ids"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch_size", 1: "sequence_length"},
        "logits": {0: "batch_size"},
    },
)
print("Model exported to ONNX")

# Verify the exported graph is structurally valid
onnx_model = onnx.load("models/sentiment_v3.onnx")
onnx.checker.check_model(onnx_model)
print("ONNX model verified")
```
ONNX Runtime serving:
```python
# onnx_server.py
import asyncio
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import onnxruntime as ort
from fastapi import FastAPI
from pydantic import BaseModel
from transformers import AutoTokenizer

app = FastAPI()

# ONNX Runtime session with optimization
session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
session_options.intra_op_num_threads = 4

# CPU provider with memory-arena config
providers = [
    ("CPUExecutionProvider", {
        "arena_extend_strategy": "kNextPowerOfTwo",
    })
]

session = ort.InferenceSession(
    "models/sentiment_v3.onnx",
    sess_options=session_options,
    providers=providers,
)

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
executor = ThreadPoolExecutor(max_workers=8)


class TextInput(BaseModel):
    text: str


def run_inference(input_ids: np.ndarray) -> np.ndarray:
    """CPU-bound inference -- runs in the thread pool"""
    outputs = session.run(
        ["logits"],
        {"input_ids": input_ids},
    )
    return outputs[0]


@app.post("/classify")
async def classify_sentiment(body: TextInput):
    # Tokenize
    encoded = tokenizer(
        body.text,
        max_length=128,
        padding="max_length",
        truncation=True,
        return_tensors="np",
    )
    input_ids = encoded["input_ids"].astype(np.int64)

    # Run inference in the thread pool so the event loop stays free
    loop = asyncio.get_running_loop()
    logits = await loop.run_in_executor(executor, run_inference, input_ids)

    # Numerically stable softmax: subtract the row max before exponentiating
    shifted = logits - logits.max(axis=1, keepdims=True)
    probs = np.exp(shifted) / np.exp(shifted).sum(axis=1, keepdims=True)

    label_idx = int(np.argmax(probs[0]))
    labels = ["negative", "neutral", "positive"]

    return {
        "label": labels[label_idx],
        "confidence": round(float(probs[0][label_idx]), 4),
        "probabilities": {
            label: round(float(prob), 4)
            for label, prob in zip(labels, probs[0])
        },
    }
```
Pattern 3: Batching for Throughput
Individual predictions are inefficient at high throughput. Batching collects multiple requests and processes them together.
```python
# batch_server.py
import asyncio

import numpy as np


class BatchPredictor:
    def __init__(self, model_fn, max_batch_size: int = 32, max_wait_ms: float = 10.0):
        self.model_fn = model_fn
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: asyncio.Queue = asyncio.Queue()
        self.running = False

    async def start(self):
        self.running = True
        asyncio.create_task(self._batch_processor())

    async def _batch_processor(self):
        loop = asyncio.get_running_loop()
        while self.running:
            batch = []
            futures = []

            # Wait for the first item
            try:
                item, future = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=0.1,
                )
                batch.append(item)
                futures.append(future)
            except asyncio.TimeoutError:
                continue

            # Collect more items (up to max_batch_size) within max_wait_ms
            deadline = loop.time() + self.max_wait_ms / 1000
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item, future = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=remaining,
                    )
                    batch.append(item)
                    futures.append(future)
                except asyncio.TimeoutError:
                    break

            # Process the whole batch in a single model call
            if batch:
                try:
                    batch_input = np.stack(batch)
                    results = await loop.run_in_executor(
                        None, self.model_fn, batch_input
                    )
                    for future, result in zip(futures, results):
                        future.set_result(result)
                except Exception as e:
                    for future in futures:
                        future.set_exception(e)

    async def predict(self, input_data: np.ndarray) -> np.ndarray:
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((input_data, future))
        return await future
```
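The queue-plus-futures design is easier to see in miniature. The self-contained sketch below uses a toy worker that drains whatever is queued and answers each caller through its future; the doubling stand-in for the model and the simplified draining (no deadline logic) are illustrative only:

```python
import asyncio

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    batch_sizes = []

    async def worker():
        while True:
            # Block for the first item, then drain anything else already queued
            item, fut = await queue.get()
            batch = [(item, fut)]
            while not queue.empty() and len(batch) < 32:
                batch.append(queue.get_nowait())
            batch_sizes.append(len(batch))
            for x, f in batch:
                f.set_result(x * 2)  # stand-in for one vectorized model call

    task = asyncio.create_task(worker())

    async def predict(x: int) -> int:
        fut = asyncio.get_running_loop().create_future()
        await queue.put((x, fut))
        return await fut

    # Eight concurrent callers are served by far fewer "model calls"
    results = await asyncio.gather(*(predict(i) for i in range(8)))
    task.cancel()
    return results, batch_sizes

results, batch_sizes = asyncio.run(main())
print(results)
print(f"{len(batch_sizes)} model call(s) for {len(results)} requests")
```

Because every caller awaits its own future, each request still gets an individual response even though the expensive work happens once per batch.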
Model Versioning and A/B Testing
```python
# model_registry.py
import hashlib
import json
from dataclasses import dataclass

import redis


@dataclass
class ModelVersion:
    name: str
    version: str
    weight: int  # Traffic weight (0-100)
    path: str


class ModelRegistry:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.models: dict[str, object] = {}

    def get_model_for_request(self, user_id: str) -> str:
        """Route a user to a model version based on traffic weights."""
        config = self.redis.get("model:fraud_detector:config")
        versions = json.loads(config)

        # Consistent hash: the same user always goes to the same version.
        # Use a stable digest, not the built-in hash(), which is randomized
        # per process and would reshuffle users on every restart.
        bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100

        cumulative = 0
        for version in versions:
            cumulative += version["weight"]
            if bucket < cumulative:
                return version["name"]
        return versions[-1]["name"]  # Fallback
```
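The weighted routing can be exercised offline, with no Redis involved. One caveat worth encoding: Python's built-in `hash()` is randomized per process, so a stable digest such as MD5 is what actually keeps a user in the same bucket across restarts and replicas. A sketch with hypothetical version names:

```python
import hashlib

def route(user_id: str, versions: list[dict]) -> str:
    # Stable bucket in [0, 100): same user, same bucket, in every process
    bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    cumulative = 0
    for v in versions:
        cumulative += v["weight"]
        if bucket < cumulative:
            return v["name"]
    return versions[-1]["name"]

versions = [
    {"name": "fraud-detector-v2.1.0", "weight": 90},
    {"name": "fraud-detector-v3.0.0-rc1", "weight": 10},
]

# Deterministic per user; roughly a 90/10 split across the population
counts: dict[str, int] = {}
for i in range(10_000):
    name = route(f"user-{i}", versions)
    counts[name] = counts.get(name, 0) + 1
print(counts)
```

Because the split is driven only by the config read from Redis, shifting traffic from 90/10 to 50/50 is a config write, not a deploy.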
Monitoring Model Performance
```python
# monitoring.py
import prometheus_client as prom

# Metrics
prediction_latency = prom.Histogram(
    "model_prediction_latency_seconds",
    "Model prediction latency",
    ["model_version", "endpoint"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)

prediction_count = prom.Counter(
    "model_predictions_total",
    "Total predictions made",
    ["model_version", "result_label"],
)

model_confidence = prom.Histogram(
    "model_prediction_confidence",
    "Distribution of model confidence scores",
    ["model_version"],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)

# Alert when p95 confidence drops below 0.7 (model drift signal)
# Alert when prediction latency p99 > 200ms
```
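The drift-alert comments above can also be checked in-process, before the metrics ever reach an alerting rule. Below is a minimal sliding-window monitor (class name and thresholds are illustrative) that flags when the share of low-confidence predictions grows:

```python
from collections import deque

class ConfidenceDriftMonitor:
    """Tracks recent prediction confidences and flags when the share of
    low-confidence predictions exceeds a threshold -- a cheap in-process
    drift signal that complements the Prometheus histograms."""

    def __init__(self, window: int = 1000, low: float = 0.7,
                 alert_ratio: float = 0.3):
        self.window: deque = deque(maxlen=window)
        self.low = low
        self.alert_ratio = alert_ratio

    def record(self, confidence: float) -> bool:
        """Record one prediction; return True when an alert should fire."""
        self.window.append(confidence)
        low_share = sum(c < self.low for c in self.window) / len(self.window)
        return low_share > self.alert_ratio

monitor = ConfidenceDriftMonitor(window=100)

for _ in range(100):
    monitor.record(0.95)               # healthy traffic: no alert

drifted = [monitor.record(0.55) for _ in range(40)]
print("alert fired:", drifted[-1])     # low-confidence share is now 40%
```

The O(window) scan per record is fine for a sketch; a production version would maintain a running count instead.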
Deployment Cost Reference
| Infrastructure | vCPU | RAM | GPU | Monthly Cost | Throughput |
|---|---|---|---|---|---|
| ECS Fargate (CPU only) | 2 | 4GB | — | $60–120 | 100–500 req/s |
| EC2 c7g.xlarge (CPU) | 4 | 8GB | — | $120–180 | 500–2K req/s |
| EC2 g4dn.xlarge (GPU) | 4 | 16GB | T4 | $380–530 | 5K–20K req/s |
| EC2 g5.xlarge (GPU) | 4 | 16GB | A10G | $600–900 | 10K–50K req/s |
| SageMaker ml.m5.xlarge | 4 | 16GB | — | $200–300 | 500–1K req/s |
For most inference workloads under 1K req/s, ECS Fargate with ONNX Runtime offers the best balance of cost and operational overhead. Above 1K req/s on deep learning models, GPU instances pay for themselves.
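That break-even intuition can be made concrete with a rough cost-per-million-predictions calculation, using midpoints from the table above and assuming sustained 24/7 utilization. Real traffic is bursty, so treat these as best-case numbers:

```python
def cost_per_million(monthly_usd: float, sustained_rps: float) -> float:
    """Rough cost per 1M predictions, assuming the instance runs 24/7
    at the given sustained request rate."""
    requests_per_month = sustained_rps * 60 * 60 * 24 * 30
    return monthly_usd / requests_per_month * 1_000_000

# Midpoints from the cost table above (illustrative only)
fargate = cost_per_million(90, 300)       # Fargate CPU + ONNX Runtime
g4dn = cost_per_million(455, 12_500)      # g4dn.xlarge, T4 GPU
print(f"Fargate: ${fargate:.3f}/1M, g4dn: ${g4dn:.4f}/1M")
```

At full utilization the GPU box is roughly an order of magnitude cheaper per prediction; the catch is that you only reach that utilization with enough sustained traffic, which is why the crossover sits around the 1K req/s mark.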
Working With Viprasol
We deploy ML models to production as part of AI/ML engineering engagements — model export, serving infrastructure, batching, monitoring, and CI/CD pipelines for model updates. Our work includes the full MLOps stack, not just the training side.
→ Talk to our AI/ML team about deploying your models.
See Also
- MLOps and Machine Learning Pipelines — training and versioning pipelines
- Vector Database Guide — serving embeddings alongside predictions
- AI Prompt Engineering — LLM deployment patterns
- Docker Best Practices — containerizing ML services
- AI and Machine Learning Services — ML engineering and deployment
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.