
It is somewhat inconvenient that callers of mk_fs() must create their own
temporary directory.

Convert it to a class so this can be handled automatically, using a
context manager.

Rather than specifying the file size in bytes, use MB since it is rare to
need a smaller file.

Update mk_fs() to use this new class. It still takes its size in bytes, so
round this up to a whole number of MB.

Signed-off-by: Simon Glass <sjg@chromium.org>
---
 test/py/tests/fs_helper.py | 222 ++++++++++++++++++++++++++++---------
 1 file changed, 167 insertions(+), 55 deletions(-)

diff --git a/test/py/tests/fs_helper.py b/test/py/tests/fs_helper.py
index 6649aa7bc47..7d20dbacf76 100644
--- a/test/py/tests/fs_helper.py
+++ b/test/py/tests/fs_helper.py
@@ -9,12 +9,160 @@
 import re
 import os
 from subprocess import call, check_call, check_output, CalledProcessError
 from subprocess import DEVNULL
+import tempfile
 
-# size_gran (int): Size granularity of file system image in bytes
-SIZE_GRAN = 1 << 20
+class FsHelper:
+    """Create a filesystem containing test files
 
-def mk_fs(config, fs_type, size, prefix, src_dir=None, fs_img=None, quiet=False):
+    Usage:
+        with FsHelper(ubman.config, 'ext4', 10, 'mmc1') as fsh:
+            # create files in the fsh.srcdir directory
+            fsh.mk_fs()
+            # Now use the filesystem
+
+        # The filesystem and srcdir are erased after the 'with' statement.
+
+        To set the image filename:
+
+        with FsHelper(ubman.config, 'ext4', 10, 'mmc1') as fsh:
+            fsh.fs_img = 'myfile.img'
+            fsh.mk_fs()
+            ...
+
+        It is also possible to use an existing srcdir:
+
+        with FsHelper(ubman.config, 'fat32', 10, 'usb2') as fsh:
+            fsh.srcdir = src_dir
+            fsh.mk_fs()
+            ...
+
+    Properties:
+        fs_img (str): Filename for the filesystem image; this is set to a
+            default value but can be overwritten
+        srcdir (str): Directory holding the files to copy into the
+            filesystem; a temporary one is created if this is not set
+        quiet (bool): True to suppress the output of mkfs and mcopy
+    """
+    def __init__(self, config, fs_type, size_mb, prefix):
+        """Set up a new object
+
+        Args:
+            config (u_boot_config): U-Boot configuration
+            fs_type (str): File system type: one of ext2, ext3, ext4, vfat,
+                fat12, fat16, fat32, exfat, fs_generic (which means vfat)
+            size_mb (int): Size of file system in MB
+            prefix (str): Prefix string of volume's file name
+        """
+        if ('fat' not in fs_type and 'ext' not in fs_type and
+            fs_type not in ['exfat', 'fs_generic']):
+            raise ValueError(f"Unsupported filesystem type '{fs_type}'")
+
+        self.config = config
+        self.fs_type = fs_type
+        self.size_mb = size_mb
+        self.prefix = prefix
+        self.quiet = True
+
+        # Use a default filename; the caller can adjust it
+        leaf = f'{prefix}.{fs_type}.img'
+        self.fs_img = os.path.join(config.persistent_data_dir, leaf)
+
+        # Some distributions do not add /sbin to the default PATH, where mkfs
+        # lives
+        if '/sbin' not in os.environ["PATH"].split(os.pathsep):
+            os.environ["PATH"] += os.pathsep + '/sbin'
+
+        self.srcdir = None
+        self.tmpdir = None
+        self._do_cleanup = True
+
+    def _get_fs_args(self):
+        """Get the mkfs options and program to use
+
+        Returns:
+            tuple:
+                str: mkfs options, e.g. '-F 32' for fat32
+                str: mkfs program to use, e.g. 'ext4' for ext4
+        """
+        if self.fs_type == 'fat12':
+            mkfs_opt = '-F 12'
+        elif self.fs_type == 'fat16':
+            mkfs_opt = '-F 16'
+        elif self.fs_type == 'fat32':
+            mkfs_opt = '-F 32'
+        elif self.fs_type.startswith('ext'):
+            mkfs_opt = f'-d {self.srcdir}'
+        else:
+            mkfs_opt = ''
+
+        if self.fs_type == 'exfat':
+            fs_lnxtype = 'exfat'
+        elif re.match('fat', self.fs_type) or self.fs_type == 'fs_generic':
+            fs_lnxtype = 'vfat'
+        else:
+            fs_lnxtype = self.fs_type
+        return mkfs_opt, fs_lnxtype
+
+    def mk_fs(self):
+        """Make a new filesystem and copy in the files
+
+        Raises:
+            CalledProcessError: if any of the tools fails; the partly-created
+                image is removed first
+        """
+        self.setup()
+        mkfs_opt, fs_lnxtype = self._get_fs_args()
+        fs_img = self.fs_img
+        with open(fs_img, 'wb') as fsi:
+            fsi.truncate(self.size_mb << 20)
+
+        try:
+            check_call(f'mkfs.{fs_lnxtype} {mkfs_opt} {fs_img}', shell=True,
+                       stdout=DEVNULL if self.quiet else None)
+
+            if self.fs_type.startswith('ext'):
+                sb_content = check_output(f'tune2fs -l {fs_img}',
+                                          shell=True).decode()
+                if 'metadata_csum' in sb_content:
+                    check_call(f'tune2fs -O ^metadata_csum {fs_img}', shell=True)
+            elif self.srcdir and os.listdir(self.srcdir):
+                # An empty srcdir would leave the '*' glob unexpanded and make
+                # the copy fail, so only copy when there is something there
+                if fs_lnxtype == 'exfat':
+                    cmd = f'fattools cp {self.srcdir}/* {fs_img}'
+                else:
+                    flags = f"-smpQ{'' if self.quiet else 'v'}"
+                    cmd = f'mcopy -i {fs_img} {flags} {self.srcdir}/* ::/'
+                check_call(cmd, shell=True)
+        except CalledProcessError:
+            os.remove(fs_img)
+            raise
+
+    def setup(self):
+        """Set up the srcdir ready to receive files"""
+        if not self.srcdir:
+            self.tmpdir = tempfile.TemporaryDirectory(prefix='fs_helper')
+            self.srcdir = self.tmpdir.name
+
+    def cleanup(self):
+        """Remove created files"""
+        if self.tmpdir:
+            self.tmpdir.cleanup()
+        # The image is missing if mk_fs() failed or was never called
+        if os.path.exists(self.fs_img):
+            os.remove(self.fs_img)
+
+    def __enter__(self):
+        self.setup()
+        return self
+
+    def __exit__(self, extype, value, traceback):
+        self.cleanup()
+
+
+def mk_fs(config, fs_type, size, prefix, src_dir=None, size_gran=0x100000,
+          fs_img=None, quiet=False):
     """Create a file system volume
 
     Args:
@@ -29,59 +177,23 @@ def mk_fs(config, fs_type, size, prefix, src_dir=None, fs_img=None, quiet=False)
     Raises:
         CalledProcessError: if any error occurs when creating the filesystem
     """
-    if not fs_img:
-        leaf = f'{prefix}.{fs_type}.img'
-        fs_img = os.path.join(config.persistent_data_dir, leaf)
-
-    if fs_type == 'fat12':
-        mkfs_opt = '-F 12'
-    elif fs_type == 'fat16':
-        mkfs_opt = '-F 16'
-    elif fs_type == 'fat32':
-        mkfs_opt = '-F 32'
-    else:
-        mkfs_opt = ''
-
-    if fs_type == 'exfat':
-        fs_lnxtype = 'exfat'
-    elif re.match('fat', fs_type) or fs_type == 'fs_generic':
-        fs_lnxtype = 'vfat'
-    else:
-        fs_lnxtype = fs_type
-
-    if src_dir:
-        if fs_lnxtype == 'ext4':
-            mkfs_opt = mkfs_opt + ' -d ' + src_dir
-        elif fs_lnxtype not in ('vfat', 'exfat'):
-            raise ValueError(f'src_dir not implemented for fs {fs_lnxtype}')
-
-    count = (size + SIZE_GRAN - 1) // SIZE_GRAN
-
-    # Some distributions do not add /sbin to the default PATH, where mkfs lives
-    if '/sbin' not in os.environ["PATH"].split(os.pathsep):
-        os.environ["PATH"] += os.pathsep + '/sbin'
+    # FsHelper takes its size in MB, so round up to avoid creating an image
+    # smaller than requested
+    granules = (size + size_gran - 1) // size_gran
+    size_mb = (granules * size_gran + (1 << 20) - 1) >> 20
+    fsh = FsHelper(config, fs_type, size_mb, prefix)
+    fsh.srcdir = src_dir
+    fsh.quiet = quiet
+    if fs_img:
+        fsh.fs_img = fs_img
+    try:
+        fsh.mk_fs()
+    finally:
+        # Drop the temporary srcdir, but keep the image for the caller
+        if fsh.tmpdir:
+            fsh.tmpdir.cleanup()
+    return fsh.fs_img
 
-    try:
-        check_call(f'rm -f {fs_img}', shell=True)
-        check_call(f'truncate -s $(( {SIZE_GRAN} * {count} )) {fs_img}',
-                   shell=True)
-        check_call(f'mkfs.{fs_lnxtype} {mkfs_opt} {fs_img}', shell=True,
-                   stdout=DEVNULL if quiet else None)
-        if fs_type == 'ext4':
-            sb_content = check_output(f'tune2fs -l {fs_img}',
-                                      shell=True).decode()
-            if 'metadata_csum' in sb_content:
-                check_call(f'tune2fs -O ^metadata_csum {fs_img}', shell=True)
-        elif fs_lnxtype == 'vfat' and src_dir:
-            flags = f"-smpQ{'' if quiet else 'v'}"
-            check_call(f'mcopy -i {fs_img} {flags} {src_dir}/* ::/',
-                       shell=True)
-        elif fs_lnxtype == 'exfat' and src_dir:
-            check_call(f'fattools cp {src_dir}/* {fs_img}', shell=True)
-        return fs_img
-    except CalledProcessError:
-        call(f'rm -f {fs_img}', shell=True)
-        raise
 
 def setup_image(ubman, devnum, part_type, img_size=20, second_part=False,
                 basename='mmc'):
-- 
2.43.0