hfai.distributed¶
| 功能与  | |
| 功能与  | |
| 功能与  | |
| 基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数 | |
| 功能与  | |
| 功能与  | |
| 设置萤火2号集群NCCL优化等级和自定义配置 | |
| 给定一个函数  | 
- hfai.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)[source]¶
- 功能与 - torch.distributed.init_process_group类似- 基于 torch.distributed.init_process_group 提供的 - init_process_group,支持 backend 为- hf_barrier,通过 zmq 的方式作 barrier,使用方式与- torch.distributed.init_process_group保持一致- Examples: - import hfai.distributed as dist dist.init_process_group(backend=dist.HF_ZMQ_BACKEND) 
- hfai.distributed.barrier(group=None, async_op=False)[source]¶
- 功能与 - torch.distributed.barrier类似- 基于 zmq 提供的 barrier, 支持 backend 为 - hf_zmq,参数与与 torch.distributed.barrier 保持一致- Examples: - import hfai.distributed as dist dist.barrier() 
- hfai.distributed.gather(obj, gather_list=None, dst=0, group=None, async_op=False)[source]¶
- 功能与 - torch.distributed.gather类似- 基于 zmq 提供的 gather,与 torch 提供的 gather 相比暂不支持 async_op 与 group 参数,其余参数保持一致 
- hfai.distributed.broadcast(objs, src=0, group=None, async_op=False)[source]¶
- 基于 zmq 提供的 broadcast,与 torch 提供的 broadcast 相比暂不支持 async_op 与 group 参数 - Parameters
- objs (list[object]) – 只有一项的list,对于 rank 为 src 的调用者,该项应为输入的 python object,对于其他调用者,会将该项置为输入的 object 
- src (int) – 源 rank 
 
- Returns
- None 
 - Examples: - import hfai.distributed as dist objs = ['to broadcast'] dist.broadcast(objs, src=0) 
- hfai.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]¶
- 功能与 - torch.distributed.all_gather一样,但支持不同大小的 tensor。- Parameters
- tensor_list (list[Tensor]) – gather 后的 tensors,每个 tensor 大小不一定要相同 
- tensor (Tensor) – 当前 rank 要 broadcast 的 tensor 
- group (ProcessGroup, optional) – 
- async_op (bool, optional) – 是否为异步的操作 
 
 - Note - tensor 大小不一样时不支持设置 - async_op = True- Examples - >>> import hfai.distributed as dist >>> tensor_list = [torch.zeros(2 + i, dtype=torch.int64) for i in range(2)] >>> tensor_list [tensor([0, 0]), tensor([0, 0, 0])] # Rank 0 and 1 >>> tensor = torch.arange(2 + rank, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4, 5]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list 
- hfai.distributed.reduce_scatter(output, input_list, op=<ReduceOp.SUM: 0>, group=None, async_op=False)[source]¶
- 功能与 - torch.distributed.reduce_scatter一样,但支持不同大小的 tensor。- Parameters
- output (Tensor) – 输出的 tensor 
- input_list (list[Tensor]) – 准备做 reduce scatter 的 tensors,每个 tensor 大小不一定要相同 
- group (ProcessGroup, optional) – 
- async_op (bool, optional) – 是否为异步的操作 
 
 - Note - tensor 大小不一样时不支持设置 - async_op = True
- hfai.distributed.set_nccl_opt_level(OPT_LEVEL=0, CUSTOM_CONFIG=None)[source]¶
- 设置萤火2号集群NCCL优化等级和自定义配置 - Parameters
- OPT_LEVEL (int) – - HFAI_NCCL_OPT_LEVEL定义的枚举类型,有- DISABLED,- AUTO,- FULL,- COVER_AUTO,- CUSTOM5种
- CUSTOM_CONFIG (dict, optional) – [可选] 自定义优化配置 
 
 - OPT_LEVEL表示优化等级,有- DISABLED,- AUTO,- FULL,- COVER_AUTO,- CUSTOM5种,使用- HFAI_NCCL_OPT_LEVEL类定义好的属性即可:- class HFAI_NCCL_OPT_LEVEL(object): DISABLED = 0 # 代表无优化,默认情况 AUTO = 1 # 自动选择 NCCL 优化,保守策略,会根据节点信息自动选择可以用优化参数 FULL = 3 # 自动优化,激进策略,开启全部最佳优化,已知跟 sub group 有冲突 COVER_AUTO = 10 # 在 AUTO 的基础上,用户自定义某些选项 CUSTOM = 101 # 用户自行选择优化组合,通过第二个参数 CUSTOM_CONFIG 传入具体优化选项 - CUSTOM_CONFIG可选参数,代表传入具体优化选项,一个自定义优化参数配置(python Dict类型)示例如下:- CUSTOM_CONFIG={ 'GRAPH_OPT': 'path/to/your/graph.txt', # 自定义GRAPH,可选参数 GRAPH_OPT_CUSTOM_FILE 输入自定义GRAPH文件的路径 'NCCL_ALGO': 'Ring', # 设置NCCL通讯拓扑算法 Ring/Tree 'NCCL_PROTO': 'Simple', # 设置NCCL通讯协议 LL/LL128/Simple 低延迟/128Byte低延迟/常规 'GDR': True, # True/False 强制开启GPU Direct RDMA 'MIN_NCHANNELS': '1', # 设置MIN_NCHANNELS 大于等于'1'的str 'MAX_NCHANNELS': '1', # 设置MAX_NCHANNELS 大于等于'1'的str } - 一般情况下,建议使用 - AUTO或者- FULL等级优化。目前已知- FULL等级优化跟sub group冲突,使用时请注意。- Examples: - import hfai hfai.distributed.set_nccl_opt_level(hfai.distributed.HFAI_NCCL_OPT_LEVEL.AUTO) # 接正常代码... 
- hfai.distributed.pairwise_apply(f, x, args=(), group=None, equal_size=False)[source]¶
- 给定一个函数 - f和当前(第- i个)GPU 的输入- x_i,返回- [f(x_i, y_j) for j in range(nranks)], 其中- x_j为第- j个 GPU 的输入- 返回的结果等价于以下实现: - def pairwise_apply(f, x, args=(), group=None): nranks = dist.get_world_size(group) xs = [torch.empty_like(x) for _ in range(nranks)] dist.all_gather(xs, x, group=group) results = [f(x, xs[i], *args) for i in range(nranks)] return results - Parameters
- f (Callable[Tensor, Tensor]) – 需要调用的函数,通过 - f(x, y, *args)的方式调用
- x (torch.Tensor) – 输入的 tensor;每块 GPU 上 tensor 的形状可以不相同,但维度数量要相同 
- args (tuple) – 需要额外传入 - f的参数
- group (ProcessGroup) – ProcessGroup 对象;默认是 - None
- equal_size (bool) – 每块 GPU 上输入的 tensor 形状是否相同;默认是 - False
 
- Returns
- f作用在每块 GPU 上的结果
 - Note - 当 torch 的版本小于 1.12.0 时,本函数是通过 cupy 调用底层 nccl 实现的,第一次调用本函数的时候需要等待一小段初始化 NCCL 的时间。 
- 每块 GPU 上的 tensor 小于 1 MiB 时,通讯的性能会有所下降。 
 - Examples - >>> import hfai.distributed as dist >>> def f(x, y): ... return x + y >>> rank = dist.get_rank() >>> x = torch.ones(1, device="cuda") * rank >>> dist.pairwise_apply(f, x) [tensor(0), tensor(1), tensor(2), tensor(3)] # Rank 0 [tensor(1), tensor(2), tensor(3), tensor(4)] # Rank 1 [tensor(2), tensor(3), tensor(4), tensor(5)] # Rank 2 [tensor(3), tensor(4), tensor(5), tensor(6)] # Rank 3