hfai.distributed¶
功能与 |
|
功能与 |
|
功能与 |
|
基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数 |
|
功能与 |
|
功能与 |
|
设置萤火2号集群NCCL优化等级和自定义配置 |
|
给定一个函数 |
- hfai.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)[source]¶
功能与
torch.distributed.init_process_group
类似基于 torch.distributed.init_process_group 提供的
init_process_group
,支持 backend 为hf_barrier
,通过 zmq 的方式作 barrier,使用方式与torch.distributed.init_process_group
保持一致Examples:
import hfai.distributed as dist dist.init_process_group(backend=dist.HF_ZMQ_BACKEND)
- hfai.distributed.barrier(group=None, async_op=False)[source]¶
功能与
torch.distributed.barrier
类似基于 zmq 提供的 barrier, 支持 backend 为
hf_zmq
,参数与与 torch.distributed.barrier 保持一致Examples:
import hfai.distributed as dist dist.barrier()
- hfai.distributed.gather(obj, gather_list=None, dst=0, group=None, async_op=False)[source]¶
功能与
torch.distributed.gather
类似基于 zmq 提供的 gather,与 torch 提供的 gather 相比暂不支持 async_op 与 group 参数,其余参数保持一致
- hfai.distributed.broadcast(objs, src=0, group=None, async_op=False)[source]¶
基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数
- Parameters
objs (list[object]) – 只有一项的list,对于 rank 为 src 的调用者,该项应为输入的 python object,对于其他调用者,会将该项置为输入的 object
src (int) – 源 rank
- Returns
None
Examples:
import hfai.distributed as dist objs = ['to broadcast'] dist.broadcast(objs, src=0)
- hfai.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]¶
功能与
torch.distributed.all_gather
一样,但支持不同大小的 tensor。- Parameters
tensor_list (list[Tensor]) – gather 后的 tensors,每个 tensor 大小不一定要相同
tensor (Tensor) – 当前 rank 要 broadcast 的 tensor
group (ProcessGroup, optional) –
async_op (bool, optional) – 是否为异步的操作
Note
tensor 大小不一样时不支持设置
async_op = True
Examples
>>> import hfai.distributed as dist >>> tensor_list = [torch.zeros(2 + i, dtype=torch.int64) for i in range(2)] >>> tensor_list [tensor([0, 0]), tensor([0, 0, 0])] # Rank 0 and 1 >>> tensor = torch.arange(2 + rank, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4, 5]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list
- hfai.distributed.reduce_scatter(output, input_list, op=<ReduceOp.SUM: 0>, group=None, async_op=False)[source]¶
功能与
torch.distributed.reduce_scatter
一样,但支持不同大小的 tensor。- Parameters
output (Tensor) – 输出的 tensor
input_list (list[Tensor]) – 准备做 reduce scatter 的 tensors,每个 tensor 大小不一定要相同
group (ProcessGroup, optional) –
async_op (bool, optional) – 是否为异步的操作
Note
tensor 大小不一样时不支持设置
async_op = True
- hfai.distributed.set_nccl_opt_level(OPT_LEVEL=0, CUSTOM_CONFIG=None)[source]¶
设置萤火2号集群NCCL优化等级和自定义配置
- Parameters
OPT_LEVEL (int) –
HFAI_NCCL_OPT_LEVEL
定义的枚举类型,有DISABLED
,AUTO
,FULL
,COVER_AUTO
,CUSTOM
5种CUSTOM_CONFIG (dict, optional) – [可选] 自定义优化配置
OPT_LEVEL
表示优化等级,有DISABLED
,AUTO
,FULL
,COVER_AUTO
,CUSTOM
5种,使用HFAI_NCCL_OPT_LEVEL
类定义好的属性即可:class HFAI_NCCL_OPT_LEVEL(object): DISABLED = 0 # 代表无优化,默认情况 AUTO = 1 # 自动选择 NCCL 优化,保守策略,会根据节点信息自动选择可以用优化参数 FULL = 3 # 自动优化,激进策略,开启全部最佳优化,已知跟 sub group 有冲突 COVER_AUTO = 10 # 在 AUTO 的基础上,用户自定义某些选项 CUSTOM = 101 # 用户自行选择优化组合,通过第二个参数 CUSTOM_CONFIG 传入具体优化选项
CUSTOM_CONFIG
可选参数,代表传入具体优化选项,一个自定义优化参数配置(python Dict类型)示例如下:CUSTOM_CONFIG={ 'GRAPH_OPT': 'path/to/your/graph.txt', # 自定义GRAPH,可选参数 GRAPH_OPT_CUSTOM_FILE 输入自定义GRAPH文件的路径 'NCCL_ALGO': 'Ring', # 设置NCCL通讯拓扑算法 Ring/Tree 'NCCL_PROTO': 'Simple', # 设置NCCL通讯协议 LL/LL128/Simple 低延迟/128Byte低延迟/常规 'GDR': True, # True/False 强制开启GPU Direct RDMA 'MIN_NCHANNELS': '1', # 设置MIN_NCHANNELS 大于等于'1'的str 'MAX_NCHANNELS': '1', # 设置MAX_NCHANNELS 大于等于'1'的str }
一般情况下,建议使用
AUTO
或者FULL
等级优化。目前已知FULL
等级优化跟sub group冲突,使用时请注意。Examples:
import hfai hfai.distributed.set_nccl_opt_level(hfai.distributed.HFAI_NCCL_OPT_LEVEL.AUTO) # 接正常代码...
- hfai.distributed.pairwise_apply(f, x, args=(), group=None, equal_size=False)[source]¶
给定一个函数
f
和当前(第i
个)GPU 的输入x_i
,返回[f(x_i, y_j) for j in range(nranks)]
, 其中x_j
为第j
个 GPU 的输入返回的结果等价于以下实现:
def pairwise_apply(f, x, args=(), group=None): nranks = dist.get_world_size(group) xs = [torch.empty_like(x) for _ in range(nranks)] dist.all_gather(xs, x, group=group) results = [f(x, xs[i], *args) for i in range(nranks)] return results
- Parameters
f (Callable[Tensor, Tensor]) – 需要调用的函数,通过
f(x, y, *args)
的方式调用x (torch.Tensor) – 输入的 tensor;每块 GPU 上 tensor 的形状可以不相同,但维度数量要相同
args (tuple) – 需要额外传入
f
的参数group (ProcessGroup) – ProcessGroup 对象;默认是
None
equal_size (bool) – 每块 GPU 上输入的 tensor 形状是否相同;默认是
False
- Returns
f
作用在每块 GPU 上的结果
Note
当 torch 的版本小于 1.12.0 时,本函数是通过 cupy 调用底层 nccl 实现的,第一次调用本函数的时候需要等待一小段初始化 NCCL 的时间。
每块 GPU 上的 tensor 小于 1 MiB 时,通讯的性能会有所下降。
Examples
>>> import hfai.distributed as dist >>> def f(x, y): ... return x + y >>> rank = dist.get_rank() >>> x = torch.ones(1, device="cuda") * rank >>> dist.pairwise_apply(f, x) [tensor(0), tensor(1), tensor(2), tensor(3)] # Rank 0 [tensor(1), tensor(2), tensor(3), tensor(4)] # Rank 1 [tensor(2), tensor(3), tensor(4), tensor(5)] # Rank 2 [tensor(3), tensor(4), tensor(5), tensor(6)] # Rank 3