High-quality embeddings demand high-quality data engineering. This chapter explores the data infrastructure that enables trillion-row embedding systems: ETL pipelines that transform raw data into training-ready formats while preserving semantic relationships, streaming architectures that update embeddings in near-real-time as data evolves, data quality frameworks that detect and remediate issues before they corrupt embeddings, schema evolution strategies that maintain backwards compatibility across model versions, and multi-source data fusion techniques that combine embeddings from heterogeneous datasets. These data engineering practices ensure embedding models have the clean, consistent, well-structured data needed to achieve their potential in production.
After optimizing vector operations for sub-millisecond search (Chapter 22), the remaining production challenge is data engineering. Embeddings are only as good as the data they’re trained on. A model trained on corrupted data produces corrupted embeddings. A pipeline that can’t handle schema changes breaks during routine database migrations. A system that can’t fuse data from multiple sources misses critical context. This chapter addresses the data engineering practices that separate prototype embedding systems from production-ready platforms serving billions of users.
23.1 ETL Pipelines for Embedding Generation
Embedding generation requires transforming raw data—database records, documents, images, logs—into vector representations while preserving semantic meaning. ETL (Extract, Transform, Load) pipelines orchestrate this transformation at scale, handling data extraction from diverse sources, feature engineering that captures relevant signals, quality validation that ensures training stability, and efficient loading into training systems.
23.1.1 The Embedding ETL Challenge
Traditional ETL optimizes for data warehousing: schema normalization, aggregation, and SQL-friendly formats. Embedding ETL has unique requirements:
Semantic preservation: Transformations must preserve meaning (normalization can destroy signal)
Feature engineering: Extract features that capture relationships (not just facts)
Scale: Process billions of records efficiently (trillion-row datasets)
Freshness: Keep training data current (embedding drift occurs within weeks)
Multimodal: Handle text, images, structured data, time series simultaneously
ETL Pipeline for Embedding Generation
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd

@dataclass
class EmbeddingRecord:
    """Record prepared for embedding generation."""
    id: str
    text: str
    metadata: Dict[str, Any]

class EmbeddingETLPipeline:
    """ETL pipeline for preparing data for embedding generation."""

    def extract(self, source: str) -> pd.DataFrame:
        """Extract data from source (database, file, API)."""
        # Example: Load from CSV
        return pd.read_csv(source)

    def transform(self, df: pd.DataFrame) -> List[EmbeddingRecord]:
        """Transform raw data into embedding-ready format."""
        records = []
        for _, row in df.iterrows():
            # Clean text
            text = str(row.get('content', '')).strip()

            # Skip empty or invalid records
            if not text or len(text) < 10:
                continue

            # Extract metadata
            metadata = {
                'category': row.get('category'),
                'timestamp': row.get('created_at'),
                'source': 'etl_pipeline'
            }

            records.append(EmbeddingRecord(
                id=str(row['id']),
                text=text,
                metadata=metadata
            ))
        return records

    def load(self, records: List[EmbeddingRecord], output_path: str):
        """Load prepared records for embedding."""
        df = pd.DataFrame([
            {'id': r.id, 'text': r.text, **r.metadata}
            for r in records
        ])
        df.to_parquet(output_path, index=False)
        return len(records)

# Usage example
pipeline = EmbeddingETLPipeline()

# Create sample data
sample_df = pd.DataFrame({
    'id': [1, 2, 3],
    'content': ['Machine learning article', 'AI research paper', 'Data science tutorial'],
    'category': ['ML', 'AI', 'DS'],
    'created_at': ['2024-01-01', '2024-01-02', '2024-01-03']
})

records = pipeline.transform(sample_df)
print(f"Processed {len(records)} records ready for embedding")
Processed 3 records ready for embedding
Tip: ETL Best Practices for Embeddings
Data quality:
Validate at every stage (extract, transform, load)
Implement deduplication (exact and near-duplicate; see the sketch after this list)
Handle missing values explicitly (don’t drop silently)
Monitor data drift (distribution shifts over time)
Performance:
Partition data for parallel processing (100-1000 partitions)
Use columnar formats (Parquet) for analytics
Implement checkpointing for fault tolerance
Optimize for I/O (sequential reads, batching)
Maintainability:
Keep transformations simple and composable
Document feature engineering logic
Version control pipeline code
Test with representative samples before production runs
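The deduplication item above deserves a concrete illustration. The sketch below is a minimal example, not part of any specific library: the hypothetical normalize_text and deduplicate_records helpers catch exact duplicates by id and near-duplicates by hashing normalized text. Production pipelines typically add MinHash/LSH for fuzzier matching.

import hashlib
import re
from typing import Dict, List

def normalize_text(text: str) -> str:
    """Lowercase, strip punctuation, and collapse whitespace for comparison."""
    text = text.lower()
    text = re.sub(r'[^\w\s]', ' ', text)
    return re.sub(r'\s+', ' ', text).strip()

def deduplicate_records(records: List[Dict]) -> List[Dict]:
    """Drop exact duplicates (by id) and near-duplicates (by normalized text hash)."""
    seen_ids = set()
    seen_hashes = set()
    unique = []
    for record in records:
        if record['id'] in seen_ids:
            continue  # Exact duplicate id
        text_hash = hashlib.sha1(normalize_text(record['text']).encode()).hexdigest()
        if text_hash in seen_hashes:
            continue  # Near-duplicate: identical text after normalization
        seen_ids.add(record['id'])
        seen_hashes.add(text_hash)
        unique.append(record)
    return unique

# Usage example
docs = [
    {'id': '1', 'text': 'Machine learning article'},
    {'id': '2', 'text': 'machine   LEARNING article!'},  # near-duplicate of id 1
    {'id': '1', 'text': 'Machine learning article'},     # exact duplicate id
]
print(f"{len(deduplicate_records(docs))} unique record(s)")  # 1 unique record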
23.2 Streaming Embedding Updates
Batch ETL processes data hourly or daily, but many applications need real-time embeddings. A news recommender must embed articles seconds after publication. A fraud detector must embed transactions milliseconds after they occur. Streaming architectures enable continuous embedding updates with end-to-end latency measured in seconds, not hours.
23.2.1 Streaming vs. Batch: The Trade-off
Batch processing (hourly/daily):
Advantages: Simple, efficient, easy to debug, supports complex aggregations
Disadvantages: Stale embeddings (hours old), high latency for new items
Use when: Daily updates sufficient, complex transformations required
Stream processing (seconds):
Advantages: Fresh embeddings (seconds old), low latency for new items, event-driven
Disadvantages: Complex architecture, harder to debug, limited aggregation window
Use when: Real-time updates critical, simple transformations, event-driven workflows
Streaming Embedding Processor
from queue import Queue, Empty
from typing import List
import time
import threading

class StreamingEmbeddingProcessor:
    """Process embeddings in real-time from streaming data."""

    def __init__(self, batch_size: int = 32, batch_timeout_ms: int = 100):
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.queue = Queue()
        self.running = False
        self.processor_thread = None

    def start(self):
        """Start the streaming processor."""
        self.running = True
        self.processor_thread = threading.Thread(target=self._process_loop)
        self.processor_thread.start()

    def stop(self):
        """Stop the streaming processor."""
        self.running = False
        if self.processor_thread:
            self.processor_thread.join()

    def submit(self, item: dict):
        """Submit an item for processing."""
        self.queue.put(item)

    def _process_loop(self):
        """Main processing loop with micro-batching."""
        while self.running:
            batch = self._collect_batch()
            if batch:
                self._process_batch(batch)

    def _collect_batch(self) -> List[dict]:
        """Collect items into a micro-batch."""
        batch = []
        deadline = time.time() + (self.batch_timeout_ms / 1000.0)
        while len(batch) < self.batch_size and time.time() < deadline:
            try:
                item = self.queue.get(timeout=0.01)
                batch.append(item)
            except Empty:
                break
        return batch

    def _process_batch(self, batch: List[dict]):
        """Process a batch of items."""
        # Simulate embedding generation
        texts = [item['text'] for item in batch]
        print(f"Processing batch of {len(batch)} items")
        # Here you would call your embedding model:
        # embeddings = model.encode(texts)

# Usage example
processor = StreamingEmbeddingProcessor(batch_size=10, batch_timeout_ms=100)
processor.start()

# Simulate streaming data
for i in range(25):
    processor.submit({'id': i, 'text': f'Document {i}'})
    time.sleep(0.01)  # Simulate streaming arrival

time.sleep(0.2)  # Wait for processing
processor.stop()
print("Streaming processor completed")
Processing batch of 8 items
Processing batch of 9 items
Processing batch of 1 items
Processing batch of 2 items
Processing batch of 5 items
Streaming processor completed
Tip: Streaming Architecture Best Practices
Micro-batching:
Batch window: 100-1000ms (balance latency vs throughput)
Batch size: 10-100 items (optimize for GPU)
Adaptive batching: Adjust based on load
Fault tolerance:
Checkpointing: Save progress every N events
Exactly-once semantics: Idempotent operations
Dead letter queue: Handle failed events separately
Retry logic: Exponential backoff for transient failures (see the sketch after this list)
Monitoring:
End-to-end latency (p50, p95, p99)
Throughput (events/second)
Error rate (failures / total events)
Queue depth (backpressure indicator)
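To make the retry guidance concrete, here is a minimal sketch of exponential backoff with jitter. The retry_with_backoff helper and its delay parameters are illustrative assumptions, not a specific library API; a real system would also route exhausted retries to a dead letter queue.

import random
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def retry_with_backoff(fn: Callable[[], T], max_attempts: int = 5,
                       base_delay: float = 0.1, max_delay: float = 5.0) -> T:
    """Retry a transient operation with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of retries: surface the error (or send to a dead letter queue)
            # Exponential backoff: 0.1s, 0.2s, 0.4s, ... capped at max_delay, plus jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay + random.uniform(0, delay * 0.1))

# Usage example: wrap a flaky embedding call (hypothetical)
calls = {'n': 0}
def flaky_embed():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError("transient failure")
    return [0.1, 0.2, 0.3]

print(retry_with_backoff(flaky_embed))  # Succeeds on the third attempt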
Warning: Streaming Complexity
Streaming pipelines are significantly more complex than batch:
Debugging: Harder to reproduce issues (event order matters)
Start with batch, migrate to streaming only when business value justifies complexity.
23.3 Data Quality for Embedding Training
Poor data quality causes poor embeddings. Data quality frameworks detect and remediate issues before they corrupt training: duplicate detection prevents training on repeated examples, outlier detection identifies corrupted or adversarial data, consistency validation ensures relationships hold across updates, and drift detection alerts when distributions shift unexpectedly.
23.3.1 The Data Quality Challenge for Embeddings
Traditional data quality focuses on completeness and correctness. Embedding quality has additional requirements:
Semantic consistency: Similar items must have similar features
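To make these quality checks concrete, the sketch below computes two simple signals for a new training batch: duplicate rate and drift in the text-length distribution relative to a reference batch. The quality_report helper and the 3-sigma drift threshold are illustrative assumptions; production quality frameworks apply far richer checks (schema, value ranges, semantic consistency).

import statistics
from typing import Dict, List

def quality_report(reference_lengths: List[int], new_lengths: List[int],
                   new_texts: List[str]) -> Dict[str, float]:
    """Minimal quality signals for a new training batch:
    duplicate rate and drift in text-length distribution vs. a reference batch."""
    duplicate_rate = 1.0 - len(set(new_texts)) / max(len(new_texts), 1)

    ref_mean = statistics.mean(reference_lengths)
    ref_std = statistics.stdev(reference_lengths) or 1.0
    new_mean = statistics.mean(new_lengths)
    # Standardized mean shift: flag drift if the new batch's mean length
    # moves more than ~3 reference standard deviations.
    drift_score = abs(new_mean - ref_mean) / ref_std

    return {
        'duplicate_rate': duplicate_rate,
        'length_drift_sigma': drift_score,
        'drift_flag': float(drift_score > 3.0),
    }

# Usage example
reference = [120, 135, 128, 140, 131]           # text lengths from a previous batch
new_texts = ['short doc', 'short doc', 'tiny']  # suspiciously short and repetitive
report = quality_report(reference, [len(t) for t in new_texts], new_texts)
print(report)  # high duplicate_rate and drift_flag = 1.0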
23.4 Schema Evolution and Backwards Compatibility
Production embedding systems evolve: new features are added, old features are deprecated, and data types change. Schema evolution enables safe changes while maintaining backwards compatibility with existing embeddings, models, and downstream consumers.
23.4.1 The Schema Evolution Challenge
Embedding systems have complex dependencies:
Trained models: Expect specific feature schema
Vector indices: Store embeddings from specific model versions
Downstream consumers: Query embeddings with specific schemas
Historical data: May use old schemas
Change one component, and the entire system can break.
Schema Evolution Manager
from typing import Dict, Any, Optional
from enum import Enum

class SchemaVersion(Enum):
    """Schema version enumeration."""
    V1 = "1.0"
    V2 = "2.0"

class SchemaEvolutionManager:
    """Manage schema evolution with backwards compatibility."""

    def __init__(self):
        self.current_version = SchemaVersion.V2

    def migrate(self, record: Dict[str, Any], from_version: SchemaVersion) -> Dict[str, Any]:
        """Migrate record from old schema to current schema."""
        if from_version == self.current_version:
            return record  # No migration needed

        # Apply migration chain
        if from_version == SchemaVersion.V1:
            record = self._migrate_v1_to_v2(record)
        return record

    def _migrate_v1_to_v2(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Migrate from v1 to v2 schema."""
        migrated = record.copy()

        # V2 added 'category' field with default value
        if 'category' not in migrated:
            migrated['category'] = 'uncategorized'

        # V2 renamed 'title' to 'name'
        if 'title' in migrated:
            migrated['name'] = migrated.pop('title')

        # V2 added required 'embedding_model' field
        if 'embedding_model' not in migrated:
            migrated['embedding_model'] = 'default-v1'

        # Tag with schema version
        migrated['__schema_version__'] = SchemaVersion.V2.value
        return migrated

    def validate_schema(self, record: Dict[str, Any], version: SchemaVersion) -> bool:
        """Validate record against schema version."""
        if version == SchemaVersion.V1:
            required = ['id', 'text', 'title']
        elif version == SchemaVersion.V2:
            required = ['id', 'text', 'name', 'category', 'embedding_model']
        else:
            return False
        return all(field in record for field in required)

    def get_schema_version(self, record: Dict[str, Any]) -> Optional[SchemaVersion]:
        """Detect schema version from record."""
        version_str = record.get('__schema_version__')
        if version_str:
            try:
                return SchemaVersion(version_str)
            except ValueError:
                pass

        # Fallback: infer from fields
        if 'name' in record and 'category' in record:
            return SchemaVersion.V2
        return SchemaVersion.V1

# Usage example
manager = SchemaEvolutionManager()

# Old v1 record
v1_record = {
    'id': '123',
    'text': 'Machine learning content',
    'title': 'ML Article'
}

# Detect version
version = manager.get_schema_version(v1_record)
print(f"Detected schema version: {version.value}")

# Migrate to current version
v2_record = manager.migrate(v1_record, version)
print(f"Migrated record: {v2_record}")
print(f"Valid v2 schema: {manager.validate_schema(v2_record, SchemaVersion.V2)}")
Tip: Schema Evolution Best Practices
Migration strategies:
Hybrid: Migrate hot data online, cold data offline
Compatibility levels (see the sketch after this list):
Backward compatible: Consumers using the new schema can read data written with the old schema
Forward compatible: Consumers using the old schema can read data written with the new schema
Full compatibility: Both directions work
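These compatibility levels can be made precise with a toy model that considers only required fields; the compatibility helper below is an illustrative sketch under that simplifying assumption (real schema registries also account for defaults and optional fields).

from typing import Dict, Set

def compatibility(old_required: Set[str], new_required: Set[str]) -> Dict[str, bool]:
    """Classify a schema change by which consumers keep working.
    Backward: readers of the new schema can read old data,
    i.e. the new schema requires nothing the old data lacks.
    Forward: readers of the old schema can read new data,
    i.e. new data still provides everything old readers require."""
    backward = new_required.issubset(old_required)
    forward = old_required.issubset(new_required)
    return {'backward': backward, 'forward': forward, 'full': backward and forward}

# Adding an optional field (required set unchanged): fully compatible
print(compatibility({'id', 'text'}, {'id', 'text'}))
# Adding a new required field: old data lacks it, so not backward compatible
print(compatibility({'id', 'text'}, {'id', 'text', 'embedding_model'}))
# Removing a required field: old readers that need it break on new data (not forward compatible)
print(compatibility({'id', 'text', 'title'}, {'id', 'text'}))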
Warning: Breaking Changes
Some changes cannot be made backwards-compatible:
Removing required fields
Changing field types incompatibly
Removing entire entities
For breaking changes:
1. Version bump: Increment major version (v1 → v2)
2. Parallel operation: Run both versions simultaneously
3. Gradual migration: Migrate consumers incrementally
4. Deprecation timeline: Announce timeline (3-6 months)
5. Sunset old version: Remove after migration complete
23.5 Multi-Source Data Fusion
Production embedding systems integrate data from multiple sources: user profiles from CRM, product data from inventory, behavioral logs from analytics, external data from partners. Multi-source data fusion combines these heterogeneous datasets into unified embeddings while handling schema mismatches, different update frequencies, and varying data quality.
23.5.1 The Data Fusion Challenge
Each data source has unique characteristics:
Schema: Different field names, types, structures
Frequency: Some update real-time, others daily/weekly
Tip: Data Fusion Best Practices
Schema alignment:
Schema evolution: Version schemas and migrate incrementally
Type safety: Validate types during alignment
Conflict resolution (see the sketch after this list):
Priority-based: Assign priority to sources (authority)
Recency-based: Prefer most recently updated value
Quality-based: Weight by source quality score
Context-aware: Consider semantic meaning
Performance:
Incremental fusion: Only fuse changed entities
Partitioning: Partition by entity_id for parallel fusion
Caching: Cache fused results (invalidate on update)
Lazy loading: Fuse on-demand for rarely accessed entities
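To illustrate conflict resolution, here is a minimal sketch of priority- and recency-based fusion for records describing the same entity. The SOURCE_PRIORITY table, field names, and fuse_entity helper are illustrative assumptions, not a specific system's API.

from datetime import datetime
from typing import Any, Dict, List

# Source priority: higher wins when values conflict (illustrative assumption)
SOURCE_PRIORITY = {'crm': 3, 'inventory': 2, 'analytics': 1}

def fuse_entity(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fuse records describing the same entity from multiple sources.
    Conflicts are resolved by source priority first, then by recency."""
    def rank(record):
        return (SOURCE_PRIORITY.get(record['source'], 0),
                datetime.fromisoformat(record['updated_at']))

    fused: Dict[str, Any] = {}
    provenance: Dict[str, str] = {}
    # Lowest-ranked first, so higher-priority / fresher values overwrite earlier ones
    for record in sorted(records, key=rank):
        for field, value in record.get('fields', {}).items():
            if value is not None:
                fused[field] = value
                provenance[field] = record['source']
    fused['__provenance__'] = provenance
    return fused

# Usage example: the same user seen by two systems
user_records = [
    {'source': 'analytics', 'updated_at': '2024-03-02T10:00:00',
     'fields': {'name': 'J. Doe', 'last_active': '2024-03-02'}},
    {'source': 'crm', 'updated_at': '2024-03-01T09:00:00',
     'fields': {'name': 'Jane Doe', 'email': 'jane@example.com'}},
]
print(fuse_entity(user_records))
# 'name' comes from CRM (higher priority), 'last_active' from analytics

Because records are sorted by (priority, timestamp), priority dominates and recency only breaks ties between records from equally ranked sources; swapping the sort key order would make recency the primary rule.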
23.6 Key Takeaways
ETL pipelines must preserve semantic relationships: Unlike traditional ETL that optimizes for SQL analytics, embedding ETL requires feature engineering that captures similarity and meaning, not just facts
Streaming enables real-time embeddings with sub-second latency: Micro-batching architectures (100-1000ms windows) balance throughput and latency, enabling fresh embeddings for dynamic content like news and social media
Data quality directly determines embedding quality: Comprehensive validation (schema, values, semantics, duplicates, drift) prevents training on corrupted data that would poison embeddings for months
Schema evolution requires careful coordination across components: Backwards-compatible changes (add fields, provide defaults) enable safe evolution while breaking changes (remove fields, change types) require parallel operation and gradual migration
Multi-source fusion combines heterogeneous datasets into unified embeddings: Schema alignment, entity resolution, conflict resolution, and temporal alignment enable leveraging data from multiple systems with different schemas and update frequencies
Data engineering is the foundation of embedding systems: High-quality embeddings require high-quality data engineering; invest in pipelines, quality frameworks, and fusion strategies before scaling models
The data engineering hierarchy: Quality > Schema design > Performance. Impact ratios vary by domain, but garbage-in-garbage-out applies universally—focus on correctness before optimizing throughput
23.7 Looking Ahead
Part III (Production Engineering) concludes with robust data engineering practices that ensure embedding systems have the clean, consistent, high-quality data needed to achieve their potential. Part IV (Advanced Applications) begins with Chapter 24, which explores how to prepare text documents for embedding systems, covering chunking strategies, document-type specific approaches, and metadata preservation for optimal retrieval quality.
23.8 Further Reading
23.8.1 Data Engineering
Kleppmann, Martin (2017). “Designing Data-Intensive Applications.” O’Reilly Media.
Kukreja, Manoj (2021). “Data Engineering with Apache Spark, Delta Lake, and Lakehouse.” Packt Publishing.
Crickard, Paul (2020). “Data Engineering with Python.” Packt Publishing.
23.8.2 ETL and Pipelines
Kimball, Ralph, and Margy Ross (2013). “The Data Warehouse Toolkit.” Wiley.
Apache Airflow Documentation. “Best Practices.”
dbt Documentation. “Best Practices for Data Transformation.”
23.8.3 Streaming Systems
Kleppmann, Martin (2016). “Making Sense of Stream Processing.” O’Reilly Media.
Narkhede, Neha, et al. (2017). “Kafka: The Definitive Guide.” O’Reilly Media.
Apache Flink Documentation. “Streaming Concepts.”
23.8.4 Data Quality
Redman, Thomas (2016). “Getting in Front on Data Quality.” Harvard Business Review.
Batini, Carlo, and Monica Scannapieco (2016). “Data and Information Quality.” Springer.
Talend Data Quality Documentation. “Data Quality Best Practices.”
23.8.5 Schema Evolution
Kleppmann, Martin (2012). “Schema Evolution in Avro, Protocol Buffers and Thrift.” Blog post.
Confluent Documentation. “Schema Evolution and Compatibility.”
Fowler, Martin (2016). “Evolutionary Database Design.” martinfowler.com.
23.8.6 Data Integration
Doan, AnHai, et al. (2012). “Principles of Data Integration.” Morgan Kaufmann.
Haas, Laura, et al. (2005). “Clio Grows Up: From Research Prototype to Industrial Tool.” SIGMOD.
Madhavan, Jayant, et al. (2001). “Generic Schema Matching with Cupid.” VLDB.