# Observability Overview

The Federated Learning Platform implements observability with OpenTelemetry, Prometheus, Grafana Tempo, Loki, and Grafana to support reliability monitoring, performance analysis, and troubleshooting across distributed federated learning environments.

## Observability Stack
```mermaid
graph TB
    subgraph "Application Layer"
        FRONTEND[Next.js Frontend<br/>Browser Telemetry]
        BACKEND[FastAPI Backend<br/>API Telemetry]
        FL_SERVER[FL Aggregator<br/>Training Telemetry]
        FL_CLIENTS[FL Clients<br/>Distributed Telemetry]
    end

    subgraph "Collection Layer"
        OTEL_COLLECTOR[OpenTelemetry Collector<br/>Unified Telemetry Pipeline]

        subgraph "Receivers"
            OTLP_GRPC[OTLP gRPC<br/>Port 4317]
            OTLP_HTTP[OTLP HTTP<br/>Port 4318]
            PROMETHEUS_RECEIVER[Prometheus<br/>Metrics Scraping]
        end

        subgraph "Processors"
            MEMORY_LIMITER[Memory Limiter<br/>Resource Protection]
            BATCH_PROCESSOR[Batch Processor<br/>Performance Optimization]
            ATTRIBUTE_PROCESSOR[Attribute Processor<br/>Data Enrichment]
        end

        subgraph "Exporters"
            TEMPO_EXPORTER[Tempo Exporter<br/>Distributed Tracing]
            GRAFANA_EXPORTER[Grafana Exporter<br/>Metrics & Logs]
            DEBUG_EXPORTER[Debug Exporter<br/>Development]
        end
    end

    subgraph "Storage Layer"
        TEMPO[Grafana Tempo<br/>Trace Storage]
        PROMETHEUS[Prometheus<br/>Metrics Storage]
        LOKI[Grafana Loki<br/>Log Aggregation]
    end

    subgraph "Visualization Layer"
        GRAFANA[Grafana<br/>Dashboards & Alerts]
        ALERTMANAGER[AlertManager<br/>Notification Routing]
    end

    subgraph "Analysis Layer"
        JAEGER_UI[Jaeger UI<br/>Trace Analysis]
        CUSTOM_DASHBOARDS[Custom Dashboards<br/>FL Metrics]
        SLA_MONITORING[SLA Monitoring<br/>Performance Tracking]
    end

    FRONTEND --> OTLP_HTTP
    BACKEND --> OTLP_GRPC
    FL_SERVER --> OTLP_GRPC
    FL_CLIENTS --> OTLP_GRPC

    OTLP_GRPC --> OTEL_COLLECTOR
    OTLP_HTTP --> OTEL_COLLECTOR
    PROMETHEUS_RECEIVER --> OTEL_COLLECTOR

    OTEL_COLLECTOR --> MEMORY_LIMITER
    MEMORY_LIMITER --> BATCH_PROCESSOR
    BATCH_PROCESSOR --> ATTRIBUTE_PROCESSOR
    ATTRIBUTE_PROCESSOR --> TEMPO_EXPORTER
    ATTRIBUTE_PROCESSOR --> GRAFANA_EXPORTER
    ATTRIBUTE_PROCESSOR --> DEBUG_EXPORTER

    TEMPO_EXPORTER --> TEMPO
    GRAFANA_EXPORTER --> PROMETHEUS
    GRAFANA_EXPORTER --> LOKI

    TEMPO --> GRAFANA
    PROMETHEUS --> GRAFANA
    LOKI --> GRAFANA
    GRAFANA --> ALERTMANAGER
    GRAFANA --> JAEGER_UI
    GRAFANA --> CUSTOM_DASHBOARDS
    GRAFANA --> SLA_MONITORING
```
## Three Pillars of Observability

### 1. Metrics

Quantitative measurements of system behavior over time.

#### System Metrics
- CPU Usage: Processor utilization across all nodes
- Memory Usage: RAM consumption and allocation patterns
- Disk I/O: Storage read/write operations and latency
- Network I/O: Bandwidth utilization and packet loss

#### Application Metrics

- Request Rate: API requests per second
- Response Time: Request processing latency (p50, p95, p99)
- Error Rate: Failed requests percentage
- Throughput: Successful operations per unit time
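In production these percentiles are computed by Prometheus from histogram buckets (see the dashboards below); purely as an illustration of what p50/p95/p99 and error rate mean, here is a stdlib-only sketch that summarizes one window of request records (the function name and record shape are hypothetical):

```python
# Hypothetical sketch: latency percentiles and error rate for one window.
from statistics import quantiles


def summarize_requests(durations_ms, statuses):
    """Return p50/p95/p99 latency and error rate for a batch of requests."""
    # quantiles(n=100) yields 99 cut points: index 49 -> p50, 94 -> p95, 98 -> p99
    cuts = quantiles(durations_ms, n=100)
    errors = sum(1 for s in statuses if s >= 500)
    return {
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
        "error_rate": errors / len(statuses),
    }
```

Tail percentiles (p95/p99) matter because averages hide the slow requests that users actually notice.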

#### Federated Learning Metrics

- Training Progress: Rounds completed, accuracy improvements
- Client Participation: Active clients, dropout rates
- Model Performance: Loss reduction, convergence metrics
- Communication Overhead: Data transfer volumes, compression ratios
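As a small illustration of the communication-overhead metrics above, the following sketch (names and inputs are illustrative, not platform APIs) aggregates per-client payload sizes into a per-round transfer volume and compression ratio:

```python
# Hypothetical sketch: per-round communication overhead summary.
def round_communication_stats(raw_bytes_per_client, compressed_bytes_per_client):
    """Summarize data transfer volume and compression ratio for one FL round."""
    raw_total = sum(raw_bytes_per_client)
    sent_total = sum(compressed_bytes_per_client)
    return {
        "clients": len(raw_bytes_per_client),
        "bytes_sent_total": sent_total,
        # Ratio > 1 means compression reduced the bytes actually sent
        "compression_ratio": raw_total / sent_total,
    }
```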
### 2. Traces

Distributed request tracking across service boundaries.

#### Trace Structure
```mermaid
graph LR
    subgraph "Training Request Trace"
        SPAN1[Frontend Request<br/>user-action]
        SPAN2[Backend Processing<br/>training-start]
        SPAN3[Flower Initialization<br/>fl-setup]
        SPAN4[Client Deployment<br/>ansible-deploy]
        SPAN5[Model Training<br/>local-training]
        SPAN6[Aggregation<br/>model-aggregate]
    end

    SPAN1 --> SPAN2
    SPAN2 --> SPAN3
    SPAN3 --> SPAN4
    SPAN4 --> SPAN5
    SPAN5 --> SPAN6
```

#### Key Trace Scenarios

- User Authentication Flow: Login → Token validation → Session creation
- Training Job Lifecycle: Initialization → Client deployment → Training rounds → Completion
- Model Aggregation: Client updates → Validation → Aggregation → Distribution
- Error Propagation: Failure detection → Error handling → Recovery actions
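What ties these scenarios into a single trace is context propagation: each service forwards the trace ID to the next over a header. OpenTelemetry's propagators handle this automatically; the sketch below (function names are illustrative) just shows the real W3C `traceparent` header format that carries the context:

```python
# Minimal sketch of the W3C Trace Context `traceparent` header:
# version-traceid-spanid-flags, e.g. "00-<32 hex>-<16 hex>-01".
import secrets


def make_traceparent(trace_id=None, span_id=None, sampled=True):
    """Build a traceparent header value, generating IDs if not supplied."""
    trace_id = trace_id or secrets.token_hex(16)  # 32 hex chars
    span_id = span_id or secrets.token_hex(8)     # 16 hex chars
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(header):
    """Extract trace context from an incoming traceparent header."""
    version, trace_id, span_id, flags = header.split("-")
    return {
        "trace_id": trace_id,
        "parent_span_id": span_id,
        "sampled": flags == "01",
    }
```

In the platform itself, `opentelemetry.propagate.inject`/`extract` would read and write this header rather than hand-rolled helpers.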
### 3. Logs

Structured event records with contextual information.

#### Log Levels and Usage
```python
# Structured logging implementation
import json
import logging
from datetime import datetime, timezone


class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

        # Configure JSON formatter on a stream handler
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "service": record.name,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
            }

            # Add trace context if available
            if hasattr(record, "trace_id"):
                log_entry["trace_id"] = record.trace_id
            if hasattr(record, "span_id"):
                log_entry["span_id"] = record.span_id

            # Add custom attributes
            if hasattr(record, "user_id"):
                log_entry["user_id"] = record.user_id
            if hasattr(record, "job_id"):
                log_entry["job_id"] = record.job_id

            return json.dumps(log_entry)

    def info(self, message: str, **kwargs):
        self.logger.info(message, extra=kwargs)

    def error(self, message: str, **kwargs):
        self.logger.error(message, extra=kwargs)

    def warning(self, message: str, **kwargs):
        self.logger.warning(message, extra=kwargs)


# Usage example
logger = StructuredLogger("fl-backend")
logger.info(
    "Training job started",
    user_id="user123",
    job_id="job456",
    trace_id="trace789",
)
```
## OpenTelemetry Implementation

### Instrumentation Setup

#### Backend Instrumentation
```python
# OpenTelemetry setup for FastAPI backend
import os

from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.trace import Status, StatusCode
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor

# `app`, `TrainingConfig`, and `training_service` are defined elsewhere
# in the backend service.


def setup_telemetry():
    """Configure OpenTelemetry for the backend service."""
    # Trace provider setup
    trace.set_tracer_provider(TracerProvider(
        resource=Resource.create({
            "service.name": "fl-backend",
            "service.version": "1.2.0",
            "deployment.environment": os.getenv("ENVIRONMENT", "development"),
        })
    ))

    # Trace exporter
    trace_exporter = OTLPSpanExporter(
        endpoint="http://otel-collector:4317",
        insecure=True,
    )
    span_processor = BatchSpanProcessor(trace_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Metrics provider setup
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(
            endpoint="http://otel-collector:4317",
            insecure=True,
        ),
        export_interval_millis=5000,
    )
    metrics.set_meter_provider(MeterProvider(
        resource=Resource.create({
            "service.name": "fl-backend",
            "service.version": "1.2.0",
        }),
        metric_readers=[metric_reader],
    ))

    # Auto-instrumentation
    FastAPIInstrumentor.instrument_app(app)
    PymongoInstrumentor().instrument()


# Define custom metrics
meter = metrics.get_meter(__name__)

training_jobs_counter = meter.create_counter(
    "training_jobs_total",
    description="Total number of training jobs started",
)
active_clients_gauge = meter.create_up_down_counter(
    "active_clients",
    description="Number of active federated learning clients",
)
training_duration_histogram = meter.create_histogram(
    "training_duration_seconds",
    description="Duration of training rounds in seconds",
)


# Usage in application code
@app.post("/training/start")
async def start_training(config: TrainingConfig):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("start_training") as span:
        span.set_attribute("training.rounds", config.rounds)
        span.set_attribute("training.clients", config.clients)
        span.set_attribute("training.model_type", config.model_type)

        # Increment counter
        training_jobs_counter.add(1, {"model_type": config.model_type})

        # Start training logic
        job = await training_service.start_training(config)
        span.set_attribute("job.id", job.id)
        span.set_status(Status(StatusCode.OK))
        return job
```

#### Federated Learning Instrumentation

```python
# FL client instrumentation
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# `FlowerClient` is the uninstrumented client class defined elsewhere.


class InstrumentedFlowerClient(FlowerClient):
    """Flower client with OpenTelemetry instrumentation."""

    def __init__(self, model, train_loader, test_loader, device, client_id):
        super().__init__(model, train_loader, test_loader, device)
        self.client_id = client_id

        # Set up telemetry before acquiring a tracer from the provider
        self.setup_telemetry()
        self.tracer = trace.get_tracer(__name__)

    def setup_telemetry(self):
        """Configure OpenTelemetry for the FL client."""
        trace.set_tracer_provider(TracerProvider(
            resource=Resource.create({
                "service.name": "fl-client",
                "service.instance.id": self.client_id,
                "service.version": "1.2.0",
            })
        ))
        trace_exporter = OTLPSpanExporter(
            endpoint="http://otel-collector:4317",
            insecure=True,
        )
        span_processor = BatchSpanProcessor(trace_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    def fit(self, parameters, config):
        """Instrumented training method."""
        with self.tracer.start_as_current_span("client_training") as span:
            span.set_attribute("client.id", self.client_id)
            span.set_attribute("training.round", config.get("round", 0))
            span.set_attribute("training.epochs", config.get("local_epochs", 1))

            start_time = time.time()
            # Perform training
            result = super().fit(parameters, config)
            duration = time.time() - start_time

            span.set_attribute("training.duration_seconds", duration)
            span.set_attribute("training.loss", result[2].get("loss", 0))
            return result

    def evaluate(self, parameters, config):
        """Instrumented evaluation method."""
        with self.tracer.start_as_current_span("client_evaluation") as span:
            span.set_attribute("client.id", self.client_id)
            span.set_attribute("evaluation.round", config.get("round", 0))

            start_time = time.time()
            # Perform evaluation
            loss, num_examples, metrics = super().evaluate(parameters, config)
            duration = time.time() - start_time

            span.set_attribute("evaluation.duration_seconds", duration)
            span.set_attribute("evaluation.loss", loss)
            span.set_attribute("evaluation.accuracy", metrics.get("accuracy", 0))
            span.set_attribute("evaluation.examples", num_examples)
            return loss, num_examples, metrics
```

### Collector Configuration

```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
      http:
        endpoint: "0.0.0.0:4318"
  prometheus:
    config:
      scrape_configs:
        - job_name: 'fl-backend'
          static_configs:
            - targets: ['backend-fastapi:8001']
        - job_name: 'fl-clients'
          static_configs:
            - targets: ['client1:8082', 'client2:8083']

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
    spike_limit_mib: 200
  attributes:
    actions:
      - key: environment
        value: production
        action: insert
      - key: cluster
        value: fl-cluster
        action: insert
  resource:
    attributes:
      - key: service.namespace
        value: federated-learning
        action: insert

exporters:
  debug:
    verbosity: detailed
  otlp/tempo:
    endpoint: tempo:4317
    tls:
      insecure: true
  otlphttp/grafana:
    endpoint: http://grafana:3000/api/otel/v1/metrics
    tls:
      insecure: true
  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: federated_learning

extensions:
  health_check:
  pprof:
  zpages:

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [otlp/tempo, debug]
    metrics:
      receivers: [otlp, prometheus]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [otlphttp/grafana, prometheus, debug]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [debug]
```
## Grafana Dashboards

### Federated Learning Dashboard
```json
{
  "dashboard": {
    "title": "Federated Learning Overview",
    "tags": ["federated-learning", "ml", "training"],
    "panels": [
      {
        "title": "Active Training Jobs",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(training_jobs_active)",
            "legendFormat": "Active Jobs"
          }
        ]
      },
      {
        "title": "Client Participation Rate",
        "type": "gauge",
        "targets": [
          {
            "expr": "sum(fl_clients_connected) / sum(fl_clients_total) * 100",
            "legendFormat": "Participation %"
          }
        ]
      },
      {
        "title": "Training Progress",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(training_accuracy) by (job_id)",
            "legendFormat": "Accuracy - {{job_id}}"
          },
          {
            "expr": "avg(training_loss) by (job_id)",
            "legendFormat": "Loss - {{job_id}}"
          }
        ]
      },
      {
        "title": "System Resource Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(cpu_usage_percent) by (instance)",
            "legendFormat": "CPU - {{instance}}"
          },
          {
            "expr": "avg(memory_usage_percent) by (instance)",
            "legendFormat": "Memory - {{instance}}"
          }
        ]
      }
    ]
  }
}
```

### Performance Monitoring Dashboard

```json
{
  "dashboard": {
    "title": "System Performance",
    "panels": [
      {
        "title": "API Response Times",
        "type": "heatmap",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th Percentile"
          }
        ]
      },
      {
        "title": "Error Rates",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[5m])",
            "legendFormat": "5xx Errors"
          },
          {
            "expr": "rate(http_requests_total{status=~\"4..\"}[5m])",
            "legendFormat": "4xx Errors"
          }
        ]
      },
      {
        "title": "Database Performance",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mongodb_operations_total[5m])",
            "legendFormat": "Operations/sec"
          },
          {
            "expr": "mongodb_connections_current",
            "legendFormat": "Active Connections"
          }
        ]
      }
    ]
  }
}
```
## Alerting and Notifications

### Alert Rules
```yaml
# alerting-rules.yml
groups:
  - name: federated_learning_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      - alert: TrainingJobStuck
        expr: increase(training_round_completed_total[30m]) == 0
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Training job appears stuck"
          description: "No training rounds completed in 30 minutes"

      - alert: ClientDropout
        expr: fl_clients_connected < fl_clients_expected * 0.5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High client dropout rate"
          description: "Only {{ $value }} clients connected out of {{ $labels.expected }}"

      - alert: SystemResourceHigh
        expr: cpu_usage_percent > 90 or memory_usage_percent > 90
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "High system resource usage"
          description: "{{ $labels.instance }} resource usage is {{ $value }}%"

  - name: infrastructure_alerts
    rules:
      - alert: ServiceDown
        expr: up == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Service is down"
          description: "{{ $labels.instance }} has been down for more than 5 minutes"

      - alert: DatabaseConnectionsHigh
        expr: mongodb_connections_current > mongodb_connections_available * 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High database connection usage"
          description: "MongoDB connections at {{ $value }} ({{ $labels.available }} available)"
```

### Notification Channels

```yaml
# alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@yourcompany.com'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'web.hook'
  routes:
    - match:
        severity: critical
      receiver: 'critical-alerts'
    - match:
        severity: warning
      receiver: 'warning-alerts'

receivers:
  - name: 'web.hook'
    webhook_configs:
      - url: 'http://webhook-service:5000/alerts'

  - name: 'critical-alerts'
    email_configs:
      - to: 'oncall@yourcompany.com'
        subject: 'CRITICAL: {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}
    slack_configs:
      - api_url: 'YOUR_SLACK_WEBHOOK_URL'
        channel: '#alerts-critical'
        title: 'Critical Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

  - name: 'warning-alerts'
    email_configs:
      - to: 'team@yourcompany.com'
        subject: 'WARNING: {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}
```
## Performance Monitoring

### SLA/SLO Definitions
```yaml
# Service Level Objectives
slos:
  api_availability:
    target: 99.9%
    measurement: "up == 1"
    window: "30d"
  api_latency:
    target: 95%
    measurement: "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) < 0.2"
    window: "7d"
  training_success_rate:
    target: 95%
    measurement: "rate(training_jobs_completed_total[1h]) / rate(training_jobs_started_total[1h]) * 100"
    window: "24h"
  client_participation:
    target: 80%
    measurement: "avg(fl_clients_connected) / avg(fl_clients_expected) * 100"
    window: "1h"
```
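An availability target translates directly into an error budget: the amount of downtime the SLO tolerates over its window. The helper below is illustrative (not part of the platform) but the arithmetic is exact; for the 99.9% / 30-day objective above it yields about 43.2 minutes of allowed downtime:

```python
# Illustrative helper: convert an availability SLO into allowed downtime.
def error_budget_minutes(target_percent, window_days):
    """Allowed downtime (minutes) for an availability SLO over a window."""
    window_minutes = window_days * 24 * 60
    return window_minutes * (1 - target_percent / 100)


# e.g. the api_availability SLO: 99.9% over 30 days -> ~43.2 minutes
budget = error_budget_minutes(99.9, 30)
```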

### Custom Metrics Collection

```python
# Custom metrics for federated learning
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Training metrics
training_rounds_total = Counter(
    'training_rounds_total',
    'Total number of training rounds completed',
    ['job_id', 'model_type'],
)

training_accuracy = Gauge(
    'training_accuracy',
    'Current training accuracy',
    ['job_id', 'round'],
)

client_training_duration = Histogram(
    'client_training_duration_seconds',
    'Time spent training on client',
    ['client_id', 'model_type'],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600],
)

model_size_bytes = Gauge(
    'model_size_bytes',
    'Size of model in bytes',
    ['model_type', 'compression'],
)

# Expose the metrics endpoint for Prometheus scraping
start_http_server(8001)


# Usage in application
class MetricsCollector:
    def record_training_round(self, job_id: str, model_type: str, accuracy: float, round_num: int):
        training_rounds_total.labels(job_id=job_id, model_type=model_type).inc()
        training_accuracy.labels(job_id=job_id, round=round_num).set(accuracy)

    def record_client_training(self, client_id: str, model_type: str, duration: float):
        client_training_duration.labels(client_id=client_id, model_type=model_type).observe(duration)

    def record_model_size(self, model_type: str, compression: str, size: int):
        model_size_bytes.labels(model_type=model_type, compression=compression).set(size)
```
Next: Continue to OpenTelemetry for detailed instrumentation and configuration.