from collections import defaultdict
from tabulate import tabulate
import numpy as np
import torch
from .utils import Timer, is_device, get_device
def bench(model, inputs, optimizer=None, iters=100, warmup_iters=10,
          verbose=True, return_results=False, only_fwd=False):
"""
    Given a model or function together with its inputs, time the forward and backward passes.
Examples:
>>> from torchvision import models
>>> model = models.resnet18().cuda(0)
>>> x = torch.randn(16, 3, 224, 224).cuda(0)
>>> hfai.nn.bench(model, (x,), iters=100)
+---------------+-------------+
| measurement | time (us) |
+===============+=============+
| forward | 6241.785 |
+---------------+-------------+
| backward | 8546.621 |
+---------------+-------------+
| fwd + bwd | 14788.406 |
+---------------+-------------+
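
    To collect the raw measurements instead of printing them, a minimal sketch
    (``timers`` maps measurement names to ``Timer`` objects, ``mem`` is the peak
    allocated memory in MiB):

    >>> timers, losses, grads, mem = hfai.nn.bench(model, (x,), iters=100, verbose=False, return_results=True)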

    Args:
        model: PyTorch model or function
        inputs (tuple): inputs to ``model``; they must be on the same device as ``model``
        optimizer (torch.optim.Optimizer): optimizer instance. Default: ``None``
        iters (int): number of timed iterations. Default: ``100``
        warmup_iters (int): number of warmup iterations. Default: ``10``
        verbose (bool): whether to print the timing results. Default: ``True``
        return_results (bool): whether to return the measured results. Default: ``False``
        only_fwd (bool): whether to run only the forward pass. Default: ``False``

    Returns:
        If ``return_results`` is ``True``, a tuple ``(timers, losses, grad_list, mem)``; otherwise ``None``.
    """
device = get_device(model, inputs)
assert device is not None
    assert device != torch.device('cpu'), "Expected model to be on a GPU device"
assert is_device(device, inputs), "Expected model and inputs to be on the same device"
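    # record the currently allocated memory so the benchmark's own peak usage can be reported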
torch.cuda.reset_peak_memory_stats(device)
mem1 = torch.cuda.memory_allocated(device)
timers = defaultdict(Timer)
for _ in range(warmup_iters):
training_step(timers, model, inputs, optimizer, only_fwd)
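    # discard the warmup timings and start fresh measurements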
losses = []
timers = defaultdict(Timer)
for _ in range(iters):
loss, grad_list = training_step(timers, model, inputs, optimizer, only_fwd)
losses.append(loss)
if not only_fwd:
timers['fwd + bwd'].t = timers['forward'].t + timers['backward'].t
timers['fwd + bwd'].iters = timers['forward'].iters
mem2 = torch.cuda.max_memory_allocated(device)
mem = (mem2 - mem1) / (1 << 20) # MiB
if verbose:
print(f'>>> model device: {device}')
print(f'>>> {warmup_iters} warmup iters, {iters} iters')
headers = ['measurement', 'time (us)']
tab = tabulate(timers.items(), headers=headers, tablefmt="grid")
print(tab, flush=True)
print(f"peak allocated mem {mem:.2f}", flush=True)
if return_results:
return timers, losses, grad_list, mem
def compare(model1, model2, inputs1, inputs2, optimizer1=None, optimizer2=None, iters=100,
            warmup_iters=10, compare_loss=True, rtol=1e-7, atol=1e-10, only_fwd=False):
"""
    Compare the performance of two models or functions.
Examples:
>>> import copy
>>> from torchvision import models
>>> model1 = models.resnet18().cuda(0)
>>> model2 = copy.deepcopy(model1).cuda(1)
>>> x1 = torch.randn(16, 3, 224, 224).cuda(0)
>>> x2 = x1.clone().cuda(1)
>>> hfai.nn.compare(model1, model2, (x1,), (x2,), iters=100)
+---------------+--------------------+--------------------+------------------------+
| measurement | model1 time (us) | model2 time (us) | model1 / model2 time |
+===============+====================+====================+========================+
| forward | 5363.802 | 5450.234 | 98.41 % |
+---------------+--------------------+--------------------+------------------------+
| backward | 8114.942 | 7987.469 | 101.60 % |
+---------------+--------------------+--------------------+------------------------+
| fwd + bwd | 13478.745 | 13437.703 | 100.31 % |
+---------------+--------------------+--------------------+------------------------+
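
    Forward-only timing without checking numerical agreement, as a sketch under the same setup:

    >>> hfai.nn.compare(model1, model2, (x1,), (x2,), only_fwd=True, compare_loss=False)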

    Args:
        model1: the first model or function
        model2: the second model or function
        inputs1 (tuple): inputs to ``model1``; they must be on the same device as ``model1``
        inputs2 (tuple): inputs to ``model2``; they must be on the same device as ``model2``
        optimizer1 (torch.optim.Optimizer): optimizer for ``model1``. Default: ``None``
        optimizer2 (torch.optim.Optimizer): optimizer for ``model2``. Default: ``None``
        iters (int): number of timed iterations. Default: ``100``
        warmup_iters (int): number of warmup iterations. Default: ``10``
        compare_loss (bool): if ``True``, also check that the outputs of the two models and their backward gradients match. Default: ``True``
        rtol (float): maximum allowed relative error. Default: ``1e-7``
        atol (float): maximum allowed absolute error. Default: ``1e-10``
        only_fwd (bool): whether to compare only the forward pass. Default: ``False``
    """
device1 = get_device(model1, inputs1)
device2 = get_device(model2, inputs2)
assert device1 != device2, "Expected model1 and model2 to be on two different devices"
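    # benchmark each model independently, collecting raw timings, per-iteration losses and gradients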
timers1, losses1, grad_list1, _ = bench(model1, inputs1, optimizer1, iters, warmup_iters, verbose=False,
return_results=True, only_fwd=only_fwd)
timers2, losses2, grad_list2, _ = bench(model2, inputs2, optimizer2, iters, warmup_iters, verbose=False,
return_results=True, only_fwd=only_fwd)
data = []
for k in timers1:
ratio = f'{timers1[k].t / timers2[k].t * 100:.2f} %'
data.append((k, timers1[k], timers2[k], ratio))
print(f'>>> model1 device: {device1}, model2 device: {device2}')
print(f'>>> {warmup_iters} warmup iters, {iters} iters')
    print(f'>>> compare loss: {compare_loss}')
headers = ['measurement', 'model1 time (us)', 'model2 time (us)', 'model1 / model2 time']
tab = tabulate(data, headers=headers, tablefmt="grid")
print(tab, flush=True)
if compare_loss:
np.testing.assert_allclose(losses1, losses2, rtol=rtol, atol=atol)
for grad1, grad2 in zip(grad_list1, grad_list2):
np.testing.assert_allclose(grad1.cpu(), grad2.cpu(), rtol=rtol, atol=atol)
def training_step(timers, model, inputs, optimizer, only_fwd):
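    """
    Run a single benchmark step: a timed forward pass and, unless ``only_fwd`` is set,
    a timed backward pass plus an optional optimizer step.

    Returns the scalar loss value and the list of gradients collected from the inputs
    and model parameters (empty when ``only_fwd`` is ``True``).
    """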
device = get_device(model, inputs)
for input in inputs:
if isinstance(input, torch.Tensor) and input.dtype.is_floating_point:
input.detach_()
input.requires_grad_()
if input.grad is not None:
input.grad.zero_()
# forward
out = timers['forward'].time(lambda: model(*inputs), device)
# compute loss
loss = compute_loss(out)
if only_fwd:
return loss.item(), []
# backward
grad_list = []
timers['backward'].time(lambda: loss.backward(), device)
for input in inputs:
if isinstance(input, torch.Tensor):
if input.grad is not None:
grad_list.append(input.grad)
if isinstance(model, torch.nn.Module):
for parameter in model.parameters():
if parameter.grad is not None:
grad_list.append(parameter.grad)
# optimize
if optimizer is not None:
timers['optimize'].time(lambda: optimizer.step(), device)
return loss.item(), grad_list
def compute_loss(out):
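    """
    Reduce a model output to a scalar loss by taking the mean; nested tuples and lists
    of tensors are reduced recursively.
    """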
if isinstance(out, torch.Tensor):
return out.mean()
elif isinstance(out, (list, tuple)):
out = torch.stack([compute_loss(x) for x in out])
return out.mean()
else:
raise RuntimeError("Only torch.Tensor, tuple, list are supported")