# Source code for hfai.file.file
import ctypes
import os
from contextlib import contextmanager
from io import BytesIO
def get_alignment(custom_alignment=None):
    """
    Return the alignment (in bytes) to use for O_DIRECT (direct I/O).

    Args:
        custom_alignment (int, optional): when provided (i.e. not ``None``),
            it always overrides the default alignment. No validation is
            performed here; ``0`` and negative values are returned as-is.

    Returns:
        ``custom_alignment`` if it was provided, otherwise 4096.
    """
    DEFAULT_ALIGNMENT = 4096
    # Identity check: 0 is a legitimate override, and `is not None` cannot be
    # fooled by objects that customise `__ne__`.
    if custom_alignment is not None:
        return custom_alignment
    return DEFAULT_ALIGNMENT
def get_validated_alignment(custom_alignment=None):
    """
    Return a validated alignment (in bytes) for O_DIRECT (direct I/O).

    An alignment of ``0`` is treated as valid; any other value must be a
    positive multiple of 512.

    Args:
        custom_alignment (int, optional): when provided, always overrides the
            default alignment.

    Returns:
        The alignment to use.

    Raises:
        ValueError: if the alignment is negative, smaller than 512 (and not
            zero), or not a multiple of 512.
    """
    MIN_ALIGNMENT = 512
    alignment = get_alignment(custom_alignment)

    # Each error is raised with a single formatted message so that str(exc)
    # is readable instead of being the repr of an argument tuple.
    if alignment < 0:
        raise ValueError('Invalid negative alignment: {}'.format(alignment))
    if alignment > 0 and alignment < MIN_ALIGNMENT:
        raise ValueError(
            'Invalid alignment, should be >= {} if not zero: {}'.format(
                MIN_ALIGNMENT, alignment))
    if alignment > 0 and alignment % MIN_ALIGNMENT != 0:
        raise ValueError(
            'Invalid alignment, should be a multiple of {} if not zero: {}'.format(
                MIN_ALIGNMENT, alignment))

    # alignment is valid
    return alignment
def allocate_aligned_memory(nbytes, custom_alignment=None):
    """
    Allocate a buffer whose start address is aligned for direct I/O.

    Args:
        nbytes (int): number of bytes the caller needs to hold.
        custom_alignment (int, optional): when provided, always overrides the
            default alignment.

    Returns:
        A tuple ``(aligned_memory, end_index)``. With a zero alignment this is
        a plain ``bytearray(nbytes)`` and ``nbytes``. Otherwise
        ``aligned_memory`` is a ctypes char array starting at an aligned
        address inside a larger bytearray, and ``end_index`` is ``nbytes``
        rounded up to the next multiple of the alignment.

    Raises:
        ValueError: if an invalid custom_alignment is provided.
    """
    # Validate first so that a bad alignment fails before anything is allocated.
    alignment = get_validated_alignment(custom_alignment)

    # Zero alignment: no address constraint, hand back an ordinary bytearray.
    if alignment == 0:
        return (bytearray(nbytes), nbytes)

    # Payload size rounded up to a whole number of alignment units.
    padded_size = (nbytes + alignment - 1) // alignment * alignment
    # Over-allocate by two alignment units so the start can be shifted forward
    # to an aligned address while still leaving padded_size usable bytes.
    total_size = padded_size + alignment * 2
    backing = bytearray(total_size)

    # Find where the backing storage lives in memory.
    base_address = ctypes.addressof((ctypes.c_char * total_size).from_buffer(backing))
    # Bytes to skip so that the view starts on an alignment boundary.
    shift = (alignment - base_address % alignment) % alignment

    aligned_view = (ctypes.c_char * (total_size - shift)).from_buffer(backing, shift)
    return (aligned_view, padded_size)
def read_file(file_path, custom_alignment=None):
    """
    Read a whole file directly through O_DIRECT.

    Args:
        file_path (str): path of the file to read.
        custom_alignment (int, optional): custom alignment in bytes.

    Returns:
        The file content as a binary string (a slice of the aligned buffer:
        ``bytes`` when an aligned ctypes buffer is used, ``bytearray`` when
        the alignment is 0).

    Raises:
        ValueError: if the custom alignment is invalid.
        RuntimeError: if stat, open or read of the file fails.

    Examples:

        >>> from hfai.file import read_file
        >>> read_file('/your/path')

    """
    try:
        file_size = os.stat(file_path).st_size
    except Exception as e:
        raise RuntimeError('stat {} failed due to {}'.format(file_path, e)) from e

    # get buffer
    (aligned_memory, buffer_end) = allocate_aligned_memory(file_size, custom_alignment)
    aligned_memview = memoryview(aligned_memory)

    # open file
    try:
        fd = os.open(file_path, os.O_RDONLY | os.O_DIRECT)
    except Exception as e:
        raise RuntimeError('open {} failed due to {}'.format(file_path, e)) from e

    # read file
    total_read = 0
    try:
        # fdopen takes ownership of fd and closes it when the block exits.
        with os.fdopen(fd, mode='rb', buffering=0) as f:
            while total_read < file_size:
                current_read = f.readinto(aligned_memview[total_read:buffer_end])
                # 0 (or None) means no more data is available, e.g. the file
                # shrank after stat(); stop instead of looping forever.
                if not current_read:
                    break
                total_read += current_read
    except Exception as e:
        raise RuntimeError('read {} failed due to {}'.format(file_path, e)) from e

    # returns content read from the provided file_path
    return aligned_memory[:total_read]
def read_file_as_bytesio(file_path, custom_alignment=None):
    """
    Read a whole file directly through O_DIRECT and wrap it in a BytesIO.

    Args:
        file_path (str): path of the file to read.
        custom_alignment (int, optional): custom alignment in bytes.

    Returns:
        A ``BytesIO`` holding the content returned by ``read_file``.

    Raises:
        ValueError: if the custom alignment is invalid.
        RuntimeError: if reading the file fails.

    Examples:

        >>> from hfai.file import read_file_as_bytesio
        >>> read_file_as_bytesio('/your/path')

    """
    content = read_file(file_path, custom_alignment)
    return BytesIO(content)
class OSyncWriter:
    """
    Binary file writer that makes sure written data reaches the disk.

    Generally, ``flush`` only guarantees that data has been released from
    memory; it does not guarantee that the data has landed on disk, especially
    on parallel file systems. This class makes sure the user's writes are
    written to disk.
    """

    def __init__(self, file, osync=True):
        """
        Args:
            file: the file to open (created if missing, truncated if present);
                binary data is written to it.
            osync: True, every write is guaranteed to reach the disk (slower);
                False, a single sync is done on close to guarantee it.
        """
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        if osync:
            flags |= os.O_SYNC
        self.fd = os.open(file, flags)

    def write(self, buf):
        """
        Write the whole buffer to the file.

        Args:
            buf: the binary content (bytes-like object) to write.

        Returns:
            The number of bytes written, i.e. the byte length of ``buf``.
        """
        # View the data as raw bytes so slicing below is zero-copy and counts
        # bytes rather than items.
        view = memoryview(buf).cast('B')
        total = len(view)
        written = 0
        # os.write may write fewer bytes than requested; keep going until the
        # whole buffer has been handed to the OS so no data is silently lost.
        while written < total:
            written += os.write(self.fd, view[written:])
        return written

    def flush(self):
        """No-op: writes go straight to the file descriptor, nothing is buffered here."""
        pass

    def close(self):
        """
        Sync the data to disk, close the file and release the fd.

        Calling ``close`` again after the file is closed does nothing.
        """
        if self.fd is None:
            return
        fd, self.fd = self.fd, None
        try:
            os.fdatasync(fd)
        finally:
            # Always release the descriptor, even when the sync fails.
            os.close(fd)
@contextmanager
def o_sync_write(file, osync=True):
    """
    Open a file for writing; it is closed when the ``with`` block exits.

    Args:
        file: the file to open; binary data is written to it.
        osync: True, every write is guaranteed to reach the disk (slower);
            False, a single sync is done when leaving the o_sync_write scope
            to guarantee it.

    Examples:

        >>> with o_sync_write('/tmp/file', osync=True) as w:
        >>>     w.write(b'0000000001')

    """
    writer = OSyncWriter(file, osync)
    try:
        yield writer
    finally:
        # Runs on normal exit and on exceptions raised inside the with-body.
        writer.close()
def main():
    """
    Command-line test entry: read a file with direct I/O and copy it elsewhere.

    Reads ``--src_file_path`` (optionally with ``--custom_alignment``) and
    writes the content to ``--dst_file_path``. Prints a message instead when
    either path is missing.
    """
    from argparse import ArgumentParser

    parser = ArgumentParser('Test read file')
    parser.add_argument('--src_file_path', type=str, default=None)
    parser.add_argument('--dst_file_path', type=str, default=None)
    parser.add_argument('--custom_alignment', type=int, default=None)
    args = parser.parse_args()

    if not args.src_file_path:
        print('src_file_path is empty')
        return

    # The source is read before the destination is checked.
    bytes_io = read_file_as_bytesio(args.src_file_path, args.custom_alignment)

    if not args.dst_file_path:
        print('dst_file_path is empty')
        return

    with open(args.dst_file_path, 'wb') as f:
        f.write(bytes_io.read())
# Run the command-line test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()