Shortcuts

Source code for hfai.utils.mm.file_client

from pathlib import Path
from mmcv import BaseStorageBackend, FileClient
from ffrecord import PackedFolder


@FileClient.register_backend('ffrecord')
class FFRecordClient(BaseStorageBackend):
    """Storage backend that lets mmcv read files packed in the FFRecord format.

    With ``FFRecordClient`` mmcv can directly access the contents of a file
    produced by ``ffrecord.pack_folder``. For a full tutorial see
    `HFAI X MMCV <https://doc.hfai.high-flyer.cn/performance/mm.html>`_

    NOTE:
        Supports ``mmcv >= 1.3.18``.

    Args:
        fname (str): Path of the file produced by ``ffrecord.pack_folder``.
        path_prefix (str): Prefix to strip from every path passed to this
            client before it is looked up in the packed folder.
        check_data (bool): Whether to verify checksums when reading the
            FFRecord file.

    Examples:

        >>> from ffrecord import pack_folder
        >>> import hfai.utils.mm, mmcv
        >>> pack_folder("/your/folder", "packed.ffr")
        >>> file_client_args = dict(backend="ffrecord", fname="packed.ffr")
        >>> client = mmcv.FileClient.infer_client(file_client_args)
        >>> data = client.get("0001.txt")

        A path prefix can also be given when building the client; it is
        then removed automatically from the input path on every read:

        >>> file_client_args = dict(backend="ffrecord", fname="packed.ffr", path_prefix="/home/user/data/")
        >>> client = mmcv.FileClient.infer_client(file_client_args)
        >>> data = client.get("/home/user/data/0001.txt")
    """

    def __init__(self, fname, path_prefix=None, check_data=True):
        # Keep the prefix as a Path so that ``relative_to`` can be used
        # in ``_remove_prefix``; ``None`` means "no prefix stripping".
        if path_prefix is not None:
            path_prefix = Path(path_prefix)
        self.prefix = path_prefix
        self.folder = PackedFolder(fname, check_data=check_data)

    def get(self, filepath):
        """Return the raw bytes stored under ``filepath``."""
        filepath = self._remove_prefix(filepath)
        return self.folder.read_one(filepath).tobytes()

    def get_text(self, filepath, encoding='utf-8'):
        """Return the content stored under ``filepath`` decoded as text.

        Args:
            filepath: Path of the entry to read (prefix is stripped first).
            encoding (str): Codec used to decode the bytes.
        """
        filepath = self._remove_prefix(filepath)
        return self.folder.read_one(filepath).tobytes().decode(encoding)

    def list_dir_or_file(self,
                         dir_path,
                         list_dir=True,
                         list_file=True,
                         suffix=None,
                         recursive=False):
        """List the entries found under ``dir_path`` in the packed folder.

        Args:
            dir_path: Directory to list (prefix is stripped first).
            list_dir (bool): Forwarded to ``PackedFolder.list``.
            list_file (bool): Forwarded to ``PackedFolder.list``.
            suffix (str or tuple[str], optional): Keep only names ending
                with this suffix, or with *any* of the suffixes when a
                tuple is given. ``None`` (or an empty tuple/string)
                disables the filtering.
            recursive (bool): Forwarded to ``PackedFolder.list``.

        Returns:
            list[str]: The matching names, converted to strings.

        Raises:
            TypeError: If ``suffix`` is neither ``None``, a string, nor a
                tuple.
        """
        # Explicit check instead of ``assert`` so that it still runs
        # under ``python -O``.
        if suffix is not None and not isinstance(suffix, (str, tuple)):
            raise TypeError('`suffix` must be a string or tuple of strings')

        dir_path = self._remove_prefix(dir_path)
        fnames = self.folder.list(
            dir_path,
            list_dir=list_dir,
            list_file=list_file,
            recursive=recursive,
        )
        fnames = [str(p) for p in fnames]

        # ``str.endswith`` accepts a tuple and matches if ANY suffix fits.
        # Filtering once per suffix (as done previously) required a name to
        # end with EVERY suffix, so e.g. ('.jpg', '.png') matched nothing.
        # An empty tuple / empty string keeps every name, as before.
        if suffix:
            fnames = [p for p in fnames if p.endswith(suffix)]
        return fnames

    def isfile(self, filepath):
        """Return the result of ``PackedFolder.is_file`` for ``filepath``."""
        filepath = self._remove_prefix(filepath)
        return self.folder.is_file(filepath)

    def isdir(self, filepath):
        """Return the result of ``PackedFolder.is_dir`` for ``filepath``."""
        filepath = self._remove_prefix(filepath)
        return self.folder.is_dir(filepath)

    def exists(self, filepath):
        """Return the result of ``PackedFolder.exists`` for ``filepath``."""
        filepath = self._remove_prefix(filepath)
        return self.folder.exists(filepath)

    def _remove_prefix(self, filepath):
        """Strip the configured prefix from ``filepath``.

        When no prefix was configured the input is returned unchanged;
        otherwise a ``Path`` relative to the prefix is returned.
        ``Path.relative_to`` raises ``ValueError`` if ``filepath`` does not
        start with the prefix.
        """
        if self.prefix is not None:
            filepath = Path(filepath).relative_to(self.prefix)
        return filepath