taps.apps.fedlearn.app

FedlearnApp

FedlearnApp(
    clients: int,
    rounds: int,
    dataset: DataChoices,
    batch_size: int,
    epochs: int,
    lr: float,
    data_dir: Path,
    device: str = "cpu",
    download: bool = False,
    train: bool = True,
    test: bool = True,
    alpha: float = 100000.0,
    participation: float = 1.0,
    seed: int | None = None,
)

Federated learning application.

Parameters:

  • clients (int) –

    Number of simulated clients.

  • rounds (int) –

    Number of aggregation rounds to perform.

  • dataset (DataChoices) –

    Dataset (and corresponding model) to use.

  • batch_size (int) –

    Batch size used for local training across all clients.

  • epochs (int) –

    Number of epochs used during local training on all the clients.

  • lr (float) –

    Learning rate used during local training on all the clients.

  • data_dir (Path) –

    Root directory where the dataset is stored, or where it should be downloaded when download=True.

  • device (str, default: 'cpu' ) –

    Device to use for model training (e.g., 'cuda', 'cpu', 'mps').

  • download (bool, default: False ) –

    Whether to download the dataset into data_dir when it is not already present there.

  • train (bool, default: True ) –

    If True (default), local training is run. If False, a no-op version of the application is performed in which no training is done. This is useful for debugging.

  • test (bool, default: True ) –

    If True (default), model testing is done at the end of each aggregation round.

  • alpha (float, default: 100000.0 ) –

    The number of data samples held by each client is drawn from a Dirichlet distribution, and alpha controls how uniform those amounts are. When alpha is large (default), the number of samples is approximately uniform across clients. As alpha becomes small, the distribution of samples becomes increasingly non-uniform. Note: this value must be greater than 0.

  • participation (float, default: 1.0 ) –

    The fraction of clients that participate in each aggregation round. If set to 1.0, all clients participate in every round; if 0.5, half of the clients participate, and so on. At least one client is always selected, regardless of this value and the number of clients.

  • seed (int | None, default: None ) –

    Seed for reproducibility.
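
To illustrate the effect of alpha described above, the sketch below draws per-client sample proportions from a symmetric Dirichlet distribution with NumPy. The helper client_sample_counts is hypothetical and not part of taps; the library's own partitioning in create_clients may differ in its details.

```python
import numpy as np


def client_sample_counts(
    num_samples: int,
    num_clients: int,
    alpha: float,
    rng: np.random.Generator,
) -> np.ndarray:
    """Hypothetical helper: split num_samples across clients via Dirichlet(alpha)."""
    if alpha <= 0:
        raise ValueError('alpha must be greater than 0.')
    # Symmetric Dirichlet: a large alpha gives near-equal proportions,
    # a small alpha concentrates most of the data on a few clients.
    proportions = rng.dirichlet(np.full(num_clients, alpha))
    counts = np.floor(proportions * num_samples).astype(int)
    # Assign samples lost to rounding to the largest client so the
    # counts sum exactly to num_samples.
    counts[np.argmax(counts)] += num_samples - counts.sum()
    return counts


rng = np.random.default_rng(0)
uniform = client_sample_counts(10_000, 10, alpha=1e5, rng=rng)
skewed = client_sample_counts(10_000, 10, alpha=0.1, rng=rng)
print('alpha=1e5:', uniform)
print('alpha=0.1:', skewed)
```

With the default-sized alpha, each client receives close to 1,000 of the 10,000 samples, while the small alpha produces a visibly uneven split.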

Source code in taps/apps/fedlearn/app.py
def __init__(
    self,
    clients: int,
    rounds: int,
    dataset: DataChoices,
    batch_size: int,
    epochs: int,
    lr: float,
    data_dir: pathlib.Path,
    device: str = 'cpu',
    download: bool = False,
    train: bool = True,
    test: bool = True,
    alpha: float = 1e5,
    participation: float = 1.0,
    seed: int | None = None,
) -> None:
    self.rng = numpy.random.default_rng(seed)
    if seed is not None:
        torch.manual_seed(seed)

    self.dataset = dataset
    self.global_model = create_model(self.dataset)

    self.train, self.test = train, test
    self.train_data, self.test_data = None, None
    root = pathlib.Path(data_dir)
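    if self.train:
        self.train_data = load_data(
            self.dataset,
            root,
            train=True,
            # Honor the caller's flag rather than always downloading.
            download=download,
        )
    if self.test:
        self.test_data = load_data(
            self.dataset,
            root,
            train=False,
            # Honor the caller's flag rather than always downloading.
            download=download,
        )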

    self.device = torch.device(device)
    self.epochs = epochs
    self.batch_size = batch_size
    self.lr = lr

    self.participation = participation

    self.rounds = rounds
    if alpha <= 0:
        raise ValueError('Argument `alpha` must be greater than 0.')
    self.alpha = alpha

    self.clients = create_clients(
        clients,
        self.dataset,
        self.train,
        self.train_data,
        self.alpha,
        self.rng,
    )
    logger.log(APP_LOG_LEVEL, f'Created {len(self.clients)} clients')
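
The constructor stores participation alongside a seeded NumPy generator, but the per-round client selection is not shown in this section. The following is a minimal sketch of one way to do it, assuming sampling without replacement and the floor of one client stated in the parameter description. The name select_clients is hypothetical.

```python
import numpy as np


def select_clients(
    num_clients: int,
    participation: float,
    rng: np.random.Generator,
) -> list[int]:
    """Hypothetical helper: pick the client indices for one round."""
    # Keep at least one client, even for very small participation values,
    # and never ask for more clients than exist.
    size = max(1, int(num_clients * participation))
    size = min(size, num_clients)
    chosen = rng.choice(num_clients, size=size, replace=False)
    return sorted(int(i) for i in chosen)


rng = np.random.default_rng(42)
print(select_clients(10, 1.0, rng))   # all ten clients
print(select_clients(10, 0.5, rng))   # five clients
print(select_clients(10, 0.01, rng))  # a single client
```

Because the generator is created from the seed, two runs constructed with the same seed select the same clients in the same order of rounds.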

close

close() -> None

Close the application.

Source code in taps/apps/fedlearn/app.py
def close(self) -> None:
    """Close the application."""
    pass

run

run(engine: Engine, run_dir: Path) -> None

Run the application.

Parameters:

  • engine (Engine) –

    Application execution engine.

  • run_dir (Path) –

    Directory for run outputs.
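
In the source listing for run, work is handed to the engine with engine.submit(...) and the caller blocks on .result(). The sketch below mimics that submit-and-wait loop using the standard library's ThreadPoolExecutor as a stand-in for the engine. The evaluate function and its return values are placeholders, not taps functions or real results.

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate(round_idx: int) -> dict[str, float]:
    """Placeholder task: returns a made-up loss that shrinks each round."""
    return {'test_loss': 1.0 / (round_idx + 1)}


rounds = 3
results = []
# ThreadPoolExecutor is only a stand-in: it offers submit() returning a
# future, which is the one engine behavior this sketch relies on.
with ThreadPoolExecutor(max_workers=2) as engine:
    for round_idx in range(rounds):
        preface = f'({round_idx + 1}/{rounds})'
        # Submit the task and block until its result is available.
        test_result = engine.submit(evaluate, round_idx).result()
        results.append(test_result)
        print(
            f"{preface} Finished testing with test_loss="
            f"{test_result['test_loss']:.3f}",
        )
```

Calling .result() immediately after submit() makes each round's test synchronous, which matches the sequential round structure in the listing.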

Source code in taps/apps/fedlearn/app.py
def run(self, engine: Engine, run_dir: pathlib.Path) -> None:
    """Run the application.

    Args:
        engine: Application execution engine.
        run_dir: Directory for run outputs.
    """
    results = []
    for round_idx in range(self.rounds):
        preface = f'({round_idx+1}/{self.rounds})'
        logger.log(
            APP_LOG_LEVEL,
            f'{preface} Starting local training for this round',
        )

        train_result = self._federated_round(round_idx, engine, run_dir)
        results.extend(train_result)

        if self.test_data is not None:
            logger.log(
                APP_LOG_LEVEL,
                f'{preface} Starting the test for the global model',
            )
            test_result = engine.submit(
                test_model,
                self.global_model,
                self.test_data,
                round_idx,
                self.device,
            ).result()
            logger.log(
                APP_LOG_LEVEL,
                f"{preface} Finished testing with test_loss="
                f"{test_result['test_loss']:.3f}",
            )