Appendix: Generating Training Data for Observability

Problem: You want to follow this tutorial series but don’t have OCSF observability data to train on.

Solution: This appendix shows how to generate realistic observability data (logs, metrics, traces, and config changes) by using Docker Compose to spin up real infrastructure and instrumenting it to produce training data that you can release as open source.


Overview: Synthetic Observability Stack

We’ll create a realistic microservices environment that generates logs, metrics, traces, and config-change events.


Download Code Files

All code files from this appendix are available for download:

Download appendix-code.zip

The zip contains a complete, runnable stack.


Section 1: Docker Compose Infrastructure

docker-compose.yml

Create a realistic multi-service application with built-in observability:

docker-compose.yml
# Unified OpenTelemetry Architecture:
# - Logs: Services -> OTLP -> OTel Collector -> logs.jsonl
# - Traces: Services -> OTLP -> OTel Collector -> traces.jsonl
# - Metrics: OTel Collector scrapes /metrics -> metrics.jsonl
#
# All three signals exported to ./logs/otel/ as JSONL files

services:
  # Application Services
  web-api:
    build: ./services/web-api
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/appdb
      - REDIS_URL=redis://redis:6379
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - postgres
      - redis
      - auth-service
    labels:
      - "monitoring=enabled"

  auth-service:
    build: ./services/auth-service
    ports:
      - "8001:8001"
    environment:
      - LDAP_URL=ldap://openldap:389
    labels:
      - "monitoring=enabled"

  payment-worker:
    build: ./services/payment-worker
    environment:
      - CELERY_BROKER_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/appdb
    depends_on:
      - redis
      - postgres
    labels:
      - "monitoring=enabled"

  # Data Stores
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=appdb
    volumes:
      - postgres_data:/var/lib/postgresql/data
    labels:
      - "monitoring=enabled"

  redis:
    image: redis:7-alpine
    labels:
      - "monitoring=enabled"

  # Observability Stack
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  # Unified telemetry collector - receives logs/traces via OTLP, scrapes metrics
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    user: "1000:1000"  # Run as non-root so exported files are readable
    volumes:
      - ./config/otel-collector-config.yml:/etc/otel/config.yml
      - ./logs/otel:/var/log/otel
    ports:
      - "4317:4317"  # OTLP gRPC (logs, traces)
      - "4318:4318"  # OTLP HTTP (logs, traces)
      - "8888:8888"  # Collector metrics
    command: ["--config=/etc/otel/config.yml"]
    depends_on:
      - web-api

  fluentd:
    image: fluent/fluentd:latest
    volumes:
      - ./config/fluentd.conf:/fluentd/etc/fluent.conf
      - ./logs:/var/log/fluentd
    ports:
      - "24224:24224"
      - "24224:24224/udp"

  # Load Generator (creates traffic patterns)
  load-generator:
    build: ./services/load-generator
    depends_on:
      - web-api
    environment:
      - TARGET_URL=http://web-api:8000
      - NORMAL_RPS=10
      - ANOMALY_PROBABILITY=0.05

volumes:
  postgres_data:
  prometheus_data:

Configuration Files

The observability stack requires these configuration files:

config/prometheus.yml - Metrics collection:

prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'web-api'
    static_configs:
      - targets: ['web-api:8000']
    metrics_path: '/metrics'

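  # NOTE: PostgreSQL (5432) and Redis (6379) speak their own wire protocols and
  # do not serve Prometheus metrics on these ports. These two targets will show
  # as DOWN unless you add exporter sidecars (for example postgres_exporter and
  # redis_exporter) and point the jobs at them instead.
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres:5432']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']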
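
Every scrape target has to answer on its metrics_path with the Prometheus text exposition format, which is what web-api returns from /metrics via generate_latest(). As a rough illustration of that format, the sketch below parses a small sample into a dictionary. The sample text and the parse_exposition helper are hypothetical, and the parser assumes samples carry no trailing timestamp.

```python
def parse_exposition(text):
    """Parse Prometheus text exposition into {series_with_labels: value}."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and "# HELP" / "# TYPE" comment lines.
        if not line or line.startswith("#"):
            continue
        # Assumption: no trailing timestamp, so the value is the last field.
        series, _, value = line.rpartition(" ")
        samples[series] = float(value)
    return samples


# Hypothetical sample shaped like the http_requests_total counter in app.py.
SAMPLE = """# HELP http_requests_total Total HTTP requests
# TYPE http_requests_total counter
http_requests_total{endpoint="/api/users",method="GET",status="200"} 3.0
http_requests_total{endpoint="/api/checkout",method="POST",status="504"} 1.0
"""

samples = parse_exposition(SAMPLE)
total = sum(samples.values())
print(f"{len(samples)} series, {total} requests in total")
```

A target that does not return text in this shape, such as a raw database port, cannot be scraped directly.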

config/otel-collector-config.yml - Unified telemetry collection (logs, traces, and metrics):

otel-collector-config.yml
receivers:
  # OTLP receiver for logs and traces from applications
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

  # Prometheus receiver to scrape metrics from services
  prometheus:
    config:
      scrape_configs:
        - job_name: 'web-api'
          scrape_interval: 15s
          static_configs:
            - targets: ['web-api:8000']
        - job_name: 'otel-collector'
          scrape_interval: 15s
          static_configs:
            - targets: ['localhost:8888']

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024

  # Add resource attributes
  resource:
    attributes:
      - key: deployment.environment
        value: demo
        action: upsert

exporters:
  debug:
    verbosity: basic

  # File exporter for traces
  file/traces:
    path: /var/log/otel/traces.jsonl
    rotation:
      max_megabytes: 100
      max_days: 1
      max_backups: 3

  # File exporter for logs
  file/logs:
    path: /var/log/otel/logs.jsonl
    rotation:
      max_megabytes: 100
      max_days: 1
      max_backups: 3

  # File exporter for metrics
  file/metrics:
    path: /var/log/otel/metrics.jsonl
    rotation:
      max_megabytes: 100
      max_days: 1
      max_backups: 3

service:
  # Enable telemetry for the collector itself
  telemetry:
    metrics:
      readers:
        - pull:
            exporter:
              prometheus:
                host: 0.0.0.0
                port: 8888

  pipelines:
    # Traces pipeline: OTLP -> file
    traces:
      receivers: [otlp]
      processors: [batch, resource]
      exporters: [debug, file/traces]

    # Logs pipeline: OTLP -> file
    logs:
      receivers: [otlp]
      processors: [batch, resource]
      exporters: [debug, file/logs]

    # Metrics pipeline: Prometheus scraping -> file
    metrics:
      receivers: [prometheus]
      processors: [batch, resource]
      exporters: [debug, file/metrics]
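
To turn the exported logs into training records, you need to unwrap the collector's output. The sketch below assumes the file exporter writes one OTLP JSON batch per line (resourceLogs, then scopeLogs, then logRecords) and that each record body is the JSON string that web-api places in the message body. The iter_log_events helper and the sample line are hypothetical illustrations, not part of the downloadable stack.

```python
import json


def iter_log_events(lines):
    """Yield (service_name, event_dict) for each log record in OTLP JSON lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        batch = json.loads(line)
        for resource_logs in batch.get("resourceLogs", []):
            attrs = resource_logs.get("resource", {}).get("attributes", [])
            # service.name is set by the application's OpenTelemetry Resource.
            service = next(
                (a.get("value", {}).get("stringValue")
                 for a in attrs if a.get("key") == "service.name"),
                None,
            )
            for scope_logs in resource_logs.get("scopeLogs", []):
                for record in scope_logs.get("logRecords", []):
                    body = record.get("body", {}).get("stringValue", "")
                    try:
                        event = json.loads(body)
                    except json.JSONDecodeError:
                        event = None
                    # Fall back to a plain message if the body is not a JSON object.
                    if not isinstance(event, dict):
                        event = {"message": body}
                    yield service, event


# Hypothetical sample line shaped like one exported batch.
inner = {"service": "web-api", "level": "INFO",
         "message": "Cache hit for user 7", "class_uid": 6003}
sample_line = json.dumps({
    "resourceLogs": [{
        "resource": {"attributes": [
            {"key": "service.name", "value": {"stringValue": "web-api"}}]},
        "scopeLogs": [{"logRecords": [
            {"severityText": "INFO", "body": {"stringValue": json.dumps(inner)}}]}],
    }]
})

events = list(iter_log_events([sample_line]))
print(events[0][0], events[0][1]["message"])
```

With the stack running, the same function can be fed an open file handle for ./logs/otel/logs.jsonl, since iterating a text file yields one line at a time.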

config/fluentd.conf - Log aggregation:

fluentd.conf
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter **>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    timestamp ${time}
  </record>
</filter>

<match **>
  @type file
  path /var/log/fluentd/app
  append true
  <format>
    @type json
  </format>
  <buffer>
    timekey 1h
    timekey_wait 10m
    timekey_use_utc true
  </buffer>
</match>

Section 2: Instrumented Services

Web API Service

services/web-api/app.py - Flask service with comprehensive observability:

app.py
from flask import Flask, request, jsonify, Response
import time
import random
import logging
import json
import os
import socket
import uuid
from datetime import datetime
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

app = Flask(__name__)

# Service identity
SERVICE_NAME = "web-api"
SERVICE_VERSION = "1.0.0"
HOSTNAME = socket.gethostname()

# Simulated users for realistic data
SIMULATED_USERS = [
    {"uid": "user-1001", "name": "alice.johnson", "email": "alice@company.com", "department": "engineering"},
    {"uid": "user-1002", "name": "bob.smith", "email": "bob@company.com", "department": "sales"},
    {"uid": "user-1003", "name": "carol.williams", "email": "carol@company.com", "department": "support"},
    {"uid": "user-1004", "name": "david.brown", "email": "david@company.com", "department": "marketing"},
    {"uid": "user-1005", "name": "eve.davis", "email": "eve@company.com", "department": "engineering"},
]

# OpenTelemetry setup for unified observability (logs, traces, metrics via OTLP)
otel_logger = None
tracer = None
otlp_endpoint = os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT', 'http://otel-collector:4317')

try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import Resource, SERVICE_NAME as RESOURCE_SERVICE_NAME
    from opentelemetry._logs import set_logger_provider
    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter

    # Create shared resource
    resource = Resource.create({
        RESOURCE_SERVICE_NAME: SERVICE_NAME,
        "service.version": SERVICE_VERSION,
        "host.name": HOSTNAME,
    })

    # Set up tracing
    trace.set_tracer_provider(TracerProvider(resource=resource))
    tracer = trace.get_tracer(__name__)
    otlp_trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
    trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_trace_exporter))

    # Set up logging via OTLP
    logger_provider = LoggerProvider(resource=resource)
    set_logger_provider(logger_provider)
    otlp_log_exporter = OTLPLogExporter(endpoint=otlp_endpoint, insecure=True)
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter))

    # Create OTel logging handler
    otel_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
    otel_logger = logging.getLogger("otel." + SERVICE_NAME)
    otel_logger.setLevel(logging.INFO)
    otel_logger.addHandler(otel_handler)

    print(f"[{SERVICE_NAME}] OpenTelemetry enabled: logs and traces via OTLP to {otlp_endpoint}")
except Exception as e:
    print(f"[{SERVICE_NAME}] OpenTelemetry setup failed: {e}")
    otel_logger = None
    tracer = None


class StructuredLogger:
    """Logger that emits OCSF-ready structured JSON logs via OpenTelemetry."""

    def __init__(self, service_name):
        self.service_name = service_name
        # Stdout logger: always written, so Docker captures logs even if OTel is unavailable
        self.fallback_logger = logging.getLogger(service_name + ".fallback")
        self.fallback_logger.setLevel(logging.INFO)
        self.fallback_logger.handlers = []
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.fallback_logger.addHandler(handler)
        self.fallback_logger.propagate = False

    def _emit(self, level, message, **kwargs):
        """Emit a structured log entry with OCSF-compatible fields."""
        timestamp = datetime.utcnow().isoformat() + "Z"

        log_entry = {
            "timestamp": timestamp,
            "time": int(time.time() * 1000),
            "service": self.service_name,
            "level": level,
            "message": message,
            "metadata": {
                "version": "1.0.0",
                "product": {
                    "name": self.service_name,
                    "version": SERVICE_VERSION,
                    "vendor_name": "Demo"
                }
            },
            "device": {
                "hostname": HOSTNAME,
                "type": "server",
                "type_id": 1
            }
        }

        # Add optional OCSF fields
        for key, value in kwargs.items():
            if value is not None:
                log_entry[key] = value

        # Log via OpenTelemetry if available (JSON in message body)
        if otel_logger:
            log_method = getattr(otel_logger, level.lower(), otel_logger.info)
            log_method(json.dumps(log_entry))

        # Always log to stdout for Docker capture (backup)
        self.fallback_logger.info(json.dumps(log_entry))

    def info(self, message, **kwargs):
        self._emit("INFO", message, **kwargs)

    def warning(self, message, **kwargs):
        self._emit("WARNING", message, **kwargs)

    def error(self, message, **kwargs):
        self._emit("ERROR", message, **kwargs)

    def debug(self, message, **kwargs):
        self._emit("DEBUG", message, **kwargs)


# Initialize structured logger
logger = StructuredLogger(SERVICE_NAME)

# Disable verbose werkzeug logging
logging.getLogger('werkzeug').setLevel(logging.WARNING)

# Set up metrics
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'])
db_query_duration = Histogram('db_query_duration_seconds', 'Database query duration', ['query_type'])

# Database and cache connections (lazy initialization)
db_conn = None
cache = None

def get_db():
    global db_conn
    if db_conn is None:
        try:
            import psycopg2
            db_url = os.getenv('DATABASE_URL', 'postgresql://postgres:password@postgres:5432/appdb')
            db_conn = psycopg2.connect(db_url)
            logger.info("Connected to PostgreSQL")
        except Exception as e:
            logger.warning(f"PostgreSQL unavailable: {e}")
    return db_conn

def get_cache():
    global cache
    if cache is None:
        try:
            import redis as redis_lib
            redis_url = os.getenv('REDIS_URL', 'redis://redis:6379')
            cache = redis_lib.from_url(redis_url, decode_responses=True)
            cache.ping()
            logger.info("Connected to Redis")
        except Exception as e:
            logger.warning(f"Redis unavailable: {e}")
    return cache


def get_request_context():
    """Extract request context for structured logging."""
    # Simulate user from request (in production, this would come from auth)
    user = random.choice(SIMULATED_USERS)
    session_id = f"sess-{uuid.uuid4().hex[:12]}"

    # Get client IP (may be forwarded)
    client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
    if client_ip and ',' in client_ip:
        client_ip = client_ip.split(',')[0].strip()

    return {
        "actor": {
            "user": {
                "uid": user["uid"],
                "name": user["name"],
                "email": user["email"]
            },
            "session": {
                "uid": session_id,
                "created_time": int(time.time() * 1000)
            }
        },
        "src_endpoint": {
            "ip": client_ip or "192.168.1.100",
            "port": random.randint(30000, 65000),
            "domain": request.headers.get('Host', 'unknown')
        },
        "dst_endpoint": {
            "ip": "10.0.0.1",
            "port": 8000,
            "svc_name": SERVICE_NAME
        },
        "http_request": {
            "method": request.method,
            "url": {
                "path": request.path,
                "query_string": request.query_string.decode() if request.query_string else "",
                "scheme": request.scheme,
                "hostname": request.host
            },
            "user_agent": request.headers.get('User-Agent', 'unknown'),
            "http_headers": dict(list(request.headers)[:5])  # First 5 headers
        }
    }


def log_api_activity(message, status_code, duration_ms, trace_id=None, activity_id=1, **extra):
    """Log an API activity event with full OCSF context."""
    ctx = get_request_context()

    # Determine status, severity, and log level from the HTTP status code.
    # OCSF severity_id: 1 = Informational, 2 = Low, 3 = Medium, 4 = High.
    if status_code >= 500:
        status_id = 2  # Failure
        severity_id = 4  # High
        log = logger.error
    elif status_code >= 400:
        status_id = 2  # Failure
        severity_id = 3  # Medium
        log = logger.warning
    else:
        status_id = 1  # Success
        severity_id = 2  # Low
        log = logger.info

    # Calculate type_uid: class_uid * 100 + activity_id
    class_uid = 6003  # API Activity
    type_uid = class_uid * 100 + activity_id

    log(
        message,
        class_uid=class_uid,
        class_name="API Activity",
        category_uid=6,
        category_name="Application Activity",
        activity_id=activity_id,
        activity_name=["Unknown", "Create", "Read", "Update", "Delete"][min(activity_id, 4)],
        type_uid=type_uid,
        severity_id=severity_id,
        status_id=status_id,
        status_code=str(status_code),
        status=["Unknown", "Success", "Failure"][status_id],
        duration=duration_ms,
        trace_id=format(trace_id, "032x") if trace_id else None,  # 32-char hex, the usual text form of a trace ID
        actor=ctx["actor"],
        src_endpoint=ctx["src_endpoint"],
        dst_endpoint=ctx["dst_endpoint"],
        http_request=ctx["http_request"],
        http_response={
            "code": status_code,
            "status": "OK" if status_code < 400 else "Error",
            "latency": duration_ms
        },
        **extra
    )


@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Fetch user data - normal operation."""
    start_time = time.time()
    trace_id = None

    # Start span if tracing available
    span = None
    if tracer:
        span = tracer.start_span("get_user")
        span.set_attribute("user.id", user_id)
        trace_id = span.get_span_context().trace_id

    try:
        # Check cache first
        redis_cache = get_cache()
        if redis_cache:
            try:
                cached = redis_cache.get(f"user:{user_id}")
                if cached:
                    duration_ms = (time.time() - start_time) * 1000
                    request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
                    request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
                    log_api_activity(
                        f"Cache hit for user {user_id}",
                        status_code=200,
                        duration_ms=duration_ms,
                        trace_id=trace_id,
                        activity_id=2,  # Read
                        resources=[{"type": "user", "uid": str(user_id), "data": {"source": "cache"}}]
                    )
                    return jsonify({"user_id": user_id, "source": "cache"})
            except Exception as e:
                logger.warning(f"Cache error: {e}")

        # Query database
        conn = get_db()
        if conn:
            try:
                db_start = time.time()
                cursor = conn.cursor()
                cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
                result = cursor.fetchone()
                db_query_duration.labels(query_type='SELECT').observe(time.time() - db_start)

                if result:
                    if redis_cache:
                        redis_cache.setex(f"user:{user_id}", 300, str(result))
                    duration_ms = (time.time() - start_time) * 1000
                    request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
                    request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
                    log_api_activity(
                        f"User {user_id} fetched from database",
                        status_code=200,
                        duration_ms=duration_ms,
                        trace_id=trace_id,
                        activity_id=2,  # Read
                        resources=[{"type": "user", "uid": str(user_id), "data": {"source": "database"}}]
                    )
                    return jsonify({"user_id": user_id, "source": "database"})
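            except Exception as e:
                logger.warning(f"Database error: {e}")
                try:
                    # Clear the aborted transaction so later queries on this connection can run
                    conn.rollback()
                except Exception:
                    pass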

        # User not found or DB unavailable - return mock data for demo
        duration_ms = (time.time() - start_time) * 1000
        request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
        request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
        log_api_activity(
            f"Returning mock user {user_id}",
            status_code=200,
            duration_ms=duration_ms,
            trace_id=trace_id,
            activity_id=2,  # Read
            resources=[{"type": "user", "uid": str(user_id), "data": {"source": "mock"}}]
        )
        return jsonify({"user_id": user_id, "source": "mock", "name": f"User {user_id}"})

    except Exception as e:
        duration_ms = (time.time() - start_time) * 1000
        request_count.labels(method='GET', endpoint='/api/users', status=500).inc()
        log_api_activity(
            f"Error fetching user {user_id}: {str(e)}",
            status_code=500,
            duration_ms=duration_ms,
            trace_id=trace_id,
            activity_id=2,
            error={"message": str(e), "type": type(e).__name__}
        )
        return jsonify({"error": "Internal server error"}), 500

    finally:
        if span:
            span.end()


@app.route('/api/checkout', methods=['POST'])
def checkout():
    """Checkout operation - can trigger anomalies."""
    start_time = time.time()
    trace_id = None

    span = None
    if tracer:
        span = tracer.start_span("checkout")
        trace_id = span.get_span_context().trace_id

    try:
        # Simulate anomalies based on environment variable
        anomaly_prob = float(os.getenv('ANOMALY_PROBABILITY', 0.05))

        if random.random() < anomaly_prob:
            # ANOMALY: Simulate various failure modes
            anomaly_type = random.choice(['db_timeout', 'memory_leak', 'slow_query', 'cache_miss_storm'])

            if anomaly_type == 'db_timeout':
                time.sleep(5)  # Simulate slow query
                duration_ms = (time.time() - start_time) * 1000
                request_count.labels(method='POST', endpoint='/api/checkout', status=504).inc()
                log_api_activity(
                    "Database timeout during checkout",
                    status_code=504,
                    duration_ms=duration_ms,
                    trace_id=trace_id,
                    activity_id=1,  # Create
                    anomaly={"type": "db_timeout", "severity": "high"},
                    error={"message": "Database timeout", "type": "TimeoutError"}
                )
                return jsonify({"error": "Database timeout"}), 504

            elif anomaly_type == 'memory_leak':
                # Simulate a memory spike: ~100 MB held only while this request runs
                # (a local list is freed on return, so this is not a persistent leak)
                leak = ["x" * 1000000 for _ in range(100)]
                duration_ms = (time.time() - start_time) * 1000
                log_api_activity(
                    "High memory allocation during checkout",
                    status_code=200,
                    duration_ms=duration_ms,
                    trace_id=trace_id,
                    activity_id=1,
                    anomaly={"type": "memory_leak", "severity": "medium", "memory_mb": 100}
                )

            elif anomaly_type == 'slow_query':
                time.sleep(random.uniform(2, 5))
                duration_ms = (time.time() - start_time) * 1000
                log_api_activity(
                    f"Slow checkout processing: {duration_ms:.0f}ms",
                    status_code=200,
                    duration_ms=duration_ms,
                    trace_id=trace_id,
                    activity_id=1,
                    anomaly={"type": "slow_query", "severity": "medium", "threshold_ms": 2000}
                )

            elif anomaly_type == 'cache_miss_storm':
                # Simulate cache invalidation causing DB overload
                redis_cache = get_cache()
                if redis_cache:
                    for i in range(50):
                        redis_cache.delete(f"user:{i}")
                duration_ms = (time.time() - start_time) * 1000
                log_api_activity(
                    "Cache miss storm detected",
                    status_code=200,
                    duration_ms=duration_ms,
                    trace_id=trace_id,
                    activity_id=1,
                    anomaly={"type": "cache_miss_storm", "severity": "high", "invalidated_keys": 50}
                )

        # Normal checkout flow
        duration_ms = (time.time() - start_time) * 1000
        request_count.labels(method='POST', endpoint='/api/checkout', status=200).inc()
        request_duration.labels(method='POST', endpoint='/api/checkout').observe(time.time() - start_time)
        log_api_activity(
            "Checkout completed successfully",
            status_code=200,
            duration_ms=duration_ms,
            trace_id=trace_id,
            activity_id=1,  # Create
            resources=[{"type": "order", "uid": f"order-{uuid.uuid4().hex[:8]}"}]
        )
        return jsonify({"status": "success"})

    finally:
        if span:
            span.end()


@app.route('/api/search', methods=['GET'])
def search():
    """Search endpoint for load generator."""
    start_time = time.time()
    query = request.args.get('q', '')

    # Simulate search delay
    time.sleep(random.uniform(0.01, 0.1))

    duration_ms = (time.time() - start_time) * 1000
    request_count.labels(method='GET', endpoint='/api/search', status=200).inc()
    request_duration.labels(method='GET', endpoint='/api/search').observe(time.time() - start_time)
    log_api_activity(
        f"Search completed for query: {query}",
        status_code=200,
        duration_ms=duration_ms,
        activity_id=2,  # Read
        resources=[{"type": "search", "data": {"query": query, "results_count": 0}}]
    )

    return jsonify({"results": [], "query": query})


@app.route('/metrics')
def metrics():
    """Prometheus metrics endpoint."""
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)


@app.route('/health')
def health():
    """Health check endpoint."""
    return jsonify({"status": "healthy"})


if __name__ == '__main__':
    logger.info("Starting web-api service on port 8000")
    app.run(host='0.0.0.0', port=8000, threaded=True)
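
The OCSF fields that log_api_activity derives are simple arithmetic and threshold checks, so they are easy to verify in isolation. The sketch below restates them as standalone functions. The function names are hypothetical. The trace ID helper assumes you want the 32-character lowercase hex form, which is the conventional text representation of an OpenTelemetry trace ID.

```python
API_ACTIVITY_CLASS_UID = 6003  # OCSF API Activity, as used in app.py


def ocsf_type_uid(class_uid, activity_id):
    """type_uid = class_uid * 100 + activity_id."""
    return class_uid * 100 + activity_id


def classify_status(status_code):
    """Return (status_id, severity_id) using the thresholds from log_api_activity."""
    if status_code >= 500:
        return 2, 4  # Failure, highest severity used by the service
    if status_code >= 400:
        return 2, 3  # Failure, middle severity
    return 1, 2  # Success, lowest severity used by the service


def format_trace_id(trace_id):
    """Render an integer trace ID as 32-character lowercase hex."""
    return format(trace_id, "032x")


read_type_uid = ocsf_type_uid(API_ACTIVITY_CLASS_UID, 2)  # activity_id 2 = Read
print(read_type_uid)
print(classify_status(504))
print(format_trace_id(255))
```

For a Read on the API Activity class this gives a type_uid of 600302, and a 504 response maps to status_id 2 with severity_id 4.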

services/web-api/Dockerfile:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py .

EXPOSE 8000

CMD ["python", "app.py"]

services/web-api/requirements.txt:

requirements.txt
flask==3.0.0
prometheus-client==0.19.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
psycopg2-binary==2.9.9
redis==5.0.1

Auth Service

services/auth-service/app.js - Node.js authentication service:

app.js
const express = require('express');
const app = express();

app.use(express.json());

// Simple auth endpoint for demo purposes
app.post('/auth/validate', (req, res) => {
    const { token } = req.body;

    // Simulate auth validation with some latency
    const latency = Math.random() * 50 + 10;
    setTimeout(() => {
        console.log(JSON.stringify({
            timestamp: new Date().toISOString(),
            service: 'auth-service',
            level: 'INFO',
            message: `Token validation request`,
            duration_ms: latency
        }));

        res.json({ valid: true, user_id: Math.floor(Math.random() * 1000) });
    }, latency);
});

app.get('/health', (req, res) => {
    res.json({ status: 'healthy' });
});

const PORT = process.env.PORT || 8001;
app.listen(PORT, () => {
    console.log(`Auth service listening on port ${PORT}`);
});

services/auth-service/Dockerfile:

Dockerfile
FROM node:20-slim

WORKDIR /app

COPY package.json .
RUN npm install

COPY app.js .

EXPOSE 8001

CMD ["node", "app.js"]

services/auth-service/package.json:

package.json
{
  "name": "auth-service",
  "version": "1.0.0",
  "description": "Simple auth service for observability demo",
  "main": "app.js",
  "dependencies": {
    "express": "^4.18.2"
  }
}

Payment Worker

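services/payment-worker/worker.py - Background payment worker (a simple simulated job loop; Celery is listed in requirements.txt but this demo worker does not use it):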

worker.py
import os
import time
import random
import json
import logging
from datetime import datetime

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='{"timestamp":"%(asctime)s","service":"payment-worker","level":"%(levelname)s","message":"%(message)s"}'
)
logger = logging.getLogger(__name__)

def process_payment(payment_id):
    """Simulate payment processing with logging."""
    start_time = time.time()

    # Simulate processing time
    processing_time = random.uniform(0.1, 2.0)
    time.sleep(processing_time)

    # Simulate occasional failures
    if random.random() < 0.05:
        logger.error(f"Payment {payment_id} failed: timeout")
        return False

    duration = (time.time() - start_time) * 1000
    logger.info(f"Payment {payment_id} processed successfully in {duration:.2f}ms")
    return True

def main():
    """Simple worker loop for demo purposes."""
    logger.info("Payment worker started")

    while True:
        # Simulate receiving jobs from queue
        payment_id = random.randint(10000, 99999)
        process_payment(payment_id)

        # Wait before next job
        time.sleep(random.uniform(1, 5))

if __name__ == '__main__':
    main()
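
Because the worker writes one JSON object per log line, its output can be converted into structured records with a small parser. The sketch below is one hypothetical way to do that. The parse_worker_line helper and the sample lines are illustrative, and the regular expressions assume the exact message wording used in worker.py. Note that the worker's format string does not escape the message, so a message containing a double quote would not be valid JSON.

```python
import json
import re

# Patterns mirror the two messages emitted by process_payment in worker.py.
SUCCESS_RE = re.compile(r"Payment (\d+) processed successfully in ([\d.]+)ms")
FAILURE_RE = re.compile(r"Payment (\d+) failed: (\w+)")


def parse_worker_line(line):
    """Return a structured record for a payment log line, or None for other lines."""
    record = json.loads(line)
    message = record["message"]
    ok = SUCCESS_RE.search(message)
    if ok:
        return {
            "payment_id": int(ok.group(1)),
            "status": "success",
            "duration_ms": float(ok.group(2)),
        }
    failed = FAILURE_RE.search(message)
    if failed:
        return {
            "payment_id": int(failed.group(1)),
            "status": "failure",
            "reason": failed.group(2),
        }
    return None


# Hypothetical sample lines in the worker's logging format.
ok_line = ('{"timestamp":"2024-01-01 12:00:00,123","service":"payment-worker",'
           '"level":"INFO","message":"Payment 12345 processed successfully in 512.34ms"}')
bad_line = ('{"timestamp":"2024-01-01 12:00:03,456","service":"payment-worker",'
            '"level":"ERROR","message":"Payment 67890 failed: timeout"}')

ok_record = parse_worker_line(ok_line)
bad_record = parse_worker_line(bad_line)
print(ok_record)
print(bad_record)
```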

services/payment-worker/Dockerfile:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY worker.py .

CMD ["python", "worker.py"]

services/payment-worker/requirements.txt:

requirements.txt
celery==5.3.4
redis==5.0.1
psycopg2-binary==2.9.9

Section 3: Load Generator

services/load-generator/generate_load.py - Generates realistic traffic patterns with controlled anomalies:

generate_load.py
import requests
import time
import random
from datetime import datetime, timedelta
import json
import os

class LoadGenerator:
    """
    Generate realistic observability data with normal and anomalous patterns.
    """
    def __init__(self, target_url, normal_rps=10, anomaly_probability=0.05):
        self.target_url = target_url
        self.normal_rps = normal_rps
        self.anomaly_probability = anomaly_probability
        self.session = requests.Session()

    def generate_normal_traffic(self):
        """Generate normal user traffic patterns."""
        patterns = [
            # Pattern 1: User browsing
            lambda: self.session.get(f"{self.target_url}/api/users/{random.randint(1, 1000)}"),

            # Pattern 2: Search
            lambda: self.session.get(f"{self.target_url}/api/search?q=product"),

            # Pattern 3: Checkout (normal)
            lambda: self.session.post(f"{self.target_url}/api/checkout", json={"cart_id": random.randint(1, 100)}),
        ]

        pattern = random.choice(patterns)
        try:
            response = pattern()
            print(f"[NORMAL] {response.status_code} - {response.url}")
        except Exception as e:
            print(f"[ERROR] {str(e)}")

    def generate_anomaly_scenario(self):
        """Generate specific anomaly scenarios."""
        scenarios = [
            self.scenario_deployment_memory_leak,
            self.scenario_database_connection_pool_exhaustion,
            self.scenario_cache_invalidation_storm,
            self.scenario_slow_query_cascade,
        ]

        scenario = random.choice(scenarios)
        print(f"\n[ANOMALY] Starting scenario: {scenario.__name__}")
        scenario()

    def scenario_deployment_memory_leak(self):
        """
        Simulate memory leak after deployment (gradual degradation).

        Sequence:
        1. Deployment completes (config change)
        2. Memory usage gradually increases
        3. GC pressure increases
        4. Query latency spikes
        5. Connection pool exhaustion
        """
        print("  -> Simulating deployment with memory leak")

        # Generate increasing load over 5 minutes
        for i in range(60):
            try:
                # Each request allocates more memory in the service
                response = self.session.post(
                    f"{self.target_url}/api/checkout",
                    json={"trigger_memory_leak": True}
                )
                print(f"  -> Minute {i//12}: Memory pressure increasing")
                time.sleep(5)
            except Exception as e:
                print(f"  -> Service degraded: {str(e)}")
                break

    def scenario_database_connection_pool_exhaustion(self):
        """
        Simulate DB connection pool exhaustion.

        Sequence:
        1. Spike in concurrent requests
        2. Slow queries hold connections
        3. Pool exhausted
        4. New requests timeout
        """
        print("  -> Simulating DB connection pool exhaustion")

        import concurrent.futures

        def slow_request():
            try:
                response = self.session.get(
                    f"{self.target_url}/api/users/{random.randint(1, 1000)}",
                    timeout=10
                )
                return response.status_code
            except requests.RequestException:
                # Treat timeouts and connection errors as failed requests
                return 500

        # Flood with 100 concurrent requests
        with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
            futures = [executor.submit(slow_request) for _ in range(100)]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]

        # Count every non-200 outcome (timeouts, 5xx, 4xx) as a failure
        succeeded = results.count(200)
        print(f"  -> Results: {succeeded} success, {len(results) - succeeded} failed")

    def scenario_cache_invalidation_storm(self):
        """
        Simulate cache invalidation causing DB overload.

        Sequence:
        1. Cache gets invalidated (deployment or manual flush)
        2. All requests hit database
        3. DB overloaded
        4. Latency spikes
        """
        print("  -> Simulating cache invalidation storm")

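        # Trigger cache invalidation (best effort: continue even if the flush call fails)
        try:
            self.session.post(f"{self.target_url}/admin/cache/flush", timeout=10)
        except requests.RequestException as e:
            print(f"  -> Cache flush failed: {e}")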

        # Generate high read traffic (all cache misses)
        for i in range(100):
            try:
                response = self.session.get(
                    f"{self.target_url}/api/users/{random.randint(1, 10000)}"
                )
                if i % 10 == 0:
                    print(f"  -> Cache miss {i}: {response.elapsed.total_seconds():.2f}s")
            except requests.RequestException:
                # Failed reads are expected while the database is overloaded
                pass
            time.sleep(0.1)

    def scenario_slow_query_cascade(self):
        """
        Simulate slow query causing cascading failure.

        Sequence:
        1. One slow query blocks resources
        2. Other queries queue up
        3. Thread pool exhaustion
        4. Service unresponsive
        """
        print("  -> Simulating slow query cascade")

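        # Trigger slow query (the call may time out or fail once the cascade starts)
        try:
            self.session.post(
                f"{self.target_url}/api/analytics",
                json={"trigger_slow_query": True},
                timeout=60
            )
        except requests.RequestException as e:
            print(f"  -> Slow query request failed: {e}")

        time.sleep(30)  # Let cascade develop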

    def run(self, duration_minutes=60):
        """
        Run load generator for specified duration.

        Args:
            duration_minutes: How long to generate traffic
        """
        end_time = datetime.now() + timedelta(minutes=duration_minutes)

        print(f"Starting load generator (RPS: {self.normal_rps}, anomaly prob: {self.anomaly_probability})")
        print(f"Will run until: {end_time}")

        request_count = 0
        anomaly_count = 0
        last_progress = time.monotonic()

        while datetime.now() < end_time:
            # Decide if this cycle is normal or anomaly
            if random.random() < self.anomaly_probability:
                self.generate_anomaly_scenario()
                anomaly_count += 1
            else:
                # Generate normal traffic at target RPS
                for _ in range(self.normal_rps):
                    self.generate_normal_traffic()
                    time.sleep(1.0 / self.normal_rps)
                    request_count += 1

            # Print progress about once per minute of wall-clock time
            if time.monotonic() - last_progress >= 60:
                print(f"\n[PROGRESS] Generated {request_count} requests, {anomaly_count} anomaly scenarios")
                last_progress = time.monotonic()

if __name__ == '__main__':
    target_url = os.getenv('TARGET_URL', 'http://web-api:8000')
    normal_rps = int(os.getenv('NORMAL_RPS', 10))
    anomaly_probability = float(os.getenv('ANOMALY_PROBABILITY', 0.05))

    generator = LoadGenerator(target_url, normal_rps, anomaly_probability)
    generator.run(duration_minutes=120)  # Run for 2 hours
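
As a rough check on the defaults above, the number of anomaly scenarios in a run can be estimated from the loop structure: each cycle is either about one second of normal traffic or one anomaly scenario. The sketch below assumes average cycle lengths (`normal_cycle_s`, `anomaly_cycle_s`). These are illustrative guesses, not values measured from the stack, since the scenarios range from a few seconds to about five minutes.

```python
def expected_anomaly_count(duration_minutes, anomaly_probability,
                           normal_cycle_s=1.0, anomaly_cycle_s=60.0):
    """Estimate how many anomaly scenarios a run produces.

    normal_cycle_s and anomaly_cycle_s are assumed average cycle lengths,
    not values measured from the running services.
    """
    p = anomaly_probability
    # Average wall-clock time consumed by one pass through the run() loop
    mean_cycle_s = (1 - p) * normal_cycle_s + p * anomaly_cycle_s
    cycles = duration_minutes * 60 / mean_cycle_s
    return cycles * p


if __name__ == "__main__":
    estimate = expected_anomaly_count(duration_minutes=120, anomaly_probability=0.05)
    print(f"Expected anomaly scenarios in 2 hours: {estimate:.0f}")
```

Because anomaly cycles are much longer than normal ones, they can take up a large share of wall-clock time even at a low ANOMALY_PROBABILITY. Lower the probability if you want mostly normal traffic.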

services/load-generator/Dockerfile:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY generate_load.py .

CMD ["python", "generate_load.py"]

services/load-generator/requirements.txt:

requirements.txt
requests==2.31.0

Section 4: Converting OpenTelemetry Data to OCSF

The OpenTelemetry Collector exports all three signals (logs, traces, metrics) as JSONL files to ./logs/otel/. A single unified script converts them to OCSF-formatted Parquet files.
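To make the input format concrete, here is a minimal sketch of how one line of logs.jsonl is nested: resourceLogs, then scopeLogs, then logRecords. The sample line is hypothetical and trimmed to a few fields. Real exports carry more attributes, and the helper name `iter_log_records` is only for illustration.

```python
import json

# Hypothetical single line from logs.jsonl, trimmed to a few fields
sample_line = json.dumps({
    "resourceLogs": [{
        "resource": {"attributes": [
            {"key": "service.name", "value": {"stringValue": "web-api"}},
        ]},
        "scopeLogs": [{
            "logRecords": [{
                "timeUnixNano": "1700000000123456789",  # 64-bit ints arrive as strings
                "severityNumber": 9,
                "severityText": "INFO",
                "body": {"stringValue": "GET /api/users/42"},
            }],
        }],
    }],
})


def iter_log_records(line):
    """Yield (service_name, log_record) pairs from one exported JSONL line."""
    record = json.loads(line)
    for resource_log in record.get("resourceLogs", []):
        attributes = resource_log.get("resource", {}).get("attributes", [])
        attrs = {a["key"]: a["value"].get("stringValue") for a in attributes}
        for scope_log in resource_log.get("scopeLogs", []):
            for log_record in scope_log.get("logRecords", []):
                yield attrs.get("service.name", "unknown"), log_record


for service, rec in iter_log_records(sample_line):
    time_ms = int(rec["timeUnixNano"]) // 1_000_000  # nanoseconds -> milliseconds
    print(service, time_ms, rec["body"]["stringValue"])
```

Traces and metrics follow the same pattern, with resourceSpans/scopeSpans/spans and resourceMetrics/scopeMetrics/metrics respectively.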

scripts/convert_otel_to_ocsf.py

Convert all OpenTelemetry JSONL exports to OCSF format:

convert_otel_to_ocsf.py
#!/usr/bin/env python3
"""
Convert OpenTelemetry JSONL exports to OCSF (Open Cybersecurity Schema Framework) format.

This unified converter handles all three telemetry signals exported by the OTel collector:
- logs.jsonl -> ocsf_logs.parquet
- traces.jsonl -> ocsf_traces.parquet
- metrics.jsonl -> ocsf_metrics.parquet

Usage:
    # Convert all signals at once
    python scripts/convert_otel_to_ocsf.py

    # Convert specific signal
    python scripts/convert_otel_to_ocsf.py --signal logs
    python scripts/convert_otel_to_ocsf.py --signal traces
    python scripts/convert_otel_to_ocsf.py --signal metrics
"""

import json
import os
import sys
import argparse
from datetime import datetime
from pathlib import Path

try:
    import pandas as pd
except ImportError:
    print("Error: pandas is required. Install with: pip install pandas pyarrow")
    sys.exit(1)


class OTelToOCSFConverter:
    """Convert OpenTelemetry JSONL exports to OCSF format."""

    # OCSF class definitions
    OCSF_CLASSES = {
        6001: "Web Resources Activity",
        6002: "Application Lifecycle",
        6003: "API Activity",
        6004: "Web Resource Access Activity",
        99: "Metric",  # Custom for metrics
    }

    SEVERITY_MAP = {
        'TRACE': 0,
        'DEBUG': 1,
        'INFO': 2,
        'WARN': 3,
        'WARNING': 3,
        'ERROR': 4,
        'FATAL': 5,
        'CRITICAL': 6,
    }

    def convert_logs(self, input_path, output_path):
        """Convert OTel logs JSONL to OCSF parquet."""
        if not os.path.exists(input_path):
            print(f"Logs file not found: {input_path}")
            return None

        ocsf_events = []
        with open(input_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    otel_record = json.loads(line)
                    events = self._parse_otel_log_record(otel_record)
                    ocsf_events.extend(events)
                except json.JSONDecodeError:
                    continue

        if not ocsf_events:
            print(f"No valid log events found in {input_path}")
            return None

        return self._save_to_parquet(ocsf_events, output_path, "logs")

    def _parse_otel_log_record(self, otel_record):
        """Parse OTel log export format and extract OCSF events."""
        events = []

        # OTel log export has resourceLogs -> scopeLogs -> logRecords
        for resource_log in otel_record.get('resourceLogs', []):
            resource = resource_log.get('resource', {})
            resource_attrs = self._extract_attributes(resource.get('attributes', []))

            for scope_log in resource_log.get('scopeLogs', []):
                for log_record in scope_log.get('logRecords', []):
                    event = self._log_record_to_ocsf(log_record, resource_attrs)
                    if event:
                        events.append(event)

        return events

    def _log_record_to_ocsf(self, log_record, resource_attrs):
        """Convert a single OTel log record to OCSF format."""
        try:
            # Extract timestamp (nanoseconds to milliseconds)
            time_unix_nano = log_record.get('timeUnixNano', 0)
            if isinstance(time_unix_nano, str):
                time_unix_nano = int(time_unix_nano)
            time_ms = time_unix_nano // 1_000_000

            # Get severity
            severity_number = log_record.get('severityNumber', 9)  # Default INFO
            severity_text = log_record.get('severityText', 'INFO')
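            # OTel severity numbers come in bands of four (1-4 TRACE ... 21-24 FATAL),
            # so subtract 1 before dividing to keep each band on a single ID (0-5).
            severity_id = min(max(severity_number - 1, 0) // 4, 6)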

            # Get message body
            body = log_record.get('body', {})
            message = body.get('stringValue', '') if isinstance(body, dict) else str(body)

            # Try to parse structured log from message
            ocsf_data = {}
            if message.startswith('{'):
                try:
                    ocsf_data = json.loads(message)
                except json.JSONDecodeError:
                    pass

            # Extract attributes
            attrs = self._extract_attributes(log_record.get('attributes', []))

            # Build OCSF event
            ocsf_event = {
                # Core OCSF fields
                "class_uid": ocsf_data.get('class_uid', 6003),
                "class_name": ocsf_data.get('class_name', self.OCSF_CLASSES.get(6003)),
                "category_uid": ocsf_data.get('category_uid', 6),
                "category_name": ocsf_data.get('category_name', "Application Activity"),
                "activity_id": ocsf_data.get('activity_id', 0),  # 0 = Unknown, matching the default name
                "activity_name": ocsf_data.get('activity_name', "Unknown"),
                "type_uid": ocsf_data.get('type_uid', 600300),  # class_uid * 100 + activity_id
                "severity_id": ocsf_data.get('severity_id', severity_id),
                "time": ocsf_data.get('time', time_ms),

                # Status fields
                "status_id": ocsf_data.get('status_id', 1),
                "status": ocsf_data.get('status', 'Success'),
                "status_code": ocsf_data.get('status_code', '200'),

                # Message and service
                "message": ocsf_data.get('message', message),
                "service": ocsf_data.get('service', resource_attrs.get('service.name', 'unknown')),
                "level": ocsf_data.get('level', severity_text),

                # Duration
                "duration": ocsf_data.get('duration'),

                # Trace context
                # Fall back to the structured log when traceId is missing or an empty string
                "trace_id": log_record.get('traceId') or ocsf_data.get('trace_id'),
                "span_id": log_record.get('spanId'),
            }

            # Handle nested objects from structured log
            self._flatten_nested_objects(ocsf_event, ocsf_data)

            # Add resource attributes
            ocsf_event["service_version"] = resource_attrs.get('service.version')
            ocsf_event["host_name"] = resource_attrs.get('host.name')

            # Remove None values
            ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}

            return ocsf_event

        except Exception as e:
            return None

    def _flatten_nested_objects(self, ocsf_event, ocsf_data):
        """Flatten nested OCSF objects for ML use."""
        # Metadata
        metadata = ocsf_data.get('metadata', {})
        if metadata:
            ocsf_event["metadata"] = json.dumps(metadata)
            ocsf_event["metadata_version"] = metadata.get('version')
            product = metadata.get('product', {})
            ocsf_event["metadata_product_name"] = product.get('name')
            ocsf_event["metadata_product_version"] = product.get('version')
            ocsf_event["metadata_product_vendor_name"] = product.get('vendor_name')

        # Actor
        actor = ocsf_data.get('actor', {})
        if actor:
            ocsf_event["actor"] = json.dumps(actor)
            user = actor.get('user', {})
            ocsf_event["actor_user_uid"] = user.get('uid')
            ocsf_event["actor_user_name"] = user.get('name')
            ocsf_event["actor_user_email"] = user.get('email')
            session = actor.get('session', {})
            ocsf_event["actor_session_uid"] = session.get('uid')

        # Source endpoint
        src = ocsf_data.get('src_endpoint', {})
        if src:
            ocsf_event["src_endpoint"] = json.dumps(src)
            ocsf_event["src_endpoint_ip"] = src.get('ip')
            ocsf_event["src_endpoint_port"] = src.get('port')
            ocsf_event["src_endpoint_domain"] = src.get('domain')

        # Destination endpoint
        dst = ocsf_data.get('dst_endpoint', {})
        if dst:
            ocsf_event["dst_endpoint"] = json.dumps(dst)
            ocsf_event["dst_endpoint_ip"] = dst.get('ip')
            ocsf_event["dst_endpoint_port"] = dst.get('port')
            ocsf_event["dst_endpoint_svc_name"] = dst.get('svc_name')

        # HTTP request
        http_req = ocsf_data.get('http_request', {})
        if http_req:
            ocsf_event["http_request"] = json.dumps(http_req)
            ocsf_event["http_request_method"] = http_req.get('method')
            ocsf_event["http_request_user_agent"] = http_req.get('user_agent')
            url = http_req.get('url', {})
            ocsf_event["http_request_url_path"] = url.get('path')
            ocsf_event["http_request_url_hostname"] = url.get('hostname')
            ocsf_event["http_request_url_scheme"] = url.get('scheme')

        # HTTP response
        http_resp = ocsf_data.get('http_response', {})
        if http_resp:
            ocsf_event["http_response"] = json.dumps(http_resp)
            ocsf_event["http_response_code"] = http_resp.get('code')
            ocsf_event["http_response_status"] = http_resp.get('status')
            ocsf_event["http_response_latency"] = http_resp.get('latency')

        # Device
        device = ocsf_data.get('device', {})
        if device:
            ocsf_event["device"] = json.dumps(device)
            ocsf_event["device_hostname"] = device.get('hostname')
            ocsf_event["device_type"] = device.get('type')

        # Resources
        resources = ocsf_data.get('resources', [])
        if resources:
            ocsf_event["resources"] = json.dumps(resources)
            if len(resources) > 0:
                ocsf_event["resource_type"] = resources[0].get('type')
                ocsf_event["resource_uid"] = resources[0].get('uid')

        # Anomaly
        anomaly = ocsf_data.get('anomaly', {})
        if anomaly:
            ocsf_event["anomaly"] = json.dumps(anomaly)
            ocsf_event["anomaly_type"] = anomaly.get('type')
            ocsf_event["anomaly_severity"] = anomaly.get('severity')

        # Error
        error = ocsf_data.get('error', {})
        if error:
            ocsf_event["error"] = json.dumps(error)
            ocsf_event["error_message"] = error.get('message')
            ocsf_event["error_type"] = error.get('type')

    def convert_traces(self, input_path, output_path):
        """Convert OTel traces JSONL to OCSF parquet."""
        if not os.path.exists(input_path):
            print(f"Traces file not found: {input_path}")
            return None

        ocsf_events = []
        with open(input_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    otel_record = json.loads(line)
                    events = self._parse_otel_trace_record(otel_record)
                    ocsf_events.extend(events)
                except json.JSONDecodeError:
                    continue

        if not ocsf_events:
            print(f"No valid trace events found in {input_path}")
            return None

        return self._save_to_parquet(ocsf_events, output_path, "traces")

    def _parse_otel_trace_record(self, otel_record):
        """Parse OTel trace export format and extract OCSF events."""
        events = []

        for resource_span in otel_record.get('resourceSpans', []):
            resource = resource_span.get('resource', {})
            resource_attrs = self._extract_attributes(resource.get('attributes', []))

            for scope_span in resource_span.get('scopeSpans', []):
                for span in scope_span.get('spans', []):
                    event = self._span_to_ocsf(span, resource_attrs)
                    if event:
                        events.append(event)

        return events

    def _span_to_ocsf(self, span, resource_attrs):
        """Convert a single OTel span to OCSF format."""
        try:
            # Extract timing (nanoseconds to milliseconds)
            start_time = int(span.get('startTimeUnixNano', '0')) // 1_000_000
            end_time = int(span.get('endTimeUnixNano', '0')) // 1_000_000
            duration_ms = end_time - start_time if end_time > start_time else 0

            # Extract attributes
            attrs = self._extract_attributes(span.get('attributes', []))

            # Determine activity type from span kind
            span_kind = span.get('kind', 1)
            activity_map = {1: 1, 2: 2, 3: 2, 4: 1, 5: 1}  # Internal, Server, Client, Producer, Consumer
            activity_id = activity_map.get(span_kind, 1)

            # Determine status (OTLP status codes: 0 = UNSET, 1 = OK, 2 = ERROR)
            status = span.get('status', {})
            status_code = status.get('code', 0)
            status_id = 2 if status_code == 2 else 1  # Only ERROR counts as a failure

            # Build OCSF event
            ocsf_event = {
                "class_uid": 6003,
                "class_name": "API Activity",
                "category_uid": 6,
                "category_name": "Application Activity",
                "activity_id": activity_id,
                "activity_name": ["Unknown", "Create", "Read", "Update", "Delete"][min(activity_id, 4)],
                "type_uid": 600300 + activity_id,
                "severity_id": 4 if status_id == 2 else 2,
                "time": start_time,

                "status_id": status_id,
                "status": "Success" if status_id == 1 else "Failure",
                "status_code": str(attrs.get('http.status_code', 200)),

                "message": span.get('name', ''),
                "service": resource_attrs.get('service.name', 'unknown'),
                "duration": duration_ms,

                "trace_id": span.get('traceId'),
                "span_id": span.get('spanId'),
                "parent_span_id": span.get('parentSpanId'),

                # HTTP attributes if present
                "http_request_method": attrs.get('http.method'),
                "http_request_url_path": attrs.get('http.target') or attrs.get('http.url'),
                "http_response_code": attrs.get('http.status_code'),
            }

            # Remove None values
            ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}

            return ocsf_event

        except Exception as e:
            return None

    def convert_metrics(self, input_path, output_path):
        """Convert OTel metrics JSONL to OCSF parquet."""
        if not os.path.exists(input_path):
            print(f"Metrics file not found: {input_path}")
            return None

        ocsf_events = []
        with open(input_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    otel_record = json.loads(line)
                    events = self._parse_otel_metrics_record(otel_record)
                    ocsf_events.extend(events)
                except json.JSONDecodeError:
                    continue

        if not ocsf_events:
            print(f"No valid metric events found in {input_path}")
            return None

        return self._save_to_parquet(ocsf_events, output_path, "metrics")

    def _parse_otel_metrics_record(self, otel_record):
        """Parse OTel metrics export format and extract OCSF events."""
        events = []

        for resource_metric in otel_record.get('resourceMetrics', []):
            resource = resource_metric.get('resource', {})
            resource_attrs = self._extract_attributes(resource.get('attributes', []))

            for scope_metric in resource_metric.get('scopeMetrics', []):
                for metric in scope_metric.get('metrics', []):
                    metric_events = self._metric_to_ocsf(metric, resource_attrs)
                    events.extend(metric_events)

        return events

    def _metric_to_ocsf(self, metric, resource_attrs):
        """Convert a single OTel metric to OCSF format."""
        events = []
        metric_name = metric.get('name', 'unknown')
        metric_unit = metric.get('unit', '')
        metric_description = metric.get('description', '')

        # Handle different metric types
        data_points = []
        if 'sum' in metric:
            data_points = metric['sum'].get('dataPoints', [])
            metric_type = 'counter'
        elif 'gauge' in metric:
            data_points = metric['gauge'].get('dataPoints', [])
            metric_type = 'gauge'
        elif 'histogram' in metric:
            data_points = metric['histogram'].get('dataPoints', [])
            metric_type = 'histogram'
        elif 'summary' in metric:
            data_points = metric['summary'].get('dataPoints', [])
            metric_type = 'summary'
        else:
            return events

        for dp in data_points:
            try:
                # Get timestamp
                time_unix_nano = dp.get('timeUnixNano', 0)
                if isinstance(time_unix_nano, str):
                    time_unix_nano = int(time_unix_nano)
                time_ms = time_unix_nano // 1_000_000

                # Get value (OTLP JSON encodes 64-bit integers such as asInt and count as strings)
                if 'asDouble' in dp:
                    value = float(dp['asDouble'])
                elif 'asInt' in dp:
                    value = int(dp['asInt'])
                elif 'sum' in dp:
                    value = float(dp['sum'])
                elif 'count' in dp:
                    value = int(dp['count'])
                else:
                    value = 0

                # Get labels
                labels = self._extract_attributes(dp.get('attributes', []))

                ocsf_event = {
                    "class_uid": 99,  # Custom metric class
                    "class_name": "Metric",
                    "category_uid": 6,
                    "category_name": "Application Activity",
                    "activity_id": 0,
                    "activity_name": "Observe",
                    "type_uid": 9900,
                    "severity_id": 2,  # Info
                    "time": time_ms,

                    "metric_name": metric_name,
                    "metric_value": value,
                    "metric_type": metric_type,
                    "metric_unit": metric_unit,
                    "metric_description": metric_description,

                    "service": resource_attrs.get('service.name') or labels.get('job', 'unknown'),

                    # Common metric labels
                    "endpoint": labels.get('endpoint'),
                    "method": labels.get('method'),
                    "status": labels.get('status'),
                    "job": labels.get('job'),
                    "instance": labels.get('instance'),
                }

                # Add all labels as flattened fields
                for k, v in labels.items():
                    key = f"label_{k.replace('.', '_').replace('-', '_')}"
                    ocsf_event[key] = v

                # Remove None values
                ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}

                events.append(ocsf_event)

            except Exception as e:
                continue

        return events

    def _extract_attributes(self, attrs_list):
        """Extract OTel attributes from list format to dict."""
        result = {}
        for attr in attrs_list:
            key = attr.get('key', '')
            value = attr.get('value', {})
            if 'stringValue' in value:
                result[key] = value['stringValue']
            elif 'intValue' in value:
                result[key] = int(value['intValue'])
            elif 'doubleValue' in value:
                result[key] = float(value['doubleValue'])
            elif 'boolValue' in value:
                result[key] = value['boolValue']
        return result

    def _save_to_parquet(self, events, output_path, signal_type):
        """Save events to parquet file."""
        if not events:
            print(f"Warning: No {signal_type} events to save")
            return None

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        df = pd.DataFrame(events)
        df.to_parquet(output_path, compression='snappy')
        print(f"Saved {len(df)} {signal_type} events to {output_path}")
        print(f"  Columns: {len(df.columns)}")
        return df


def main():
    parser = argparse.ArgumentParser(
        description='Convert OpenTelemetry JSONL exports to OCSF format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Convert all signals
  python scripts/convert_otel_to_ocsf.py

  # Convert specific signal
  python scripts/convert_otel_to_ocsf.py --signal logs
  python scripts/convert_otel_to_ocsf.py --signal traces
  python scripts/convert_otel_to_ocsf.py --signal metrics

  # Custom input/output directories
  python scripts/convert_otel_to_ocsf.py --input-dir ./logs/otel --output-dir ./data
        """
    )
    parser.add_argument('--signal', choices=['logs', 'traces', 'metrics', 'all'],
                        default='all', help='Which signal to convert (default: all)')
    parser.add_argument('--input-dir', default='./logs/otel',
                        help='Directory containing OTel JSONL files (default: ./logs/otel)')
    parser.add_argument('--output-dir', default='./data',
                        help='Output directory for parquet files (default: ./data)')

    args = parser.parse_args()

    converter = OTelToOCSFConverter()
    results = {}

    if args.signal in ['logs', 'all']:
        logs_input = os.path.join(args.input_dir, 'logs.jsonl')
        logs_output = os.path.join(args.output_dir, 'ocsf_logs.parquet')
        print(f"\nConverting logs: {logs_input} -> {logs_output}")
        results['logs'] = converter.convert_logs(logs_input, logs_output)

    if args.signal in ['traces', 'all']:
        traces_input = os.path.join(args.input_dir, 'traces.jsonl')
        traces_output = os.path.join(args.output_dir, 'ocsf_traces.parquet')
        print(f"\nConverting traces: {traces_input} -> {traces_output}")
        results['traces'] = converter.convert_traces(traces_input, traces_output)

    if args.signal in ['metrics', 'all']:
        metrics_input = os.path.join(args.input_dir, 'metrics.jsonl')
        metrics_output = os.path.join(args.output_dir, 'ocsf_metrics.parquet')
        print(f"\nConverting metrics: {metrics_input} -> {metrics_output}")
        results['metrics'] = converter.convert_metrics(metrics_input, metrics_output)

    # Summary
    print("\n" + "=" * 50)
    print("Conversion Summary:")
    for signal, df in results.items():
        if df is not None:
            print(f"  {signal}: {len(df)} events, {len(df.columns)} columns")
        else:
            print(f"  {signal}: No data (file may not exist yet)")


if __name__ == '__main__':
    main()
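
One convention worth checking after conversion: OCSF derives type_uid as class_uid * 100 + activity_id, which is why the trace converter above computes 600300 + activity_id. The following is a small sketch of a consistency check over converted events. The helper name `find_type_uid_mismatches` and the sample events are illustrative and not part of the script.

```python
def find_type_uid_mismatches(events):
    """Return (event, expected_type_uid) pairs where type_uid breaks the OCSF rule."""
    mismatches = []
    for event in events:
        expected = event["class_uid"] * 100 + event["activity_id"]
        if event.get("type_uid") != expected:
            mismatches.append((event, expected))
    return mismatches


# Illustrative events, not real converter output
sample_events = [
    {"class_uid": 6003, "activity_id": 2, "type_uid": 600302},  # consistent
    {"class_uid": 6003, "activity_id": 2, "type_uid": 600301},  # inconsistent
]

for event, expected in find_type_uid_mismatches(sample_events):
    print(f"type_uid {event['type_uid']} should be {expected}")
```

A check like this can catch structured logs that set activity_id without updating type_uid, since the converter copies each field independently.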

Why unified conversion?


Section 5: Running the Data Generation Pipeline

Quick Start

# 1. Download and extract the code (or use the zip from this appendix)
cd appendix-code

# 2. Create required directories (with write permissions for containers)
# Note: If you previously ran docker compose, the directory may be root-owned
sudo rm -rf ./logs/otel 2>/dev/null || true
mkdir -p ./logs/otel ./data
chmod 777 ./logs/otel

# 3. Build services (ensures latest code)
docker compose build

# 4. Start all services
docker compose up -d

# 5. Verify services are running
docker compose ps

# 6. Let the load generator run for a while (e.g., 5-10 minutes for demo, 2 hours for full dataset)
# The load-generator service automatically sends traffic to web-api
# All telemetry flows through OpenTelemetry Collector to ./logs/otel/

# 7. Convert all OpenTelemetry data to OCSF format (single command):
python scripts/convert_otel_to_ocsf.py

# 8. Output files (ready for Parts 2-3 of tutorial):
# - data/ocsf_logs.parquet (application logs in OCSF format)
# - data/ocsf_traces.parquet (distributed traces in OCSF format)
# - data/ocsf_metrics.parquet (system metrics in OCSF format)
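
Before running the conversion in step 7, it can help to confirm that the collector has actually written data. The sketch below counts non-empty lines in each JSONL file. The file names match the converter's defaults, while the helper name `count_jsonl_records` is hypothetical. Each line is one export batch and may contain many individual events.

```python
from pathlib import Path

SIGNALS = ("logs", "traces", "metrics")


def count_jsonl_records(otel_dir):
    """Count non-empty lines per signal file; a missing file counts as 0."""
    counts = {}
    for signal in SIGNALS:
        path = Path(otel_dir) / f"{signal}.jsonl"
        if not path.exists():
            counts[signal] = 0
            continue
        with path.open("r", encoding="utf-8") as f:
            counts[signal] = sum(1 for line in f if line.strip())
    return counts


if __name__ == "__main__":
    for signal, n in count_jsonl_records("./logs/otel").items():
        print(f"{signal}: {n} export batches")
```

If a signal stays at 0 after a few minutes, check `docker compose ps` and the write permissions on ./logs/otel before converting.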

Prerequisites

Before running, ensure you have:

Stopping Services

# Stop all services
docker compose down

# Stop and remove volumes (clears all data)
docker compose down -v

Generated Dataset Statistics

After 2 hours of generation, you’ll have:

Important: The generated data is unlabeled - anomalies occur naturally in the traffic patterns without explicit labels. This matches the series approach:


Section 6: Using Generated Data in Tutorial (No Labels Needed)

Now you can use the generated data in Part 2 (TabularResNet) and Part 3 (Feature Engineering). No labeling is required: the self-supervised and unsupervised methods work on raw observability data.

# In Part 3: Feature Engineering
import pandas as pd

# Load your generated OCSF data (unlabeled)
ocsf_df = pd.read_parquet('data/ocsf_logs.parquet')

# Now follow Part 3 to extract features
categorical_features = ['service', 'http_request_method', 'status_id']  # column names as written by the converter
numerical_features = ['duration', 'time', 'severity_id']

# Continue with feature engineering from Part 3...
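
The converter drops fields whose value is None, so the columns in the Parquet file depend on what the services actually logged. Here is a minimal sketch for checking which requested feature columns are present before selecting them. The helper name `split_available` is hypothetical.

```python
def split_available(requested, available):
    """Split requested column names into those present and those missing."""
    available = set(available)
    present = [c for c in requested if c in available]
    missing = [c for c in requested if c not in available]
    return present, missing


# Example with a hand-written column list; in practice pass ocsf_df.columns
columns = ["service", "status_id", "duration", "time", "severity_id"]
present, missing = split_available(["service", "http_request_method", "status_id"], columns)
print("present:", present)
print("missing:", missing)
```

Selecting only the `present` columns avoids a KeyError when a field such as the HTTP method never appeared in the generated logs.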

Optional: Generating Labels for Method Evaluation

If you want to evaluate different anomaly detection methods (Part 6, Section 7: Method Comparison), you can optionally label a small subset for validation:
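
As a sketch of how such a labeled subset might be used, the function below computes precision and recall for a detector's binary flags against `is_anomaly`-style labels. The function name and sample values are illustrative. Labels produced by threshold heuristics are approximations, so treat the scores as a relative comparison between methods rather than ground truth.

```python
def precision_recall(labels, predictions):
    """Compute precision and recall for binary labels and predictions (1 = anomaly)."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Illustrative values: heuristic labels vs. flags from some detector
labels = [1, 0, 1, 0, 1]
predictions = [1, 1, 0, 0, 1]
precision, recall = precision_recall(labels, predictions)
print(f"precision={precision:.2f} recall={recall:.2f}")
```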

scripts/label_subset_for_evaluation.py

label_subset_for_evaluation.py
#!/usr/bin/env python3
"""
Label a small subset of OCSF events for evaluating anomaly detection methods.

This is OPTIONAL - only needed if you want to compare method performance.
The main training (Part 4) doesn't need labels - it uses self-supervised learning.

Usage:
    python scripts/label_subset_for_evaluation.py
"""

import os
import sys
import pandas as pd


def label_evaluation_subset(ocsf_events_path, sample_size=1000):
    """
    Label a small subset for evaluating anomaly detection methods.

    Args:
        ocsf_events_path: Path to OCSF Parquet file
        sample_size: How many events to label for evaluation

    Returns:
        Small labeled DataFrame for validation
    """
    # Check if input file exists
    if not os.path.exists(ocsf_events_path):
        print(f"Input file not found: {ocsf_events_path}")
        print()
        print("Please run the OCSF conversion first:")
        print("  python scripts/convert_otel_to_ocsf.py")
        return None

    df = pd.read_parquet(ocsf_events_path)

    if len(df) == 0:
        print(f"No events found in {ocsf_events_path}")
        print("Make sure docker compose services are running and generating logs.")
        return None

    print(f"Loaded {len(df)} events with columns: {list(df.columns)}")

    # Sample a subset
    eval_df = df.sample(n=min(sample_size, len(df)), random_state=42).copy()

    # Initialize anomaly labels
    eval_df['is_anomaly'] = 0
    eval_df['anomaly_reason'] = ''

    anomaly_count = 0

    # Label based on available columns

    # 1. Check for explicit anomaly field (from new structured logs)
    if 'anomaly_type' in eval_df.columns:
        mask = eval_df['anomaly_type'].notna()
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask, 'anomaly_reason'] = 'explicit_anomaly_' + eval_df.loc[mask, 'anomaly_type'].astype(str)
        anomaly_count += mask.sum()
        print(f"  Found {mask.sum()} events with explicit anomaly markers")

    # 2. High latency (if duration field exists)
    if 'duration' in eval_df.columns:
        # Assumes duration is in milliseconds; adjust the threshold for other units
        duration_threshold = 2000  # 2 seconds in ms
        mask = eval_df['duration'].notna() & (eval_df['duration'] > duration_threshold)
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_latency'
        new_anomalies = mask.sum() - (eval_df.loc[mask, 'anomaly_reason'].str.contains('explicit', na=False)).sum()
        if new_anomalies > 0:
            print(f"  Found {new_anomalies} events with high latency (>{duration_threshold}ms)")

    # 3. HTTP response latency (alternative duration field)
    if 'http_response_latency' in eval_df.columns:
        mask = eval_df['http_response_latency'].notna() & (eval_df['http_response_latency'] > 2000)
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_response_latency'

    # 4. Error status (failures)
    if 'status_id' in eval_df.columns:
        mask = eval_df['status_id'] == 2  # Failure
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'status_failure'
        print(f"  Found {mask.sum()} events with failure status")

    # 5. HTTP error codes (4xx, 5xx)
    if 'http_response_code' in eval_df.columns:
        mask = eval_df['http_response_code'].notna() & (eval_df['http_response_code'] >= 400)
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'http_error'

    # 6. High severity (ERROR, CRITICAL)
    if 'severity_id' in eval_df.columns:
        mask = eval_df['severity_id'] >= 4  # ERROR or higher
        eval_df.loc[mask, 'is_anomaly'] = 1
        eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_severity'
        print(f"  Found {mask.sum()} events with high severity (ERROR+)")

    # 7. Check message content for specific anomaly keywords (not generic "error")
    # Only check INFO-level messages to avoid double-counting ERROR logs
    if 'message' in eval_df.columns:
        # Use specific keywords that indicate anomalies, not generic "error"
        anomaly_keywords = ['timeout', 'leak', 'storm', 'exhausted', 'overflow', 'oom', 'killed']
        info_mask = eval_df['severity_id'] <= 2 if 'severity_id' in eval_df.columns else True
        for keyword in anomaly_keywords:
            mask = info_mask & eval_df['message'].str.lower().str.contains(keyword, na=False)
            new_anomalies = mask & (eval_df['is_anomaly'] == 0)
            eval_df.loc[mask, 'is_anomaly'] = 1
            eval_df.loc[new_anomalies & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = f'keyword_{keyword}'

    # Summary
    total_anomalies = eval_df['is_anomaly'].sum()
    print(f"\nLabeled {total_anomalies} / {len(eval_df)} events as anomalies for evaluation")
    print(f"Anomaly rate: {total_anomalies/len(eval_df)*100:.2f}%")

    if total_anomalies > 0:
        print("\nAnomaly reasons:")
        reason_counts = eval_df[eval_df['is_anomaly'] == 1]['anomaly_reason'].value_counts()
        for reason, count in reason_counts.items():
            print(f"  {reason}: {count}")

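    # Save small evaluation set (create the output directory if it is missing)
    output_path = 'data/ocsf_eval_subset.parquet'
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    eval_df.to_parquet(output_path)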
    print(f"\nSaved evaluation subset to {output_path}")

    return eval_df


if __name__ == '__main__':
    # Generate small labeled subset for Part 6 evaluation (optional)
    result = label_evaluation_subset('data/ocsf_logs.parquet', sample_size=1000)
    if result is None:
        sys.exit(1)
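
The rules in the script run in a fixed order, and only the first matching rule sets `anomaly_reason`. The following is a minimal pure-Python sketch of that precedence for a single event held in a dict. `label_event` is a hypothetical helper for illustration, not part of the downloadable code, and it treats a missing key the way the script treats a missing column.

```python
from typing import Any, Dict, Tuple

# Thresholds and keywords copied from label_subset_for_evaluation.py
LATENCY_THRESHOLD_MS = 2000
KEYWORDS = ['timeout', 'leak', 'storm', 'exhausted', 'overflow', 'oom', 'killed']


def label_event(event: Dict[str, Any]) -> Tuple[int, str]:
    """Return (is_anomaly, anomaly_reason) for one event dict.

    Hypothetical helper: mirrors the rule order of the script, where
    the first matching rule determines the reason.
    """
    if event.get('anomaly_type') is not None:
        return 1, 'explicit_anomaly_' + str(event['anomaly_type'])
    if (event.get('duration') or 0) > LATENCY_THRESHOLD_MS:
        return 1, 'high_latency'
    if (event.get('http_response_latency') or 0) > LATENCY_THRESHOLD_MS:
        return 1, 'high_response_latency'
    if event.get('status_id') == 2:
        return 1, 'status_failure'
    if (event.get('http_response_code') or 0) >= 400:
        return 1, 'http_error'
    severity = event.get('severity_id')
    if severity is not None and severity >= 4:
        return 1, 'high_severity'
    # Keyword rule: only low-severity messages, or events with no severity field
    if severity is None or severity <= 2:
        message = str(event.get('message') or '').lower()
        for keyword in KEYWORDS:
            if keyword in message:
                return 1, 'keyword_' + keyword
    return 0, ''


print(label_event({'duration': 3500, 'status_id': 2}))  # (1, 'high_latency')
print(label_event({'severity_id': 1, 'message': 'Connection pool exhausted'}))  # (1, 'keyword_exhausted')
print(label_event({'status_id': 1, 'http_response_code': 200}))  # (0, '')
```

The first call shows the precedence: the event is both slow and failed, but the latency rule runs first, so the reason is `high_latency`.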

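When to use labels: only when comparing detection methods (Part 6, Section 7). Training in Part 4 is self-supervised and does not use them.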
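
As a minimal sketch of how the labeled subset might be used for evaluation, the helper below computes precision, recall, and F1 for a detector's binary predictions against the `is_anomaly` column. `precision_recall_f1` is a hypothetical helper, and the label and prediction lists are made up for illustration; in practice the predictions would come from the detectors in Part 6.

```python
from typing import Dict, Sequence


def precision_recall_f1(labels: Sequence[int], predictions: Sequence[int]) -> Dict[str, float]:
    """Compare binary predictions (1 = anomaly) against heuristic labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    tp = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 0)
    # Guard against division by zero when a class is never predicted or never present
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Illustrative values only: in practice, labels would be
# eval_df['is_anomaly'].tolist() and predictions a detector's output.
labels = [0, 0, 1, 1, 0, 1]
predictions = [0, 1, 1, 0, 0, 1]
print(precision_recall_f1(labels, predictions))
```

Because the labels come from heuristics rather than ground truth, these numbers are best read as a relative comparison between methods, not as absolute accuracy.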


Section 7: Releasing as Open Source

README.md for the repository

README.md
# OCSF Training Data Generator

Generate realistic observability data in OCSF format for **self-supervised** anomaly detection training.

## Architecture

All telemetry flows through **OpenTelemetry Collector**:
- **Logs**: Services -> OTLP -> OTel Collector -> logs.jsonl
- **Traces**: Services -> OTLP -> OTel Collector -> traces.jsonl
- **Metrics**: OTel Collector scrapes /metrics -> metrics.jsonl

## What This Generates

- Realistic observability data from a multi-service application
- Normal traffic + anomaly scenarios (unlabeled)
- **Logs**, **metrics**, and **traces** in OCSF format
- No labels required - use with self-supervised learning

## Quick Start

```bash
# 1. Create required directories (with write permissions for containers)
# Note: If you previously ran docker compose, the directory may be root-owned
sudo rm -rf ./logs/otel 2>/dev/null || true
mkdir -p ./logs/otel ./data
chmod 777 ./logs/otel

# 2. Build services (ensures latest code)
docker compose build

# 3. Start all services
docker compose up -d

# 4. Let the load generator run (5-10 min for demo, 2 hours for full dataset)
# Traffic is automatically generated

# 5. Convert all OpenTelemetry data to OCSF format:
python scripts/convert_otel_to_ocsf.py

# 6. (Optional) Generate small labeled subset for evaluation
python scripts/label_subset_for_evaluation.py
```

## Prerequisites

- Docker 20.10+ with Compose plugin (uses `docker compose`, not `docker-compose`)
- Python 3.8+ with: `pip install pandas pyarrow`

## Output Datasets

After running the conversion script:

- `data/ocsf_logs.parquet` - Application logs in OCSF format (unlabeled)
- `data/ocsf_traces.parquet` - Distributed traces in OCSF format (unlabeled)
- `data/ocsf_metrics.parquet` - System metrics in OCSF format (unlabeled)
- `data/ocsf_eval_subset.parquet` - Small labeled subset for evaluation (optional)

## Stack Components

| Service | Role | Telemetry |
|---------|------|-----------|
| **web-api** | Flask service with observability | Logs + traces via OTLP, metrics via /metrics |
| **auth-service** | Node.js authentication service | Logs via stdout |
| **payment-worker** | Background job processor | Logs via stdout |
| **postgres** | Database | - |
| **redis** | Cache | - |
| **otel-collector** | Unified telemetry hub | Exports all signals to JSONL |
| **load-generator** | Traffic patterns with anomalies | - |

## Conversion Script

| Script | Input | Output |
|--------|-------|--------|
| `convert_otel_to_ocsf.py` | `logs/otel/*.jsonl` | `data/ocsf_*.parquet` |

Convert specific signals:
```bash
python scripts/convert_otel_to_ocsf.py --signal logs
python scripts/convert_otel_to_ocsf.py --signal traces
python scripts/convert_otel_to_ocsf.py --signal metrics
```

## Use Cases

- Self-supervised training with TabularResNet (Part 4)
- Unsupervised anomaly detection (Part 6: LOF, Isolation Forest, k-NN)
- Testing observability systems
- Learning OCSF schema

## Important: No Labels Needed

This generator creates **unlabeled** data that works with:
- **Self-supervised learning** (Part 4: contrastive learning, masked prediction)
- **Unsupervised anomaly detection** (Part 6: LOF, Isolation Forest, k-NN)

Labels are only needed for evaluation/comparison (Part 6, Section 7), not training.

## Stopping Services

```bash
docker compose down      # Stop services
docker compose down -v   # Stop and remove volumes
```

## License

MIT - Free to use for commercial and non-commercial purposes

Summary

This appendix provides a complete, open-source solution for generating realistic unlabeled observability data for self-supervised learning:

  1. Docker Compose stack: Multi-service application with unified OpenTelemetry instrumentation

  2. Instrumented services: Web API, auth service, payment worker emitting telemetry via OTLP

  3. OpenTelemetry Collector: Central hub receiving logs/traces via OTLP and scraping metrics

  4. Load generator: Creates normal traffic + anomaly scenarios (naturally occurring, unlabeled)

  5. Unified OCSF converter: Single script converts all OTel exports to OCSF-formatted Parquet:

    • data/ocsf_logs.parquet - Application logs (via OTLP)

    • data/ocsf_traces.parquet - Distributed traces (via OTLP)

    • data/ocsf_metrics.parquet - System metrics (scraped by OTel)

  6. Optional evaluation labels: Small labeled subset for comparing detection methods (Part 6, Section 7 only)
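
Before moving on to Part 3 or Part 4, it can help to confirm that the conversion produced the three datasets listed above. The following is a minimal sketch, assuming the default `data/` output directory; `missing_outputs` is a hypothetical helper, not part of the downloadable code.

```python
from pathlib import Path
from typing import List

# File names taken from the output list above; the evaluation subset is optional.
REQUIRED_OUTPUTS = ["ocsf_logs.parquet", "ocsf_traces.parquet", "ocsf_metrics.parquet"]


def missing_outputs(data_dir: str = "data") -> List[str]:
    """Return the required Parquet files that are absent or empty in data_dir."""
    base = Path(data_dir)
    return [
        name for name in REQUIRED_OUTPUTS
        if not (base / name).is_file() or (base / name).stat().st_size == 0
    ]


if __name__ == "__main__":
    missing = missing_outputs()
    if missing:
        print("Missing outputs:", ", ".join(missing))
        print("Run: python scripts/convert_otel_to_ocsf.py")
    else:
        print("All OCSF datasets are present.")
```

The check only looks at file presence and size; it does not validate the Parquet contents.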

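Key difference from supervised learning: the generated data is unlabeled. Training (Part 4) is self-supervised and detection (Part 6) is unsupervised, so labels are needed only for the optional evaluation subset.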

To use in the tutorial series: Links from Part 3 (Feature Engineering) and Part 4 (Self-Supervised Training) point to this appendix for readers who need training data.