
Source code for hfai.cuda.pin_memory

import numpy as np
import torch
import collections
import warnings
import ctypes

from hfai._C.cuda import PinMemory


def get_data_ofs(buf):
    """Return the address of the first byte of a buffer-protocol object."""
    data = ctypes.c_char_p()
    # PyObject_AsCharBuffer(obj, &buffer, &buffer_len) fills ``data`` with a
    # pointer to the object's underlying memory; the length output is
    # discarded because callers read the size off the memoryview instead.
    ctypes.pythonapi.PyObject_AsCharBuffer(ctypes.py_object(buf),
                                           ctypes.pointer(data),
                                           ctypes.pointer(ctypes.c_size_t()))
    return ctypes.cast(data, ctypes.c_void_p).value
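
For illustration only (not part of the module): on a contiguous NumPy array,
the address recovered through the buffer protocol should match the pointer
the array itself reports, so a minimal sanity check looks like:

    import numpy as np

    arr = np.arange(8, dtype=np.float32)
    assert get_data_ofs(memoryview(arr)) == arr.ctypes.data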


def pin_memory(data):
    """
    Pin an object that supports the Buffer Protocol into page-locked
    memory, using the CUDA built-in functions.

    Args:
        data (any): the object to pin

    Returns:
        any: the pinned object

    Examples:

    .. code-block:: python

        from hfai.cuda import pin_memory
        pinned_data = pin_memory(unpinned_data)

    """
    try:
        m = memoryview(data)
        address = get_data_ofs(m)
        PinMemory.pin_memory(address, m.nbytes)
        return data
    except Exception:
        # ``data`` does not expose a single contiguous buffer;
        # fall back to handling the common container types recursively.
        if isinstance(data, torch.Tensor):
            return data.pin_memory()
        elif isinstance(data, str):
            return data
        elif isinstance(data, collections.abc.Mapping):
            try:
                return type(data)({k: pin_memory(sample) for k, sample in data.items()})  # type: ignore[call-arg]
            except TypeError:
                # The mapping type may not support `__init__(iterable)`.
                return {k: pin_memory(sample) for k, sample in data.items()}
        elif isinstance(data, tuple) and hasattr(data, '_fields'):  # namedtuple
            return type(data)(*(pin_memory(sample) for sample in data))
        elif isinstance(data, tuple):
            return tuple(pin_memory(sample) for sample in data)  # Backwards compatibility.
        elif isinstance(data, collections.abc.Sequence):
            try:
                return type(data)([pin_memory(sample) for sample in data])  # type: ignore[call-arg]
            except TypeError:
                # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                return [pin_memory(sample) for sample in data]
        elif hasattr(data, "pin_memory"):
            return data.pin_memory()
        else:
            warnings.warn(f"pin_memory is not supported for type {type(data)}")
            return data
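
As a usage sketch (array names and shapes are illustrative, and a CUDA-capable
machine with hfai installed is assumed), the fallback branches above let a
whole DataLoader-style batch be pinned in one call:

    import numpy as np
    from hfai.cuda import pin_memory

    batch = {"img": np.zeros((16, 3, 224, 224), dtype=np.float32),
             "label": np.zeros(16, dtype=np.int64)}
    batch = pin_memory(batch)  # recurses into the dict; both arrays end up page-locked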
def unpin_memory(data):
    """
    Unpin an object that was previously pinned into page-locked memory,
    using the CUDA built-in functions.

    Args:
        data (any): the object to unpin

    Returns:
        any: the unpinned object

    Examples:

    .. code-block:: python

        from hfai.cuda import unpin_memory
        unpinned_data = unpin_memory(pinned_data)

    """
    try:
        m = memoryview(data)
        address = get_data_ofs(m)
        PinMemory.unpin_memory(address)
        return data
    except Exception:
        # ``data`` does not expose a single contiguous buffer;
        # fall back to handling the common container types recursively.
        if isinstance(data, str):
            return data
        elif isinstance(data, collections.abc.Mapping):
            try:
                return type(data)({k: unpin_memory(sample) for k, sample in data.items()})  # type: ignore[call-arg]
            except TypeError:
                # The mapping type may not support `__init__(iterable)`.
                return {k: unpin_memory(sample) for k, sample in data.items()}
        elif isinstance(data, tuple) and hasattr(data, '_fields'):  # namedtuple
            return type(data)(*(unpin_memory(sample) for sample in data))
        elif isinstance(data, tuple):
            return tuple(unpin_memory(sample) for sample in data)  # Backwards compatibility.
        elif isinstance(data, collections.abc.Sequence):
            try:
                return type(data)([unpin_memory(sample) for sample in data])  # type: ignore[call-arg]
            except TypeError:
                # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                return [unpin_memory(sample) for sample in data]
        elif hasattr(data, "unpin_memory"):
            return data.unpin_memory()
        else:
            warnings.warn(f"unpin_memory is not supported for type {type(data)}")
            return data
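
``unpin_memory`` mirrors the traversal of ``pin_memory``, so a pinned object
can be released the same way it was pinned; a minimal sketch of the lifecycle
(buffer size is illustrative):

    import numpy as np
    from hfai.cuda import pin_memory, unpin_memory

    buf = pin_memory(np.empty(1 << 20, dtype=np.uint8))  # page-lock 1 MiB
    # ... issue fast, asynchronous host-to-device copies here ...
    buf = unpin_memory(buf)  # release the page-locked pages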
def pin_memory_async(buf, stride=None):
    """
    Asynchronously pin a memoryview object into page-locked memory, using
    the CUDA built-in functions.

    This function starts a background thread that pins the memory of
    ``buf`` chunk by chunk; ``stride`` bytes are locked at each step.

    It returns a ``PinMemoryAsyncHandle`` object with two methods:

    - ``join``: wait for the background thread until all of the memory
      has been pinned
    - ``unpin_memory``: unpin the memory that has already been pinned

    When the reference count of the ``PinMemoryAsyncHandle`` drops to
    zero, the pinned memory is released automatically.

    Args:
        buf (any): the object to pin
        stride (int): chunk size in bytes pinned at each step; if omitted,
            the whole buffer is pinned in a single step

    Returns:
        handle: a ``PinMemoryAsyncHandle`` object

    Examples:

    .. code-block:: python

        from hfai.cuda import pin_memory_async

        handle = pin_memory_async(unpinned_data, stride)
        handle.join()          # wait until all of the memory is pinned
        handle.unpin_memory()  # unpin

    """
    if stride is None:
        # Default: one chunk covering the whole buffer.
        stride = 1
        for dim in buf.shape:
            stride *= dim
        stride *= buf.itemsize
    return PinMemory.pin_memory_async(buf, stride)
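
Pinning a large buffer in one step can stall the caller for a noticeable
time, so ``pin_memory_async`` lets that cost overlap with other setup work.
A sketch (the stride, array shape, and ``do_other_setup`` are illustrative,
not part of the API):

    import numpy as np
    from hfai.cuda import pin_memory_async

    data = np.empty((1024, 1024, 256), dtype=np.float32)  # ~1 GiB
    handle = pin_memory_async(data, stride=64 << 20)      # pin 64 MiB per step

    do_other_setup()       # hypothetical work overlapped with pinning
    handle.join()          # block until the whole buffer is pinned
    handle.unpin_memory()  # explicitly release the pages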