# Source code for hfai.cuda.pin_memory
import numpy as np
import torch
import collections
import warnings
import ctypes
from hfai._C.cuda import PinMemory
def get_data_ofs(buf):
    """Return the memory address of the first byte of a buffer-protocol object.

    Args:
        buf (buffer): any object exposing the (C-contiguous) buffer protocol,
            e.g. ``bytes``, ``bytearray``, ``memoryview``, ``numpy.ndarray``

    Returns:
        int: address of the underlying storage
    """
    # ``np.frombuffer`` borrows the object's buffer without copying, so
    # ``.ctypes.data`` is the address of the original storage.  This replaces
    # the legacy C-API call ``PyObject_AsCharBuffer`` (deprecated since
    # Python 3.0 and removed from recent CPython), whose error return value
    # was also being silently ignored.
    return np.frombuffer(buf, dtype=np.uint8).ctypes.data
def pin_memory(data):
    """Page-lock ("pin") the memory backing ``data`` with the CUDA driver.

    Objects supporting the buffer protocol are pinned in place.  Containers
    (mappings, sequences, namedtuples, tuples) are traversed recursively;
    strings are returned untouched.  A ``torch.Tensor`` falls back to its own
    ``Tensor.pin_memory`` method.

    Args:
        data (any): object whose memory should be page-locked

    Returns:
        any: ``data`` itself when pinned in place, otherwise a recreated
        container of recursively pinned elements

    Examples:

    .. code-block:: python

        from hfai.cuda import pin_memory
        pinned_data = pin_memory(unpinned_data)
    """
    try:
        # Fast path: anything exposing the buffer protocol is pinned in place.
        view = memoryview(data)
        address = get_data_ofs(view)
        PinMemory.pin_memory(address, view.nbytes)
        return data
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # still propagate.  Any failure of the fast path (non-buffer object,
        # CUDA error, ...) falls through to the recursive handling below.
        if isinstance(data, torch.Tensor):
            return data.pin_memory()
        elif isinstance(data, str):
            # str has no pinnable storage; pass it through unchanged.
            return data
        elif isinstance(data, collections.abc.Mapping):
            try:
                return type(data)(
                    {k: pin_memory(sample)
                     for k, sample in data.items()})  # type: ignore[call-arg]
            except TypeError:
                # The mapping type may not support `__init__(iterable)`.
                return {k: pin_memory(sample) for k, sample in data.items()}
        elif isinstance(data, tuple) and hasattr(data,
                                                 '_fields'):  # namedtuple
            return type(data)(*(pin_memory(sample) for sample in data))
        elif isinstance(data, tuple):
            return tuple([pin_memory(sample)
                          for sample in data])  # Backwards compatibility.
        elif isinstance(data, collections.abc.Sequence):
            try:
                return type(data)([pin_memory(sample) for sample in data
                                   ])  # type: ignore[call-arg]
            except TypeError:
                # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                return [pin_memory(sample) for sample in data]
        elif hasattr(data, "pin_memory"):
            return data.pin_memory()
        else:
            warnings.warn(f"pin_memory is not supported for type {type(data)}")
            return data
def unpin_memory(data):
    """Release ("unpin") page-locked memory previously pinned by ``pin_memory``.

    Mirrors the traversal of ``pin_memory``: buffer-protocol objects are
    unpinned in place, containers are walked recursively, strings pass
    through unchanged.

    Args:
        data (any): object whose memory should be released from the page lock

    Returns:
        any: ``data`` itself when unpinned in place, otherwise a recreated
        container of recursively unpinned elements

    Examples:

    .. code-block:: python

        from hfai.cuda import unpin_memory
        unpinned_data = unpin_memory(pinned_data)
    """
    try:
        # Fast path: anything exposing the buffer protocol is unpinned in place.
        view = memoryview(data)
        address = get_data_ofs(view)
        PinMemory.unpin_memory(address)
        return data
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # still propagate; all other failures fall back to the container walk.
        if isinstance(data, str):
            return data
        elif isinstance(data, collections.abc.Mapping):
            try:
                return type(data)(
                    {k: unpin_memory(sample)
                     for k, sample in data.items()})  # type: ignore[call-arg]
            except TypeError:
                # The mapping type may not support `__init__(iterable)`.
                return {k: unpin_memory(sample) for k, sample in data.items()}
        elif isinstance(data, tuple) and hasattr(data,
                                                 '_fields'):  # namedtuple
            return type(data)(*(unpin_memory(sample) for sample in data))
        elif isinstance(data, tuple):
            return tuple([unpin_memory(sample)
                          for sample in data])  # Backwards compatibility.
        elif isinstance(data, collections.abc.Sequence):
            try:
                return type(data)([unpin_memory(sample) for sample in data
                                   ])  # type: ignore[call-arg]
            except TypeError:
                # The sequence type may not support `__init__(iterable)` (e.g., `range`).
                return [unpin_memory(sample) for sample in data]
        elif hasattr(data, "unpin_memory"):
            return data.unpin_memory()
        else:
            warnings.warn(
                f"unpin_memory is not supported for type {type(data)}")
            return data
def pin_memory_async(buf, stride=None):
    """Asynchronously page-lock the memory behind a buffer object.

    A background thread pins the buffer chunk by chunk, locking ``stride``
    bytes at a time.  The returned ``PinMemoryAsyncHandle`` exposes:

    * ``join()`` -- block until the whole buffer has been pinned
    * ``unpin_memory()`` -- release the page-locked memory

    The memory is also unpinned automatically once the handle's reference
    count drops to zero.

    Args:
        buf (buffer): object with ``shape`` / ``itemsize`` attributes
            (e.g. ``memoryview`` or ``numpy.ndarray``) to page-lock
        stride (int): chunk size in bytes for each pinning step; when omitted
            the entire buffer is pinned in a single step

    Returns:
        handle: a ``PinMemoryAsyncHandle`` object

    Examples:

    .. code-block:: python

        from hfai.cuda import pin_memory_async
        handle = pin_memory_async(unpinned_data, stride)
        handle.join()          # wait until everything is pinned
        handle.unpin_memory()  # release the page lock
    """
    if stride is None:
        # Default: one chunk covering the whole buffer,
        # i.e. prod(shape) * itemsize bytes.
        total_bytes = buf.itemsize
        for extent in buf.shape:
            total_bytes *= extent
        stride = total_bytes
    return PinMemory.pin_memory_async(buf, stride)