Problem: You want to follow this tutorial series but don’t have OCSF observability data to train on.
Solution: This appendix shows how to generate realistic observability data (logs, metrics, traces, config changes) using Docker Compose to spin up actual infrastructure, then instrument it to produce training data you can release as open source.
Overview: Synthetic Observability Stack¶
We’ll create a realistic microservices environment that generates:
Logs: Application errors, access logs, system events
Metrics: CPU, memory, latency, throughput
Traces: Distributed request spans
Config Changes: Deployments, feature flags, scaling events
Architecture:
Diagram explanation:
Solid lines: Data/service dependencies
Dotted lines: Telemetry data flow (OTLP protocol)
Application Services (blue): Multi-service architecture generating realistic traffic
OpenTelemetry Collector (yellow): Unified telemetry hub - receives logs/traces via OTLP, scrapes metrics from /metrics endpoints
Load Generator (pink): Creates normal traffic + anomaly scenarios
OCSF Output (red): Raw JSONL files exported by OTel collector
Parquet Files (green): Single Python script converts all signals to OCSF parquet format
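To make the target format concrete, here is an abridged sketch of the kind of OCSF API Activity event (class 6003) this stack ultimately produces; the field values are illustrative, not taken from a real run:
# Abridged, illustrative OCSF API Activity event (class_uid 6003); values
# are made up here. The real events are produced in Sections 2 and 4.
sample_ocsf_event = {
    "class_uid": 6003,
    "class_name": "API Activity",
    "category_uid": 6,
    "category_name": "Application Activity",
    "activity_id": 2,       # Read
    "type_uid": 600302,     # class_uid * 100 + activity_id
    "severity_id": 2,       # Info
    "time": 1700000000000,  # epoch milliseconds
    "status_id": 1,
    "status_code": "200",
    "message": "User 42 fetched from database",
    "service": "web-api",
    "duration": 12.5,       # milliseconds
    "http_request_method": "GET",
    "http_request_url_path": "/api/users/42",
}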
Download Code Files¶
All code files from this appendix are available for download:
The zip contains a complete, runnable stack:
README.md - Quick start guide and documentation
docker-compose.yml - Infrastructure configuration
services/web-api/ - Flask service with observability (app.py, Dockerfile, requirements.txt)
services/load-generator/ - Traffic generator (generate_load.py, Dockerfile, requirements.txt)
services/auth-service/ - Node.js auth service (app.js, Dockerfile, package.json)
services/payment-worker/ - Celery worker (worker.py, Dockerfile, requirements.txt)
config/ - Prometheus, OpenTelemetry, and Fluentd configurations
scripts/ - OCSF converters for logs, traces, and metrics, plus optional labeling script
Section 1: Docker Compose Infrastructure¶
docker-compose.yml¶
Create a realistic multi-service application with built-in observability:
# Unified OpenTelemetry Architecture:
# - Logs: Services -> OTLP -> OTel Collector -> logs.jsonl
# - Traces: Services -> OTLP -> OTel Collector -> traces.jsonl
# - Metrics: OTel Collector scrapes /metrics -> metrics.jsonl
#
# All three signals exported to ./logs/otel/ as JSONL files
services:
# Application Services
web-api:
build: ./services/web-api
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:password@postgres:5432/appdb
- REDIS_URL=redis://redis:6379
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
depends_on:
- postgres
- redis
- auth-service
labels:
- "monitoring=enabled"
auth-service:
build: ./services/auth-service
ports:
- "8001:8001"
environment:
- LDAP_URL=ldap://openldap:389
labels:
- "monitoring=enabled"
payment-worker:
build: ./services/payment-worker
environment:
- CELERY_BROKER_URL=redis://redis:6379
- DATABASE_URL=postgresql://postgres:password@postgres:5432/appdb
depends_on:
- redis
- postgres
labels:
- "monitoring=enabled"
# Data Stores
postgres:
image: postgres:15
environment:
- POSTGRES_PASSWORD=password
- POSTGRES_DB=appdb
volumes:
- postgres_data:/var/lib/postgresql/data
labels:
- "monitoring=enabled"
redis:
image: redis:7-alpine
labels:
- "monitoring=enabled"
# Observability Stack
prometheus:
image: prom/prometheus:latest
volumes:
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
ports:
- "9090:9090"
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
# Unified telemetry collector - receives logs/traces via OTLP, scrapes metrics
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
user: "1000:1000" # Run as non-root so exported files are readable
volumes:
- ./config/otel-collector-config.yml:/etc/otel/config.yml
- ./logs/otel:/var/log/otel
ports:
- "4317:4317" # OTLP gRPC (logs, traces)
- "4318:4318" # OTLP HTTP (logs, traces)
- "8888:8888" # Collector metrics
command: ["--config=/etc/otel/config.yml"]
depends_on:
- web-api
fluentd:
image: fluent/fluentd:latest
volumes:
- ./config/fluentd.conf:/fluentd/etc/fluent.conf
- ./logs:/var/log/fluentd
ports:
- "24224:24224"
- "24224:24224/udp"
# Load Generator (creates traffic patterns)
load-generator:
build: ./services/load-generator
depends_on:
- web-api
environment:
- TARGET_URL=http://web-api:8000
- NORMAL_RPS=10
- ANOMALY_PROBABILITY=0.05
volumes:
postgres_data:
prometheus_data:
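Once the stack is up (Section 5 covers the full workflow), a quick sanity check can confirm the HTTP services respond. A minimal sketch, assuming the default port mappings above and a running stack:
# Health-check the ports published in docker-compose.yml above.
import requests

endpoints = {
    "web-api": "http://localhost:8000/health",
    "auth-service": "http://localhost:8001/health",
    "prometheus": "http://localhost:9090/-/healthy",
}
for name, url in endpoints.items():
    try:
        r = requests.get(url, timeout=3)
        print(f"{name}: HTTP {r.status_code}")
    except requests.RequestException as e:
        print(f"{name}: unreachable ({e})")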
Configuration Files¶
The observability stack requires these configuration files:
config/prometheus.yml - Metrics collection (note: the postgres and redis targets below are placeholders; neither exposes a Prometheus /metrics endpoint natively, so real metrics from them would require postgres_exporter and redis_exporter sidecars):
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'web-api'
static_configs:
- targets: ['web-api:8000']
metrics_path: '/metrics'
- job_name: 'postgres'
static_configs:
- targets: ['postgres:5432']
- job_name: 'redis'
static_configs:
- targets: ['redis:6379']
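To verify scraping works, you can hit Prometheus's standard HTTP query API. A short sketch, assuming the port published in the compose file above:
# Query Prometheus for the request counter that web-api exposes (Section 2).
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "http_requests_total"},
    timeout=5,
)
for result in resp.json().get("data", {}).get("result", []):
    print(result["metric"], result["value"])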
config/otel-collector-config.yml - Unified telemetry collection (logs and traces via OTLP, metrics via Prometheus scraping):
receivers:
# OTLP receiver for logs and traces from applications
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
# Prometheus receiver to scrape metrics from services
prometheus:
config:
scrape_configs:
- job_name: 'web-api'
scrape_interval: 15s
static_configs:
- targets: ['web-api:8000']
- job_name: 'otel-collector'
scrape_interval: 15s
static_configs:
- targets: ['localhost:8888']
processors:
batch:
timeout: 1s
send_batch_size: 1024
# Add resource attributes
resource:
attributes:
- key: deployment.environment
value: demo
action: upsert
exporters:
debug:
verbosity: basic
# File exporter for traces
file/traces:
path: /var/log/otel/traces.jsonl
rotation:
max_megabytes: 100
max_days: 1
max_backups: 3
# File exporter for logs
file/logs:
path: /var/log/otel/logs.jsonl
rotation:
max_megabytes: 100
max_days: 1
max_backups: 3
# File exporter for metrics
file/metrics:
path: /var/log/otel/metrics.jsonl
rotation:
max_megabytes: 100
max_days: 1
max_backups: 3
service:
# Enable telemetry for the collector itself
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: 0.0.0.0
port: 8888
pipelines:
# Traces pipeline: OTLP -> file
traces:
receivers: [otlp]
processors: [batch, resource]
exporters: [debug, file/traces]
# Logs pipeline: OTLP -> file
logs:
receivers: [otlp]
processors: [batch, resource]
exporters: [debug, file/logs]
# Metrics pipeline: Prometheus scraping -> file
metrics:
receivers: [prometheus]
processors: [batch, resource]
exporters: [debug, file/metrics]
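Each line the file exporters write is one OTLP-JSON envelope (resourceLogs, resourceSpans, or resourceMetrics). A quick way to inspect what the collector exported, assuming the stack has been running for a few minutes:
# Peek at the first exported log envelope to see the nested OTLP structure
# (resourceLogs -> scopeLogs -> logRecords) that Section 4's converter walks.
import json

with open("./logs/otel/logs.jsonl") as f:
    envelope = json.loads(f.readline())

for resource_log in envelope.get("resourceLogs", []):
    for scope_log in resource_log.get("scopeLogs", []):
        for record in scope_log.get("logRecords", []):
            body = record.get("body", {}).get("stringValue", "")
            print(record.get("severityText"), body[:80])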
config/fluentd.conf - Log aggregation (optional in this stack: the demo services emit logs via OTLP and stdout, so Fluentd only receives data if a service's logging driver or a forward client targets port 24224):
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<filter **>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
timestamp ${time}
</record>
</filter>
<match **>
@type file
path /var/log/fluentd/app
append true
<format>
@type json
</format>
<buffer>
timekey 1h
timekey_wait 10m
timekey_use_utc true
</buffer>
</match>
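A minimal client sketch for that forward input, assuming the fluent-logger package (pip install fluent-logger); the tag used here is arbitrary and not part of the stack above:
# Send one record to the Fluentd forward input defined above.
from fluent import sender

logger = sender.FluentSender("app", host="localhost", port=24224)
logger.emit("demo", {"service": "example", "level": "INFO", "message": "hello fluentd"})
logger.close()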
Section 2: Instrumented Services¶
Web API Service¶
services/web-api/app.py - Flask service with comprehensive observability:
from flask import Flask, request, jsonify, Response
import time
import random
import logging
import json
import os
import socket
import uuid
from datetime import datetime
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
app = Flask(__name__)
# Service identity
SERVICE_NAME = "web-api"
SERVICE_VERSION = "1.0.0"
HOSTNAME = socket.gethostname()
# Simulated users for realistic data
SIMULATED_USERS = [
{"uid": "user-1001", "name": "alice.johnson", "email": "alice@company.com", "department": "engineering"},
{"uid": "user-1002", "name": "bob.smith", "email": "bob@company.com", "department": "sales"},
{"uid": "user-1003", "name": "carol.williams", "email": "carol@company.com", "department": "support"},
{"uid": "user-1004", "name": "david.brown", "email": "david@company.com", "department": "marketing"},
{"uid": "user-1005", "name": "eve.davis", "email": "eve@company.com", "department": "engineering"},
]
# OpenTelemetry setup for unified observability (logs, traces, metrics via OTLP)
otel_logger = None
tracer = None
otlp_endpoint = os.getenv('OTEL_EXPORTER_OTLP_ENDPOINT', 'http://otel-collector:4317')
try:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME as RESOURCE_SERVICE_NAME
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
# Create shared resource
resource = Resource.create({
RESOURCE_SERVICE_NAME: SERVICE_NAME,
"service.version": SERVICE_VERSION,
"host.name": HOSTNAME,
})
# Set up tracing
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
otlp_trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
# Set up logging via OTLP
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider)
otlp_log_exporter = OTLPLogExporter(endpoint=otlp_endpoint, insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter))
# Create OTel logging handler
otel_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
otel_logger = logging.getLogger("otel." + SERVICE_NAME)
otel_logger.setLevel(logging.INFO)
otel_logger.addHandler(otel_handler)
print(f"[{SERVICE_NAME}] OpenTelemetry enabled: logs and traces via OTLP to {otlp_endpoint}")
except Exception as e:
print(f"[{SERVICE_NAME}] OpenTelemetry setup failed: {e}")
otel_logger = None
tracer = None
class StructuredLogger:
"""Logger that emits OCSF-ready structured JSON logs via OpenTelemetry."""
def __init__(self, service_name):
self.service_name = service_name
# Fallback to stdout if OTel unavailable
self.fallback_logger = logging.getLogger(service_name + ".fallback")
self.fallback_logger.setLevel(logging.INFO)
self.fallback_logger.handlers = []
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.fallback_logger.addHandler(handler)
self.fallback_logger.propagate = False
def _emit(self, level, message, **kwargs):
"""Emit a structured log entry with OCSF-compatible fields."""
timestamp = datetime.utcnow().isoformat() + "Z"
log_entry = {
"timestamp": timestamp,
"time": int(time.time() * 1000),
"service": self.service_name,
"level": level,
"message": message,
"metadata": {
"version": "1.0.0",
"product": {
"name": self.service_name,
"version": SERVICE_VERSION,
"vendor_name": "Demo"
}
},
"device": {
"hostname": HOSTNAME,
"type": "server",
"type_id": 1
}
}
# Add optional OCSF fields
for key, value in kwargs.items():
if value is not None:
log_entry[key] = value
# Log via OpenTelemetry if available (JSON in message body)
if otel_logger:
log_method = getattr(otel_logger, level.lower(), otel_logger.info)
log_method(json.dumps(log_entry))
# Always log to stdout for Docker capture (backup)
self.fallback_logger.info(json.dumps(log_entry))
def info(self, message, **kwargs):
self._emit("INFO", message, **kwargs)
def warning(self, message, **kwargs):
self._emit("WARNING", message, **kwargs)
def error(self, message, **kwargs):
self._emit("ERROR", message, **kwargs)
def debug(self, message, **kwargs):
self._emit("DEBUG", message, **kwargs)
# Initialize structured logger
logger = StructuredLogger(SERVICE_NAME)
# Disable verbose werkzeug logging
logging.getLogger('werkzeug').setLevel(logging.WARNING)
# Set up metrics
request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'])
db_query_duration = Histogram('db_query_duration_seconds', 'Database query duration', ['query_type'])
# Database and cache connections (lazy initialization)
db_conn = None
cache = None
def get_db():
global db_conn
if db_conn is None:
try:
import psycopg2
db_url = os.getenv('DATABASE_URL', 'postgresql://postgres:password@postgres:5432/appdb')
db_conn = psycopg2.connect(db_url)
logger.info("Connected to PostgreSQL")
except Exception as e:
logger.warning(f"PostgreSQL unavailable: {e}")
return db_conn
def get_cache():
global cache
if cache is None:
try:
import redis as redis_lib
redis_url = os.getenv('REDIS_URL', 'redis://redis:6379')
cache = redis_lib.from_url(redis_url, decode_responses=True)
cache.ping()
logger.info("Connected to Redis")
except Exception as e:
logger.warning(f"Redis unavailable: {e}")
return cache
def get_request_context():
"""Extract request context for structured logging."""
# Simulate user from request (in production, this would come from auth)
user = random.choice(SIMULATED_USERS)
session_id = f"sess-{uuid.uuid4().hex[:12]}"
# Get client IP (may be forwarded)
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
if client_ip and ',' in client_ip:
client_ip = client_ip.split(',')[0].strip()
return {
"actor": {
"user": {
"uid": user["uid"],
"name": user["name"],
"email": user["email"]
},
"session": {
"uid": session_id,
"created_time": int(time.time() * 1000)
}
},
"src_endpoint": {
"ip": client_ip or "192.168.1.100",
"port": random.randint(30000, 65000),
"domain": request.headers.get('Host', 'unknown')
},
"dst_endpoint": {
"ip": "10.0.0.1",
"port": 8000,
"svc_name": SERVICE_NAME
},
"http_request": {
"method": request.method,
"url": {
"path": request.path,
"query_string": request.query_string.decode() if request.query_string else "",
"scheme": request.scheme,
"hostname": request.host
},
"user_agent": request.headers.get('User-Agent', 'unknown'),
"http_headers": dict(list(request.headers)[:5]) # First 5 headers
}
}
def log_api_activity(message, status_code, duration_ms, trace_id=None, activity_id=1, **extra):
"""Log an API activity event with full OCSF context."""
ctx = get_request_context()
# Determine status
if status_code >= 500:
status_id = 2 # Failure
severity_id = 4 # Error
elif status_code >= 400:
status_id = 2 # Failure
severity_id = 3 # Warning
else:
status_id = 1 # Success
severity_id = 2 # Info
# Calculate type_uid: class_uid * 100 + activity_id
class_uid = 6003 # API Activity
type_uid = class_uid * 100 + activity_id
logger.info(
message,
class_uid=class_uid,
class_name="API Activity",
category_uid=6,
category_name="Application Activity",
activity_id=activity_id,
activity_name=["Unknown", "Create", "Read", "Update", "Delete"][min(activity_id, 4)],
type_uid=type_uid,
severity_id=severity_id,
status_id=status_id,
status_code=str(status_code),
status=["Unknown", "Success", "Failure"][status_id],
duration=duration_ms,
trace_id=str(trace_id) if trace_id else None,
actor=ctx["actor"],
src_endpoint=ctx["src_endpoint"],
dst_endpoint=ctx["dst_endpoint"],
http_request=ctx["http_request"],
http_response={
"code": status_code,
"status": "OK" if status_code < 400 else "Error",
"latency": duration_ms
},
**extra
)
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""Fetch user data - normal operation."""
start_time = time.time()
trace_id = None
# Start span if tracing available
span = None
if tracer:
span = tracer.start_span("get_user")
span.set_attribute("user.id", user_id)
trace_id = span.get_span_context().trace_id
try:
# Check cache first
redis_cache = get_cache()
if redis_cache:
try:
cached = redis_cache.get(f"user:{user_id}")
if cached:
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
log_api_activity(
f"Cache hit for user {user_id}",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=2, # Read
resources=[{"type": "user", "uid": str(user_id), "data": {"source": "cache"}}]
)
return jsonify({"user_id": user_id, "source": "cache"})
except Exception as e:
logger.warning(f"Cache error: {e}")
# Query database
conn = get_db()
if conn:
try:
db_start = time.time()
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
result = cursor.fetchone()
db_query_duration.labels(query_type='SELECT').observe(time.time() - db_start)
if result:
if redis_cache:
redis_cache.setex(f"user:{user_id}", 300, str(result))
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
log_api_activity(
f"User {user_id} fetched from database",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=2, # Read
resources=[{"type": "user", "uid": str(user_id), "data": {"source": "database"}}]
)
return jsonify({"user_id": user_id, "source": "database"})
except Exception as e:
logger.warning(f"Database error: {e}")
# User not found or DB unavailable - return mock data for demo
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='GET', endpoint='/api/users', status=200).inc()
request_duration.labels(method='GET', endpoint='/api/users').observe(time.time() - start_time)
log_api_activity(
f"Returning mock user {user_id}",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=2, # Read
resources=[{"type": "user", "uid": str(user_id), "data": {"source": "mock"}}]
)
return jsonify({"user_id": user_id, "source": "mock", "name": f"User {user_id}"})
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='GET', endpoint='/api/users', status=500).inc()
log_api_activity(
f"Error fetching user {user_id}: {str(e)}",
status_code=500,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=2,
error={"message": str(e), "type": type(e).__name__}
)
return jsonify({"error": "Internal server error"}), 500
finally:
if span:
span.end()
@app.route('/api/checkout', methods=['POST'])
def checkout():
"""Checkout operation - can trigger anomalies."""
start_time = time.time()
trace_id = None
span = None
if tracer:
span = tracer.start_span("checkout")
trace_id = span.get_span_context().trace_id
try:
# Simulate anomalies based on environment variable
anomaly_prob = float(os.getenv('ANOMALY_PROBABILITY', 0.05))
if random.random() < anomaly_prob:
# ANOMALY: Simulate various failure modes
anomaly_type = random.choice(['db_timeout', 'memory_leak', 'slow_query', 'cache_miss_storm'])
if anomaly_type == 'db_timeout':
time.sleep(5) # Simulate slow query
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='POST', endpoint='/api/checkout', status=504).inc()
log_api_activity(
"Database timeout during checkout",
status_code=504,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=1, # Create
anomaly={"type": "db_timeout", "severity": "high"},
error={"message": "Database timeout", "type": "TimeoutError"}
)
return jsonify({"error": "Database timeout"}), 504
            elif anomaly_type == 'memory_leak':
                # Simulate a memory spike with a ~100MB short-lived allocation
                # (released when the handler returns)
                leak = ["x" * 1000000 for _ in range(100)]
duration_ms = (time.time() - start_time) * 1000
log_api_activity(
"High memory allocation during checkout",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=1,
anomaly={"type": "memory_leak", "severity": "medium", "memory_mb": 100}
)
elif anomaly_type == 'slow_query':
time.sleep(random.uniform(2, 5))
duration_ms = (time.time() - start_time) * 1000
log_api_activity(
f"Slow checkout processing: {duration_ms:.0f}ms",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=1,
anomaly={"type": "slow_query", "severity": "medium", "threshold_ms": 2000}
)
elif anomaly_type == 'cache_miss_storm':
# Simulate cache invalidation causing DB overload
redis_cache = get_cache()
if redis_cache:
for i in range(50):
redis_cache.delete(f"user:{i}")
duration_ms = (time.time() - start_time) * 1000
log_api_activity(
"Cache miss storm detected",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=1,
anomaly={"type": "cache_miss_storm", "severity": "high", "invalidated_keys": 50}
)
# Normal checkout flow
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='POST', endpoint='/api/checkout', status=200).inc()
request_duration.labels(method='POST', endpoint='/api/checkout').observe(time.time() - start_time)
log_api_activity(
"Checkout completed successfully",
status_code=200,
duration_ms=duration_ms,
trace_id=trace_id,
activity_id=1, # Create
resources=[{"type": "order", "uid": f"order-{uuid.uuid4().hex[:8]}"}]
)
return jsonify({"status": "success"})
finally:
if span:
span.end()
@app.route('/api/search', methods=['GET'])
def search():
"""Search endpoint for load generator."""
start_time = time.time()
query = request.args.get('q', '')
# Simulate search delay
time.sleep(random.uniform(0.01, 0.1))
duration_ms = (time.time() - start_time) * 1000
request_count.labels(method='GET', endpoint='/api/search', status=200).inc()
request_duration.labels(method='GET', endpoint='/api/search').observe(time.time() - start_time)
log_api_activity(
f"Search completed for query: {query}",
status_code=200,
duration_ms=duration_ms,
activity_id=2, # Read
resources=[{"type": "search", "data": {"query": query, "results_count": 0}}]
)
return jsonify({"results": [], "query": query})
@app.route('/metrics')
def metrics():
"""Prometheus metrics endpoint."""
return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
@app.route('/health')
def health():
"""Health check endpoint."""
return jsonify({"status": "healthy"})
if __name__ == '__main__':
logger.info("Starting web-api service on port 8000")
app.run(host='0.0.0.0', port=8000, threaded=True)
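With the container running, you can exercise these endpoints and watch the structured JSON logs with docker compose logs web-api. A small sketch, assuming the default port mapping:
# Hit the three main endpoints defined above and print the JSON responses.
import requests

base = "http://localhost:8000"
print(requests.get(f"{base}/api/users/42", timeout=5).json())
print(requests.get(f"{base}/api/search", params={"q": "product"}, timeout=5).json())
# Checkout may sleep up to ~5s when it rolls a db_timeout anomaly.
print(requests.post(f"{base}/api/checkout", json={"cart_id": 1}, timeout=15).json())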
services/web-api/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 8000
CMD ["python", "app.py"]
services/web-api/requirements.txt:
flask==3.0.0
prometheus-client==0.19.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
psycopg2-binary==2.9.9
redis==5.0.1
Auth Service¶
services/auth-service/app.js - Node.js authentication service:
const express = require('express');
const app = express();
app.use(express.json());
// Simple auth endpoint for demo purposes
app.post('/auth/validate', (req, res) => {
const { token } = req.body;
// Simulate auth validation with some latency
const latency = Math.random() * 50 + 10;
setTimeout(() => {
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
service: 'auth-service',
level: 'INFO',
message: `Token validation request`,
duration_ms: latency
}));
res.json({ valid: true, user_id: Math.floor(Math.random() * 1000) });
}, latency);
});
app.get('/health', (req, res) => {
res.json({ status: 'healthy' });
});
const PORT = process.env.PORT || 8001;
app.listen(PORT, () => {
console.log(`Auth service listening on port ${PORT}`);
});
services/auth-service/Dockerfile:
FROM node:20-slim
WORKDIR /app
COPY package.json .
RUN npm install
COPY app.js .
EXPOSE 8001
CMD ["node", "app.js"]
services/auth-service/package.json:
{
"name": "auth-service",
"version": "1.0.0",
"description": "Simple auth service for observability demo",
"main": "app.js",
"dependencies": {
"express": "^4.18.2"
}
}
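A quick way to exercise the validation endpoint once the container is up, assuming the port published in the compose file:
# Call the auth-service endpoint defined in app.js above.
import requests

r = requests.post(
    "http://localhost:8001/auth/validate",
    json={"token": "demo-token"},
    timeout=5,
)
print(r.json())  # e.g. {'valid': True, 'user_id': 123}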
Payment Worker¶
services/payment-worker/worker.py - Background worker (a plain loop simulating Celery-style job processing; the celery dependency is declared in requirements.txt but unused by this demo loop):
import os
import time
import random
import json
import logging
from datetime import datetime
# Set up logging as JSON-shaped lines
# (messages containing double quotes would break strict JSON parsing; fine for a demo)
logging.basicConfig(
level=logging.INFO,
format='{"timestamp":"%(asctime)s","service":"payment-worker","level":"%(levelname)s","message":"%(message)s"}'
)
logger = logging.getLogger(__name__)
def process_payment(payment_id):
"""Simulate payment processing with logging."""
start_time = time.time()
# Simulate processing time
processing_time = random.uniform(0.1, 2.0)
time.sleep(processing_time)
# Simulate occasional failures
if random.random() < 0.05:
logger.error(f"Payment {payment_id} failed: timeout")
return False
duration = (time.time() - start_time) * 1000
logger.info(f"Payment {payment_id} processed successfully in {duration:.2f}ms")
return True
def main():
"""Simple worker loop for demo purposes."""
logger.info("Payment worker started")
while True:
# Simulate receiving jobs from queue
payment_id = random.randint(10000, 99999)
process_payment(payment_id)
# Wait before next job
time.sleep(random.uniform(1, 5))
if __name__ == '__main__':
main()
services/payment-worker/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY worker.py .
CMD ["python", "worker.py"]
services/payment-worker/requirements.txt:
celery==5.3.4
redis==5.0.1
psycopg2-binary==2.9.9
Section 3: Load Generator¶
services/load-generator/generate_load.py - Generates realistic traffic patterns with controlled anomalies:
import requests
import time
import random
from datetime import datetime, timedelta
import json
import os
class LoadGenerator:
"""
Generate realistic observability data with normal and anomalous patterns.
"""
def __init__(self, target_url, normal_rps=10, anomaly_probability=0.05):
self.target_url = target_url
self.normal_rps = normal_rps
self.anomaly_probability = anomaly_probability
self.session = requests.Session()
def generate_normal_traffic(self):
"""Generate normal user traffic patterns."""
patterns = [
# Pattern 1: User browsing
lambda: self.session.get(f"{self.target_url}/api/users/{random.randint(1, 1000)}"),
# Pattern 2: Search
lambda: self.session.get(f"{self.target_url}/api/search?q=product"),
# Pattern 3: Checkout (normal)
lambda: self.session.post(f"{self.target_url}/api/checkout", json={"cart_id": random.randint(1, 100)}),
]
pattern = random.choice(patterns)
try:
response = pattern()
print(f"[NORMAL] {response.status_code} - {response.url}")
except Exception as e:
print(f"[ERROR] {str(e)}")
def generate_anomaly_scenario(self):
"""Generate specific anomaly scenarios."""
scenarios = [
self.scenario_deployment_memory_leak,
self.scenario_database_connection_pool_exhaustion,
self.scenario_cache_invalidation_storm,
self.scenario_slow_query_cascade,
]
scenario = random.choice(scenarios)
print(f"\n[ANOMALY] Starting scenario: {scenario.__name__}")
scenario()
def scenario_deployment_memory_leak(self):
"""
Simulate memory leak after deployment (gradual degradation).
Sequence:
1. Deployment completes (config change)
2. Memory usage gradually increases
3. GC pressure increases
4. Query latency spikes
5. Connection pool exhaustion
"""
print(" -> Simulating deployment with memory leak")
# Generate increasing load over 5 minutes
for i in range(60):
try:
# Each request allocates more memory in the service
response = self.session.post(
f"{self.target_url}/api/checkout",
json={"trigger_memory_leak": True}
)
print(f" -> Minute {i//12}: Memory pressure increasing")
time.sleep(5)
except Exception as e:
print(f" -> Service degraded: {str(e)}")
break
def scenario_database_connection_pool_exhaustion(self):
"""
Simulate DB connection pool exhaustion.
Sequence:
1. Spike in concurrent requests
2. Slow queries hold connections
3. Pool exhausted
4. New requests timeout
"""
print(" -> Simulating DB connection pool exhaustion")
import concurrent.futures
def slow_request():
try:
response = self.session.get(
f"{self.target_url}/api/users/{random.randint(1, 1000)}",
timeout=10
)
return response.status_code
            except Exception:
                return 500
# Flood with 100 concurrent requests
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(slow_request) for _ in range(100)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
print(f" -> Results: {results.count(200)} success, {results.count(500)} failed")
def scenario_cache_invalidation_storm(self):
"""
Simulate cache invalidation causing DB overload.
Sequence:
1. Cache gets invalidated (deployment or manual flush)
2. All requests hit database
3. DB overloaded
4. Latency spikes
"""
print(" -> Simulating cache invalidation storm")
        # Trigger cache invalidation. Note: web-api does not implement this
        # endpoint in the demo, so the call returns 404; the read loop below
        # still produces the intended cache-miss traffic pattern.
        self.session.post(f"{self.target_url}/admin/cache/flush")
# Generate high read traffic (all cache misses)
for i in range(100):
try:
response = self.session.get(
f"{self.target_url}/api/users/{random.randint(1, 10000)}"
)
if i % 10 == 0:
print(f" -> Cache miss {i}: {response.elapsed.total_seconds():.2f}s")
            except Exception:
                pass
time.sleep(0.1)
def scenario_slow_query_cascade(self):
"""
Simulate slow query causing cascading failure.
Sequence:
1. One slow query blocks resources
2. Other queries queue up
3. Thread pool exhaustion
4. Service unresponsive
"""
print(" -> Simulating slow query cascade")
        # Trigger slow query. Note: web-api does not implement /api/analytics
        # in the demo, so this call returns 404; the scenario still exercises
        # the request path before the cascade window below elapses.
        self.session.post(
            f"{self.target_url}/api/analytics",
            json={"trigger_slow_query": True}
        )
time.sleep(30) # Let cascade develop
def run(self, duration_minutes=60):
"""
Run load generator for specified duration.
Args:
duration_minutes: How long to generate traffic
"""
end_time = datetime.now() + timedelta(minutes=duration_minutes)
print(f"Starting load generator (RPS: {self.normal_rps}, anomaly prob: {self.anomaly_probability})")
print(f"Will run until: {end_time}")
request_count = 0
anomaly_count = 0
while datetime.now() < end_time:
# Decide if this cycle is normal or anomaly
if random.random() < self.anomaly_probability:
self.generate_anomaly_scenario()
anomaly_count += 1
else:
# Generate normal traffic at target RPS
for _ in range(self.normal_rps):
self.generate_normal_traffic()
time.sleep(1.0 / self.normal_rps)
request_count += 1
# Print progress every minute
if request_count % (self.normal_rps * 60) == 0:
print(f"\n[PROGRESS] Generated {request_count} requests, {anomaly_count} anomaly scenarios")
if __name__ == '__main__':
target_url = os.getenv('TARGET_URL', 'http://web-api:8000')
normal_rps = int(os.getenv('NORMAL_RPS', 10))
anomaly_probability = float(os.getenv('ANOMALY_PROBABILITY', 0.05))
generator = LoadGenerator(target_url, normal_rps, anomaly_probability)
generator.run(duration_minutes=120) # Run for 2 hours
services/load-generator/Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY generate_load.py .
CMD ["python", "generate_load.py"]
services/load-generator/requirements.txt:
requests==2.31.0
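The generator can also run outside Docker against the host-published web-api. A short sketch using the LoadGenerator class defined above, assuming generate_load.py is on your import path:
# Run a short standalone load session against the host-published web-api.
from generate_load import LoadGenerator

generator = LoadGenerator(
    target_url="http://localhost:8000",
    normal_rps=5,
    anomaly_probability=0.1,
)
generator.run(duration_minutes=5)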
Section 4: Converting OpenTelemetry Data to OCSF¶
The OpenTelemetry Collector exports all three signals (logs, traces, metrics) as JSONL files to ./logs/otel/. A single unified script converts them to OCSF-formatted Parquet files.
scripts/convert_otel_to_ocsf.py¶
Convert all OpenTelemetry JSONL exports to OCSF format:
#!/usr/bin/env python3
"""
Convert OpenTelemetry JSONL exports to OCSF (Open Cybersecurity Schema Framework) format.
This unified converter handles all three telemetry signals exported by the OTel collector:
- logs.jsonl -> ocsf_logs.parquet
- traces.jsonl -> ocsf_traces.parquet
- metrics.jsonl -> ocsf_metrics.parquet
Usage:
# Convert all signals at once
python scripts/convert_otel_to_ocsf.py
# Convert specific signal
python scripts/convert_otel_to_ocsf.py --signal logs
python scripts/convert_otel_to_ocsf.py --signal traces
python scripts/convert_otel_to_ocsf.py --signal metrics
"""
import json
import os
import sys
import argparse
from datetime import datetime
from pathlib import Path
try:
import pandas as pd
except ImportError:
print("Error: pandas is required. Install with: pip install pandas pyarrow")
sys.exit(1)
class OTelToOCSFConverter:
"""Convert OpenTelemetry JSONL exports to OCSF format."""
# OCSF class definitions
OCSF_CLASSES = {
6001: "Web Resources Activity",
6002: "Application Lifecycle",
6003: "API Activity",
6004: "Web Resource Access Activity",
99: "Metric", # Custom for metrics
}
SEVERITY_MAP = {
'TRACE': 0,
'DEBUG': 1,
'INFO': 2,
'WARN': 3,
'WARNING': 3,
'ERROR': 4,
'FATAL': 5,
'CRITICAL': 6,
}
def convert_logs(self, input_path, output_path):
"""Convert OTel logs JSONL to OCSF parquet."""
if not os.path.exists(input_path):
print(f"Logs file not found: {input_path}")
return None
ocsf_events = []
with open(input_path, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
otel_record = json.loads(line)
events = self._parse_otel_log_record(otel_record)
ocsf_events.extend(events)
except json.JSONDecodeError:
continue
if not ocsf_events:
print(f"No valid log events found in {input_path}")
return None
return self._save_to_parquet(ocsf_events, output_path, "logs")
def _parse_otel_log_record(self, otel_record):
"""Parse OTel log export format and extract OCSF events."""
events = []
# OTel log export has resourceLogs -> scopeLogs -> logRecords
for resource_log in otel_record.get('resourceLogs', []):
resource = resource_log.get('resource', {})
resource_attrs = self._extract_attributes(resource.get('attributes', []))
for scope_log in resource_log.get('scopeLogs', []):
for log_record in scope_log.get('logRecords', []):
event = self._log_record_to_ocsf(log_record, resource_attrs)
if event:
events.append(event)
return events
def _log_record_to_ocsf(self, log_record, resource_attrs):
"""Convert a single OTel log record to OCSF format."""
try:
# Extract timestamp (nanoseconds to milliseconds)
time_unix_nano = log_record.get('timeUnixNano', 0)
if isinstance(time_unix_nano, str):
time_unix_nano = int(time_unix_nano)
time_ms = time_unix_nano // 1_000_000
# Get severity
severity_number = log_record.get('severityNumber', 9) # Default INFO
severity_text = log_record.get('severityText', 'INFO')
severity_id = min(severity_number // 4, 6) # OTel severity 1-24 -> OCSF 0-6
# Get message body
body = log_record.get('body', {})
message = body.get('stringValue', '') if isinstance(body, dict) else str(body)
# Try to parse structured log from message
ocsf_data = {}
if message.startswith('{'):
try:
ocsf_data = json.loads(message)
except json.JSONDecodeError:
pass
# Extract attributes
attrs = self._extract_attributes(log_record.get('attributes', []))
# Build OCSF event
ocsf_event = {
# Core OCSF fields
"class_uid": ocsf_data.get('class_uid', 6003),
"class_name": ocsf_data.get('class_name', self.OCSF_CLASSES.get(6003)),
"category_uid": ocsf_data.get('category_uid', 6),
"category_name": ocsf_data.get('category_name', "Application Activity"),
"activity_id": ocsf_data.get('activity_id', 1),
"activity_name": ocsf_data.get('activity_name', "Unknown"),
"type_uid": ocsf_data.get('type_uid', 600301),
"severity_id": ocsf_data.get('severity_id', severity_id),
"time": ocsf_data.get('time', time_ms),
# Status fields
"status_id": ocsf_data.get('status_id', 1),
"status": ocsf_data.get('status', 'Success'),
"status_code": ocsf_data.get('status_code', '200'),
# Message and service
"message": ocsf_data.get('message', message),
"service": ocsf_data.get('service', resource_attrs.get('service.name', 'unknown')),
"level": ocsf_data.get('level', severity_text),
# Duration
"duration": ocsf_data.get('duration'),
# Trace context
"trace_id": log_record.get('traceId', ocsf_data.get('trace_id')),
"span_id": log_record.get('spanId'),
}
# Handle nested objects from structured log
self._flatten_nested_objects(ocsf_event, ocsf_data)
# Add resource attributes
ocsf_event["service_version"] = resource_attrs.get('service.version')
ocsf_event["host_name"] = resource_attrs.get('host.name')
# Remove None values
ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}
return ocsf_event
except Exception as e:
return None
def _flatten_nested_objects(self, ocsf_event, ocsf_data):
"""Flatten nested OCSF objects for ML use."""
# Metadata
metadata = ocsf_data.get('metadata', {})
if metadata:
ocsf_event["metadata"] = json.dumps(metadata)
ocsf_event["metadata_version"] = metadata.get('version')
product = metadata.get('product', {})
ocsf_event["metadata_product_name"] = product.get('name')
ocsf_event["metadata_product_version"] = product.get('version')
ocsf_event["metadata_product_vendor_name"] = product.get('vendor_name')
# Actor
actor = ocsf_data.get('actor', {})
if actor:
ocsf_event["actor"] = json.dumps(actor)
user = actor.get('user', {})
ocsf_event["actor_user_uid"] = user.get('uid')
ocsf_event["actor_user_name"] = user.get('name')
ocsf_event["actor_user_email"] = user.get('email')
session = actor.get('session', {})
ocsf_event["actor_session_uid"] = session.get('uid')
# Source endpoint
src = ocsf_data.get('src_endpoint', {})
if src:
ocsf_event["src_endpoint"] = json.dumps(src)
ocsf_event["src_endpoint_ip"] = src.get('ip')
ocsf_event["src_endpoint_port"] = src.get('port')
ocsf_event["src_endpoint_domain"] = src.get('domain')
# Destination endpoint
dst = ocsf_data.get('dst_endpoint', {})
if dst:
ocsf_event["dst_endpoint"] = json.dumps(dst)
ocsf_event["dst_endpoint_ip"] = dst.get('ip')
ocsf_event["dst_endpoint_port"] = dst.get('port')
ocsf_event["dst_endpoint_svc_name"] = dst.get('svc_name')
# HTTP request
http_req = ocsf_data.get('http_request', {})
if http_req:
ocsf_event["http_request"] = json.dumps(http_req)
ocsf_event["http_request_method"] = http_req.get('method')
ocsf_event["http_request_user_agent"] = http_req.get('user_agent')
url = http_req.get('url', {})
ocsf_event["http_request_url_path"] = url.get('path')
ocsf_event["http_request_url_hostname"] = url.get('hostname')
ocsf_event["http_request_url_scheme"] = url.get('scheme')
# HTTP response
http_resp = ocsf_data.get('http_response', {})
if http_resp:
ocsf_event["http_response"] = json.dumps(http_resp)
ocsf_event["http_response_code"] = http_resp.get('code')
ocsf_event["http_response_status"] = http_resp.get('status')
ocsf_event["http_response_latency"] = http_resp.get('latency')
# Device
device = ocsf_data.get('device', {})
if device:
ocsf_event["device"] = json.dumps(device)
ocsf_event["device_hostname"] = device.get('hostname')
ocsf_event["device_type"] = device.get('type')
# Resources
resources = ocsf_data.get('resources', [])
if resources:
ocsf_event["resources"] = json.dumps(resources)
if len(resources) > 0:
ocsf_event["resource_type"] = resources[0].get('type')
ocsf_event["resource_uid"] = resources[0].get('uid')
# Anomaly
anomaly = ocsf_data.get('anomaly', {})
if anomaly:
ocsf_event["anomaly"] = json.dumps(anomaly)
ocsf_event["anomaly_type"] = anomaly.get('type')
ocsf_event["anomaly_severity"] = anomaly.get('severity')
# Error
error = ocsf_data.get('error', {})
if error:
ocsf_event["error"] = json.dumps(error)
ocsf_event["error_message"] = error.get('message')
ocsf_event["error_type"] = error.get('type')
def convert_traces(self, input_path, output_path):
"""Convert OTel traces JSONL to OCSF parquet."""
if not os.path.exists(input_path):
print(f"Traces file not found: {input_path}")
return None
ocsf_events = []
with open(input_path, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
otel_record = json.loads(line)
events = self._parse_otel_trace_record(otel_record)
ocsf_events.extend(events)
except json.JSONDecodeError:
continue
if not ocsf_events:
print(f"No valid trace events found in {input_path}")
return None
return self._save_to_parquet(ocsf_events, output_path, "traces")
def _parse_otel_trace_record(self, otel_record):
"""Parse OTel trace export format and extract OCSF events."""
events = []
for resource_span in otel_record.get('resourceSpans', []):
resource = resource_span.get('resource', {})
resource_attrs = self._extract_attributes(resource.get('attributes', []))
for scope_span in resource_span.get('scopeSpans', []):
for span in scope_span.get('spans', []):
event = self._span_to_ocsf(span, resource_attrs)
if event:
events.append(event)
return events
def _span_to_ocsf(self, span, resource_attrs):
"""Convert a single OTel span to OCSF format."""
try:
# Extract timing (nanoseconds to milliseconds)
start_time = int(span.get('startTimeUnixNano', '0')) // 1_000_000
end_time = int(span.get('endTimeUnixNano', '0')) // 1_000_000
duration_ms = end_time - start_time if end_time > start_time else 0
# Extract attributes
attrs = self._extract_attributes(span.get('attributes', []))
# Determine activity type from span kind
span_kind = span.get('kind', 1)
activity_map = {1: 1, 2: 2, 3: 2, 4: 1, 5: 1} # Internal, Server, Client, Producer, Consumer
activity_id = activity_map.get(span_kind, 1)
# Determine status
status = span.get('status', {})
status_code = status.get('code', 0)
status_id = 1 if status_code == 0 else 2
# Build OCSF event
ocsf_event = {
"class_uid": 6003,
"class_name": "API Activity",
"category_uid": 6,
"category_name": "Application Activity",
"activity_id": activity_id,
"activity_name": ["Unknown", "Create", "Read", "Update", "Delete"][min(activity_id, 4)],
"type_uid": 600300 + activity_id,
"severity_id": 4 if status_id == 2 else 2,
"time": start_time,
"status_id": status_id,
"status": "Success" if status_id == 1 else "Failure",
"status_code": str(attrs.get('http.status_code', 200)),
"message": span.get('name', ''),
"service": resource_attrs.get('service.name', 'unknown'),
"duration": duration_ms,
"trace_id": span.get('traceId'),
"span_id": span.get('spanId'),
"parent_span_id": span.get('parentSpanId'),
# HTTP attributes if present
"http_request_method": attrs.get('http.method'),
"http_request_url_path": attrs.get('http.target') or attrs.get('http.url'),
"http_response_code": attrs.get('http.status_code'),
}
# Remove None values
ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}
return ocsf_event
except Exception as e:
return None
def convert_metrics(self, input_path, output_path):
"""Convert OTel metrics JSONL to OCSF parquet."""
if not os.path.exists(input_path):
print(f"Metrics file not found: {input_path}")
return None
ocsf_events = []
with open(input_path, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
otel_record = json.loads(line)
events = self._parse_otel_metrics_record(otel_record)
ocsf_events.extend(events)
except json.JSONDecodeError:
continue
if not ocsf_events:
print(f"No valid metric events found in {input_path}")
return None
return self._save_to_parquet(ocsf_events, output_path, "metrics")
def _parse_otel_metrics_record(self, otel_record):
"""Parse OTel metrics export format and extract OCSF events."""
events = []
for resource_metric in otel_record.get('resourceMetrics', []):
resource = resource_metric.get('resource', {})
resource_attrs = self._extract_attributes(resource.get('attributes', []))
for scope_metric in resource_metric.get('scopeMetrics', []):
for metric in scope_metric.get('metrics', []):
metric_events = self._metric_to_ocsf(metric, resource_attrs)
events.extend(metric_events)
return events
def _metric_to_ocsf(self, metric, resource_attrs):
"""Convert a single OTel metric to OCSF format."""
events = []
metric_name = metric.get('name', 'unknown')
metric_unit = metric.get('unit', '')
metric_description = metric.get('description', '')
# Handle different metric types
data_points = []
if 'sum' in metric:
data_points = metric['sum'].get('dataPoints', [])
metric_type = 'counter'
elif 'gauge' in metric:
data_points = metric['gauge'].get('dataPoints', [])
metric_type = 'gauge'
elif 'histogram' in metric:
data_points = metric['histogram'].get('dataPoints', [])
metric_type = 'histogram'
elif 'summary' in metric:
data_points = metric['summary'].get('dataPoints', [])
metric_type = 'summary'
else:
return events
for dp in data_points:
try:
# Get timestamp
time_unix_nano = dp.get('timeUnixNano', 0)
if isinstance(time_unix_nano, str):
time_unix_nano = int(time_unix_nano)
time_ms = time_unix_nano // 1_000_000
# Get value
if 'asDouble' in dp:
value = dp['asDouble']
elif 'asInt' in dp:
value = dp['asInt']
elif 'sum' in dp:
value = dp['sum']
elif 'count' in dp:
value = dp['count']
else:
value = 0
# Get labels
labels = self._extract_attributes(dp.get('attributes', []))
ocsf_event = {
"class_uid": 99, # Custom metric class
"class_name": "Metric",
"category_uid": 6,
"category_name": "Application Activity",
"activity_id": 0,
"activity_name": "Observe",
"type_uid": 9900,
"severity_id": 2, # Info
"time": time_ms,
"metric_name": metric_name,
"metric_value": value,
"metric_type": metric_type,
"metric_unit": metric_unit,
"metric_description": metric_description,
"service": resource_attrs.get('service.name') or labels.get('job', 'unknown'),
# Common metric labels
"endpoint": labels.get('endpoint'),
"method": labels.get('method'),
"status": labels.get('status'),
"job": labels.get('job'),
"instance": labels.get('instance'),
}
# Add all labels as flattened fields
for k, v in labels.items():
key = f"label_{k.replace('.', '_').replace('-', '_')}"
ocsf_event[key] = v
# Remove None values
ocsf_event = {k: v for k, v in ocsf_event.items() if v is not None}
events.append(ocsf_event)
except Exception as e:
continue
return events
def _extract_attributes(self, attrs_list):
"""Extract OTel attributes from list format to dict."""
result = {}
for attr in attrs_list:
key = attr.get('key', '')
value = attr.get('value', {})
if 'stringValue' in value:
result[key] = value['stringValue']
elif 'intValue' in value:
result[key] = int(value['intValue'])
elif 'doubleValue' in value:
result[key] = float(value['doubleValue'])
elif 'boolValue' in value:
result[key] = value['boolValue']
return result
def _save_to_parquet(self, events, output_path, signal_type):
"""Save events to parquet file."""
if not events:
print(f"Warning: No {signal_type} events to save")
return None
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
df = pd.DataFrame(events)
df.to_parquet(output_path, compression='snappy')
print(f"Saved {len(df)} {signal_type} events to {output_path}")
print(f" Columns: {len(df.columns)}")
return df
def main():
parser = argparse.ArgumentParser(
description='Convert OpenTelemetry JSONL exports to OCSF format',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Convert all signals
python scripts/convert_otel_to_ocsf.py
# Convert specific signal
python scripts/convert_otel_to_ocsf.py --signal logs
python scripts/convert_otel_to_ocsf.py --signal traces
python scripts/convert_otel_to_ocsf.py --signal metrics
# Custom input/output directories
python scripts/convert_otel_to_ocsf.py --input-dir ./logs/otel --output-dir ./data
"""
)
parser.add_argument('--signal', choices=['logs', 'traces', 'metrics', 'all'],
default='all', help='Which signal to convert (default: all)')
parser.add_argument('--input-dir', default='./logs/otel',
help='Directory containing OTel JSONL files (default: ./logs/otel)')
parser.add_argument('--output-dir', default='./data',
help='Output directory for parquet files (default: ./data)')
args = parser.parse_args()
converter = OTelToOCSFConverter()
results = {}
if args.signal in ['logs', 'all']:
logs_input = os.path.join(args.input_dir, 'logs.jsonl')
logs_output = os.path.join(args.output_dir, 'ocsf_logs.parquet')
print(f"\nConverting logs: {logs_input} -> {logs_output}")
results['logs'] = converter.convert_logs(logs_input, logs_output)
if args.signal in ['traces', 'all']:
traces_input = os.path.join(args.input_dir, 'traces.jsonl')
traces_output = os.path.join(args.output_dir, 'ocsf_traces.parquet')
print(f"\nConverting traces: {traces_input} -> {traces_output}")
results['traces'] = converter.convert_traces(traces_input, traces_output)
if args.signal in ['metrics', 'all']:
metrics_input = os.path.join(args.input_dir, 'metrics.jsonl')
metrics_output = os.path.join(args.output_dir, 'ocsf_metrics.parquet')
print(f"\nConverting metrics: {metrics_input} -> {metrics_output}")
results['metrics'] = converter.convert_metrics(metrics_input, metrics_output)
# Summary
print("\n" + "=" * 50)
print("Conversion Summary:")
for signal, df in results.items():
if df is not None:
print(f" {signal}: {len(df)} events, {len(df.columns)} columns")
else:
print(f" {signal}: No data (file may not exist yet)")
if __name__ == '__main__':
main()
Why unified conversion?
Single source of truth: All telemetry flows through OpenTelemetry Collector
Consistent format: JSONL export format is the same for all signals
Simpler workflow: One script handles logs, traces, and metrics
OCSF compliance: Automatic field flattening for ML use
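Beyond the CLI, the converter class can be used programmatically. A minimal sketch using the OTelToOCSFConverter class defined above, assuming the scripts directory is on your import path:
# Convert logs programmatically and inspect the result.
from convert_otel_to_ocsf import OTelToOCSFConverter

converter = OTelToOCSFConverter()
logs_df = converter.convert_logs("./logs/otel/logs.jsonl", "./data/ocsf_logs.parquet")
if logs_df is not None:
    print(logs_df[["time", "service", "severity_id", "message"]].head())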
Section 5: Running the Data Generation Pipeline¶
Quick Start¶
# 1. Download and extract the code (or use the zip from this appendix)
cd appendix-code
# 2. Create required directories (with write permissions for containers)
# Note: If you previously ran docker compose, the directory may be root-owned
sudo rm -rf ./logs/otel 2>/dev/null || true
mkdir -p ./logs/otel ./data
chmod 777 ./logs/otel
# 3. Build services (ensures latest code)
docker compose build
# 4. Start all services
docker compose up -d
# 5. Verify services are running
docker compose ps
# 6. Let the load generator run for a while (e.g., 5-10 minutes for demo, 2 hours for full dataset)
# The load-generator service automatically sends traffic to web-api
# All telemetry flows through OpenTelemetry Collector to ./logs/otel/
# 7. Convert all OpenTelemetry data to OCSF format (single command):
python scripts/convert_otel_to_ocsf.py
# 8. Output files (ready for Parts 2-3 of tutorial):
# - data/ocsf_logs.parquet (application logs in OCSF format)
# - data/ocsf_traces.parquet (distributed traces in OCSF format)
# - data/ocsf_metrics.parquet (system metrics in OCSF format)
Prerequisites¶
Before running, ensure you have:
Docker 20.10+ with Compose plugin (uses docker compose, not docker-compose)
Python 3.8+ with pandas and pyarrow: pip install pandas pyarrow
Stopping Services¶
# Stop all services
docker compose down
# Stop and remove volumes (clears all data)
docker compose down -v
Generated Dataset Statistics¶
After 2 hours of generation, you’ll have:
~72,000 normal events (10 RPS * 7200 seconds)
~6-8 anomaly scenarios (5% anomaly probability)
Event types:
60% user browsing (GET /api/users)
25% search operations
15% checkout operations
Anomaly types (unlabeled; they occur naturally within the traffic):
Memory leak (gradual degradation over 30-60 min)
Connection pool exhaustion (sudden spike)
Cache invalidation storm (burst of DB hits)
Slow query cascade (thread pool exhaustion)
Important: The generated data is unlabeled - anomalies occur naturally in the traffic patterns without explicit labels. This matches the approach used throughout the series:
Part 4: Self-supervised learning (no labels needed)
Part 6: Unsupervised anomaly detection (LOF, Isolation Forest, k-NN)
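Before moving on, a quick check of the converted output confirms the pipeline produced usable files. A sketch assuming the default output paths:
# Verify the parquet outputs from the conversion step.
import pandas as pd

for name in ["ocsf_logs", "ocsf_traces", "ocsf_metrics"]:
    try:
        df = pd.read_parquet(f"data/{name}.parquet")
        print(f"{name}: {len(df)} events, {len(df.columns)} columns")
    except FileNotFoundError:
        print(f"{name}: not generated yet")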
Section 6: Using Generated Data in Tutorial (No Labels Needed)¶
Now you can use the generated data in Part 2 (TabularResNet) and Part 3 (Feature Engineering). No labeling required - the self-supervised and unsupervised methods work on raw observability data.
# In Part 3: Feature Engineering
import pandas as pd
# Load your generated OCSF data (unlabeled)
ocsf_df = pd.read_parquet('data/ocsf_logs.parquet')
# Now follow Part 3 to extract features
# (column names below match the output of convert_otel_to_ocsf.py)
categorical_features = ['service', 'http_request_method', 'status_id']
numerical_features = ['duration', 'time', 'severity_id']
# Continue with feature engineering from Part 3...
Optional: Generating Labels for Method Evaluation¶
If you want to evaluate different anomaly detection methods (Part 6, Section 7: Method Comparison), you can optionally label a small subset for validation:
scripts/label_subset_for_evaluation.py
#!/usr/bin/env python3
"""
Label a small subset of OCSF events for evaluating anomaly detection methods.
This is OPTIONAL - only needed if you want to compare method performance.
The main training (Part 4) doesn't need labels - it uses self-supervised learning.
Usage:
python scripts/label_subset_for_evaluation.py
"""
import os
import sys
import pandas as pd
def label_evaluation_subset(ocsf_events_path, sample_size=1000):
"""
Label a small subset for evaluating anomaly detection methods.
Args:
ocsf_events_path: Path to OCSF Parquet file
sample_size: How many events to label for evaluation
Returns:
Small labeled DataFrame for validation
"""
# Check if input file exists
if not os.path.exists(ocsf_events_path):
print(f"Input file not found: {ocsf_events_path}")
print()
print("Please run the OCSF conversion first:")
print(" python scripts/convert_otel_to_ocsf.py")
return None
df = pd.read_parquet(ocsf_events_path)
if len(df) == 0:
print(f"No events found in {ocsf_events_path}")
print("Make sure docker compose services are running and generating logs.")
return None
print(f"Loaded {len(df)} events with columns: {list(df.columns)}")
# Sample a subset
eval_df = df.sample(n=min(sample_size, len(df)), random_state=42).copy()
# Initialize anomaly labels
eval_df['is_anomaly'] = 0
eval_df['anomaly_reason'] = ''
anomaly_count = 0
# Label based on available columns
# 1. Check for explicit anomaly field (from new structured logs)
if 'anomaly_type' in eval_df.columns:
mask = eval_df['anomaly_type'].notna()
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask, 'anomaly_reason'] = 'explicit_anomaly_' + eval_df.loc[mask, 'anomaly_type'].astype(str)
anomaly_count += mask.sum()
print(f" Found {mask.sum()} events with explicit anomaly markers")
# 2. High latency (if duration field exists)
if 'duration' in eval_df.columns:
# Duration might be in ms
duration_threshold = 2000 # 2 seconds in ms
mask = eval_df['duration'].notna() & (eval_df['duration'] > duration_threshold)
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_latency'
new_anomalies = mask.sum() - (eval_df.loc[mask, 'anomaly_reason'].str.contains('explicit', na=False)).sum()
if new_anomalies > 0:
print(f" Found {new_anomalies} events with high latency (>{duration_threshold}ms)")
# 3. HTTP response latency (alternative duration field)
if 'http_response_latency' in eval_df.columns:
mask = eval_df['http_response_latency'].notna() & (eval_df['http_response_latency'] > 2000)
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_response_latency'
# 4. Error status (failures)
if 'status_id' in eval_df.columns:
mask = eval_df['status_id'] == 2 # Failure
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'status_failure'
print(f" Found {mask.sum()} events with failure status")
# 5. HTTP error codes (4xx, 5xx)
if 'http_response_code' in eval_df.columns:
mask = eval_df['http_response_code'].notna() & (eval_df['http_response_code'] >= 400)
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'http_error'
# 6. High severity (ERROR, CRITICAL)
if 'severity_id' in eval_df.columns:
mask = eval_df['severity_id'] >= 4 # ERROR or higher
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[mask & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = 'high_severity'
print(f" Found {mask.sum()} events with high severity (ERROR+)")
# 7. Check message content for specific anomaly keywords (not generic "error")
# Only check INFO-level messages to avoid double-counting ERROR logs
if 'message' in eval_df.columns:
# Use specific keywords that indicate anomalies, not generic "error"
anomaly_keywords = ['timeout', 'leak', 'storm', 'exhausted', 'overflow', 'oom', 'killed']
info_mask = eval_df['severity_id'] <= 2 if 'severity_id' in eval_df.columns else True
for keyword in anomaly_keywords:
mask = info_mask & eval_df['message'].str.lower().str.contains(keyword, na=False)
new_anomalies = mask & (eval_df['is_anomaly'] == 0)
eval_df.loc[mask, 'is_anomaly'] = 1
eval_df.loc[new_anomalies & (eval_df['anomaly_reason'] == ''), 'anomaly_reason'] = f'keyword_{keyword}'
# Summary
total_anomalies = eval_df['is_anomaly'].sum()
print(f"\nLabeled {total_anomalies} / {len(eval_df)} events as anomalies for evaluation")
print(f"Anomaly rate: {total_anomalies/len(eval_df)*100:.2f}%")
if total_anomalies > 0:
print(f"\nAnomaly reasons:")
reason_counts = eval_df[eval_df['is_anomaly'] == 1]['anomaly_reason'].value_counts()
for reason, count in reason_counts.items():
print(f" {reason}: {count}")
# Save small evaluation set
output_path = 'data/ocsf_eval_subset.parquet'
eval_df.to_parquet(output_path)
print(f"\nSaved evaluation subset to {output_path}")
return eval_df
if __name__ == '__main__':
# Generate small labeled subset for Part 6 evaluation (optional)
result = label_evaluation_subset('data/ocsf_logs.parquet', sample_size=1000)
if result is None:
sys.exit(1)
When to use labels:
✅ For evaluation only (Part 6, Section 7: comparing LOF vs Isolation Forest vs k-NN)
❌ Not for training (Part 4 uses self-supervised learning on unlabeled data)
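As an illustration of what the labeled subset enables, here is a minimal sketch of the kind of comparison Part 6, Section 7 performs, using Local Outlier Factor; scikit-learn is an assumption of this sketch, and the feature selection is illustrative:
# Score the labeled evaluation subset with LOF and compute ROC AUC.
import pandas as pd
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import roc_auc_score

eval_df = pd.read_parquet("data/ocsf_eval_subset.parquet")
features = eval_df[["duration", "severity_id"]].fillna(0)  # illustrative features

lof = LocalOutlierFactor(n_neighbors=20)
lof.fit_predict(features)
scores = -lof.negative_outlier_factor_  # higher = more anomalous

if eval_df["is_anomaly"].nunique() > 1:
    print("ROC AUC:", roc_auc_score(eval_df["is_anomaly"], scores))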
Section 7: Releasing as Open Source¶
README.md for the repository¶
# OCSF Training Data Generator
Generate realistic observability data in OCSF format for **self-supervised** anomaly detection training.
## Architecture
All telemetry flows through **OpenTelemetry Collector**:
- **Logs**: Services -> OTLP -> OTel Collector -> logs.jsonl
- **Traces**: Services -> OTLP -> OTel Collector -> traces.jsonl
- **Metrics**: OTel Collector scrapes /metrics -> metrics.jsonl
## What This Generates
- Realistic observability data from a multi-service application
- Normal traffic + anomaly scenarios (unlabeled)
- **Logs**, **metrics**, and **traces** in OCSF format
- No labels required - use with self-supervised learning
## Quick Start
```bash
# 1. Create required directories (with write permissions for containers)
# Note: If you previously ran docker compose, the directory may be root-owned
sudo rm -rf ./logs/otel 2>/dev/null || true
mkdir -p ./logs/otel ./data
chmod 777 ./logs/otel
# 2. Build services (ensures latest code)
docker compose build
# 3. Start all services
docker compose up -d
# 4. Let the load generator run (5-10 min for demo, 2 hours for full dataset)
# Traffic is automatically generated
# 5. Convert all OpenTelemetry data to OCSF format:
python scripts/convert_otel_to_ocsf.py
# 6. (Optional) Generate small labeled subset for evaluation
python scripts/label_subset_for_evaluation.py
```
## Prerequisites
- Docker 20.10+ with Compose plugin (uses `docker compose`, not `docker-compose`)
- Python 3.8+ with: `pip install pandas pyarrow`
## Output Datasets
After running the conversion script:
- `data/ocsf_logs.parquet` - Application logs in OCSF format (unlabeled)
- `data/ocsf_traces.parquet` - Distributed traces in OCSF format (unlabeled)
- `data/ocsf_metrics.parquet` - System metrics in OCSF format (unlabeled)
- `data/ocsf_eval_subset.parquet` - Small labeled subset for evaluation (optional)
## Stack Components
| Service | Role | Telemetry |
|---------|------|-----------|
| **web-api** | Flask service with observability | Logs + traces via OTLP, metrics via /metrics |
| **auth-service** | Node.js authentication service | Logs via stdout |
| **payment-worker** | Background job processor | Logs via stdout |
| **postgres** | Database | - |
| **redis** | Cache | - |
| **otel-collector** | Unified telemetry hub | Exports all signals to JSONL |
| **load-generator** | Traffic patterns with anomalies | - |
## Conversion Script
| Script | Input | Output |
|--------|-------|--------|
| `convert_otel_to_ocsf.py` | `logs/otel/*.jsonl` | `data/ocsf_*.parquet` |
Convert specific signals:
```bash
python scripts/convert_otel_to_ocsf.py --signal logs
python scripts/convert_otel_to_ocsf.py --signal traces
python scripts/convert_otel_to_ocsf.py --signal metrics
```
## Use Cases
- Self-supervised training with TabularResNet (Part 4)
- Unsupervised anomaly detection (Part 6: LOF, Isolation Forest, k-NN)
- Testing observability systems
- Learning OCSF schema
## Important: No Labels Needed
This generator creates **unlabeled** data that works with:
- **Self-supervised learning** (Part 4: contrastive learning, masked prediction)
- **Unsupervised anomaly detection** (Part 6: LOF, Isolation Forest, k-NN)
Labels are only needed for evaluation/comparison (Part 6, Section 7), not training.
## Stopping Services
```bash
docker compose down # Stop services
docker compose down -v # Stop and remove volumes
```
## License
MIT - Free to use for commercial and non-commercial purposes
Summary¶
This appendix provides a complete, open-source solution for generating realistic unlabeled observability data for self-supervised learning:
Docker Compose stack: Multi-service application with unified OpenTelemetry instrumentation
Instrumented services: Web API, auth service, payment worker emitting telemetry via OTLP
OpenTelemetry Collector: Central hub receiving logs/traces via OTLP and scraping metrics
Load generator: Creates normal traffic + anomaly scenarios (naturally occurring, unlabeled)
Unified OCSF converter: Single script converts all OTel exports to OCSF-formatted Parquet:
data/ocsf_logs.parquet - Application logs (via OTLP)
data/ocsf_traces.parquet - Distributed traces (via OTLP)
data/ocsf_metrics.parquet - System metrics (scraped by OTel)
Optional evaluation labels: Small labeled subset for comparing detection methods (Part 6, Section 7 only)
Key difference from supervised learning:
✅ Generates unlabeled data for self-supervised training (Part 4)
✅ Works with unsupervised detection (Part 6: LOF, Isolation Forest, k-NN)
⚠️ Labels are optional - only for evaluation, not training
To use in the tutorial series: Links from Part 3 (Feature Engineering) and Part 4 (Self-Supervised Training) point to this appendix for readers who need training data.