MLOps: Building Production Machine Learning Pipelines That Don't Break
MLOps in 2026: ML pipeline architecture, model versioning with MLflow, feature stores, model serving with FastAPI, drift detection, and the production patterns that keep models reliable.
The gap between a working ML model in a Jupyter notebook and a reliable ML system in production is enormous. Most data science teams close the gap eventually — but many close it by accumulating fragile scripts, undocumented experiments, and models nobody knows how to retrain.
MLOps is the set of practices that make ML systems as reliable, maintainable, and reproducible as software systems. This guide covers the full production ML pipeline: data ingestion, feature engineering, training, evaluation, deployment, and monitoring.
The MLOps Stack
Data Sources → Feature Store → Training Pipeline
↓
Model Registry (MLflow)
↓
Model Serving (FastAPI/TorchServe)
↓
Monitoring (drift, performance)
↓
Retraining Trigger
Phase 1: Data Pipeline
# data/pipeline.py — reproducible data pipeline with DVC
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def load_and_validate_data(source_path: str) -> pd.DataFrame:
"""Load raw data and validate schema."""
df = pd.read_parquet(source_path)
# Schema validation
required_columns = ['user_id', 'feature_1', 'feature_2', 'label', 'event_date']
missing = set(required_columns) - set(df.columns)
if missing:
raise ValueError(f"Missing required columns: {missing}")
# Data quality checks
null_pct = df.isnull().mean()
high_null_cols = null_pct[null_pct > 0.2].index.tolist()
if high_null_cols:
logger.warning(f"High null rate in: {high_null_cols}")
logger.info(f"Loaded {len(df)} rows, {df.isnull().sum().sum()} nulls")
return df
def build_features(df: pd.DataFrame) -> pd.DataFrame:
"""Feature engineering — deterministic, testable functions."""
df = df.copy()
# Temporal features
df['event_date'] = pd.to_datetime(df['event_date'])
df['day_of_week'] = df['event_date'].dt.dayofweek
df['hour'] = df['event_date'].dt.hour
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# Derived features
df['feature_ratio'] = df['feature_1'] / (df['feature_2'] + 1e-8)
df['feature_log'] = np.log1p(df['feature_1'].clip(lower=0))
# Encodings
df['day_of_week_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
df['day_of_week_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
return df
def split_data(df: pd.DataFrame, test_size: float = 0.2, val_size: float = 0.1):
"""Time-based split — never random split for time-series data."""
df = df.sort_values('event_date')
n = len(df)
train_end = int(n * (1 - test_size - val_size))
val_end = int(n * (1 - test_size))
train = df.iloc[:train_end]
val = df.iloc[train_end:val_end]
test = df.iloc[val_end:]
logger.info(f"Split: train={len(train)}, val={len(val)}, test={len(test)}")
return train, val, test
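The cyclical sin/cos encoding above deserves a sanity check: it keeps Sunday and Monday close together on the feature circle, which raw day-of-week integers do not. A self-contained sketch (the `encode_day_of_week` and `chord` helpers are illustrative, mirroring the two encoding lines in `build_features`):

```python
import numpy as np
import pandas as pd

def encode_day_of_week(day: pd.Series) -> pd.DataFrame:
    """Mirror the sin/cos encoding from build_features."""
    return pd.DataFrame({
        "day_of_week_sin": np.sin(2 * np.pi * day / 7),
        "day_of_week_cos": np.cos(2 * np.pi * day / 7),
    })

enc = encode_day_of_week(pd.Series([0, 1, 6]))  # Mon, Tue, Sun

def chord(i: int, j: int) -> float:
    """Euclidean distance between two encoded days on the unit circle."""
    return float(np.hypot(
        enc.loc[i, "day_of_week_sin"] - enc.loc[j, "day_of_week_sin"],
        enc.loc[i, "day_of_week_cos"] - enc.loc[j, "day_of_week_cos"],
    ))

# Sunday->Monday (adjacent days) ends up at the same distance as
# Monday->Tuesday, even though the raw integers 6 and 0 are far apart.
```

A tree model can often learn around the raw encoding, but for any distance-based or linear downstream consumer the circular form matters.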
Phase 2: Model Training with MLflow
MLflow tracks experiments, parameters, metrics, and artifacts — the foundation of reproducible ML.
# training/train.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, precision_recall_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import optuna
import pandas as pd
FEATURE_COLS = ['feature_1', 'feature_2', 'feature_ratio', 'feature_log',
'day_of_week_sin', 'day_of_week_cos', 'is_weekend']
TARGET_COL = 'label'
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("churn-prediction-v2")
def objective(trial: optuna.Trial, X_train, y_train, X_val, y_val) -> float:
"""Optuna objective — hyperparameter tuning with MLflow logging."""
params = {
'n_estimators': trial.suggest_int('n_estimators', 100, 500),
'max_depth': trial.suggest_int('max_depth', 3, 8),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 50),
}
model = Pipeline([
('scaler', StandardScaler()),
('clf', GradientBoostingClassifier(**params, random_state=42)),
])
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
return roc_auc_score(y_val, y_prob)
def train_and_register(train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame):
X_train, y_train = train[FEATURE_COLS], train[TARGET_COL]
X_val, y_val = val[FEATURE_COLS], val[TARGET_COL]
X_test, y_test = test[FEATURE_COLS], test[TARGET_COL]
with mlflow.start_run(run_name="gbm-optuna-tuned"):
# Hyperparameter search
study = optuna.create_study(direction='maximize')
study.optimize(
lambda trial: objective(trial, X_train, y_train, X_val, y_val),
n_trials=50,
timeout=3600,
)
best_params = study.best_params
mlflow.log_params(best_params)
# Train final model with best params
final_model = Pipeline([
('scaler', StandardScaler()),
('clf', GradientBoostingClassifier(**best_params, random_state=42)),
])
final_model.fit(
pd.concat([X_train, X_val]),
pd.concat([y_train, y_val]),
)
# Evaluate on hold-out test set
y_test_prob = final_model.predict_proba(X_test)[:, 1]
test_auc = roc_auc_score(y_test, y_test_prob)
mlflow.log_metric("test_auc", test_auc)
mlflow.log_metric("val_auc", study.best_value)
# Log feature importance
importances = dict(zip(
FEATURE_COLS,
final_model.named_steps['clf'].feature_importances_
))
mlflow.log_dict(importances, "feature_importances.json")
# Register model if performance threshold met
if test_auc >= 0.78: # Minimum AUC threshold
mlflow.sklearn.log_model(
final_model,
"model",
registered_model_name="churn-predictor",
signature=mlflow.models.infer_signature(X_test, y_test_prob),
)
print(f"Model registered! Test AUC: {test_auc:.4f}")
else:
print(f"Model NOT registered — AUC {test_auc:.4f} below threshold 0.78")
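The absolute AUC floor is a reasonable first gate, but in practice you also want a champion/challenger comparison, so a new model only replaces the current production model when it is measurably better. A minimal sketch (the `should_promote` helper and both thresholds are illustrative, not part of the pipeline above):

```python
from typing import Optional

def should_promote(candidate_auc: float,
                   production_auc: Optional[float],
                   min_auc: float = 0.78,
                   min_gain: float = 0.002) -> bool:
    """Promotion gate: clear the absolute floor, and beat the current
    production model by a meaningful margin when one exists."""
    if candidate_auc < min_auc:
        return False
    if production_auc is None:  # first model ever: the floor is enough
        return True
    return candidate_auc - production_auc >= min_gain

# A marginal improvement is not worth the deployment risk
should_promote(0.781, 0.780)   # False: gain of 0.001 is below min_gain
should_promote(0.795, 0.780)   # True: clear improvement over the champion
```

The `min_gain` margin also protects against promoting on noise; with a small test set, an AUC delta of 0.001 is usually within evaluation variance.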
Phase 3: Model Serving
# serving/api.py — FastAPI model serving
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
import mlflow.sklearn
import numpy as np
import pandas as pd
import time
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API")
# Load model at startup (not on each request). Stage-based URIs like this are
# deprecated in recent MLflow versions in favor of registered-model aliases
# (e.g. "models:/churn-predictor@production").
MODEL_URI = "models:/churn-predictor/Production"
model = None
@app.on_event("startup")  # deprecated in newer FastAPI; prefer a lifespan handler
async def load_model():
global model
logger.info(f"Loading model from {MODEL_URI}")
model = mlflow.sklearn.load_model(MODEL_URI)
logger.info("Model loaded successfully")
class PredictionRequest(BaseModel):
user_id: str
feature_1: float
feature_2: float
feature_ratio: float
feature_log: float
day_of_week_sin: float
day_of_week_cos: float
is_weekend: int
    @validator('is_weekend')  # Pydantic v1 style; Pydantic v2 uses @field_validator
def is_weekend_binary(cls, v):
if v not in (0, 1):
raise ValueError('is_weekend must be 0 or 1')
return v
class PredictionResponse(BaseModel):
user_id: str
churn_probability: float
churn_predicted: bool
latency_ms: float
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.perf_counter()
features = pd.DataFrame([{
col: getattr(request, col)
for col in ['feature_1', 'feature_2', 'feature_ratio',
'feature_log', 'day_of_week_sin', 'day_of_week_cos', 'is_weekend']
}])
prob = float(model.predict_proba(features)[0, 1])
latency_ms = (time.perf_counter() - start) * 1000
# Log prediction for monitoring
logger.info({
"user_id": request.user_id,
"churn_probability": prob,
"latency_ms": latency_ms,
})
return PredictionResponse(
user_id=request.user_id,
churn_probability=round(prob, 4),
churn_predicted=prob >= 0.5,
latency_ms=round(latency_ms, 2),
)
@app.get("/health")
async def health():
return {"status": "ok", "model_loaded": model is not None}
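Note that the request schema expects the engineered features (`feature_ratio`, `feature_log`, the sin/cos pair), so every caller must reproduce the transformations from `build_features` exactly. This is a classic source of training/serving skew. A sketch of building a request payload from raw values (the `build_payload` helper is hypothetical; its constants must stay in lockstep with the training pipeline):

```python
import numpy as np

def build_payload(user_id: str, feature_1: float, feature_2: float,
                  day_of_week: int) -> dict:
    """Reproduce the training-time transforms for a single request.
    Any divergence from build_features() silently degrades predictions."""
    return {
        "user_id": user_id,
        "feature_1": feature_1,
        "feature_2": feature_2,
        "feature_ratio": feature_1 / (feature_2 + 1e-8),
        "feature_log": float(np.log1p(max(feature_1, 0.0))),
        "day_of_week_sin": float(np.sin(2 * np.pi * day_of_week / 7)),
        "day_of_week_cos": float(np.cos(2 * np.pi * day_of_week / 7)),
        "is_weekend": int(day_of_week in (5, 6)),
    }

payload = build_payload("u123", 4.0, 2.0, day_of_week=6)
```

The more durable fix is to share one feature module (or a feature store) between training and serving, so the transform exists in exactly one place.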
Phase 4: Model Monitoring and Drift Detection
# monitoring/drift_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass
@dataclass
class DriftReport:
feature: str
psi: float # Population Stability Index
ks_statistic: float
ks_pvalue: float
drift_detected: bool
severity: str # 'none' | 'minor' | 'major'
def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """PSI measures distribution shift. PSI < 0.1 = stable, 0.1-0.2 = minor, >0.2 = major."""
    # Bin on the reference distribution, then compare bin proportions.
    # Do not use density=True here: densities scale the difference term by
    # the bin width, which breaks the standard PSI thresholds.
    expected_counts, bin_edges = np.histogram(expected, bins=bins)
    actual_counts, _ = np.histogram(actual, bins=bin_edges)
    expected_pct = expected_counts / expected_counts.sum()
    actual_pct = actual_counts / actual_counts.sum()
    # Avoid log(0)
    expected_pct = np.where(expected_pct == 0, 1e-8, expected_pct)
    actual_pct = np.where(actual_pct == 0, 1e-8, actual_pct)
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return float(psi)
def detect_feature_drift(
reference: pd.DataFrame,
production: pd.DataFrame,
features: list[str],
) -> list[DriftReport]:
reports = []
for feature in features:
ref_values = reference[feature].dropna().values
prod_values = production[feature].dropna().values
psi = population_stability_index(ref_values, prod_values)
ks_stat, ks_pvalue = stats.ks_2samp(ref_values, prod_values)
# Drift classification
if psi > 0.2 or ks_pvalue < 0.01:
severity = 'major'
elif psi > 0.1 or ks_pvalue < 0.05:
severity = 'minor'
else:
severity = 'none'
reports.append(DriftReport(
feature=feature,
psi=round(psi, 4),
ks_statistic=round(ks_stat, 4),
ks_pvalue=round(ks_pvalue, 4),
drift_detected=severity != 'none',
severity=severity,
))
return reports
# Run daily drift check. load_training_data, load_recent_predictions,
# alert_team, trigger_retraining_pipeline, and metrics_client are
# project-specific stubs: wire them to your own data access and alerting.
def run_daily_drift_check():
    reference_data = load_training_data()  # Training set baseline
    production_data = load_recent_predictions(days=7)  # Last 7 days
reports = detect_feature_drift(
reference_data,
production_data,
features=FEATURE_COLS,
)
major_drifts = [r for r in reports if r.severity == 'major']
if major_drifts:
alert_team(f"Major feature drift detected: {[r.feature for r in major_drifts]}")
trigger_retraining_pipeline()
# Log to monitoring dashboard
for report in reports:
metrics_client.gauge(f"model.drift.psi.{report.feature}", report.psi)
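To see the PSI thresholds in action, compare a reference distribution against a shifted copy of itself: identical data should score near zero, and a clear mean shift should land well past the 0.2 "major" line. A self-contained sketch (the `psi` helper restates the proportion-based logic above on synthetic data):

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index over reference-defined bins."""
    counts_e, edges = np.histogram(expected, bins=bins)
    counts_a, _ = np.histogram(actual, bins=edges)
    p_e = np.maximum(counts_e / counts_e.sum(), 1e-8)
    p_a = np.maximum(counts_a / counts_a.sum(), 1e-8)
    return float(np.sum((p_a - p_e) * np.log(p_a / p_e)))

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, 10_000)
same = rng.normal(0.0, 1.0, 10_000)       # fresh sample, no drift
shifted = rng.normal(0.8, 1.0, 10_000)    # clear mean shift

psi(reference, same)     # stays under the 0.1 "stable" threshold
psi(reference, shifted)  # lands well over the 0.2 "major drift" threshold
```

One caveat: points in `actual` that fall outside the reference bin range are dropped by `np.histogram`, so an extreme shift can understate drift slightly; adding open-ended outer bins is a common refinement.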
MLOps Cost Ranges
| Scope | Timeline | Investment |
|---|---|---|
| ML pipeline setup (training + serving) | 4–8 weeks | $20,000–$50,000 |
| MLflow tracking + model registry | 1–2 weeks | $5,000–$15,000 |
| Feature store implementation | 4–8 weeks | $20,000–$50,000 |
| Drift monitoring + alerting | 2–4 weeks | $10,000–$25,000 |
| Full MLOps platform | 3–6 months | $80,000–$200,000 |
Infrastructure: MLflow tracking server (~$50/month), model serving (ECS Fargate, ~$100–$500/month), feature store (Feast on Redis, ~$100–$300/month).
Working With Viprasol
We build production ML pipelines — from data ingestion through model training, deployment, and monitoring. We work with Python-based ML stacks (scikit-learn, XGBoost, PyTorch) and cloud-native deployment.
- ML pipeline consultation
- AI & Machine Learning Services
- Machine Learning Development Services
See Also
- Machine Learning Development Services
- AI Integration Services
- Data Analytics Consulting
- Python Development Company
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.