Shortcuts

Source code for hfai.file.file

import ctypes
import os
from contextlib import contextmanager
from io import BytesIO


def get_alignment(custom_alignment=None):
    '''
    Returns alignment of O_DIRECT (DirectIO)
    @:param custom_alignment always overrides DEFAULT_ALIGNMENT (if provided)
    '''
    DEFAULT_ALIGNMENT = 4096
    if custom_alignment != None:
        return custom_alignment
    else:
        return DEFAULT_ALIGNMENT


def get_validated_alignment(custom_alignment=None):
    '''
    Returns validated alignment of O_DIRECT (DirectIO)
    @:param custom_alignment always overrides DEFAULT_ALIGNMENT (if provided)
    @:raises ValueError if invalid custom_alignment is provided
    '''
    MIN_ALIGNMENT = 512

    alignment = get_alignment(custom_alignment)
    # validates alignment (0 is treated as valid)
    if alignment < 0:
        raise ValueError('Invalid negative alignment:', alignment)
    if alignment > 0 and alignment < MIN_ALIGNMENT:
        raise ValueError('Invalid alignment, should >=', MIN_ALIGNMENT, ' if not zero:', alignment)
    if alignment > 0 and alignment % MIN_ALIGNMENT != 0:
        raise ValueError('Invalid alignment, should be a muliple of', MIN_ALIGNMENT, ' if not zero:', alignment)
    # alignment is valid
    return alignment


def allocate_aligned_memory(nbytes, custom_alignment=None):
    '''
    Returns allocated memory with aligned address in the format of (aligned_memory, end_index)
    @:param nbytes: #bytes to be allocated
    @:param custom_alignment always overrides DEFAULT_ALIGNMENT (if provided)
    @:raises ValueError if invalid custom_alignment is provided
    '''
    # get validated alignment, fail early by raising ValueError if custom_alignment is invalid
    alignment = get_validated_alignment(custom_alignment)
    # zero alignment
    if alignment == 0:
        return (bytearray(nbytes), nbytes)
    # prepare buffer
    buf_size = (nbytes + alignment - 1) // alignment * alignment + alignment * 2
    raw_memory = bytearray(buf_size)

    ctypes_raw_type = (ctypes.c_char * buf_size)
    ctypes_raw_memory = ctypes_raw_type.from_buffer(raw_memory)

    raw_address = ctypes.addressof(ctypes_raw_memory)
    offset = raw_address % alignment
    offset_to_aligned = (alignment - offset) % alignment

    ctypes_aligned_type = (ctypes.c_char * (buf_size - offset_to_aligned) )
    ctypes_aligned_memory = ctypes_aligned_type.from_buffer(raw_memory, offset_to_aligned)

    buffer_end = (nbytes + alignment - 1) // alignment * alignment
    # returns (aligned_memory, end_index)
    return (ctypes_aligned_memory, buffer_end)


[docs]def read_file(file_path, custom_alignment=None): """ 通过 O_DIRECT 直接读取文件 自定义对齐字节数有误将触发 ValueError,读取失败会触发 RuntimeError Args: file_path (str): 文件路径 custom_alignment (int, optional): 自定义对齐的字节数 Returns: 返回一个二进制字符串 Examples: >>> from hfai.file import read_file >>> read_file('/your/path') """ try: file_size = os.stat(file_path).st_size except Exception as e: raise RuntimeError('stat {} failed due to {}'.format(file_path, e) ) # get buffer (aligned_memory, buffer_end) = allocate_aligned_memory(file_size, custom_alignment) aligned_memview = memoryview(aligned_memory) # open file total_read = 0 try: fd = os.open(file_path, os.O_RDONLY | os.O_DIRECT) except Exception as e: raise RuntimeError('open{} failed due to {}'.format(file_path, e) ) # read file try: with os.fdopen(fd, mode='rb', buffering=0) as f: while total_read < file_size: current_read = f.readinto(aligned_memview[total_read : buffer_end]) total_read += current_read except Exception as e: raise RuntimeError('read {} failed due to {}'.format(file_path, e) ) # returns content read from the provided file_path return aligned_memory[ : total_read]
[docs]def read_file_as_bytesio(file_path, custom_alignment=None): """ 通过 O_DIRECT 直接读取文件 自定义对齐字节数有误将触发 ValueError,读取失败会触发 RuntimeError Args: file_path (str): 文件路径 custom_alignment (int, optional): 自定义对齐的字节数 Returns: 返回一个 BytesIO Examples: >>> from hfai.file import read_file_as_bytesio >>> read_file_as_bytesio('/your/path') """ return BytesIO(read_file(file_path, custom_alignment))
class OSyncWriter: """ 保证文件能够写入硬盘 一般而言,flush,保证了我们的数据从 [内存] 中释放了,但是并不能保证落到 [硬盘]上,特别是对于并行文件系统来说 所以这个类,就能够保证用户的 write,写到了硬盘上 """ def __init__(self, file, osync=True): """ Args: file: 要打开的文件,写入二进制信息 osync: True,每次 write 都保证落盘,会慢;False,会在 close 的时候做一次 sync 来保证落盘 """ if osync: self.fd = os.open(file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_SYNC) else: self.fd = os.open(file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) def write(self, buf): """ Args: buf: 写入的二进制内容 """ os.write(self.fd, buf) def flush(self): pass def close(self, ): """ 关闭文件,释放 fd """ if self.fd is not None: os.fdatasync(self.fd) os.close(self.fd) self.fd = None @contextmanager def o_sync_write(file, osync=True): """ 打开一个文件用来写 Args: file: 要打开的文件,写入二进制信息 osync: True,每次 write 都保证落盘,会慢;False,会在 退出 o_sync_write 作用域 的时候做一次 sync 来保证落盘 Examples: >>> with o_sync_write('/tmp/file', osync=True) as w: >>> w.write(b'0000000001') """ file = OSyncWriter(file, osync) try: yield file finally: file.close() def main(): from argparse import ArgumentParser parser = ArgumentParser('Test read file') parser.add_argument('--src_file_path', type=str, default=None) parser.add_argument('--dst_file_path', type=str, default=None) parser.add_argument('--custom_alignment', type=int, default=None) args = parser.parse_args() if args.src_file_path: bytes_io = read_file_as_bytesio(args.src_file_path, args.custom_alignment) if args.dst_file_path: with open(args.dst_file_path, 'wb') as f: f.write(bytes_io.read() ) else: print('dst_file_path is empty') else: print('src_file_path is empty') if __name__ == '__main__': main()