Pytorch分布式训练
Pytorch分布式训练学习整理
参考资料
源码解析:PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
简单小模型示例:pytorch中分布式训练DDP教程(新手快速入门!)
系列文章:【分布式训练】单机多卡的正确打开方式(一):理论基础
【分布式训练】基于PyTorch进行多GPU分布式模型训练(补充)
较新较详细的教程:torch分布式训练
数据并行
简介
数据并行化中,模型训练作业是在数据上进行分割的。作业中的每个GPU接收到自己独立的数据批处理切片。每个GPU使用这些数据来独立计算梯度更新。例如,如果要使用两个GPU和32的批处理大小,一个GPU将处理前16条记录的向前和向后传播,第二个处理后16条记录的向后和向前传播。这些梯度更新然后在GPU之间同步,一起平均,最后应用到模型。
流程
DP 基于单机多卡,所有设备都负责计算和训练网络,除此之外, device[0] (并非 GPU 真实标号而是输入参数 device_ids 首位) 还要负责整合梯度,更新参数。从图中我们可以看出,有三个主要过程:
- 过程一(图中红色部分):各卡分别计算损失和梯度
- 过程二(图中蓝色部分):所有梯度整合到 device[0]
- 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新
所有卡都并行运算(图中红色),将梯度收集到 device[0](图中浅蓝色)和 device[0] 分享模型参数给其他 GPU(图中绿色)三个主要过程。
更详细的流程如下图所示:
分析
- 负载不均衡:device[0] 负载大一些
- 通信开销:假设有个GPU, 传输总量是,每一次的通信上限是,则完成一次通信需要时间,那么总共需要花费时间
- 完全是单进程,仅在复制过程中利用一下多线程,同时存在Global Interpreter Lock (GIL)全局解释器锁,一个 Python 进程只能利用一个 CPU kernel,即单核多线程并发时,只能执行一个线程。
模型并行
简介
随着大模型的出现,简单的数据并行已经无法满足需求,毕竟一个模型的大小就有可能超过显卡的显存,更不可能将其复制多份。因此需要让每一张卡仅负责模型的一部分计算,承载模型的一小部分。
使用DDP进行分布式训练有以下几个优势:
- 加速训练:通过数据并行,DDP能够在多个设备或节点上同时处理不同批次的数据,从而加快训练速度。
- 内存效率:DDP在每个设备上只保存模型的局部副本和相应的梯度,而不是整个模型的副本,这样可以节省内存。
- 不需要额外的代码:在PyTorch中,使用DDP进行分布式训练几乎不需要修改您的原始模型和训练代码。
流程
Ring All Reduce
Scatter Reduce过程:首先将参数分为份,相邻的GPU传递不同的参数,在传递次之后,可以得到每一份参数的累积(在不同的GPU上)。
All Gather:得到每一份参数的累积之后,再做一次传递,同步到所有的GPU上。
假设有个GPU, 传输总量是,每一次的通信上限是,则完成一次通信需要时间,那么总共需要花费时间,可以看到通信成本与GPU数量无关。
分析
DDP采用多进程控制多GPU,共同训练模型,一份代码会被pytorch自动分配到n个进程并在n个GPU上运行。 DDP运用Ring-Reduce通信算法在每个GPU间对梯度进行通讯,交换彼此的梯度,从而获得所有GPU的梯度。对比DP,不需要在进行模型本体的通信,因此可以加速训练。
需要注意以下几点:
- 设置DistributedSampler来打乱数据,因为一个batch被分配到了好几个进程中,要确保不同的GPU拿到的不是同一份数据。
- 要告诉每个进程自己的id,即使用哪一块GPU。
- 如果需要做BatchNormalization,需要对数据进行同步。
代码
Torchrun使用及参数详解
核心概念
- rank:进程号,在多进程上下文中,我们通常假定rank 0是第一个进程或者主进程,其它进程分别具有1,2,3不同rank号,这样总共具有4个进程。
- node:物理节点,可以是一个容器也可以是一台机器,节点内部可以有多个GPU;nnodes指物理节点数量, nproc_per_node指每个物理节点上面进程的数量
- local_rank:指在一个node上进程的相对序号,local_rank在node之间相互独立
- WORLD_SIZE:全局进程总个数,即在一个分布式任务中rank的数量
- Group:进程组,一个分布式任务对应了一个进程组。只有用户需要创立多个进程组时才会用到group来管理,默认情况下只有一个group
- backend:通信后端,可选的包括:nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI)。一般建议GPU训练选择nccl,CPU训练选择gloo
- master_addr与master_port:主节点的地址以及端口,供init_method 的tcp方式使用。 因为pytorch中网络通信建立是从机去连接主机,运行ddp只需要指定主节点的IP与端口,其它节点的IP不需要填写。
如下图所示,共有3个节点(机器),每个节点上有4个GPU,每台机器上起4个进程,每个进程占一块GPU,那么图中一共有12个rank,nproc_per_node=4,nnodes=3,每个节点都有一个对应的node_rank
rank与GPU之间没有必然的对应关系,一个rank可以包含多个GPU;一个GPU也可以为多个rank服务(多进程共享GPU),在torch的分布式训练中习惯默认一个rank对应着一个GPU,因此local_rank可以当作GPU号
简介
torchrun相当于原来的torch.distributed.launch,有一些额外增加的功能:
- 通过重启优雅处理某一个worker运行过程中的错误
- worker的RANK和WORLD_SIZE都是被自动分配的
- Node的数量允许从最小值到最大值中间弹性伸缩
torchrun
命令与 python -m torch.distributed.run
命令完全等同,为命令行命令
从旧版本迁移 --use_env
有一个参数 --use_env
在目前版本的torchrun中是不存在的,因此需要做一点处理
- 将原始指定的–local-rank参数修改为从环境变量中读取
- 命令行不需要再次指定
--use_env
参数
旧版本代码:
$ python -m torch.distributed.launch --use-env train_script.py
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local-rank", type=int)
args = parser.parse_args()
local_rank = args.local_rank
新版本代码:
$ torchrun train_script.py
import os
local_rank = int(os.environ["LOCAL_RANK"])
命令行参数
参数名称 | 含义 | 示例 |
---|---|---|
–nnodes | 节点数量,一个节点对应一个主机 | 1或MIN_SIZE:MAX_SIZE(弹性训练) |
–nproc-per-node | 一个节点中的进程数量,一般一个进程使用一个显卡,故也通常表述为一个节点中显卡的数量 | [auto, cpu, gpu, int] |
–rdzv-backend | rendezvous 后端 | c10d etcd |
–rdzv-endpoint | rendezvous 后端地址 | <host> :<port> |
–rdzv-id | 用户可以指定当前rendezvous的id,所有的node都要使用这同一个id | |
–rdzv-conf | 希望传入rendezvous的其他参数 | <key1> =<value1> |
–standalone | 单节点多卡的默认配置,不需要再传入上述的rendezvous参数,默认为C10d TCP 29400(–master-addr等也会失效) | 选项 |
–max-restarts | worker group重启的最大次数 | |
–monitor-interval | 检测worker状态的时间间隔(以秒为单位) | |
–start-method | 创建子进程的方式 | {spawn,fork,forkserver} |
–role | User-defined role for the workers. | |
-m | 与python -m相同,将模块当作脚本运行 | 选项 |
–no-python | 不使用python命令而是直接执行(如果这个文件并不是一个py文件会使用这个) | |
–run-path | 使用runpy.run_path执行文件 | |
–log-dir | 日志文件存放目录 | |
–redirects | 将控制台输出的日志信息重定向到日志目录中的文件 | [-r 3] 将所有worker的标准输出和标准错误进行重定向,[-r 0:1,1:2] 将rank 0的标准输出重定向,将rank 1的标准错误重定向 |
–tee | 除将日志输出到控制台外也输出到日志文件 | 日志文件流 |
–node-rank | 多节点分布式训练的时候该节点的Rank | |
–master-addr | master 节点的 IP 地址,也就是 rank=0 对应的主机地址 | |
–master-port | master 节点的端口号,用于通信 | |
–local-addr | 本地节点的IP地址 |
torchrun主要是对多节点作了分布式的优化,从而可以满足容错性和弹性伸缩。如果是单节点就不需要很复杂。
环境变量
名称 | 含义 | 示例 | |
---|---|---|---|
LOCAL_RANK | GPU在单节点中的序号 | 0 | 1 |
RANK | GPU在全部节点的序号 | 0 | 1 |
GROUP_RANK | worker组的rank | 0 | 0 |
ROLE_RANK | 相同ROLE的worker的rank | 0 | 1 |
LOCAL_WORLD_SIZE | 与–nproc-per-node相同 | 2 | 2 |
WORLD_SIZE | job中worker的总数 | 2 | 2 |
ROLE_WORLD_SIZE | 相同角色的worker的数量 | 1 | 2 |
MASTER_ADDR | rank为0的worker的地址 | 127.0.0.1 | 127.0.0.1 |
MASTER_PORT | rank为0的worker的端口 | 29500 | 29500 |
TORCHELASTIC_RESTART_COUNT | 最近重启的worker组的数量 | 0 | 0 |
TORCHELASTIC_MAX_RESTARTS | 配置的最大重启次数 | 0 | 0 |
TORCHELASTIC_RUN_ID | 与–rdzv-id相同 | none | none |
PYTHON_EXEC | 执行这个脚本的python的位置 | 没有 | 没有 |
代码示例
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
from torch.distributed import init_process_group, destroy_process_group
import os
import time
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
class MyTrainDataset(Dataset):
def __init__(self, size):
self.size = size
self.data = [(torch.rand(10), 0) for _ in range(size)]
def __len__(self):
return self.size
def __getitem__(self, index):
return self.data[index]
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
save_every: int,
snapshot_path: str,
) -> None:
self.gpu_id = int(os.environ["LOCAL_RANK"])
self.model = model.to(self.gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.epochs_run = 0
self.snapshot_path = snapshot_path
if os.path.exists(snapshot_path):
print("Loading snapshot")
self._load_snapshot(snapshot_path)
self.model = DistributedDataParallel(self.model, device_ids=[self.gpu_id])
def _load_snapshot(self, snapshot_path):
loc = f"cuda:{self.gpu_id}"
snapshot = torch.load(snapshot_path, map_location=loc)
self.model.load_state_dict(snapshot["MODEL_STATE"])
self.epochs_run = snapshot["EPOCHS_RUN"]
print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
# print(output,targets)
loss = F.cross_entropy(output, targets)
print(f"[GPU{self.gpu_id}] Loss {loss.item()}")
loss.backward()
self.optimizer.step()
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)
def _save_snapshot(self, epoch):
snapshot = {
"MODEL_STATE": self.model.module.state_dict(),
"EPOCHS_RUN": epoch,
}
torch.save(snapshot, self.snapshot_path)
print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")
def train(self, max_epochs: int):
for epoch in range(self.epochs_run, max_epochs):
self._run_epoch(epoch)
if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_snapshot(epoch)
time.sleep(1)
def ddp_setup():
init_process_group(backend="nccl")
print("Parameters")
print(f"LOCAL_RANK:{os.environ['LOCAL_RANK']}")
print(f"RANK:{os.environ['RANK']}")
print(f"GROUP_RANK:{os.environ['GROUP_RANK']}")
print(f"ROLE_RANK:{os.environ['ROLE_RANK']}")
print(f"LOCAL_WORLD_SIZE:{os.environ['LOCAL_WORLD_SIZE']}")
print(f"WORLD_SIZE:{os.environ['WORLD_SIZE']}")
print(f"ROLE_WORLD_SIZE:{os.environ['ROLE_WORLD_SIZE']}")
print(f"MASTER_ADDR:{os.environ['MASTER_ADDR']}")
print(f"MASTER_PORT:{os.environ['MASTER_PORT']}")
print("")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = ToyModel()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)
def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
ddp_setup()
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
trainer.train(total_epochs)
destroy_process_group()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('--total_epochs', default=10, type=int, help='Total epochs to train the model')
parser.add_argument('--save_every', default=2, type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
main(args.save_every, args.total_epochs, args.batch_size)
与单卡有几点不同:
- 初始化进程组:
init_process_group(backend="nccl")
,后端一般选择nccl - 分布式数据采样器:
sampler=DistributedSampler(dataset)
- 封装模型:
self.model = DistributedDataParallel(self.model, device_ids=[self.gpu_id])
- 启动torchrun脚本进行训练
训练脚本:
- 单机多卡
torchrun \
--nnodes=1 \
--nproc_per_node=2 \
--master-addr=127.0.0.1 \
--master-port=29500 \
main.py
- 多机多卡
export NCCL_DEBUG=info
export NCCL_SOCKET_IFNAME=bond0
export NCCL_IB_DISABLE=1
torchrun \
--nnodes=2 \
--nproc_per_node=2 \
--master-addr=10.208.58.27 \
--master-port=29602 \
--node-rank=0 \
main.py
export NCCL_DEBUG=info
export NCCL_SOCKET_IFNAME=bond0
export NCCL_IB_DISABLE=1
torchrun \
--nnodes=2 \
--nproc_per_node=1 \
--master-addr=10.208.58.27 \
--master-port=29602 \
--node-rank=1 \
main.py
注意事项:
- 多进程训练,也就是会同时运行多份代码,因此训练时候要想好GPU的序号等需要自己指定的变量
- 数据是按照进程数量分的,比如总共2048条,如果三个进程就每一个进程683
测试环境:
master:10.208.58.27 2*V100
Parameters
LOCAL_RANK:0
RANK:0
GROUP_RANK:0
ROLE_RANK:0
LOCAL_WORLD_SIZE:2
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602
Parameters
LOCAL_RANK:1
RANK:1
GROUP_RANK:0
ROLE_RANK:1
LOCAL_WORLD_SIZE:2
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602
worker:1*A100
Parameters
LOCAL_RANK:0
RANK:2
GROUP_RANK:1
ROLE_RANK:2
LOCAL_WORLD_SIZE:1
WORLD_SIZE:3
ROLE_WORLD_SIZE:3
MASTER_ADDR:10.208.58.27
MASTER_PORT:29602