Pytorch分布式训练

Pytorch分布式训练学习整理

参考资料

源码解析:PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析

简单小模型示例:pytorch中分布式训练DDP教程(新手快速入门!)

Pytorch - 弹性训练极简实现(附源码)

系列文章:【分布式训练】单机多卡的正确打开方式(一):理论基础

【分布式训练】基于PyTorch进行多GPU分布式模型训练(补充)

较新较详细的教程:torch分布式训练

博客:pytorch弹性分布式训练

数据并行

简介

数据并行化中,模型训练作业是在数据上进行分割的。作业中的每个GPU接收到自己独立的数据批处理切片。每个GPU使用这些数据来独立计算梯度更新。例如,如果要使用两个GPU和32的批处理大小,一个GPU将处理前16条记录的向前和向后传播,第二个处理后16条记录的向后和向前传播。这些梯度更新然后在GPU之间同步,一起平均,最后应用到模型。

流程

../_images/ps.svg

DP 基于单机多卡,所有设备都负责计算和训练网络,除此之外, device[0] (并非 GPU 真实标号而是输入参数 device_ids 首位) 还要负责整合梯度,更新参数。从图中我们可以看出,有三个主要过程:

  • 过程一(图中红色部分):各卡分别计算损失和梯度
  • 过程二(图中蓝色部分):所有梯度整合到 device[0]
  • 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新

所有卡都并行运算(图中红色),将梯度收集到 device[0](图中浅蓝色)和 device[0] 分享模型参数给其他 GPU(图中绿色)三个主要过程。

更详细的流程如下图所示:

分析

  1. 负载不均衡:device[0] 负载大一些
  2. 通信开销:假设有个GPU, 传输总量是,每一次的通信上限是,则完成一次通信需要时间,那么总共需要花费时间
  3. 完全是单进程,仅在复制过程中利用一下多线程,同时存在Global Interpreter Lock (GIL)全局解释器锁,一个 Python 进程只能利用一个 CPU kernel,即单核多线程并发时,只能执行一个线程。

模型并行

简介

随着大模型的出现,简单的数据并行已经无法满足需求,毕竟一个模型的大小就有可能超过显卡的显存,更不可能将其复制多份。因此需要让每一张卡仅负责模型的一部分计算,承载模型的一小部分。

使用DDP进行分布式训练有以下几个优势:

  1. 加速训练:通过数据并行,DDP能够在多个设备或节点上同时处理不同批次的数据,从而加快训练速度。
  2. 内存效率:DDP在每个设备上只保存模型的局部副本和相应的梯度,而不是整个模型的副本,这样可以节省内存。
  3. 不需要额外的代码:在PyTorch中,使用DDP进行分布式训练几乎不需要修改您的原始模型和训练代码。

流程

Ring All Reduce

Scatter Reduce过程:首先将参数分为份,相邻的GPU传递不同的参数,在传递次之后,可以得到每一份参数的累积(在不同的GPU上)。

动图

All Gather:得到每一份参数的累积之后,再做一次传递,同步到所有的GPU上。

动图

假设有个GPU, 传输总量是,每一次的通信上限是,则完成一次通信需要时间,那么总共需要花费时间,可以看到通信成本与GPU数量无关。

分析

DDP采用多进程控制多GPU,共同训练模型,一份代码会被pytorch自动分配到n个进程并在n个GPU上运行。 DDP运用Ring-Reduce通信算法在每个GPU间对梯度进行通讯,交换彼此的梯度,从而获得所有GPU的梯度。对比DP,不需要在进行模型本体的通信,因此可以加速训练。

需要注意以下几点:

  1. 设置DistributedSampler来打乱数据,因为一个batch被分配到了好几个进程中,要确保不同的GPU拿到的不是同一份数据。
  2. 要告诉每个进程自己的id,即使用哪一块GPU。
  3. 如果需要做BatchNormalization,需要对数据进行同步。

代码

Torchrun使用及参数详解

核心概念

  • rank:进程号,在多进程上下文中,我们通常假定rank 0是第一个进程或者主进程,其它进程分别具有1,2,3不同rank号,这样总共具有4个进程。
  • node:物理节点,可以是一个容器也可以是一台机器,节点内部可以有多个GPU;nnodes指物理节点数量, nproc_per_node指每个物理节点上面进程的数量
  • local_rank:指在一个node上进程的相对序号,local_rank在node之间相互独立
  • WORLD_SIZE:全局进程总个数,即在一个分布式任务中rank的数量
  • Group:进程组,一个分布式任务对应了一个进程组。只有用户需要创立多个进程组时才会用到group来管理,默认情况下只有一个group
  • backend:通信后端,可选的包括:nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI)。一般建议GPU训练选择nccl,CPU训练选择gloo
  • master_addr与master_port:主节点的地址以及端口,供init_method 的tcp方式使用。 因为pytorch中网络通信建立是从机去连接主机,运行ddp只需要指定主节点的IP与端口,其它节点的IP不需要填写。

如下图所示,共有3个节点(机器),每个节点上有4个GPU,每台机器上起4个进程,每个进程占一块GPU,那么图中一共有12个rank,nproc_per_node=4,nnodes=3,每个节点都有一个对应的node_rank

在这里插入图片描述

rank与GPU之间没有必然的对应关系,一个rank可以包含多个GPU;一个GPU也可以为多个rank服务(多进程共享GPU),在torch的分布式训练中习惯默认一个rank对应着一个GPU,因此local_rank可以当作GPU号

简介

torchrun相当于原来的torch.distributed.launch,有一些额外增加的功能:

  • 通过重启优雅处理某一个worker运行过程中的错误
  • worker的RANK和WORLD_SIZE都是被自动分配的
  • Node的数量允许从最小值到最大值中间弹性伸缩

torchrun命令与 python -m torch.distributed.run命令完全等同,为命令行命令

从旧版本迁移 --use_env

有一个参数 --use_env在目前版本的torchrun中是不存在的,因此需要做一点处理

  1. 将原始指定的–local-rank参数修改为从环境变量中读取
  2. 命令行不需要再次指定 --use_env参数

旧版本代码:

$ python -m torch.distributed.launch --use-env train_script.py
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local-rank", type=int)
args = parser.parse_args()

local_rank = args.local_rank

新版本代码:

$ torchrun train_script.py
import os
local_rank = int(os.environ["LOCAL_RANK"])

命令行参数

参数名称 含义 示例
–nnodes 节点数量,一个节点对应一个主机 1或MIN_SIZE:MAX_SIZE(弹性训练)
–nproc-per-node 一个节点中的进程数量,一般一个进程使用一个显卡,故也通常表述为一个节点中显卡的数量 [auto, cpu, gpu, int]
–rdzv-backend rendezvous 后端 c10d etcd
–rdzv-endpoint rendezvous 后端地址 <host>:<port>
–rdzv-id 用户可以指定当前rendezvous的id,所有的node都要使用这同一个id
–rdzv-conf 希望传入rendezvous的其他参数 <key1>=<value1>
–standalone 单节点多卡的默认配置,不需要再传入上述的rendezvous参数,默认为C10d TCP 29400(–master-addr等也会失效) 选项
–max-restarts worker group重启的最大次数
–monitor-interval 检测worker状态的时间间隔(以秒为单位)
–start-method 创建子进程的方式 {spawn,fork,forkserver}
–role User-defined role for the workers.
-m 与python -m相同,将模块当作脚本运行 选项
–no-python 不使用python命令而是直接执行(如果这个文件并不是一个py文件会使用这个)
–run-path 使用runpy.run_path执行文件
–log-dir 日志文件存放目录
–redirects 将控制台输出的日志信息重定向到日志目录中的文件 [-r 3] 将所有worker的标准输出和标准错误进行重定向,[-r 0:1,1:2] 将rank 0的标准输出重定向,将rank 1的标准错误重定向
–tee 除将日志输出到控制台外也输出到日志文件 日志文件流
–node-rank 多节点分布式训练的时候该节点的Rank
–master-addr master 节点的 IP 地址,也就是 rank=0 对应的主机地址
–master-port master 节点的端口号,用于通信
–local-addr 本地节点的IP地址

torchrun主要是对多节点作了分布式的优化,从而可以满足容错性和弹性伸缩。如果是单节点就不需要很复杂。

环境变量

名称 含义 示例
LOCAL_RANK GPU在单节点中的序号 0 1
RANK GPU在全部节点的序号 0 1
GROUP_RANK worker组的rank 0 0
ROLE_RANK 相同ROLE的worker的rank 0 1
LOCAL_WORLD_SIZE 与–nproc-per-node相同 2 2
WORLD_SIZE job中worker的总数 2 2
ROLE_WORLD_SIZE 相同角色的worker的数量 1 2
MASTER_ADDR rank为0的worker的地址 127.0.0.1 127.0.0.1
MASTER_PORT rank为0的worker的端口 29500 29500
TORCHELASTIC_RESTART_COUNT 最近重启的worker组的数量 0 0
TORCHELASTIC_MAX_RESTARTS 配置的最大重启次数 0 0
TORCHELASTIC_RUN_ID 与–rdzv-id相同 none none
PYTHON_EXEC 执行这个脚本的python的位置 没有 没有

代码示例

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group
import os
import time

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(10), 0) for _ in range(size)]

    def __len__(self):
        return self.size
  
    def __getitem__(self, index):
        return self.data[index]

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
        snapshot_path: str,
    ) -> None:
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        self.snapshot_path = snapshot_path
        if os.path.exists(snapshot_path):
            print("Loading snapshot")
            self._load_snapshot(snapshot_path)

        self.model = DistributedDataParallel(self.model, device_ids=[self.gpu_id])

    def _load_snapshot(self, snapshot_path):
        loc = f"cuda:{self.gpu_id}"
        snapshot = torch.load(snapshot_path, map_location=loc)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        # print(output,targets)
        loss = F.cross_entropy(output, targets)
        print(f"[GPU{self.gpu_id}] Loss {loss.item()}")
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_snapshot(self, epoch):
        snapshot = {
            "MODEL_STATE": self.model.module.state_dict(),
            "EPOCHS_RUN": epoch,
        }
        torch.save(snapshot, self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_run, max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_snapshot(epoch)
            time.sleep(1)

def ddp_setup():
    init_process_group(backend="nccl")
    print("Parameters")
    print(f"LOCAL_RANK:{os.environ['LOCAL_RANK']}")
    print(f"RANK:{os.environ['RANK']}")
    print(f"GROUP_RANK:{os.environ['GROUP_RANK']}")
    print(f"ROLE_RANK:{os.environ['ROLE_RANK']}")
    print(f"LOCAL_WORLD_SIZE:{os.environ['LOCAL_WORLD_SIZE']}")
    print(f"WORLD_SIZE:{os.environ['WORLD_SIZE']}")
    print(f"ROLE_WORLD_SIZE:{os.environ['ROLE_WORLD_SIZE']}")
    print(f"MASTER_ADDR:{os.environ['MASTER_ADDR']}")
    print(f"MASTER_PORT:{os.environ['MASTER_PORT']}")
    print("")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

def load_train_objs():
    train_set = MyTrainDataset(2048)  # load your dataset
    model = ToyModel()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
    ddp_setup()
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--total_epochs', default=10, type=int, help='Total epochs to train the model')
    parser.add_argument('--save_every', default=2, type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
  
    main(args.save_every, args.total_epochs, args.batch_size)

与单卡有几点不同:

  1. 初始化进程组:init_process_group(backend="nccl"),后端一般选择nccl
  2. 分布式数据采样器:sampler=DistributedSampler(dataset)
  3. 封装模型:self.model = DistributedDataParallel(self.model, device_ids=[self.gpu_id])
  4. 启动torchrun脚本进行训练

训练脚本:

  1. 单机多卡
torchrun \
    --nnodes=1 \
    --nproc_per_node=2 \
	--master-addr=127.0.0.1 \
	--master-port=29500 \
	main.py
  1. 多机多卡
export NCCL_DEBUG=info
export NCCL_SOCKET_IFNAME=bond0
export NCCL_IB_DISABLE=1

torchrun \
    --nnodes=2 \
    --nproc_per_node=2 \
	--master-addr=10.208.58.27 \
	--master-port=29602 \
	--node-rank=0 \
	main.py
export NCCL_DEBUG=info
export NCCL_SOCKET_IFNAME=bond0
export NCCL_IB_DISABLE=1

torchrun \
    --nnodes=2 \
    --nproc_per_node=1 \
	--master-addr=10.208.58.27 \
	--master-port=29602 \
	--node-rank=1 \
	main.py

注意事项:

  1. 多进程训练,也就是会同时运行多份代码,因此训练时候要想好GPU的序号等需要自己指定的变量
  2. 数据是按照进程数量分的,比如总共2048条,如果三个进程就每一个进程683

测试环境:

master:10.208.58.27 2*V100

Parameters
LOCAL_RANK:0
RANK:0
GROUP_RANK:0
ROLE_RANK:0
LOCAL_WORLD_SIZE:2
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602

Parameters
LOCAL_RANK:1
RANK:1
GROUP_RANK:0
ROLE_RANK:1
LOCAL_WORLD_SIZE:2
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602

worker:1*A100

Parameters
LOCAL_RANK:0
RANK:2
GROUP_RANK:1
ROLE_RANK:2
LOCAL_WORLD_SIZE:1
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602

Pytorch分布式训练
https://zhangzhao219.github.io/2023/08/12/Pytorch-distributed/
作者
Zhang Zhao
发布于
2023年8月12日
许可协议