Source code for hfai.utils.mm.file_client
from pathlib import Path
from mmcv import BaseStorageBackend, FileClient
from ffrecord import PackedFolder
@FileClient.register_backend('ffrecord')
class FFRecordClient(BaseStorageBackend):
    """
    Storage backend that exposes the FFRecord file format to mmcv.

    With `FFRecordClient`, files packed by `ffrecord.pack_folder` can be
    accessed directly from mmcv.

    See the `HFAI X MMCV <https://doc.hfai.high-flyer.cn/performance/mm.html>`_
    tutorial for details.

    NOTE:
        Supports `mmcv >= 1.3.18`

    Args:
        fname (str): path of the file produced by `ffrecord.pack_folder`
        path_prefix (str): path prefix to strip from incoming paths
        check_data (bool): whether to verify checksums when reading the
            FFRecord file

    Examples:

        >>> from ffrecord import pack_folder
        >>> import hfai.utils.mm, mmcv
        >>> pack_folder("/your/folder", "packed.ffr")
        >>> file_client_args = dict(backend="ffrecord", fname="packed.ffr")
        >>> client = mmcv.FileClient.infer_client(file_client_args)
        >>> data = client.get("0001.txt")

        A path prefix can also be given when constructing FFRecordClient;
        it is then removed automatically from the input path on every read:

        >>> file_client_args = dict(backend="ffrecord", fname="packed.ffr", path_prefix="/home/user/data/")
        >>> client = mmcv.FileClient.infer_client(file_client_args)
        >>> data = client.get("/home/user/data/0001.txt")
    """

    def __init__(self, fname, path_prefix=None, check_data=True):
        """Open the packed file ``fname`` and remember the optional prefix."""
        if path_prefix is not None:
            # Stored as a Path so that `_remove_prefix` can use `relative_to`.
            path_prefix = Path(path_prefix)
        self.prefix = path_prefix
        self.folder = PackedFolder(fname, check_data=check_data)

    def get(self, filepath):
        """Return the content of ``filepath`` as ``bytes``."""
        filepath = self._remove_prefix(filepath)
        data = self.folder.read_one(filepath).tobytes()
        return data

    def get_text(self, filepath, encoding='utf-8'):
        """Return the content of ``filepath`` decoded with ``encoding``."""
        filepath = self._remove_prefix(filepath)
        data = self.folder.read_one(filepath)
        text = data.tobytes().decode(encoding)
        return text

    def list_dir_or_file(self, dir_path, list_dir=True, list_file=True, suffix=None, recursive=False):
        """List the entries under ``dir_path`` inside the packed folder.

        Args:
            dir_path: directory to list; the configured prefix is removed first.
            list_dir (bool): forwarded to ``PackedFolder.list``.
            list_file (bool): forwarded to ``PackedFolder.list``.
            suffix (str or tuple[str], optional): keep only names ending with
                this suffix, or with ANY of these suffixes when a tuple is
                given. ``None`` or an empty tuple disables the filter. The
                filter is applied to every returned name.
            recursive (bool): forwarded to ``PackedFolder.list``.

        Returns:
            list[str]: the matching names, converted with ``str``.
        """
        dir_path = self._remove_prefix(dir_path)
        fnames = self.folder.list(dir_path, list_dir=list_dir, list_file=list_file, recursive=recursive)
        fnames = [str(p) for p in fnames]

        if suffix is None:
            return fnames

        assert isinstance(suffix, (tuple, str))
        suffixes = suffix if isinstance(suffix, tuple) else (suffix,)
        if not suffixes:
            # An empty tuple never filtered anything; keep that behaviour
            # (`str.endswith(())` would otherwise reject every name).
            return fnames

        # `str.endswith` with a tuple matches when ANY suffix matches.
        # Filtering once per suffix would require a name to end with ALL of
        # them, which yields nothing for e.g. ('.jpg', '.png').
        return [p for p in fnames if p.endswith(suffixes)]

    def isfile(self, filepath):
        """Return ``self.folder.is_file`` for ``filepath`` (prefix removed)."""
        filepath = self._remove_prefix(filepath)
        return self.folder.is_file(filepath)

    def isdir(self, filepath):
        """Return ``self.folder.is_dir`` for ``filepath`` (prefix removed)."""
        filepath = self._remove_prefix(filepath)
        return self.folder.is_dir(filepath)

    def exists(self, filepath):
        """Return ``self.folder.exists`` for ``filepath`` (prefix removed)."""
        filepath = self._remove_prefix(filepath)
        return self.folder.exists(filepath)

    def _remove_prefix(self, filepath):
        """Make ``filepath`` relative to the configured prefix, if any.

        Without a prefix the argument is returned unchanged. With a prefix
        the result is a ``pathlib.Path``; ``Path.relative_to`` raises
        ``ValueError`` when ``filepath`` is not located under the prefix.
        """
        if self.prefix is not None:
            filepath = Path(filepath).relative_to(self.prefix)
        return filepath