Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/OpenMined/syft-flwr/llms.txt

Use this file to discover all available pages before exploring further.

The P2P transport enables federated learning in cloud environments without local SyftBox installations. It uses the Google Drive API for file-based message passing between participants.

SyftP2PClient

Lightweight client for P2P sync mode, built on the Google Drive API.

Class Definition

from syft_flwr.client import SyftP2PClient

class SyftP2PClient(SyftFlwrClient):
    """Client for P2P (Google Drive) sync mode using Google Drive API.
    
    This client is used when FL jobs are submitted via syft_client's
    Google Drive-based sync system.
    
    Key differences from syft_core:
    - Uses Google Drive API directly (via GDriveFileIO) instead of filesystem paths
    - No RPC/crypto/event - Google Drive handles transport and access control
    - Email comes from environment variable or explicit parameter
    """
Source: src/syft_flwr/client/syft_p2p_client.py:8

Constructor

email
str
required
The user’s email address for Google Drive access
Example:
from syft_flwr.client import SyftP2PClient

client = SyftP2PClient(email="user@example.com")
print(client.email)  # user@example.com

Class Methods

from_env

@classmethod
def from_env(cls) -> SyftP2PClient
Create a client from environment variables set by the job runner. Environment Variables:
  • SYFTBOX_EMAIL: The data owner’s email (set by job_runner.py)
return
SyftP2PClient
Initialized client instance
Raises:
  • ValueError: If SYFTBOX_EMAIL is not set
Example:
import os
os.environ["SYFTBOX_EMAIL"] = "user@example.com"

from syft_flwr.client import SyftP2PClient

client = SyftP2PClient.from_env()
print(client.email)  # user@example.com

Properties

email

@property
def email(self) -> str
return
str
User’s email address

syftbox_folder

@property
def syftbox_folder(self) -> Path
return
Path
Logical path to SyftBox folder in Google Drive (“SyftBox”)

config_path

@property
def config_path(self) -> Path
return
Path
Logical path to config directory (“SyftBox/.config”)
In P2P mode there is no actual config file; this property returns a logical path only for API compatibility.

my_datasite

@property
def my_datasite(self) -> Path
return
Path
Logical path to user’s datasite in Google Drive (e.g., “SyftBox/user@example.com”)

datasites

@property
def datasites(self) -> Path
return
Path
Logical path to datasites root (“SyftBox”)

Methods

app_data

def app_data(
    self,
    app_name: Optional[str] = None,
    datasite: Optional[str] = None,
) -> Path
Get the app data directory path in Google Drive.
app_name
str
default:"None"
Name of the application (e.g., “flwr/my_fl_app”)
datasite
str
default:"self.email"
Email address of the datasite owner
return
Path
Logical path to app data directory in Google Drive
Example:
client = SyftP2PClient(email="user@example.com")

# Get app data for current user
app_path = client.app_data("flwr/diabetes")
print(app_path)
# SyftBox/user@example.com/app_data/flwr/diabetes

# Get app data for another datasite
other_path = client.app_data("flwr/diabetes", "other@example.com")
print(other_path)
# SyftBox/other@example.com/app_data/flwr/diabetes

get_client

def get_client(self) -> SyftP2PClient
Return self - this IS the client (no underlying native client).
return
SyftP2PClient
Returns self for API compatibility

P2PFileRpc

File-based RPC adapter using Google Drive API for message passing.

Class Definition

from syft_flwr.rpc import P2PFileRpc

class P2PFileRpc(SyftFlwrRpc):
    """P2P File-based RPC adapter using Google Drive API via syft-client.
    
    Instead of using filesystem paths, this adapter uses GDriveFileIO to:
    - Write .request files to the shared outbox folder via Google Drive API
    - Poll for .response files in the inbox folder via Google Drive API
    - Uses in-memory tracking for pending futures
    """
Source: src/syft_flwr/rpc/p2p_file_rpc.py:12

Directory Structure

Messages are organized in Google Drive using inbox/outbox folders:
SyftBox/
├── syft_outbox_inbox_sender_to_recipient/
│   └── {app_name}/
│       └── rpc/
│           └── {endpoint}/
│               └── {future_id}.request
└── syft_outbox_inbox_recipient_to_sender/
    └── {app_name}/
        └── rpc/
            └── {endpoint}/
                └── {future_id}.response

Constructor

sender_email
str
required
Email address of the sender
app_name
str
required
Name of the FL application

Methods

send

def send(
    self,
    to_email: str,
    app_name: str,
    endpoint: str,
    body: bytes,
    encrypt: bool = False,
) -> str
Send a message by writing a .request file to Google Drive.
to_email
str
required
Recipient’s email address
app_name
str
required
Name of the FL application
endpoint
str
required
RPC endpoint (e.g., “messages”)
body
bytes
required
Message body as bytes
encrypt
bool
default:false
Ignored (encryption not yet supported in P2P mode)
return
str
Future ID (UUID) for tracking the response
Encryption is not yet supported in P2P mode. If encrypt=True is passed, a warning is logged and the message is still sent unencrypted.
Example:
from syft_flwr.rpc import P2PFileRpc

rpc = P2PFileRpc(
    sender_email="sender@example.com",
    app_name="flwr/my_app"
)

future_id = rpc.send(
    to_email="recipient@example.com",
    app_name="flwr/my_app",
    endpoint="messages",
    body=b"serialized_message",
    encrypt=False
)

print(f"Sent message, future_id: {future_id}")

get_response

def get_response(self, future_id: str) -> Optional[bytes]
Poll for a .response file in the inbox via Google Drive API.
future_id
str
required
The future ID returned by send()
return
Optional[bytes]
Response body as bytes, or None if not available yet
Example:
import time

future_id = rpc.send(
    to_email="recipient@example.com",
    app_name="flwr/my_app",
    endpoint="messages",
    body=b"request_data"
)

# Poll for response
while True:
    response = rpc.get_response(future_id)
    if response is not None:
        print(f"Got response: {len(response)} bytes")
        break
    time.sleep(2)  # Wait before polling again

delete_future

def delete_future(self, future_id: str) -> None
Clear in-memory tracking for a future ID.
future_id
str
required
The future ID to remove from tracking
Unlike SyftRpc, this only clears in-memory tracking. Response files are owned by the data owner and cannot be deleted by the data scientist.
Example:
response = rpc.get_response(future_id)
if response is not None:
    process_response(response)
    rpc.delete_future(future_id)  # Clean up tracking

P2PFileEvents

Polling-based event handler for processing incoming messages from Google Drive.

Class Definition

from syft_flwr.events import P2PFileEvents

class P2PFileEvents(SyftFlwrEvents):
    """P2P File-based polling events using Google Drive API via syft-client.
    
    This adapter:
    - Polls inbox folders for incoming .request files (from other participants) via Google Drive API
    - Calls the registered handler with the message bytes
    - Writes the response to the outbox folder (back to sender) via Google Drive API
    """
Source: src/syft_flwr/events/p2p_fle_events.py:18

Constructor

app_name
str
required
Name of the FL application
client_email
str
required
Email address of the client
poll_interval
float
Polling interval in seconds
max_processed_requests
int
default:10000
Maximum number of processed request IDs to track (LRU eviction)

Properties

client_email

@property
def client_email(self) -> str
return
str
Email address of the current client

app_dir

@property
def app_dir(self) -> Path
return
Path
Logical path to app data directory in Google Drive

is_running

@property
def is_running(self) -> bool
return
bool
Whether the polling loop is currently running

Methods

on_request

def on_request(
    self,
    endpoint: str,
    handler: Callable[[bytes], Optional[Union[str, bytes]]],
    auto_decrypt: bool = True,
    encrypt_reply: bool = False,
) -> None
Register a handler for incoming messages at an endpoint.
endpoint
str
required
The endpoint path (e.g., “/messages”)
handler
Callable[[bytes], Optional[Union[str, bytes]]]
required
Function that receives message bytes and returns response
auto_decrypt
bool
default:true
Ignored in P2P mode (Google Drive handles access control)
encrypt_reply
bool
default:false
Ignored in P2P mode
The auto_decrypt and encrypt_reply parameters are ignored in P2P mode: access control is handled by Google Drive rather than by X3DH encryption.
Example:
from syft_flwr.events import P2PFileEvents

events = P2PFileEvents(
    app_name="flwr/my_app",
    client_email="user@example.com",
    poll_interval=2.0
)

def handle_message(body: bytes) -> bytes:
    """Process incoming FL message."""
    print(f"Received: {len(body)} bytes")
    # Process message...
    return b"response_data"

events.on_request(
    endpoint="/messages",
    handler=handle_message
)

print("Starting polling loop...")
events.run_forever()

run_forever

def run_forever(self) -> None
Start the polling loop and block until stopped. Continuously polls Google Drive for new .request files. Polling Behavior:
  • Checks all inbox folders for messages from known senders
  • Processes each endpoint registered via on_request()
  • Tracks processed requests to avoid reprocessing (LRU cache)
  • Sleeps for poll_interval seconds between polls
Example:
try:
    events.run_forever()
except KeyboardInterrupt:
    print("Shutting down...")
    events.stop()

stop

def stop(self) -> None
Signal the polling loop to stop gracefully.

Usage Example

Complete example of using P2P transport for federated learning:
import os
from syft_flwr.client import create_client
from syft_flwr.rpc import create_rpc
from syft_flwr.events import create_events_watcher

# Set environment for P2P mode
os.environ["SYFTBOX_EMAIL"] = "dataowner@example.com"

# Create client (explicitly selecting the P2P transport)
client = create_client(
    transport="p2p",
    email="dataowner@example.com"
)

# Create RPC adapter (returns P2PFileRpc)
rpc = create_rpc(client=client, app_name="flwr/diabetes")

# Send a message
future_id = rpc.send(
    to_email="aggregator@example.com",
    app_name="flwr/diabetes",
    endpoint="messages",
    body=b"training_update"
)

# Create events watcher (returns P2PFileEvents)
events = create_events_watcher(
    app_name="flwr/diabetes",
    client=client,
    poll_interval=2.0
)

def handle_training_request(body: bytes) -> bytes:
    """Handle incoming training requests from aggregator."""
    # Train local model...
    return b"local_model_weights"

events.on_request("/messages", handle_training_request)
events.run_forever()

Limitations

Current Limitations:
  • End-to-end encryption not yet implemented (relies on Google Drive access control)
  • Polling-based (higher latency than watchdog-based SyftBox)
  • Requires Google Drive API credentials
  • The requesting side cannot delete response files (they are owned by the party that wrote the response, i.e. the data owner)

See Also