
MLOps in 2026: Model Registry, Drift Detection, and Production ML Pipelines

Build production MLOps pipelines with model registry, automated retraining, drift detection, and Prefect orchestration. Feature stores, A/B testing for models, and monitoring setup.

Viprasol Tech Team
July 23, 2026
13 min read


Getting a model to 90% accuracy in a Jupyter notebook is the easy part. Keeping it at 90% accuracy six months later, with automated retraining when it degrades, clear ownership, and the ability to roll back when a new version underperforms: that's MLOps.

In 2026, the ML engineering stack has stabilized around a set of open-source tools that handle the full lifecycle: Prefect for pipeline orchestration, MLflow for experiment tracking and model registry, and statistical drift detection to catch when production data diverges from training data. This post covers the full pipeline from data ingestion to production monitoring.


The MLOps Maturity Levels

Level | What It Looks Like | Pain Points
0 - Manual | Scripts run by hand, model in a .pkl file on a server | No versioning, no monitoring, "works on my machine"
1 - Automated training | Scheduled retraining, basic experiment tracking | Still manual deployment, no drift detection
2 - CI/CD for ML | Automated train → evaluate → deploy pipeline | Model comparison, gating on metrics
3 - Full MLOps | Feature store, drift detection, shadow mode, automatic rollback | Requires significant investment

Most teams should target Level 2. Level 3 is for organizations with >10 production models and dedicated ML engineering staff.


Stack Overview

Data Sources (S3, PostgreSQL, Kafka)
    │
Feature Pipeline (Prefect)
    │
Training Pipeline (Prefect + sklearn/XGBoost/PyTorch)
    │
MLflow Experiment Tracking
    │
Model Registry (MLflow)
    │
Model Serving (FastAPI + BentoML / Ray Serve)
    │
Monitoring (Evidently AI + Prometheus + Grafana)
    │
Drift Alert → Retrain Trigger


Feature Pipeline with Prefect

# pipelines/feature_pipeline.py
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect_aws.s3 import S3Bucket
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlalchemy

@task(retries=3, retry_delay_seconds=60)
def extract_raw_data(
    start_date: datetime,
    end_date: datetime,
) -> pd.DataFrame:
    """Pull raw order data from PostgreSQL."""
    db_url = Secret.load("db-url").get()
    engine = sqlalchemy.create_engine(db_url)

    query = """
        SELECT
            o.id,
            o.customer_id,
            o.total,
            o.created_at,
            c.country,
            c.signup_date,
            COUNT(prev.id) AS prior_order_count,
            COALESCE(SUM(prev.total), 0) AS prior_total_spent
        FROM orders o
        JOIN customers c ON c.id = o.customer_id
        LEFT JOIN orders prev ON prev.customer_id = o.customer_id
            AND prev.created_at < o.created_at
        WHERE o.created_at BETWEEN :start AND :end
        GROUP BY o.id, o.customer_id, o.total, o.created_at, c.country, c.signup_date
    """

    with engine.connect() as conn:
        # pandas needs sqlalchemy.text() to bind the :start/:end parameters
        return pd.read_sql(
            sqlalchemy.text(query), conn,
            params={"start": start_date, "end": end_date},
        )

@task
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create model features from raw data."""
    df = df.copy()

    # Customer tenure in days at time of order
    df["tenure_days"] = (
        pd.to_datetime(df["created_at"]) - pd.to_datetime(df["signup_date"])
    ).dt.days.clip(lower=0)

    # Order velocity (orders per 30-day period)
    df["order_velocity"] = df["prior_order_count"] / (df["tenure_days"] / 30 + 1)

    # Average order value
    df["avg_order_value"] = np.where(
        df["prior_order_count"] > 0,
        df["prior_total_spent"] / df["prior_order_count"],
        0,
    )

    # Country encoding (top 10, rest = "other")
    top_countries = ["US", "GB", "CA", "AU", "DE", "FR", "IN", "JP", "BR", "MX"]
    df["country_encoded"] = df["country"].where(
        df["country"].isin(top_countries), other="OTHER"
    )

    # Binary: is high-value customer?
    df["is_high_value"] = (df["prior_total_spent"] > 500).astype(int)

    # Target: will this customer return within 30 days? (label for training)
    # (Would be joined from a future-looking query in production)

    return df[[
        "id", "tenure_days", "order_velocity", "avg_order_value",
        "is_high_value", "total", "country_encoded",
    ]]

@task
def save_features(df: pd.DataFrame, version: str) -> str:
    """Save feature set to S3 as Parquet."""
    import io

    s3 = S3Bucket.load("ml-features")
    path = f"features/churn/{version}/features.parquet"
    # S3Bucket has no DataFrame helper; serialize to an in-memory Parquet
    # buffer and write the raw bytes
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    s3.write_path(path, buffer.getvalue())
    return path

@flow(name="churn-feature-pipeline")
def feature_pipeline(
    start_date: datetime | None = None,
    end_date: datetime | None = None,
):
    if end_date is None:
        end_date = datetime.utcnow()
    if start_date is None:
        start_date = end_date - timedelta(days=90)

    version = end_date.strftime("%Y%m%d_%H%M%S")

    raw = extract_raw_data(start_date, end_date)
    features = engineer_features(raw)
    path = save_features(features, version)

    return path
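The engineered features above are plain arithmetic, so the formulas can be sanity-checked on a single synthetic row without pandas. All values here are invented for illustration; the expressions mirror `engineer_features` exactly:

```python
# One synthetic customer row at the time of an order
tenure_days = 120          # days since signup
prior_order_count = 6
prior_total_spent = 480.0

# Orders per 30-day period; the +1 keeps brand-new customers finite
order_velocity = prior_order_count / (tenure_days / 30 + 1)

# Average order value, guarded against division by zero
avg_order_value = prior_total_spent / prior_order_count if prior_order_count else 0.0

# Binary high-value flag, same $500 threshold as the pipeline
is_high_value = int(prior_total_spent > 500)

print(round(order_velocity, 2), round(avg_order_value, 2), is_high_value)
# 1.2 80.0 0
```

A customer with 6 prior orders over 120 days averages 1.2 orders per 30-day window, and $480 of lifetime spend lands just under the high-value cutoff.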

Training Pipeline with MLflow

# pipelines/training_pipeline.py
from prefect import flow, task
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

MLFLOW_URI = "http://mlflow.internal:5000"
EXPERIMENT_NAME = "churn-prediction"

@task
def load_features(feature_path: str) -> pd.DataFrame:
    import io
    from prefect_aws.s3 import S3Bucket

    s3 = S3Bucket.load("ml-features")
    # read_path returns raw bytes; parse them back into a DataFrame
    return pd.read_parquet(io.BytesIO(s3.read_path(feature_path)))

@task
def prepare_training_data(
    df: pd.DataFrame,
) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Encode categoricals and split features/target."""
    le = LabelEncoder()
    df["country_encoded"] = le.fit_transform(df["country_encoded"])

    feature_cols = [
        "tenure_days", "order_velocity", "avg_order_value",
        "is_high_value", "total", "country_encoded",
    ]
    X = df[feature_cols].fillna(0).values
    # The "churned" label (1 = churned within 30 days) is joined onto the
    # feature set by a separate, future-looking labeling job; the feature
    # pipeline above deliberately does not produce it
    y = df["churned"].values

    return X, y, feature_cols

@task
def train_and_evaluate(
    X: np.ndarray,
    y: np.ndarray,
    feature_cols: list[str],
    hyperparams: dict,
) -> str:
    """Train model and log everything to MLflow."""
    mlflow.set_tracking_uri(MLFLOW_URI)
    mlflow.set_experiment(EXPERIMENT_NAME)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    with mlflow.start_run() as run:
        mlflow.log_params(hyperparams)
        mlflow.log_param("feature_count", len(feature_cols))
        mlflow.log_param("train_samples", len(X_train))
        mlflow.log_param("test_samples", len(X_test))
        mlflow.log_param("churn_rate", y.mean())

        model = GradientBoostingClassifier(**hyperparams, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]

        metrics = {
            "auc_roc":   roc_auc_score(y_test, y_proba),
            "precision": precision_score(y_test, y_pred),
            "recall":    recall_score(y_test, y_pred),
            "f1":        f1_score(y_test, y_pred),
        }

        # Cross-validation AUC
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring="roc_auc")
        metrics["cv_auc_mean"] = cv_scores.mean()
        metrics["cv_auc_std"] = cv_scores.std()

        mlflow.log_metrics(metrics)

        # Log feature importances
        for col, importance in zip(feature_cols, model.feature_importances_):
            mlflow.log_metric(f"feature_importance_{col}", importance)

        # Log model artifact
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="churn-predictor",
            signature=mlflow.models.infer_signature(X_train, y_pred),
        )

        print(f"Run {run.info.run_id}: AUC={metrics['auc_roc']:.4f}")
        return run.info.run_id

@task
def promote_if_better(run_id: str, min_auc: float = 0.75) -> bool:
    """Promote model to Staging if it beats threshold and current production."""
    mlflow.set_tracking_uri(MLFLOW_URI)
    client = mlflow.tracking.MlflowClient()

    run = client.get_run(run_id)
    new_auc = run.data.metrics["auc_roc"]

    # Get current production model AUC
    try:
        prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
        if prod_versions:
            prod_run = client.get_run(prod_versions[0].run_id)
            prod_auc = prod_run.data.metrics["auc_roc"]
            if new_auc <= prod_auc + 0.005:  # must beat prod by >0.005 AUC
                print(f"New AUC {new_auc:.4f} does not beat prod {prod_auc:.4f}")
                return False
    except Exception:
        pass  # No production model yet

    if new_auc < min_auc:
        print(f"AUC {new_auc:.4f} below minimum {min_auc}")
        return False

    # Promote to Staging
    model_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
    client.transition_model_version_stage(
        name="churn-predictor",
        version=model_version.version,
        stage="Staging",
        archive_existing_versions=False,
    )
    print(f"Promoted version {model_version.version} to Staging (AUC={new_auc:.4f})")
    return True

@flow(name="churn-training-pipeline")
def training_pipeline(feature_path: str):
    df = load_features(feature_path)
    X, y, feature_cols = prepare_training_data(df)

    hyperparams = {
        "n_estimators": 200,
        "max_depth": 4,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "min_samples_leaf": 20,
    }

    run_id = train_and_evaluate(X, y, feature_cols, hyperparams)
    promoted = promote_if_better(run_id)

    return {"run_id": run_id, "promoted": promoted}
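The gating metric throughout this pipeline is ROC AUC, which is worth unpacking: it equals the probability that a randomly chosen positive example outranks a randomly chosen negative one. That pairwise definition can be computed directly (a stdlib sketch, equivalent to but not how `roc_auc_score` is implemented internally):

```python
def auc_from_scores(y_true: list[int], y_score: list[float]) -> float:
    """ROC AUC via the Mann-Whitney pairwise count: the fraction of
    (positive, negative) pairs where the positive scores higher,
    counting ties as half a win."""
    pos = [s for t, s in zip(y_true, y_score) if t == 1]
    neg = [s for t, s in zip(y_true, y_score) if t == 0]
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))


print(auc_from_scores([1, 0, 1, 0], [0.9, 0.2, 0.6, 0.4]))  # 1.0
```

This is why the 0.75 floor in `promote_if_better` is meaningful: an AUC of 0.75 says the model ranks a true churner above a non-churner three times out of four.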


Drift Detection with Evidently

# monitoring/drift_detector.py
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, ColumnDriftMetric
import pandas as pd
import boto3
from datetime import datetime

def detect_data_drift(
    reference_df: pd.DataFrame,
    production_df: pd.DataFrame,
    feature_cols: list[str],
) -> dict:
    """
    Compare production data distribution against training reference.
    Returns drift report and per-feature drift metrics.
    """
    report = Report(metrics=[
        DatasetDriftMetric(),
        *[ColumnDriftMetric(column_name=col) for col in feature_cols],
    ])

    report.run(
        reference_data=reference_df[feature_cols],
        current_data=production_df[feature_cols],
    )

    result = report.as_dict()
    metrics = result["metrics"]

    dataset_drift = metrics[0]["result"]
    feature_drifts = {
        m["result"]["column_name"]: m["result"]["drift_detected"]
        for m in metrics[1:]
    }

    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "share_drifted_features": dataset_drift["share_of_drifted_columns"],
        "drifted_features": [k for k, v in feature_drifts.items() if v],
        "checked_at": datetime.utcnow().isoformat(),
    }

# Prefect task: daily drift check
from prefect import task

@task
def run_daily_drift_check():
    # Load reference (training data)
    reference_df = pd.read_parquet("s3://ml-features/features/churn/reference/features.parquet")

    # Load last 7 days of production predictions (load_recent_predictions is
    # an app-specific helper that reads the prediction log back into a DataFrame)
    production_df = load_recent_predictions(days=7)

    feature_cols = ["tenure_days", "order_velocity", "avg_order_value", "is_high_value", "total"]

    drift_result = detect_data_drift(reference_df, production_df, feature_cols)

    # Push metrics to CloudWatch
    cw = boto3.client("cloudwatch")
    cw.put_metric_data(
        Namespace="MLOps/ChurnPredictor",
        MetricData=[
            {
                "MetricName": "DataDriftDetected",
                "Value": 1.0 if drift_result["dataset_drift_detected"] else 0.0,
                "Unit": "Count",
            },
            {
                "MetricName": "DriftedFeatureShare",
                "Value": drift_result["share_drifted_features"],
                "Unit": "None",
            },
        ],
    )

    # Trigger retraining if drift detected (trigger_retraining is an
    # app-specific helper, e.g. a Prefect run_deployment call)
    if drift_result["dataset_drift_detected"]:
        trigger_retraining(reason="data_drift", details=drift_result)

    return drift_result
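Under the hood, Evidently's per-column drift checks for numeric features are two-sample statistical tests; for small numeric columns it commonly defaults to the Kolmogorov-Smirnov test. The KS statistic itself is simple enough to sketch in pure Python, which helps build intuition for what "drift_detected" actually measures (this is a teaching sketch, not Evidently's implementation):

```python
from bisect import bisect_right


def ks_statistic(reference: list[float], current: list[float]) -> float:
    """Two-sample Kolmogorov-Smirnov statistic: the maximum vertical
    gap between the two empirical CDFs."""
    ref = sorted(reference)
    cur = sorted(current)

    def ecdf(sample: list[float], x: float) -> float:
        # Fraction of the (sorted) sample that is <= x
        return bisect_right(sample, x) / len(sample)

    return max(
        abs(ecdf(ref, v) - ecdf(cur, v))
        for v in sorted(set(ref + cur))
    )


print(ks_statistic([1, 2, 3, 4], [1, 2, 3, 4]))      # 0.0  identical
print(ks_statistic([1, 2, 3, 4], [11, 12, 13, 14]))  # 1.0  fully shifted
```

A statistic near 0 means the production distribution still looks like training data; near 1 means the distributions barely overlap. The actual test converts this into a p-value before flagging drift.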

Model Serving with FastAPI

# serving/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np
from functools import lru_cache

app = FastAPI(title="Churn Predictor API")

@lru_cache(maxsize=1)
def load_model():
    """Load production model from MLflow registry (cached)."""
    mlflow.set_tracking_uri("http://mlflow.internal:5000")
    model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
    return model

class PredictionRequest(BaseModel):
    customer_id: str
    tenure_days: float
    order_velocity: float
    avg_order_value: float
    is_high_value: int
    order_total: float
    country_encoded: int

class PredictionResponse(BaseModel):
    customer_id: str
    churn_probability: float
    churn_predicted: bool
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    model = load_model()

    features = np.array([[
        request.tenure_days,
        request.order_velocity,
        request.avg_order_value,
        request.is_high_value,
        request.order_total,
        request.country_encoded,
    ]])

    proba = model.predict_proba(features)[0][1]

    # Log prediction for drift monitoring (log_prediction is an app-specific
    # helper that appends features + score to the prediction store)
    await log_prediction(request, proba)

    return PredictionResponse(
        customer_id=request.customer_id,
        churn_probability=round(float(proba), 4),
        churn_predicted=bool(proba > 0.5),
        # The sklearn flavor does not attach registry metadata to the loaded
        # model object, so this stays "unknown" unless the attribute is set
        # at load time (e.g. via MlflowClient.get_latest_versions)
        model_version=getattr(model, "_mlflow_model_version", "unknown"),
    )

@app.get("/health")
async def health():
    return {"status": "ok", "model_loaded": load_model() is not None}
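One caveat with `lru_cache(maxsize=1)`: each serving process holds the first Production model it loads until restart, so promoting a new version in the registry never reaches running replicas. A time-to-live wrapper reloads periodically instead. This is a minimal single-threaded sketch (class name, loader, and TTL are placeholders; a production version would add locking):

```python
import time
from typing import Any, Callable


class TTLModelCache:
    """Re-invoke the loader once the cached model is older than ttl_seconds."""

    def __init__(self, loader: Callable[[], Any], ttl_seconds: float = 300.0):
        self._loader = loader
        self._ttl = ttl_seconds
        self._model: Any = None
        self._loaded_at: float = float("-inf")

    def get(self) -> Any:
        now = time.monotonic()
        if now - self._loaded_at >= self._ttl:
            self._model = self._loader()  # e.g. the load_model() above
            self._loaded_at = now
        return self._model


# Demonstration with a counting loader instead of a real model:
calls = []
cache = TTLModelCache(loader=lambda: calls.append(1) or len(calls), ttl_seconds=0.05)
first = cache.get()    # loads
second = cache.get()   # within TTL, served from cache
time.sleep(0.06)
third = cache.get()    # TTL expired, reloads
print(first, second, third)  # 1 1 2
```

With a 5-minute TTL, a newly promoted model reaches every replica within 5 minutes of the registry transition, with one registry fetch per interval per process.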

Pipeline Schedule with Prefect

# deploy_flows.py
# Deployment.build_from_flow was removed in Prefect 3; use
# flow.to_deployment() plus serve(), or flow.deploy() to submit runs
# to a work pool such as "ml-workers".
from prefect import serve

from pipelines.feature_pipeline import feature_pipeline
from pipelines.training_pipeline import training_pipeline

if __name__ == "__main__":
    # serve() runs scheduled flows in this process; swap to
    # flow.deploy(work_pool_name="ml-workers", ...) for worker-based infra
    serve(
        # Feature pipeline: daily at 2am UTC
        feature_pipeline.to_deployment(
            name="daily-features",
            cron="0 2 * * *",
        ),
        # Training: weekly Sunday at 3am UTC
        training_pipeline.to_deployment(
            name="weekly-training",
            cron="0 3 * * 0",
            parameters={"feature_path": "features/churn/latest/features.parquet"},
        ),
    )

MLOps Cost Estimates

Component | Open Source | Managed | Monthly Cost
Experiment tracking | MLflow (self-hosted) | Weights & Biases | $0 vs $50–$500
Pipeline orchestration | Prefect OSS | Prefect Cloud | $0 vs $200–$2,000
Model serving | FastAPI + ECS | SageMaker Endpoints | $50–$300 vs $200–$1,000
Feature store | PostgreSQL + S3 | Feast / Tecton | $50–$200 vs $500–$3,000
Monitoring | Evidently + Prometheus | Arize / WhyLabs | $0 vs $300–$2,000
GPU training | Spot instances | SageMaker Training | $20–$200 per run

For startups: self-hosted MLflow + Prefect OSS + ECS serving = <$300/month for a production ML pipeline.


Working With Viprasol

Our AI/ML engineering team builds end-to-end MLOps pipelines: from feature engineering through automated retraining, model registry, and production monitoring.

What we deliver:

  • Prefect pipeline design for feature engineering and training
  • MLflow experiment tracking and model registry setup
  • Drift detection with Evidently + CloudWatch alerting
  • FastAPI model serving with version management
  • Automated retraining triggers based on drift or scheduled cadence

→ Discuss your ML infrastructure needs · → AI and machine learning services



About the Author


Viprasol Tech Team

Custom Software Development Specialists

The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.

MT4/MT5 EA Development · AI Agent Systems · SaaS Development · Algorithmic Trading
