19  Embedding Pipeline Engineering

Note: Chapter Overview

Moving from custom embedding development to production deployment requires robust engineering practices. This chapter explores the operational infrastructure needed to deploy, monitor, and maintain embedding systems at trillion-row scale. We’ll cover MLOps practices specific to embeddings, the trade-offs between real-time and batch processing, versioning strategies that enable safe rollouts and rollbacks, A/B testing methodologies for embedding models, and comprehensive monitoring approaches to detect drift and degradation. These practices ensure embedding systems remain reliable, performant, and maintainable as they scale from prototype to production.

The journey from a successful embedding model to a production-ready system involves significant engineering challenges. Unlike traditional ML models that produce discrete predictions, embedding systems integrate into search pipelines, recommendation engines, and real-time decision systems where latency, freshness, and consistency are critical. This chapter provides the operational toolkit for building embedding pipelines that scale to hundreds of millions of queries per day across trillion-row datasets.

19.1 MLOps for Embedding Production

Embedding systems have unique MLOps requirements that distinguish them from traditional ML deployments. While a classification model serves predictions on demand, an embedding system must continuously generate and update vectors for massive datasets, maintain multiple indices for fast retrieval, serve both embedding generation and similarity search, and coordinate versioning across embedding models and vector indices.

19.1.1 The Embedding Production Stack

A production embedding system comprises multiple interconnected components:

Show Model Registry Implementation
import json
from datetime import datetime
from pathlib import Path
import torch


class EmbeddingModelRegistry:
    """Registry for versioned embedding models with metadata tracking."""

    def __init__(self, registry_path="./models"):
        self.registry_path = Path(registry_path)
        # parents=True so nested registry paths can be created in one call
        self.registry_path.mkdir(parents=True, exist_ok=True)
        self.models = {}

    def register_model(self, model_id, model, metadata):
        """Register a new embedding model version."""
        model_path = self.registry_path / f"{model_id}.pt"
        metadata_path = self.registry_path / f"{model_id}.json"

        torch.save(model.state_dict(), model_path)

        metadata["registered_at"] = datetime.now().isoformat()
        metadata["model_path"] = str(model_path)
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        self.models[model_id] = metadata
        print(f"Registered model: {model_id}")

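    def load_model(self, model_id, device="cpu"):
        """Load a model's state dict and metadata from the registry.

        register_model saves only the state dict, so the caller must build
        the architecture and call model.load_state_dict(state_dict).
        """
        model_path = self.registry_path / f"{model_id}.pt"
        metadata_path = self.registry_path / f"{model_id}.json"

        with open(metadata_path, 'r') as f:
            metadata = json.load(f)

        state_dict = torch.load(model_path, map_location=device)
        return state_dict, metadata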


# Usage example
registry = EmbeddingModelRegistry()
model = torch.nn.Embedding(1000, 128)
metadata = {"version": "1.0.0", "embedding_dim": 128, "description": "Production model"}
registry.register_model("product-embeddings-v1.0.0", model, metadata)
print("Model registry initialized")
Registered model: product-embeddings-v1.0.0
Model registry initialized
Tip: Model Registry Best Practices
  1. Semantic versioning: Use MAJOR.MINOR.PATCH for model versions
  2. Immutable models: Never modify registered models; create new versions
  3. Metadata completeness: Track training data, hyperparameters, and performance metrics
  4. Rollback plan: Always maintain reference to previous production model
  5. Audit trail: Log all deployments, rollbacks, and configuration changes
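The first three practices can be enforced with a small pre-registration check. The following is a minimal sketch, not part of the registry above: `REQUIRED_METADATA_FIELDS`, `parse_semver`, and `validate_registration` are hypothetical names, and the set of required fields is an assumption you would adapt to your own registry.

```python
import re

# Hypothetical required fields; adjust to whatever your registry tracks.
REQUIRED_METADATA_FIELDS = {
    "version", "embedding_dim", "training_data", "hyperparameters", "metrics"
}
SEMVER_PATTERN = re.compile(r"^(\d+)\.(\d+)\.(\d+)$")


def parse_semver(version):
    """Parse 'MAJOR.MINOR.PATCH' into a tuple of ints; raise on bad input."""
    match = SEMVER_PATTERN.match(version)
    if match is None:
        raise ValueError(f"Not a MAJOR.MINOR.PATCH version: {version!r}")
    return tuple(int(part) for part in match.groups())


def validate_registration(model_id, metadata, existing_ids):
    """Return a list of problems; an empty list means the registration looks OK."""
    problems = []
    # Immutability: refuse to overwrite an already-registered model ID
    if model_id in existing_ids:
        problems.append(f"{model_id} already registered; models are immutable")
    # Metadata completeness
    missing = REQUIRED_METADATA_FIELDS - set(metadata)
    if missing:
        problems.append(f"missing metadata fields: {sorted(missing)}")
    # Semantic versioning
    try:
        parse_semver(metadata.get("version", ""))
    except ValueError as exc:
        problems.append(str(exc))
    return problems


existing = {"product-embeddings-v1.0.0"}
incomplete = {"version": "1.1", "embedding_dim": 128}
for problem in validate_registration("product-embeddings-v1.0.0", incomplete, existing):
    print(problem)

# Tuples compare numerically, so 2.0.0 correctly sorts after 1.12.3
print(parse_semver("2.0.0") > parse_semver("1.12.3"))
```

Comparing parsed tuples rather than raw strings avoids the common mistake of ordering "1.12.3" before "1.2.0" lexicographically.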

19.2 Real-Time vs. Batch Embedding Generation

One of the most critical architectural decisions for embedding systems is when and how to generate embeddings. Batch processing offers throughput and cost efficiency, while real-time generation provides freshness and personalization. Most production systems use a hybrid approach, optimizing for different use cases within the same platform.

19.2.1 The Batch vs. Real-Time Trade-off

Batch Processing generates embeddings offline in large batches:

  • Advantages: High throughput (10-100x faster), cost-efficient (cheaper compute), optimized resource utilization, quality control before serving
  • Disadvantages: Staleness (hours to days old), no personalization, large storage requirements, delayed updates
  • Best for: Product catalogs, document collections, static content, historical data

Real-Time Processing generates embeddings on-demand:

  • Advantages: Fresh embeddings (milliseconds old), personalized to context, storage efficient (compute on-demand), immediate updates
  • Disadvantages: High latency (10-100ms), expensive (online GPU inference), variable load patterns, harder to monitor quality
  • Best for: User queries, personalized feeds, dynamic content, real-time sessions
Show Hybrid Embedding System
import torch
from datetime import datetime, timedelta


class HybridEmbeddingSystem:
    """Hybrid system combining batch and real-time embedding generation."""

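    def __init__(self, model, cache_size=100000):
        self.model = model
        self.model.eval()
        self.batch_embeddings = {}
        self.batch_timestamps = {}
        # NOTE: cache_size is stored but not enforced in this sketch;
        # a production cache needs an eviction policy (e.g., LRU)
        self.cache_size = cache_size
        self.cache = {}
        self.cache_hits = 0
        self.cache_misses = 0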

    def get_embedding(self, entity_id, entity_type, features=None, max_staleness=None):
        """Get embedding using appropriate strategy (batch or real-time)."""
        if entity_type in ["query", "session"]:
            return self._generate_realtime(entity_id, features)
        elif entity_type in ["product", "document"]:
            batch_emb = self._lookup_batch(entity_id, max_staleness)
            return batch_emb if batch_emb is not None else self._generate_realtime(entity_id, features)
        else:
            raise ValueError(f"Unknown entity type: {entity_type}")

    def _lookup_batch(self, entity_id, max_staleness):
        """Lookup pre-computed batch embedding."""
        if entity_id not in self.batch_embeddings:
            return None
        # Compare against None so that timedelta(0) ("always stale") is honored
        if max_staleness is not None and (datetime.now() - self.batch_timestamps[entity_id]) > max_staleness:
            return None
        return self.batch_embeddings[entity_id]

    def _generate_realtime(self, entity_id, features):
        """Generate embedding in real-time with caching."""
        if entity_id in self.cache:
            self.cache_hits += 1
            return self.cache[entity_id]

        self.cache_misses += 1
        with torch.no_grad():
            embedding = self.model(features).cpu().numpy()
        self.cache[entity_id] = embedding
        return embedding

    def batch_update(self, entity_ids, embeddings):
        """Update batch embeddings from offline processing."""
        timestamp = datetime.now()
        for eid, emb in zip(entity_ids, embeddings):
            self.batch_embeddings[eid] = emb
            self.batch_timestamps[eid] = timestamp


# Usage example
model = torch.nn.Sequential(torch.nn.Linear(100, 128))
hybrid = HybridEmbeddingSystem(model)
emb = hybrid.get_embedding("product_123", "product", features=torch.randn(1, 100), max_staleness=timedelta(days=1))
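# Guard on total lookups so a run with hits but no misses is not reported as 0%
total_lookups = hybrid.cache_hits + hybrid.cache_misses
hit_rate = hybrid.cache_hits / total_lookups if total_lookups > 0 else 0.0
print(f"Cache hit rate: {hit_rate:.2%}")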
Cache hit rate: 0.00%
Tip: Choosing the Right Strategy

Use batch processing when:

  • Entity changes are infrequent (daily/weekly updates)
  • Dataset is large but manageable (millions to billions)
  • Latency requirements are relaxed (seconds acceptable)
  • Cost optimization is critical

Use real-time generation when:

  • Freshness is critical (sub-second requirements)
  • Entities are transient (search queries, sessions)
  • Personalization is required (user-specific embeddings)
  • Dataset is small (thousands to millions)

Use hybrid approach when:

  • Mixed entity types with different requirements
  • Need both cost efficiency and freshness
  • Serving 100M+ requests/day across diverse use cases
Warning: Cold Start Problem

Real-time generation can fail during cold starts (model not loaded, GPU unavailable). Always maintain:

  1. Warm standby: Pre-warmed models ready to serve
  2. Fallback to batch: Serve slightly stale batch embeddings if real-time fails
  3. Graceful degradation: Return approximate results rather than errors
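The fallback order described above can be sketched as a small wrapper. This is an illustration under stated assumptions: `embed_with_fallback`, `realtime_fn`, `batch_store`, and `default_vector` are hypothetical names, and a real system would catch narrower exception types and emit metrics for each path.

```python
import numpy as np


def embed_with_fallback(entity_id, realtime_fn, batch_store, default_vector):
    """Try real-time inference, then stale batch embeddings, then a default.

    Returns (embedding, source) so callers can log which path served the request.
    """
    try:
        return realtime_fn(entity_id), "realtime"
    except Exception:  # e.g., model not loaded, GPU unavailable
        pass
    # Fallback to batch: a slightly stale embedding beats an error
    if entity_id in batch_store:
        return batch_store[entity_id], "batch_fallback"
    # Graceful degradation: approximate result instead of failing the request
    return default_vector, "degraded_default"


def broken_realtime(entity_id):
    """Simulate a cold start where the model is not yet loaded."""
    raise RuntimeError("model not loaded")


batch_store = {"product_123": np.ones(4)}
default = np.zeros(4)

_, source_known = embed_with_fallback("product_123", broken_realtime, batch_store, default)
_, source_unknown = embed_with_fallback("product_999", broken_realtime, batch_store, default)
print(source_known)    # batch_fallback
print(source_unknown)  # degraded_default
```

Returning the source label alongside the vector makes it easy to alert when the share of degraded responses rises.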

19.3 Embedding Versioning and Rollback Strategies

Embeddings generated by different model versions are incompatible—you cannot mix vectors from v1.0 and v2.0 in the same similarity search. This creates unique versioning challenges that require careful coordination across the entire embedding pipeline.
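A toy experiment makes the incompatibility concrete. In this sketch, "v2" is simply "v1" rotated by a random orthogonal matrix; that is an assumption chosen for illustration (real retrained models differ in more complex ways), but it shows how two spaces can agree internally while being meaningless to compare across versions.

```python
import numpy as np

rng = np.random.default_rng(0)
dim, n_items = 64, 100

# "v1" embeddings: random unit vectors standing in for a real model's output
v1 = rng.normal(size=(n_items, dim))
v1 /= np.linalg.norm(v1, axis=1, keepdims=True)

# "v2": the same items rotated by a random orthogonal matrix (QR of a Gaussian)
q, _ = np.linalg.qr(rng.normal(size=(dim, dim)))
v2 = v1 @ q

within_v1 = v1 @ v1.T            # item-to-item cosine similarity inside v1
within_v2 = v2 @ v2.T            # item-to-item cosine similarity inside v2
cross = np.sum(v1 * v2, axis=1)  # each item's v1 vector against its own v2 vector

print("Within-version similarities preserved:", np.allclose(within_v1, within_v2))
print("Mean cross-version self-similarity:", round(float(cross.mean()), 3))
```

Both versions rank neighbors identically, yet an item's v1 vector is nearly orthogonal to its own v2 vector, so a v2 query against a v1 index returns essentially random results.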

19.3.1 The Versioning Challenge

When you deploy a new embedding model:

  1. All existing embeddings become incompatible with new queries
  2. Must re-generate embeddings for entire corpus (billions of vectors)
  3. Must coordinate index updates with model deployment
  4. Must support rollback if new model underperforms

The core challenge: How do you deploy a new embedding model without downtime or inconsistency?

Show Deployment Strategy Implementation
from enum import Enum
import torch


class DeploymentStrategy(Enum):
    """Deployment strategies for new embedding models."""
    BLUE_GREEN = "blue_green"
    INCREMENTAL = "incremental"
    SHADOW = "shadow"
    CANARY = "canary"


class EmbeddingVersionCoordinator:
    """Coordinate embedding model versions across pipeline stages."""

    def __init__(self, model_registry):
        self.model_registry = model_registry
        self.active_versions = {}
        self.version_to_index = {}
        self.traffic_routing = {}

    def deploy_new_version(self, new_model_id, strategy, corpus_iterator=None):
        """Deploy new embedding model version using specified strategy."""
        print(f"Deploying {new_model_id} using {strategy.value} strategy...")

        if strategy == DeploymentStrategy.BLUE_GREEN:
            self._deploy_blue_green(new_model_id, corpus_iterator)
        elif strategy == DeploymentStrategy.CANARY:
            self._deploy_canary(new_model_id)
        else:
            # INCREMENTAL and SHADOW are declared but not implemented in this sketch
            raise NotImplementedError(f"Strategy not implemented: {strategy.value}")

    def _deploy_blue_green(self, new_model_id, corpus_iterator):
        """Blue-green deployment: build complete new index, then switch."""
        print("Building GREEN index (new version)...")
        green_index = f"embeddings_{new_model_id.replace('.', '_')}"
        # Re-embed entire corpus into GREEN...
        print("Switching traffic from BLUE → GREEN...")
        self.version_to_index[new_model_id] = green_index
        # Route all traffic to GREEN; the BLUE index is left in place so rollback stays possible
        self.traffic_routing = {new_model_id: 1.0}

    def _deploy_canary(self, new_model_id):
        """Canary deployment: route small % of traffic to new model."""
        self.traffic_routing[new_model_id] = 0.01  # 1% traffic
        print(f"Canary deployment: {new_model_id} receiving 1% traffic")

    def rollback(self, target_model_id):
        """Rollback to previous model version."""
        print(f"Rolling back to {target_model_id}...")
        self.traffic_routing = {target_model_id: 1.0}


# Usage example
registry = EmbeddingModelRegistry()
coordinator = EmbeddingVersionCoordinator(registry)
coordinator.deploy_new_version("v2.0.0", DeploymentStrategy.CANARY)
print("Version coordinator manages safe deployments")
Deploying v2.0.0 using canary strategy...
Canary deployment: v2.0.0 receiving 1% traffic
Version coordinator manages safe deployments
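The coordinator records traffic weights but does not show how a request is mapped to a version. One possible approach, sketched here with a hypothetical `route_request` helper, hashes the user ID onto the cumulative weights so each user consistently reaches the same model version.

```python
import hashlib


def route_request(user_id, traffic_routing):
    """Pick a model version for a user from a {version: weight} mapping.

    Weights are normalized by their sum, so {"v1.0.0": 0.99, "v2.0.0": 0.01}
    sends roughly 1% of users to v2.0.0. Hashing keeps assignments stable.
    """
    total = sum(traffic_routing.values())
    if total <= 0:
        raise ValueError("traffic_routing must contain positive weights")
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    point = int(digest[:8], 16) / 2**32 * total  # uniform in [0, total)
    cumulative = 0.0
    # Sort versions so the bucket boundaries do not depend on dict ordering
    for version, weight in sorted(traffic_routing.items()):
        cumulative += weight
        if point < cumulative:
            return version
    return version  # guard against floating-point rounding at the upper edge


routing = {"v1.0.0": 0.99, "v2.0.0": 0.01}
assignments = [route_request(f"user_{i}", routing) for i in range(20000)]
canary_share = assignments.count("v2.0.0") / len(assignments)
print(f"Canary share: {canary_share:.2%}")
```

Because the assignment depends only on the user ID and the weights, raising the canary weight from 1% to 5% keeps the original canary users in the canary group.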
Tip: Version Pinning for Reproducibility

For debugging and compliance, support version pinning in queries:

# Allow clients to specify model version explicitly
query_embedding = embedding_service.get_embedding(
    query="...",
    model_version="v1.2.3"  # Pin to specific version
)

This enables:

  • Reproducing historical results for debugging
  • A/B testing different model versions
  • Gradual migration for sensitive applications
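The `embedding_service` above is not defined in this chapter. A minimal sketch of what such a service might look like follows; `PinnedEmbeddingService` and `make_toy_encoder` are hypothetical, and the toy encoders stand in for real models loaded from the registry.

```python
import numpy as np


class PinnedEmbeddingService:
    """Hypothetical service that resolves a model version per request."""

    def __init__(self, encoders, default_version):
        self.encoders = encoders  # {version: callable(text) -> vector}
        self.default_version = default_version

    def get_embedding(self, query, model_version=None):
        """Embed with the pinned version, or the default when none is given."""
        version = model_version or self.default_version
        if version not in self.encoders:
            raise KeyError(f"Unknown model version: {version}")
        # Return the version too, so the caller searches the matching index
        return self.encoders[version](query), version


def make_toy_encoder(seed, dim=8):
    """Stand-in encoder: a deterministic pseudo-random vector per (seed, text)."""
    def encode(text):
        text_seed = seed + sum(ord(ch) for ch in text)
        return np.random.default_rng(text_seed).normal(size=dim)
    return encode


service = PinnedEmbeddingService(
    {"v1.2.3": make_toy_encoder(1), "v2.0.0": make_toy_encoder(2)},
    default_version="v2.0.0",
)
vec_default, used_default = service.get_embedding("red shoes")
vec_pinned, used_pinned = service.get_embedding("red shoes", model_version="v1.2.3")
print(used_default, used_pinned)  # v2.0.0 v1.2.3
```

Returning the resolved version with the vector helps prevent the mixed-version searches described at the start of this section.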

19.4 A/B Testing Embedding Models

Embedding quality is difficult to evaluate offline. A/B testing measures real-world impact on business metrics: click-through rate, conversion rate, user satisfaction, revenue. This section covers experimental design for embedding systems at scale.

19.4.1 Unique Challenges of Embedding A/B Tests

Unlike testing UI changes or ranking algorithms, embedding A/B tests require:

  1. Consistency: Same user must see results from same model version throughout session
  2. Index versioning: Maintain separate indices for treatment and control
  3. Longer ramp-up: New embeddings need time to “stabilize” in caches
  4. Interaction effects: Embeddings affect multiple surfaces (search, recommendations, related items)

Show A/B Testing Framework
import hashlib
from datetime import datetime
import numpy as np


class EmbeddingExperimentFramework:
    """Framework for A/B testing embedding models."""

    def __init__(self):
        self.active_experiments = {}
        self.user_assignments = {}
        self.metrics = {}

    def create_experiment(self, experiment_id, control_model, treatment_model, traffic_allocation=0.05):
        """Create new A/B test experiment."""
        self.active_experiments[experiment_id] = {
            "control": control_model,
            "treatment": treatment_model,
            "allocation": traffic_allocation,
            "start_time": datetime.now()
        }
        self.metrics[experiment_id] = []
        print(f"Created experiment: {experiment_id} with {traffic_allocation:.1%} treatment traffic")

    def assign_user(self, user_id, experiment_id):
        """Assign user to control or treatment (deterministic hash-based)."""
        if user_id in self.user_assignments and experiment_id in self.user_assignments[user_id]:
            return self.user_assignments[user_id][experiment_id]

        hash_input = f"{user_id}:{experiment_id}".encode()
        hash_value = int(hashlib.md5(hash_input).hexdigest()[:8], 16) / (2**32)

        exp = self.active_experiments[experiment_id]
        variant = "treatment" if hash_value < exp["allocation"] else "control"

        if user_id not in self.user_assignments:
            self.user_assignments[user_id] = {}
        self.user_assignments[user_id][experiment_id] = variant
        return variant

    def log_metric(self, experiment_id, user_id, metric_name, metric_value):
        """Log metric event for analysis."""
        variant = self.user_assignments.get(user_id, {}).get(experiment_id)
        if not variant:
            variant = self.assign_user(user_id, experiment_id)

        self.metrics[experiment_id].append({
            "user_id": user_id,
            "variant": variant,
            "metric": metric_name,
            "value": metric_value,
            "timestamp": datetime.now()
        })

    def analyze_experiment(self, experiment_id):
        """Analyze experiment results; lift is None when it cannot be computed."""
        events = self.metrics[experiment_id]
        control = [e["value"] for e in events if e["variant"] == "control"]
        treatment = [e["value"] for e in events if e["variant"] == "treatment"]

        # Report None rather than 0 for an empty variant, so missing data
        # is not mistaken for a measured 100% drop
        control_mean = float(np.mean(control)) if control else None
        treatment_mean = float(np.mean(treatment)) if treatment else None
        if control_mean is not None and treatment_mean is not None and control_mean > 0:
            lift = (treatment_mean - control_mean) / control_mean
        else:
            lift = None

        return {"control_mean": control_mean, "treatment_mean": treatment_mean, "lift": lift}


# Usage example
framework = EmbeddingExperimentFramework()
framework.create_experiment("emb_v2_test", "v1.0.0", "v2.0.0", traffic_allocation=0.05)
variant = framework.assign_user("user_123", "emb_v2_test")
framework.log_metric("emb_v2_test", "user_123", "click_through_rate", 0.15)
results = framework.analyze_experiment("emb_v2_test")
print(f"Experiment results: {results}")
Created experiment: emb_v2_test with 5.0% treatment traffic
Experiment results: {'control_mean': 0.15, 'treatment_mean': None, 'lift': None}
Tip: A/B Test Best Practices
  1. Pre-register hypothesis: Define success metrics before starting
  2. Power analysis: Calculate required sample size upfront
  3. Avoid peeking: Don’t conclude early based on interim results (increases false positive rate)
  4. Monitor guardrail metrics: Latency, error rate, system health
  5. Document everything: Experiment design, results, learnings for future reference
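For the power analysis step, a common approximation for comparing two conversion rates is the two-proportion z-test sample size formula. The sketch below uses only the standard library; `required_sample_size` is a hypothetical helper, and the 10% baseline and 5% relative lift are illustrative inputs, not figures from this chapter.

```python
import math
from statistics import NormalDist


def required_sample_size(baseline_rate, relative_lift, alpha=0.05, power=0.8):
    """Users needed per variant for a two-sided two-proportion z-test."""
    p1 = baseline_rate
    p2 = baseline_rate * (1 + relative_lift)
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # critical value for significance
    z_beta = NormalDist().inv_cdf(power)           # critical value for power
    variance = p1 * (1 - p1) + p2 * (1 - p2)
    return math.ceil((z_alpha + z_beta) ** 2 * variance / (p1 - p2) ** 2)


n_small_effect = required_sample_size(baseline_rate=0.10, relative_lift=0.05)
n_large_effect = required_sample_size(baseline_rate=0.10, relative_lift=0.20)
print(f"Users per variant for a 5% relative lift:  {n_small_effect:,}")
print(f"Users per variant for a 20% relative lift: {n_large_effect:,}")
```

The required sample size scales with the inverse square of the effect, which is why small embedding improvements need very large experiments or a more sensitive method.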
Warning: Simpson’s Paradox in Embedding Tests

Embeddings can show different effects across user segments. A model might improve recommendations for new users but degrade for power users. Always segment analysis by key user characteristics (tenure, engagement level, device type) to detect heterogeneous treatment effects.
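The segmentation advice can be illustrated with synthetic numbers. In this sketch the counts are invented for demonstration and `lift_by_segment` is a hypothetical helper; it computes relative lift per segment next to the pooled figure.

```python
from collections import defaultdict

# Synthetic per-segment results: (segment, variant, users, conversions)
rows = [
    ("new_users", "control", 1000, 100),
    ("new_users", "treatment", 1000, 120),
    ("power_users", "control", 1000, 300),
    ("power_users", "treatment", 1000, 270),
]


def lift_by_segment(rows):
    """Return {segment: relative lift}, plus a pooled 'overall' entry."""
    totals = defaultdict(lambda: {"control": [0, 0], "treatment": [0, 0]})
    for segment, variant, users, conversions in rows:
        # Accumulate into both the segment and the pooled bucket
        for key in (segment, "overall"):
            totals[key][variant][0] += users
            totals[key][variant][1] += conversions
    lifts = {}
    for key, variants in totals.items():
        control_rate = variants["control"][1] / variants["control"][0]
        treatment_rate = variants["treatment"][1] / variants["treatment"][0]
        lifts[key] = (treatment_rate - control_rate) / control_rate
    return lifts


lifts = lift_by_segment(rows)
for segment, lift in lifts.items():
    print(f"{segment}: {lift:+.1%}")
```

Here the pooled result is a small loss of about 2.5%, which hides a 20% gain for new users and a 10% loss for power users. The pooled number alone would suggest rejecting a model that could be worth serving to one segment.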

19.4.2 Interleaving Experiments

Interleaving provides a more sensitive alternative to A/B testing by showing results from both systems on the same result page and measuring user preferences directly. Where A/B tests require large sample sizes to detect small improvements, interleaving experiments can detect the same effect with 10-100x fewer users.

Show Team Draft Interleaving implementation
import numpy as np
from typing import List, Tuple
from dataclasses import dataclass

@dataclass
class InterleavingResult:
    """Result of an interleaving experiment."""
    interleaved_list: List[str]
    system_a_items: set
    system_b_items: set
    attribution: dict  # item_id -> system

class TeamDraftInterleaving:
    """
    Team Draft Interleaving for comparing two ranking systems.

    Alternates between systems picking items, like team captains
    picking players. More sensitive than A/B testing for ranking
    comparison.
    """

    def interleave(
        self,
        ranking_a: List[str],
        ranking_b: List[str],
        length: int = 10
    ) -> InterleavingResult:
        """
        Create interleaved result list using Team Draft algorithm.

        Args:
            ranking_a: Ranked list from system A
            ranking_b: Ranked list from system B
            length: Maximum length of interleaved list
        """
        interleaved = []
        team_a = set()
        team_b = set()
        attribution = {}

        ptr_a = 0
        ptr_b = 0

        while len(interleaved) < length:
            # Randomly decide which team picks first this round
            if np.random.random() < 0.5:
                teams = [('A', ranking_a, team_a), ('B', ranking_b, team_b)]
            else:
                teams = [('B', ranking_b, team_b), ('A', ranking_a, team_a)]

            added_this_round = False
            for team_name, ranking, team_set in teams:
                if len(interleaved) >= length:
                    break

                # Find next item from this ranking not already in list
                ptr = ptr_a if team_name == 'A' else ptr_b
                while ptr < len(ranking) and ranking[ptr] in interleaved:
                    ptr += 1

                if ptr < len(ranking):
                    item = ranking[ptr]
                    interleaved.append(item)
                    team_set.add(item)
                    attribution[item] = team_name
                    ptr += 1
                    added_this_round = True

                if team_name == 'A':
                    ptr_a = ptr
                else:
                    ptr_b = ptr

            # Both rankings are exhausted; stop instead of looping forever
            # when fewer than `length` unique items are available
            if not added_this_round:
                break

        return InterleavingResult(
            interleaved_list=interleaved,
            system_a_items=team_a,
            system_b_items=team_b,
            attribution=attribution
        )

    def compute_preference(
        self,
        result: InterleavingResult,
        clicked_items: List[str]
    ) -> dict:
        """
        Compute system preference based on user clicks.

        Returns which system the user preferred based on clicks.
        """
        a_clicks = sum(1 for item in clicked_items if item in result.system_a_items)
        b_clicks = sum(1 for item in clicked_items if item in result.system_b_items)

        if a_clicks > b_clicks:
            winner = 'A'
        elif b_clicks > a_clicks:
            winner = 'B'
        else:
            winner = 'tie'

        return {
            "system_a_clicks": a_clicks,
            "system_b_clicks": b_clicks,
            "winner": winner,
            "margin": abs(a_clicks - b_clicks)
        }


class InterleavingExperiment:
    """Run and analyze interleaving experiments."""

    def __init__(self, interleaver: TeamDraftInterleaving):
        self.interleaver = interleaver
        self.results = []

    def record_impression(
        self,
        query_id: str,
        ranking_a: List[str],
        ranking_b: List[str],
        clicks: List[str]
    ):
        """Record a single interleaving impression and clicks."""
        interleaved = self.interleaver.interleave(ranking_a, ranking_b)
        preference = self.interleaver.compute_preference(interleaved, clicks)

        self.results.append({
            "query_id": query_id,
            "winner": preference["winner"],
            "a_clicks": preference["system_a_clicks"],
            "b_clicks": preference["system_b_clicks"]
        })

    def analyze(self, min_queries: int = 100) -> dict:
        """Analyze experiment results."""
        if len(self.results) < min_queries:
            return {"error": f"Need at least {min_queries} queries"}

        a_wins = sum(1 for r in self.results if r["winner"] == "A")
        b_wins = sum(1 for r in self.results if r["winner"] == "B")
        ties = sum(1 for r in self.results if r["winner"] == "tie")

        total_decisive = a_wins + b_wins
        if total_decisive == 0:
            return {"error": "No decisive comparisons"}

        # Delta: proportion of wins for B minus wins for A
        delta = (b_wins - a_wins) / total_decisive

        # Sign test for significance (binomtest replaces the removed stats.binom_test)
        from scipy import stats
        p_value = stats.binomtest(b_wins, total_decisive, 0.5).pvalue

        return {
            "n_queries": len(self.results),
            "a_wins": a_wins,
            "b_wins": b_wins,
            "ties": ties,
            "delta": delta,  # Positive = B is better
            "p_value": p_value,
            "significant": p_value < 0.05,
            "winner": "B" if delta > 0 and p_value < 0.05 else ("A" if delta < 0 and p_value < 0.05 else "inconclusive")
        }


# Example
interleaver = TeamDraftInterleaving()

# Compare two ranking systems
ranking_a = ["doc_1", "doc_2", "doc_3", "doc_4", "doc_5"]
ranking_b = ["doc_2", "doc_1", "doc_4", "doc_3", "doc_6"]

result = interleaver.interleave(ranking_a, ranking_b, length=5)
print(f"Interleaved list: {result.interleaved_list}")
print(f"System A items: {result.system_a_items}")
print(f"System B items: {result.system_b_items}")

# Simulate user clicking on doc_2 and doc_4
clicks = ["doc_2", "doc_4"]
preference = interleaver.compute_preference(result, clicks)
print(f"User preference: {preference}")
Interleaved list: ['doc_2', 'doc_1', 'doc_4', 'doc_3', 'doc_5']
System A items: {'doc_5', 'doc_1', 'doc_3'}
System B items: {'doc_2', 'doc_4'}
User preference: {'system_a_clicks': 0, 'system_b_clicks': 2, 'winner': 'B', 'margin': 2}
Tip: When to Use Interleaving vs A/B Testing

Use interleaving when:

  • Comparing ranking quality of two systems
  • You need faster results (10-100x fewer impressions required)
  • Systems are similar in quality (detecting small differences)
  • User-level randomization isn’t critical

Use A/B testing when:

  • Measuring absolute metrics (conversion rate, revenue)
  • Testing UI changes alongside embedding changes
  • You need user-level consistency for business metrics
  • Testing fundamentally different experiences

Best practice: Use interleaving to quickly identify promising embedding models, then validate winners with A/B tests before full deployment.

19.4.3 Multi-Armed Bandits for Embedding Selection

Multi-armed bandits (MABs) dynamically allocate traffic to better-performing embedding variants during an experiment, reducing the cost of testing inferior models. Unlike A/B tests with fixed allocation, bandits learn and adapt.

Show Thompson Sampling Bandit
import numpy as np
from typing import Dict, List
from dataclasses import dataclass, field

@dataclass
class BetaArm:
    """Arm with Beta distribution prior for binary outcomes (clicks)."""
    successes: int = 1  # Prior: Beta(1,1) = uniform
    failures: int = 1

    def sample(self) -> float:
        """Sample from posterior."""
        return np.random.beta(self.successes, self.failures)

    def update(self, reward: float):
        """Update posterior with observed reward."""
        if reward > 0:
            self.successes += 1
        else:
            self.failures += 1

    @property
    def mean(self) -> float:
        """Posterior mean."""
        return self.successes / (self.successes + self.failures)


class ThompsonSamplingBandit:
    """
    Thompson Sampling for embedding model selection.

    Balances exploration (trying uncertain models) with exploitation
    (using known-good models) to minimize regret during experimentation.
    """

    def __init__(self, model_ids: List[str]):
        self.arms: Dict[str, BetaArm] = {
            model_id: BetaArm() for model_id in model_ids
        }
        self.selection_history = []

    def select_model(self) -> str:
        """Select model using Thompson Sampling."""
        samples = {
            model_id: arm.sample()
            for model_id, arm in self.arms.items()
        }
        selected = max(samples, key=samples.get)
        self.selection_history.append(selected)
        return selected

    def record_outcome(self, model_id: str, success: bool):
        """Record outcome for selected model."""
        self.arms[model_id].update(1.0 if success else 0.0)

    def get_statistics(self) -> dict:
        """Get current statistics for all arms."""
        return {
            model_id: {
                "mean": arm.mean,
                "successes": arm.successes,
                "failures": arm.failures,
                "total": arm.successes + arm.failures - 2,  # Subtract prior
                "selection_rate": self.selection_history.count(model_id) / len(self.selection_history) if self.selection_history else 0
            }
            for model_id, arm in self.arms.items()
        }

    def get_best_model(self) -> str:
        """Return model with highest posterior mean."""
        return max(self.arms, key=lambda m: self.arms[m].mean)


class UCBBandit:
    """
    Upper Confidence Bound (UCB) bandit for embedding selection.

    More deterministic than Thompson Sampling, good for
    settings where reproducibility matters.
    """

    def __init__(self, model_ids: List[str], exploration_weight: float = 2.0):
        self.model_ids = model_ids
        self.exploration_weight = exploration_weight
        self.successes = {m: 0 for m in model_ids}
        self.trials = {m: 0 for m in model_ids}
        self.total_trials = 0

    def select_model(self) -> str:
        """Select model using UCB algorithm."""
        # Ensure each model is tried at least once
        for model_id in self.model_ids:
            if self.trials[model_id] == 0:
                return model_id

        ucb_values = {}
        for model_id in self.model_ids:
            mean = self.successes[model_id] / self.trials[model_id]
            exploration_bonus = np.sqrt(
                self.exploration_weight * np.log(self.total_trials) / self.trials[model_id]
            )
            ucb_values[model_id] = mean + exploration_bonus

        return max(ucb_values, key=ucb_values.get)

    def record_outcome(self, model_id: str, success: bool):
        """Record outcome."""
        self.trials[model_id] += 1
        self.total_trials += 1
        if success:
            self.successes[model_id] += 1

    def get_statistics(self) -> dict:
        return {
            model_id: {
                "mean": self.successes[model_id] / max(self.trials[model_id], 1),
                "trials": self.trials[model_id],
                "successes": self.successes[model_id]
            }
            for model_id in self.model_ids
        }


# Example: Compare embedding models with Thompson Sampling
np.random.seed(42)

models = ["emb_v1", "emb_v2", "emb_v3"]
# True click rates (unknown to bandit)
true_rates = {"emb_v1": 0.10, "emb_v2": 0.12, "emb_v3": 0.11}

bandit = ThompsonSamplingBandit(models)

# Simulate 1000 requests
for _ in range(1000):
    selected = bandit.select_model()
    # Simulate click based on true rate
    clicked = np.random.random() < true_rates[selected]
    bandit.record_outcome(selected, clicked)

print("Thompson Sampling Results after 1000 requests:")
stats = bandit.get_statistics()
for model_id, s in stats.items():
    print(f"  {model_id}: mean={s['mean']:.3f}, selected {s['selection_rate']:.1%}, n={s['total']}")
print(f"Best model: {bandit.get_best_model()}")
Thompson Sampling Results after 1000 requests:
  emb_v1: mean=0.082, selected 14.5%, n=145
  emb_v2: mean=0.119, selected 46.0%, n=460
  emb_v3: mean=0.116, selected 39.5%, n=395
Best model: emb_v2
Warning: Bandit Caveats for Embedding Experiments

Delayed rewards: Embedding quality may not show immediate effects (e.g., conversion happens days after search). Use appropriate attribution windows.

Non-stationarity: If embedding quality changes over time (model degradation, seasonal effects), standard bandits may not adapt correctly. Consider windowed or discounted bandits.

Context matters: User segments may prefer different embeddings. Consider contextual bandits that condition on user features.

Compliance: Some regulatory contexts require fixed allocation (clinical trials). Bandits may not be appropriate for all experiments.
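The non-stationarity caveat can be addressed with a discounted variant of Thompson Sampling, in which past evidence decays so that recent outcomes dominate. The following is a sketch under stated assumptions: `DiscountedBetaArm` is a hypothetical class, the discount of 0.99 is an arbitrary illustrative value, and the simulated click rates swap halfway through to mimic a quality shift.

```python
import numpy as np


class DiscountedBetaArm:
    """Beta arm whose evidence decays, so recent outcomes weigh more."""

    def __init__(self, discount=0.99):
        self.discount = discount  # assumed value; tune to your traffic volume
        self.successes = 0.0
        self.failures = 0.0

    def decay(self):
        """Shrink accumulated evidence; called for every arm on every step."""
        self.successes *= self.discount
        self.failures *= self.discount

    def update(self, success):
        if success:
            self.successes += 1.0
        else:
            self.failures += 1.0

    def sample(self, rng):
        # The Beta(1, 1) prior is added at sampling time so it never decays away
        return rng.beta(self.successes + 1.0, self.failures + 1.0)


rng = np.random.default_rng(0)
arms = {"emb_v1": DiscountedBetaArm(), "emb_v2": DiscountedBetaArm()}
picks_late = []
for step in range(4000):
    # emb_v1 is better for the first half, emb_v2 for the second half
    if step < 2000:
        rates = {"emb_v1": 0.3, "emb_v2": 0.1}
    else:
        rates = {"emb_v1": 0.1, "emb_v2": 0.3}
    chosen = max(arms, key=lambda m: arms[m].sample(rng))
    for arm in arms.values():
        arm.decay()
    arms[chosen].update(rng.random() < rates[chosen])
    if step >= 3000:
        picks_late.append(chosen)

late_share_v2 = picks_late.count("emb_v2") / len(picks_late)
print(f"Share of emb_v2 in the final 1000 steps: {late_share_v2:.1%}")
```

The discount trades adaptation speed against exploration cost: a lower value forgets faster and tracks shifts sooner, but it keeps re-testing the weaker arm more often.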

19.4.4 Online Learning from Implicit Feedback

Production embedding systems can learn from user interactions continuously, adapting to changing preferences without full retraining.

Show position bias correction
import numpy as np
from typing import List, Tuple

class PositionBiasCorrector:
    """
    Correct for position bias in click data.

    Users are more likely to click higher-ranked results regardless
    of relevance. This class estimates and corrects for position bias
    to get unbiased relevance signals.
    """

    def __init__(self, n_positions: int = 10):
        self.n_positions = n_positions
        # Initialize position bias estimates (examination probabilities)
        self.examination_prob = np.ones(n_positions)
        self.position_clicks = np.zeros(n_positions)
        self.position_impressions = np.zeros(n_positions)

    def estimate_examination_probability(
        self,
        click_data: List[Tuple[int, bool]]  # (position, clicked)
    ):
        """
        Estimate examination probability per position.

        Assumes click_data was collected under result randomization or
        swap experiments, so that CTR differences across positions reflect
        examination rather than relevance.
        """
        for position, clicked in click_data:
            if position < self.n_positions:
                self.position_impressions[position] += 1
                if clicked:
                    self.position_clicks[position] += 1

        # Simple estimate: CTR at position / max CTR
        ctrs = np.divide(
            self.position_clicks,
            self.position_impressions,
            where=self.position_impressions > 0,
            out=np.zeros(self.n_positions)
        )
        max_ctr = ctrs.max() if ctrs.max() > 0 else 1
        self.examination_prob = ctrs / max_ctr
        self.examination_prob = np.clip(self.examination_prob, 0.01, 1.0)

    def correct_click(
        self,
        position: int,
        clicked: bool
    ) -> float:
        """
        Return position-bias-corrected relevance signal.

        Uses Inverse Propensity Scoring (IPS) to debias clicks.
        """
        if position >= self.n_positions:
            return 0.0

        exam_prob = self.examination_prob[position]

        if clicked:
            # IPS: upweight clicks at low-examination positions
            return 1.0 / exam_prob
        else:
            # For non-clicks, we can't distinguish "not examined" from
            # "examined but not relevant". Return 0 for simplicity.
            return 0.0

    def get_unbiased_relevance_estimates(
        self,
        impressions: List[dict]  # [{position, clicked, item_id}, ...]
    ) -> dict:
        """
        Get unbiased relevance estimates per item.
        """
        item_relevance = {}
        item_counts = {}

        for imp in impressions:
            item_id = imp["item_id"]
            corrected = self.correct_click(imp["position"], imp["clicked"])

            if item_id not in item_relevance:
                item_relevance[item_id] = 0
                item_counts[item_id] = 0

            item_relevance[item_id] += corrected
            item_counts[item_id] += 1

        # Average corrected relevance
        return {
            item_id: item_relevance[item_id] / item_counts[item_id]
            for item_id in item_relevance
        }


# Example
np.random.seed(42)
corrector = PositionBiasCorrector(n_positions=10)

# Simulate click data with position bias
click_data = []
for _ in range(10000):
    position = np.random.randint(0, 10)
    # Click probability decreases with position
    true_exam_prob = 1.0 / (1 + position * 0.3)
    clicked = np.random.random() < true_exam_prob * 0.3  # 30% base CTR if examined
    click_data.append((position, clicked))

corrector.estimate_examination_probability(click_data)
print("Estimated examination probabilities:")
for i, prob in enumerate(corrector.examination_prob):
    print(f"  Position {i}: {prob:.3f}")

# Correct a click at position 5
raw_click = 1.0
corrected = corrector.correct_click(position=5, clicked=True)
print(f"\nRaw click at position 5: {raw_click}")
print(f"Corrected (IPS) relevance: {corrected:.2f}")
Estimated examination probabilities:
  Position 0: 1.000
  Position 1: 0.788
  Position 2: 0.639
  Position 3: 0.499
  Position 4: 0.510
  Position 5: 0.357
  Position 6: 0.436
  Position 7: 0.334
  Position 8: 0.262
  Position 9: 0.245

Raw click at position 5: 1.0
Corrected (IPS) relevance: 2.80
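
IPS weights grow quickly as the estimated examination probability falls, which inflates the variance of the corrected signal. The class above floors the probability at 0.01, which still allows weights up to 100. One common mitigation is to cap the weight itself. The following is a minimal sketch; `clipped_ips_weight` and its `max_weight` default are illustrative assumptions, not part of `PositionBiasCorrector`:

```python
def clipped_ips_weight(exam_prob: float, max_weight: float = 10.0) -> float:
    """Return the IPS weight 1 / exam_prob, capped at max_weight.

    Hypothetical helper: capping trades a small bias for lower variance.
    """
    if exam_prob <= 0:
        # No usable propensity estimate; fall back to the cap
        return max_weight
    return min(1.0 / exam_prob, max_weight)


print(f"{clipped_ips_weight(0.357):.2f}")  # weight is below the cap, unchanged
print(f"{clipped_ips_weight(0.01):.2f}")   # 1 / 0.01 = 100 is capped
```

The right cap depends on how much click data is available per position, so it is best treated as a tunable setting rather than a fixed constant.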
Tip: Implicit Feedback Best Practices

Combine multiple signals: Clicks alone are noisy. Combine with dwell time, add-to-cart, purchases, and returns for robust relevance estimates.

Use counterfactual evaluation: Before deploying models trained on biased data, use off-policy evaluation to estimate their true performance.

Monitor feedback loops: Models trained on their own predictions can amplify biases. Track diversity metrics and inject exploration to prevent filter bubbles.
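
The counterfactual evaluation advice can be made concrete with a small off-policy estimator. The sketch below applies inverse propensity scoring to logged interactions to estimate the reward a candidate policy would have earned. The `LoggedEvent` fields, the single-item policy interface, and the assumption that logging propensities were recorded are all hypothetical simplifications:

```python
from typing import Callable, List, NamedTuple


class LoggedEvent(NamedTuple):
    query: str
    shown_item: str    # item the logging policy displayed
    propensity: float  # probability the logging policy showed that item
    reward: float      # observed feedback, e.g. 1.0 for a click


def ips_policy_value(
    events: List[LoggedEvent],
    new_policy: Callable[[str], str],
) -> float:
    """Estimate the average reward of new_policy from logged data via IPS."""
    if not events:
        raise ValueError("need at least one logged event")
    total = 0.0
    for event in events:
        # Only events where the new policy agrees with the logged action
        # contribute; each is reweighted by 1 / logging propensity.
        if new_policy(event.query) == event.shown_item:
            total += event.reward / event.propensity
    return total / len(events)


# Toy log: a uniform-random logging policy over two items per query
log = [
    LoggedEvent("q1", "a", 0.5, 1.0),
    LoggedEvent("q1", "b", 0.5, 0.0),
    LoggedEvent("q2", "a", 0.5, 0.0),
    LoggedEvent("q2", "b", 0.5, 1.0),
]
candidate = {"q1": "a", "q2": "b"}
print(ips_policy_value(log, candidate.get))      # policy that picks the clicked items
print(ips_policy_value(log, lambda query: "a"))  # policy that always shows "a"
```

The estimate is only unbiased when the logging policy gave a non-zero probability to every item the candidate policy might choose, and small propensities can make its variance large.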

19.5 Monitoring Embedding Drift and Degradation

Embedding quality degrades over time even without model changes. Data distribution shifts, user behavior evolves, and the corpus grows. Continuous monitoring detects degradation before it impacts users, enabling proactive retraining and updates.

19.5.1 Sources of Embedding Degradation

  1. Data drift: Input data distribution changes (new product categories, seasonal trends)
  2. Concept drift: Relationships between entities change (word meanings shift, user preferences evolve)
  3. Corpus growth: New items crowd the embedding space, so an index built for the earlier corpus becomes less representative
  4. Model staleness: A fixed model doesn’t adapt to new patterns
  5. Infrastructure changes: Changes to index configuration, hardware, or network latency alter retrieval behavior and response times
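
Data drift, the first source above, can often be spotted directly in embedding space by comparing a reference sample with a recent one. The following is a rough sketch using the cosine distance between sample centroids. The function name and the synthetic data are illustrative assumptions, and a centroid comparison will miss shifts that leave the mean unchanged:

```python
import numpy as np


def centroid_cosine_distance(reference: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean vectors of two embedding samples."""
    ref_centroid = reference.mean(axis=0)
    cur_centroid = current.mean(axis=0)
    cosine = np.dot(ref_centroid, cur_centroid) / (
        np.linalg.norm(ref_centroid) * np.linalg.norm(cur_centroid)
    )
    return float(1.0 - cosine)


rng = np.random.default_rng(0)
reference = rng.normal(loc=1.0, size=(1000, 64))  # sample taken at deployment time
same_dist = rng.normal(loc=1.0, size=(1000, 64))  # fresh sample, no drift

# Simulated drift: the mean of half the dimensions moves from 1.0 to 3.0
shifted_mean = np.concatenate([np.full(32, 3.0), np.full(32, 1.0)])
shifted = rng.normal(loc=shifted_mean, size=(1000, 64))

print(f"no drift: {centroid_cosine_distance(reference, same_dist):.4f}")
print(f"drifted:  {centroid_cosine_distance(reference, shifted):.4f}")
```

The measure is unstable when centroids sit near the origin, as with mean-centered embeddings, so an alert threshold would need to be calibrated on historical samples.
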
Show Embedding Monitoring System
from datetime import datetime
import numpy as np
import torch


class EmbeddingMonitoringSystem:
    """Continuous monitoring system for embedding quality and drift detection."""

    def __init__(self, model, test_dataset, alert_thresholds=None):
        self.model = model
        self.test_dataset = test_dataset
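        self.alert_thresholds = alert_thresholds or {
            "recall_drop": 0.05,       # relative drop in recall@10
            "latency_increase": 0.20,  # not checked in this simplified example
            "norm_change": 0.15        # relative change in mean embedding norm
        }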
        self.baseline_metrics = None
        self.historical_metrics = []

    def evaluate_current_quality(self, sample_size=10000):
        """Evaluate current embedding quality."""
        self.model.eval()

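        # Intrinsic metrics. Random inputs stand in for real data here; in
        # production, embed a fixed sample from self.test_dataset so that
        # norm changes reflect the model or data rather than sampling noise.
        with torch.no_grad():
            sample_embeddings = self.model(torch.randn(sample_size, 100))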
        norms = torch.norm(sample_embeddings, dim=1)
        avg_norm = norms.mean().item()

        # Extrinsic metrics (simplified)
        recall_at_10 = 0.89  # Placeholder for real evaluation

        metrics = {
            "timestamp": datetime.now(),
            "avg_norm": avg_norm,
            "recall_at_10": recall_at_10
        }

        return metrics

    def detect_drift(self, current_metrics):
        """Detect drift in embedding quality."""
        if self.baseline_metrics is None:
            self.baseline_metrics = current_metrics
            print("Baseline metrics established")
            return {}

        alerts = {}

        # Check recall drift
        recall_drop = (self.baseline_metrics["recall_at_10"] - current_metrics["recall_at_10"]) / self.baseline_metrics["recall_at_10"]
        if recall_drop > self.alert_thresholds["recall_drop"]:
            alerts["recall_degradation"] = True
            print(f"ALERT: Recall dropped {recall_drop:.1%} from baseline")

        # Check norm drift
        norm_change = abs(current_metrics["avg_norm"] - self.baseline_metrics["avg_norm"]) / self.baseline_metrics["avg_norm"]
        if norm_change > self.alert_thresholds["norm_change"]:
            alerts["distribution_shift"] = True
            print(f"ALERT: Embedding norm changed {norm_change:.1%}")

        if not alerts:
            print("No drift detected - quality stable")

        return alerts

    def should_retrain(self, alerts, days_since_training):
        """Decide whether to trigger model retraining."""
        critical_alerts = ["recall_degradation", "distribution_shift"]
        if any(alerts.get(alert) for alert in critical_alerts):
            return True, "quality_degradation"
        if days_since_training > 30:
            return True, "model_staleness"
        return False, ""


# Usage example
model = torch.nn.Sequential(torch.nn.Linear(100, 128))
monitor = EmbeddingMonitoringSystem(model, test_dataset=None)
metrics = monitor.evaluate_current_quality(sample_size=1000)
alerts = monitor.detect_drift(metrics)
should_retrain, reason = monitor.should_retrain(alerts, days_since_training=15)
print(f"Monitoring: {len(alerts)} alerts, Retrain needed: {should_retrain}")
Baseline metrics established
Monitoring: 0 alerts, Retrain needed: False
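
The `recall_at_10` value in the example is a hard-coded placeholder. A real evaluation needs labeled pairs of queries and relevant items. Assuming those exist, recall@k could be computed with brute-force cosine search along the following lines. The function name, the argument layout, and the definition used (hits divided by the number of relevant items) are assumptions for illustration:

```python
from typing import List, Set

import numpy as np


def recall_at_k(
    query_emb: np.ndarray,         # shape (n_queries, dim)
    corpus_emb: np.ndarray,        # shape (n_items, dim)
    relevant_ids: List[Set[int]],  # relevant corpus row indices per query
    k: int = 10,
) -> float:
    """Mean recall@k using exact cosine-similarity search."""
    # Normalize rows so the dot product equals cosine similarity
    q = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)
    c = corpus_emb / np.linalg.norm(corpus_emb, axis=1, keepdims=True)
    scores = q @ c.T
    top_k = np.argsort(-scores, axis=1)[:, :k]

    recalls = []
    for retrieved, relevant in zip(top_k, relevant_ids):
        if not relevant:
            continue  # skip queries without labels
        hits = len(set(retrieved.tolist()) & relevant)
        recalls.append(hits / len(relevant))
    return float(np.mean(recalls)) if recalls else 0.0


corpus = np.eye(4)
queries = np.array([[1.0, 0.1, 0.0, 0.0], [0.0, 1.0, 0.1, 0.0]])
print(recall_at_k(queries, corpus, [{0}, {1}], k=1))  # both labels retrieved
print(recall_at_k(queries, corpus, [{0}, {2}], k=1))  # second label missed
```

Brute-force search keeps the sketch simple, but its cost grows with the product of query and corpus counts, so it only suits small evaluation sets.
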
Tip: Monitoring Dashboard Essentials

A production embedding monitoring dashboard should display:

Real-time metrics (updated every minute):

  • Query latency (p50, p95, p99)
  • Throughput (queries/second)
  • Error rate
  • Cache hit rate

Quality metrics (updated hourly/daily):

  • Retrieval recall@10, recall@100
  • NDCG@10
  • User engagement metrics (CTR, conversion rate)
  • Embedding distribution statistics

System health (updated every 5 minutes):

  • Index size and growth rate
  • Memory usage
  • GPU utilization
  • Background job status (retraining, re-embedding)
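
The latency percentiles and error rate in the real-time panel can be derived from raw request timings. A minimal sketch, assuming latencies are collected in milliseconds over a fixed window and that failed requests are included in the count (the function name and output keys are illustrative):

```python
from typing import Dict, Sequence

import numpy as np


def latency_summary(latencies_ms: Sequence[float], n_errors: int = 0) -> Dict[str, float]:
    """Summarize one monitoring window of request latencies."""
    if len(latencies_ms) == 0:
        raise ValueError("no requests in window")
    p50, p95, p99 = np.percentile(latencies_ms, [50, 95, 99])
    return {
        "p50_ms": float(p50),
        "p95_ms": float(p95),
        "p99_ms": float(p99),
        # Assumes latencies_ms holds one entry per request, failed or not
        "error_rate": n_errors / len(latencies_ms),
    }


window = list(range(1, 101))  # 100 requests taking 1..100 ms
print(latency_summary(window, n_errors=2))
```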
Warning: Silent Degradation

Embedding quality can degrade gradually without triggering alerts. Complement threshold-based alerts with:

  • Trend analysis: Detect slow downward trends even within thresholds
  • Comparative baselines: Compare against historical best, not just initial baseline
  • Canary queries: Maintain a set of “golden queries” that should always perform well
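
The three safeguards above can be sketched in a few lines. The helper names, the `min_slope` default, and the `search(query, k)` interface are hypothetical, and the trend check assumes at least two evenly spaced readings:

```python
from typing import Callable, Dict, List, Sequence, Tuple

import numpy as np


def metric_trend(values: Sequence[float], min_slope: float = -0.001) -> Tuple[bool, float]:
    """Fit a line to evenly spaced metric readings and flag a steady decline."""
    slope, _intercept = np.polyfit(np.arange(len(values)), values, deg=1)
    return bool(slope < min_slope), float(slope)


def drop_from_best(values: Sequence[float]) -> float:
    """Relative drop of the latest reading from the historical best."""
    best = max(values)
    return (best - values[-1]) / best


def failed_canaries(
    golden: Dict[str, str],                   # query -> item that must be retrieved
    search: Callable[[str, int], List[str]],  # hypothetical search(query, k) function
    k: int = 10,
) -> List[str]:
    """Return golden queries whose expected item is missing from the top k."""
    return [q for q, expected in golden.items() if expected not in search(q, k)]


# Daily recall@10 readings: each step loses 0.003, about 2% in total,
# which stays under a 5% alert threshold
history = [0.900, 0.897, 0.894, 0.891, 0.888, 0.885, 0.882]
declining, slope = metric_trend(history)
print(f"declining={declining}, slope={slope:.4f}, "
      f"drop from best={drop_from_best(history):.1%}")
```

In this toy history no single reading crosses a 5% threshold, yet the fitted slope exposes the decline. A canary check complements it by failing loudly on specific queries rather than on averages.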

For comprehensive evaluation metrics including intrinsic quality (isotropy, uniformity), retrieval metrics (MAP, NDCG, MRR), human evaluation frameworks, and domain-specific metrics, see Chapter 21.

19.6 Key Takeaways

  • MLOps for embeddings requires specialized infrastructure: Model registries, batch inference pipelines, and version coordination across training, serving, and indexing stages differentiate embedding systems from traditional ML deployments

  • Hybrid batch/real-time strategies optimize cost and freshness: Batch processing for stable entities (products, documents), real-time generation for dynamic content (queries, sessions), and caching for popular items together balance throughput, latency, and resource utilization at scale

  • Embedding versioning is complex due to incompatibility between model versions: Blue-green, incremental, shadow, and canary deployment strategies each offer different trade-offs between safety, speed, and resource requirements when updating embedding models

  • A/B testing measures real-world embedding impact: Hash-based user assignment, consistent routing, separate indices per variant, and statistical analysis of business metrics (CTR, conversion, revenue) validate embedding improvements beyond offline metrics

  • Continuous monitoring detects degradation before user impact: Track intrinsic metrics (embedding norms, variance, nearest neighbor distances), extrinsic metrics (recall, NDCG, MRR), and system metrics (latency, throughput) with drift detection and automatic retraining triggers

  • Production embedding systems require operational maturity: Rollback plans, version pinning for reproducibility, graceful degradation, alerting on quality and performance regressions, and documentation of all experiments and deployments

  • Scale demands automation: Manual embedding pipeline management breaks down at trillion-row scale; invest in automated quality monitoring, deployment orchestration, and retraining workflows early

19.7 Looking Ahead

This chapter covered the operational practices for deploying and maintaining embedding systems in production. Chapter 20 shifts focus to the computational challenges of training embedding models at scale, exploring distributed training architectures, gradient accumulation and mixed precision techniques, memory optimization strategies, and multi-GPU/multi-node training approaches that enable learning from trillion-row datasets.
