分享

GPU多卡并行训练总结(以pytorch为例)

问题导读:
1、为什么要使用多GPU并行训练?
2、常见的多GPU训练方法有哪些?
3、BN如何在不同设备之间同步?
4、如何理解DistributedDataParallel运行过程?



为什么要使用多GPU并行训练

本简单来说,有两种原因:第一种是模型在一块GPU上放不下,两块或多块GPU上就能运行完整的模型(如早期的AlexNet)。第二种是多块GPU并行计算可以达到加速训练的效果。想要成为“炼丹大师“,多GPU并行训练是不可或缺的技能。

常见的多GPU训练方法:
1.模型并行方式:如果模型特别大,GPU显存不够,无法将一个显存放在GPU上,需要把网络的不同模块放在不同GPU上,这样可以训练比较大的网络。(下图左半部分)

2.数据并行方式:将整个模型放在一块GPU里,再复制到每一块GPU上,同时进行正向传播和反向误差传播。相当于加大了batch_size。(下图右半部分)

2023-04-25_201651.jpg

在pytorch1.7 + cuda10 + TeslaV100的环境下,使用ResNet34,batch_size=16, SGD对花草数据集训练的情况如下:使用一块GPU需要9s一个epoch,使用两块GPU是5.5s, 8块是2s。这里有一个问题,为什么运行时间不是9/8≈1.1s ? 因为使用GPU数量越多,设备之间的通讯会越来越复杂,所以随着GPU数量的增加,训练速度的提升也是递减的。

2023-04-25_201722.jpg

误差梯度如何在不同设备之间通信?
在每个GPU训练step结束后,将每块GPU的损失梯度求平均,而不是每块GPU各计算各的。

BN如何在不同设备之间同步?
假设batch_size=2,每个GPU计算的均值和方差都针对这两个样本而言的。而BN的特性是:batch_size越大,均值和方差越接近与整个数据集的均值和方差,效果越好。使用多块GPU时,会计算每个BN层在所有设备上输入的均值和方差。如果GPU1和GPU2都分别得到两个特征层,那么两块GPU一共计算4个特征层的均值和方差,可以认为batch_size=4。注意:如果不用同步BN,而是每个设备计算自己的批次数据的均值方差,效果与单GPU一致,仅仅能提升训练速度;如果使用同步BN,效果会有一定提升,但是会损失一部分并行速度。
2023-04-25_201759.jpg

下图为单GPU、以及是否使用同步BN训练的三种情况,可以看到使用同步BN(橙线)比不使用同步BN(蓝线)总体效果要好一些,不过训练时间也会更长。使用单GPU(黑线)和不使用同步BN的效果是差不多的。
两种GPU训练方法:DataParallel 和 DistributedDataParallel:

  •     DataParallel是单进程多线程的,仅仅能工作在单机中。而DistributedDataParallel是多进程的,可以工作在单机或多机器中。
  •     DataParallel通常会慢于DistributedDataParallel。所以目前主流的方法是DistributedDataParallel。

pytorch中常见的GPU启动方式:
2023-04-25_201838.jpg

注:distributed.launch方法如果开始训练后,手动终止程序,最好先看下显存占用情况,有小概率进程没kill的情况,会占用一部分GPU显存资源。
下面以分类问题为基准,详细介绍使用DistributedDataParallel时的过程:

首先要初始化各进程环境:

  1. def init_distributed_mode(args):
  2.     # 如果是多机多卡的机器,WORLD_SIZE代表使用的机器数,RANK对应第几台机器
  3.     # 如果是单机多卡的机器,WORLD_SIZE代表有几块GPU,RANK和LOCAL_RANK代表第几块GPU
  4.     if'RANK'in os.environ and'WORLD_SIZE'in os.environ:
  5.         args.rank = int(os.environ["RANK"])
  6.         args.world_size = int(os.environ['WORLD_SIZE'])
  7.         # LOCAL_RANK代表某个机器上第几块GPU
  8.         args.gpu = int(os.environ['LOCAL_RANK'])
  9.     elif'SLURM_PROCID'in os.environ:
  10.         args.rank = int(os.environ['SLURM_PROCID'])
  11.         args.gpu = args.rank % torch.cuda.device_count()
  12.     else:
  13.         print('Not using distributed mode')
  14.         args.distributed = False
  15.         return
  16.     args.distributed = True
  17.     torch.cuda.set_device(args.gpu)  # 对当前进程指定使用的GPU
  18.     args.dist_backend = 'nccl'# 通信后端,nvidia GPU推荐使用NCCL
  19.     dist.barrier()  # 等待每个GPU都运行完这个地方以后再继续
复制代码

在main函数初始阶段,进行以下初始化操作。需要注意的是,学习率需要根据使用GPU的张数增加。在这里使用简单的倍增方法。

  1. def main(args):
  2.     if torch.cuda.is_available() isFalse:
  3.         raise EnvironmentError("not find GPU device for training.")
  4.     # 初始化各进程环境
  5.     init_distributed_mode(args=args)
  6.     rank = args.rank
  7.     device = torch.device(args.device)
  8.     batch_size = args.batch_size
  9.     num_classes = args.num_classes
  10.     weights_path = args.weights
  11.     args.lr *= args.world_size  # 学习率要根据并行GPU的数倍增
复制代码

实例化数据集可以使用单卡相同的方法,但在sample样本时,和单机不同,需要使用DistributedSampler和BatchSampler。

  1. #给每个rank对应的进程分配训练的样本索引
  2. train_sampler=torch.utils.data.distributed.DistributedSampler(train_data_set)
  3. val_sampler=torch.utils.data.distributed.DistributedSampler(val_data_set)
  4. #将样本索引每batch_size个元素组成一个list
  5. train_batch_sampler=torch.utils.data.BatchSampler(
  6. train_sampler,batch_size,drop_last=True)
复制代码

DistributedSampler原理如图所示:假设当前数据集有0~10共11个样本,使用2块GPU计算。首先打乱数据顺序,然后用 11/2 =6(向上取整),然后6乘以GPU个数2 = 12,因为只有11个数据,所以再把第一个数据(索引为6的数据)补到末尾,现在就有12个数据可以均匀分到每块GPU。然后分配数据:间隔将数据分配到不同的GPU中。

2023-04-25_201946.jpg

BatchSampler原理: DistributedSmpler将数据分配到两个GPU上,以第一个GPU为例,分到的数据是6,9,10,1,8,7,假设batch_size=2,就按顺序把数据两两一组,在训练时,每次获取一个batch的数据,就从组织好的一个个batch中取到。注意:只对训练集处理,验证集不使用BatchSampler。

2023-04-25_202021.jpg

接下来使用定义好的数据集和sampler方法加载数据:

  1. train_loader = torch.utils.data.DataLoader(train_data_set,
  2.                                                batch_sampler=train_batch_sampler,
  3.                                                pin_memory=True,   # 直接加载到显存中,达到加速效果
  4.                                                num_workers=nw,
  5.                                                collate_fn=train_data_set.collate_fn)
  6. val_loader = torch.utils.data.DataLoader(val_data_set,
  7.                                              batch_size=batch_size,
  8.                                              sampler=val_sampler,
  9.                                              pin_memory=True,
  10.                                              num_workers=nw,
  11.                                              collate_fn=val_data_set.collate_fn)
复制代码


如果有预训练权重的话,需要保证每块GPU加载的权重是一模一样的。需要在主进程保存模型初始化权重,在不同设备上载入主进程保存的权重。这样才能保证每块GOU上加载的权重是一致的:

  1. # 实例化模型
  2.     model = resnet34(num_classes=num_classes).to(device)
  3.     # 如果存在预训练权重则载入
  4.     if os.path.exists(weights_path):
  5.         weights_dict = torch.load(weights_path, map_location=device)
  6.         # 简单对比每层的权重参数个数是否一致
  7.         load_weights_dict = {k: v for k, v in weights_dict.items()
  8.                              if model.state_dict()[k].numel() == v.numel()}
  9.         model.load_state_dict(load_weights_dict, strict=False)
  10.     else:
  11.         checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
  12.         # 如果不存在预训练权重,需要将第一个进程中的权重保存,然后其他进程载入,保持初始化权重一致
  13.         if rank == 0:
  14.             torch.save(model.state_dict(), checkpoint_path)
  15.         dist.barrier()
  16.         # 这里注意,一定要指定map_location参数,否则会导致第一块GPU占用更多资源
  17.         model.load_state_dict(torch.load(checkpoint_path, map_location=device))
复制代码

如果需要冻结模型权重,和单GPU基本没有差别。如果不需要冻结权重,可以选择是否同步BN层。然后再把模型包装成DDP模型,就可以方便进程之间的通信了。多GPU和单GPU的优化器设置没有差别,这里不再赘述。

  1. # 是否冻结权重
  2.     if args.freeze_layers:
  3.         for name, para in model.named_parameters():
  4.             # 除最后的全连接层外,其他权重全部冻结
  5.             if"fc"notin name:
  6.                 para.requires_grad_(False)
  7.     else:
  8.         # 只有训练带有BN结构的网络时使用SyncBatchNorm采用意义
  9.         if args.syncBN:
  10.             # 使用SyncBatchNorm后训练会更耗时
  11.             model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
  12. # 转为DDP模型
  13.          model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
  14. # optimizer使用SGD+余弦淬火策略
  15.     pg = [p for p in model.parameters() if p.requires_grad]
  16.     optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
  17.     lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine
  18.     scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
复制代码

与单GPU不同的地方:rain_sampler.set_epoch(epoch),这行代码会在每次迭代的时候获得一个不同的生成器,每一轮开始迭代获取数据之前设置随机种子,通过改变传进的epoch参数改变打乱数据顺序。通过设置不同的随机种子,可以让不同GPU每轮拿到的数据不同。后面的部分和单GPU相同。

  1. for epoch in range(args.epochs):
  2.         train_sampler.set_epoch(epoch)  
  3.         mean_loss = train_one_epoch(model=model,
  4.                                     optimizer=optimizer,
  5.                                     data_loader=train_loader,
  6.                                     device=device,
  7.                                     epoch=epoch)
  8.         scheduler.step()
  9.         sum_num = evaluate(model=model,
  10.                            data_loader=val_loader,
  11.                            device=device)
  12.         acc = sum_num / val_sampler.total_size
复制代码

我们详细看看每个epoch是训练时和单GPU训练的差异(上面的train_one_epoch)

  1. def train_one_epoch(model, optimizer, data_loader, device, epoch):
  2.     model.train()
  3.     loss_function = torch.nn.CrossEntropyLoss()
  4.     mean_loss = torch.zeros(1).to(device)
  5.     optimizer.zero_grad()
  6.     # 在进程0中打印训练进度
  7.     if is_main_process():
  8.         data_loader = tqdm(data_loader)
  9.     for step, data in enumerate(data_loader):
  10.         images, labels = data
  11.         pred = model(images.to(device))
  12.         loss = loss_function(pred, labels.to(device))
  13.         loss.backward()
  14.         loss = reduce_value(loss, average=True)  #  在单GPU中不起作用,多GPU时,获得所有GPU的loss的均值。
  15.         mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses
  16.         # 在进程0中打印平均loss
  17.         if is_main_process():
  18.             data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))
  19.         ifnot torch.isfinite(loss):
  20.             print('WARNING: non-finite loss, ending training ', loss)
  21.             sys.exit(1)
  22.         optimizer.step()
  23.         optimizer.zero_grad()
  24.     # 等待所有进程计算完毕
  25.     if device != torch.device("cpu"):
  26.         torch.cuda.synchronize(device)
  27.     return mean_loss.item()
复制代码
  1. def reduce_value(value, average=True):
  2.     world_size = get_world_size()
  3.     if world_size < 2:  # 单GPU的情况
  4.         return value
  5.     with torch.no_grad():
  6.         dist.all_reduce(value)   # 对不同设备之间的value求和
  7.         if average:  # 如果需要求平均,获得多块GPU计算loss的均值
  8.             value /= world_size
  9.         return value
复制代码


接下来看一下验证阶段的情况,和单GPU最大的额不同之处是预测正确样本个数的地方。

  1. @torch.no_grad()
  2. def evaluate(model, data_loader, device):
  3.     model.eval()
  4.     # 用于存储预测正确的样本个数,每块GPU都会计算自己正确样本的数量
  5.     sum_num = torch.zeros(1).to(device)
  6.     # 在进程0中打印验证进度
  7.     if is_main_process():
  8.         data_loader = tqdm(data_loader)
  9.     for step, data in enumerate(data_loader):
  10.         images, labels = data
  11.         pred = model(images.to(device))
  12.         pred = torch.max(pred, dim=1)[1]
  13.         sum_num += torch.eq(pred, labels.to(device)).sum()
  14.     # 等待所有进程计算完毕
  15.     if device != torch.device("cpu"):
  16.         torch.cuda.synchronize(device)
  17.     sum_num = reduce_value(sum_num, average=False)  # 预测正确样本个数
  18.     return sum_num.item()
复制代码

需要注意的是:保存模型的权重需要在主进程中进行保存。

  1. if rank == 0:
  2.             print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
  3.             tags = ["loss", "accuracy", "learning_rate"]
  4.             tb_writer.add_scalar(tags[0], mean_loss, epoch)
  5.             tb_writer.add_scalar(tags[1], acc, epoch)
  6.             tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
  7.             torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch))
复制代码

如果从头开始训练,主进程生成的初始化权重是以临时文件的形式保存,需要训练完后移除掉。最后还需要撤销进程组。

  1. if rank == 0:# 删除临时缓存文件        if os.path.exists(checkpoint_path) is
复制代码


作者:深度学习初学者
来源:https://mp.weixin.qq.com/s/daK098Mmirj0qGJA8QHUog

最新经典文章,欢迎关注公众号



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条