Security Overview¶
This document provides a comprehensive overview of security measures, best practices, and considerations implemented in the Federated Learning Platform to ensure data protection, system integrity, and compliance with security standards.
Security Architecture¶
graph TB
subgraph "Security Layers"
subgraph "Network Security"
TLS[TLS/SSL Encryption]
FIREWALL[Firewall Rules]
VPN[VPN Access]
NETWORK_SEG[Network Segmentation]
end
subgraph "Application Security"
AUTH[Authentication]
AUTHZ[Authorization]
INPUT_VAL[Input Validation]
RATE_LIMIT[Rate Limiting]
end
subgraph "Data Security"
ENCRYPTION[Data Encryption]
FEDERATED[Federated Learning<br/>Privacy by Design]
BACKUP[Secure Backups]
AUDIT[Audit Logging]
end
subgraph "Infrastructure Security"
CONTAINER_SEC[Container Security]
SECRET_MGMT[Secret Management]
ACCESS_CTRL[Access Control]
MONITORING[Security Monitoring]
end
end
subgraph "Compliance & Standards"
GDPR[GDPR Compliance]
SOC2[SOC 2 Type II]
ISO27001[ISO 27001]
NIST[NIST Framework]
end
TLS --> AUTH
AUTH --> AUTHZ
AUTHZ --> INPUT_VAL
INPUT_VAL --> ENCRYPTION
ENCRYPTION --> FEDERATED
CONTAINER_SEC --> SECRET_MGMT
SECRET_MGMT --> ACCESS_CTRL
ACCESS_CTRL --> MONITORING
MONITORING --> GDPR
AUDIT --> SOC2
ENCRYPTION --> ISO27001
ACCESS_CTRL --> NIST
Core Security Principles¶
1. Defense in Depth¶
Multiple layers of security controls to protect against various attack vectors:
- Perimeter Security: Firewalls, intrusion detection systems
- Network Security: TLS encryption, network segmentation
- Application Security: Authentication, authorization, input validation
- Data Security: Encryption at rest and in transit
- Endpoint Security: Container security, host hardening
2. Zero Trust Architecture¶
Never trust, always verify approach:
- Identity Verification: Multi-factor authentication
- Device Verification: Certificate-based device authentication
- Network Verification: Encrypted communication channels
- Application Verification: API authentication and authorization
3. Privacy by Design¶
Built-in privacy protection mechanisms:
- Data Minimization: Collect only necessary data
- Federated Learning: Data never leaves client devices
- Differential Privacy: Statistical privacy protection (future enhancement)
- Secure Aggregation: Only model updates are shared
Authentication and Authorization¶
JWT-Based Authentication¶
# Authentication implementation
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
class AuthenticationService:
def __init__(self):
self.pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12 # Strong hashing
)
self.secret_key = os.getenv("SECRET_KEY")
self.algorithm = "HS256"
self.access_token_expire_minutes = 30
async def authenticate_user(self, username: str, password: str) -> Optional[User]:
user = await self.get_user_by_username(username)
if not user or not self.verify_password(password, user.password_hash):
# Log failed authentication attempt
await self.log_auth_failure(username, request.client.host)
return None
# Log successful authentication
await self.log_auth_success(user.id, request.client.host)
return user
def create_access_token(self, data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
})
return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
Role-Based Access Control (RBAC)¶
# Authorization implementation
from enum import Enum
from functools import wraps
class Role(Enum):
ADMIN = "admin"
USER = "user"
VIEWER = "viewer"
class Permission(Enum):
READ_PROJECTS = "read:projects"
WRITE_PROJECTS = "write:projects"
DELETE_PROJECTS = "delete:projects"
MANAGE_USERS = "manage:users"
START_TRAINING = "start:training"
VIEW_METRICS = "view:metrics"
ROLE_PERMISSIONS = {
Role.ADMIN: [
Permission.READ_PROJECTS,
Permission.WRITE_PROJECTS,
Permission.DELETE_PROJECTS,
Permission.MANAGE_USERS,
Permission.START_TRAINING,
Permission.VIEW_METRICS
],
Role.USER: [
Permission.READ_PROJECTS,
Permission.WRITE_PROJECTS,
Permission.START_TRAINING,
Permission.VIEW_METRICS
],
Role.VIEWER: [
Permission.READ_PROJECTS,
Permission.VIEW_METRICS
]
}
def require_permission(permission: Permission):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
current_user = kwargs.get('current_user')
if not current_user:
raise HTTPException(status_code=401, detail="Authentication required")
user_permissions = ROLE_PERMISSIONS.get(Role(current_user.role), [])
if permission not in user_permissions:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, **kwargs)
return wrapper
return decorator
# Usage example
@app.delete("/projects/{project_id}")
@require_permission(Permission.DELETE_PROJECTS)
async def delete_project(project_id: str, current_user: User = Depends(get_current_user)):
# Delete project logic
pass
Data Protection¶
Encryption at Rest¶
# Database encryption configuration
from cryptography.fernet import Fernet
import os
class DataEncryption:
def __init__(self):
# Use environment variable for encryption key
key = os.getenv("ENCRYPTION_KEY")
if not key:
# Generate new key for development (store securely in production)
key = Fernet.generate_key()
self.cipher_suite = Fernet(key)
def encrypt_sensitive_data(self, data: str) -> str:
"""Encrypt sensitive data before storing in database"""
return self.cipher_suite.encrypt(data.encode()).decode()
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive data when retrieving from database"""
return self.cipher_suite.decrypt(encrypted_data.encode()).decode()
# MongoDB field-level encryption
class EncryptedUser(BaseModel):
username: str
email: str
password_hash: str
encrypted_personal_data: str # Encrypted PII
def set_personal_data(self, data: dict):
encryption = DataEncryption()
self.encrypted_personal_data = encryption.encrypt_sensitive_data(json.dumps(data))
def get_personal_data(self) -> dict:
encryption = DataEncryption()
decrypted = encryption.decrypt_sensitive_data(self.encrypted_personal_data)
return json.loads(decrypted)
Encryption in Transit¶
# TLS configuration for all services
version: '3.8'
services:
nginx:
image: nginx:alpine
ports:
- "443:443"
- "80:80"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf
- ./certs:/etc/nginx/certs
environment:
- SSL_CERT_PATH=/etc/nginx/certs/server.crt
- SSL_KEY_PATH=/etc/nginx/certs/server.key
backend-fastapi:
environment:
- FORCE_HTTPS=true
- SECURE_COOKIES=true
# nginx/nginx.conf - TLS configuration
server {
listen 443 ssl http2;
server_name your-domain.com;
ssl_certificate /etc/nginx/certs/server.crt;
ssl_certificate_key /etc/nginx/certs/server.key;
# Strong SSL configuration
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
# Security headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload";
add_header X-Content-Type-Options nosniff;
add_header X-Frame-Options DENY;
add_header X-XSS-Protection "1; mode=block";
add_header Referrer-Policy "strict-origin-when-cross-origin";
location / {
proxy_pass http://frontend:4000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /api/ {
proxy_pass http://backend-fastapi:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name your-domain.com;
return 301 https://$server_name$request_uri;
}
Input Validation and Sanitization¶
Comprehensive Input Validation¶
# Input validation with Pydantic
from pydantic import BaseModel, validator, Field
from typing import Optional
import re
class UserRegistration(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(..., regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
password: str = Field(..., min_length=8, max_length=128)
first_name: str = Field(..., min_length=1, max_length=100)
last_name: str = Field(..., min_length=1, max_length=100)
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_-]+$', v):
raise ValueError('Username can only contain letters, numbers, hyphens, and underscores')
return v
@validator('password')
def validate_password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain at least one digit')
if not re.search(r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', v):
raise ValueError('Password must contain at least one special character')
return v
class TrainingConfiguration(BaseModel):
rounds: int = Field(..., ge=1, le=1000)
clients: int = Field(..., ge=2, le=10000)
learning_rate: float = Field(..., ge=0.0001, le=1.0)
batch_size: int = Field(..., ge=1, le=1024)
model_type: str = Field(..., regex=r'^[a-zA-Z0-9_-]+$')
@validator('model_type')
def validate_model_type(cls, v):
allowed_models = ['cnn', 'resnet18', 'resnet50', 'mobilenet', 'efficientnet']
if v not in allowed_models:
raise ValueError(f'Model type must be one of: {", ".join(allowed_models)}')
return v
SQL Injection Prevention¶
# Safe database queries with parameterized statements
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
class SecureDatabase:
def __init__(self, connection_string: str):
self.client = AsyncIOMotorClient(connection_string)
self.db = self.client.federated_learning
async def get_user_by_id(self, user_id: str) -> Optional[dict]:
# Validate ObjectId format
try:
object_id = ObjectId(user_id)
except Exception:
raise ValueError("Invalid user ID format")
# Use parameterized query
user = await self.db.users.find_one({"_id": object_id})
return user
async def search_projects(self, search_term: str, user_id: str) -> List[dict]:
# Sanitize search term
sanitized_term = re.escape(search_term)
# Use MongoDB text search with proper escaping
query = {
"owner_id": ObjectId(user_id),
"$text": {"$search": sanitized_term}
}
projects = await self.db.projects.find(query).to_list(length=100)
return projects
Container Security¶
Secure Container Configuration¶
# Secure Dockerfile practices
FROM python:3.10-slim AS base
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Install security updates
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
ca-certificates \
curl && \
rm -rf /var/lib/apt/lists/*
# Set secure working directory
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies as root
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY --chown=appuser:appuser . .
# Remove unnecessary packages and clean up
RUN apt-get autoremove -y && \
apt-get autoclean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Set file permissions
RUN chmod -R 755 /app && \
chmod -R 644 /app/*.py
# Switch to non-root user
USER appuser
# Expose port (non-privileged)
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Use exec form for better signal handling
CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Container Runtime Security¶
# docker-compose.security.yml
version: '3.8'
services:
backend-fastapi:
security_opt:
- no-new-privileges:true
cap_drop:
- ALL
cap_add:
- NET_BIND_SERVICE
read_only: true
tmpfs:
- /tmp:noexec,nosuid,size=100m
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
sysctls:
- net.core.somaxconn=1024
mongodb:
security_opt:
- no-new-privileges:true
cap_drop:
- ALL
cap_add:
- SETGID
- SETUID
- DAC_OVERRIDE
read_only: true
tmpfs:
- /tmp:noexec,nosuid,size=100m
Secret Management¶
Environment-Based Secrets¶
# Secure secret management
import os
from typing import Optional
class SecretManager:
@staticmethod
def get_secret(secret_name: str, default: Optional[str] = None) -> str:
"""Get secret from environment variables or files"""
# Try environment variable first
secret = os.getenv(secret_name)
if secret:
return secret
# Try Docker secrets
secret_file = f"/run/secrets/{secret_name.lower()}"
if os.path.exists(secret_file):
with open(secret_file, 'r') as f:
return f.read().strip()
# Use default if provided
if default is not None:
return default
raise ValueError(f"Secret {secret_name} not found")
@staticmethod
def get_database_url() -> str:
return SecretManager.get_secret("DATABASE_URL", "mongodb://localhost:27017")
@staticmethod
def get_jwt_secret() -> str:
return SecretManager.get_secret("JWT_SECRET_KEY")
@staticmethod
def get_encryption_key() -> str:
return SecretManager.get_secret("ENCRYPTION_KEY")
# Usage in application
settings = {
"database_url": SecretManager.get_database_url(),
"jwt_secret": SecretManager.get_jwt_secret(),
"encryption_key": SecretManager.get_encryption_key()
}
Docker Secrets Configuration¶
# docker-compose.secrets.yml
version: '3.8'
services:
backend-fastapi:
secrets:
- jwt_secret
- database_password
- encryption_key
environment:
- JWT_SECRET_KEY_FILE=/run/secrets/jwt_secret
- DATABASE_PASSWORD_FILE=/run/secrets/database_password
- ENCRYPTION_KEY_FILE=/run/secrets/encryption_key
secrets:
jwt_secret:
external: true
database_password:
external: true
encryption_key:
external: true
Security Monitoring and Logging¶
Comprehensive Audit Logging¶
# Security audit logging
import logging
import json
from datetime import datetime
from typing import Optional
class SecurityLogger:
def __init__(self):
self.logger = logging.getLogger("security")
self.logger.setLevel(logging.INFO)
# Create file handler for security logs
handler = logging.FileHandler("/var/log/security.log")
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_authentication_success(self, user_id: str, ip_address: str, user_agent: str):
self.logger.info(json.dumps({
"event": "authentication_success",
"user_id": user_id,
"ip_address": ip_address,
"user_agent": user_agent,
"timestamp": datetime.utcnow().isoformat()
}))
def log_authentication_failure(self, username: str, ip_address: str, reason: str):
self.logger.warning(json.dumps({
"event": "authentication_failure",
"username": username,
"ip_address": ip_address,
"reason": reason,
"timestamp": datetime.utcnow().isoformat()
}))
def log_authorization_failure(self, user_id: str, resource: str, action: str):
self.logger.warning(json.dumps({
"event": "authorization_failure",
"user_id": user_id,
"resource": resource,
"action": action,
"timestamp": datetime.utcnow().isoformat()
}))
def log_data_access(self, user_id: str, resource: str, action: str):
self.logger.info(json.dumps({
"event": "data_access",
"user_id": user_id,
"resource": resource,
"action": action,
"timestamp": datetime.utcnow().isoformat()
}))
# Usage in middleware
@app.middleware("http")
async def security_logging_middleware(request: Request, call_next):
start_time = time.time()
# Log request
security_logger.log_request(
method=request.method,
url=str(request.url),
ip_address=request.client.host,
user_agent=request.headers.get("user-agent", "")
)
response = await call_next(request)
# Log response
process_time = time.time() - start_time
security_logger.log_response(
status_code=response.status_code,
process_time=process_time
)
return response
Compliance and Standards¶
GDPR Compliance¶
# GDPR compliance implementation
class GDPRCompliance:
def __init__(self, db: Database):
self.db = db
async def handle_data_subject_request(self, user_id: str, request_type: str):
"""Handle GDPR data subject requests"""
if request_type == "access":
return await self.export_user_data(user_id)
elif request_type == "delete":
return await self.delete_user_data(user_id)
elif request_type == "portability":
return await self.export_portable_data(user_id)
else:
raise ValueError("Invalid request type")
async def export_user_data(self, user_id: str) -> dict:
"""Export all user data for GDPR access request"""
user_data = {}
# User profile
user = await self.db.users.find_one({"_id": ObjectId(user_id)})
if user:
user_data["profile"] = {
"username": user["username"],
"email": user["email"],
"created_at": user["created_at"],
"last_login": user.get("last_login")
}
# Projects
projects = await self.db.projects.find({"owner_id": ObjectId(user_id)}).to_list(None)
user_data["projects"] = projects
# Training jobs
jobs = await self.db.training_jobs.find({"user_id": ObjectId(user_id)}).to_list(None)
user_data["training_jobs"] = jobs
return user_data
async def delete_user_data(self, user_id: str) -> bool:
"""Delete all user data for GDPR deletion request"""
try:
# Delete user projects
await self.db.projects.delete_many({"owner_id": ObjectId(user_id)})
# Delete training jobs
await self.db.training_jobs.delete_many({"user_id": ObjectId(user_id)})
# Anonymize or delete user record
await self.db.users.update_one(
{"_id": ObjectId(user_id)},
{"$set": {
"username": f"deleted_user_{user_id[:8]}",
"email": f"deleted_{user_id[:8]}@example.com",
"is_active": False,
"deleted_at": datetime.utcnow()
}}
)
return True
except Exception as e:
logging.error(f"Failed to delete user data: {e}")
return False
Security Testing¶
Automated Security Scanning¶
#!/bin/bash
# security-scan.sh - Automated security scanning
echo "Running security scans..."
# Container vulnerability scanning
echo "Scanning container images..."
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
aquasec/trivy image flip_backend-fastapi:latest
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
aquasec/trivy image flip_frontend:latest
# Dependency vulnerability scanning
echo "Scanning Python dependencies..."
cd backend
pip install safety
safety check
echo "Scanning Node.js dependencies..."
cd ../frontend
npm audit
# Static code analysis
echo "Running static code analysis..."
cd ../backend
pip install bandit
bandit -r app/
cd ../frontend
npm install --save-dev eslint-plugin-security
npx eslint --ext .js,.jsx,.ts,.tsx src/
echo "Security scan complete."
Next: Continue to Operations Troubleshooting for comprehensive operational guidance.