Shortcuts

Source code for hf3fs

from hf3fs_pyclient import Client, iovec

#import attrs
from contextlib import contextmanager, AbstractContextManager
from dataclasses import dataclass
import functools
import os
from pathlib import PurePosixPath
import threading as th

from pkg_resources import get_distribution

# Resolve the installed distribution's version string; if the lookup fails for
# any reason, fall back to the placeholder "debug".
try:
    __version__ = get_distribution('hf3fs').version
except Exception:
    # Deliberately broad (any lookup failure means "no version available"),
    # but unlike a bare ``except:`` this does not swallow KeyboardInterrupt
    # or SystemExit raised while the module is being imported.
    __version__ = "debug"

class _DefaultClientState(th.local):
    """Per-thread default-client state.

    ``threading.local`` runs ``__init__`` once in every thread that touches
    the object, so each thread starts with its own ``client`` / ``clients``
    values. Assigning the attributes on a plain ``th.local()`` at import time
    would define them only in the importing thread and leave every other
    thread with AttributeError on first access.
    """

    def __init__(self):
        # Client used when a call names neither a client nor a mount.
        self.client = None
        # Cache of mount name -> client, private to the current thread.
        self.clients = {}


DEFAULT_CLIENT = _DefaultClientState()

#@attrs.define
@dataclass
class MountInfo:
    """Settings recorded for one named mount and used to build its Client."""
    # Passed as the token argument when a Client is constructed for the mount.
    token: str
    # Forwarded to the Client constructor as its ``as_super`` keyword.
    as_super: bool

# Registry of mount name -> MountInfo. Filled by setMountInfo() and consulted
# by setupDefaultClient() when it has to build a client for a mount.
MOUNT_INFO = {}

def _getDefaultClient(kwargs):
    """Choose the client for a call and strip the selector keys from *kwargs*.

    Resolution order:
      1. a non-None ``client`` entry in *kwargs*;
      2. a non-None ``mount_name`` entry: the current thread's cached client
         for that mount, created through ``setupDefaultClient`` on first use;
      3. the current thread's default client.

    ``client`` and ``mount_name`` are always removed from *kwargs*, including
    when they are None or when an explicit client takes precedence. Leaving a
    stale entry behind would either collide with the ``client=`` keyword that
    ``withClient`` supplies ("multiple values for keyword argument") or be
    forwarded to a target that does not accept it.

    Args:
        kwargs: keyword-argument dict of the wrapped call; mutated in place.

    Returns:
        ``(client, kwargs)`` where ``kwargs`` is the same dict, minus the
        selector keys.

    Raises:
        RuntimeError: no explicit client, no mount name and no default client.
        ValueError: propagated from ``setupDefaultClient`` for an unknown mount.
    """
    client = kwargs.pop('client', None)
    mount_name = kwargs.pop('mount_name', None)

    if client is not None:
        return client, kwargs

    if mount_name is not None:
        if mount_name not in DEFAULT_CLIENT.clients:
            setupDefaultClient(mount_name)
        return DEFAULT_CLIENT.clients[mount_name], kwargs

    if DEFAULT_CLIENT.client is None:
        raise RuntimeError("default client not setup")
    return DEFAULT_CLIENT.client, kwargs

def _setupH3Method(name):
    """Publish a module-level function that forwards to ``Client.<name>``.

    The generated function resolves a client from its keyword arguments via
    ``_getDefaultClient`` and calls the method called *name* on that client
    with the remaining arguments. It carries the metadata of ``Client.<name>``
    and is stored in this module's globals under *name*.
    """
    def forward(*args, **kwargs):
        client, remaining = _getDefaultClient(kwargs)
        bound = getattr(client, name)
        return bound(*args, **remaining)

    globals()[name] = functools.wraps(getattr(Client, name))(forward)

# Expose each listed Client method as a module-level function of the same name.
for _name in (
    'stat', 'fstat', 'mkdir', 'rmdir', 'unlink', 'remove', 'realpath',
    'readlink', 'opendir', 'readdir',
    'creat', 'symlink', 'link', 'open', 'close', 'chmod', 'chown', 'chdir',
    'ftruncate',
    'iovalloc', 'iovfree', 'preadv', 'pwritev',
):
    _setupH3Method(_name)

def setMountInfo(mount_name, token, as_super=False):
    """Register (or overwrite) the token and as_super flag for *mount_name*.

    The entry is stored in ``MOUNT_INFO``, which ``setupDefaultClient`` reads
    when it builds a client for that mount.
    """
    info = MountInfo(token=token, as_super=as_super)
    MOUNT_INFO[mount_name] = info

def setupDefaultClient(mount_name):
    """Build a Client for a registered mount and cache it for this thread.

    Args:
        mount_name: a name previously registered with ``setMountInfo``.

    Returns:
        The new Client, also stored in ``DEFAULT_CLIENT.clients[mount_name]``
        (replacing any client cached there before).

    Raises:
        ValueError: *mount_name* has not been registered.
    """
    if mount_name not in MOUNT_INFO:
        raise ValueError(f"unknown mount name '{mount_name}'")

    info = MOUNT_INFO[mount_name]
    created = Client(mount_name, info.token, as_super=info.as_super)
    DEFAULT_CLIENT.clients[mount_name] = created
    return created

@contextmanager
def defaultClient(mount_name, token, as_super=False):
    """Temporarily install a freshly built Client as the default client.

    A new ``Client(mount_name, token, as_super=as_super)`` becomes
    ``DEFAULT_CLIENT.client`` for the duration of the ``with`` body and is
    yielded to it. On exit, including on error, the previous default is put
    back, so these contexts can be nested.
    """
    previous = DEFAULT_CLIENT.client
    installed = Client(mount_name, token, as_super=as_super)
    DEFAULT_CLIENT.client = installed

    try:
        yield installed
    finally:
        DEFAULT_CLIENT.client = previous

def withClient(f):
    """Decorator that supplies a resolved ``client=`` keyword to *f*.

    On every call the client is chosen by ``_getDefaultClient`` from the
    call's keyword arguments; *f* then receives the original positional
    arguments, ``client=<resolved client>`` and the remaining keywords.
    """
    @functools.wraps(f)
    def resolved(*args, **kwargs):
        client, remaining = _getDefaultClient(kwargs)
        return f(*args, client=client, **remaining)

    return resolved

@withClient
def listdir(path='.', client=None):
    """Return the names of the entries in directory *path*.

    ``None`` is treated like ``'.'``. Names are collected by calling
    ``client.readdir`` on the handle from ``client.opendir`` until it
    returns None, and are returned in that order.
    """
    target = '.' if path is None else path
    handle = client.opendir(target)

    names = []
    entry = client.readdir(handle)
    while entry is not None:
        names.append(entry.d_name)
        entry = client.readdir(handle)
    return names
class DirEntry(object):
    """A directory entry produced by ``scandir``, modelled on ``os.DirEntry``.

    The entry remembers its name, the entry type reported by ``readdir`` and
    the descriptor of the directory it lives in; stat information is fetched
    lazily, relative to that descriptor, and cached.
    """

    # File-type bits of ``st_mode`` (same values as the ``stat`` module).
    _S_IFMT = 0o170000
    _S_IFLNK = 0o120000
    _S_IFREG = 0o100000
    _S_IFDIR = 0o040000

    # Entry-type codes compared against the ``d_type`` handed to __init__
    # (same values as the Linux ``DT_*`` constants).
    _DT_DIR = 4
    _DT_REG = 8
    _DT_LNK = 10

    @withClient
    def __init__(self, parentPath, name, etype, parentFd, client=None):
        """
        Args:
            parentPath: path of the containing directory (used for ``path``).
            name: entry name within that directory.
            etype: entry type code, compared against the ``_DT_*`` constants.
            parentFd: descriptor of the containing directory, passed as
                ``dir_fd`` to ``client.stat``.
            client: client used for stat calls (resolved by ``withClient``).
        """
        self._parentPath = PurePosixPath(parentPath)
        self._name = name
        self._etype = etype
        self._parentFd = parentFd
        self._client = client
        # Cached results: ``_st`` follows symlinks, ``_lst`` does not.
        self._st = None
        self._lst = None

    @property
    def name(self):
        """The entry's name within its directory."""
        return self._name

    @property
    def path(self):
        """``parentPath`` joined with the entry name, as a string."""
        return str(self._parentPath / self._name)

    def _checkWFollow(self, against, follow_symlinks):
        """Shared type test for is_dir / is_file.

        *against* is ``(d_type code, st_mode file-type bits)``. The entry
        matches if its recorded type equals the code, or -- when following
        symlinks and the entry is a symlink -- if the link target's mode has
        the given file-type bits.
        """
        dtype, mode_bits = against
        if self._etype == dtype:
            return True
        if follow_symlinks and self.is_symlink():
            target = self.stat(True)
            return (target.st_mode & self._S_IFMT) == mode_bits
        return False

    def is_dir(self, follow_symlinks=True):
        """True if the entry is a directory (optionally via a symlink)."""
        return self._checkWFollow((self._DT_DIR, self._S_IFDIR), follow_symlinks)

    def is_file(self, follow_symlinks=True):
        """True if the entry is a regular file (optionally via a symlink)."""
        return self._checkWFollow((self._DT_REG, self._S_IFREG), follow_symlinks)

    def is_symlink(self):
        """True if the recorded entry type is a symbolic link."""
        return self._etype == self._DT_LNK

    def stat(self, follow_symlinks=True):
        """Return the (cached) stat result for this entry.

        Results are cached separately for the following and non-following
        variants. With a single shared cache, an earlier
        ``stat(follow_symlinks=False)`` on a symlink would later be returned
        to ``is_dir()`` / ``is_file()`` as if it described the link target.
        """
        if follow_symlinks:
            if self._st is None:
                self._st = self._client.stat(
                    self._name, dir_fd=self._parentFd, follow_symlinks=follow_symlinks)
            return self._st

        if self._lst is None:
            self._lst = self._client.stat(
                self._name, dir_fd=self._parentFd, follow_symlinks=follow_symlinks)
        return self._lst
@withClient
def scandir(path='.', dir_fd=None, client=None):
    """Return an iterator of ``DirEntry`` objects for directory *path*.

    The returned object is also a context manager; leaving the ``with``
    block (or calling ``close()``) closes the directory descriptor it holds.

    Args:
        path: directory to scan; ``None`` is treated like ``'.'``.
        dir_fd: optional descriptor that *path* is resolved against.
        client: client to use (resolved by ``withClient``).
    """
    class DirEntryIter(AbstractContextManager):
        """Lazy iterator over one directory; opens it on first use."""

        def __init__(self, path, client, dir_fd):
            self._path = path
            self._client = client
            self._dir_fd = dir_fd
            self._fd = None      # descriptor handed to each DirEntry
            self._dirp = None    # handle consumed by readdir

        @property
        def dir_fd(self):
            """Descriptor of the scanned directory, or None while not open."""
            return self._fd

        def _ensureOpen(self):
            # Open only what is missing. Opening unconditionally on every
            # __iter__ call would overwrite (and leak) the descriptor from
            # an earlier call.
            if self._fd is None:
                self._fd = self._client.open(
                    self._path, os.O_DIRECTORY | os.O_PATH, dir_fd=self._dir_fd)
            if self._dirp is None:
                self._dirp = self._client.opendir(self._path, dir_fd=self._dir_fd)

        def close(self):
            """Close the directory descriptor; safe to call repeatedly."""
            if self._fd is not None:
                self._client.close(self._fd)
                self._fd = None
                self._dirp = None

        def __iter__(self):
            self._ensureOpen()
            return self

        def __next__(self):
            # Also open here so ``next(it)`` works without a prior ``iter(it)``.
            self._ensureOpen()
            dent = self._client.readdir(self._dirp)
            if dent is None:
                raise StopIteration()
            return DirEntry(self._path, dent.d_name, dent.d_type, self._fd,
                            client=self._client)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            self.close()
            return False  # never suppress the caller's exception

    if path is None:
        path = '.'
    return DirEntryIter(path, client, dir_fd)
@withClient
def walk2(top, topdown=True, onerror=None, followlinks=False, dir_fd=None, curr_dir=None, client=None):
    """Walk a directory tree, yielding ``(top, dirnames, filenames, dir_fd)``.

    Like ``walk`` but each tuple also carries the descriptor of the directory
    being reported. That descriptor is closed once the generator moves past
    the directory, so it is only usable while handling the yielded tuple.

    Args:
        top: path reported for this directory (and scanned when *curr_dir*
            is None).
        topdown: report a directory before (True) or after (False) its
            subdirectories. In top-down mode *dirnames* is read after the
            yield, so the consumer may edit it in place to steer the descent.
        onerror: optional callable invoked with the OSError when one is
            raised; without it such errors are silently skipped.
        followlinks: treat symlinks to directories as directories.
        dir_fd: descriptor the scanned path is resolved against.
        curr_dir: path to scan relative to *dir_fd*; used by the recursion,
            where each level is opened by name relative to its parent.
        client: client to use (resolved by ``withClient``).
    """
    try:
        scan_path = curr_dir if curr_dir is not None else top
        with scandir(scan_path, dir_fd, client=client) as sd:
            topp = PurePosixPath(top)
            dirnames = []
            filenames = []
            for dent in sd:
                name = dent.name
                if dent.is_dir(followlinks):
                    dirnames.append(name)
                    if not topdown:
                        # Build the child's reported path from ``top``.
                        # ``dent.path`` is derived from the scanned path,
                        # which below the first level is only the name
                        # relative to the parent descriptor, so it would
                        # drop the leading components.
                        yield from walk2(str(topp / name), False, onerror, followlinks,
                                         sd.dir_fd, name, client=client)
                else:
                    filenames.append(name)

            yield (top, dirnames, filenames, sd.dir_fd)

            if topdown:
                for dirname in dirnames:
                    yield from walk2(str(topp / dirname), True, onerror, followlinks,
                                     sd.dir_fd, dirname, client=client)
    except OSError as e:
        if onerror is not None:
            onerror(e)
@withClient
def walk(top, topdown=True, onerror=None, followlinks=False, dir_fd=None, client=None):
    """Walk a directory tree, yielding ``(dirpath, dirnames, filenames)``.

    Thin wrapper over ``walk2`` that drops the directory descriptor from
    each tuple it produces; all arguments are passed through unchanged.
    """
    records = walk2(top, topdown, onerror, followlinks, dir_fd, None, client=client)
    for dirpath, dirnames, filenames, _dirfd in records:
        yield (dirpath, dirnames, filenames)
class BinaryFile(AbstractContextManager):
    """Binary file object backed by a client file descriptor.

    Tracks its own offset (``tell()``) from the results of the client's
    lseek/read/write calls. Usable as a context manager; the descriptor is
    also closed when the object is garbage-collected.
    """

    @withClient
    def __init__(self, path, mode, dir_fd=None, client=None, ignore_cache=False):
        """Open *path*.

        Args:
            path: file to open.
            mode: one of ``'r'`` (read-only), ``'r+'`` (read/write),
                ``'r+c'`` (read/write, create), ``'w'`` (write-only, create,
                truncate), ``'w+'`` (read/write, create, truncate).
            dir_fd: optional descriptor that *path* is resolved against.
            client: client to use (resolved by ``withClient``).
            ignore_cache: adds ``os.O_NONBLOCK`` to the open flags.

        Raises:
            ValueError: *mode* is not one of the supported strings.
        """
        # Establish the attributes close() relies on before anything can
        # raise, so __del__ on a half-constructed object stays harmless.
        self._fd = None
        self._off = 0
        self.client = client

        if mode == 'r':
            flags = os.O_RDONLY
        elif mode == 'r+':
            flags = os.O_RDWR
        elif mode == 'r+c':
            flags = os.O_RDWR | os.O_CREAT
        elif mode == 'w':
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        elif mode == 'w+':
            flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
        else:
            raise ValueError(f'invalid mode {mode}')

        if ignore_cache:
            flags |= os.O_NONBLOCK

        self._fd = client.open(path, flags, 0o644, dir_fd=dir_fd)

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the descriptor if it is open; safe to call repeatedly."""
        # getattr: __del__ may run on an instance whose __init__ never got
        # as far as assigning _fd (e.g. client resolution failed).
        fd = getattr(self, '_fd', None)
        if fd is not None:
            self.client.close(fd)
            self._fd = None

    def fileno(self):
        """Return the underlying descriptor (None once closed)."""
        return self._fd

    def seek(self, pos, how=os.SEEK_SET, readahead=None):
        """Reposition via ``client.lseek`` and return the new offset."""
        self._off = self.client.lseek(self._fd, pos, how, readahead=readahead)
        return self._off

    def tell(self):
        """Return the offset tracked by this object."""
        return self._off

    def _bytesLeft(self):
        """Bytes between the current offset and end of file (never negative)."""
        off = self._off
        flen = self.seek(0, os.SEEK_END)
        self.seek(off)  # restore the position the probe moved
        return max(0, flen - off)

    def read(self, size=None, readahead=None):
        """Read up to *size* bytes and return them as a memoryview.

        ``None`` or a negative *size* reads to end of file.
        """
        if size is None or size < 0:
            size = self._bytesLeft()
        buf = memoryview(bytearray(size))
        red = self.readinto(buf, readahead=readahead)
        return buf[:red]

    def readinto(self, buf, readahead=None):
        """Read into *buf*; advance the offset and return the byte count."""
        red = self.client.read(self._fd, buf, readahead=readahead)
        self._off += red
        return red

    def write(self, buf, flush=False):
        """Write *buf*; advance the offset and return the byte count."""
        writ = self.client.write(self._fd, buf, flush=flush)
        self._off += writ
        return writ