MLOps in 2026: Model Registry, Drift Detection, and Production ML Pipelines
Build production MLOps pipelines with a model registry, automated retraining, drift detection, and Prefect orchestration. Covers feature stores, A/B testing for models, and monitoring setup.
Getting a model to 90% accuracy in a Jupyter notebook is the easy part. Keeping it at 90% accuracy six months later, with automated retraining when it degrades, clear ownership, and the ability to roll back when a new version underperforms: that's MLOps.
In 2026, the ML engineering stack has stabilized around a set of open-source tools that handle the full lifecycle: Prefect for pipeline orchestration, MLflow for experiment tracking and model registry, and statistical drift detection to catch when production data diverges from training data. This post covers the full pipeline from data ingestion to production monitoring.
The MLOps Maturity Levels
| Level | What It Looks Like | Pain Points |
|---|---|---|
| 0 – Manual | Scripts run by hand, model in a .pkl file on a server | No versioning, no monitoring, "works on my machine" |
| 1 – Automated training | Scheduled retraining, basic experiment tracking | Still manual deployment, no drift detection |
| 2 – CI/CD for ML | Automated train → evaluate → deploy pipeline with model comparison and metric gating | No drift detection or automatic rollback yet |
| 3 – Full MLOps | Feature store, drift detection, shadow mode, automatic rollback | Requires significant investment |
Most teams should target Level 2. Level 3 is for organizations with >10 production models and dedicated ML engineering staff.
Stack Overview
Data Sources (S3, PostgreSQL, Kafka)
        ↓
Feature Pipeline (Prefect)
        ↓
Training Pipeline (Prefect + sklearn/XGBoost/PyTorch)
        ↓
MLflow Experiment Tracking
        ↓
Model Registry (MLflow)
        ↓
Model Serving (FastAPI + BentoML / Ray Serve)
        ↓
Monitoring (Evidently AI + Prometheus + Grafana)
        ↓
Drift Alert → Retrain Trigger
๐ค AI Is Not the Future โ It Is Right Now
Businesses using AI automation cut manual work by 60โ80%. We build production-ready AI systems โ RAG pipelines, LLM integrations, custom ML models, and AI agent workflows.
- LLM integration (OpenAI, Anthropic, Gemini, local models)
- RAG systems that answer from your own data
- AI agents that take real actions โ not just chat
- Custom ML models for prediction, classification, detection
Feature Pipeline with Prefect
# pipelines/feature_pipeline.py
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect_aws.s3 import S3Bucket
import io
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlalchemy

@task(retries=3, retry_delay_seconds=60)
def extract_raw_data(
    start_date: datetime,
    end_date: datetime,
) -> pd.DataFrame:
    """Pull raw order data from PostgreSQL."""
    db_url = Secret.load("db-url").get()
    engine = sqlalchemy.create_engine(db_url)
    query = """
        SELECT
            o.id,
            o.customer_id,
            o.total,
            o.created_at,
            c.country,
            c.signup_date,
            COUNT(prev.id) AS prior_order_count,
            COALESCE(SUM(prev.total), 0) AS prior_total_spent
        FROM orders o
        JOIN customers c ON c.id = o.customer_id
        LEFT JOIN orders prev ON prev.customer_id = o.customer_id
            AND prev.created_at < o.created_at
        WHERE o.created_at BETWEEN :start AND :end
        GROUP BY o.id, o.customer_id, o.total, o.created_at, c.country, c.signup_date
    """
    with engine.connect() as conn:
        # Named bind parameters need sqlalchemy.text() under SQLAlchemy 2.x
        return pd.read_sql(
            sqlalchemy.text(query),
            conn,
            params={"start": start_date, "end": end_date},
        )

@task
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create model features from raw data."""
    df = df.copy()
    # Customer tenure in days at time of order
    df["tenure_days"] = (
        pd.to_datetime(df["created_at"]) - pd.to_datetime(df["signup_date"])
    ).dt.days.clip(lower=0)
    # Order velocity (orders per 30-day period)
    df["order_velocity"] = df["prior_order_count"] / (df["tenure_days"] / 30 + 1)
    # Average order value
    df["avg_order_value"] = np.where(
        df["prior_order_count"] > 0,
        df["prior_total_spent"] / df["prior_order_count"],
        0,
    )
    # Country encoding (top 10, rest = "OTHER")
    top_countries = ["US", "GB", "CA", "AU", "DE", "FR", "IN", "JP", "BR", "MX"]
    df["country_encoded"] = df["country"].where(
        df["country"].isin(top_countries), other="OTHER"
    )
    # Binary: is high-value customer?
    df["is_high_value"] = (df["prior_total_spent"] > 500).astype(int)
    # Target: will this customer return within 30 days?
    # (The "churned" label is joined in from a future-looking query in
    # production; the training pipeline below assumes it is present.)
    return df[[
        "id", "tenure_days", "order_velocity", "avg_order_value",
        "is_high_value", "total", "country_encoded",
    ]]

@task
def save_features(df: pd.DataFrame, version: str) -> str:
    """Save feature set to S3 as Parquet."""
    s3 = S3Bucket.load("ml-features")
    path = f"features/churn/{version}/features.parquet"
    # S3Bucket has no DataFrame helper, so serialize to a buffer first
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0)
    s3.upload_from_file_object(buffer, path)
    return path

@flow(name="churn-feature-pipeline")
def feature_pipeline(
    start_date: datetime | None = None,
    end_date: datetime | None = None,
):
    if end_date is None:
        end_date = datetime.utcnow()
    if start_date is None:
        start_date = end_date - timedelta(days=90)
    version = end_date.strftime("%Y%m%d_%H%M%S")
    raw = extract_raw_data(start_date, end_date)
    features = engineer_features(raw)
    path = save_features(features, version)
    return path
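The flow above saves whatever `engineer_features` produces. A cheap safeguard is a validation step between feature engineering and `save_features`, so a bad batch fails loudly instead of silently poisoning training data. A minimal sketch (the `validate_features` name and thresholds are illustrative, not part of the pipeline above):

```python
# Lightweight feature validation, intended to run between engineer_features
# and save_features. Thresholds here are illustrative placeholders.
import pandas as pd

def validate_features(df: pd.DataFrame) -> pd.DataFrame:
    numeric_cols = ["tenure_days", "order_velocity", "avg_order_value", "total"]
    assert len(df) > 0, "empty feature batch"
    assert df["id"].is_unique, "duplicate order ids"
    for col in numeric_cols:
        # Upstream clipping should guarantee non-negative values
        assert (df[col] >= 0).all(), f"negative values in {col}"
    # Null share per numeric column should stay near zero
    null_share = df[numeric_cols].isna().mean()
    assert (null_share < 0.01).all(), f"nulls exceed 1%: {null_share.to_dict()}"
    return df
```

Wrapped in `@task`, this slots into the flow as `features = validate_features(engineer_features(raw))`; a failed assertion trips Prefect's retry/alerting machinery rather than shipping a corrupt Parquet file.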
Training Pipeline with MLflow
# pipelines/training_pipeline.py
from prefect import flow, task
import io
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

MLFLOW_URI = "http://mlflow.internal:5000"
EXPERIMENT_NAME = "churn-prediction"

@task
def load_features(feature_path: str) -> pd.DataFrame:
    from prefect_aws.s3 import S3Bucket
    s3 = S3Bucket.load("ml-features")
    # read_path returns raw bytes; decode the Parquet payload ourselves
    return pd.read_parquet(io.BytesIO(s3.read_path(feature_path)))

@task
def prepare_training_data(
    df: pd.DataFrame,
) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Encode categoricals and split features/target."""
    le = LabelEncoder()
    df["country_encoded"] = le.fit_transform(df["country_encoded"])
    feature_cols = [
        "tenure_days", "order_velocity", "avg_order_value",
        "is_high_value", "total", "country_encoded",
    ]
    X = df[feature_cols].fillna(0).values
    # Assumes the future-looking label join from the feature pipeline
    # has added the "churned" column
    y = df["churned"].values  # 1 = churned within 30 days
    return X, y, feature_cols

@task
def train_and_evaluate(
    X: np.ndarray,
    y: np.ndarray,
    feature_cols: list[str],
    hyperparams: dict,
) -> str:
    """Train model and log everything to MLflow."""
    mlflow.set_tracking_uri(MLFLOW_URI)
    mlflow.set_experiment(EXPERIMENT_NAME)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    with mlflow.start_run() as run:
        mlflow.log_params(hyperparams)
        mlflow.log_param("feature_count", len(feature_cols))
        mlflow.log_param("train_samples", len(X_train))
        mlflow.log_param("test_samples", len(X_test))
        mlflow.log_param("churn_rate", y.mean())
        model = GradientBoostingClassifier(**hyperparams, random_state=42)
        model.fit(X_train, y_train)
        # Evaluate
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]
        metrics = {
            "auc_roc": roc_auc_score(y_test, y_proba),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred),
        }
        # Cross-validation AUC
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring="roc_auc")
        metrics["cv_auc_mean"] = cv_scores.mean()
        metrics["cv_auc_std"] = cv_scores.std()
        mlflow.log_metrics(metrics)
        # Log feature importances
        for col, importance in zip(feature_cols, model.feature_importances_):
            mlflow.log_metric(f"feature_importance_{col}", importance)
        # Log model artifact (infer signature from matching input/output)
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="churn-predictor",
            signature=mlflow.models.infer_signature(X_test, y_pred),
        )
        print(f"Run {run.info.run_id}: AUC={metrics['auc_roc']:.4f}")
    return run.info.run_id

@task
def promote_if_better(run_id: str, min_auc: float = 0.75) -> bool:
    """Promote model to Staging if it beats threshold and current production."""
    mlflow.set_tracking_uri(MLFLOW_URI)
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    new_auc = run.data.metrics["auc_roc"]
    if new_auc < min_auc:
        print(f"AUC {new_auc:.4f} below minimum {min_auc}")
        return False
    # Get current production model AUC (stage-based workflow; newer MLflow
    # releases prefer registered-model aliases, but stages still work)
    try:
        prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
        if prod_versions:
            prod_run = client.get_run(prod_versions[0].run_id)
            prod_auc = prod_run.data.metrics["auc_roc"]
            if new_auc <= prod_auc + 0.005:  # Must beat prod by > 0.005 AUC
                print(f"New AUC {new_auc:.4f} does not beat prod {prod_auc:.4f}")
                return False
    except Exception:
        pass  # No production model yet
    # Promote to Staging
    model_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
    client.transition_model_version_stage(
        name="churn-predictor",
        version=model_version.version,
        stage="Staging",
        archive_existing_versions=False,
    )
    print(f"Promoted version {model_version.version} to Staging (AUC={new_auc:.4f})")
    return True

@flow(name="churn-training-pipeline")
def training_pipeline(feature_path: str):
    df = load_features(feature_path)
    X, y, feature_cols = prepare_training_data(df)
    hyperparams = {
        "n_estimators": 200,
        "max_depth": 4,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "min_samples_leaf": 20,
    }
    run_id = train_and_evaluate(X, y, feature_cols, hyperparams)
    promoted = promote_if_better(run_id)
    return {"run_id": run_id, "promoted": promoted}
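Before flipping a Staging version to Production, many teams also run it in shadow mode: both models score the same traffic, only production's answer is served, and disagreement is tracked. A minimal sketch of the comparison logic (the `shadow_report` name and fields are illustrative, not from the pipeline above):

```python
import numpy as np

def shadow_report(prod_proba: np.ndarray, cand_proba: np.ndarray,
                  threshold: float = 0.5) -> dict:
    """Compare candidate vs. production predictions on the same traffic."""
    prod_labels = prod_proba > threshold
    cand_labels = cand_proba > threshold
    return {
        # Share of requests where the two models disagree on the label
        "disagreement_rate": float((prod_labels != cand_labels).mean()),
        # Mean absolute probability gap - large gaps warrant investigation
        "mean_abs_prob_diff": float(np.abs(prod_proba - cand_proba).mean()),
        # Did the candidate flag noticeably more (or fewer) churners?
        "positive_rate_delta": float(cand_labels.mean() - prod_labels.mean()),
    }

# Identical predictions produce zero disagreement
print(shadow_report(np.array([0.2, 0.7]), np.array([0.2, 0.7]))["disagreement_rate"])  # 0.0
```

A high disagreement rate with similar offline AUC usually means the two models rank customers differently on a population the test set underrepresents; that is worth investigating before promotion.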
Drift Detection with Evidently
# monitoring/drift_detector.py
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, ColumnDriftMetric
import pandas as pd
import boto3
from datetime import datetime

def detect_data_drift(
    reference_df: pd.DataFrame,
    production_df: pd.DataFrame,
    feature_cols: list[str],
) -> dict:
    """
    Compare production data distribution against training reference.
    Returns drift report and per-feature drift metrics.
    """
    report = Report(metrics=[
        DatasetDriftMetric(),
        *[ColumnDriftMetric(column_name=col) for col in feature_cols],
    ])
    report.run(
        reference_data=reference_df[feature_cols],
        current_data=production_df[feature_cols],
    )
    result = report.as_dict()
    metrics = result["metrics"]
    dataset_drift = metrics[0]["result"]
    feature_drifts = {
        m["result"]["column_name"]: m["result"]["drift_detected"]
        for m in metrics[1:]
    }
    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "share_drifted_features": dataset_drift["share_of_drifted_columns"],
        "drifted_features": [k for k, v in feature_drifts.items() if v],
        "checked_at": datetime.utcnow().isoformat(),
    }

# Prefect task: daily drift check
from prefect import task

@task
def run_daily_drift_check():
    # Load reference (training data)
    reference_df = pd.read_parquet(
        "s3://ml-features/features/churn/reference/features.parquet"
    )
    # Load last 7 days of production predictions
    # (load_recent_predictions and trigger_retraining below are
    # project-specific helpers, not shown here)
    production_df = load_recent_predictions(days=7)
    feature_cols = ["tenure_days", "order_velocity", "avg_order_value", "is_high_value", "total"]
    drift_result = detect_data_drift(reference_df, production_df, feature_cols)
    # Push metrics to CloudWatch
    cw = boto3.client("cloudwatch")
    cw.put_metric_data(
        Namespace="MLOps/ChurnPredictor",
        MetricData=[
            {
                "MetricName": "DataDriftDetected",
                "Value": 1.0 if drift_result["dataset_drift_detected"] else 0.0,
                "Unit": "Count",
            },
            {
                "MetricName": "DriftedFeatureShare",
                "Value": drift_result["share_drifted_features"],
                "Unit": "None",
            },
        ],
    )
    # Trigger retraining if drift detected
    if drift_result["dataset_drift_detected"]:
        trigger_retraining(reason="data_drift", details=drift_result)
    return drift_result
Model Serving with FastAPI
# serving/app.py
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np
from functools import lru_cache

app = FastAPI(title="Churn Predictor API")

@lru_cache(maxsize=1)
def load_model():
    """Load production model from MLflow registry (cached per process)."""
    mlflow.set_tracking_uri("http://mlflow.internal:5000")
    return mlflow.sklearn.load_model("models:/churn-predictor/Production")

class PredictionRequest(BaseModel):
    customer_id: str
    tenure_days: float
    order_velocity: float
    avg_order_value: float
    is_high_value: int
    order_total: float
    country_encoded: int

class PredictionResponse(BaseModel):
    customer_id: str
    churn_probability: float
    churn_predicted: bool
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    model = load_model()
    features = np.array([[
        request.tenure_days,
        request.order_velocity,
        request.avg_order_value,
        request.is_high_value,
        request.order_total,
        request.country_encoded,
    ]])
    proba = model.predict_proba(features)[0][1]
    # Log prediction for drift monitoring
    # (log_prediction is a project-specific helper that persists the
    # feature vector and score for the drift checks above)
    await log_prediction(request, proba)
    return PredictionResponse(
        customer_id=request.customer_id,
        churn_probability=round(float(proba), 4),
        churn_predicted=bool(proba > 0.5),
        # The sklearn flavor does not attach a registry version; set this
        # attribute yourself at load time or report "unknown"
        model_version=getattr(model, "_mlflow_model_version", "unknown"),
    )

@app.get("/health")
async def health():
    return {"status": "ok", "model_loaded": load_model() is not None}
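One gap in the serving code above: `lru_cache` means a running process never picks up a newly promoted Production version. A common fix is an internal reload endpoint that clears the cache. The sketch below uses a stand-in loader with a counter just to demonstrate the cache behavior; in the real app, `load_model` is the registry fetch from `serving/app.py` and the endpoint must be protected (auth, internal network only):

```python
from functools import lru_cache

_load_count = 0  # counter stands in for "which model version is loaded"

@lru_cache(maxsize=1)
def load_model():
    global _load_count
    _load_count += 1
    # Real code: mlflow.sklearn.load_model("models:/churn-predictor/Production")
    return f"model-load-{_load_count}"

# @app.post("/admin/reload-model")  # wire into the FastAPI app above
def reload_model() -> dict:
    load_model.cache_clear()  # drop the cached model
    model = load_model()      # eagerly re-fetch; fails fast if the registry is down
    return {"status": "reloaded", "model": model}

print(load_model(), load_model())  # same cached object twice: model-load-1 model-load-1
print(reload_model()["model"])     # a fresh load after cache_clear: model-load-2
```

The promotion pipeline (or a CD job) can then hit this endpoint after `transition_model_version_stage`, so serving picks up new versions without a restart.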
Pipeline Schedule with Prefect
# deploy_flows.py
# Prefect 2.x deployment API; Prefect 3 replaces this with
# flow.deploy() / flow.serve(), but the schedules translate directly.
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from pipelines.feature_pipeline import feature_pipeline
from pipelines.training_pipeline import training_pipeline

# Feature pipeline: daily at 2am UTC
Deployment.build_from_flow(
    flow=feature_pipeline,
    name="daily-features",
    schedule=CronSchedule(cron="0 2 * * *"),
    work_pool_name="ml-workers",
    infrastructure_overrides={"env": {"PREFECT_API_URL": "http://prefect.internal:4200/api"}},
).apply()

# Training: weekly Sunday at 3am UTC
Deployment.build_from_flow(
    flow=training_pipeline,
    name="weekly-training",
    schedule=CronSchedule(cron="0 3 * * 0"),
    work_pool_name="ml-workers",
    parameters={"feature_path": "features/churn/latest/features.parquet"},
).apply()
MLOps Cost Estimates
| Component | Open Source | Managed | Monthly Cost |
|---|---|---|---|
| Experiment tracking | MLflow (self-hosted) | Weights & Biases | $0 vs $50–$500 |
| Pipeline orchestration | Prefect OSS | Prefect Cloud | $0 vs $200–$2,000 |
| Model serving | FastAPI + ECS | SageMaker Endpoints | $50–$300 vs $200–$1,000 |
| Feature store | PostgreSQL + S3 | Feast / Tecton | $50–$200 vs $500–$3,000 |
| Monitoring | Evidently + Prometheus | Arize / WhyLabs | $0 vs $300–$2,000 |
| GPU training | Spot instances | SageMaker Training | $20–$200 per run |
For startups: self-hosted MLflow + Prefect OSS + ECS serving = <$300/month for a production ML pipeline.
Working With Viprasol
Our AI/ML engineering team builds end-to-end MLOps pipelines, from feature engineering through automated retraining, model registry, and production monitoring.
What we deliver:
- Prefect pipeline design for feature engineering and training
- MLflow experiment tracking and model registry setup
- Drift detection with Evidently + CloudWatch alerting
- FastAPI model serving with version management
- Automated retraining triggers based on drift or scheduled cadence
→ Discuss your ML infrastructure needs (AI and machine learning services)
About the Author
Viprasol Tech Team
Custom Software Development Specialists
The Viprasol Tech team specialises in algorithmic trading software, AI agent systems, and SaaS development. With 100+ projects delivered across MT4/MT5 EAs, fintech platforms, and production AI systems, the team brings deep technical experience to every engagement. Based in India, serving clients globally.