Backend Technologies
This document provides detailed information about the backend technologies used in the Federated Learning Platform, including frameworks, libraries, databases, and supporting tools.
FastAPI Framework
Overview
FastAPI is a modern, high-performance web framework for building APIs with Python 3.10+ based on standard Python type hints.
Key Features
- High Performance: Comparable to Node.js and Go
- Fast to Code: Increase development speed by 200-300%
- Fewer Bugs: Reduce human-induced errors by 40%
- Intuitive: Great editor support with auto-completion
- Easy: Designed to be easy to use and learn
- Short: Minimize code duplication
- Robust: Production-ready code with automatic interactive documentation
Architecture Implementation
# main.py - Application entry point
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import logging
# Application object; the metadata below feeds the generated OpenAPI docs.
app = FastAPI(
    title="Flower Training API",
    description="A comprehensive API for federated learning orchestration",
    version="1.2.0",
)

# CORS configuration: only the listed origin is allowed; any method and
# header is accepted from it, and credentials may be sent.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:4000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers. The import stays below the app/middleware setup, exactly
# where the original placed it.
from app.routes import auth, training, nodes, ansible

# (router, URL prefix, OpenAPI tag) for every feature area, mounted in order.
for _router, _prefix, _tag in (
    (auth.router, "/auth", "authentication"),
    (training.router, "/training", "training"),
    (nodes.router, "/nodes", "nodes"),
    (ansible.router, "/ansible", "ansible"),
):
    app.include_router(_router, prefix=_prefix, tags=[_tag])
Dependency Injection System
# core/auth.py - Authentication dependency
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
from jose import JWTError, jwt
security = HTTPBearer()


async def get_current_user(token: str = Depends(security)) -> User:
    """Resolve the request's bearer token to a stored user.

    ``token.credentials`` is decoded as a JWT using ``SECRET_KEY`` and
    ``ALGORITHM``; the ``sub`` claim is then passed to
    ``get_user_by_username``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or the lookup
            returns ``None``.
    """
    # NOTE(review): the value is annotated ``str`` but is used as an object
    # with a ``.credentials`` attribute — confirm the intended annotation.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    username: str = claims.get("sub")
    if username is None:
        raise unauthorized

    found = await get_user_by_username(username)
    if found is None:
        raise unauthorized
    return found
# Usage in routes
@app.get("/protected-endpoint")
async def protected_route(current_user: User = Depends(get_current_user)):
    """Greet the authenticated caller by username."""
    greeting = f"Hello {current_user.username}"
    return {"message": greeting}
Pydantic Models
# models/training.py - Data validation models
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
class TrainingConfig(BaseModel):
    """Validated parameters for one federated training run."""

    # Required fields, each with inclusive bounds.
    rounds: int = Field(..., ge=1, le=100, description="Number of training rounds")
    clients: int = Field(..., ge=2, le=1000, description="Number of client devices")
    model_type: str = Field(..., description="Type of ML model")
    # Optional tuning knobs with defaults.
    learning_rate: float = Field(0.01, ge=0.001, le=1.0)
    batch_size: int = Field(32, ge=1, le=512)

    @validator('model_type')
    def validate_model_type(cls, v):
        """Accept only the supported model types; return the value unchanged."""
        allowed_types = ['cnn', 'resnet', 'mobilenet']
        if v in allowed_types:
            return v
        raise ValueError(f'Model type must be one of {allowed_types}')
class TrainingJob(BaseModel):
    """Record of one training run, from creation to completion or failure."""

    id: str
    config: TrainingConfig
    status: str
    created_at: datetime
    # Lifecycle timestamps and results stay ``None`` until the run reaches
    # the corresponding stage.
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    metrics: Optional[dict] = None
    # Failure reason. The training service assigns ``job.error`` when a run
    # raises, so the field has to be declared on the model for that
    # assignment to be accepted.
    error: Optional[str] = None

    class Config:
        # Serialise datetimes as ISO-8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
Async/Await Implementation
# services/training_service.py - Asynchronous service
import asyncio
from typing import List, Optional
class TrainingService:
    """Create training jobs and run each one as a background asyncio task."""

    def __init__(self, db=None):
        """Initialise the service.

        Args:
            db: Database handle exposing a ``training_jobs`` collection with
                async ``insert_one`` / ``update_one``. It defaults to ``None``
                so existing zero-argument construction keeps working, but it
                must be set before any job is started.
        """
        self.db = db
        # job id -> asyncio.Task for runs that have not finished yet.
        self.active_jobs = {}

    async def start_training(self, config: TrainingConfig) -> TrainingJob:
        """Persist a new job and schedule its execution.

        Returns:
            The freshly created job, still in the ``"initializing"`` state.
        """
        job_id = generate_job_id()
        job = TrainingJob(
            id=job_id,
            config=config,
            status="initializing",
            created_at=datetime.utcnow()
        )
        # Store job in database
        await self.db.training_jobs.insert_one(job.dict())
        # Start training asynchronously. The event loop only keeps a weak
        # reference to tasks, so hold a strong one until the run finishes.
        task = asyncio.create_task(self._execute_training(job))
        self.active_jobs[job_id] = task
        task.add_done_callback(
            lambda _done, _job_id=job_id: self.active_jobs.pop(_job_id, None)
        )
        return job

    async def _execute_training(self, job: TrainingJob):
        """Run one job and write its final state back to the database.

        Failures are logged and recorded on the stored document under
        ``"error"``; they are not re-raised because nothing awaits this
        background task.
        """
        error_message = None
        try:
            job.status = "running"
            job.started_at = datetime.utcnow()
            await self.db.training_jobs.update_one(
                {"id": job.id},
                {"$set": job.dict()}
            )
            # Execute federated learning
            # NOTE(review): ``_run_federated_learning`` is not defined in the
            # code shown here — presumably provided elsewhere; confirm.
            result = await self._run_federated_learning(job.config)
            job.status = "completed"
            job.completed_at = datetime.utcnow()
            job.metrics = result
        except Exception as e:
            logging.exception("Training job %s failed", job.id)
            job.status = "failed"
            # Kept in a local instead of being set on ``job`` so this works
            # whether or not the job model declares an ``error`` field.
            error_message = str(e)
        finally:
            fields = job.dict()
            if error_message is not None:
                fields["error"] = error_message
            await self.db.training_jobs.update_one(
                {"id": job.id},
                {"$set": fields}
            )
MongoDB Database
Database Architecture
graph TB
subgraph "MongoDB Cluster"
PRIMARY[Primary Node<br/>Read/Write]
SECONDARY1[Secondary Node 1<br/>Read Replica]
SECONDARY2[Secondary Node 2<br/>Read Replica]
end
subgraph "Collections"
USERS[users<br/>User Management]
PROJECTS[projects<br/>Project Data]
CONFIGS[configs<br/>ML Configurations]
JOBS[training_jobs<br/>Training History]
ANSIBLE_JOBS[ansible_jobs<br/>Deployment Jobs]
end
subgraph "Indexes"
USER_IDX[username_email_idx]
PROJECT_IDX[owner_created_idx]
JOB_IDX[status_created_idx]
CONFIG_IDX[project_type_idx]
end
PRIMARY --> USERS
PRIMARY --> PROJECTS
PRIMARY --> CONFIGS
PRIMARY --> JOBS
PRIMARY --> ANSIBLE_JOBS
USERS --> USER_IDX
PROJECTS --> PROJECT_IDX
JOBS --> JOB_IDX
CONFIGS --> CONFIG_IDX
Motor Async Driver
# database/mongodb.py - Async MongoDB connection
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import ConnectionFailure
import logging
class MongoDB:
    """Owns the async MongoDB client and exposes the collections in use."""

    def __init__(self):
        # Both stay ``None`` until ``connect`` succeeds.
        self.client = None
        self.database = None

    async def connect(self, connection_string: str, database_name: str):
        """Open a client, verify it with a ping, and select the database.

        The client is stored on the instance only after the ping succeeds.

        Raises:
            ConnectionFailure: re-raised after logging when the ping fails;
                the half-open client is closed first.
        """
        client = AsyncIOMotorClient(connection_string)
        try:
            # Test connection
            await client.admin.command('ping')
        except ConnectionFailure as e:
            logging.error(f"Failed to connect to MongoDB: {e}")
            client.close()
            raise
        self.client = client
        self.database = client[database_name]
        logging.info("Connected to MongoDB successfully")

    async def close(self):
        """Close the client, if any, and drop the stale references."""
        if self.client is not None:
            self.client.close()
            self.client = None
            self.database = None

    # Collection accessors
    @property
    def users(self):
        return self.database.users

    @property
    def projects(self):
        return self.database.projects

    @property
    def training_jobs(self):
        return self.database.training_jobs

    @property
    def configs(self):
        return self.database.configs
# Usage in services
class UserService:
    """User account operations backed by the ``users`` collection."""

    def __init__(self, db: MongoDB):
        self.db = db

    async def create_user(self, user_data: UserCreate) -> User:
        """Create a user unless the username or e-mail is already taken.

        Raises:
            HTTPException: 400 when a user with the same username or e-mail
                exists, including when a concurrent request inserts it
                between the lookup and the insert.
        """
        # Local import: pymongo is already a dependency of this file, but
        # this name is not in its import block.
        from pymongo.errors import DuplicateKeyError

        already_exists = HTTPException(
            status_code=400,
            detail="User already exists"
        )

        # Check if user exists
        existing_user = await self.db.users.find_one({
            "$or": [
                {"username": user_data.username},
                {"email": user_data.email}
            ]
        })
        if existing_user:
            raise already_exists

        # Hash password
        hashed_password = get_password_hash(user_data.password)

        # Create user document
        user_doc = {
            "username": user_data.username,
            "email": user_data.email,
            "password_hash": hashed_password,
            "created_at": datetime.utcnow(),
            "is_active": True,
            "roles": ["user"]
        }

        # Insert user. The lookup above cannot rule out a concurrent insert,
        # so a unique-index violation is reported as the same 400 rather
        # than surfacing as a server error.
        try:
            result = await self.db.users.insert_one(user_doc)
        except DuplicateKeyError:
            raise already_exists from None
        user_doc["_id"] = result.inserted_id
        return User(**user_doc)
Database Indexing Strategy
# database/indexes.py - Database index creation
async def create_indexes(db: MongoDB):
    """Create the indexes used by the users, projects, jobs and configs collections.

    Args:
        db: Object exposing ``users``, ``projects``, ``training_jobs`` and
            ``configs`` collections with an async ``create_index``.
    """
    # Users collection indexes.
    # Username and e-mail are each unique on their own. A single compound
    # unique index on (username, email) would only reject an exact repeat of
    # the pair, which is weaker than the "username OR email already exists"
    # rule applied when users are created.
    await db.users.create_index("username", unique=True, name="username_unique")
    await db.users.create_index("email", unique=True, name="email_unique")
    await db.users.create_index("created_at", name="users_created_at")

    # Projects collection indexes
    await db.projects.create_index([
        ("owner_id", 1),
        ("created_at", -1)
    ], name="owner_created_idx")
    await db.projects.create_index("name", name="project_name_idx")

    # Training jobs indexes
    await db.training_jobs.create_index([
        ("status", 1),
        ("created_at", -1)
    ], name="status_created_idx")
    await db.training_jobs.create_index("user_id", name="jobs_user_idx")

    # Configs collection indexes
    await db.configs.create_index([
        ("project_id", 1),
        ("type", 1)
    ], name="project_type_idx")
Authentication & Security
JWT Implementation
# core/security.py - JWT token management
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
# Key used to sign and verify JWTs.
# NOTE(review): this is a hard-coded placeholder — load the real key from the
# environment before deploying.
SECRET_KEY = "your-secret-key" # Use environment variable in production
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Not referenced by the code shown here; presumably callers convert it to a
# timedelta and pass it to create_access_token as expires_delta — confirm.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# passlib context configured for bcrypt; used to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Token lifetime. A missing or falsy value falls back
            to 15 minutes.
    """
    claims = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims.update({"exp": datetime.utcnow() + lifetime})
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[dict]:
    """Decode and verify a JWT.

    Returns:
        The decoded payload, or ``None`` when decoding raises ``JWTError``.
        Callers must handle the ``None`` case, which the previous ``-> dict``
        annotation did not advertise.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None
Password Security
# services/auth_service.py - Authentication service
from passlib.context import CryptContext
from passlib.hash import bcrypt
class AuthService:
    """Password hashing, verification and strength checks for sign-in."""

    def __init__(self):
        # bcrypt with an explicit cost of 12 rounds.
        self.pwd_context = CryptContext(
            schemes=["bcrypt"],
            deprecated="auto",
            bcrypt__rounds=12,  # Increased rounds for better security
        )

    async def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """Return the user when the credentials match, otherwise ``None``."""
        # NOTE(review): ``get_user_by_username`` is not defined on this class
        # in the code shown — presumably supplied elsewhere; confirm.
        user = await self.get_user_by_username(username)
        if user and self.verify_password(password, user.password_hash):
            return user
        return None

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Check a plaintext password against a stored hash."""
        matches = self.pwd_context.verify(plain_password, hashed_password)
        return matches

    def get_password_hash(self, password: str) -> str:
        """Hash a plaintext password with the configured context."""
        hashed = self.pwd_context.hash(password)
        return hashed

    def validate_password_strength(self, password: str) -> bool:
        """Require 8+ characters with an upper, a lower, a digit and a special."""
        if len(password) < 8:
            return False
        specials = "!@#$%^&*()_+-=[]{}|;:,.<>?"
        # One predicate per character class; each must match some character.
        requirements = (
            str.isupper,
            str.islower,
            str.isdigit,
            lambda ch: ch in specials,
        )
        return all(any(rule(ch) for ch in password) for rule in requirements)
WebSocket Implementation
WebSocket Manager
# services/websocket_service.py - WebSocket connection management
from fastapi import WebSocket, WebSocketDisconnect
from typing import List, Dict
import json
import asyncio
class WebSocketManager:
    """Tracks open WebSocket connections, overall and per user.

    ``active_connections`` holds every accepted socket;
    ``user_connections`` maps a user id to that user's sockets. The two are
    kept in step: a socket removed from one is removed from the other.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept the socket and register it under ``user_id``."""
        await websocket.accept()
        self.active_connections.append(websocket)
        self.user_connections.setdefault(user_id, []).append(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Unregister the socket; safe to call for one already removed."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        sockets = self.user_connections.get(user_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            del self.user_connections[user_id]

    def _forget(self, websocket: WebSocket):
        """Drop a socket from every registry when its owner is not known."""
        owners = [
            uid for uid, sockets in self.user_connections.items()
            if websocket in sockets
        ]
        for uid in owners:
            self.disconnect(websocket, uid)
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, user_id: str):
        """Send ``message`` to every socket of ``user_id``.

        Sockets whose send fails are unregistered; the rest still receive
        the message.
        """
        # Iterate over a snapshot: disconnect() mutates the live list, which
        # would otherwise make the loop skip the next socket.
        for connection in list(self.user_connections.get(user_id, ())):
            try:
                await connection.send_text(message)
            except Exception:
                # Connection is broken, remove it. ``Exception`` rather than
                # a bare except so task cancellation is not swallowed.
                logging.warning("Dropping broken WebSocket for user %s", user_id)
                self.disconnect(connection, user_id)

    async def broadcast(self, message: str):
        """Send ``message`` to every active socket, pruning broken ones."""
        for connection in self.active_connections.copy():
            try:
                await connection.send_text(message)
            except Exception:
                logging.warning("Dropping broken WebSocket during broadcast")
                self._forget(connection)
# WebSocket endpoint
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """Register the socket, relay incoming text messages, and always unregister.

    NOTE(review): ``user_id`` comes straight from the URL path and nothing in
    this handler authenticates it — confirm that is enforced elsewhere.
    """
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Handle incoming messages
            await handle_websocket_message(data, user_id)
    except WebSocketDisconnect:
        # Normal client close; cleanup happens below.
        pass
    finally:
        # Runs for every exit path, including an error raised by the message
        # handler, so a failed connection never stays registered.
        manager.disconnect(websocket, user_id)
File Upload and Processing
File Upload Handler
# services/file_service.py - File upload and processing
from fastapi import UploadFile, HTTPException
import zipfile
import tempfile
import os
import shutil
class FileService:
    """Accept uploaded ML project archives and unpack them per user.

    An upload is spooled to a temporary file, checked for the required
    project files, and extracted to ``<upload_dir>/<user_id>/<project>``.
    """

    # The upload is copied to disk in pieces of this size so a large archive
    # is never held in memory as one bytes object.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, upload_dir: str = "/app/uploads"):
        self.upload_dir = upload_dir
        os.makedirs(upload_dir, exist_ok=True)

    @staticmethod
    def _safe_project_name(filename: str) -> str:
        """Return the directory name for an uploaded archive, or ``""``.

        Only the final path component of the client-supplied name is used
        (both ``/`` and ``\\`` count as separators) and a trailing ``.zip``
        is removed. Names that reduce to nothing, ``.`` or ``..`` yield
        ``""``.
        """
        base = os.path.basename(filename.replace("\\", "/"))
        stem = base[:-4] if base.endswith(".zip") else base
        return "" if stem in ("", ".", "..") else stem

    def _is_inside_upload_dir(self, path: str) -> bool:
        """Tell whether ``path`` resolves to a location strictly below ``upload_dir``."""
        root = os.path.realpath(self.upload_dir)
        resolved = os.path.realpath(path)
        try:
            return resolved != root and os.path.commonpath([root, resolved]) == root
        except ValueError:
            # commonpath rejects paths it cannot compare (e.g. other drives).
            return False

    async def upload_ml_project(self, file: UploadFile, user_id: str) -> dict:
        """Store and unpack an uploaded project archive.

        Returns:
            A dict with ``project_path``, ``config`` and ``status``.

        Raises:
            HTTPException: 400 when the file is not a ``.zip``, its name or
                ``user_id`` would place the project outside the upload
                directory, or the archive fails validation.
        """
        # Validate file type (the filename may be missing entirely).
        filename = file.filename or ""
        if not filename.endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail="Only ZIP files are allowed"
            )

        # The filename and user id are request data: never let them steer
        # the extraction target outside the upload directory.
        project_name = self._safe_project_name(filename)
        project_dir = os.path.join(self.upload_dir, user_id, project_name)
        if not project_name or not self._is_inside_upload_dir(project_dir):
            raise HTTPException(
                status_code=400,
                detail="Invalid project name"
            )

        # Create temporary file
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
        temp_path = temp_file.name
        try:
            with temp_file:
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    temp_file.write(chunk)

            # Validate ZIP structure
            await self._validate_ml_project_structure(temp_path)

            # Extract to permanent location
            os.makedirs(project_dir, exist_ok=True)
            with zipfile.ZipFile(temp_path, 'r') as zip_ref:
                zip_ref.extractall(project_dir)

            # Parse configuration
            # NOTE(review): ``_parse_project_config`` is not defined in the
            # code shown here — presumably provided elsewhere; confirm.
            config = await self._parse_project_config(project_dir)
            return {
                "project_path": project_dir,
                "config": config,
                "status": "uploaded"
            }
        finally:
            # Clean up temporary file, including when the read itself failed.
            os.unlink(temp_path)

    async def _validate_ml_project_structure(self, zip_path: str):
        """Check that the archive is readable and contains the required files.

        A required file may sit at any depth, but its file name must match
        exactly.

        Raises:
            HTTPException: 400 for an unreadable archive or a missing file.
        """
        required_files = ['pyproject.toml', 'client_app.py', 'server_app.py']
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                file_list = zip_ref.namelist()
        except zipfile.BadZipFile:
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is not a valid ZIP archive"
            ) from None

        # Compare final path components, so e.g. "old_client_app.py.bak"
        # does not satisfy the requirement for "client_app.py".
        present = {name.rsplit('/', 1)[-1] for name in file_list}
        for required_file in required_files:
            if required_file not in present:
                raise HTTPException(
                    status_code=400,
                    detail=f"Missing required file: {required_file}"
                )
Background Tasks and Async Processing
Celery Alternative with FastAPI
# services/background_tasks.py - Background task processing
from fastapi import BackgroundTasks
import asyncio
from typing import Callable, Any
class TaskManager:
    """Runs coroutines as named asyncio tasks and reports their status."""

    # Upper bound on remembered outcomes so the history cannot grow forever.
    MAX_FINISHED_TASKS = 1000

    def __init__(self):
        # task id -> asyncio.Task, only while the task is in flight.
        self.running_tasks = {}
        # task id -> "completed" / "failed" / "cancelled" once it has ended.
        # Without this record a finished task could only ever be reported as
        # "not_found", because it is removed from ``running_tasks``.
        self.finished_tasks = {}

    def _record_outcome(self, task_id: str, outcome: str):
        """Remember how a task ended, evicting the oldest entries if full."""
        self.finished_tasks.pop(task_id, None)
        self.finished_tasks[task_id] = outcome
        while len(self.finished_tasks) > self.MAX_FINISHED_TASKS:
            self.finished_tasks.pop(next(iter(self.finished_tasks)))

    async def run_background_task(
        self,
        task_id: str,
        func: Callable,
        *args,
        **kwargs
    ):
        """Run ``func(*args, **kwargs)`` as a task and wait for its result.

        Returns:
            Whatever the coroutine returns.

        Raises:
            Exception: any error raised by the coroutine is logged and
                re-raised; cancellation is recorded and propagated as well.
        """
        task = asyncio.create_task(func(*args, **kwargs))
        self.running_tasks[task_id] = task
        # A reused id must not report the previous run's outcome.
        self.finished_tasks.pop(task_id, None)
        try:
            result = await task
        except asyncio.CancelledError:
            self._record_outcome(task_id, "cancelled")
            raise
        except Exception as e:
            logging.error(f"Task {task_id} failed: {e}")
            self._record_outcome(task_id, "failed")
            raise
        else:
            self._record_outcome(task_id, "completed")
            return result
        finally:
            self.running_tasks.pop(task_id, None)

    def get_task_status(self, task_id: str) -> str:
        """Return ``running``, ``completed``, ``failed``, ``cancelled`` or ``not_found``."""
        task = self.running_tasks.get(task_id)
        if task is None:
            return self.finished_tasks.get(task_id, "not_found")
        if not task.done():
            return "running"
        # Check cancellation first: exception() raises on a cancelled task.
        if task.cancelled():
            return "cancelled"
        return "failed" if task.exception() else "completed"
# Usage in routes
@app.post("/training/start")
async def start_training(
    config: TrainingConfig,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user)
):
    """Queue a training job for the authenticated user and return its id.

    The job is handed to FastAPI's ``BackgroundTasks`` and the response is
    returned without waiting for it.
    """
    job_id = generate_job_id()
    response = {"job_id": job_id, "status": "started"}

    # Add background task
    background_tasks.add_task(execute_training_job, job_id, config, current_user.id)
    return response
Error Handling and Logging
Custom Exception Handlers
# core/exceptions.py - Custom exception handling
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
import logging
class FederatedLearningException(Exception):
    """Base error for the platform; carries a message and an optional code.

    Args:
        message: Human-readable description, also used as ``str(exc)``.
        error_code: Optional machine-readable identifier; ``None`` when the
            caller supplies none.
    """

    def __init__(self, message: str, error_code: Optional[str] = None):
        self.message = message
        self.error_code = error_code
        super().__init__(self.message)
class TrainingException(FederatedLearningException):
    """Platform error subtype for training; adds no behaviour of its own."""
class ConfigurationException(FederatedLearningException):
    """Platform error subtype for configuration; adds no behaviour of its own."""
# Exception handlers
@app.exception_handler(FederatedLearningException)
async def fl_exception_handler(request: Request, exc: FederatedLearningException):
    """Log a platform exception and return it as a 400 JSON response."""
    logging.error(f"FL Exception: {exc.message}")
    payload = {
        "error": exc.message,
        "error_code": exc.error_code,
        "type": "federated_learning_error",
    }
    return JSONResponse(status_code=400, content=payload)
@app.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: Exception):
    """Log an unexpected failure and answer with a generic 500 body.

    The response deliberately contains no detail about the error; the
    detail goes to the log instead.
    """
    # Attach the exception so the log record carries the traceback, not
    # just the message text.
    logging.error(f"Internal server error: {exc}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred"
        }
    )
Structured Logging
# core/logging.py - Structured logging configuration
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        """Serialise ``record`` to a JSON string.

        The output always has ``timestamp``, ``level``, ``logger``,
        ``message``, ``module``, ``function`` and ``line``. ``user_id`` and
        ``request_id`` are included when set on the record, and
        ``exception`` when the record carries exception info.
        """
        # Local import: only ``datetime`` is in the file's import block.
        from datetime import timezone

        # Use the time the record was created, not the time it happens to
        # be formatted. Same naive-UTC ISO layout as before.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_entry = {
            "timestamp": created.replace(tzinfo=None).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        for extra in ("user_id", "request_id"):
            if hasattr(record, extra):
                log_entry[extra] = getattr(record, extra)
        if record.exc_info:
            # Keep the traceback; otherwise it is silently lost.
            log_entry["exception"] = self.formatException(record.exc_info)
        # default=str is deliberate: an extra that is not JSON-serialisable
        # is stringified rather than making the log call itself fail.
        return json.dumps(log_entry, default=str)
# Configure logging
def setup_logging():
    """Route root-logger output through ``JSONFormatter`` at INFO level.

    Safe to call more than once: a JSON stream handler is only added when
    the root logger does not already have one, so repeated calls do not
    duplicate every log line.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    has_json_handler = any(
        isinstance(existing.formatter, JSONFormatter)
        for existing in logger.handlers
    )
    if not has_json_handler:
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)

    # Reduce noise from external libraries
    for noisy in ("httpx", "httpcore"):
        logging.getLogger(noisy).setLevel(logging.WARNING)
Next: Continue to Frontend Technologies for detailed frontend technology specifications.