Observability Overview

The Federated Learning Platform implements observability across metrics, traces, and logs, using OpenTelemetry for collection and the Grafana stack (Tempo, Loki, Prometheus, and Grafana) for storage and visualization. Together these ensure system reliability, performance monitoring, and effective troubleshooting across distributed federated learning environments.

Observability Stack

graph TB
    subgraph "Application Layer"
        FRONTEND[Next.js Frontend<br/>Browser Telemetry]
        BACKEND[FastAPI Backend<br/>API Telemetry]
        FL_SERVER[FL Aggregator<br/>Training Telemetry]
        FL_CLIENTS[FL Clients<br/>Distributed Telemetry]
    end

    subgraph "Collection Layer"
        OTEL_COLLECTOR[OpenTelemetry Collector<br/>Unified Telemetry Pipeline]

        subgraph "Receivers"
            OTLP_GRPC[OTLP gRPC<br/>Port 4317]
            OTLP_HTTP[OTLP HTTP<br/>Port 4318]
            PROMETHEUS_RECEIVER[Prometheus<br/>Metrics Scraping]
        end

        subgraph "Processors"
            BATCH_PROCESSOR[Batch Processor<br/>Performance Optimization]
            MEMORY_LIMITER[Memory Limiter<br/>Resource Protection]
            ATTRIBUTE_PROCESSOR[Attribute Processor<br/>Data Enrichment]
        end

        subgraph "Exporters"
            TEMPO_EXPORTER[Tempo Exporter<br/>Distributed Tracing]
            GRAFANA_EXPORTER[Grafana Exporter<br/>Metrics & Logs]
            DEBUG_EXPORTER[Debug Exporter<br/>Development]
        end
    end

    subgraph "Storage Layer"
        TEMPO[Grafana Tempo<br/>Trace Storage]
        PROMETHEUS[Prometheus<br/>Metrics Storage]
        LOKI[Grafana Loki<br/>Log Aggregation]
    end

    subgraph "Visualization Layer"
        GRAFANA[Grafana<br/>Dashboards & Alerts]
        ALERTMANAGER[AlertManager<br/>Notification Routing]
    end

    subgraph "Analysis Layer"
        JAEGER_UI[Jaeger UI<br/>Trace Analysis]
        CUSTOM_DASHBOARDS[Custom Dashboards<br/>FL Metrics]
        SLA_MONITORING[SLA Monitoring<br/>Performance Tracking]
    end

    FRONTEND --> OTLP_HTTP
    BACKEND --> OTLP_GRPC
    FL_SERVER --> OTLP_GRPC
    FL_CLIENTS --> OTLP_GRPC

    OTLP_GRPC --> OTEL_COLLECTOR
    OTLP_HTTP --> OTEL_COLLECTOR
    PROMETHEUS_RECEIVER --> OTEL_COLLECTOR

    OTEL_COLLECTOR --> MEMORY_LIMITER
    MEMORY_LIMITER --> BATCH_PROCESSOR
    BATCH_PROCESSOR --> ATTRIBUTE_PROCESSOR

    ATTRIBUTE_PROCESSOR --> TEMPO_EXPORTER
    ATTRIBUTE_PROCESSOR --> GRAFANA_EXPORTER
    ATTRIBUTE_PROCESSOR --> DEBUG_EXPORTER

    TEMPO_EXPORTER --> TEMPO
    GRAFANA_EXPORTER --> PROMETHEUS
    GRAFANA_EXPORTER --> LOKI

    TEMPO --> GRAFANA
    PROMETHEUS --> GRAFANA
    LOKI --> GRAFANA

    GRAFANA --> ALERTMANAGER
    GRAFANA --> JAEGER_UI
    GRAFANA --> CUSTOM_DASHBOARDS
    GRAFANA --> SLA_MONITORING

Three Pillars of Observability

1. Metrics

Quantitative measurements of system behavior over time.

System Metrics

  • CPU Usage: Processor utilization across all nodes
  • Memory Usage: RAM consumption and allocation patterns
  • Disk I/O: Storage read/write operations and latency
  • Network I/O: Bandwidth utilization and packet loss

Application Metrics

  • Request Rate: API requests per second
  • Response Time: Request processing latency (p50, p95, p99)
  • Error Rate: Failed requests percentage
  • Throughput: Successful operations per unit time
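
As a quick illustration of the percentile latencies listed above, here is a minimal sketch using Python's statistics module. The sample values are invented; production systems typically derive p50/p95/p99 from histogram buckets rather than raw samples.

```python
import statistics

def latency_percentiles(samples_ms):
    """Return (p50, p95, p99) from raw latency samples in milliseconds."""
    # quantiles(n=100) returns 99 cut points; index k-1 is the k-th percentile.
    q = statistics.quantiles(samples_ms, n=100)
    return q[49], q[94], q[98]

samples = [12, 13, 14, 15, 15, 16, 17, 18, 200, 950]
p50, p95, p99 = latency_percentiles(samples)
# The tail percentiles expose the slow outliers that the median hides.
```

This is why dashboards track p95/p99 alongside p50: a healthy-looking median can coexist with a badly degraded tail.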

Federated Learning Metrics

  • Training Progress: Rounds completed, accuracy improvements
  • Client Participation: Active clients, dropout rates
  • Model Performance: Loss reduction, convergence metrics
  • Communication Overhead: Data transfer volumes, compression ratios

2. Traces

Distributed request tracking across service boundaries.

Trace Structure

graph LR
    subgraph "Training Request Trace"
        SPAN1[Frontend Request<br/>user-action]
        SPAN2[Backend Processing<br/>training-start]
        SPAN3[Flower Initialization<br/>fl-setup]
        SPAN4[Client Deployment<br/>ansible-deploy]
        SPAN5[Model Training<br/>local-training]
        SPAN6[Aggregation<br/>model-aggregate]
    end

    SPAN1 --> SPAN2
    SPAN2 --> SPAN3
    SPAN3 --> SPAN4
    SPAN4 --> SPAN5
    SPAN5 --> SPAN6

Key Trace Scenarios

  • User Authentication Flow: Login → Token validation → Session creation
  • Training Job Lifecycle: Initialization → Client deployment → Training rounds → Completion
  • Model Aggregation: Client updates → Validation → Aggregation → Distribution
  • Error Propagation: Failure detection → Error handling → Recovery actions
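
All of the scenarios above rely on spans that share a trace ID and chain parent to child. The following is an illustrative stdlib model of that structure, not the OpenTelemetry API; the span names are taken from the trace diagram above.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional

# Illustrative model of the parent/child span structure behind a trace.
# Real spans come from the OpenTelemetry SDK; this sketch only shows how a
# shared trace_id and span_id -> parent_id links tie the steps together.
@dataclass
class Span:
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_id: Optional[str] = None

def child_of(parent: Span, name: str) -> Span:
    """Start a span in the same trace, linked to its parent."""
    return Span(name=name, trace_id=parent.trace_id, parent_id=parent.span_id)

root = Span(name="user-action", trace_id=uuid.uuid4().hex)
backend = child_of(root, "training-start")
fl_setup = child_of(backend, "fl-setup")

# Every span carries the root's trace_id, which is what lets the backend
# reconstruct the full request path across service boundaries.
```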

3. Logs

Structured event records with contextual information.

Log Levels and Usage

# Structured logging implementation
import logging
import json
from datetime import datetime, timezone

class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

        # Configure JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "service": record.name,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }

            # Add trace context if available
            if hasattr(record, 'trace_id'):
                log_entry["trace_id"] = record.trace_id
            if hasattr(record, 'span_id'):
                log_entry["span_id"] = record.span_id

            # Add custom attributes
            if hasattr(record, 'user_id'):
                log_entry["user_id"] = record.user_id
            if hasattr(record, 'job_id'):
                log_entry["job_id"] = record.job_id

            return json.dumps(log_entry)

    def info(self, message: str, **kwargs):
        self.logger.info(message, extra=kwargs)

    def error(self, message: str, **kwargs):
        self.logger.error(message, extra=kwargs)

    def warning(self, message: str, **kwargs):
        self.logger.warning(message, extra=kwargs)

# Usage example
logger = StructuredLogger("fl-backend")
logger.info(
    "Training job started",
    user_id="user123",
    job_id="job456",
    trace_id="trace789"
)
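
Alongside the StructuredLogger above, a stdlib logging.LoggerAdapter can bind recurring context (job_id, user_id) once per code path instead of repeating it on every call. A small sketch; the adapter name and the capture handler are illustrative.

```python
import logging

# Hypothetical complement to the StructuredLogger above: a LoggerAdapter
# that binds fixed context once, so every log call on a job's code path
# carries it automatically.
class ContextAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        extra = dict(self.extra)                  # bound context
        extra.update(kwargs.get("extra") or {})   # per-call values win
        kwargs["extra"] = extra
        return msg, kwargs

records = []

class Capture(logging.Handler):
    """Keeps records in memory so we can inspect their attributes."""
    def emit(self, record):
        records.append(record)

base = logging.getLogger("fl-backend-demo")
base.addHandler(Capture())
base.setLevel(logging.INFO)
base.propagate = False

job_log = ContextAdapter(base, {"job_id": "job456", "user_id": "user123"})
job_log.info("Training job started")  # record now carries job_id and user_id
```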

OpenTelemetry Implementation

Instrumentation Setup

Backend Instrumentation

# OpenTelemetry setup for FastAPI backend
import os

from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.trace import Status, StatusCode

def setup_telemetry():
    """Configure OpenTelemetry for the backend service."""

    # Trace provider setup
    trace.set_tracer_provider(TracerProvider(
        resource=Resource.create({
            "service.name": "fl-backend",
            "service.version": "1.2.0",
            "deployment.environment": os.getenv("ENVIRONMENT", "development")
        })
    ))

    # Trace exporter
    trace_exporter = OTLPSpanExporter(
        endpoint="http://otel-collector:4317",
        insecure=True
    )

    span_processor = BatchSpanProcessor(trace_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Metrics provider setup
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(
            endpoint="http://otel-collector:4317",
            insecure=True
        ),
        export_interval_millis=5000
    )

    metrics.set_meter_provider(MeterProvider(
        resource=Resource.create({
            "service.name": "fl-backend",
            "service.version": "1.2.0"
        }),
        metric_readers=[metric_reader]
    ))

    # Auto-instrumentation (`app` is the FastAPI application instance)
    FastAPIInstrumentor.instrument_app(app)
    PymongoInstrumentor().instrument()

# Custom metrics
meter = metrics.get_meter(__name__)

# Define custom metrics
training_jobs_counter = meter.create_counter(
    "training_jobs_total",
    description="Total number of training jobs started"
)

active_clients_gauge = meter.create_up_down_counter(
    "active_clients",
    description="Number of active federated learning clients"
)

training_duration_histogram = meter.create_histogram(
    "training_duration_seconds",
    description="Duration of training rounds in seconds"
)

# Usage in application code
@app.post("/training/start")
async def start_training(config: TrainingConfig):
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("start_training") as span:
        span.set_attribute("training.rounds", config.rounds)
        span.set_attribute("training.clients", config.clients)
        span.set_attribute("training.model_type", config.model_type)

        # Increment counter
        training_jobs_counter.add(1, {"model_type": config.model_type})

        # Start training logic
        job = await training_service.start_training(config)

        span.set_attribute("job.id", job.id)
        span.set_status(Status(StatusCode.OK))

        return job

Federated Learning Instrumentation

# FL client instrumentation
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class InstrumentedFlowerClient(FlowerClient):
    """Flower client with OpenTelemetry instrumentation."""

    def __init__(self, model, train_loader, test_loader, device, client_id):
        super().__init__(model, train_loader, test_loader, device)
        self.client_id = client_id
        self.tracer = trace.get_tracer(__name__)

        # Setup telemetry
        self.setup_telemetry()

    def setup_telemetry(self):
        """Configure OpenTelemetry for FL client."""
        trace.set_tracer_provider(TracerProvider(
            resource=Resource.create({
                "service.name": "fl-client",
                "service.instance.id": self.client_id,
                "service.version": "1.2.0"
            })
        ))

        trace_exporter = OTLPSpanExporter(
            endpoint="http://otel-collector:4317",
            insecure=True
        )

        span_processor = BatchSpanProcessor(trace_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    def fit(self, parameters, config):
        """Instrumented training method."""
        with self.tracer.start_as_current_span("client_training") as span:
            span.set_attribute("client.id", self.client_id)
            span.set_attribute("training.round", config.get("round", 0))
            span.set_attribute("training.epochs", config.get("local_epochs", 1))

            start_time = time.time()

            # Perform training
            result = super().fit(parameters, config)

            duration = time.time() - start_time
            span.set_attribute("training.duration_seconds", duration)
            span.set_attribute("training.loss", result[2].get("loss", 0))

            return result

    def evaluate(self, parameters, config):
        """Instrumented evaluation method."""
        with self.tracer.start_as_current_span("client_evaluation") as span:
            span.set_attribute("client.id", self.client_id)
            span.set_attribute("evaluation.round", config.get("round", 0))

            start_time = time.time()

            # Perform evaluation
            loss, num_examples, metrics = super().evaluate(parameters, config)

            duration = time.time() - start_time
            span.set_attribute("evaluation.duration_seconds", duration)
            span.set_attribute("evaluation.loss", loss)
            span.set_attribute("evaluation.accuracy", metrics.get("accuracy", 0))
            span.set_attribute("evaluation.examples", num_examples)

            return loss, num_examples, metrics

Collector Configuration

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
      http:
        endpoint: "0.0.0.0:4318"

  prometheus:
    config:
      scrape_configs:
        - job_name: 'fl-backend'
          static_configs:
            - targets: ['backend-fastapi:8001']
        - job_name: 'fl-clients'
          static_configs:
            - targets: ['client1:8082', 'client2:8083']

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048

  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
    spike_limit_mib: 200

  attributes:
    actions:
      - key: environment
        value: production
        action: insert
      - key: cluster
        value: fl-cluster
        action: insert

  resource:
    attributes:
      - key: service.namespace
        value: federated-learning
        action: insert

exporters:
  debug:
    verbosity: detailed

  otlp/tempo:
    endpoint: tempo:4317
    tls:
      insecure: true

  otlphttp/grafana:
    endpoint: http://grafana:3000/api/otel/v1/metrics
    tls:
      insecure: true

  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: federated_learning

extensions:
  health_check:
  pprof:
  zpages:

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [otlp/tempo, debug]

    metrics:
      receivers: [otlp, prometheus]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [otlphttp/grafana, prometheus, debug]

    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes, resource]
      exporters: [debug]

  extensions: [health_check, pprof, zpages]

Grafana Dashboards

Federated Learning Dashboard

{
  "dashboard": {
    "title": "Federated Learning Overview",
    "tags": ["federated-learning", "ml", "training"],
    "panels": [
      {
        "title": "Active Training Jobs",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(training_jobs_active)",
            "legendFormat": "Active Jobs"
          }
        ]
      },
      {
        "title": "Client Participation Rate",
        "type": "gauge",
        "targets": [
          {
            "expr": "sum(fl_clients_connected) / sum(fl_clients_total) * 100",
            "legendFormat": "Participation %"
          }
        ]
      },
      {
        "title": "Training Progress",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(training_accuracy) by (job_id)",
            "legendFormat": "Accuracy - {{job_id}}"
          },
          {
            "expr": "avg(training_loss) by (job_id)",
            "legendFormat": "Loss - {{job_id}}"
          }
        ]
      },
      {
        "title": "System Resource Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "avg(cpu_usage_percent) by (instance)",
            "legendFormat": "CPU - {{instance}}"
          },
          {
            "expr": "avg(memory_usage_percent) by (instance)",
            "legendFormat": "Memory - {{instance}}"
          }
        ]
      }
    ]
  }
}

Performance Monitoring Dashboard

{
  "dashboard": {
    "title": "System Performance",
    "panels": [
      {
        "title": "API Response Times",
        "type": "heatmap",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th Percentile"
          }
        ]
      },
      {
        "title": "Error Rates",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[5m])",
            "legendFormat": "5xx Errors"
          },
          {
            "expr": "rate(http_requests_total{status=~\"4..\"}[5m])",
            "legendFormat": "4xx Errors"
          }
        ]
      },
      {
        "title": "Database Performance",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mongodb_operations_total[5m])",
            "legendFormat": "Operations/sec"
          },
          {
            "expr": "mongodb_connections_current",
            "legendFormat": "Active Connections"
          }
        ]
      }
    ]
  }
}

Alerting and Notifications

Alert Rules

# alerting-rules.yml
groups:
  - name: federated_learning_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      - alert: TrainingJobStuck
        expr: increase(training_round_completed_total[30m]) == 0
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Training job appears stuck"
          description: "No training rounds completed in 30 minutes"

      - alert: ClientDropout
        expr: fl_clients_connected < fl_clients_expected * 0.5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High client dropout rate"
          description: "Connected clients ({{ $value }}) are below 50% of the expected count"

      - alert: SystemResourceHigh
        expr: cpu_usage_percent > 90 or memory_usage_percent > 90
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "High system resource usage"
          description: "{{ $labels.instance }} resource usage is {{ $value }}%"

  - name: infrastructure_alerts
    rules:
      - alert: ServiceDown
        expr: up == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Service is down"
          description: "{{ $labels.instance }} has been down for more than 5 minutes"

      - alert: DatabaseConnectionsHigh
        expr: mongodb_connections_current > mongodb_connections_available * 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High database connection usage"
          description: "MongoDB is using {{ $value }} connections, above 80% of the available pool"
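
For intuition about the counter-rate logic these rules use, here is a rough Python equivalent of the HighErrorRate check: the per-second increase of a monotonically increasing 5xx counter over a 5-minute window, compared to the 0.1 errors/sec threshold. All sample numbers are made up.

```python
WINDOW_SECONDS = 300  # the [5m] range in the PromQL expression
THRESHOLD = 0.1       # errors per second

def error_rate(count_start: float, count_end: float,
               window: float = WINDOW_SECONDS) -> float:
    """Per-second increase of a counter over the window (ignores resets)."""
    return max(count_end - count_start, 0) / window

def high_error_rate(count_start: float, count_end: float) -> bool:
    return error_rate(count_start, count_end) > THRESHOLD

# 45 errors in 5 minutes -> 0.15/s, which would fire the alert.
fires = high_error_rate(1000, 1045)
# 12 errors in 5 minutes -> 0.04/s, below the threshold.
quiet = high_error_rate(1000, 1012)
```

The `for: 5m` clause then requires the condition to hold continuously for five minutes before the alert actually fires, filtering out brief spikes.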

Notification Channels

# alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@yourcompany.com'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'web.hook'
  routes:
    - match:
        severity: critical
      receiver: 'critical-alerts'
    - match:
        severity: warning
      receiver: 'warning-alerts'

receivers:
  - name: 'web.hook'
    webhook_configs:
      - url: 'http://webhook-service:5000/alerts'

  - name: 'critical-alerts'
    email_configs:
      - to: 'oncall@yourcompany.com'
        subject: 'CRITICAL: {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}
    slack_configs:
      - api_url: 'YOUR_SLACK_WEBHOOK_URL'
        channel: '#alerts-critical'
        title: 'Critical Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

  - name: 'warning-alerts'
    email_configs:
      - to: 'team@yourcompany.com'
        subject: 'WARNING: {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}

Performance Monitoring

SLA/SLO Definitions

# Service Level Objectives
slos:
  api_availability:
    target: 99.9%
    measurement: "up == 1"
    window: "30d"

  api_latency:
    target: 95%
    measurement: "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) < 0.2"
    window: "7d"

  training_success_rate:
    target: 95%
    measurement: "rate(training_jobs_completed_total[1h]) / rate(training_jobs_started_total[1h]) * 100"
    window: "24h"

  client_participation:
    target: 80%
    measurement: "avg(fl_clients_connected) / avg(fl_clients_expected) * 100"
    window: "1h"
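
The availability SLO above translates directly into an error budget, i.e. the downtime the service may accumulate in the window before the SLO is broken. A sketch of the arithmetic:

```python
def error_budget_minutes(target_pct: float, window_days: int) -> float:
    """Allowed downtime (minutes) for an availability target over a window."""
    total_minutes = window_days * 24 * 60
    return total_minutes * (1 - target_pct / 100)

# 99.9% over 30 days leaves 43.2 minutes of allowed downtime.
budget = error_budget_minutes(99.9, 30)
```

Tracking how fast this budget burns down is often more actionable than the raw availability percentage.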

Custom Metrics Collection

# Custom metrics for federated learning
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Expose a /metrics endpoint for Prometheus to scrape
start_http_server(8001)

# Training metrics
training_rounds_total = Counter(
    'training_rounds_total',
    'Total number of training rounds completed',
    ['job_id', 'model_type']
)

training_accuracy = Gauge(
    'training_accuracy',
    'Current training accuracy',
    ['job_id', 'round']
)

client_training_duration = Histogram(
    'client_training_duration_seconds',
    'Time spent training on client',
    ['client_id', 'model_type'],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600]
)

model_size_bytes = Gauge(
    'model_size_bytes',
    'Size of model in bytes',
    ['model_type', 'compression']
)

# Usage in application
class MetricsCollector:
    def record_training_round(self, job_id: str, model_type: str, accuracy: float, round_num: int):
        training_rounds_total.labels(job_id=job_id, model_type=model_type).inc()
        training_accuracy.labels(job_id=job_id, round=round_num).set(accuracy)

    def record_client_training(self, client_id: str, model_type: str, duration: float):
        client_training_duration.labels(client_id=client_id, model_type=model_type).observe(duration)

    def record_model_size(self, model_type: str, compression: str, size: int):
        model_size_bytes.labels(model_type=model_type, compression=compression).set(size)

Next: Continue to OpenTelemetry for detailed instrumentation and configuration.