28  Entity Resolution and Data Quality

Note: Chapter Overview

Entity resolution—determining when different records refer to the same real-world entity—is a foundational data quality challenge that affects every organization operating at scale. This chapter applies embeddings to entity resolution at trillion-record scale: blocking strategies that reduce quadratic matching complexity to linear, similarity scoring using learned embeddings that capture semantic equivalence beyond exact string matching, graph-based resolution that propagates match decisions through entity networks, active learning approaches that maximize human labeling efficiency, and incremental matching systems that handle streaming data. These techniques transform entity resolution from a batch data cleaning task to a real-time, continuously learning system that maintains data quality as records arrive.

Building on the cross-industry patterns (Chapter 26), entity resolution represents the most computationally challenging cross-industry problem: determining when two records refer to the same real-world entity. Customer databases contain duplicates; healthcare systems need patient matching; marketing platforms build identity graphs; government agencies link records across departments. At scale, the naive O(N²) approach—comparing every record to every other—becomes impossible: 1 billion records would require 10¹⁸ comparisons.

Embedding-based entity resolution transforms this challenge by representing records as vectors, enabling approximate nearest neighbor search to find candidate matches in sub-linear time, learned similarity functions that capture semantic equivalence, and graph neural networks that propagate match decisions through entity networks.

28.1 The Entity Resolution Challenge

Entity resolution appears under many names: record linkage, deduplication, entity matching, merge-purge, and identity resolution. The core problem remains the same: given two (or more) records, do they refer to the same real-world entity?

28.1.1 Why Entity Resolution is Hard

Traditional approaches face fundamental limitations:

  • Scale: N records require O(N²) comparisons for exhaustive matching
  • Data quality: Typos, abbreviations, missing values, format variations
  • Schema heterogeneity: Different systems represent entities differently
  • Temporal changes: Addresses, names, relationships change over time
  • Ambiguity: “John Smith” could be millions of different people
  • Transitivity: If A=B and B=C, then A=C—but similarity isn’t transitive

Example challenges:

Record A: "Jon Smith, 123 Main St, NYC"
Record B: "Jonathan Smith, 123 Main Street, New York"
Record C: "John Smith, 456 Oak Ave, Los Angeles"

Are A and B the same person? (Likely yes - name variation, address normalization)
Are A and C the same person? (Likely no - different address, but common name)

28.1.2 The Scale Problem

At trillion-record scale, exhaustive comparison is impossible:

Records      Comparisons (≈ N²/2)    Time at 1M comparisons/sec
1,000        500,000                 0.5 seconds
1 million    500 billion             ~6 days
1 billion    500 quadrillion         ~16,000 years

Embedding approach: Use learned embeddings + approximate nearest neighbor (ANN) search to find candidate matches in O(N log N) or O(N) time, then apply detailed comparison only to candidates.
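The arithmetic below makes the savings concrete. This is a rough sketch: the 100-candidate budget per record is an assumed blocking parameter, not a universal constant.

# Back-of-the-envelope comparison counts for exhaustive vs. blocked matching.
def comparison_counts(n_records: int, k_candidates: int = 100) -> dict:
    exhaustive = n_records * (n_records - 1) // 2   # all unordered pairs
    blocked = n_records * k_candidates              # ANN candidates only
    return {"exhaustive": exhaustive, "blocked": blocked,
            "reduction": exhaustive / max(blocked, 1)}

for n in (1_000, 1_000_000, 1_000_000_000):
    c = comparison_counts(n)
    print(f"{n:>13,} records: {c['exhaustive']:.2e} exhaustive vs "
          f"{c['blocked']:.2e} blocked (~{c['reduction']:,.0f}x fewer)")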

28.2 Blocking: Reducing Comparison Space

Blocking partitions records into groups where matches are likely, comparing only within groups. Traditional blocking uses exact attribute values (same zip code, same first letter of name). Embedding-based blocking uses vector similarity to find candidate matches regardless of surface-form variations.

Show Embedding-Based Blocking
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Set, Tuple, Optional


@dataclass
class Record:
    """Entity record for matching."""
    record_id: str
    attributes: Dict[str, str]
    source: str
    timestamp: float = 0.0
    embedding: Optional[np.ndarray] = None


class RecordEncoder(nn.Module):
    """Encode records to embeddings for blocking and matching."""
    def __init__(self, vocab_size: int = 50000, embedding_dim: int = 256,
                 num_attributes: int = 10):
        super().__init__()
        self.char_embedding = nn.Embedding(vocab_size, 64)
        self.char_encoder = nn.LSTM(64, 128, num_layers=2,
                                     batch_first=True, bidirectional=True)
        self.attribute_attention = nn.MultiheadAttention(256, num_heads=8, batch_first=True)
        self.projection = nn.Sequential(
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, embedding_dim)
        )

    def encode_attribute(self, char_ids: torch.Tensor) -> torch.Tensor:
        """Encode single attribute from character IDs."""
        char_emb = self.char_embedding(char_ids)
        _, (hidden, _) = self.char_encoder(char_emb)
        # Concatenate forward and backward final states
        return torch.cat([hidden[-2], hidden[-1]], dim=-1)

    def forward(self, attribute_char_ids: List[torch.Tensor]) -> torch.Tensor:
        """Encode full record from multiple attributes."""
        # Encode each attribute
        attr_embeddings = [self.encode_attribute(attr) for attr in attribute_char_ids]
        attr_stack = torch.stack(attr_embeddings, dim=1)  # [batch, num_attrs, 256]

        # Self-attention across attributes
        attn_out, _ = self.attribute_attention(attr_stack, attr_stack, attr_stack)

        # Pool and project
        pooled = attn_out.mean(dim=1)
        return F.normalize(self.projection(pooled), p=2, dim=-1)


class EmbeddingBlocker:
    """Block records using embedding similarity."""
    def __init__(self, encoder: RecordEncoder, similarity_threshold: float = 0.7,
                 max_candidates: int = 100):
        self.encoder = encoder
        self.similarity_threshold = similarity_threshold
        self.max_candidates = max_candidates
        self.index = None  # Would use FAISS/ScaNN in production
        self.records: List[Record] = []

    def index_records(self, records: List[Record]) -> None:
        """Index records for blocking."""
        self.records = records
        # In production: build ANN index (FAISS, ScaNN)
        print(f"Indexed {len(records)} records for blocking")

    def find_candidates(self, query_record: Record) -> List[Tuple[str, float]]:
        """Find candidate matches using embedding similarity."""
        # In production: ANN search
        # Here: brute force for demonstration
        candidates = []
        query_emb = query_record.embedding

        for record in self.records:
            if record.record_id == query_record.record_id:
                continue
            similarity = np.dot(query_emb, record.embedding)
            if similarity >= self.similarity_threshold:
                candidates.append((record.record_id, similarity))

        # Sort by similarity, return top candidates
        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[:self.max_candidates]


# Usage example
encoder = RecordEncoder(vocab_size=50000, embedding_dim=256)
blocker = EmbeddingBlocker(encoder, similarity_threshold=0.7)

# Simulate records
records = [
    Record("r1", {"name": "John Smith", "address": "123 Main St"}, "crm"),
    Record("r2", {"name": "Jon Smith", "address": "123 Main Street"}, "sales"),
    Record("r3", {"name": "Jane Doe", "address": "456 Oak Ave"}, "crm"),
]

# Assign random embeddings for demo
for r in records:
    r.embedding = np.random.randn(256).astype(np.float32)
    r.embedding /= np.linalg.norm(r.embedding)

blocker.index_records(records)
print(f"Blocker ready with {len(records)} records")
Indexed 3 records for blocking
Blocker ready with 3 records
Tip: Blocking Best Practices

Blocking strategies:

  • LSH (Locality Sensitive Hashing): Hash similar embeddings to same buckets
  • ANN indexes: FAISS IVF, ScaNN, HNSW for approximate search (see the sketch after this tip)
  • Canopy clustering: Loose clusters for initial grouping
  • Sorted neighborhood: Sort by blocking key, slide window
  • Hybrid: Combine multiple blocking strategies for coverage

Tuning for recall vs efficiency:

  • High recall (>99%): Lower similarity threshold, more candidates per record
  • Efficiency: Higher threshold, fewer candidates, risk missing matches
  • Adaptive: Vary threshold based on record characteristics

Production considerations:

  • Incremental blocking: Add new records without full reindex
  • Distributed blocking: Partition across nodes by blocking key
  • Monitoring: Track blocking recall on labeled sample
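To make the ANN-index strategy concrete, here is a minimal FAISS-based blocking sketch. It assumes the faiss package is installed; IndexFlatIP is an exact index used for simplicity, and production systems would swap in IndexIVFFlat or IndexHNSWFlat for approximate search. The k and threshold values are illustrative.

import numpy as np
import faiss  # assumed dependency: pip install faiss-cpu

def ann_block(embeddings: np.ndarray, k: int = 20,
              threshold: float = 0.7) -> list:
    """Return candidate pairs (i, j, score) with inner product >= threshold.

    Embeddings are assumed L2-normalized, so inner product equals cosine similarity.
    """
    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)  # exact search; IVF/HNSW would replace this at scale
    index.add(embeddings)
    scores, neighbors = index.search(embeddings, k + 1)  # +1: each record retrieves itself

    pairs = []
    for i in range(embeddings.shape[0]):
        for score, j in zip(scores[i], neighbors[i]):
            if j <= i:                      # skip self-matches and mirrored pairs
                continue
            if score >= threshold:
                pairs.append((i, int(j), float(score)))
    return pairs

# Demo with random normalized vectors; a real pipeline would use the trained RecordEncoder
emb = np.random.randn(1000, 256).astype("float32")
emb /= np.linalg.norm(emb, axis=1, keepdims=True)
print(f"Candidate pairs above threshold: {len(ann_block(emb, k=10, threshold=0.3))}")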

28.3 Similarity Scoring with Learned Embeddings

Once blocking identifies candidate pairs, similarity scoring determines match probability. Traditional approaches use hand-crafted rules (Jaro-Winkler on names, edit distance on addresses). Learned similarity scoring trains models to predict match probability from record embeddings.

Show Learned Similarity Scoring
import torch
import torch.nn as nn
from typing import List, Tuple


class SiameseMatcher(nn.Module):
    """Siamese network for record matching."""
    def __init__(self, embedding_dim: int = 256):
        super().__init__()
        # Comparison network
        self.comparison = nn.Sequential(
            nn.Linear(embedding_dim * 3, 512),  # concat + element-wise diff + product
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, emb_a: torch.Tensor, emb_b: torch.Tensor) -> torch.Tensor:
        """Compute match probability for record pairs."""
        # Multiple comparison features
        diff = torch.abs(emb_a - emb_b)
        product = emb_a * emb_b
        combined = torch.cat([emb_a, diff, product], dim=-1)

        return self.comparison(combined)


class AttributeAwareMatcher(nn.Module):
    """Match records with attribute-level attention."""
    def __init__(self, embedding_dim: int = 256, num_attributes: int = 10):
        super().__init__()
        self.num_attributes = num_attributes

        # Per-attribute comparison
        self.attr_comparators = nn.ModuleList([
            nn.Sequential(
                nn.Linear(embedding_dim * 2, 128),
                nn.ReLU(),
                nn.Linear(128, 64)
            ) for _ in range(num_attributes)
        ])

        # Attribute importance weights
        self.importance_weights = nn.Linear(num_attributes * 64, num_attributes)

        # Final classifier
        self.classifier = nn.Sequential(
            nn.Linear(num_attributes * 64, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, attrs_a: List[torch.Tensor],
                attrs_b: List[torch.Tensor]) -> Tuple[torch.Tensor, torch.Tensor]:
        """Compare records at attribute level."""
        attr_comparisons = []

        for i, (a, b, comparator) in enumerate(
            zip(attrs_a, attrs_b, self.attr_comparators)
        ):
            combined = torch.cat([a, b], dim=-1)
            comparison = comparator(combined)
            attr_comparisons.append(comparison)

        # Stack attribute comparisons
        stacked = torch.cat(attr_comparisons, dim=-1)

        # Attribute importance (returned for interpretability; not folded into the score)
        importance = torch.softmax(self.importance_weights(stacked), dim=-1)

        # Final match probability
        match_prob = self.classifier(stacked)

        return match_prob, importance


# Usage example
siamese = SiameseMatcher(embedding_dim=256)
attr_matcher = AttributeAwareMatcher(embedding_dim=64, num_attributes=5)

# Test with random embeddings
emb_a = torch.randn(1, 256)
emb_b = torch.randn(1, 256)

match_prob = siamese(emb_a, emb_b)
print(f"Siamese match probability: {match_prob.item():.3f}")

# Attribute-level matching
attrs_a = [torch.randn(1, 64) for _ in range(5)]
attrs_b = [torch.randn(1, 64) for _ in range(5)]
match_prob, importance = attr_matcher(attrs_a, attrs_b)
print(f"Attribute-aware match probability: {match_prob.item():.3f}")
print(f"Attribute importance: {importance.squeeze().detach().numpy()}")
Siamese match probability: 0.494
Attribute-aware match probability: 0.492
Attribute importance: [0.20713502 0.17491557 0.21656144 0.22018927 0.1811987 ]
Tip: Similarity Scoring Best Practices

Model architectures:

  • Siamese networks: Shared encoder, comparison network (see Chapter 16)
  • Cross-encoders: Joint encoding of record pairs (more accurate, slower)
  • Attribute-level: Compare attributes separately, aggregate
  • Ensemble: Combine multiple matchers for robustness

Training data:

  • Labeled pairs: Gold standard matches and non-matches
  • Active learning: Prioritize uncertain pairs for labeling
  • Weak supervision: Generate labels from rules, aggregate
  • Contrastive learning: Learn embeddings where matches are close (sketched after this tip)

Threshold selection:

  • Precision-focused: High threshold (0.9+) for automated matching
  • Recall-focused: Lower threshold (0.5-0.7) + human review
  • Cost-based: Optimize threshold for business cost function
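To make the contrastive-learning bullet concrete, here is a minimal margin-based contrastive loss over labeled record pairs. The margin of 0.5 and the toy batch are illustrative assumptions, and embeddings are taken to be L2-normalized.

import torch
import torch.nn.functional as F

def contrastive_loss(emb_a: torch.Tensor, emb_b: torch.Tensor,
                     labels: torch.Tensor, margin: float = 0.5) -> torch.Tensor:
    """Margin-based contrastive loss over record-pair embeddings.

    labels: 1 for matching pairs, 0 for non-matching pairs.
    With normalized embeddings, cosine distance = 1 - cosine similarity.
    """
    cos_sim = F.cosine_similarity(emb_a, emb_b, dim=-1)
    dist = 1.0 - cos_sim
    pos_loss = labels * dist.pow(2)                          # pull matches together
    neg_loss = (1 - labels) * F.relu(margin - dist).pow(2)   # push non-matches past the margin
    return (pos_loss + neg_loss).mean()

# Toy usage: 4 pairs, first two labeled as matches
a = F.normalize(torch.randn(4, 256), dim=-1)
b = F.normalize(torch.randn(4, 256), dim=-1)
y = torch.tensor([1.0, 1.0, 0.0, 0.0])
print(f"Contrastive loss: {contrastive_loss(a, b, y).item():.3f}")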

28.4 Graph-Based Resolution

Entity resolution isn’t just pairwise—matches form connected components where transitivity applies: if A matches B and B matches C, all three likely refer to the same entity. Graph-based resolution models records as nodes, potential matches as edges, and uses graph algorithms to find entity clusters.

Show Graph-Based Entity Resolution
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple
import numpy as np


@dataclass
class MatchEdge:
    """Edge representing potential match between records."""
    record_a: str
    record_b: str
    similarity: float
    match_probability: float
    evidence: Dict[str, float]  # Per-attribute match scores


class EntityGraph:
    """Graph structure for entity resolution."""
    def __init__(self):
        self.nodes: Set[str] = set()
        self.edges: Dict[Tuple[str, str], MatchEdge] = {}
        self.adjacency: Dict[str, Set[str]] = defaultdict(set)

    def add_edge(self, edge: MatchEdge) -> None:
        """Add potential match edge."""
        self.nodes.add(edge.record_a)
        self.nodes.add(edge.record_b)

        key = tuple(sorted([edge.record_a, edge.record_b]))
        self.edges[key] = edge

        self.adjacency[edge.record_a].add(edge.record_b)
        self.adjacency[edge.record_b].add(edge.record_a)

    def find_connected_components(self, threshold: float = 0.5) -> List[Set[str]]:
        """Find entity clusters via connected components."""
        visited = set()
        components = []

        for node in self.nodes:
            if node in visited:
                continue

            # BFS to find component
            component = set()
            queue = deque([node])

            while queue:
                current = queue.popleft()
                if current in visited:
                    continue

                visited.add(current)
                component.add(current)

                # Add neighbors with edge above threshold
                for neighbor in self.adjacency[current]:
                    key = tuple(sorted([current, neighbor]))
                    if key in self.edges:
                        edge = self.edges[key]
                        if edge.match_probability >= threshold:
                            queue.append(neighbor)

            if component:
                components.append(component)

        return components


class GraphEntityResolver:
    """Resolve entities using graph clustering."""
    def __init__(self, match_threshold: float = 0.5,
                 min_cluster_confidence: float = 0.7):
        self.match_threshold = match_threshold
        self.min_cluster_confidence = min_cluster_confidence
        self.graph = EntityGraph()

    def add_match_evidence(self, edge: MatchEdge) -> None:
        """Add pairwise match evidence to graph."""
        if edge.match_probability >= self.match_threshold:
            self.graph.add_edge(edge)

    def resolve(self) -> Dict[str, str]:
        """Resolve all entities, return record_id -> entity_id mapping."""
        components = self.graph.find_connected_components(self.match_threshold)

        record_to_entity = {}
        for i, component in enumerate(components):
            entity_id = f"entity_{i}"
            for record_id in component:
                record_to_entity[record_id] = entity_id

        return record_to_entity

    def get_entity_records(self, entity_id: str,
                           record_to_entity: Dict[str, str]) -> Set[str]:
        """Get all records belonging to an entity."""
        return {r for r, e in record_to_entity.items() if e == entity_id}


# Usage example
resolver = GraphEntityResolver(match_threshold=0.7)

# Add match evidence
edges = [
    MatchEdge("r1", "r2", 0.95, 0.92, {"name": 0.98, "address": 0.85}),
    MatchEdge("r2", "r3", 0.88, 0.85, {"name": 0.90, "address": 0.80}),
    MatchEdge("r4", "r5", 0.91, 0.89, {"name": 0.95, "address": 0.82}),
    MatchEdge("r1", "r6", 0.45, 0.35, {"name": 0.60, "address": 0.20}),  # Below threshold
]

for edge in edges:
    resolver.add_match_evidence(edge)

# Resolve entities
mapping = resolver.resolve()
print("Record to Entity mapping:")
for record, entity in sorted(mapping.items()):
    print(f"  {record} -> {entity}")

# Count entities
entities = set(mapping.values())
print(f"\nResolved {len(mapping)} records into {len(entities)} entities")
Record to Entity mapping:
  r1 -> entity_0
  r2 -> entity_0
  r3 -> entity_0
  r4 -> entity_1
  r5 -> entity_1

Resolved 5 records into 2 entities
Tip: Graph Resolution Best Practices

Clustering algorithms:

  • Connected components: Simple, transitive closure
  • Correlation clustering: Minimize disagreements with edge weights
  • Markov clustering: Flow-based clustering for weighted graphs
  • Hierarchical: Build dendrograms, cut at desired granularity

Handling conflicts:

  • Negative evidence: Some pairs definitely don’t match (different SSN)
  • Hard constraints: Never merge records with conflicting unique IDs (see the sketch after this tip)
  • Soft constraints: Penalize but allow merging with minor conflicts

Scalability:

  • Incremental updates: Add new records to existing clusters
  • Distributed clustering: Partition graph, local clustering, merge
  • Streaming: Online clustering as matches arrive
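To make the hard-constraint bullet concrete, here is a sketch of a greedy repair pass that splits any cluster violating cannot-link pairs or conflicting unique-ID values. The constraint inputs are assumptions for the demo; production systems often use correlation clustering instead.

from typing import Dict, List, Set, Tuple

def apply_cannot_link(components: List[Set[str]],
                      cannot_link: Set[Tuple[str, str]],
                      record_keys: Dict[str, str]) -> List[Set[str]]:
    """Split clusters that violate hard constraints.

    cannot_link: pairs that must never share an entity.
    record_keys: unique-ID value per record (e.g. SSN); differing values also split.
    Greedy repair: each record joins the first sub-cluster it does not conflict with.
    """
    repaired = []
    for component in components:
        sub_clusters: List[Set[str]] = []
        for record in sorted(component):
            placed = False
            for sub in sub_clusters:
                conflict = any(
                    (record, other) in cannot_link or (other, record) in cannot_link
                    or (record in record_keys and other in record_keys
                        and record_keys[record] != record_keys[other])
                    for other in sub
                )
                if not conflict:
                    sub.add(record)
                    placed = True
                    break
            if not placed:
                sub_clusters.append({record})
        repaired.extend(sub_clusters)
    return repaired

# Example: r1-r3 formed one component, but r1 and r3 carry different unique IDs
components = [{"r1", "r2", "r3"}]
print(apply_cannot_link(components, set(), {"r1": "ssn_A", "r3": "ssn_B"}))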

28.5 Active Learning for Entity Resolution

Labeling record pairs is expensive—domain experts must review each pair. Active learning maximizes labeling efficiency by prioritizing the most informative pairs: those where the model is uncertain, or where a label would most improve overall accuracy.

Show Active Learning for Entity Resolution
from dataclasses import dataclass
from typing import List, Tuple, Optional
import numpy as np


@dataclass
class RecordPair:
    """Candidate record pair for matching."""
    record_a_id: str
    record_b_id: str
    features: np.ndarray
    match_probability: float = 0.5
    label: Optional[bool] = None  # None = unlabeled
    uncertainty: float = 0.5


class ActiveLearningMatcher:
    """Active learning for entity resolution."""
    def __init__(self, model, initial_threshold: float = 0.5):
        self.model = model
        self.threshold = initial_threshold
        self.labeled_pairs: List[RecordPair] = []
        self.unlabeled_pairs: List[RecordPair] = []

    def compute_uncertainty(self, pair: RecordPair) -> float:
        """Compute uncertainty for a pair (entropy-based)."""
        p = pair.match_probability
        if p <= 0 or p >= 1:
            return 0.0
        # Binary entropy
        return -p * np.log2(p) - (1-p) * np.log2(1-p)

    def select_pairs_uncertainty(self, n: int) -> List[RecordPair]:
        """Select most uncertain pairs for labeling."""
        # Update uncertainties
        for pair in self.unlabeled_pairs:
            pair.uncertainty = self.compute_uncertainty(pair)

        # Sort by uncertainty (highest first)
        sorted_pairs = sorted(self.unlabeled_pairs,
                             key=lambda p: p.uncertainty, reverse=True)
        return sorted_pairs[:n]

    def select_pairs_diversity(self, n: int) -> List[RecordPair]:
        """Select diverse pairs using clustering."""
        if len(self.unlabeled_pairs) <= n:
            return self.unlabeled_pairs

        # Simple diversity: select from different similarity ranges
        sorted_by_prob = sorted(self.unlabeled_pairs,
                               key=lambda p: p.match_probability)

        selected = []
        step = len(sorted_by_prob) // n
        for i in range(n):
            idx = min(i * step, len(sorted_by_prob) - 1)
            selected.append(sorted_by_prob[idx])

        return selected

    def select_pairs_hybrid(self, n: int,
                            uncertainty_weight: float = 0.7) -> List[RecordPair]:
        """Hybrid selection: uncertainty + diversity."""
        uncertain = self.select_pairs_uncertainty(n * 2)
        diverse = self.select_pairs_diversity(n * 2)

        # Score by weighted combination
        pair_scores = {}
        for i, pair in enumerate(uncertain):
            pair_scores[pair.record_a_id + pair.record_b_id] = \
                uncertainty_weight * (len(uncertain) - i)

        for i, pair in enumerate(diverse):
            key = pair.record_a_id + pair.record_b_id
            pair_scores[key] = pair_scores.get(key, 0) + \
                (1 - uncertainty_weight) * (len(diverse) - i)

        # Select top n
        all_pairs = {p.record_a_id + p.record_b_id: p
                    for p in uncertain + diverse}
        sorted_keys = sorted(pair_scores.keys(),
                            key=lambda k: pair_scores[k], reverse=True)

        return [all_pairs[k] for k in sorted_keys[:n]]

    def label_pair(self, pair: RecordPair, is_match: bool) -> None:
        """Record human label for a pair."""
        pair.label = is_match
        self.labeled_pairs.append(pair)
        if pair in self.unlabeled_pairs:
            self.unlabeled_pairs.remove(pair)

    def get_labeling_progress(self) -> dict:
        """Get labeling statistics."""
        matches = sum(1 for p in self.labeled_pairs if p.label)
        non_matches = sum(1 for p in self.labeled_pairs if not p.label)

        return {
            "total_labeled": len(self.labeled_pairs),
            "matches": matches,
            "non_matches": non_matches,
            "match_rate": matches / max(len(self.labeled_pairs), 1),
            "remaining_unlabeled": len(self.unlabeled_pairs)
        }


# Usage example
class DummyModel:
    def predict(self, features):
        return np.random.random()

matcher = ActiveLearningMatcher(DummyModel())

# Add unlabeled pairs
for i in range(100):
    pair = RecordPair(
        record_a_id=f"a_{i}",
        record_b_id=f"b_{i}",
        features=np.random.randn(128),
        match_probability=np.random.random()
    )
    matcher.unlabeled_pairs.append(pair)

# Select pairs for labeling
uncertain_pairs = matcher.select_pairs_uncertainty(10)
print(f"Selected {len(uncertain_pairs)} uncertain pairs")
print(f"Uncertainty range: {uncertain_pairs[0].uncertainty:.3f} - {uncertain_pairs[-1].uncertainty:.3f}")

# Simulate labeling
for pair in uncertain_pairs[:5]:
    matcher.label_pair(pair, is_match=np.random.random() > 0.5)

progress = matcher.get_labeling_progress()
print(f"\nLabeling progress: {progress}")
Selected 10 uncertain pairs
Uncertainty range: 1.000 - 0.991

Labeling progress: {'total_labeled': 5, 'matches': 4, 'non_matches': 1, 'match_rate': 0.8, 'remaining_unlabeled': 95}
Tip: Active Learning Best Practices

Selection strategies:

  • Uncertainty sampling: Pairs where model is most uncertain (near 0.5)
  • Query-by-committee: Pairs where multiple models disagree (sketched after this tip)
  • Diversity sampling: Cover different regions of feature space
  • Expected model change: Pairs that would most change model if labeled

Practical considerations:

  • Batch selection: Select batches (10-100) for efficiency
  • Human fatigue: Mix easy and hard pairs to maintain quality
  • Stopping criteria: When to stop labeling (convergence, budget)
  • Cold start: Initial random sample before active selection

Quality control:

  • Duplicate questions: Verify labeler consistency
  • Gold questions: Known-answer pairs to check quality
  • Multiple labelers: Aggregate labels for difficult pairs
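To make the query-by-committee bullet concrete, the sketch below scores disagreement as the variance of match probabilities across a committee. The three scoring functions are placeholders standing in for independently trained matchers.

import numpy as np
from typing import Callable, List, Sequence

def committee_disagreement(features: np.ndarray,
                           committee: Sequence[Callable[[np.ndarray], float]]) -> float:
    """Variance of committee match probabilities; higher = more informative to label."""
    probs = np.array([model(features) for model in committee])
    return float(probs.var())

def select_by_committee(pair_features: List[np.ndarray],
                        committee: Sequence[Callable[[np.ndarray], float]],
                        n: int) -> List[int]:
    """Indices of the n pairs the committee disagrees on most."""
    scores = [committee_disagreement(f, committee) for f in pair_features]
    return list(np.argsort(scores)[::-1][:n])

# Toy committee: each scorer weighs a different slice of the pair features (illustrative)
committee = [
    lambda f: float(np.clip(f[:4].mean(), 0, 1)),   # stand-in for a name-focused matcher
    lambda f: float(np.clip(f[4:].mean(), 0, 1)),   # stand-in for an address-focused matcher
    lambda f: float(np.clip(f.mean(), 0, 1)),       # stand-in for a full-feature matcher
]
pair_features = [np.random.rand(8) for _ in range(50)]
print("Most contested pairs:", select_by_committee(pair_features, committee, 5))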

28.6 Incremental and Streaming Resolution

Production systems receive new records continuously—customers sign up, data feeds arrive, systems sync. Incremental entity resolution matches new records against existing entities without re-processing the entire database.

Show Incremental Entity Resolution
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from datetime import datetime
import numpy as np


@dataclass
class Entity:
    """Resolved entity with member records."""
    entity_id: str
    canonical_record: Dict[str, str]  # Best values for each attribute
    member_records: Set[str] = field(default_factory=set)
    embedding: Optional[np.ndarray] = None
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)


class IncrementalResolver:
    """Incremental entity resolution for streaming data."""
    def __init__(self, match_threshold: float = 0.8,
                 max_candidates: int = 50):
        self.match_threshold = match_threshold
        self.max_candidates = max_candidates
        self.entities: Dict[str, Entity] = {}
        self.record_to_entity: Dict[str, str] = {}
        self.entity_embeddings: List[np.ndarray] = []
        self.entity_ids: List[str] = []

    def _find_candidate_entities(self, record_embedding: np.ndarray) -> List[str]:
        """Find candidate entities using ANN search."""
        if not self.entity_embeddings:
            return []

        # Brute force for demo (use FAISS/ScaNN in production)
        embeddings = np.stack(self.entity_embeddings)
        similarities = embeddings @ record_embedding

        # Get top candidates
        top_indices = np.argsort(similarities)[::-1][:self.max_candidates]
        return [self.entity_ids[i] for i in top_indices
                if similarities[i] >= self.match_threshold]

    def _compute_match_score(self, record_embedding: np.ndarray,
                             entity: Entity) -> float:
        """Compute match score between record and entity."""
        if entity.embedding is None:
            return 0.0
        return float(np.dot(record_embedding, entity.embedding))

    def _create_entity(self, record_id: str, attributes: Dict[str, str],
                       embedding: np.ndarray) -> Entity:
        """Create new entity from record."""
        entity_id = f"entity_{len(self.entities)}"
        entity = Entity(
            entity_id=entity_id,
            canonical_record=attributes.copy(),
            member_records={record_id},
            embedding=embedding
        )

        self.entities[entity_id] = entity
        self.record_to_entity[record_id] = entity_id
        self.entity_embeddings.append(embedding)
        self.entity_ids.append(entity_id)

        return entity

    def _merge_into_entity(self, record_id: str, attributes: Dict[str, str],
                           embedding: np.ndarray, entity: Entity) -> None:
        """Merge record into existing entity."""
        entity.member_records.add(record_id)
        self.record_to_entity[record_id] = entity.entity_id

        # Update entity embedding (running average)
        n = len(entity.member_records)
        entity.embedding = (entity.embedding * (n-1) + embedding) / n
        entity.embedding /= np.linalg.norm(entity.embedding)

        # Update canonical record (simple: keep existing)
        # In production: more sophisticated merging
        entity.updated_at = datetime.now()

    def resolve_record(self, record_id: str, attributes: Dict[str, str],
                       embedding: np.ndarray) -> Dict:
        """Resolve a single new record."""
        # Find candidate entities
        candidates = self._find_candidate_entities(embedding)

        if not candidates:
            # No candidates: create new entity
            entity = self._create_entity(record_id, attributes, embedding)
            return {
                "action": "created",
                "entity_id": entity.entity_id,
                "confidence": 1.0
            }

        # Score candidates
        best_entity = None
        best_score = 0.0

        for entity_id in candidates:
            entity = self.entities[entity_id]
            score = self._compute_match_score(embedding, entity)
            if score > best_score:
                best_score = score
                best_entity = entity

        if best_score >= self.match_threshold:
            # Merge into existing entity
            self._merge_into_entity(record_id, attributes, embedding, best_entity)
            return {
                "action": "merged",
                "entity_id": best_entity.entity_id,
                "confidence": best_score
            }
        else:
            # Create new entity
            entity = self._create_entity(record_id, attributes, embedding)
            return {
                "action": "created",
                "entity_id": entity.entity_id,
                "confidence": 1.0,
                "closest_entity": best_entity.entity_id if best_entity else None,
                "closest_score": best_score
            }


# Usage example
resolver = IncrementalResolver(match_threshold=0.8)

# Process streaming records. Note: the demo assigns random embeddings below,
# so every record creates a new entity; with a trained encoder, r2 would
# likely merge into r1's entity.
records = [
    ("r1", {"name": "John Smith", "email": "john@email.com"}),
    ("r2", {"name": "Jon Smith", "email": "john@email.com"}),        # near-duplicate of r1
    ("r3", {"name": "Jane Doe", "email": "jane@email.com"}),         # distinct entity
    ("r4", {"name": "John Smith", "email": "different@email.com"}),  # ambiguous: common name, different email
]

for record_id, attrs in records:
    # Generate embedding (would use encoder in production)
    embedding = np.random.randn(256).astype(np.float32)
    embedding /= np.linalg.norm(embedding)

    result = resolver.resolve_record(record_id, attrs, embedding)
    print(f"Record {record_id}: {result['action']} -> {result['entity_id']} "
          f"(confidence: {result['confidence']:.2f})")

print(f"\nTotal entities: {len(resolver.entities)}")
print(f"Total records: {len(resolver.record_to_entity)}")
Record r1: created -> entity_0 (confidence: 1.00)
Record r2: created -> entity_1 (confidence: 1.00)
Record r3: created -> entity_2 (confidence: 1.00)
Record r4: created -> entity_3 (confidence: 1.00)

Total entities: 4
Total records: 4
Tip: Incremental Resolution Best Practices

Index maintenance:

  • Incremental ANN updates: Add vectors to index without rebuild
  • Periodic reindexing: Full rebuild during low-traffic windows
  • Entity embedding updates: Running average or periodic recomputation

Handling merges:

  • Transitive closure: When two entities merge, update all references (see the sketch after this tip)
  • Split detection: Sometimes entities should be split (wrong merge)
  • Audit trail: Track merge history for debugging and compliance

Latency requirements:

  • Real-time (< 100ms): Online matching for customer-facing applications
  • Near-real-time (< 1s): Streaming pipelines, fraud detection
  • Batch (minutes-hours): Nightly deduplication runs
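To make the merge-handling bullets concrete, here is a minimal sketch: a union-find structure keeps record-to-entity references consistent when entities later merge, and every merge is appended to an audit trail. The event fields are illustrative.

from datetime import datetime
from typing import Dict, List

class EntityMergeTracker:
    """Union-find over entity IDs with an audit trail of merge events."""

    def __init__(self):
        self.parent: Dict[str, str] = {}
        self.audit_log: List[dict] = []

    def find(self, entity_id: str) -> str:
        """Resolve an entity ID to its current canonical ID (with path compression)."""
        self.parent.setdefault(entity_id, entity_id)
        root = entity_id
        while self.parent[root] != root:
            root = self.parent[root]
        while self.parent[entity_id] != root:   # compress the path for future lookups
            self.parent[entity_id], entity_id = root, self.parent[entity_id]
        return root

    def merge(self, entity_a: str, entity_b: str, reason: str) -> str:
        """Merge two entities; all references to either now resolve to one canonical ID."""
        root_a, root_b = self.find(entity_a), self.find(entity_b)
        if root_a == root_b:
            return root_a
        self.parent[root_b] = root_a
        self.audit_log.append({"merged": root_b, "into": root_a,
                               "reason": reason, "at": datetime.now().isoformat()})
        return root_a

tracker = EntityMergeTracker()
tracker.merge("entity_1", "entity_2", reason="shared email after new record arrived")
print(tracker.find("entity_2"))   # resolves to entity_1
print(tracker.audit_log)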

28.7 Industry Applications

Entity resolution appears across industries with domain-specific challenges:

28.7.1 Healthcare: Patient Matching

Patient matching across healthcare systems is critical for care coordination and safety. Challenges include:

  • Name variations: Married names, nicknames, spelling errors
  • Address changes: Patients move frequently
  • No universal ID: Unlike the SSN, there is no universal patient identifier
  • Life-or-death stakes: Wrong patient match can be fatal

Regulatory context: HIPAA requires accurate patient identification; ONC promotes patient matching standards.

28.7.2 Financial Services: Customer Identity

Financial institutions must maintain accurate customer records for:

  • KYC/AML compliance: Know Your Customer, Anti-Money Laundering
  • Fraud prevention: Detect accounts opened by same fraudster
  • Cross-selling: Unified view of customer across products
  • Regulatory reporting: Accurate aggregate positions

28.7.3 Marketing: Identity Graphs

Marketing platforms build identity graphs linking:

  • Cross-device: Same person on phone, laptop, tablet
  • Cross-channel: Email, cookies, mobile IDs, CTV
  • Offline-online: Store purchases linked to digital profiles
  • Household: Grouping family members

Scale: Major identity graphs contain billions of records with trillions of potential links.

28.7.4 Government: Record Linkage

Government agencies link records across:

  • Benefits programs: Prevent fraud, ensure eligibility
  • Tax administration: Match income reports across sources
  • Law enforcement: Link identities across jurisdictions
  • Census: Deduplicate responses, link to administrative records

28.8 Key Takeaways

  • Entity resolution at scale requires blocking to avoid O(N²) complexity: Embedding-based blocking using ANN search reduces candidate pairs to O(N log N) or O(N), enabling trillion-record matching by comparing only records with similar embeddings

  • Learned similarity scoring outperforms hand-crafted rules: Siamese networks and attribute-aware matchers learn semantic similarity from labeled examples, capturing variations (nicknames, abbreviations, typos) that rule-based systems miss while providing calibrated match probabilities

  • Graph-based resolution handles transitivity and conflicts: Modeling records as nodes and matches as edges enables connected component clustering for entity groups, correlation clustering that respects negative evidence, and principled handling of conflicting match signals

  • Active learning maximizes labeling efficiency: Uncertainty sampling prioritizes pairs where the model is uncertain, reducing labeling effort by 60-80% compared to random sampling while achieving equivalent accuracy

  • Incremental resolution is essential for production systems: New records must be matched against existing entities in real-time without full reprocessing, requiring incremental ANN indexes, entity embedding updates, and merge/split handling

  • Domain-specific challenges require specialized approaches: Healthcare patient matching faces unique identifier absence and safety stakes; financial services requires KYC/AML compliance; marketing operates at billion-record scale with cross-device linking

28.9 Looking Ahead

Chapter 29 explores how financial services applies entity resolution for KYC/AML compliance, customer deduplication, and fraud detection, along with other embedding applications including trading signals, credit risk, and market sentiment.

28.10 Further Reading

28.10.1 Entity Resolution Foundations

  • Christen, Peter (2012). “Data Matching: Concepts and Techniques for Record Linkage, Entity Resolution, and Duplicate Detection.” Springer.
  • Elmagarmid, Ahmed K., Panagiotis G. Ipeirotis, and Vassilios S. Verykios (2007). “Duplicate Record Detection: A Survey.” IEEE TKDE.
  • Getoor, Lise, and Ashwin Machanavajjhala (2012). “Entity Resolution: Theory, Practice & Open Challenges.” VLDB Tutorial.
  • Papadakis, George, et al. (2020). “Blocking and Filtering Techniques for Entity Resolution: A Survey.” ACM Computing Surveys.

28.10.2 Deep Learning for Entity Resolution

  • Mudgal, Sidharth, et al. (2018). “Deep Learning for Entity Matching: A Design Space Exploration.” SIGMOD.
  • Ebraheem, Muhammad, et al. (2018). “Distributed Representations of Tuples for Entity Resolution.” VLDB.
  • Kasai, Jungo, et al. (2019). “Low-Resource Deep Entity Resolution with Transfer and Active Learning.” ACL.
  • Li, Yuliang, et al. (2020). “Deep Entity Matching with Pre-Trained Language Models.” VLDB.

28.10.3 Scalable Entity Resolution

  • Steorts, Rebecca C., Samuel L. Ventura, Mauricio Sadinle, and Stephen E. Fienberg (2016). “A Comparison of Blocking Methods for Record Linkage.” Privacy in Statistical Databases.
  • Wang, Qing, et al. (2011). “Fast-Join: An Efficient Method for Fuzzy Token Matching Based String Similarity Join.” ICDE.
  • Vernica, Rares, Michael J. Carey, and Chen Li (2010). “Efficient Parallel Set-Similarity Joins Using MapReduce.” SIGMOD.
  • Chu, Xu, et al. (2016). “Distributed Data Deduplication.” VLDB.

28.10.4 Active Learning and Human-in-the-Loop

  • Arasu, Arvind, Michaela Götz, and Raghav Kaushik (2010). “On Active Learning of Record Matching Packages.” SIGMOD.
  • Bellare, Kedar, et al. (2012). “Active Learning for Crowdsourced Entity Resolution.” Workshop on Crowdsourcing and Data Mining.
  • Wang, Jiannan, et al. (2012). “CrowdER: Crowdsourcing Entity Resolution.” VLDB.
  • Firmani, Donatella, et al. (2016). “Online Entity Resolution Using an Oracle.” VLDB.

28.10.5 Graph-Based Entity Resolution

  • Bhattacharya, Indrajit, and Lise Getoor (2007). “Collective Entity Resolution in Relational Data.” ACM TKDD.
  • Rastogi, Vibhor, et al. (2011). “Large-Scale Collective Entity Matching.” VLDB.
  • Vesdapunt, Norases, Kedar Bellare, and Nilesh Dalvi (2014). “Crowdsourcing Algorithms for Entity Resolution.” VLDB.
  • Steorts, Rebecca C., Rob Hall, and Stephen E. Fienberg (2016). “A Bayesian Approach to Graphical Record Linkage and Deduplication.” JASA.