对于联邦学习正在学习中,下文中若有错误出现,望指正
介绍
本文在简单实现联邦平均算法时,使用客户-服务器架构,其基本流程是:
1、server初始化模型参数,所有clients将这个初始模型下载到本地
2、clients利用本地产生的数据进行训练
3、将训练得到的模型参数上传到server
4、server对得到的模型参数整合,所有的clients更新模型参数
5、重复执行2-5,直至收敛或达到预期要求
在我看来,联邦学习就是参数在客户端和服务器之间的反复调整
数据管理
加载数据集
在这部分代码段中,先加载MNIST数据集的训练集和测试集。然后将训练集和测试集的数据和标签分别提取出来。接着计算数据的均值和标准差,用于后续的归一化处理。归一化的目的是将数据缩放到一个相同的尺度,使得不同特征之间的数值差异不会对模型的学习产生过大的影响
接下来,使用transforms.Compose
创建一个转换操作序列,包括将数据转换为张量(tensor)和对数据进行归一化处理。
# 加载MNIST数据集
train_dataset = torchvision.datasets.MNIST("./dataset", train=True, download=False)
test_dataset = torchvision.datasets.MNIST("./dataset", train=False, download=False)
# 将数据与标签分离
train_data = train_dataset.data.to(torch.float)
train_labels = np.array(train_dataset.targets)test_data = test_dataset.data.data.to(torch.float)
test_labels = np.array(test_dataset.targets)
# 对数据进行归一化处理
mean = (train_data.mean()) / (train_data.max() - train_data.min())
std = (train_data.std()) / (train_data.max() - train_data.min())transform = transforms.Compose([transforms.Normalize((mean,), (std,))
])train_data = transform(train_data)
test_data = transform(test_data)# train_data.shpe[0] <-> len(train_data) 为数据集的大小
train_data_size = train_data.shape[0]
test_data_size = test_data.shape[0]test_DataLoader = DataLoader(TensorDataset(test_data, test_labels), batch_size=self.BatchSize)
数据划分
我们要为每个客户端分配数据,在实际场景中,每个客户端有自己独有的数据,这里为了模拟场景,手动划分数据集给每个客户端,MNIST数据集中共有60000个样本,这里设存在100个客户端,那我们就要将这60000个数据分配给100个客户端,而客户端之间的数据可能是独立分布IID,也可能是非独立同分布Non_IID的
IID
将数据集打乱,然后为每个client分配600个数据
涉及到的函数:
numpy.random.seed():生成随机种子
numpy.random.permutation():随机排列序列
numpy.array_split():将大型数组拆分为多个较小的子数组
代码
# 整体数据集(MNIST)中的类别数
nclass = np.max(train_labels) + 1# 数据划分
client_train_data = {}
client_train_label = {}
if self.is_iid:# 设置随机种子数可以保证每次运行代码时,np.random.permutation(self.train_data_size)产生的数列都是一样的, 这样可以确保实验的可重复性np.random.seed(12)# 对训练数据集(序号)进行随机排列, 得到一个索引数组idxs(实现了将数据集打乱)idxs = np.random.permutation(train_data_size)# 将索引数组idxs分割成与客户端数量相等的子数组batch_idxsbatch_idxs = np.array_split(idxs, self.num_of_clients)# 遍历所有客户端for i in range(self.num_of_clients):# 根据索引数组batch_idxs[i], 给每个客户端分配相应的数据和标签client_train_data[i] = train_data[batch_idxs[i]]client_train_label[i] = train_labels[batch_idxs[i]]# 类别分布,表示每个类别在客户端数据集中出现的次数distribution = [client_train_label[i].tolist().count(i) for i in range(nclass)]return client_train_data, client_train_label
Non-IID
首先根据数据标签将数据集排序(即MNIST中的数字大小),
然后将其划分为200组大小为300的数据切片,然后分给每个Client两个切片
涉及到的函数
numpy.random.dirichlet(): dirichlet分布中获取随机样本,并使用该方法返回一些随机样本的numpy数组
numpy.argwhere: 查找满足特定条件的索引
zip(): 将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表
numpy.cumsum(): 计算轴向的累加和
numpy.concatenate(): 拼接数组
代码
else:n_clients = self.num_of_clientstrain_label = train_labelsnp.random.seed(250)# [self.beta] * n_clients会创建一个长度为n_clients, 每个元素都为self.beta的列表# 生成一个形状为(nclass, n_clients)的矩阵, 记录每个类别划分到每个client的比例label_distribution = np.random.dirichlet([self.beta] * n_clients, nclass)# 对于每一个在范围[0, nclass-1]内的整数y, 找到train_label中所有等于y的元素的索引, 并将这些索引展平成一个一维数组# class_idcs是一个列表, 其中每个元素都是一个一维数组, 记录每个类别对应的样本索引class_idcs = [np.argwhere(train_label == y).flatten() for y in range(nclass)]# 创建一个名为client_idcs的列表, 其中包含n_clients个空列表, 每个空列表代表一个客户端的索引集合client_idcs = [[] for _ in range(n_clients)]# 使用zip函数将class_idcs和label_distribution进行配对, 并迭代处理每一对数据# 每次迭代, 都是一个不同的类别, c是从class_ids中取出的一个一维数组, 包含了本次迭代对应类别的样本索引,# fracs是从label_distribution中取出的一维数组, 表示当前类别的数据分配到各个客户端的比例for c, fracs in zip(class_idcs, label_distribution):# np.split按照比例将本次迭代对应类别的样本划分为了N个子集# (np.cumsum(fracs)[:-1] * len(c) 先计算了fracs数组的累积和, 然后[:-1]切片操作去除累积和数组的最后一个元素,以确保# fracs与新的累加和数组的长度相同, 最后累加和数组的每个元素乘以数组c的长度会得到一个新的数组# 将新数组的每个元素转换为整数后即为分割c的分割点# for i, idcs 为遍历第i个client对应样本集合的索引for i, idcs in enumerate(np.split(c, (np.cumsum(fracs)[:-1] * len(c)).astype(int))):client_idcs[i] += [idcs]for i in range(self.num_of_clients):idcs = client_idcs[i]# 记录每个client拥有(数据)的样本数量distribution = [len(c) for c in idcs]client_train_data[i] = train_data[np.concatenate(idcs)]client_train_label[i] = train_label[np.concatenate(idcs)]return client_train_data, client_train_label
定义CNN神经网络模型
这部分不在此赘述, 不明白的可移步主页
class Mnist_CNN(nn.Module):def __init__(self):super().__init__()self.conv1 = nn.Conv2d(in_channels=1, out_channels=30, kernel_size=3, stride=1, padding=1)self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)self.conv2 = nn.Conv2d(in_channels=30, out_channels=5, kernel_size=3, stride=1, padding=1)self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)self.fc1 = nn.Linear(7 * 7 * 5, 100)self.fc2 = nn.Linear(100, 10)def forward(self, inputs):tensor = inputs.view(-1, 1, 28, 28)tensor = F.relu(self.conv1(tensor))tensor = self.pool1(tensor)tensor = F.relu(self.conv2(tensor))tensor = self.pool2(tensor)tensor = tensor.view(-1, 7 * 7 * 5)tensor = F.relu(self.fc1(tensor))tensor = self.fc2(tensor)return tensor
客户端
在客户端接收聚合后的参数,并更新神经网络模型,利用新的神经网络模型进行数据集的训练,最后返回训练好的模型参数
class client:def __init__(self):self.dev = devself.train_DataLoader = Noneself.local_parameters = None# 模型训练def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters, trainDataSet, dev):# localEpoch: 当前Client的迭代次数# localBatchSize: 当前Client的batchsize大小# Net: Server共享的模型# LossFun: 损失函数# opti: 优化函数# global_parmeters: 当前通讯中最全局参数# return: 返回当前Client基于自己的数据训练得到的新的模型参数# 加载当前通信中最新全局参数# 并将global_parameters传入网络模型Net.load_state_dict(global_parameters, strict=True)# 加载本地数据, client自己的数据集self.train_DataLoader = DataLoader(trainDataSet, batch_size=localBatchSize, shuffle=True)# 设置迭代次数for epoch in range(localEpoch):for data, label in self.train_DataLoader:# 加载到GPU上data, label = data.to(dev), label.to(dev)# 模型上传入数据output = Net(data)# 计算损失函数loss = lossFun(output, label)# 将梯度归零,初始化梯度opti.zero_grad()# 反向传播loss.backward()# 计算梯度,并更新梯度opti.step()# 返回当前Client基于自己的数据训练得到的新的模型参数return Net.state_dict()def local_val(self):pass
服务器
将clients训练好的参数通过加权平均的方式合并为一个新的模型参数,并传递给clients
class server:def __init__(self, client_params):self.client_params = client_paramsdef agg_average(self):w = self.client_params# 将第一个权重字典赋值给weights_avg, 即为将第一个client的模型参数赋给weights_avgweights_avg = w[0]# 遍历weights_avg字典的所有键(代表一个神经网络模型中的所有的参数)for k in weights_avg.keys():# 遍历权重列表w中从第二个元素开始的所有权重字典, 即遍历所有client的参数for i in range(1, len(w)):# 在第一个client参数的基础上, 将其他client的参数也全部加到weights_avg(weights_avg充当累加的“容器”)weights_avg[k] = weights_avg[k] + w[i][k]# 联邦平均, 将所有client的参数求平均值得到新的参数weights_avg[k] = weights_avg[k] / len(w)return weights_avg
测试集评估模型
class test_accuracy:def test_accuracy(self, net, parameters, testDataLoader, dev, lossFun):# 存储损失total_test_loss = 0with torch.no_grad():net.load_state_dict(parameters, strict=True)sum_accu = 0num = 0# 载入测试集for data, label in testDataLoader:data, label = data.to(dev), label.to(dev)output = net(data)loss = lossFun(output, label)# loss = 1total_test_loss = total_test_loss + loss.item()output = torch.argmax(output, dim=1)sum_accu += (output == label).float().mean()num += 1accuracy = sum_accu / numavg_loss = total_test_loss / numreturn avg_loss, accuracy
main函数
这段代码是一个基于联邦学习的神经网络训练过程:
- 设置设备(CPU或GPU)用于训练。
- 创建客户端数据,并将其分配给各个客户端。
- 初始化一个神经网络模型(Mnist_CNN)。
- 设置训练参数,如通信轮数、批次大小、损失函数和优化器。
- 在每一轮通信中,每个客户端使用本地数据进行训练,并更新其模型参数。
- 服务器收集所有客户端的模型参数,并计算全局模型参数的平均值。
- 将全局模型参数加载到网络模型中,并在测试数据集上评估模型的性能。
- 打印每10轮的训练结果,包括每个客户端的准确率和损失值以及全局模型的准确率和损失值。
if __name__ == "__main__":# ----------------------------------设置参数----------------------------------dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")# ----------------------------------创建clients, 并分配数据----------------------------------IID = False# 假设有10个客户端get_data = GetData(isIID=IID, num_clients=10, dev=dev)clients_data, clients_label, testDataLoader = get_data.load_data()# ----------------------------------初始化模型----------------------------------# 模型实例化net = Mnist_CNN()net = net.to(dev)# 通信轮数rounds = 100# batch_sizebatch_size = 64# 定义损失函数loss_func = nn.CrossEntropyLoss()loss_func = loss_func.to(dev)# 定义优化器if IID:lr = 0.00001else:lr = 0.0001opti = optim.Adam(net.parameters(), lr=lr)# 客户端数量num_in_comm = 10# ----------------------------------训练----------------------------------# 定义变量global_parametersglobal_parameters = net.state_dict()# train_loss = []# clients与server之间通信for curr_round in range(1, rounds + 1):local_loss = []client_params = {}sum_parameters = Nonefor k in range(num_in_comm):# print(k)my_client = client()train_data = clients_data[k]train_label = torch.tensor(clients_label[k])# 每个client训练得到的权重local_parameters = my_client.localUpdate(localEpoch=1, localBatchSize=batch_size, Net=net,lossFun=loss_func,opti=opti,global_parameters=global_parameters,trainDataSet=TensorDataset(train_data, train_label), dev=dev)client_params[k] = local_parametersaccuracy = test_accuracy()local_loss, local_acc = accuracy.test_accuracy(net, local_parameters, testDataLoader, dev, loss_func)if curr_round % 10 == 0:print('[Round: %d Client: %d] accuracy: %f loss: %f ' % (curr_round, k, local_acc, local_loss))# 取平均值,得到本次通信中server得到的更新后的模型参数s = server(client_params)global_parameters = s.agg_average()net.load_state_dict(global_parameters, strict=True)accuracy = test_accuracy()global_loss, global_acc = accuracy.test_accuracy(net, global_parameters, testDataLoader, dev, loss_func)if curr_round % 10 == 0:print('----------------------------------[Round: %d] accuracy: %f loss: %f----------------------------------'% (curr_round, global_acc, global_loss))
完整代码
import torch
import torchvision
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
import torch.nn.functional as F# ----------------------------------数据管理----------------------------------
class GetData:def __init__(self, isIID, num_clients, dev, beta=0.4, BatchSize=256):self.dev = devself.beta = betaself.is_iid = isIIDself.num_of_clients = num_clients # 客户端的数量self.BatchSize = BatchSize# self.clients_set = {} # 用于整合客户端信息def load_data(self):# 加载MNIST数据集train_dataset = torchvision.datasets.MNIST("./dataset", train=True, download=False)test_dataset = torchvision.datasets.MNIST("./dataset", train=False, download=False)# 将数据与标签分离train_data = train_dataset.data.to(torch.float)train_labels = np.array(train_dataset.targets)test_data = test_dataset.data.data.to(torch.float)test_labels = test_dataset.targets# 对数据进行归一化处理, 使得数据的均值为0, 标准差为1mean = (train_data.mean()) / (train_data.max() - train_data.min())std = (train_data.std()) / (train_data.max() - train_data.min())transform = transforms.Compose([transforms.Normalize((mean,), (std,))])train_data = transform(train_data)test_data = transform(test_data)# train_data.shpe[0] <-> len(train_data) 为数据集的大小train_data_size = train_data.shape[0]test_data_size = test_data.shape[0]test_DataLoader = DataLoader(TensorDataset(test_data, test_labels), batch_size=self.BatchSize)# 整体数据集(MNIST)中的类别数nclass = np.max(train_labels) + 1# 数据划分client_train_data = {}client_train_label = {}distribution = {}if self.is_iid:# 设置随机种子数可以保证每次运行代码时,np.random.permutation(self.train_data_size)产生的数列都是一样的, 这样可以确保实验的可重复性np.random.seed(12)# 对训练数据集(序号)进行随机排列, 得到一个索引数组idxs(实现了将数据集打乱)idxs = np.random.permutation(train_data_size)# 将索引数组idxs分割成与客户端数量相等的子数组batch_idxsbatch_idxs = np.array_split(idxs, self.num_of_clients)# 遍历所有客户端for i in range(self.num_of_clients):# 根据索引数组batch_idxs[i], 给每个客户端分配相应的数据和标签client_train_data[i] = train_data[batch_idxs[i]]client_train_label[i] = train_labels[batch_idxs[i]]return client_train_data, client_train_label, test_DataLoaderelse:n_clients = self.num_of_clientstrain_label = train_labelsnp.random.seed(250)# [self.beta] * n_clients会创建一个长度为n_clients, 每个元素都为self.beta的列表# 生成一个形状为(nclass, n_clients)的矩阵, 记录每个类别划分到每个client的比例label_distribution = np.random.dirichlet([self.beta] * n_clients, nclass)# 对于每一个在范围[0, nclass-1]内的整数y, 找到train_label中所有等于y的元素的索引, 并将这些索引展平成一个一维数组# class_idcs是一个列表, 其中每个元素都是一个一维数组, 记录每个类别对应的样本索引class_idcs = [np.argwhere(train_label == y).flatten() for y in range(nclass)]# 创建一个名为client_idcs的列表, 其中包含n_clients个空列表, 每个空列表代表一个客户端的索引集合client_idcs = [[] for _ in range(n_clients)]# 使用zip函数将class_idcs和label_distribution进行配对, 并迭代处理每一对数据# 每次迭代, 都是一个不同的类别, c是从class_ids中取出的一个一维数组, 包含了本次迭代对应类别的样本索引,# fracs是从label_distribution中取出的一维数组, 表示当前类别的数据分配到各个客户端的比例for c, fracs in zip(class_idcs, label_distribution):# np.split按照比例将本次迭代对应类别的样本划分为了N个子集# (np.cumsum(fracs)[:-1] * len(c) 先计算了fracs数组的累积和, 然后[:-1]切片操作去除累积和数组的最后一个元素,以确保# fracs与新的累加和数组的长度相同, 最后累加和数组的每个元素乘以数组c的长度会得到一个新的数组# 将新数组的每个元素转换为整数后即为分割c的分割点# for i, idcs 为遍历第i个client对应样本集合的索引for i, idcs in enumerate(np.split(c, (np.cumsum(fracs)[:-1] * len(c)).astype(int))):client_idcs[i] += [idcs]for i in range(self.num_of_clients):idcs = client_idcs[i]# 记录每个client拥有(数据)的样本数量distribution = [len(c) for c in idcs]client_train_data[i] = train_data[np.concatenate(idcs)]client_train_label[i] = train_label[np.concatenate(idcs)]# yield client_train_data[i], client_train_label[i]return client_train_data, client_train_label, test_DataLoader# ----------------------------------定义CNN神经网络模型----------------------------------
class Mnist_CNN(nn.Module):def __init__(self):super().__init__()self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, stride=1, padding=2)self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, stride=1, padding=2)self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)self.fc1 = nn.Linear(7 * 7 * 64, 512)self.fc2 = nn.Linear(512, 10)def forward(self, inputs):tensor = inputs.view(-1, 1, 28, 28)tensor = F.relu(self.conv1(tensor))tensor = self.pool1(tensor)tensor = F.relu(self.conv2(tensor))tensor = self.pool2(tensor)tensor = tensor.view(-1, 7 * 7 * 64)tensor = F.relu(self.fc1(tensor))tensor = self.fc2(tensor)return tensor# ----------------------------------客户端----------------------------------
class client:def __init__(self):self.dev = devself.train_DataLoader = Noneself.local_parameters = None# 模型训练def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters, trainDataSet, dev):# localEpoch: 当前Client的迭代次数# localBatchSize: 当前Client的batchsize大小# Net: Server共享的模型# LossFun: 损失函数# opti: 优化函数# global_parmeters: 当前通讯中最全局参数# return: 返回当前Client基于自己的数据训练得到的新的模型参数# 加载当前通信中最新全局参数# 并将global_parameters传入网络模型Net.load_state_dict(global_parameters, strict=True)# 加载本地数据, client自己的数据集self.train_DataLoader = DataLoader(trainDataSet, batch_size=localBatchSize, shuffle=True)# 设置迭代次数for epoch in range(localEpoch):for data, label in self.train_DataLoader:# 加载到GPU上data, label = data.to(dev), label.to(dev)# 模型上传入数据output = Net(data)# 计算损失函数loss = lossFun(output, label)# 将梯度归零,初始化梯度opti.zero_grad()# 反向传播loss.backward()# 计算梯度,并更新梯度opti.step()# 返回当前Client基于自己的数据训练得到的新的模型参数return Net.state_dict()def local_val(self):pass# ----------------------------------服务器----------------------------------
class server:def __init__(self, client_params):self.client_params = client_paramsdef agg_average(self):w = self.client_paramsweights_avg = w[0]for k in weights_avg.keys():for i in range(1, len(w)):weights_avg[k] = weights_avg[k] + w[i][k]weights_avg[k] = weights_avg[k] / len(w)return weights_avg# ----------------------------------在测试集上评估模型的性能, 计算准确率和平均损失----------------------------------
class test_accuracy:def test_accuracy(self, net, parameters, testDataLoader, dev, lossFun):# 存储损失loss_collector = []with torch.no_grad():net.load_state_dict(parameters, strict=True)sum_accu = 0num = 0loss_collector.clear()# 载入测试集for data, label in testDataLoader:data, label = data.to(dev), label.to(dev)output = net(data)loss = lossFun(output, label)# loss = 1loss_collector.append(loss.item())output = torch.argmax(output, dim=1)sum_accu += (output == label).float().mean()num += 1accuracy = sum_accu / numavg_loss = sum(loss_collector) / len(loss_collector)return avg_loss, accuracyif __name__ == "__main__":# ----------------------------------设置参数----------------------------------dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")# ----------------------------------创建clients, 并分配数据----------------------------------IID = False# 假设有10个客户端get_data = GetData(isIID=IID, num_clients=10, dev=dev)clients_data, clients_label, testDataLoader = get_data.load_data()# ----------------------------------初始化模型----------------------------------# 模型实例化net = Mnist_CNN()net = net.to(dev)# 通信轮数rounds = 100# batch_sizebatch_size = 64# 定义损失函数loss_func = nn.CrossEntropyLoss()loss_func = loss_func.to(dev)# 定义优化器if IID:lr = 0.00001else:lr = 0.0001opti = optim.Adam(net.parameters(), lr=lr)# 客户端数量num_in_comm = 10# ----------------------------------训练----------------------------------# 定义变量global_parametersglobal_parameters = net.state_dict()# clients与server之间通信for curr_round in range(1, rounds + 1):local_loss = []client_params = {}for k in range(num_in_comm):# print(k)my_client = client()train_data = clients_data[k]train_label = torch.tensor(clients_label[k])# 每个client训练得到的权重local_parameters = my_client.localUpdate(localEpoch=1, localBatchSize=batch_size, Net=net,lossFun=loss_func,opti=opti,global_parameters=global_parameters,trainDataSet=TensorDataset(train_data, train_label), dev=dev)client_params[k] = local_parametersaccuracy = test_accuracy()local_loss, local_acc = accuracy.test_accuracy(net, local_parameters, testDataLoader, dev, loss_func)if curr_round % 10 == 0:print('[Round: %d Client: %d] accuracy: %f loss: %f ' % (curr_round, k, local_acc, local_loss))# 取平均值,得到本次通信中server得到的更新后的模型参数s = server(client_params)global_parameters = s.agg_average()net.load_state_dict(global_parameters, strict=True)accuracy = test_accuracy()global_loss, global_acc = accuracy.test_accuracy(net, global_parameters, testDataLoader, dev, loss_func)if curr_round % 10 == 0:print('----------------------------------[Round: %d] accuracy: %f loss: %f----------------------------------'% (curr_round, global_acc, global_loss))
运行结果
IID
None-IID
在本次实验中,模型同构时收敛效果比较好,但在异构时效果并不稳定,所以后续学习异构情况下的优化策略
参考
基于PaddlePaddle实现联邦学习算法FedAvg - 飞桨AI Studio星河社区
联邦学习方法FedAvg实现(PyTorch) - 樱桃小屋 (cherry1024.github.io)
PyTorch 实现联邦学习FedAvg (详解)_pytorch fedavg-CSDN博客