MLOps: Building Production Machine Learning Pipelines That Don't Break

MLOps in 2026 — ML pipeline architecture, model versioning with MLflow, feature stores, model serving with FastAPI, drift detection, and the production patterns

Viprasol Tech Team
April 14, 2026
14 min read

The gap between a working ML model in a Jupyter notebook and a reliable ML system in production is enormous. Most data science teams close the gap eventually — but many close it by accumulating fragile scripts, undocumented experiments, and models nobody knows how to retrain.

MLOps is the set of practices that make ML systems as reliable, maintainable, and reproducible as software systems. This guide covers the full production ML pipeline: data ingestion, feature engineering, training, evaluation, deployment, and monitoring.


The MLOps Stack

Data Sources → Feature Store → Training Pipeline
                                      ↓
                                Model Registry (MLflow)
                                      ↓
                               Model Serving (FastAPI/TorchServe)
                                      ↓
                              Monitoring (drift, performance)
                                      ↓
                              Retraining Trigger

Phase 1: Data Pipeline

# data/pipeline.py: reproducible data pipeline (DVC versioning of inputs and outputs is not shown here)
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from pathlib import Path
import logging

logger = logging.getLogger(__name__)

def load_and_validate_data(source_path: str) -> pd.DataFrame:
    """Load raw data and validate schema."""
    df = pd.read_parquet(source_path)
    
    # Schema validation
    required_columns = ['user_id', 'feature_1', 'feature_2', 'label', 'event_date']
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Data quality checks
    null_pct = df.isnull().mean()
    high_null_cols = null_pct[null_pct > 0.2].index.tolist()
    if high_null_cols:
        logger.warning(f"High null rate in: {high_null_cols}")
    
    logger.info(f"Loaded {len(df)} rows, {df.isnull().sum().sum()} nulls")
    return df


def build_features(df: pd.DataFrame) -> pd.DataFrame:
    """Feature engineering — deterministic, testable functions."""
    df = df.copy()
    
    # Temporal features
    df['event_date'] = pd.to_datetime(df['event_date'])
    df['day_of_week'] = df['event_date'].dt.dayofweek
    df['hour'] = df['event_date'].dt.hour
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    
    # Derived features
    df['feature_ratio'] = df['feature_1'] / (df['feature_2'] + 1e-8)
    df['feature_log'] = np.log1p(df['feature_1'].clip(lower=0))
    
    # Encodings
    df['day_of_week_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
    df['day_of_week_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
    
    return df


def split_data(df: pd.DataFrame, test_size: float = 0.2, val_size: float = 0.1):
    """Time-based split — never random split for time-series data."""
    df = df.sort_values('event_date')
    
    n = len(df)
    train_end = int(n * (1 - test_size - val_size))
    val_end = int(n * (1 - test_size))
    
    train = df.iloc[:train_end]
    val = df.iloc[train_end:val_end]
    test = df.iloc[val_end:]
    
    logger.info(f"Split: train={len(train)}, val={len(val)}, test={len(test)}")
    return train, val, test
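To see why the split is chronological rather than random, it helps to check the boundary dates on toy data. The following is a minimal sketch, not part of the pipeline itself: the helper name time_based_split and the synthetic frame are assumptions made for illustration, and the split logic simply mirrors split_data above.

```python
import numpy as np
import pandas as pd


def time_based_split(df, date_col, test_size=0.2, val_size=0.1):
    """Chronological split: oldest rows go to train, newest rows to test."""
    df = df.sort_values(date_col)
    n = len(df)
    train_end = int(n * (1 - test_size - val_size))
    val_end = int(n * (1 - test_size))
    return df.iloc[:train_end], df.iloc[train_end:val_end], df.iloc[val_end:]


# Toy data (hypothetical): 100 daily events, deliberately shuffled
rng = np.random.default_rng(0)
toy = pd.DataFrame({
    "event_date": pd.date_range("2026-01-01", periods=100, freq="D"),
    "label": rng.integers(0, 2, size=100),
}).sample(frac=1.0, random_state=0)

train, val, test = time_based_split(toy, "event_date")

# Every training row is older than every validation row, and every
# validation row is older than every test row, so no future data leaks back.
print("train ends:", train["event_date"].max().date())
print("val starts:", val["event_date"].min().date())
print("test starts:", test["event_date"].min().date())
```

A random split of the same frame would mix later dates into the training set, which tends to inflate validation scores for models that will only ever see the future at inference time.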

Phase 2: Model Training with MLflow

MLflow tracks experiments, parameters, metrics, and artifacts — the foundation of reproducible ML.

# training/train.py
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, precision_recall_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import optuna
import pandas as pd

FEATURE_COLS = ['feature_1', 'feature_2', 'feature_ratio', 'feature_log',
                'day_of_week_sin', 'day_of_week_cos', 'is_weekend']
TARGET_COL = 'label'

mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("churn-prediction-v2")


def objective(trial: optuna.Trial, X_train, y_train, X_val, y_val) -> float:
    """Optuna objective — hyperparameter tuning with MLflow logging."""
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 500),
        'max_depth': trial.suggest_int('max_depth', 3, 8),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 5, 50),
    }
    
    model = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', GradientBoostingClassifier(**params, random_state=42)),
    ])
    model.fit(X_train, y_train)
    
    y_prob = model.predict_proba(X_val)[:, 1]
    return roc_auc_score(y_val, y_prob)


def train_and_register(train: pd.DataFrame, val: pd.DataFrame, test: pd.DataFrame):
    X_train, y_train = train[FEATURE_COLS], train[TARGET_COL]
    X_val, y_val = val[FEATURE_COLS], val[TARGET_COL]
    X_test, y_test = test[FEATURE_COLS], test[TARGET_COL]
    
    with mlflow.start_run(run_name="gbm-optuna-tuned"):
        # Hyperparameter search
        study = optuna.create_study(direction='maximize')
        study.optimize(
            lambda trial: objective(trial, X_train, y_train, X_val, y_val),
            n_trials=50,
            timeout=3600,
        )
        
        best_params = study.best_params
        mlflow.log_params(best_params)
        
        # Train final model with best params
        final_model = Pipeline([
            ('scaler', StandardScaler()),
            ('clf', GradientBoostingClassifier(**best_params, random_state=42)),
        ])
        final_model.fit(
            pd.concat([X_train, X_val]),
            pd.concat([y_train, y_val]),
        )
        
        # Evaluate on hold-out test set
        y_test_prob = final_model.predict_proba(X_test)[:, 1]
        test_auc = roc_auc_score(y_test, y_test_prob)
        
        mlflow.log_metric("test_auc", test_auc)
        mlflow.log_metric("val_auc", study.best_value)
        
        # Log feature importance
        importances = dict(zip(
            FEATURE_COLS,
            final_model.named_steps['clf'].feature_importances_
        ))
        mlflow.log_dict(importances, "feature_importances.json")
        
        # Register model if performance threshold met
        if test_auc >= 0.78:  # Minimum AUC threshold
            mlflow.sklearn.log_model(
                final_model,
                "model",
                registered_model_name="churn-predictor",
                signature=mlflow.models.infer_signature(X_test, y_test_prob),
            )
            print(f"Model registered! Test AUC: {test_auc:.4f}")
        else:
            print(f"Model NOT registered — AUC {test_auc:.4f} below threshold 0.78")
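The training script imports precision_recall_curve without using it, and the serving code later applies a fixed 0.5 cutoff. One plausible way to connect the two is to choose the decision threshold from validation predictions. The sketch below is an assumption about how that could look, not the article's method: pick_threshold and the min_precision target are hypothetical names introduced for illustration.

```python
import numpy as np
from sklearn.metrics import precision_recall_curve


def pick_threshold(y_true, y_prob, min_precision=0.8):
    """Return the threshold with the highest recall among those whose
    precision is at least min_precision, or None if none qualifies."""
    precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
    # precision and recall have one more entry than thresholds;
    # the final point (precision=1, recall=0) has no threshold, so drop it.
    precision, recall = precision[:-1], recall[:-1]
    qualifies = precision >= min_precision
    if not qualifies.any():
        return None
    # Mask out non-qualifying points, then take the best recall.
    best = int(np.argmax(np.where(qualifies, recall, -1.0)))
    return float(thresholds[best])


# Tiny hand-checkable example (hypothetical validation labels and scores)
y_val = np.array([0, 0, 1, 1])
p_val = np.array([0.1, 0.4, 0.35, 0.8])

print(pick_threshold(y_val, p_val, min_precision=0.6))
print(pick_threshold(y_val, p_val, min_precision=1.0))
```

The chosen value could be logged alongside the model (for example with mlflow.log_param) so that serving uses the same cutoff that was evaluated.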

Phase 3: Model Serving

# serving/api.py — FastAPI model serving
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
import mlflow.sklearn
import numpy as np
import pandas as pd
import time
import logging

logger = logging.getLogger(__name__)

app = FastAPI(title="Churn Prediction API")

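# Load model at startup (not on each request)
# Note: registry stages such as "Production" are deprecated in recent MLflow
# releases in favor of aliases (URI form "models:/churn-predictor@production").
# The training script registers a version but does not promote it, so promotion
# is assumed to happen as a separate step.
MODEL_URI = "models:/churn-predictor/Production"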
model = None

@app.on_event("startup")
async def load_model():
    global model
    logger.info(f"Loading model from {MODEL_URI}")
    model = mlflow.sklearn.load_model(MODEL_URI)
    logger.info("Model loaded successfully")


class PredictionRequest(BaseModel):
    user_id: str
    feature_1: float
    feature_2: float
    feature_ratio: float
    feature_log: float
    day_of_week_sin: float
    day_of_week_cos: float
    is_weekend: int

    @validator('is_weekend')
    def is_weekend_binary(cls, v):
        if v not in (0, 1):
            raise ValueError('is_weekend must be 0 or 1')
        return v


class PredictionResponse(BaseModel):
    user_id: str
    churn_probability: float
    churn_predicted: bool
    latency_ms: float


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    start = time.perf_counter()
    
    features = pd.DataFrame([{
        col: getattr(request, col)
        for col in ['feature_1', 'feature_2', 'feature_ratio',
                    'feature_log', 'day_of_week_sin', 'day_of_week_cos', 'is_weekend']
    }])
    
    prob = float(model.predict_proba(features)[0, 1])
    latency_ms = (time.perf_counter() - start) * 1000
    
    # Log prediction for monitoring
    logger.info({
        "user_id": request.user_id,
        "churn_probability": prob,
        "latency_ms": latency_ms,
    })
    
    return PredictionResponse(
        user_id=request.user_id,
        churn_probability=round(prob, 4),
        churn_predicted=prob >= 0.5,
        latency_ms=round(latency_ms, 2),
    )


@app.get("/health")
async def health():
    return {"status": "ok", "model_loaded": model is not None}
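The request schema above expects callers to send already-derived values such as feature_ratio and day_of_week_sin. If callers compute these differently from build_features, training and serving drift apart silently. One way to reduce that risk is to derive them server-side from raw inputs. This is a minimal sketch under stated assumptions: derive_serving_features is a hypothetical helper, and it assumes the raw feature values and an event timestamp are available at request time.

```python
import math
from datetime import datetime


def derive_serving_features(feature_1: float, feature_2: float,
                            event_time: datetime) -> dict:
    """Recompute the derived model inputs with the same formulas as build_features."""
    # datetime.weekday() uses Monday=0, the same convention as pandas dt.dayofweek
    day_of_week = event_time.weekday()
    return {
        "feature_1": feature_1,
        "feature_2": feature_2,
        "feature_ratio": feature_1 / (feature_2 + 1e-8),
        "feature_log": math.log1p(max(feature_1, 0.0)),
        "day_of_week_sin": math.sin(2 * math.pi * day_of_week / 7),
        "day_of_week_cos": math.cos(2 * math.pi * day_of_week / 7),
        "is_weekend": int(day_of_week in (5, 6)),
    }


# 18 April 2026 falls on a Saturday
features = derive_serving_features(3.0, 1.0, datetime(2026, 4, 18, 10, 0))
for name, value in features.items():
    print(f"{name}: {value}")
```

Keeping the formulas in one shared module that both build_features and the API import would go a step further, since the two code paths could then not diverge.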

Phase 4: Model Monitoring and Drift Detection

# monitoring/drift_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import Optional

@dataclass
class DriftReport:
    feature: str
    psi: float           # Population Stability Index
    ks_statistic: float
    ks_pvalue: float
    drift_detected: bool
    severity: str        # 'none' | 'minor' | 'major'


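def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """PSI measures distribution shift. PSI < 0.1 = stable, 0.1-0.2 = minor, >0.2 = major."""
    # Bin edges come from the reference (expected) distribution
    expected_counts, bin_edges = np.histogram(expected, bins=bins)
    # Clip so production values outside the reference range land in the edge bins
    # instead of being silently dropped
    actual_counts, _ = np.histogram(np.clip(actual, bin_edges[0], bin_edges[-1]), bins=bin_edges)
    
    # PSI is defined on bin proportions (summing to 1), not on densities
    expected_pct = expected_counts / expected_counts.sum()
    actual_pct = actual_counts / actual_counts.sum()
    
    # Avoid log(0)
    expected_pct = np.where(expected_pct == 0, 1e-8, expected_pct)
    actual_pct = np.where(actual_pct == 0, 1e-8, actual_pct)
    
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return float(psi)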


def detect_feature_drift(
    reference: pd.DataFrame,
    production: pd.DataFrame,
    features: list[str],
) -> list[DriftReport]:
    reports = []
    
    for feature in features:
        ref_values = reference[feature].dropna().values
        prod_values = production[feature].dropna().values
        
        psi = population_stability_index(ref_values, prod_values)
        ks_stat, ks_pvalue = stats.ks_2samp(ref_values, prod_values)
        
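        # Drift classification. Note: with large samples the KS p-value drops below
        # 0.01 even for very small shifts, so this rule can over-flag drift.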
        if psi > 0.2 or ks_pvalue < 0.01:
            severity = 'major'
        elif psi > 0.1 or ks_pvalue < 0.05:
            severity = 'minor'
        else:
            severity = 'none'
        
        reports.append(DriftReport(
            feature=feature,
            psi=round(psi, 4),
            ks_statistic=round(ks_stat, 4),
            ks_pvalue=round(ks_pvalue, 4),
            drift_detected=severity != 'none',
            severity=severity,
        ))
    
    return reports


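# Run daily drift check
# NOTE: load_training_data, load_recent_predictions, alert_team,
# trigger_retraining_pipeline, and metrics_client are placeholders for
# project-specific code; FEATURE_COLS is the list defined in training/train.py.
def run_daily_drift_check():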
    reference_data = load_training_data()  # Training set baseline
    production_data = load_recent_predictions(days=7)  # Last 7 days
    
    reports = detect_feature_drift(
        reference_data,
        production_data,
        features=FEATURE_COLS,
    )
    
    major_drifts = [r for r in reports if r.severity == 'major']
    
    if major_drifts:
        alert_team(f"Major feature drift detected: {[r.feature for r in major_drifts]}")
        trigger_retraining_pipeline()
    
    # Log to monitoring dashboard
    for report in reports:
        metrics_client.gauge(f"model.drift.psi.{report.feature}", report.psi)
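To get a feel for the PSI thresholds quoted in the docstring above, it can help to run the metric on synthetic data where the shift is known. The sketch below is self-contained and makes its own assumptions: the function name psi, the sample sizes, and the normal distributions are illustrative, and PSI is computed on bin proportions with edges taken from the reference sample.

```python
import numpy as np


def psi(expected, actual, bins=10, eps=1e-8):
    """Population Stability Index on bin proportions.

    Bin edges are taken from the reference (expected) sample.
    """
    expected_counts, edges = np.histogram(expected, bins=bins)
    # Clip so out-of-range production values fall into the edge bins
    actual_counts, _ = np.histogram(np.clip(actual, edges[0], edges[-1]), bins=edges)
    # Convert counts to proportions; floor at eps to avoid log(0)
    e = np.maximum(expected_counts / expected_counts.sum(), eps)
    a = np.maximum(actual_counts / actual_counts.sum(), eps)
    return float(np.sum((a - e) * np.log(a / e)))


rng = np.random.default_rng(42)
reference = rng.normal(0.0, 1.0, 20_000)   # stands in for training data
same = rng.normal(0.0, 1.0, 20_000)        # production data, no shift
shifted = rng.normal(1.0, 1.0, 20_000)     # production data, mean moved by 1 std

psi_same = psi(reference, same)
psi_shifted = psi(reference, shifted)
print(f"no shift:             PSI = {psi_same:.4f}")
print(f"mean shifted by 1 std: PSI = {psi_shifted:.4f}")
```

With no shift the value should sit well under the 0.1 "stable" threshold, while a one-standard-deviation mean shift should land far above the 0.2 "major" threshold.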

MLOps Cost Ranges

Scope                                     Timeline      Investment
ML pipeline setup (training + serving)    4–8 weeks     $20,000–$50,000
MLflow tracking + model registry          1–2 weeks     $5,000–$15,000
Feature store implementation              4–8 weeks     $20,000–$50,000
Drift monitoring + alerting               2–4 weeks     $10,000–$25,000
Full MLOps platform                       3–6 months    $80,000–$200,000

Infrastructure: MLflow tracking server (~$50/month), model serving (ECS Fargate, ~$100–$500/month), feature store (Feast on Redis, ~$100–$300/month).


Working With Viprasol

We build production ML pipelines — from data ingestion through model training, deployment, and monitoring. We work with Python-based ML stacks (scikit-learn, XGBoost, PyTorch) and cloud-native deployment.

ML pipeline consultation →
AI & Machine Learning Services →
Machine Learning Development Services →


About the Author

Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.
