Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/OpenMined/syft-flwr/llms.txt

Use this file to discover all available pages before exploring further.

The server orchestration layer enables Flower ServerApps to coordinate federated learning across multiple data owners using SyftBox communication.

syftbox_flwr_server

Run a Flower ServerApp with SyftBox integration for coordinating FL training.

Function Signature

def syftbox_flwr_server(
    server_app: ServerApp,
    context: Context,
    datasites: list[str],
    app_name: str,
    project_dir: Optional[Path] = None,
) -> Context
Source: src/syft_flwr/fl_orchestrator/flower_server.py:15

Parameters

server_app
flwr.server.ServerApp
required
The Flower ServerApp instance containing the aggregation strategy
context
flwr.common.Context
required
Flower context with run configuration and state
datasites
list[str]
required
List of data owner email addresses to coordinate (e.g., [“do1@example.com”, “do2@example.com”])
app_name
str
required
Name of the FL application (e.g., “diabetes_prediction”)
project_dir
Path
default:"None"
Path to the FL project directory (reads transport config from pyproject.toml)
return
flwr.common.Context
Updated context after training completion

Workflow

  1. Setup: Create SyftGrid instance with datasites and transport
  2. Initialize: Set random run ID for the FL session
  3. Train: Execute Flower server with SyftGrid backend
  4. Cleanup: Send stop signals to all clients
  5. Return: Updated context with final state

Example Usage

from pathlib import Path
from flwr.server import ServerApp, ServerConfig
from flwr.server.strategy import FedAvg
from flwr.common import Context
from syft_flwr.fl_orchestrator import syftbox_flwr_server

# Define datasites (data owners)
datasites = [
    "dataowner1@example.com",
    "dataowner2@example.com",
    "dataowner3@example.com"
]

# Create server app with FedAvg strategy
strategy = FedAvg(
    fraction_fit=1.0,  # Use all clients for training
    fraction_evaluate=1.0,  # Use all clients for evaluation
    min_available_clients=3,  # Wait for all clients
)

config = ServerConfig(num_rounds=5)

def server_fn(context: Context):
    return strategy, config

app = ServerApp(server_fn=server_fn)

# Run aggregation server
if __name__ == "__main__":
    context = Context(state={}, run_config={})
    
    updated_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project")
    )
    
    print(f"Training complete! Final context: {updated_context}")

SyftGrid Integration

The server uses SyftGrid as the communication backend:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid

# Automatically created by syftbox_flwr_server
syft_grid = SyftGrid(
    app_name="flwr/diabetes_prediction",
    datasites=datasites,
    client=client  # Auto-detected SyftFlwrClient
)

# Set run ID
run_id = 12345
syft_grid.set_run(run_id)

# SyftGrid handles all message routing
See SyftGrid for detailed API reference.

Stop Signal Behavior

After training completes (success or failure), the server sends stop signals to all clients:
# Sent to each datasite
{
    "metadata": {
        "message_type": "SYSTEM",
        "group_id": "final",
        "dst_node_id": <client_node_id>
    },
    "content": {
        "config": {
            "action": "stop",
            "reason": "Server stopped"
        }
    }
}
Clients will gracefully shut down upon receiving this signal.

Error Handling

The server handles errors and still sends stop signals:
try:
    updated_context = run_server(
        syft_grid,
        context=context,
        loaded_server_app=server_app,
        server_app_dir=""
    )
    logger.info(f"Server completed with context: {updated_context}")
except Exception as e:
    logger.error(f"Server encountered an error: {str(e)}")
    logger.error(f"Traceback: {traceback.format_exc()}")
    updated_context = context  # Return original context
finally:
    syft_grid.send_stop_signal(group_id="final", reason="Server stopped")
    logger.info("Sending stop signals to the clients")

Transport Support

SyftBox Mode:
  • Full RPC/crypto stack with optional encryption
  • Real-time message delivery via filesystem watching
  • Futures database for response tracking
P2P Mode:
  • File-based messaging via Google Drive API
  • Polling-based message retrieval
  • No encryption (access control via Drive permissions)

Environment Variables

SYFTBOX_EMAIL
str
Email address for P2P mode (required when using P2P transport)
SYFT_FLWR_ENCRYPTION_ENABLED
str
default:"true"
Enable/disable encryption in SyftBox mode (“true” or “false”)
SYFT_FLWR_MSG_TIMEOUT
float
default:"120.0"
Maximum time (seconds) to wait for client responses
SYFT_FLWR_POLL_INTERVAL
float
default:"3.0"
Polling interval (seconds) for checking client responses

Complete Example

Full working example with custom strategy and metrics:
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np

from flwr.server import ServerApp, ServerConfig, ClientManager
from flwr.server.strategy import FedAvg
from flwr.common import (
    Context,
    Parameters,
    FitRes,
    EvaluateRes,
    Scalar,
    parameters_to_ndarrays,
    ndarrays_to_parameters
)
from syft_flwr.fl_orchestrator import syftbox_flwr_server

class DiabetesFedAvg(FedAvg):
    """Custom FedAvg with metrics aggregation for diabetes prediction."""
    
    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[BaseException]
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate model updates and log statistics."""
        
        # Call parent aggregation
        parameters_aggregated, metrics_aggregated = super().aggregate_fit(
            server_round, results, failures
        )
        
        # Log round statistics
        if results:
            total_examples = sum([r.num_examples for _, r in results])
            print(f"\n=== Round {server_round} ===")
            print(f"Clients participated: {len(results)}")
            print(f"Total training examples: {total_examples}")
            print(f"Failures: {len(failures)}")
            
            metrics_aggregated["num_clients"] = len(results)
            metrics_aggregated["total_examples"] = total_examples
        
        return parameters_aggregated, metrics_aggregated
    
    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[BaseException]
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation results."""
        
        if not results:
            return None, {}
        
        # Weighted average of losses
        total_examples = sum([r.num_examples for _, r in results])
        weighted_losses = [
            r.loss * r.num_examples for _, r in results
        ]
        avg_loss = sum(weighted_losses) / total_examples
        
        # Aggregate accuracy metrics
        accuracies = [
            r.metrics.get("accuracy", 0.0) * r.num_examples
            for _, r in results
        ]
        avg_accuracy = sum(accuracies) / total_examples
        
        print(f"\n=== Evaluation Round {server_round} ===")
        print(f"Average loss: {avg_loss:.4f}")
        print(f"Average accuracy: {avg_accuracy:.4f}")
        
        metrics = {
            "accuracy": avg_accuracy,
            "num_clients_evaluated": len(results)
        }
        
        return avg_loss, metrics

# Configuration
datasites = [
    "hospital1@example.com",
    "hospital2@example.com",
    "hospital3@example.com"
]

strategy = DiabetesFedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3
)

config = ServerConfig(num_rounds=10)

def server_fn(context: Context):
    return strategy, config

app = ServerApp(server_fn=server_fn)

if __name__ == "__main__":
    print("Starting Diabetes Prediction FL Server")
    print(f"Datasites: {datasites}")
    
    context = Context(state={}, run_config={})
    
    final_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project")
    )
    
    print("\n=== Training Complete ===")
    print(f"Final context state: {final_context.state}")

Run ID Assignment

The server assigns a random run ID for each FL session:
from random import randint

run_id = randint(0, 1000)
syft_grid.set_run(run_id)
In production deployments, you may want to use a more sophisticated run ID generation scheme (e.g., timestamp-based, UUID, or database-assigned IDs).

See Also