Source code for hfai.multiprocessing.multiprocessing

from torch.multiprocessing import Process as ProcessLG
import threading
from hfai.client import bind_hf_except_hook
from hfai.client.api import self_health_check
from .turbo_trans import turbo_trans
from hfai._C.multiprocessing import numa
from multiprocessing.context import BaseContext
from multiprocessing.reduction import dump


class Process(ProcessLG):
    """
    Multiprocess management.

    Inherits from `torch.multiprocessing.Process <https://pytorch.org/docs/stable/multiprocessing.html?highlight=torch%20multiprocessing%20process#module-torch.multiprocessing>`_ , with the following differences:

    1. When a child process raises an exception or exits with a non-zero code, a self-health check is started. On a hardware fault the task is restarted; otherwise the task is terminated.
    #. Fixes the inability of some Python versions (including 3.6) to pass more than 4 GB of arguments to a child process.
    #. Adds a ``numa`` option; all other parameters are identical to `torch.multiprocessing.Process <https://pytorch.org/docs/stable/multiprocessing.html?highlight=torch%20multiprocessing%20process#module-torch.multiprocessing>`_

    Args:
        numa (int): the NUMA node to bind this child process to (the default NUMA node is used if not specified)

    Examples:

    .. code-block:: python

        from hfai.multiprocessing import Process
        from hfai.utils import which_numa

        Process(target=..., args=(), numa=which_numa(i_gpu=0))

    """

    def __init__(self, group=None, target=None, *args, **kwargs):
        # -1 means "no explicit NUMA binding"
        self.numa = kwargs.pop('numa') if 'numa' in kwargs else -1
        # Route the real target through _bind_numa so the binding happens
        # inside the child process, not the parent.
        super().__init__(group=group, target=self._bind_numa, *args, **kwargs)
        self._th = None
        self.target = target

    def _bind_numa(self, *args, **kwargs):
        if self.numa != -1:
            numa.bind_numa(self.numa)
        return self.target(*args, **kwargs)

    def _monitor(self, p):
        # Wait on the underlying multiprocessing.Process directly (bypassing
        # the overridden join below), then run a self-health check on any
        # non-zero exit code.
        super(ProcessLG, p).join()
        if p.exitcode != 0:
            print(f'Child process exited abnormally, exit_code: {p.exitcode}')
            self_health_check(p.pid)

    def start(self):
        super().start()
        # Watch the child from a separate thread so an abnormal exit is
        # detected even before the parent calls join().
        self._th = threading.Thread(target=self._monitor, args=(self, ))
        self._th.start()

    def join(self, timeout=None):
        # Joining the monitor thread guarantees the child has exited and any
        # health check has already run; forward the timeout to it.
        self._th.join(timeout)
# Install hfai's exception hook on Process and apply the turbo_trans patch
# in the current process.
bind_hf_except_hook(Process)
turbo_trans()

# Patch default arguments by rebinding __defaults__: turbo_trans becomes the
# default Pool worker initializer, and multiprocessing.reduction.dump
# defaults to pickle protocol 4, which supports objects larger than 4 GB.
BaseContext.Pool.__defaults__ = (None, turbo_trans, (), None)
dump.__defaults__ = (4, )
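A note on the two ``__defaults__`` assignments: a Python function's ``__defaults__`` tuple holds the default values of its trailing parameters, so rebinding it changes those defaults without redefining the function. ``BaseContext.Pool`` declares ``(processes=None, initializer=None, initargs=(), maxtasksperchild=None)``, so the tuple above makes ``turbo_trans`` the default ``initializer``; and pickle protocol 4 (PEP 3154) supports objects larger than 4 GB, which is what fixes the argument-size limit mentioned in the docstring. A standalone sketch of the mechanism, using a hypothetical ``demo_dump`` stand-in for ``reduction.dump``:

.. code-block:: python

    def demo_dump(obj, file, protocol=None):
        print('pickling with protocol', protocol)

    demo_dump(b'payload', None)      # pickling with protocol None
    demo_dump.__defaults__ = (4, )   # same patch as applied to reduction.dump
    demo_dump(b'payload', None)      # pickling with protocol 4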
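For context, a fuller launch sketch built on the docstring example: one worker per GPU, each bound to the NUMA node of its GPU. ``train_worker`` and the world size of 8 are hypothetical; ``which_numa`` comes from ``hfai.utils`` as shown above.

.. code-block:: python

    from hfai.multiprocessing import Process
    from hfai.utils import which_numa

    def train_worker(gpu_id):
        ...  # user training code, running on the NUMA node of gpu_id

    if __name__ == '__main__':
        procs = [Process(target=train_worker, args=(i, ), numa=which_numa(i_gpu=i))
                 for i in range(8)]
        for p in procs:
            p.start()
        for p in procs:
            # join() waits on the monitor thread, so any health check for a
            # failed child has completed by the time join() returns
            p.join()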