Skip to content

Backend Technologies

This document provides detailed information about the backend technologies used in the Federated Learning Platform, including frameworks, libraries, databases, and supporting tools.

FastAPI Framework

Overview

FastAPI is a modern, high-performance web framework for building APIs with Python 3.10+ based on standard Python type hints.

Key Features

  • High Performance: Comparable to Node.js and Go
  • Fast to Code: Increase development speed by 200-300%
  • Fewer Bugs: Reduce human-induced errors by 40%
  • Intuitive: Great editor support with auto-completion
  • Easy: Designed to be easy to use and learn
  • Short: Minimize code duplication
  • Robust: Production-ready code with automatic interactive documentation

Architecture Implementation

# main.py - Application entry point
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import logging

# Application object; the metadata below is passed straight to FastAPI.
app = FastAPI(
    title="Flower Training API",
    description="A comprehensive API for federated learning orchestration",
    version="1.2.0",
)

# CORS configuration: a single allowed origin, credentials enabled,
# every method and header accepted.
_cors_options = {
    "allow_origins": ["http://localhost:4000"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Include routers: each route module is mounted under its own prefix and tag.
from app.routes import auth, training, nodes, ansible

_router_table = (
    (auth, "/auth", "authentication"),
    (training, "/training", "training"),
    (nodes, "/nodes", "nodes"),
    (ansible, "/ansible", "ansible"),
)
for _route_module, _prefix, _tag in _router_table:
    app.include_router(_route_module.router, prefix=_prefix, tags=[_tag])

Dependency Injection System

# core/auth.py - Authentication dependency
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt

security = HTTPBearer()

async def get_current_user(
    token: HTTPAuthorizationCredentials = Depends(security),
) -> User:
    """Resolve the authenticated user from the request's bearer token.

    The token string is read from ``token.credentials``, decoded with
    ``SECRET_KEY``/``ALGORITHM``, and its ``sub`` claim is used as the
    username for ``get_user_by_username``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that the lookup does not return.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Chain the cause so the decode failure stays visible in tracebacks.
        raise credentials_exception from exc

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = await get_user_by_username(username)
    if user is None:
        raise credentials_exception
    return user

# Usage in routes
@app.get("/protected-endpoint")
async def protected_route(current_user: User = Depends(get_current_user)):
    """Example route: greets the user supplied by the auth dependency."""
    greeting = f"Hello {current_user.username}"
    return {"message": greeting}

Pydantic Models

# models/training.py - Data validation models
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime

class TrainingConfig(BaseModel):
    """Validated parameters for a training run.

    Numeric bounds are enforced by the ``Field`` constraints below;
    ``model_type`` is additionally restricted by ``validate_model_type``.
    """

    rounds: int = Field(..., ge=1, le=100, description="Number of training rounds")
    clients: int = Field(..., ge=2, le=1000, description="Number of client devices")
    model_type: str = Field(..., description="Type of ML model")
    learning_rate: float = Field(0.01, ge=0.001, le=1.0)
    batch_size: int = Field(32, ge=1, le=512)

    @validator('model_type')
    def validate_model_type(cls, v):
        """Accept only the supported model identifiers; return ``v`` unchanged."""
        allowed_types = ['cnn', 'resnet', 'mobilenet']
        if v in allowed_types:
            return v
        raise ValueError(f'Model type must be one of {allowed_types}')

class TrainingJob(BaseModel):
    """A training run: its configuration, lifecycle status and timestamps."""

    id: str
    config: TrainingConfig
    status: str
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    metrics: Optional[dict] = None
    # Failure description. Declared so that code assigning ``job.error``
    # (as the training service does when a run fails) targets a real field
    # instead of an attribute the model does not define.
    error: Optional[str] = None

    class Config:
        # Serialise datetimes as ISO-8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

Async/Await Implementation

# services/training_service.py - Asynchronous service
import asyncio
from typing import List, Optional

class TrainingService:
    """Creates training jobs and runs them as asyncio tasks.

    Args:
        db: Database handle exposing a ``training_jobs`` collection with
            awaitable ``insert_one``/``update_one``. It defaults to ``None``
            so existing ``TrainingService()`` calls keep working, but it must
            be supplied before a job is started.
    """

    def __init__(self, db=None):
        self.db = db
        # job id -> running asyncio.Task. Holding the reference keeps the
        # task from being garbage-collected before it finishes.
        self.active_jobs = {}

    async def start_training(self, config: TrainingConfig) -> TrainingJob:
        """Persist a new job, schedule its execution, and return it at once."""
        job_id = generate_job_id()
        job = TrainingJob(
            id=job_id,
            config=config,
            status="initializing",
            created_at=datetime.utcnow()
        )

        # Store job in database
        await self.db.training_jobs.insert_one(job.dict())

        # Start training asynchronously and track the task until it is done.
        task = asyncio.create_task(self._execute_training(job))
        self.active_jobs[job_id] = task
        task.add_done_callback(
            lambda _finished, key=job_id: self.active_jobs.pop(key, None)
        )

        return job

    async def _execute_training(self, job: TrainingJob):
        """Run the job and always write its final state back to the database."""
        # Kept in a local so failure recording does not depend on the job
        # model accepting an ``error`` attribute.
        failure_reason = None
        try:
            job.status = "running"
            job.started_at = datetime.utcnow()
            await self.db.training_jobs.update_one(
                {"id": job.id},
                {"$set": job.dict()}
            )

            # Execute federated learning
            result = await self._run_federated_learning(job.config)

            job.status = "completed"
            job.completed_at = datetime.utcnow()
            job.metrics = result

        except Exception as e:
            job.status = "failed"
            failure_reason = str(e)

        finally:
            document = job.dict()
            if failure_reason is not None:
                document["error"] = failure_reason
            await self.db.training_jobs.update_one(
                {"id": job.id},
                {"$set": document}
            )

MongoDB Database

Database Architecture

graph TB
    subgraph "MongoDB Cluster"
        PRIMARY[Primary Node<br/>Read/Write]
        SECONDARY1[Secondary Node 1<br/>Read Replica]
        SECONDARY2[Secondary Node 2<br/>Read Replica]
    end

    subgraph "Collections"
        USERS[users<br/>User Management]
        PROJECTS[projects<br/>Project Data]
        CONFIGS[configs<br/>ML Configurations]
        JOBS[training_jobs<br/>Training History]
        ANSIBLE_JOBS[ansible_jobs<br/>Deployment Jobs]
    end

    %% Labels below use the index names passed as name= in database/indexes.py
    subgraph "Indexes"
        USER_IDX[username_email_unique]
        PROJECT_IDX[owner_created_idx]
        JOB_IDX[status_created_idx]
        CONFIG_IDX[project_type_idx]
    end

    PRIMARY --> USERS
    PRIMARY --> PROJECTS
    PRIMARY --> CONFIGS
    PRIMARY --> JOBS
    PRIMARY --> ANSIBLE_JOBS

    USERS --> USER_IDX
    PROJECTS --> PROJECT_IDX
    JOBS --> JOB_IDX
    CONFIGS --> CONFIG_IDX

Motor Async Driver

# database/mongodb.py - Async MongoDB connection
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ConnectionFailure
import logging

class MongoDB:
    """Holds one async MongoDB client and exposes the project's collections.

    ``client`` and ``database`` are ``None`` until ``connect`` succeeds and
    are reset to ``None`` by ``close``.
    """

    def __init__(self):
        self.client = None
        self.database = None

    async def connect(self, connection_string: str, database_name: str):
        """Open a client, verify it with a ``ping``, then publish it.

        The instance attributes are only assigned after the ping succeeds,
        so a failed attempt never leaves a half-initialised handle behind.

        Raises:
            ConnectionFailure: re-raised after logging when the ping fails.
        """
        client = None
        try:
            client = AsyncIOMotorClient(connection_string)

            # Test connection
            await client.admin.command('ping')

        except ConnectionFailure as e:
            logging.error(f"Failed to connect to MongoDB: {e}")
            if client is not None:
                # Release the unusable client instead of leaking it.
                client.close()
            raise

        self.client = client
        self.database = client[database_name]
        logging.info("Connected to MongoDB successfully")

    async def close(self):
        """Close the client if one is open; safe to call repeatedly."""
        # Identity check instead of truthiness: driver objects are not
        # guaranteed to support bool().
        if self.client is not None:
            self.client.close()
            self.client = None
            self.database = None

    # Collection accessors
    @property
    def users(self):
        """The ``users`` collection of the connected database."""
        return self.database.users

    @property
    def projects(self):
        """The ``projects`` collection of the connected database."""
        return self.database.projects

    @property
    def training_jobs(self):
        """The ``training_jobs`` collection of the connected database."""
        return self.database.training_jobs

    @property
    def configs(self):
        """The ``configs`` collection of the connected database."""
        return self.database.configs

# Usage in services
class UserService:
    """User operations backed by the ``users`` collection of ``db``."""

    def __init__(self, db: MongoDB):
        self.db = db

    async def create_user(self, user_data: UserCreate) -> User:
        """Insert a new user unless the username or email is already taken.

        Raises:
            HTTPException: 400 when a document with the same username or
                the same email already exists.
        """
        # A match on either field counts as a duplicate.
        duplicate_filter = {
            "$or": [
                {"username": user_data.username},
                {"email": user_data.email}
            ]
        }
        if await self.db.users.find_one(duplicate_filter):
            raise HTTPException(
                status_code=400,
                detail="User already exists"
            )

        # Only the hash is stored, never the submitted password.
        password_hash = get_password_hash(user_data.password)
        new_user = {
            "username": user_data.username,
            "email": user_data.email,
            "password_hash": password_hash,
            "created_at": datetime.utcnow(),
            "is_active": True,
            "roles": ["user"]
        }

        insert_result = await self.db.users.insert_one(new_user)
        new_user["_id"] = insert_result.inserted_id

        return User(**new_user)

Database Indexing Strategy

# database/indexes.py - Database index creation
async def create_indexes(db: MongoDB):
    """Create every index the application declares, one after another.

    Each entry is ``(collection attribute on db, key spec, create_index
    keyword options)``; entries are applied in the order listed.
    """
    # NOTE(review): the unique index covers the (username, email) pair, not
    # each field on its own — confirm that is the intended constraint.
    index_plan = (
        # Users collection indexes
        ("users", [("username", 1), ("email", 1)],
         {"unique": True, "name": "username_email_unique"}),
        ("users", "created_at", {"name": "users_created_at"}),
        # Projects collection indexes
        ("projects", [("owner_id", 1), ("created_at", -1)],
         {"name": "owner_created_idx"}),
        ("projects", "name", {"name": "project_name_idx"}),
        # Training jobs indexes
        ("training_jobs", [("status", 1), ("created_at", -1)],
         {"name": "status_created_idx"}),
        ("training_jobs", "user_id", {"name": "jobs_user_idx"}),
        # Configs collection indexes
        ("configs", [("project_id", 1), ("type", 1)],
         {"name": "project_type_idx"}),
    )

    for collection_name, key_spec, options in index_plan:
        collection = getattr(db, collection_name)
        await collection.create_index(key_spec, **options)

Authentication & Security

JWT Implementation

# core/security.py - JWT token management
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext

import os

# Signing key for JWTs. Read from the SECRET_KEY environment variable; the
# literal fallback is a placeholder for local development only and must be
# overridden in production.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Shared passlib context used by verify_password / get_password_hash below;
# configured with bcrypt as its only scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    The comparison is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Token lifetime. When omitted (or falsy) the token
            expires 15 minutes from now.
    """
    # NOTE(review): ACCESS_TOKEN_EXPIRE_MINUTES is not consulted here;
    # callers presumably pass it via expires_delta — confirm.
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)

    claims = data.copy()
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> Optional[dict]:
    """Decode ``token`` and return its claims, or ``None`` if it is invalid.

    Decoding uses ``SECRET_KEY`` and ``ALGORITHM``. A ``JWTError`` from the
    decoder is reported as ``None`` rather than raised, so callers must
    check the result before using it.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return payload

Password Security

# services/auth_service.py - Authentication service
from passlib.context import CryptContext
from passlib.hash import bcrypt

class AuthService:
    """Credential checks: password hashing, verification and strength rules."""

    def __init__(self):
        # bcrypt with an explicit cost of 12 rounds.
        hashing_options = {
            "schemes": ["bcrypt"],
            "deprecated": "auto",
            "bcrypt__rounds": 12,  # Increased rounds for better security
        }
        self.pwd_context = CryptContext(**hashing_options)

    async def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """Return the user when the username exists and the password matches.

        Returns ``None`` in every other case.
        """
        user = await self.get_user_by_username(username)
        if user and self.verify_password(password, user.password_hash):
            return user
        return None

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Check ``plain_password`` against ``hashed_password``."""
        outcome = self.pwd_context.verify(plain_password, hashed_password)
        return outcome

    def get_password_hash(self, password: str) -> str:
        """Hash ``password`` with this service's context."""
        digest = self.pwd_context.hash(password)
        return digest

    def validate_password_strength(self, password: str) -> bool:
        """Return True when ``password`` satisfies every strength rule.

        Rules: at least 8 characters, and at least one uppercase letter,
        one lowercase letter, one digit and one of the listed symbols.
        """
        if len(password) < 8:
            return False

        special_characters = "!@#$%^&*()_+-=[]{}|;:,.<>?"
        character_rules = (
            str.isupper,
            str.islower,
            str.isdigit,
            special_characters.__contains__,
        )
        # Every rule must be met by at least one character.
        return all(
            any(rule(ch) for ch in password)
            for rule in character_rules
        )

WebSocket Implementation

WebSocket Manager

# services/websocket_service.py - WebSocket connection management
from fastapi import WebSocket, WebSocketDisconnect
from typing import List, Dict
import json
import asyncio

class WebSocketManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.active_connections.append(websocket)

        if user_id not in self.user_connections:
            self.user_connections[user_id] = []
        self.user_connections[user_id].append(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        self.active_connections.remove(websocket)
        if user_id in self.user_connections:
            self.user_connections[user_id].remove(websocket)
            if not self.user_connections[user_id]:
                del self.user_connections[user_id]

    async def send_personal_message(self, message: str, user_id: str):
        if user_id in self.user_connections:
            for connection in self.user_connections[user_id]:
                try:
                    await connection.send_text(message)
                except:
                    # Connection is broken, remove it
                    self.disconnect(connection, user_id)

    async def broadcast(self, message: str):
        for connection in self.active_connections.copy():
            try:
                await connection.send_text(message)
            except:
                self.active_connections.remove(connection)

# WebSocket endpoint
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """Register the socket, relay incoming text to the handler, then clean up.

    The socket is unregistered however the receive loop ends, so a failure
    inside ``handle_websocket_message`` does not leave a dead connection
    in the manager. Errors other than ``WebSocketDisconnect`` still propagate.
    """
    # NOTE(review): user_id comes straight from the URL path; nothing in this
    # function authenticates it — confirm that happens elsewhere.
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Handle incoming messages
            await handle_websocket_message(data, user_id)
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in the finally clause.
        pass
    finally:
        manager.disconnect(websocket, user_id)

File Upload and Processing

File Upload Handler

# services/file_service.py - File upload and processing
from fastapi import UploadFile, HTTPException
import zipfile
import tempfile
import os
import shutil

class FileService:
    """Receives uploaded ML project archives and unpacks them on disk.

    Projects are extracted to ``<upload_dir>/<user_id>/<archive name>``.
    """

    # Size of each read from the upload while spooling it to disk.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, upload_dir: str = "/app/uploads"):
        self.upload_dir = upload_dir
        os.makedirs(upload_dir, exist_ok=True)

    @staticmethod
    def _project_name(filename) -> str:
        """Derive a directory name from a client-supplied archive filename.

        Only the final path component is kept (both ``/`` and ``\\`` are
        treated as separators), so the name cannot steer extraction outside
        the upload area. The ``.zip`` extension check is case-insensitive.

        Raises:
            HTTPException: 400 when the name is missing, is not a ZIP file,
                or reduces to an empty or dot-only directory name.
        """
        base = os.path.basename((filename or "").replace("\\", "/"))
        if not base.lower().endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail="Only ZIP files are allowed"
            )

        stem = base[:-4]
        if stem in ("", ".", ".."):
            raise HTTPException(
                status_code=400,
                detail="Invalid file name"
            )
        return stem

    async def upload_ml_project(self, file: UploadFile, user_id: str) -> dict:
        """Validate an uploaded project ZIP and extract it for ``user_id``.

        Returns:
            A dict with ``project_path``, the parsed ``config`` and
            ``status`` set to ``"uploaded"``.

        Raises:
            HTTPException: 400 for a bad filename, a target path outside the
                upload directory, an unreadable archive, or a missing
                required file.
        """
        # Validate file type and derive a safe directory name
        project_name = self._project_name(file.filename)
        project_dir = os.path.join(self.upload_dir, user_id, project_name)

        # user_id is also part of the path: refuse anything that resolves
        # outside of (or onto) the upload root.
        upload_root = os.path.realpath(self.upload_dir)
        resolved_dir = os.path.realpath(project_dir)
        if (
            resolved_dir == upload_root
            or os.path.commonpath([upload_root, resolved_dir]) != upload_root
        ):
            raise HTTPException(
                status_code=400,
                detail="Invalid upload path"
            )

        # Create temporary file; it is removed in the finally clause even
        # when reading the upload fails part-way.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
        temp_path = temp_file.name

        try:
            with temp_file:
                # Copy in chunks rather than holding the whole upload in memory.
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    temp_file.write(chunk)

            # Validate ZIP structure
            await self._validate_ml_project_structure(temp_path)

            # Extract to permanent location
            os.makedirs(project_dir, exist_ok=True)

            with zipfile.ZipFile(temp_path, 'r') as zip_ref:
                zip_ref.extractall(project_dir)

            # Parse configuration
            config = await self._parse_project_config(project_dir)

            return {
                "project_path": project_dir,
                "config": config,
                "status": "uploaded"
            }

        finally:
            # Clean up temporary file
            os.unlink(temp_path)

    async def _validate_ml_project_structure(self, zip_path: str):
        """Ensure the archive contains every required project file.

        A required file may sit in any directory of the archive, but its
        base name must match exactly.

        Raises:
            HTTPException: 400 when the archive cannot be read as a ZIP or
                a required file is missing.
        """
        required_files = ['pyproject.toml', 'client_app.py', 'server_app.py']

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # ZIP member names always use "/" as the separator.
                present = {
                    member.rsplit("/", 1)[-1]
                    for member in zip_ref.namelist()
                }
        except zipfile.BadZipFile as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid ZIP archive"
            ) from exc

        for required_file in required_files:
            if required_file not in present:
                raise HTTPException(
                    status_code=400,
                    detail=f"Missing required file: {required_file}"
                )

Background Tasks and Async Processing

Celery Alternative with FastAPI

# services/background_tasks.py - Background task processing
from fastapi import BackgroundTasks
import asyncio
from typing import Callable, Any

class TaskManager:
    """Runs coroutines as named asyncio tasks and reports their status.

    ``running_tasks`` maps a task id to its ``asyncio.Task`` for as long as
    ``run_background_task`` is awaiting it.
    """

    def __init__(self):
        self.running_tasks = {}

    async def run_background_task(
        self,
        task_id: str,
        func: Callable,
        *args,
        **kwargs
    ):
        """Run ``func(*args, **kwargs)`` as a task and return its result.

        The task is registered under ``task_id`` while it runs and
        unregistered afterwards. Exceptions are logged and re-raised.
        """
        task = asyncio.create_task(func(*args, **kwargs))
        self.running_tasks[task_id] = task

        try:
            return await task
        except Exception as e:
            logging.error(f"Task {task_id} failed: {e}")
            raise
        finally:
            # Only remove our own entry: the id may have been reused for a
            # newer task while this one was still running.
            if self.running_tasks.get(task_id) is task:
                del self.running_tasks[task_id]

    def get_task_status(self, task_id: str) -> str:
        """Return the state of the task registered under ``task_id``.

        One of ``"not_found"``, ``"running"``, ``"cancelled"``,
        ``"failed"`` or ``"completed"``.
        """
        task = self.running_tasks.get(task_id)
        if task is None:
            return "not_found"
        if not task.done():
            return "running"
        # Must be checked before exception(): that call raises
        # CancelledError on a cancelled task.
        if task.cancelled():
            return "cancelled"
        if task.exception() is not None:
            return "failed"
        return "completed"

# Usage in routes
@app.post("/training/start")
async def start_training(
    config: TrainingConfig,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user)
):
    """Queue a training job for the current user and return its id.

    The job is handed to ``background_tasks``; this handler does not wait
    for the job itself.
    """
    new_job_id = generate_job_id()

    # Arguments passed to execute_training_job when the task runs.
    task_arguments = (new_job_id, config, current_user.id)
    background_tasks.add_task(execute_training_job, *task_arguments)

    return {"job_id": new_job_id, "status": "started"}

Error Handling and Logging

Custom Exception Handlers

# core/exceptions.py - Custom exception handling
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
import logging

class FederatedLearningException(Exception):
    """Base error for the platform.

    Attributes are set in ``__init__``: ``message`` is the error text (also
    passed to ``Exception``), and ``error_code`` is an optional identifier
    that is ``None`` when omitted.
    """

    def __init__(self, message: str, error_code: str = None):
        super().__init__(message)
        self.error_code = error_code
        self.message = message

class TrainingException(FederatedLearningException):
    """FederatedLearningException subtype for training errors; adds no behaviour."""

class ConfigurationException(FederatedLearningException):
    """FederatedLearningException subtype for configuration errors; adds no behaviour."""

# Exception handlers
@app.exception_handler(FederatedLearningException)
async def fl_exception_handler(request: Request, exc: FederatedLearningException):
    """Log a platform error and report it to the client as HTTP 400 JSON."""
    logging.error(f"FL Exception: {exc.message}")

    body = {
        "error": exc.message,
        "error_code": exc.error_code,
        "type": "federated_learning_error"
    }
    return JSONResponse(status_code=400, content=body)

@app.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: Exception):
    """Log an unexpected error with its traceback and return a generic 500.

    The response body deliberately carries no detail from ``exc``.
    """
    # exc_info attaches the traceback; the message alone rarely identifies
    # where the failure happened.
    logging.error(f"Internal server error: {exc}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred"
        }
    )

Structured Logging

# core/logging.py - Structured logging configuration
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Formats log records as single-line JSON objects."""

    def format(self, record):
        """Return ``record`` serialised as a JSON string.

        Always present: timestamp, level, logger, message, module, function
        and line. ``user_id`` and ``request_id`` are copied when set on the
        record, and ``exception`` holds the formatted traceback when the
        record carries exception info.
        """
        from datetime import timezone

        # Use the time the record was created, not the time it happens to be
        # formatted. Rendered as naive UTC to keep the existing format.
        created_utc = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_entry = {
            "timestamp": created_utc.replace(tzinfo=None).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        for extra_field in ("user_id", "request_id"):
            if hasattr(record, extra_field):
                log_entry[extra_field] = getattr(record, extra_field)

        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # default=str is deliberate: extras may be non-JSON types (database
        # ids, for example) and a log line must never fail to serialise.
        return json.dumps(log_entry, default=str)

# Configure logging
def setup_logging():
    """Configure the root logger for JSON output at INFO level.

    Safe to call more than once: the stream handler is only added when the
    root logger has no handler using ``JSONFormatter`` yet, so repeated
    calls do not duplicate every log line.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    already_configured = any(
        isinstance(existing.formatter, JSONFormatter)
        for existing in logger.handlers
    )
    if not already_configured:
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)

    # Reduce noise from external libraries
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)

Next: Continue to Frontend Technologies for detailed frontend technology specifications.