Shortcuts

hfai.distributed

init_process_group

功能与 torch.distributed.init_process_group 类似

barrier

功能与 torch.distributed.barrier 类似

gather

功能与 torch.distributed.gather 类似

broadcast

基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数

all_gather

功能与 torch.distributed.all_gather 一样,但支持不同大小的 tensor。

reduce_scatter

功能与 torch.distributed.reduce_scatter 一样,但支持不同大小的 tensor。

set_nccl_opt_level

设置萤火2号集群NCCL优化等级和自定义配置

pairwise_apply

给定一个函数 f 和当前(第 i 个)GPU 的输入 x_i,返回 [f(x_i, y_j) for j in range(nranks)], 其中 x_j 为第 j 个 GPU 的输入

hfai.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)[source]

功能与 torch.distributed.init_process_group 类似

基于 torch.distributed.init_process_group 提供的 init_process_group,支持 backend 为 hf_barrier,通过 zmq 的方式作 barrier,使用方式与 torch.distributed.init_process_group 保持一致

Examples:

import hfai.distributed as dist
dist.init_process_group(backend=dist.HF_ZMQ_BACKEND)
hfai.distributed.barrier(group=None, async_op=False)[source]

功能与 torch.distributed.barrier 类似

基于 zmq 提供的 barrier, 支持 backend 为 hf_zmq,参数与与 torch.distributed.barrier 保持一致

Examples:

import hfai.distributed as dist
dist.barrier()
hfai.distributed.gather(obj, gather_list=None, dst=0, group=None, async_op=False)[source]

功能与 torch.distributed.gather 类似

基于 zmq 提供的 gather,与 torch 提供的 gather 相比暂不支持 async_op 与 group 参数,其余参数保持一致

hfai.distributed.broadcast(objs, src=0, group=None, async_op=False)[source]

基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数

Parameters
  • objs (list[object]) – 只有一项的list,对于 rank 为 src 的调用者,该项应为输入的 python object,对于其他调用者,会将该项置为输入的 object

  • src (int) – 源 rank

Returns

None

Examples:

import hfai.distributed as dist
objs = ['to broadcast']
dist.broadcast(objs, src=0)
hfai.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]

功能与 torch.distributed.all_gather 一样,但支持不同大小的 tensor。

Parameters
  • tensor_list (list[Tensor]) – gather 后的 tensors,每个 tensor 大小不一定要相同

  • tensor (Tensor) – 当前 rank 要 broadcast 的 tensor

  • group (ProcessGroup, optional) –

  • async_op (bool, optional) – 是否为异步的操作

Note

tensor 大小不一样时不支持设置 async_op = True

Examples

>>> import hfai.distributed as dist
>>> tensor_list = [torch.zeros(2 + i, dtype=torch.int64) for i in range(2)]
>>> tensor_list
[tensor([0, 0]), tensor([0, 0, 0])] # Rank 0 and 1
>>> tensor = torch.arange(2 + rank, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4, 5]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
hfai.distributed.reduce_scatter(output, input_list, op=<ReduceOp.SUM: 0>, group=None, async_op=False)[source]

功能与 torch.distributed.reduce_scatter 一样,但支持不同大小的 tensor。

Parameters
  • output (Tensor) – 输出的 tensor

  • input_list (list[Tensor]) – 准备做 reduce scatter 的 tensors,每个 tensor 大小不一定要相同

  • group (ProcessGroup, optional) –

  • async_op (bool, optional) – 是否为异步的操作

Note

tensor 大小不一样时不支持设置 async_op = True

hfai.distributed.set_nccl_opt_level(OPT_LEVEL=0, CUSTOM_CONFIG=None)[source]

设置萤火2号集群NCCL优化等级和自定义配置

Parameters
  • OPT_LEVEL (int) – HFAI_NCCL_OPT_LEVEL 定义的枚举类型,有 DISABLED , AUTO , FULL , COVER_AUTO , CUSTOM 5种

  • CUSTOM_CONFIG (dict, optional) – [可选] 自定义优化配置

OPT_LEVEL 表示优化等级,有 DISABLED , AUTO , FULL , COVER_AUTO , CUSTOM 5种,使用 HFAI_NCCL_OPT_LEVEL 类定义好的属性即可:

class HFAI_NCCL_OPT_LEVEL(object):
    DISABLED = 0     # 代表无优化,默认情况
    AUTO = 1         # 自动选择 NCCL 优化,保守策略,会根据节点信息自动选择可以用优化参数
    FULL = 3         # 自动优化,激进策略,开启全部最佳优化,已知跟 sub group 有冲突
    COVER_AUTO = 10  # 在 AUTO 的基础上,用户自定义某些选项
    CUSTOM = 101     # 用户自行选择优化组合,通过第二个参数 CUSTOM_CONFIG 传入具体优化选项

CUSTOM_CONFIG 可选参数,代表传入具体优化选项,一个自定义优化参数配置(python Dict类型)示例如下:

CUSTOM_CONFIG={
    'GRAPH_OPT': 'path/to/your/graph.txt', # 自定义GRAPH,可选参数 GRAPH_OPT_CUSTOM_FILE 输入自定义GRAPH文件的路径
    'NCCL_ALGO': 'Ring',                   # 设置NCCL通讯拓扑算法 Ring/Tree
    'NCCL_PROTO': 'Simple',                # 设置NCCL通讯协议 LL/LL128/Simple 低延迟/128Byte低延迟/常规
    'GDR': True,                           # True/False 强制开启GPU Direct RDMA
    'MIN_NCHANNELS': '1',                  # 设置MIN_NCHANNELS 大于等于'1'的str
    'MAX_NCHANNELS': '1',                  # 设置MAX_NCHANNELS 大于等于'1'的str
}

一般情况下,建议使用 AUTO 或者 FULL 等级优化。目前已知 FULL 等级优化跟sub group冲突,使用时请注意。

Examples:

import hfai
hfai.distributed.set_nccl_opt_level(hfai.distributed.HFAI_NCCL_OPT_LEVEL.AUTO)
# 接正常代码...
hfai.distributed.pairwise_apply(f, x, args=(), group=None, equal_size=False)[source]

给定一个函数 f 和当前(第 i 个)GPU 的输入 x_i,返回 [f(x_i, y_j) for j in range(nranks)], 其中 x_j 为第 j 个 GPU 的输入

返回的结果等价于以下实现:

def pairwise_apply(f, x, args=(), group=None):
    nranks = dist.get_world_size(group)
    xs = [torch.empty_like(x) for _ in range(nranks)]
    dist.all_gather(xs, x, group=group)
    results = [f(x, xs[i], *args) for i in range(nranks)]
    return results
Parameters
  • f (Callable[Tensor, Tensor]) – 需要调用的函数,通过 f(x, y, *args) 的方式调用

  • x (torch.Tensor) – 输入的 tensor;每块 GPU 上 tensor 的形状可以不相同,但维度数量要相同

  • args (tuple) – 需要额外传入 f 的参数

  • group (ProcessGroup) – ProcessGroup 对象;默认是 None

  • equal_size (bool) – 每块 GPU 上输入的 tensor 形状是否相同;默认是 False

Returns

f 作用在每块 GPU 上的结果

Note

  1. 当 torch 的版本小于 1.12.0 时,本函数是通过 cupy 调用底层 nccl 实现的,第一次调用本函数的时候需要等待一小段初始化 NCCL 的时间。

  2. 每块 GPU 上的 tensor 小于 1 MiB 时,通讯的性能会有所下降。

Examples

>>> import hfai.distributed as dist
>>> def f(x, y):
...     return x + y
>>> rank = dist.get_rank()
>>> x = torch.ones(1, device="cuda") * rank
>>> dist.pairwise_apply(f, x)
[tensor(0), tensor(1), tensor(2), tensor(3)]  # Rank 0
[tensor(1), tensor(2), tensor(3), tensor(4)]  # Rank 1
[tensor(2), tensor(3), tensor(4), tensor(5)]  # Rank 2
[tensor(3), tensor(4), tensor(5), tensor(6)]  # Rank 3