Source code for hfai.multiprocessing.multiprocessing
import threading
from multiprocessing.context import BaseContext
from multiprocessing.reduction import dump

from torch.multiprocessing import Process as ProcessLG

from hfai.client import bind_hf_except_hook
from hfai.client.api import self_health_check
from hfai._C.multiprocessing import numa
from .turbo_trans import turbo_trans
class Process(ProcessLG):
"""
多进程管理
继承自 `torch.multiprocessing.Process <https://pytorch.org/docs/stable/multiprocessing.html?highlight=torch%20multiprocessing%20process#module-torch.multiprocessing>`_ ,差异如下:
1. 在捕获到子进程异常或是子进程退出码非0时,会启动自检程序。硬件故障则会重启该任务,否则结束该任务
#. 修复包括python3.6在内的部分版本无法向子进程传入超过4GB参数的问题
#. 提供numa选项,其余参数与 `torch.multiprocessing.Process <https://pytorch.org/docs/stable/multiprocessing.html?highlight=torch%20multiprocessing%20process#module-torch.multiprocessing>`_ 一致
Args:
numa (int): 该子进程对应的 numa(不指定则选用默认numa)
Examples:
.. code-block:: python
from hfai.multiprocessing import Process
from hfai.utils import which_numa
Process(target=..., args=(), numa=which_numa(i_gpu=0))
"""
    def __init__(self, group=None, target=None, *args, **kwargs):
        # -1 means "no explicit binding": keep the default NUMA node.
        self.numa = kwargs.pop('numa', -1)
        # Wrap the real target so the NUMA binding happens inside the child.
        super().__init__(group=group, target=self._bind_numa, *args, **kwargs)
        self._th = None
        self.target = target
    def _bind_numa(self, *args, **kwargs):
        # Runs in the child process: bind to the requested NUMA node,
        # then invoke the user's target.
        if self.numa != -1:
            numa.bind_numa(self.numa)
        return self.target(*args, **kwargs)
    def _monitor(self, p):
        # Wait on the underlying multiprocessing.Process.join (bypassing any
        # torch-level override), then run a self-health check on abnormal exit.
        super(ProcessLG, p).join()
        if p.exitcode != 0:
            print(f'Child process exited abnormally, exit_code: {p.exitcode}')
            self_health_check(p.pid)
    def start(self):
        super().start()
        # Watch the child from a background thread so abnormal exits are
        # detected even if the parent never calls join().
        self._th = threading.Thread(target=self._monitor, args=(self,))
        self._th.start()
    def join(self, timeout=None):
        # Join the monitor thread (which itself joins the child process),
        # honoring the caller's timeout.
        self._th.join(timeout)
# Route child-process exceptions through the hfai exception hook.
bind_hf_except_hook(Process)
# Enable the turbo transfer patch in this process.
turbo_trans()
# Make turbo_trans the default initializer for any Pool created through the
# standard contexts (the remaining defaults match Pool's signature:
# processes=None, initargs=(), maxtasksperchild=None).
BaseContext.Pool.__defaults__ = (None, turbo_trans, (), None)
# Default multiprocessing.reduction.dump to pickle protocol 4, which supports
# serializing objects larger than 4 GB.
dump.__defaults__ = (4,)
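
# A minimal usage sketch, not part of the module: spawn a worker bound to the
# NUMA node of GPU 0 and rely on the monitor thread to run a health check if
# the child dies abnormally. It assumes `which_numa` from hfai.utils as shown
# in the class docstring; the worker function `train` is a hypothetical
# placeholder.
#
#     from hfai.multiprocessing import Process
#     from hfai.utils import which_numa
#
#     def train(rank):
#         ...  # per-GPU work
#
#     if __name__ == '__main__':
#         p = Process(target=train, args=(0,), numa=which_numa(i_gpu=0))
#         p.start()
#         p.join()  # waits for the monitor thread, which joins the child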