21  Embedding Quality Evaluation

Note: Chapter Overview

Measuring embedding quality is deceptively difficult. Unlike classification accuracy or regression error, embedding quality is multidimensional: retrieval performance, representation quality, downstream task accuracy, and user satisfaction all matter but may not correlate. This chapter provides a comprehensive framework for evaluating embeddings across intrinsic quality metrics (isotropy, uniformity, alignment), retrieval metrics (Recall@K, MAP, NDCG, MRR), human evaluation protocols, domain-specific metrics, and statistical rigor. We cover evaluation at trillion-row scale including sampling strategies, efficient computation, and continuous monitoring—enabling you to measure what matters and catch degradation before users notice.

Embedding evaluation differs fundamentally from traditional ML evaluation. A classifier has a clear target: predict the correct label. An embedding has no single correct answer—quality depends on how well the embedding supports downstream tasks, which may not be known at training time. This creates a challenging evaluation landscape requiring multiple complementary metrics, careful experimental design, and statistical rigor.

21.1 Intrinsic Quality Metrics

Intrinsic metrics measure embedding quality without reference to a specific downstream task. They capture properties of the embedding space itself—how well-distributed vectors are, how much of the space is utilized, and whether semantic relationships are preserved. These metrics detect problems even without labeled evaluation data.

21.1.1 Isotropy: Are Embeddings Well-Distributed?

Isotropy measures how uniformly embeddings are distributed across the vector space. Perfectly isotropic embeddings have equal variance in all directions—no dimension dominates, and vectors aren’t clustered in a narrow cone.

Why isotropy matters:

  • Low isotropy means embeddings cluster in a small region, reducing discriminative power
  • Highly anisotropic embeddings waste dimensions on directions with little variance
  • Similarity search becomes unreliable when all vectors are similar to each other
Show isotropy measurement implementation
import torch
import torch.nn.functional as F
import numpy as np

def compute_isotropy(embeddings: torch.Tensor) -> dict:
    """
    Compute isotropy metrics for a set of embeddings.

    Isotropy measures how uniformly embeddings are distributed in the space.
    Perfect isotropy = 1.0, all vectors identical = 0.0

    Args:
        embeddings: Tensor of shape (n_samples, embedding_dim)

    Returns:
        Dictionary with isotropy metrics
    """
    # Center embeddings
    centered = embeddings - embeddings.mean(dim=0)

    # Compute covariance matrix
    n = embeddings.shape[0]
    cov = (centered.T @ centered) / (n - 1)

    # Eigenvalue decomposition
    eigenvalues = torch.linalg.eigvalsh(cov)
    eigenvalues = eigenvalues.clamp(min=1e-10)  # Numerical stability

    # Sort descending
    eigenvalues = eigenvalues.flip(0)

    # Isotropy metrics
    # 1. Eigenvalue-based isotropy score: min eigenvalue / mean eigenvalue
    #    (1.0 when all eigenvalues are equal; a simple proxy for the
    #    partition-function isotropy of Mu et al., 2018)
    Z = eigenvalues.sum()
    partition_isotropy = (eigenvalues.min() * len(eigenvalues)) / Z

    # 2. Effective dimensionality (participation ratio)
    #    How many dimensions are "active"
    effective_dim = (eigenvalues.sum() ** 2) / (eigenvalues ** 2).sum()

    # 3. Explained variance ratio
    #    What fraction of variance is in top-k dimensions
    total_var = eigenvalues.sum()
    top_10_var = eigenvalues[:10].sum() / total_var
    top_50_var = eigenvalues[:50].sum() / total_var

    # 4. Average cosine similarity (should be ~0 for isotropic)
    normalized = F.normalize(embeddings, dim=1)
    cos_sim_matrix = normalized @ normalized.T
    # Exclude diagonal
    mask = ~torch.eye(n, dtype=torch.bool, device=embeddings.device)
    avg_cos_sim = cos_sim_matrix[mask].mean()

    return {
        "partition_isotropy": partition_isotropy.item(),
        "effective_dimensionality": effective_dim.item(),
        "effective_dim_ratio": effective_dim.item() / embeddings.shape[1],
        "top_10_variance_ratio": top_10_var.item(),
        "top_50_variance_ratio": top_50_var.item(),
        "avg_cosine_similarity": avg_cos_sim.item(),
        "embedding_dim": embeddings.shape[1]
    }


# Example: Compare isotropic vs anisotropic embeddings
torch.manual_seed(42)

# Well-distributed embeddings (more isotropic)
isotropic_emb = torch.randn(1000, 256)

# Poorly distributed (anisotropic - most variance in few dimensions)
anisotropic_emb = torch.randn(1000, 256)
anisotropic_emb[:, :10] *= 10  # First 10 dims dominate

print("Isotropic embeddings:")
iso_metrics = compute_isotropy(isotropic_emb)
print(f"  Partition isotropy: {iso_metrics['partition_isotropy']:.4f}")
print(f"  Effective dim ratio: {iso_metrics['effective_dim_ratio']:.2%}")
print(f"  Avg cosine similarity: {iso_metrics['avg_cosine_similarity']:.4f}")

print("\nAnisotropic embeddings:")
aniso_metrics = compute_isotropy(anisotropic_emb)
print(f"  Partition isotropy: {aniso_metrics['partition_isotropy']:.4f}")
print(f"  Effective dim ratio: {aniso_metrics['effective_dim_ratio']:.2%}")
print(f"  Avg cosine similarity: {aniso_metrics['avg_cosine_similarity']:.4f}")
Isotropic embeddings:
  Partition isotropy: 0.2481
  Effective dim ratio: 79.38%
  Avg cosine similarity: 0.0001

Anisotropic embeddings:
  Partition isotropy: 0.0518
  Effective dim ratio: 5.91%
  Avg cosine similarity: -0.0001
Tip: Isotropy Benchmarks

Good isotropy indicators:

  • Partition isotropy > 0.5 (higher is better, max 1.0)
  • Effective dimensionality > 50% of embedding dimension
  • Average cosine similarity close to 0 (typically -0.01 to 0.05)
  • Top 10 dimensions explain < 20% of variance

Warning signs:

  • Average cosine similarity > 0.3 (vectors too similar)
  • Effective dimensionality < 20% of embedding dimension
  • Top 10 dimensions explain > 50% of variance

Note: Some anisotropy is expected and even desirable—it reflects the structure of your data. The key is ensuring useful dimensions aren’t wasted on noise.
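To make these thresholds actionable, a small helper can flag the warning signs directly from the compute_isotropy() output. The sketch below is illustrative (check_isotropy_health is a hypothetical helper, not a library function); it simply encodes the rules of thumb above.

def check_isotropy_health(metrics: dict) -> dict:
    """Apply the rule-of-thumb thresholds above to compute_isotropy() output."""
    warnings = []
    if metrics["avg_cosine_similarity"] > 0.3:
        warnings.append("average cosine similarity > 0.3: vectors too similar")
    if metrics["effective_dim_ratio"] < 0.2:
        warnings.append("effective dimensionality < 20% of embedding dimension")
    if metrics["top_10_variance_ratio"] > 0.5:
        warnings.append("top 10 dimensions explain > 50% of variance")
    return {"healthy": len(warnings) == 0, "warnings": warnings}


# The anisotropic embeddings from the example above should trigger warnings
print(check_isotropy_health(aniso_metrics))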

21.1.2 Uniformity and Alignment

Wang & Isola (2020) introduced uniformity and alignment as complementary metrics for contrastive embeddings:

  • Alignment: Similar items should have similar embeddings (low distance between positives)
  • Uniformity: Embeddings should be uniformly distributed on the unit hypersphere (maximize use of space)
Show uniformity and alignment metrics
import torch
import torch.nn.functional as F

def compute_alignment(
    embeddings: torch.Tensor,
    positive_pairs: torch.Tensor,
    alpha: float = 2.0
) -> float:
    """
    Compute alignment metric: expected distance between positive pairs.

    Lower is better - positive pairs should be close.

    Args:
        embeddings: (n_samples, dim) normalized embeddings
        positive_pairs: (n_pairs, 2) indices of positive pairs
        alpha: exponent for distance (default 2 = squared distance)
    """
    emb1 = embeddings[positive_pairs[:, 0]]
    emb2 = embeddings[positive_pairs[:, 1]]

    # Squared L2 distance for normalized vectors = 2 - 2*cos_sim
    distances = (emb1 - emb2).pow(2).sum(dim=1)

    alignment = distances.pow(alpha / 2).mean()
    return alignment.item()


def compute_uniformity(
    embeddings: torch.Tensor,
    t: float = 2.0,
    sample_size: int = 10000
) -> float:
    """
    Compute uniformity metric: how uniformly distributed embeddings are.

    Lower is better - embeddings should spread across the hypersphere.
    Based on Wang & Isola (2020).

    Args:
        embeddings: (n_samples, dim) normalized embeddings
        t: temperature parameter (default 2)
        sample_size: number of pairs to sample for efficiency
    """
    n = embeddings.shape[0]

    if n * (n - 1) // 2 > sample_size:
        # Sample pairs for efficiency
        idx1 = torch.randint(0, n, (sample_size,))
        idx2 = torch.randint(0, n, (sample_size,))
        # Drop self-pairs (their zero distance would bias the estimate)
        mask = idx1 != idx2
        idx1, idx2 = idx1[mask], idx2[mask]
        # Squared L2 distance between sampled pairs
        sq_distances = (embeddings[idx1] - embeddings[idx2]).pow(2).sum(dim=-1)
    else:
        # All unique pairs, excluding self-pairs
        sq_distances = torch.pdist(embeddings, p=2).pow(2)

    # Uniformity loss (log of the average Gaussian kernel over pairwise distances)
    uniformity = torch.log(torch.exp(-t * sq_distances).mean())

    return uniformity.item()


def compute_alignment_uniformity(
    embeddings: torch.Tensor,
    positive_pairs: torch.Tensor = None,
    labels: torch.Tensor = None
) -> dict:
    """
    Compute both alignment and uniformity metrics.

    Args:
        embeddings: (n_samples, dim) embeddings (will be normalized)
        positive_pairs: (n_pairs, 2) indices of positive pairs, OR
        labels: (n_samples,) class labels to generate positive pairs
    """
    # Normalize embeddings
    embeddings = F.normalize(embeddings, dim=1)

    # Generate positive pairs from labels if needed
    if positive_pairs is None and labels is not None:
        positive_pairs = []
        for label in labels.unique():
            indices = (labels == label).nonzero().squeeze(-1)  # keep 1-D even when a class has one sample
            if len(indices) > 1:
                # Sample pairs within class
                for i in range(min(len(indices), 50)):
                    for j in range(i + 1, min(len(indices), 50)):
                        positive_pairs.append([indices[i].item(), indices[j].item()])
        positive_pairs = torch.tensor(positive_pairs)

    alignment = compute_alignment(embeddings, positive_pairs) if positive_pairs is not None else None
    uniformity = compute_uniformity(embeddings)

    return {
        "alignment": alignment,
        "uniformity": uniformity,
        "alignment_uniformity_sum": (alignment + uniformity) if alignment else None
    }


# Example
torch.manual_seed(42)
embeddings = torch.randn(500, 128)
labels = torch.randint(0, 10, (500,))  # 10 classes

metrics = compute_alignment_uniformity(embeddings, labels=labels)
print(f"Alignment: {metrics['alignment']:.4f} (lower = positive pairs closer)")
print(f"Uniformity: {metrics['uniformity']:.4f} (lower = more spread out)")
Alignment: 1.9989 (lower = positive pairs closer)
Uniformity: -3.9338 (lower = more spread out)
Note: Alignment vs Uniformity Trade-off

Perfect alignment (all positives mapped to the same point) destroys uniformity (everything collapses together). Good embeddings balance both losses; remember that lower is better for each metric:

  • Low alignment loss but high uniformity loss: over-clustered, losing discriminative power
  • Low uniformity loss but high alignment loss: good spread, but positives too far apart
  • Target: both losses low (close to their optima)

Typical good values: alignment < 0.5, uniformity < -2.0
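To see the trade-off concretely, the sketch below reuses compute_alignment_uniformity and the embeddings/labels from the example above, collapsing every embedding onto its class centroid: the alignment loss drops to zero (all positives become identical) while the uniformity loss rises well above the random baseline.

# Collapse each embedding to its class centroid: perfect alignment, poor uniformity
centroid_emb = embeddings.clone()
for label in labels.unique():
    mask = labels == label
    centroid_emb[mask] = embeddings[mask].mean(dim=0)

centroid_metrics = compute_alignment_uniformity(centroid_emb, labels=labels)
print(f"Centroid-collapsed alignment:  {centroid_metrics['alignment']:.4f} (positives identical)")
print(f"Centroid-collapsed uniformity: {centroid_metrics['uniformity']:.4f} (worse than the random baseline)")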

21.1.3 Dimension Utilization and Collapse

Representation collapse occurs when embeddings fail to use the full dimensionality—a common failure mode in self-supervised learning.

Show dimension collapse detection
import torch

def detect_dimension_collapse(embeddings: torch.Tensor, threshold: float = 0.01) -> dict:
    """
    Detect dimension collapse in embeddings.

    Collapse occurs when:
    - Many dimensions have near-zero variance
    - Embeddings are constant along certain dimensions
    - Effective rank is much lower than nominal dimension

    Args:
        embeddings: (n_samples, dim) embeddings
        threshold: variance threshold for "dead" dimensions
    """
    # Per-dimension statistics
    dim_means = embeddings.mean(dim=0)
    dim_vars = embeddings.var(dim=0)
    dim_stds = dim_vars.sqrt()

    # Dead dimensions (near-zero variance)
    dead_dims = (dim_vars < threshold).sum().item()
    dead_ratio = dead_dims / embeddings.shape[1]

    # Dimension variance distribution
    var_percentiles = {
        "min": dim_vars.min().item(),
        "p25": dim_vars.quantile(0.25).item(),
        "median": dim_vars.median().item(),
        "p75": dim_vars.quantile(0.75).item(),
        "max": dim_vars.max().item()
    }

    # SVD-based rank estimation on the centered embedding matrix
    singular_values = torch.linalg.svdvals(embeddings - embeddings.mean(dim=0))

    # Effective rank (Roy & Vetterli, 2007): exponential of the entropy of the
    # normalized singular value distribution
    normalized_sv = singular_values / singular_values.sum()
    entropy = -(normalized_sv * normalized_sv.log()).sum()
    effective_rank = entropy.exp().item()

    # Stable rank: ||A||_F^2 / ||A||_2^2
    stable_rank = (singular_values ** 2).sum() / (singular_values.max() ** 2)

    return {
        "dead_dimensions": dead_dims,
        "dead_ratio": dead_ratio,
        "variance_distribution": var_percentiles,
        "effective_rank": effective_rank,
        "stable_rank": stable_rank.item(),
        "nominal_dimension": embeddings.shape[1],
        "collapse_detected": dead_ratio > 0.1 or effective_rank < embeddings.shape[1] * 0.3
    }


# Example: Detect collapse
torch.manual_seed(42)

# Healthy embeddings
healthy = torch.randn(1000, 256)

# Collapsed embeddings (many dimensions unused)
collapsed = torch.randn(1000, 256)
collapsed[:, 50:] = 0.01 * torch.randn(1000, 206)  # Last 206 dims nearly dead

print("Healthy embeddings:")
h_metrics = detect_dimension_collapse(healthy)
print(f"  Dead dimensions: {h_metrics['dead_dimensions']}/{h_metrics['nominal_dimension']}")
print(f"  Effective rank: {h_metrics['effective_rank']:.1f}")
print(f"  Collapse detected: {h_metrics['collapse_detected']}")

print("\nCollapsed embeddings:")
c_metrics = detect_dimension_collapse(collapsed)
print(f"  Dead dimensions: {c_metrics['dead_dimensions']}/{c_metrics['nominal_dimension']}")
print(f"  Effective rank: {c_metrics['effective_rank']:.1f}")
print(f"  Collapse detected: {c_metrics['collapse_detected']}")
Healthy embeddings:
  Dead dimensions: 0/256
  Effective rank: 247.0
  Collapse detected: False

Collapsed embeddings:
  Dead dimensions: 206/256
  Effective rank: 61.5
  Collapse detected: True

21.2 Retrieval Metrics

Retrieval metrics measure how well embeddings support similarity search—the most common downstream task. Understanding when to use each metric and how they differ is crucial for meaningful evaluation.

21.2.1 Recall@K: Did We Find the Relevant Items?

Recall@K measures the fraction of relevant items found in the top K results. It answers: “Of all the things I should find, how many did I actually find?”

\[\text{Recall@K} = \frac{|\text{Relevant items in top K}|}{|\text{Total relevant items}|}\]

Show Recall@K implementation
import torch
import numpy as np

def recall_at_k(
    query_embeddings: torch.Tensor,
    corpus_embeddings: torch.Tensor,
    relevance_labels: torch.Tensor,
    k_values: list = [1, 5, 10, 50, 100]
) -> dict:
    """
    Compute Recall@K for embedding retrieval.

    Args:
        query_embeddings: (n_queries, dim)
        corpus_embeddings: (n_corpus, dim)
        relevance_labels: (n_queries, n_corpus) binary relevance matrix
                         or (n_queries,) with corpus index of single relevant item
        k_values: list of K values to compute

    Returns:
        Dictionary with Recall@K for each K
    """
    # Compute similarities
    query_norm = query_embeddings / query_embeddings.norm(dim=1, keepdim=True)
    corpus_norm = corpus_embeddings / corpus_embeddings.norm(dim=1, keepdim=True)
    similarities = query_norm @ corpus_norm.T  # (n_queries, n_corpus)

    # Get rankings
    rankings = similarities.argsort(dim=1, descending=True)

    results = {}

    # Handle single relevant item case
    if relevance_labels.dim() == 1:
        for k in k_values:
            top_k = rankings[:, :k]
            hits = (top_k == relevance_labels.unsqueeze(1)).any(dim=1)
            results[f"recall@{k}"] = hits.float().mean().item()
    else:
        # Multiple relevant items case
        for k in k_values:
            top_k = rankings[:, :k]
            recalls = []
            for i in range(len(query_embeddings)):
                relevant = relevance_labels[i].nonzero().squeeze(-1)
                if len(relevant) == 0:
                    continue
                found = (top_k[i].unsqueeze(1) == relevant.unsqueeze(0)).any(dim=1).sum()
                recalls.append(found.item() / len(relevant))
            results[f"recall@{k}"] = np.mean(recalls) if recalls else 0.0

    return results


# Example
torch.manual_seed(42)
n_queries, n_corpus, dim = 100, 10000, 256

queries = torch.randn(n_queries, dim)
corpus = torch.randn(n_corpus, dim)
# Each query has one relevant document
relevant_indices = torch.randint(0, n_corpus, (n_queries,))

results = recall_at_k(queries, corpus, relevant_indices)
for k, v in results.items():
    print(f"{k}: {v:.4f}")
recall@1: 0.0000
recall@5: 0.0000
recall@10: 0.0000
recall@50: 0.0000
recall@100: 0.0100

21.2.2 Precision@K: How Many Results Are Relevant?

Precision@K measures the fraction of top K results that are relevant. It answers: “Of what I returned, how much is useful?”

\[\text{Precision@K} = \frac{|\text{Relevant items in top K}|}{K}\]

Show Precision@K implementation
import torch
import numpy as np

def precision_at_k(
    similarities: torch.Tensor,
    relevance: torch.Tensor,
    k_values: list = [1, 5, 10]
) -> dict:
    """
    Compute Precision@K.

    Args:
        similarities: (n_queries, n_corpus) similarity scores
        relevance: (n_queries, n_corpus) binary relevance matrix
        k_values: list of K values
    """
    rankings = similarities.argsort(dim=1, descending=True)

    results = {}
    for k in k_values:
        top_k_indices = rankings[:, :k]
        # Gather relevance for top-k items
        precisions = []
        for i in range(len(similarities)):
            relevant_in_topk = relevance[i, top_k_indices[i]].sum().item()
            precisions.append(relevant_in_topk / k)
        results[f"precision@{k}"] = np.mean(precisions)

    return results


# Example with multiple relevant items per query
torch.manual_seed(42)
n_queries, n_corpus = 50, 1000

similarities = torch.randn(n_queries, n_corpus)
# Each query has ~10 relevant documents
relevance = (torch.rand(n_queries, n_corpus) < 0.01).float()

results = precision_at_k(similarities, relevance)
for k, v in results.items():
    print(f"{k}: {v:.4f}")
precision@1: 0.0000
precision@5: 0.0000
precision@10: 0.0020

21.2.3 Mean Average Precision (MAP)

MAP summarizes precision across all recall levels, rewarding systems that rank relevant items higher:

\[\text{AP} = \frac{1}{|\text{Relevant}|} \sum_{k=1}^{N} P@k \cdot \text{rel}(k)\]

\[\text{MAP} = \frac{1}{|Q|} \sum_{q \in Q} \text{AP}(q)\]

Show MAP implementation
import torch
import numpy as np

def average_precision(ranked_relevance: torch.Tensor) -> float:
    """
    Compute Average Precision for a single query.

    Args:
        ranked_relevance: (n_items,) binary relevance in rank order
    """
    relevant_mask = ranked_relevance.bool()
    n_relevant = relevant_mask.sum().item()

    if n_relevant == 0:
        return 0.0

    # Cumulative sum of relevant items up to each position
    cum_relevant = ranked_relevance.cumsum(dim=0)

    # Precision at each position
    positions = torch.arange(1, len(ranked_relevance) + 1, dtype=torch.float32)
    precisions = cum_relevant / positions

    # AP = mean of precisions at relevant positions
    ap = (precisions * ranked_relevance).sum() / n_relevant

    return ap.item()


def mean_average_precision(
    similarities: torch.Tensor,
    relevance: torch.Tensor,
    cutoff: int = None
) -> dict:
    """
    Compute Mean Average Precision.

    Args:
        similarities: (n_queries, n_corpus) similarity scores
        relevance: (n_queries, n_corpus) binary relevance
        cutoff: optional cutoff for ranking (MAP@K)
    """
    rankings = similarities.argsort(dim=1, descending=True)

    if cutoff:
        rankings = rankings[:, :cutoff]

    aps = []
    for i in range(len(similarities)):
        ranked_rel = relevance[i, rankings[i]]
        aps.append(average_precision(ranked_rel))

    return {
        "map": np.mean(aps),
        "map_std": np.std(aps),
        "min_ap": np.min(aps),
        "max_ap": np.max(aps)
    }


# Example
torch.manual_seed(42)
similarities = torch.randn(100, 1000)
relevance = (torch.rand(100, 1000) < 0.02).float()

map_results = mean_average_precision(similarities, relevance)
print(f"MAP: {map_results['map']:.4f} (std: {map_results['map_std']:.4f})")
MAP: 0.0259 (std: 0.0102)

21.2.4 Mean Reciprocal Rank (MRR)

MRR measures how high the first relevant result appears:

\[\text{MRR} = \frac{1}{|Q|} \sum_{q \in Q} \frac{1}{\text{rank of first relevant}}\]

MRR is particularly useful for navigational queries where users want one specific result.

Show MRR implementation
import torch
import numpy as np

def mean_reciprocal_rank(
    similarities: torch.Tensor,
    relevance: torch.Tensor
) -> dict:
    """
    Compute Mean Reciprocal Rank.

    Args:
        similarities: (n_queries, n_corpus) similarity scores
        relevance: (n_queries, n_corpus) binary relevance
    """
    rankings = similarities.argsort(dim=1, descending=True)

    reciprocal_ranks = []

    for i in range(len(similarities)):
        ranked_rel = relevance[i, rankings[i]]
        # Find first relevant item
        first_relevant = (ranked_rel == 1).nonzero()

        if len(first_relevant) > 0:
            rank = first_relevant[0].item() + 1  # 1-indexed
            reciprocal_ranks.append(1.0 / rank)
        else:
            reciprocal_ranks.append(0.0)

    return {
        "mrr": np.mean(reciprocal_ranks),
        "mrr_std": np.std(reciprocal_ranks),
        "queries_with_relevant": sum(1 for rr in reciprocal_ranks if rr > 0)
    }


# Example
torch.manual_seed(42)
similarities = torch.randn(100, 1000)
relevance = (torch.rand(100, 1000) < 0.01).float()

mrr_results = mean_reciprocal_rank(similarities, relevance)
print(f"MRR: {mrr_results['mrr']:.4f}")
print(f"Queries with relevant results: {mrr_results['queries_with_relevant']}/100")
MRR: 0.0434
Queries with relevant results: 100/100

21.2.5 Normalized Discounted Cumulative Gain (NDCG)

NDCG handles graded relevance (not just binary) and discounts the value of results lower in the ranking:

\[\text{DCG@K} = \sum_{i=1}^{K} \frac{2^{\text{rel}_i} - 1}{\log_2(i + 1)}\]

\[\text{NDCG@K} = \frac{\text{DCG@K}}{\text{IDCG@K}}\]

where IDCG is the DCG of the ideal (perfect) ranking.

Show NDCG implementation
import torch
import numpy as np

def dcg_at_k(relevance_scores: torch.Tensor, k: int) -> float:
    """Compute DCG@K for graded relevance."""
    relevance_scores = relevance_scores[:k]
    gains = 2 ** relevance_scores - 1
    discounts = torch.log2(torch.arange(2, len(relevance_scores) + 2, dtype=torch.float32))
    return (gains / discounts).sum().item()


def ndcg_at_k(
    similarities: torch.Tensor,
    relevance: torch.Tensor,
    k_values: list = [5, 10, 20]
) -> dict:
    """
    Compute NDCG@K for graded relevance.

    Args:
        similarities: (n_queries, n_corpus) similarity scores
        relevance: (n_queries, n_corpus) graded relevance (0, 1, 2, 3, ...)
        k_values: list of K values
    """
    rankings = similarities.argsort(dim=1, descending=True)

    results = {}

    for k in k_values:
        ndcgs = []

        for i in range(len(similarities)):
            # Get relevance in predicted order
            predicted_rel = relevance[i, rankings[i]]
            dcg = dcg_at_k(predicted_rel, k)

            # Get ideal relevance (sorted descending)
            ideal_rel = relevance[i].sort(descending=True).values
            idcg = dcg_at_k(ideal_rel, k)

            ndcg = dcg / idcg if idcg > 0 else 0.0
            ndcgs.append(ndcg)

        results[f"ndcg@{k}"] = np.mean(ndcgs)

    return results


# Example with graded relevance (0=not relevant, 1=somewhat, 2=relevant, 3=highly relevant)
torch.manual_seed(42)
similarities = torch.randn(100, 1000)
# Graded relevance
relevance = torch.zeros(100, 1000)
relevance[torch.rand(100, 1000) < 0.01] = 1
relevance[torch.rand(100, 1000) < 0.005] = 2
relevance[torch.rand(100, 1000) < 0.002] = 3

ndcg_results = ndcg_at_k(similarities, relevance.float())
for k, v in ndcg_results.items():
    print(f"{k}: {v:.4f}")
ndcg@5: 0.0032
ndcg@10: 0.0066
ndcg@20: 0.0132

21.2.6 When to Use Which Metric

Metric       | Best For                                                  | Limitations
-------------|-----------------------------------------------------------|-----------------------------------------------------
Recall@K     | Measuring coverage, ensuring relevant items aren’t missed | Ignores precision, treats all relevant items equally
Precision@K  | When false positives are costly (e.g., legal, medical)    | Ignores items outside top K
MAP          | Comprehensive ranking quality, comparing systems          | Assumes binary relevance
MRR          | Navigational queries with single correct answer           | Only considers first relevant item
NDCG         | Graded relevance, nuanced quality assessment              | Requires graded judgments, harder to interpret
Tip: Metric Selection Guidelines

For product search: Use NDCG (users prefer more relevant products) + Recall@100 (coverage)

For document retrieval: Use MAP (comprehensive) + MRR (navigational queries)

For recommendations: Use NDCG@10 (top matters most) + Precision@10 (quality of shown items)

For fraud detection: Use Recall@K (can’t miss fraud) + Precision (avoid alert fatigue)

Always report multiple metrics to get a complete picture.
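
In practice these metrics are rarely reported in isolation. The sketch below combines the functions defined earlier in this section (recall_at_k, precision_at_k, mean_average_precision, mean_reciprocal_rank, ndcg_at_k) into a single report; retrieval_report is an illustrative helper name, and it assumes a binary relevance matrix (graded relevance would only change the NDCG input).

import torch
import torch.nn.functional as F

def retrieval_report(
    query_embeddings: torch.Tensor,
    corpus_embeddings: torch.Tensor,
    relevance: torch.Tensor
) -> dict:
    """Combine the retrieval metrics above into one report (binary relevance matrix)."""
    # Cosine similarities shared by the similarity-based metrics
    similarities = F.normalize(query_embeddings, dim=1) @ F.normalize(corpus_embeddings, dim=1).T

    report = {}
    report.update(recall_at_k(query_embeddings, corpus_embeddings, relevance, k_values=[10, 100]))
    report.update(precision_at_k(similarities, relevance, k_values=[10]))
    report["map"] = mean_average_precision(similarities, relevance)["map"]
    report["mrr"] = mean_reciprocal_rank(similarities, relevance)["mrr"]
    report.update(ndcg_at_k(similarities, relevance, k_values=[10]))
    return report


# Example
torch.manual_seed(0)
queries, corpus = torch.randn(50, 128), torch.randn(2000, 128)
relevance = (torch.rand(50, 2000) < 0.01).float()

for name, value in retrieval_report(queries, corpus, relevance).items():
    print(f"{name}: {value:.4f}")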

21.3 Human Evaluation Framework

Automated metrics have limitations. Human evaluation provides ground truth that algorithms can’t capture: subjective relevance, contextual appropriateness, and user satisfaction. This section covers how to collect high-quality human judgments at scale.

21.3.1 Designing Evaluation Tasks

Effective human evaluation requires clear task design:

Show human evaluation task framework
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class RelevanceScale(Enum):
    """Standard graded relevance scale (TREC-style)."""
    NOT_RELEVANT = 0      # Completely irrelevant
    MARGINALLY = 1        # Marginally relevant
    FAIRLY = 2            # Fairly relevant
    HIGHLY = 3            # Highly relevant
    PERFECTLY = 4         # Perfect match


@dataclass
class EvaluationTask:
    """A single human evaluation task."""
    task_id: str
    query: str
    candidate: str
    context: Optional[str] = None
    instructions: str = ""

    def to_annotation_format(self) -> dict:
        return {
            "id": self.task_id,
            "query": self.query,
            "candidate": self.candidate,
            "context": self.context,
            "instructions": self.instructions,
            "scale": [s.name for s in RelevanceScale]
        }


@dataclass
class AnnotationGuidelines:
    """Guidelines for human annotators."""
    task_description: str
    relevance_definitions: dict
    examples: List[dict]
    edge_cases: List[str]

    @staticmethod
    def create_search_relevance_guidelines():
        return AnnotationGuidelines(
            task_description="""
            Rate how well each document answers the given query.
            Consider: Does it answer the question? Is the information accurate?
            Would a user be satisfied with this result?
            """,
            relevance_definitions={
                "NOT_RELEVANT": "Document has no useful information for the query",
                "MARGINALLY": "Document is tangentially related but doesn't answer the query",
                "FAIRLY": "Document partially answers the query or provides related info",
                "HIGHLY": "Document substantially answers the query",
                "PERFECTLY": "Document is an ideal answer to the query"
            },
            examples=[
                {
                    "query": "How to make sourdough bread",
                    "document": "Sourdough bread recipe: Mix flour, water, starter...",
                    "rating": "PERFECTLY",
                    "reason": "Direct recipe for the query"
                },
                {
                    "query": "How to make sourdough bread",
                    "document": "The history of bread dates back 10,000 years...",
                    "rating": "NOT_RELEVANT",
                    "reason": "About bread history, not how to make sourdough"
                }
            ],
            edge_cases=[
                "If document is relevant but outdated, rate MARGINALLY",
                "If document answers a related but different question, rate FAIRLY",
                "If unsure between two ratings, choose the lower one"
            ]
        )


class HumanEvaluationPipeline:
    """Pipeline for collecting and analyzing human judgments."""

    def __init__(self, guidelines: AnnotationGuidelines):
        self.guidelines = guidelines
        self.annotations = []
        self.annotator_stats = {}

    def create_task_batch(
        self,
        queries: List[str],
        candidates: List[List[str]],
        n_per_query: int = 10
    ) -> List[EvaluationTask]:
        """Create a batch of evaluation tasks."""
        tasks = []
        for i, (query, cands) in enumerate(zip(queries, candidates)):
            for j, cand in enumerate(cands[:n_per_query]):
                tasks.append(EvaluationTask(
                    task_id=f"q{i}_c{j}",
                    query=query,
                    candidate=cand,
                    instructions=self.guidelines.task_description
                ))
        return tasks

    def compute_inter_annotator_agreement(
        self,
        annotations: List[dict]
    ) -> dict:
        """
        Compute inter-annotator agreement metrics.

        Returns Cohen's Kappa for pairs and Fleiss' Kappa for groups.
        """
        # Group by task
        task_annotations = {}
        for ann in annotations:
            tid = ann["task_id"]
            if tid not in task_annotations:
                task_annotations[tid] = []
            task_annotations[tid].append(ann["rating"])

        # Only tasks with multiple annotations
        multi = {k: v for k, v in task_annotations.items() if len(v) >= 2}

        if not multi:
            return {"error": "No tasks with multiple annotations"}

        # Simple agreement rate
        agreements = []
        for ratings in multi.values():
            # Check if all annotators agree
            agreements.append(1.0 if len(set(ratings)) == 1 else 0.0)

        # Pairwise agreement
        pairwise = []
        for ratings in multi.values():
            for i in range(len(ratings)):
                for j in range(i + 1, len(ratings)):
                    pairwise.append(1.0 if ratings[i] == ratings[j] else 0.0)

        return {
            "exact_agreement_rate": sum(agreements) / len(agreements),
            "pairwise_agreement_rate": sum(pairwise) / len(pairwise) if pairwise else 0,
            "tasks_with_multiple_annotations": len(multi)
        }


# Example usage
guidelines = AnnotationGuidelines.create_search_relevance_guidelines()
pipeline = HumanEvaluationPipeline(guidelines)

print("Task description:")
print(guidelines.task_description)
print("\nRelevance scale:")
for level, desc in guidelines.relevance_definitions.items():
    print(f"  {level}: {desc}")
Task description:

            Rate how well each document answers the given query.
            Consider: Does it answer the question? Is the information accurate?
            Would a user be satisfied with this result?
            

Relevance scale:
  NOT_RELEVANT: Document has no useful information for the query
  MARGINALLY: Document is tangentially related but doesn't answer the query
  FAIRLY: Document partially answers the query or provides related info
  HIGHLY: Document substantially answers the query
  PERFECTLY: Document is an ideal answer to the query

21.3.2 Quality Assurance for Annotations

Annotation quality varies. Implement quality controls:

Show annotation quality assurance
import numpy as np
from collections import defaultdict
from typing import List

class AnnotationQualityMonitor:
    """Monitor and filter annotation quality."""

    def __init__(self, gold_standard_tasks: dict):
        """
        Args:
            gold_standard_tasks: {task_id: expected_rating} for quality checks
        """
        self.gold_standard = gold_standard_tasks
        self.annotator_performance = defaultdict(lambda: {"correct": 0, "total": 0})

    def check_annotation(self, annotator_id: str, task_id: str, rating: int) -> dict:
        """Check annotation against gold standard if available."""
        result = {"is_gold": False, "correct": None}

        if task_id in self.gold_standard:
            result["is_gold"] = True
            expected = self.gold_standard[task_id]
            result["correct"] = (rating == expected)

            self.annotator_performance[annotator_id]["total"] += 1
            if result["correct"]:
                self.annotator_performance[annotator_id]["correct"] += 1

        return result

    def get_annotator_accuracy(self, annotator_id: str) -> float:
        """Get annotator's accuracy on gold standard tasks."""
        perf = self.annotator_performance[annotator_id]
        if perf["total"] == 0:
            return None
        return perf["correct"] / perf["total"]

    def get_reliable_annotators(self, min_accuracy: float = 0.7, min_tasks: int = 10) -> List[str]:
        """Get list of annotators meeting quality threshold."""
        reliable = []
        for annotator_id, perf in self.annotator_performance.items():
            if perf["total"] >= min_tasks:
                accuracy = perf["correct"] / perf["total"]
                if accuracy >= min_accuracy:
                    reliable.append(annotator_id)
        return reliable

    def filter_annotations(
        self,
        annotations: List[dict],
        require_agreement: bool = True,
        min_annotator_accuracy: float = 0.7
    ) -> List[dict]:
        """Filter annotations based on quality criteria."""
        reliable_annotators = set(self.get_reliable_annotators(min_annotator_accuracy))

        # First filter: annotator quality
        quality_filtered = [
            a for a in annotations
            if a["annotator_id"] in reliable_annotators
        ]

        if not require_agreement:
            return quality_filtered

        # Second filter: annotation agreement
        task_ratings = defaultdict(list)
        for a in quality_filtered:
            task_ratings[a["task_id"]].append(a)

        final_annotations = []
        for task_id, ratings in task_ratings.items():
            if len(ratings) < 2:
                continue

            # Check for agreement (allow ±1 difference)
            values = [r["rating"] for r in ratings]
            if max(values) - min(values) <= 1:
                # Use median rating
                median_rating = int(np.median(values))
                final_annotations.append({
                    "task_id": task_id,
                    "rating": median_rating,
                    "confidence": 1.0 - (max(values) - min(values)) / 4,
                    "n_annotators": len(ratings)
                })

        return final_annotations


# Example
gold_standard = {"q0_c0": 3, "q1_c0": 1, "q2_c0": 4}
monitor = AnnotationQualityMonitor(gold_standard)

# Simulate annotations
for i, (tid, expected) in enumerate(gold_standard.items()):
    # Good annotator gets most right
    monitor.check_annotation("annotator_1", tid, expected)
    # Poor annotator gets some wrong
    monitor.check_annotation("annotator_2", tid, expected if i % 2 == 0 else expected - 1)

print("Annotator accuracy on gold standard:")
print(f"  Annotator 1: {monitor.get_annotator_accuracy('annotator_1'):.0%}")
print(f"  Annotator 2: {monitor.get_annotator_accuracy('annotator_2'):.0%}")
Annotator accuracy on gold standard:
  Annotator 1: 100%
  Annotator 2: 67%
Warning: Common Annotation Pitfalls
  1. Position bias: Annotators rate earlier items higher—randomize order
  2. Fatigue: Quality drops after many annotations—limit session length
  3. Anchoring: First example influences all subsequent ratings—vary examples
  4. Scale confusion: Annotators interpret scales differently—provide clear examples
  5. Speed-accuracy trade-off: Fast annotators often less accurate—monitor speed

Mitigation strategies:

  • Include 10-15% gold standard tasks for quality monitoring
  • Require minimum time per task (e.g., 10 seconds)
  • Use attention check questions
  • Collect 3+ annotations per task for agreement filtering
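
A minimal sketch of how gold-task interleaving and order randomization might be wired into batch preparation (prepare_annotation_batch and its parameters are illustrative names, not part of the pipeline classes above):

import random

def prepare_annotation_batch(
    tasks: list,
    gold_tasks: list,
    gold_fraction: float = 0.1,
    seed: int = 0
) -> list:
    """Interleave gold-standard tasks and shuffle presentation order."""
    rng = random.Random(seed)
    n_gold = max(1, int(len(tasks) * gold_fraction))
    batch = list(tasks) + rng.sample(gold_tasks, min(n_gold, len(gold_tasks)))
    rng.shuffle(batch)  # randomized order mitigates position bias and anchoring
    return batch


# Example with task IDs standing in for EvaluationTask objects
batch = prepare_annotation_batch([f"q{i}_c0" for i in range(20)], ["gold_1", "gold_2", "gold_3"])
print(f"Batch size: {len(batch)} ({len(batch) - 20} gold tasks interleaved)")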

21.4 Domain-Specific Metrics

Different applications require specialized metrics. This section covers evaluation frameworks for common embedding use cases.

21.4.2 Recommendation Systems

Show recommendation metrics
import numpy as np
from collections import Counter

class RecommendationMetrics:
    """Metrics for evaluating recommendation systems."""

    @staticmethod
    def hit_rate(
        recommendations: list,
        ground_truth: list,
        k: int = 10
    ) -> float:
        """Fraction of users with at least one relevant item in top-K."""
        hits = 0
        for recs, truth in zip(recommendations, ground_truth):
            if set(recs[:k]) & set(truth):
                hits += 1
        return hits / len(recommendations)

    @staticmethod
    def serendipity(
        recommendations: list,
        user_history: list,
        item_similarity: dict,
        k: int = 10
    ) -> float:
        """
        Serendipity: relevant recommendations that are unexpected.
        Balances relevance with surprise.
        """
        serendipities = []

        for recs, history in zip(recommendations, user_history):
            top_k = recs[:k]
            rec_serendipity = []

            for rec in top_k:
                # How different is this from user's history?
                min_similarity = min(
                    item_similarity.get((rec, h), item_similarity.get((h, rec), 0.5))
                    for h in history
                ) if history else 1.0

                # Serendipity = 1 - max_similarity (higher when more different)
                rec_serendipity.append(1 - min_similarity)

            serendipities.append(np.mean(rec_serendipity) if rec_serendipity else 0)

        return np.mean(serendipities)

    @staticmethod
    def gini_coefficient(item_recommendation_counts: list) -> float:
        """
        Gini coefficient of recommendation distribution.
        0 = perfect equality (all items recommended equally)
        1 = perfect inequality (one item gets all recommendations)

        Use to detect popularity bias.
        """
        counts = np.array(sorted(item_recommendation_counts))
        n = len(counts)
        index = np.arange(1, n + 1)
        return (2 * np.sum(index * counts) - (n + 1) * np.sum(counts)) / (n * np.sum(counts))

    @staticmethod
    def beyond_accuracy_report(
        recommendations: list,
        item_categories: dict,
        item_popularity: dict,
        k: int = 10
    ) -> dict:
        """Comprehensive beyond-accuracy metrics report."""
        # Aggregate statistics
        all_recs = [item for rec in recommendations for item in rec[:k]]
        rec_counts = Counter(all_recs)

        # Coverage
        coverage = len(set(all_recs)) / len(item_popularity)

        # Gini (popularity concentration)
        popularity_counts = list(rec_counts.values())
        gini = RecommendationMetrics.gini_coefficient(popularity_counts)

        # Category coverage
        rec_categories = set(item_categories.get(item, "unk") for item in set(all_recs))
        category_coverage = len(rec_categories) / len(set(item_categories.values()))

        # Popularity bias
        avg_pop = np.mean([item_popularity.get(item, 0) for item in all_recs])
        overall_avg_pop = np.mean(list(item_popularity.values()))
        popularity_bias = avg_pop / overall_avg_pop

        return {
            "catalog_coverage": coverage,
            "gini_coefficient": gini,
            "category_coverage": category_coverage,
            "popularity_bias": popularity_bias,  # >1 means biased toward popular
            "unique_items_recommended": len(set(all_recs))
        }


# Example
metrics = RecommendationMetrics()

# Simulate
recommendations = [
    ["item_1", "item_2", "item_3"],
    ["item_1", "item_4", "item_5"],
    ["item_1", "item_2", "item_6"]
]
ground_truth = [["item_2", "item_7"], ["item_4"], ["item_6", "item_8"]]

print(f"Hit rate@3: {metrics.hit_rate(recommendations, ground_truth, k=3):.2f}")

# Beyond accuracy
item_popularity = {f"item_{i}": 100 - i*10 for i in range(1, 11)}
item_categories = {f"item_{i}": f"cat_{i % 3}" for i in range(1, 11)}

report = metrics.beyond_accuracy_report(recommendations, item_categories, item_popularity, k=3)
print(f"\nBeyond-accuracy metrics:")
for k, v in report.items():
    print(f"  {k}: {v:.2f}")
Hit rate@3: 1.00

Beyond-accuracy metrics:
  catalog_coverage: 0.60
  gini_coefficient: 0.24
  category_coverage: 1.00
  popularity_bias: 1.60
  unique_items_recommended: 6.00

21.4.3 Anomaly Detection and Fraud

Show anomaly detection metrics
import numpy as np

class AnomalyDetectionMetrics:
    """Metrics for evaluating anomaly/fraud detection systems."""

    @staticmethod
    def detection_rate_at_false_positive_rate(
        scores: np.ndarray,
        labels: np.ndarray,
        target_fpr: float = 0.01
    ) -> dict:
        """
        Detection rate (recall) at a specific false positive rate.
        Critical for fraud detection where FP rate must be controlled.
        """
        # Sort by score descending
        sorted_indices = np.argsort(scores)[::-1]
        sorted_labels = labels[sorted_indices]

        n_positives = labels.sum()
        n_negatives = len(labels) - n_positives

        # Find threshold achieving target FPR
        max_fp = int(target_fpr * n_negatives)

        fp = 0
        tp = 0
        threshold_idx = 0

        for i, label in enumerate(sorted_labels):
            if label == 1:
                tp += 1
            else:
                fp += 1
                if fp >= max_fp:
                    threshold_idx = i
                    break

        detection_rate = tp / n_positives if n_positives > 0 else 0
        actual_fpr = fp / n_negatives if n_negatives > 0 else 0

        return {
            "detection_rate": detection_rate,
            "actual_fpr": actual_fpr,
            "target_fpr": target_fpr,
            "threshold_index": threshold_idx
        }

    @staticmethod
    def cost_sensitive_evaluation(
        predictions: np.ndarray,
        labels: np.ndarray,
        fp_cost: float,
        fn_cost: float
    ) -> dict:
        """
        Evaluate with asymmetric costs.

        Args:
            fp_cost: Cost of false positive (e.g., investigation cost)
            fn_cost: Cost of false negative (e.g., fraud loss)
        """
        tp = ((predictions == 1) & (labels == 1)).sum()
        fp = ((predictions == 1) & (labels == 0)).sum()
        tn = ((predictions == 0) & (labels == 0)).sum()
        fn = ((predictions == 0) & (labels == 1)).sum()

        total_cost = fp * fp_cost + fn * fn_cost

        # Compare to always-predict-negative baseline
        baseline_cost = labels.sum() * fn_cost

        return {
            "total_cost": total_cost,
            "cost_per_prediction": total_cost / len(labels),
            "cost_reduction_vs_baseline": 1 - (total_cost / baseline_cost) if baseline_cost > 0 else 0,
            "confusion_matrix": {"tp": tp, "fp": fp, "tn": tn, "fn": fn}
        }

    @staticmethod
    def time_to_detection(
        anomaly_timestamps: list,
        detection_timestamps: list
    ) -> dict:
        """
        How quickly anomalies are detected after they occur.
        """
        detection_times = []

        for anomaly_time, detection_time in zip(anomaly_timestamps, detection_timestamps):
            if detection_time is not None:
                detection_times.append(detection_time - anomaly_time)

        if not detection_times:
            return {"mean_ttd": None, "detection_rate": 0}

        return {
            "mean_ttd": np.mean(detection_times),
            "median_ttd": np.median(detection_times),
            "p95_ttd": np.percentile(detection_times, 95),
            "detection_rate": len(detection_times) / len(anomaly_timestamps)
        }


# Example
np.random.seed(42)
n_samples = 10000
fraud_rate = 0.01

labels = (np.random.random(n_samples) < fraud_rate).astype(int)
# Scores: frauds should have higher scores on average
scores = np.random.random(n_samples)
scores[labels == 1] += 0.3
scores = np.clip(scores, 0, 1)

metrics = AnomalyDetectionMetrics()

# Detection at 1% FPR
result = metrics.detection_rate_at_false_positive_rate(scores, labels, target_fpr=0.01)
print(f"Detection rate at 1% FPR: {result['detection_rate']:.1%}")

# Cost-sensitive evaluation
predictions = (scores > 0.5).astype(int)
cost_result = metrics.cost_sensitive_evaluation(
    predictions, labels,
    fp_cost=100,    # $100 investigation cost
    fn_cost=10000   # $10,000 fraud loss
)
print(f"Cost reduction vs baseline: {cost_result['cost_reduction_vs_baseline']:.1%}")
Detection rate at 1% FPR: 24.2%
Cost reduction vs baseline: 23.0%

21.5 Statistical Rigor

Embedding evaluation requires statistical rigor to draw valid conclusions. This section covers sample size calculation, significance testing, and multiple comparison corrections.

21.5.1 Sample Size and Power Analysis

Show power analysis for A/B testing
import numpy as np
from scipy import stats

def sample_size_for_metric_change(
    baseline_metric: float,
    minimum_detectable_effect: float,
    metric_variance: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """
    Calculate required sample size for detecting a metric change.

    Args:
        baseline_metric: Current metric value (e.g., 0.15 for 15% CTR)
        minimum_detectable_effect: Relative change to detect (e.g., 0.05 for 5% improvement)
        metric_variance: Variance of the metric
        alpha: Significance level (Type I error rate)
        power: Statistical power (1 - Type II error rate)

    Returns:
        Required sample size per group
    """
    effect_size = baseline_metric * minimum_detectable_effect

    z_alpha = stats.norm.ppf(1 - alpha/2)  # Two-tailed
    z_beta = stats.norm.ppf(power)

    # Sample size formula for two-sample t-test
    n = 2 * ((z_alpha + z_beta) ** 2) * metric_variance / (effect_size ** 2)

    return int(np.ceil(n))


def minimum_detectable_effect(
    sample_size: int,
    baseline_metric: float,
    metric_variance: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> float:
    """
    Calculate minimum detectable effect given sample size.
    """
    z_alpha = stats.norm.ppf(1 - alpha/2)
    z_beta = stats.norm.ppf(power)

    effect = np.sqrt(2 * ((z_alpha + z_beta) ** 2) * metric_variance / sample_size)

    return effect / baseline_metric


# Example: CTR experiment
baseline_ctr = 0.15
ctr_variance = baseline_ctr * (1 - baseline_ctr)  # Bernoulli variance

print("Sample size requirements for CTR experiment:")
for mde in [0.01, 0.02, 0.05, 0.10]:
    n = sample_size_for_metric_change(baseline_ctr, mde, ctr_variance)
    print(f"  Detect {mde:.0%} change: {n:,} samples per group")

print("\nMinimum detectable effect for given sample sizes:")
for n in [1000, 10000, 100000]:
    mde = minimum_detectable_effect(n, baseline_ctr, ctr_variance)
    print(f"  n={n:,}: can detect {mde:.1%} change")
Sample size requirements for CTR experiment:
  Detect 1% change: 889,540 samples per group
  Detect 2% change: 222,385 samples per group
  Detect 5% change: 35,582 samples per group
  Detect 10% change: 8,896 samples per group

Minimum detectable effect for given sample sizes:
  n=1,000: can detect 29.8% change
  n=10,000: can detect 9.4% change
  n=100,000: can detect 3.0% change

21.5.2 Confidence Intervals for Metrics

Show bootstrap confidence intervals
import numpy as np

def bootstrap_confidence_interval(
    metric_func,
    data: np.ndarray,
    n_bootstrap: int = 1000,
    confidence: float = 0.95
) -> dict:
    """
    Compute bootstrap confidence interval for any metric.

    Args:
        metric_func: Function that computes metric from data
        data: Array of data points
        n_bootstrap: Number of bootstrap samples
        confidence: Confidence level
    """
    point_estimate = metric_func(data)

    bootstrap_estimates = []
    for _ in range(n_bootstrap):
        # Sample with replacement
        sample = np.random.choice(data, size=len(data), replace=True)
        bootstrap_estimates.append(metric_func(sample))

    bootstrap_estimates = np.array(bootstrap_estimates)

    alpha = 1 - confidence
    lower = np.percentile(bootstrap_estimates, 100 * alpha/2)
    upper = np.percentile(bootstrap_estimates, 100 * (1 - alpha/2))

    return {
        "point_estimate": point_estimate,
        "ci_lower": lower,
        "ci_upper": upper,
        "confidence": confidence,
        "std_error": bootstrap_estimates.std()
    }


# Example: Confidence interval for recall@10
np.random.seed(42)
# Simulate recall values for 1000 queries
recall_values = np.random.beta(8, 2, 1000)  # Skewed distribution

result = bootstrap_confidence_interval(np.mean, recall_values)
print(f"Recall@10: {result['point_estimate']:.4f}")
print(f"95% CI: [{result['ci_lower']:.4f}, {result['ci_upper']:.4f}]")
print(f"Standard error: {result['std_error']:.4f}")
Recall@10: 0.7956
95% CI: [0.7880, 0.8023]
Standard error: 0.0036
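
The bootstrap above quantifies uncertainty for a single system. To decide whether model B actually beats model A, a paired bootstrap test on per-query metric differences is usually more powerful than comparing two independent confidence intervals, because both models are evaluated on the same queries. The sketch below is illustrative (paired_bootstrap_test is not a library function) and assumes per-query metric arrays for both models.

import numpy as np

def paired_bootstrap_test(
    metric_a: np.ndarray,
    metric_b: np.ndarray,
    n_bootstrap: int = 10000,
    seed: int = 0
) -> dict:
    """
    Paired bootstrap test for the mean difference in a per-query metric
    (e.g., per-query recall@10 for model B minus model A on the same queries).
    """
    rng = np.random.default_rng(seed)
    diffs = metric_b - metric_a
    observed = diffs.mean()

    # Resample query-level differences with replacement
    boot_means = np.empty(n_bootstrap)
    for i in range(n_bootstrap):
        boot_means[i] = rng.choice(diffs, size=len(diffs), replace=True).mean()

    # Two-sided p-value: how often the resampled mean difference crosses zero
    p_value = 2 * min((boot_means <= 0).mean(), (boot_means >= 0).mean())

    return {
        "observed_difference": observed,
        "ci_lower": np.percentile(boot_means, 2.5),
        "ci_upper": np.percentile(boot_means, 97.5),
        "p_value": min(p_value, 1.0)
    }


# Example: model B improves per-query recall by a small amount on average
rng = np.random.default_rng(42)
recall_a = rng.beta(8, 2, 1000)
recall_b = np.clip(recall_a + rng.normal(0.01, 0.05, 1000), 0, 1)

result = paired_bootstrap_test(recall_a, recall_b)
print(f"Mean difference: {result['observed_difference']:.4f} "
      f"[{result['ci_lower']:.4f}, {result['ci_upper']:.4f}], p={result['p_value']:.4f}")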

21.5.3 Multiple Testing Correction

When evaluating multiple metrics, the chance of false positives increases. Apply corrections:

Show multiple testing correction
import numpy as np

def bonferroni_correction(p_values: list, alpha: float = 0.05) -> dict:
    """
    Bonferroni correction: most conservative.
    Adjusted alpha = alpha / n_tests
    """
    n_tests = len(p_values)
    adjusted_alpha = alpha / n_tests

    significant = [p < adjusted_alpha for p in p_values]

    return {
        "method": "bonferroni",
        "original_alpha": alpha,
        "adjusted_alpha": adjusted_alpha,
        "significant": significant,
        "n_significant": sum(significant)
    }


def benjamini_hochberg_correction(p_values: list, alpha: float = 0.05) -> dict:
    """
    Benjamini-Hochberg: controls False Discovery Rate.
    Less conservative than Bonferroni, more power.
    """
    n_tests = len(p_values)
    sorted_indices = np.argsort(p_values)
    sorted_p = np.array(p_values)[sorted_indices]

    # BH threshold: p_i <= (i/n) * alpha
    thresholds = [(i + 1) / n_tests * alpha for i in range(n_tests)]

    # Find largest k where p_k <= threshold_k
    significant_sorted = [False] * n_tests
    max_significant = -1

    for i in range(n_tests):
        if sorted_p[i] <= thresholds[i]:
            max_significant = i

    for i in range(max_significant + 1):
        significant_sorted[i] = True

    # Map back to original order
    significant = [False] * n_tests
    for i, orig_idx in enumerate(sorted_indices):
        significant[orig_idx] = significant_sorted[i]

    return {
        "method": "benjamini_hochberg",
        "original_alpha": alpha,
        "significant": significant,
        "n_significant": sum(significant)
    }


# Example: Testing multiple metrics
np.random.seed(42)
p_values = [0.001, 0.02, 0.03, 0.04, 0.06, 0.15, 0.25]
metric_names = ["NDCG@10", "Recall@10", "MRR", "Precision@10", "CTR", "Dwell", "Bounce"]

print("P-values and significance (alpha=0.05):")
print("-" * 50)

bonf = bonferroni_correction(p_values)
bh = benjamini_hochberg_correction(p_values)

for i, (name, p) in enumerate(zip(metric_names, p_values)):
    bonf_sig = "✓" if bonf["significant"][i] else "✗"
    bh_sig = "✓" if bh["significant"][i] else "✗"
    print(f"{name:15} p={p:.3f}  Bonferroni: {bonf_sig}  BH: {bh_sig}")

print("-" * 50)
print(f"Bonferroni significant: {bonf['n_significant']}/{len(p_values)}")
print(f"Benjamini-Hochberg significant: {bh['n_significant']}/{len(p_values)}")
P-values and significance (alpha=0.05):
--------------------------------------------------
NDCG@10         p=0.001  Bonferroni: ✓  BH: ✓
Recall@10       p=0.020  Bonferroni: ✗  BH: ✗
MRR             p=0.030  Bonferroni: ✗  BH: ✗
Precision@10    p=0.040  Bonferroni: ✗  BH: ✗
CTR             p=0.060  Bonferroni: ✗  BH: ✗
Dwell           p=0.150  Bonferroni: ✗  BH: ✗
Bounce          p=0.250  Bonferroni: ✗  BH: ✗
--------------------------------------------------
Bonferroni significant: 1/7
Benjamini-Hochberg significant: 1/7
Tip: When to Use Which Correction

Bonferroni: Use when false positives are very costly (medical, financial decisions). Very conservative—may miss real effects.

Benjamini-Hochberg: Use for exploratory analysis or when some false positives are acceptable. Controls False Discovery Rate rather than family-wise error rate.

No correction: Only when metrics are truly independent and you’re comfortable with inflated Type I error.

Rule of thumb: If you’re making decisions based on results, use correction. If exploring data for hypotheses to test later, correction may be optional.
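
If statsmodels is available (an assumption, not a dependency used elsewhere in this chapter), its multipletests helper implements both corrections and is a convenient cross-check for the hand-rolled functions above:

from statsmodels.stats.multitest import multipletests

p_values = [0.001, 0.02, 0.03, 0.04, 0.06, 0.15, 0.25]

bonf_reject, _, _, _ = multipletests(p_values, alpha=0.05, method="bonferroni")
bh_reject, _, _, _ = multipletests(p_values, alpha=0.05, method="fdr_bh")

print(f"Bonferroni significant: {int(bonf_reject.sum())}/{len(p_values)}")
print(f"Benjamini-Hochberg significant: {int(bh_reject.sum())}/{len(p_values)}")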

21.6 Evaluation at Scale

Evaluating embeddings over trillions of items requires efficient sampling and computation strategies.

21.6.1 Stratified Sampling for Large Corpora

Show stratified sampling strategy
import numpy as np
from collections import defaultdict

class StratifiedEvaluationSampler:
    """Efficient stratified sampling for large-scale evaluation."""

    def __init__(self, corpus_size: int, strata_assignments: dict):
        """
        Args:
            corpus_size: Total number of items
            strata_assignments: {stratum_name: [item_indices]}
        """
        self.corpus_size = corpus_size
        self.strata = strata_assignments

    def sample_stratified(
        self,
        total_sample_size: int,
        allocation: str = "proportional",
        min_per_stratum: int = 100
    ) -> dict:
        """
        Draw stratified sample.

        Args:
            total_sample_size: Total samples to draw
            allocation: 'proportional' or 'equal' ('neyman' allocation not implemented here)
            min_per_stratum: Minimum samples per stratum
        """
        stratum_sizes = {k: len(v) for k, v in self.strata.items()}
        n_strata = len(self.strata)

        # Determine allocation
        if allocation == "proportional":
            weights = {k: v / self.corpus_size for k, v in stratum_sizes.items()}
        elif allocation == "equal":
            weights = {k: 1 / n_strata for k in self.strata}
        else:
            raise ValueError(f"Unknown allocation: {allocation}")

        # Allocate samples
        samples_per_stratum = {}
        remaining = total_sample_size - min_per_stratum * n_strata

        for stratum in self.strata:
            base = min_per_stratum
            additional = int(remaining * weights[stratum])
            samples_per_stratum[stratum] = min(base + additional, stratum_sizes[stratum])

        # Draw samples
        sampled_indices = {}
        for stratum, indices in self.strata.items():
            n_sample = samples_per_stratum[stratum]
            sampled_indices[stratum] = np.random.choice(
                indices, size=n_sample, replace=False
            ).tolist()

        return {
            "samples_per_stratum": samples_per_stratum,
            "sampled_indices": sampled_indices,
            "total_sampled": sum(samples_per_stratum.values())
        }

    def oversample_rare_strata(
        self,
        base_sample: dict,
        rare_strata: list,
        oversample_factor: float = 3.0
    ) -> dict:
        """Oversample rare but important strata (e.g., tail queries)."""
        enhanced_indices = dict(base_sample["sampled_indices"])

        for stratum in rare_strata:
            if stratum in self.strata:
                current_n = len(enhanced_indices[stratum])
                target_n = min(
                    int(current_n * oversample_factor),
                    len(self.strata[stratum])
                )
                enhanced_indices[stratum] = np.random.choice(
                    self.strata[stratum], size=target_n, replace=False
                ).tolist()

        return {
            "sampled_indices": enhanced_indices,
            "total_sampled": sum(len(v) for v in enhanced_indices.values())
        }


# Example
np.random.seed(42)
corpus_size = 10_000_000

# Define strata based on item popularity
strata = {
    "head": list(range(0, 1000)),           # Top 1K items (0.01%)
    "torso": list(range(1000, 100000)),     # Next 99K (1%)
    "tail": list(range(100000, corpus_size)) # Rest (99%)
}

sampler = StratifiedEvaluationSampler(corpus_size, strata)

# Draw sample
sample = sampler.sample_stratified(total_sample_size=10000)
print("Proportional stratified sample:")
for stratum, n in sample["samples_per_stratum"].items():
    print(f"  {stratum}: {n} samples")

# Oversample tail
enhanced = sampler.oversample_rare_strata(sample, rare_strata=["tail"])
print(f"\nAfter oversampling tail: {enhanced['total_sampled']} total samples")
Proportional stratified sample:
  head: 100 samples
  torso: 196 samples
  tail: 9703 samples

After oversampling tail: 29405 total samples
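
The sampler above implements proportional and equal allocation. When pilot estimates of per-stratum variability are available (for example, the standard deviation of per-query recall within each stratum), Neyman (optimal) allocation assigns more of the budget to noisier strata, minimizing the variance of the overall estimate for a fixed sample size. A minimal sketch, assuming hypothetical inputs stratum_sizes and stratum_stds rather than the class above:

Show Neyman allocation sketch
def neyman_allocation(
    stratum_sizes: dict,
    stratum_stds: dict,
    total_sample_size: int,
    min_per_stratum: int = 100,
) -> dict:
    """Allocate samples proportionally to N_h * S_h (Neyman allocation).

    Args:
        stratum_sizes: {stratum: population size N_h}
        stratum_stds: {stratum: estimated within-stratum std dev S_h}
        total_sample_size: total evaluation budget
        min_per_stratum: floor applied after allocation
    """
    # Weight each stratum by population size times within-stratum std dev
    weights = {k: stratum_sizes[k] * stratum_stds[k] for k in stratum_sizes}
    total_weight = sum(weights.values())

    allocation = {}
    for stratum, w in weights.items():
        n_h = int(round(total_sample_size * w / total_weight))
        # Apply the floor, and never exceed the stratum's population
        allocation[stratum] = min(max(n_h, min_per_stratum), stratum_sizes[stratum])
    # Note: the floors can push the total slightly above the nominal budget
    return allocation


# Same strata as above, with assumed (hypothetical) pilot std-dev estimates
sizes = {"head": 1_000, "torso": 99_000, "tail": 9_900_000}
stds = {"head": 0.05, "torso": 0.15, "tail": 0.30}
print(neyman_allocation(sizes, stds, total_sample_size=10_000))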

21.6.2 Efficient Metric Computation
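
Even on a well-chosen sample, naive metric computation can be the bottleneck: a full query-corpus similarity matrix grows with n_queries × n_corpus and quickly exceeds memory. The evaluator below batches the similarity computation and pairs sampled evaluation with bootstrap confidence intervals, so the uncertainty introduced by sampling is reported alongside the point estimate.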

Show efficient evaluation at scale
import torch
import numpy as np

class EfficientEvaluator:
    """Efficient evaluation for large-scale embedding systems."""

    def __init__(self, embedding_dim: int, device: str = "cpu"):
        self.embedding_dim = embedding_dim
        self.device = device

    def batch_recall_at_k(
        self,
        query_embeddings: torch.Tensor,
        corpus_embeddings: torch.Tensor,
        relevance: torch.Tensor,
        k: int = 10,
        batch_size: int = 1000
    ) -> float:
        """
        Compute Recall@K with batched processing for memory efficiency.
        """
        n_queries = len(query_embeddings)
        total_recall = 0.0

        for i in range(0, n_queries, batch_size):
            batch_queries = query_embeddings[i:i+batch_size].to(self.device)
            batch_relevance = relevance[i:i+batch_size]

            # Normalize
            batch_queries = batch_queries / batch_queries.norm(dim=1, keepdim=True)
            corpus_norm = corpus_embeddings / corpus_embeddings.norm(dim=1, keepdim=True)

            # Compute similarities
            similarities = batch_queries @ corpus_norm.T

            # Get top-k
            top_k_indices = similarities.topk(k, dim=1).indices

            # Compute recall
            for j, (topk, rel) in enumerate(zip(top_k_indices, batch_relevance)):
                relevant_items = rel.nonzero().squeeze(-1)
                if len(relevant_items) == 0:
                    continue
                found = (topk.unsqueeze(1) == relevant_items.unsqueeze(0)).any(dim=1).sum()
                total_recall += found.item() / len(relevant_items)

        return total_recall / n_queries

    def approximate_evaluation(
        self,
        query_sample_indices: list,
        query_embeddings: torch.Tensor,
        corpus_embeddings: torch.Tensor,
        relevance: torch.Tensor,
        k: int = 10
    ) -> dict:
        """
        Evaluate on sampled queries with confidence intervals.
        """
        sampled_queries = query_embeddings[query_sample_indices]
        sampled_relevance = relevance[query_sample_indices]

        # Compute metric
        recall = self.batch_recall_at_k(
            sampled_queries, corpus_embeddings, sampled_relevance, k
        )

        # Bootstrap confidence interval: resample queries with replacement and
        # recompute the metric. Each replicate recomputes similarities from
        # scratch; caching per-query recalls would be cheaper at larger scales.
        n_bootstrap = 100
        bootstrap_recalls = []

        for _ in range(n_bootstrap):
            boot_indices = np.random.choice(len(query_sample_indices), size=len(query_sample_indices), replace=True)
            boot_queries = sampled_queries[boot_indices]
            boot_relevance = sampled_relevance[boot_indices]

            boot_recall = self.batch_recall_at_k(
                boot_queries, corpus_embeddings, boot_relevance, k
            )
            bootstrap_recalls.append(boot_recall)

        return {
            f"recall@{k}": recall,
            "ci_lower": np.percentile(bootstrap_recalls, 2.5),
            "ci_upper": np.percentile(bootstrap_recalls, 97.5),
            "n_queries_evaluated": len(query_sample_indices),
            "confidence": 0.95
        }


# Example
torch.manual_seed(42)
evaluator = EfficientEvaluator(embedding_dim=256)

# Simulate large-scale evaluation
n_queries, n_corpus = 10000, 100000
queries = torch.randn(n_queries, 256)
corpus = torch.randn(n_corpus, 256)
# Note: this dense relevance matrix is ~4 GB of float32; production evaluations
# would typically store relevance labels sparsely
relevance = (torch.rand(n_queries, n_corpus) < 0.001).float()

# Sample 1000 queries for evaluation
sample_indices = np.random.choice(n_queries, size=1000, replace=False).tolist()

result = evaluator.approximate_evaluation(
    sample_indices, queries, corpus, relevance, k=10
)
print(f"Recall@10: {result['recall@10']:.4f}")
print(f"95% CI: [{result['ci_lower']:.4f}, {result['ci_upper']:.4f}]")
print(f"Evaluated on {result['n_queries_evaluated']} queries")
Recall@10: 0.0001
95% CI: [0.0000, 0.0002]
Evaluated on 1000 queries
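
When the evaluated queries come from a stratified (and possibly oversampled) sample, recombine per-stratum results with population weights rather than averaging all sampled queries directly; otherwise oversampled strata such as the tail dominate the estimate. A minimal sketch, assuming hypothetical per-stratum Recall@10 values and traffic shares:

Show weighted recombination sketch
def weighted_population_estimate(
    per_stratum_metric: dict,
    stratum_population_share: dict,
) -> float:
    """Recombine per-stratum metrics into a population-level estimate.

    Args:
        per_stratum_metric: {stratum: metric computed on that stratum's sample}
        stratum_population_share: {stratum: fraction of overall traffic}, summing to 1
    """
    assert abs(sum(stratum_population_share.values()) - 1.0) < 1e-6
    return sum(
        per_stratum_metric[s] * stratum_population_share[s]
        for s in per_stratum_metric
    )


# Hypothetical per-stratum Recall@10 values and query-traffic shares
recalls = {"head": 0.92, "torso": 0.81, "tail": 0.55}
shares = {"head": 0.40, "torso": 0.35, "tail": 0.25}
print(f"Population Recall@10: {weighted_population_estimate(recalls, shares):.3f}")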

21.7 Key Takeaways

  • Intrinsic quality metrics (isotropy, uniformity, alignment) detect embedding problems without downstream tasks—monitor them continuously to catch degradation early

  • Choose retrieval metrics based on your use case: Recall@K for coverage, Precision@K when false positives are costly, NDCG for graded relevance, MRR for navigational queries, MAP for comprehensive ranking quality

  • Human evaluation provides ground truth that automated metrics cannot capture—design clear tasks, use quality controls, and measure inter-annotator agreement

  • Domain-specific metrics matter: E-commerce needs zero-result rate and catalog coverage; recommendations need diversity and novelty; fraud detection needs cost-sensitive evaluation

  • Statistical rigor is essential: Calculate required sample sizes, report confidence intervals, and apply multiple testing corrections when evaluating many metrics

  • Scale requires smart sampling: Use stratified sampling, oversample rare but important segments, and compute confidence intervals to quantify uncertainty

21.8 Looking Ahead

Chapter 22 shifts focus from evaluation to serving, exploring high-performance vector operations: optimized similarity search algorithms, approximate nearest neighbor (ANN) methods, GPU acceleration for vector operations, memory-mapped storage strategies, and parallel query processing that enables sub-millisecond similarity search across billion-vector indices.

21.9 Further Reading

21.9.1 Intrinsic Quality Metrics

  • Mu, Jiaqi, et al. (2018). “All-but-the-Top: Simple and Effective Postprocessing for Word Representations.” ICLR.
  • Wang, Tongzhou, and Phillip Isola (2020). “Understanding Contrastive Representation Learning through Alignment and Uniformity on the Hypersphere.” ICML.
  • Ethayarajh, Kawin (2019). “How Contextual are Contextualized Word Representations?” EMNLP.

21.9.2 Retrieval Evaluation

  • Manning, Christopher D., Prabhakar Raghavan, and Hinrich Schütze (2008). “Introduction to Information Retrieval.” Cambridge University Press. Chapter 8.
  • Järvelin, Kalervo, and Jaana Kekäläinen (2002). “Cumulated Gain-Based Evaluation of IR Techniques.” ACM TOIS.
  • Craswell, Nick (2009). “Mean Reciprocal Rank.” Encyclopedia of Database Systems.

21.9.3 Human Evaluation

  • Voorhees, Ellen M. (2000). “Variations in Relevance Judgments and the Measurement of Retrieval Effectiveness.” Information Processing & Management.
  • Carterette, Ben (2011). “System Effectiveness, User Models, and User Utility: A Conceptual Framework for Investigation.” SIGIR.
  • Alonso, Omar, and Stefano Mizzaro (2012). “Using Crowdsourcing for TREC Relevance Assessment.” Information Processing & Management.

21.9.4 Statistical Methods

  • Sakai, Tetsuya (2014). “Statistical Reform in Information Retrieval?” SIGIR Forum.
  • Carterette, Ben (2012). “Multiple Testing in Statistical Analysis of Systems-Based Information Retrieval Experiments.” ACM TOIS.
  • Smucker, Mark D., James Allan, and Ben Carterette (2007). “A Comparison of Statistical Significance Tests for Information Retrieval Evaluation.” CIKM.

21.9.5 Beyond-Accuracy Evaluation

  • Ge, Mouzhi, Carla Delgado-Battenfeld, and Dietmar Jannach (2010). “Beyond Accuracy: Evaluating Recommender Systems by Coverage and Serendipity.” RecSys.
  • Kaminskas, Marius, and Derek Bridge (2016). “Diversity, Serendipity, Novelty, and Coverage: A Survey and Empirical Analysis of Beyond-Accuracy Objectives in Recommender Systems.” ACM TiiS.
  • Castells, Pablo, Neil J. Hurley, and Saul Vargas (2015). “Novelty and Diversity in Recommender Systems.” Recommender Systems Handbook.