23  Data Engineering for Embeddings

Note: Chapter Overview

High-quality embeddings demand high-quality data engineering. This chapter explores the data infrastructure that enables trillion-row embedding systems: ETL pipelines that transform raw data into training-ready formats while preserving semantic relationships, streaming architectures that update embeddings in near-real-time as data evolves, data quality frameworks that detect and remediate issues before they corrupt embeddings, schema evolution strategies that maintain backwards compatibility across model versions, and multi-source data fusion techniques that combine embeddings from heterogeneous datasets. These data engineering practices ensure embedding models have the clean, consistent, well-structured data needed to achieve their potential in production.

After optimizing vector operations for sub-millisecond search (Chapter 22), the remaining production challenge is data engineering. Embeddings are only as good as the data they’re trained on. A model trained on corrupted data produces corrupted embeddings. A pipeline that can’t handle schema changes breaks during routine database migrations. A system that can’t fuse data from multiple sources misses critical context. This chapter addresses the data engineering practices that separate prototype embedding systems from production-ready platforms serving billions of users.

23.1 ETL Pipelines for Embedding Generation

Embedding generation requires transforming raw data—database records, documents, images, logs—into vector representations while preserving semantic meaning. ETL (Extract, Transform, Load) pipelines orchestrate this transformation at scale, handling data extraction from diverse sources, feature engineering that captures relevant signals, quality validation that ensures training stability, and efficient loading into training systems.

23.1.1 The Embedding ETL Challenge

Traditional ETL optimizes for data warehousing: schema normalization, aggregation, and SQL-friendly formats. Embedding ETL has unique requirements:

  • Semantic preservation: Transformations must preserve meaning (normalization can destroy signal)
  • Feature engineering: Extract features that capture relationships (not just facts)
  • Scale: Process billions of records efficiently (trillion-row datasets)
  • Freshness: Keep training data current (embedding drift occurs within weeks)
  • Multimodal: Handle text, images, structured data, time series simultaneously
ETL Pipeline for Embedding Generation
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd

@dataclass
class EmbeddingRecord:
    """Record prepared for embedding generation."""
    id: str
    text: str
    metadata: Dict[str, Any]

class EmbeddingETLPipeline:
    """ETL pipeline for preparing data for embedding generation."""

    def extract(self, source: str) -> pd.DataFrame:
        """Extract data from source (database, file, API)."""
        # Example: Load from CSV
        return pd.read_csv(source)

    def transform(self, df: pd.DataFrame) -> List[EmbeddingRecord]:
        """Transform raw data into embedding-ready format."""
        records = []
        for _, row in df.iterrows():
            # Clean text
            text = str(row.get('content', '')).strip()

            # Skip empty or invalid records
            if not text or len(text) < 10:
                continue

            # Extract metadata
            metadata = {
                'category': row.get('category'),
                'timestamp': row.get('created_at'),
                'source': 'etl_pipeline'
            }

            records.append(EmbeddingRecord(
                id=str(row['id']),
                text=text,
                metadata=metadata
            ))

        return records

    def load(self, records: List[EmbeddingRecord], output_path: str):
        """Load prepared records for embedding."""
        df = pd.DataFrame([
            {'id': r.id, 'text': r.text, **r.metadata}
            for r in records
        ])
        df.to_parquet(output_path, index=False)
        return len(records)

# Usage example
pipeline = EmbeddingETLPipeline()
# Create sample data
sample_df = pd.DataFrame({
    'id': [1, 2, 3],
    'content': ['Machine learning article', 'AI research paper', 'Data science tutorial'],
    'category': ['ML', 'AI', 'DS'],
    'created_at': ['2024-01-01', '2024-01-02', '2024-01-03']
})
records = pipeline.transform(sample_df)
print(f"Processed {len(records)} records ready for embedding")
Processed 3 records ready for embedding
Tip: ETL Best Practices for Embeddings

Data quality:

  • Validate at every stage (extract, transform, load)
  • Implement deduplication (exact and near-duplicate; see the sketch below)
  • Handle missing values explicitly (don’t drop silently)
  • Monitor data drift (distribution shifts over time)

Performance:

  • Partition data for parallel processing (100-1000 partitions)
  • Use columnar formats (Parquet) for analytics
  • Implement checkpointing for fault tolerance
  • Optimize for I/O (sequential reads, batching)

Maintainability:

  • Keep transformations simple and composable
  • Document feature engineering logic
  • Version control pipeline code
  • Test with representative samples before production runs
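
To make the deduplication bullet concrete, here is a minimal sketch using only the Python standard library: exact duplicates are caught by hashing normalized text, and near-duplicates by Jaccard similarity over character shingles. The normalization rules, shingle size, and 0.9 threshold are illustrative assumptions, not recommendations tied to any particular library.

import hashlib
import re
from typing import List, Set

def normalize(text: str) -> str:
    """Lowercase, collapse whitespace, and strip punctuation before comparing."""
    collapsed = re.sub(r"\s+", " ", text.lower())
    return re.sub(r"[^a-z0-9 ]", "", collapsed).strip()

def shingles(text: str, k: int = 5) -> Set[str]:
    """Character k-shingles used for near-duplicate comparison."""
    return {text[i:i + k] for i in range(max(len(text) - k + 1, 1))}

def deduplicate(texts: List[str], near_threshold: float = 0.9) -> List[str]:
    """Drop exact duplicates (hash match) and near-duplicates (Jaccard >= threshold)."""
    seen_hashes = set()
    kept, kept_shingles = [], []
    for text in texts:
        norm = normalize(text)
        digest = hashlib.sha256(norm.encode()).hexdigest()
        if digest in seen_hashes:
            continue  # exact duplicate after normalization
        sh = shingles(norm)
        if any(len(sh & other) / max(len(sh | other), 1) >= near_threshold
               for other in kept_shingles):
            continue  # near-duplicate of a record already kept
        seen_hashes.add(digest)
        kept.append(text)
        kept_shingles.append(sh)
    return kept

docs = ["Machine learning article", "machine  learning article!", "AI research paper"]
print(deduplicate(docs))  # ['Machine learning article', 'AI research paper']

Pairwise Jaccard comparison is quadratic and only suits small batches; at billions of records the same idea is usually approximated with MinHash and locality-sensitive hashing.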

23.2 Streaming Embedding Updates

Batch ETL processes data hourly or daily, but many applications need real-time embeddings. A news recommender must embed articles seconds after publication. A fraud detector must embed transactions milliseconds after they occur. Streaming architectures enable continuous embedding updates with end-to-end latency measured in seconds, not hours.

23.2.1 Streaming vs. Batch: The Trade-off

Batch processing (hourly/daily):

  • Advantages: Simple, efficient, easy to debug, supports complex aggregations
  • Disadvantages: Stale embeddings (hours old), high latency for new items
  • Use when: Daily updates sufficient, complex transformations required

Stream processing (seconds):

  • Advantages: Fresh embeddings (seconds old), low latency for new items, event-driven
  • Disadvantages: Complex architecture, harder to debug, limited aggregation window
  • Use when: Real-time updates critical, simple transformations, event-driven workflows
Streaming Embedding Processor
from queue import Queue, Empty
from typing import List
import time
import threading

class StreamingEmbeddingProcessor:
    """Process embeddings in real-time from streaming data."""

    def __init__(self, batch_size: int = 32, batch_timeout_ms: int = 100):
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.queue = Queue()
        self.running = False
        self.processor_thread = None

    def start(self):
        """Start the streaming processor."""
        self.running = True
        self.processor_thread = threading.Thread(target=self._process_loop)
        self.processor_thread.start()

    def stop(self):
        """Stop the streaming processor."""
        self.running = False
        if self.processor_thread:
            self.processor_thread.join()

    def submit(self, item: dict):
        """Submit an item for processing."""
        self.queue.put(item)

    def _process_loop(self):
        """Main processing loop with micro-batching."""
        while self.running:
            batch = self._collect_batch()
            if batch:
                self._process_batch(batch)

    def _collect_batch(self) -> List[dict]:
        """Collect items into micro-batch."""
        batch = []
        deadline = time.time() + (self.batch_timeout_ms / 1000.0)

        while len(batch) < self.batch_size and time.time() < deadline:
            try:
                item = self.queue.get(timeout=0.01)
                batch.append(item)
            except Empty:
                break  # queue momentarily empty: emit whatever has been collected

        return batch

    def _process_batch(self, batch: List[dict]):
        """Process a batch of items."""
        # Simulate embedding generation
        texts = [item['text'] for item in batch]
        print(f"Processing batch of {len(batch)} items")
        # Here you would call your embedding model
        # embeddings = model.encode(texts)

# Usage example
processor = StreamingEmbeddingProcessor(batch_size=10, batch_timeout_ms=100)
processor.start()

# Simulate streaming data
for i in range(25):
    processor.submit({'id': i, 'text': f'Document {i}'})
    time.sleep(0.01)  # Simulate streaming arrival

time.sleep(0.2)  # Wait for processing
processor.stop()
print("Streaming processor completed")
Processing batch of 8 items
Processing batch of 9 items
Processing batch of 1 items
Processing batch of 2 items
Processing batch of 5 items
Streaming processor completed
Tip: Streaming Architecture Best Practices

Micro-batching:

  • Batch window: 100-1000ms (balance latency vs throughput)
  • Batch size: 10-100 items (optimize for GPU)
  • Adaptive batching: Adjust based on load

Fault tolerance:

  • Checkpointing: Save progress every N events
  • Exactly-once semantics: Idempotent operations
  • Dead letter queue: Handle failed events separately
  • Retry logic: Exponential backoff for transient failures (see the sketch below)

Monitoring:

  • End-to-end latency (p50, p95, p99)
  • Throughput (events/second)
  • Error rate (failures / total events)
  • Queue depth (backpressure indicator)
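
To make the fault-tolerance bullets concrete, here is a minimal sketch of retries with exponential backoff plus a dead letter queue, built only on the standard library. The embed_batch callable, retry counts, and delays are placeholders for whatever your pipeline actually invokes.

import random
import time
from typing import Callable, List

def process_with_retry(
    batch: List[dict],
    embed_batch: Callable[[List[dict]], None],
    dead_letter: List[dict],
    max_retries: int = 3,
    base_delay_s: float = 0.1,
) -> bool:
    """Retry a batch with exponential backoff; park exhausted failures in a dead letter queue."""
    for attempt in range(max_retries):
        try:
            embed_batch(batch)  # placeholder for the real embedding call
            return True
        except Exception:
            # Exponential backoff with jitter: roughly 0.1s, 0.2s, 0.4s
            time.sleep(base_delay_s * (2 ** attempt) + random.uniform(0, base_delay_s))
    dead_letter.extend(batch)  # exhausted retries: keep the stream moving, inspect offline
    return False

def always_failing_embed(items: List[dict]) -> None:
    """Deliberately fails so the example exercises the dead-letter path."""
    raise RuntimeError("transient failure")

dlq: List[dict] = []
process_with_retry([{'id': 1, 'text': 'doc'}], always_failing_embed, dlq)
print(f"Dead letter queue size: {len(dlq)}")  # 1

Routing unprocessable events to a dead letter queue keeps one poisoned message from stalling the whole stream; the parked events can be inspected and replayed later.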
Warning: Streaming Complexity

Streaming pipelines are significantly more complex than batch:

  • Debugging: Harder to reproduce issues (event order matters)
  • Testing: Need to simulate real-time event streams
  • Operations: 24/7 monitoring required
  • Cost: Higher infrastructure costs (always running)

Start with batch, migrate to streaming only when business value justifies complexity.

23.3 Data Quality for Embedding Training

Poor data quality causes poor embeddings. Data quality frameworks detect and remediate issues before they corrupt training: duplicate detection prevents training on repeated examples, outlier detection identifies corrupted or adversarial data, consistency validation ensures relationships hold across updates, and drift detection alerts when distributions shift unexpectedly.

23.3.1 The Data Quality Challenge for Embeddings

Traditional data quality focuses on completeness and correctness. Embedding quality has additional requirements:

  • Semantic consistency: Similar items must have similar features
  • Label quality: Incorrect labels poison contrastive learning
  • Distribution stability: Embedding space shifts when data distribution changes
  • Relationship preservation: Entity relationships must remain consistent
Data Quality Framework
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
import hashlib

@dataclass
class QualityReport:
    """Report of data quality issues."""
    total_records: int
    duplicates: int
    outliers: int
    missing_values: int
    passed: bool

class DataQualityFramework:
    """Framework for validating embedding training data quality."""

    def __init__(self, outlier_threshold: float = 3.0):
        self.outlier_threshold = outlier_threshold
        self.seen_hashes = set()

    def validate(self, records: List[dict]) -> QualityReport:
        """Run comprehensive quality checks."""
        duplicates = self._check_duplicates(records)
        outliers = self._check_outliers(records)
        missing = self._check_missing_values(records)

        passed = (duplicates == 0 and outliers == 0 and missing < len(records) * 0.05)

        return QualityReport(
            total_records=len(records),
            duplicates=duplicates,
            outliers=outliers,
            missing_values=missing,
            passed=passed
        )

    def _check_duplicates(self, records: List[dict]) -> int:
        """Detect exact and near-duplicate records."""
        duplicates = 0
        for record in records:
            text = record.get('text', '')
            # Create hash for duplicate detection
            text_hash = hashlib.md5(text.encode()).hexdigest()
            if text_hash in self.seen_hashes:
                duplicates += 1
            else:
                self.seen_hashes.add(text_hash)
        return duplicates

    def _check_outliers(self, records: List[dict]) -> int:
        """Detect statistical outliers in text length."""
        lengths = [len(r.get('text', '')) for r in records]
        if not lengths:
            return 0

        mean_length = np.mean(lengths)
        std_length = np.std(lengths)

        outliers = 0
        for length in lengths:
            z_score = abs((length - mean_length) / std_length) if std_length > 0 else 0
            if z_score > self.outlier_threshold:
                outliers += 1

        return outliers

    def _check_missing_values(self, records: List[dict]) -> int:
        """Count records with missing required fields."""
        missing = 0
        for record in records:
            if not record.get('text') or not record.get('id'):
                missing += 1
        return missing

# Usage example
quality_framework = DataQualityFramework(outlier_threshold=3.0)
sample_records = [
    {'id': '1', 'text': 'Normal text'},
    {'id': '2', 'text': 'Another normal text'},
    {'id': '3', 'text': 'Normal text'},  # Duplicate
    {'id': '4', 'text': 'x' * 10000},  # Very long, but z-score stays below the 3.0 threshold in this small sample
    {'id': '5', 'text': ''},  # Missing
]
report = quality_framework.validate(sample_records)
print(f"Quality Report: {report.total_records} records")
print(f"  Duplicates: {report.duplicates}")
print(f"  Outliers: {report.outliers}")
print(f"  Missing: {report.missing_values}")
print(f"  Passed: {report.passed}")
Quality Report: 5 records
  Duplicates: 1
  Outliers: 0
  Missing: 1
  Passed: False
Tip: Data Quality Best Practices

Prevention:

  • Validate at ingestion (catch issues early)
  • Implement schema contracts (enforce structure)
  • Use type systems (prevent type errors)
  • Automate quality checks (continuous validation)

Detection:

  • Statistical profiling (baseline distributions)
  • Anomaly detection (outliers, drift; see the drift sketch below)
  • Relationship validation (foreign keys, consistency)
  • Duplicate detection (exact and near-duplicate)

Remediation:

  • Automated fixes (fill missing values, clip outliers)
  • Human review queue (ambiguous cases)
  • Dead letter queue (unfixable records)
  • Feedback loops (fix upstream sources)

Monitoring:

  • Quality dashboards (real-time metrics)
  • Alerts on degradation (threshold breaches)
  • Trend analysis (quality over time)
  • Root cause analysis (trace issues to source)
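
As one way to implement the drift-monitoring bullet, the sketch below computes a Population Stability Index (PSI) between a baseline sample and the current batch for a single feature (text length here). The feature choice, bin count, synthetic data, and the common 0.2 alert threshold are assumptions for illustration.

import numpy as np

def population_stability_index(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI between two samples over quantile bins of the baseline (higher = more drift)."""
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    base_counts, _ = np.histogram(baseline, bins=edges)
    curr_counts, _ = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)
    base_pct = np.clip(base_counts / base_counts.sum(), 1e-6, None)
    curr_pct = np.clip(curr_counts / curr_counts.sum(), 1e-6, None)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

rng = np.random.default_rng(0)
baseline_lengths = rng.normal(500, 100, 10_000)   # document lengths at last retraining (synthetic)
current_lengths = rng.normal(650, 120, 10_000)    # this week's batch: noticeably longer documents
psi = population_stability_index(baseline_lengths, current_lengths)
print(f"PSI = {psi:.2f} -> {'ALERT: distribution shift' if psi > 0.2 else 'stable'}")

In production the same comparison runs per feature on a schedule, and the baseline is refreshed whenever the embedding model is retrained.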

23.4 Schema Evolution and Backwards Compatibility

Production embedding systems evolve: new features are added, old features deprecated, data types change. Schema evolution enables safe changes while maintaining backwards compatibility with existing embeddings, models, and downstream consumers.

23.4.1 The Schema Evolution Challenge

Embedding systems have complex dependencies:

  • Trained models: Expect specific feature schema
  • Vector indices: Store embeddings from specific model versions
  • Downstream consumers: Query embeddings with specific schemas
  • Historical data: May use old schemas

Change one component, and the entire system can break.

Schema Evolution Manager
from typing import Dict, Any, Optional
from enum import Enum

class SchemaVersion(Enum):
    """Schema version enumeration."""
    V1 = "1.0"
    V2 = "2.0"

class SchemaEvolutionManager:
    """Manage schema evolution with backwards compatibility."""

    def __init__(self):
        self.current_version = SchemaVersion.V2

    def migrate(self, record: Dict[str, Any], from_version: SchemaVersion) -> Dict[str, Any]:
        """Migrate record from old schema to current schema."""
        if from_version == self.current_version:
            return record  # No migration needed

        # Apply migration chain
        if from_version == SchemaVersion.V1:
            record = self._migrate_v1_to_v2(record)

        return record

    def _migrate_v1_to_v2(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Migrate from v1 to v2 schema."""
        migrated = record.copy()

        # V2 added 'category' field with default value
        if 'category' not in migrated:
            migrated['category'] = 'uncategorized'

        # V2 renamed 'title' to 'name'
        if 'title' in migrated:
            migrated['name'] = migrated.pop('title')

        # V2 added required 'embedding_model' field
        if 'embedding_model' not in migrated:
            migrated['embedding_model'] = 'default-v1'

        # Tag with schema version
        migrated['__schema_version__'] = SchemaVersion.V2.value

        return migrated

    def validate_schema(self, record: Dict[str, Any], version: SchemaVersion) -> bool:
        """Validate record against schema version."""
        if version == SchemaVersion.V1:
            required = ['id', 'text', 'title']
        elif version == SchemaVersion.V2:
            required = ['id', 'text', 'name', 'category', 'embedding_model']
        else:
            return False

        return all(field in record for field in required)

    def get_schema_version(self, record: Dict[str, Any]) -> Optional[SchemaVersion]:
        """Detect schema version from record."""
        version_str = record.get('__schema_version__')
        if version_str:
            try:
                return SchemaVersion(version_str)
            except ValueError:
                pass

        # Fallback: infer from fields
        if 'name' in record and 'category' in record:
            return SchemaVersion.V2
        return SchemaVersion.V1

# Usage example
manager = SchemaEvolutionManager()

# Old v1 record
v1_record = {
    'id': '123',
    'text': 'Machine learning content',
    'title': 'ML Article'
}

# Detect version
version = manager.get_schema_version(v1_record)
print(f"Detected schema version: {version.value}")

# Migrate to current version
v2_record = manager.migrate(v1_record, version)
print(f"Migrated record: {v2_record}")
print(f"Valid v2 schema: {manager.validate_schema(v2_record, SchemaVersion.V2)}")
Detected schema version: 1.0
Migrated record: {'id': '123', 'text': 'Machine learning content', 'category': 'uncategorized', 'name': 'ML Article', 'embedding_model': 'default-v1', '__schema_version__': '2.0'}
Valid v2 schema: True
Tip: Schema Evolution Best Practices

Safe evolution strategies:

  • Additive changes only: Add fields, don’t remove (backwards compatible)
  • Deprecation before removal: Mark fields deprecated for 1-2 versions
  • Default values: Provide defaults for new required fields
  • Version tagging: Tag data with schema version explicitly

Migration strategies:

  • Online migration: Transform data on-read (lazy; see the sketch below)
  • Offline migration: Reprocess entire dataset (eager)
  • Hybrid: Migrate hot data online, cold data offline

Compatibility levels:

  • Backward compatible: Consumers on the new schema can read data written with the old schema
  • Forward compatible: Consumers on the old schema can read data written with the new schema
  • Full compatibility: Both directions work
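
The online-migration strategy above can be as small as a read path that upgrades records on access. The sketch below is a minimal illustration that assumes the SchemaEvolutionManager from the previous listing is in scope and that store is any dict-like lookup standing in for the real storage backend.

from typing import Any, Dict

class LazyMigratingStore:
    """Read path that migrates records to the current schema on access (online migration)."""

    def __init__(self, store: Dict[str, Dict[str, Any]], manager: SchemaEvolutionManager):
        self.store = store        # dict-like stand-in for the real backend
        self.manager = manager

    def get(self, record_id: str) -> Dict[str, Any]:
        record = self.store[record_id]
        version = self.manager.get_schema_version(record)
        migrated = self.manager.migrate(record, version)
        self.store[record_id] = migrated  # write back so the next read skips migration
        return migrated

# Usage with the v1 record from the example above
store = {'123': {'id': '123', 'text': 'Machine learning content', 'title': 'ML Article'}}
lazy_store = LazyMigratingStore(store, SchemaEvolutionManager())
print(lazy_store.get('123')['__schema_version__'])  # 2.0

The write-back makes the upgrade incremental: hot records are migrated as they are read, while untouched cold data can be reprocessed offline later, matching the hybrid strategy above.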
Warning: Breaking Changes

Some changes cannot be made backwards-compatible:

  • Removing required fields
  • Changing field types incompatibly
  • Removing entire entities

For breaking changes:

  1. Version bump: Increment major version (v1 → v2)
  2. Parallel operation: Run both versions simultaneously
  3. Gradual migration: Migrate consumers incrementally
  4. Deprecation timeline: Announce timeline (3-6 months)
  5. Sunset old version: Remove after migration complete

23.5 Multi-Source Data Fusion

Production embedding systems integrate data from multiple sources: user profiles from CRM, product data from inventory, behavioral logs from analytics, external data from partners. Multi-source data fusion combines these heterogeneous datasets into unified embeddings while handling schema mismatches, different update frequencies, and varying data quality.

23.5.1 The Data Fusion Challenge

Each data source has unique characteristics:

  • Schema: Different field names, types, structures
  • Frequency: Some update real-time, others daily/weekly
  • Quality: Varying completeness, correctness, timeliness
  • Scale: Some have millions of records, others billions
  • Access: APIs, databases, files, streams

Challenge: Combine these sources into training data that preserves relationships across sources.

Multi-Source Data Fusion
from typing import Dict, List, Any, Optional
from datetime import datetime

class MultiSourceDataFusion:
    """Fuse data from multiple sources into unified records."""

    def __init__(self):
        self.source_priorities = {
            'primary_db': 1,
            'external_api': 2,
            'user_input': 3
        }

    def fuse(self, entity_id: str, sources: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Fuse data from multiple sources for a single entity."""
        fused = {'entity_id': entity_id}

        # Collect all fields from all sources
        all_fields = set()
        for source_data in sources.values():
            all_fields.update(source_data.keys())

        # Resolve conflicts for each field
        for field in all_fields:
            value = self._resolve_field(field, sources)
            if value is not None:
                fused[field] = value

        return fused

    def _resolve_field(self, field: str, sources: Dict[str, Dict[str, Any]]) -> Any:
        """Resolve conflicts for a single field across sources."""
        candidates = []

        for source_name, source_data in sources.items():
            if field in source_data:
                value = source_data[field]
                priority = self.source_priorities.get(source_name, 999)
                timestamp = source_data.get('_timestamp', datetime.min)
                candidates.append((value, priority, timestamp, source_name))

        if not candidates:
            return None

        # Resolution strategy: priority first, recency as tiebreaker.
        # Two stable sorts: newest value first, then lowest priority number first.
        candidates.sort(key=lambda x: x[2], reverse=True)
        candidates.sort(key=lambda x: x[1])

        return candidates[0][0]

    def batch_fuse(self, entity_sources: Dict[str, Dict[str, Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Fuse data for multiple entities in batch."""
        return [
            self.fuse(entity_id, sources)
            for entity_id, sources in entity_sources.items()
        ]

# Usage example
fusion = MultiSourceDataFusion()

# Data from multiple sources for same entity
entity_data = {
    'primary_db': {
        'name': 'John Doe',
        'email': 'john@example.com',
        '_timestamp': datetime(2024, 1, 1)
    },
    'external_api': {
        'name': 'John D.',
        'phone': '+1-555-0100',
        '_timestamp': datetime(2024, 1, 5)
    },
    'user_input': {
        'email': 'john.doe@example.com',  # More recent, but from a lower-priority source
        'preferences': {'theme': 'dark'},
        '_timestamp': datetime(2024, 1, 10)
    }
}

fused_record = fusion.fuse('user_123', entity_data)
print(f"Fused record: {fused_record}")
print(f"  Name from: primary_db (highest priority)")
print(f"  Email from: user_input (most recent)")
print(f"  Phone from: external_api (only source)")
Fused record: {'entity_id': 'user_123', '_timestamp': datetime.datetime(2024, 1, 1, 0, 0), 'name': 'John Doe', 'email': 'john@example.com', 'preferences': {'theme': 'dark'}, 'phone': '+1-555-0100'}
  Name from: primary_db (highest priority)
  Email from: primary_db (priority outranks recency)
  Phone from: external_api (only source)
Tip: Multi-Source Fusion Best Practices

Schema management:

  • Canonical schema: Define single target schema
  • Schema registry: Centralize source schema definitions
  • Schema evolution: Version schemas and migrate incrementally
  • Type safety: Validate types during alignment

Conflict resolution:

  • Priority-based: Assign priority to sources (authority)
  • Recency-based: Prefer most recently updated value
  • Quality-based: Weight by source quality score
  • Context-aware: Consider semantic meaning

Performance:

  • Incremental fusion: Only fuse changed entities (see the sketch below)
  • Partitioning: Partition by entity_id for parallel fusion
  • Caching: Cache fused results (invalidate on update)
  • Lazy loading: Fuse on-demand for rarely accessed entities
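
The incremental-fusion and caching bullets combine naturally: re-fuse an entity only when a content hash of its source payloads has changed since the last run. The sketch below assumes the MultiSourceDataFusion class and the entity_data example from above are in scope; the hashing scheme is one simple option among many.

import hashlib
import json
from typing import Any, Dict

class IncrementalFusion:
    """Skip re-fusing entities whose source payloads have not changed."""

    def __init__(self, fusion: MultiSourceDataFusion):
        self.fusion = fusion
        self.source_hashes: Dict[str, str] = {}     # entity_id -> hash of last-seen sources
        self.cache: Dict[str, Dict[str, Any]] = {}  # entity_id -> cached fused record

    def fuse_if_changed(self, entity_id: str, sources: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        digest = hashlib.sha256(
            json.dumps(sources, sort_keys=True, default=str).encode()
        ).hexdigest()
        if self.source_hashes.get(entity_id) == digest:
            return self.cache[entity_id]             # unchanged sources: serve cached fusion
        fused = self.fusion.fuse(entity_id, sources)
        self.source_hashes[entity_id] = digest
        self.cache[entity_id] = fused
        return fused

# Usage: the second call with unchanged sources is served from the cache
incremental = IncrementalFusion(MultiSourceDataFusion())
first = incremental.fuse_if_changed('user_123', entity_data)
second = incremental.fuse_if_changed('user_123', entity_data)
print(f"Cache hit on unchanged sources: {first is second}")  # True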

23.6 Key Takeaways

  • ETL pipelines must preserve semantic relationships: Unlike traditional ETL that optimizes for SQL analytics, embedding ETL requires feature engineering that captures similarity and meaning, not just facts

  • Streaming enables real-time embeddings with sub-second latency: Micro-batching architectures (100-1000ms windows) balance throughput and latency, enabling fresh embeddings for dynamic content like news and social media

  • Data quality directly determines embedding quality: Comprehensive validation (schema, values, semantics, duplicates, drift) prevents training on corrupted data that would poison embeddings for months

  • Schema evolution requires careful coordination across components: Backwards-compatible changes (add fields, provide defaults) enable safe evolution while breaking changes (remove fields, change types) require parallel operation and gradual migration

  • Multi-source fusion combines heterogeneous datasets into unified embeddings: Schema alignment, entity resolution, conflict resolution, and temporal alignment enable leveraging data from multiple systems with different schemas and update frequencies

  • Data engineering is the foundation of embedding systems: High-quality embeddings require high-quality data engineering; invest in pipelines, quality frameworks, and fusion strategies before scaling models

  • The data engineering hierarchy: Quality > Schema design > Performance. Impact ratios vary by domain, but garbage-in-garbage-out applies universally—focus on correctness before optimizing throughput

23.7 Looking Ahead

Part III (Production Engineering) concludes with robust data engineering practices that ensure embedding systems have the clean, consistent, high-quality data needed to achieve their potential. Part IV (Advanced Applications) begins with Chapter 24, which explores how to prepare text documents for embedding systems, covering chunking strategies, document-type specific approaches, and metadata preservation for optimal retrieval quality.

23.8 Further Reading

23.8.1 Data Engineering

  • Kleppmann, Martin (2017). “Designing Data-Intensive Applications.” O’Reilly Media.
  • Kukreja, Manoj (2021). “Data Engineering with Apache Spark, Delta Lake, and Lakehouse.” Packt Publishing.
  • Crickard, Paul (2020). “Data Engineering with Python.” Packt Publishing.

23.8.2 ETL and Pipelines

  • Kimball, Ralph, and Margy Ross (2013). “The Data Warehouse Toolkit.” Wiley.
  • Apache Airflow Documentation. “Best Practices.”
  • dbt Documentation. “Best Practices for Data Transformation.”

23.8.3 Streaming Systems

  • Kleppmann, Martin (2016). “Making Sense of Stream Processing.” O’Reilly Media.
  • Narkhede, Neha, et al. (2017). “Kafka: The Definitive Guide.” O’Reilly Media.
  • Apache Flink Documentation. “Streaming Concepts.”

23.8.4 Data Quality

  • Redman, Thomas (2016). “Getting in Front on Data Quality.” Harvard Business Review.
  • Batini, Carlo, and Monica Scannapieco (2016). “Data and Information Quality.” Springer.
  • Talend Data Quality Documentation. “Data Quality Best Practices.”

23.8.5 Schema Evolution

  • Kleppmann, Martin (2012). “Schema Evolution in Avro, Protocol Buffers and Thrift.” Blog post.
  • Confluent Documentation. “Schema Evolution and Compatibility.”
  • Fowler, Martin (2016). “Evolutionary Database Design.” martinfowler.com.

23.8.6 Data Integration

  • Doan, AnHai, et al. (2012). “Principles of Data Integration.” Morgan Kaufmann.
  • Haas, Laura, et al. (2005). “Clio Grows Up: From Research Prototype to Industrial Tool.” SIGMOD.
  • Madhavan, Jayant, et al. (2001). “Generic Schema Matching with Cupid.” VLDB.