From bb0104d12d1edbfda1e06ea7df236de7ff36bfa4 Mon Sep 17 00:00:00 2001 From: Brian Pugh Date: Sun, 29 Oct 2023 13:30:04 -0700 Subject: [PATCH] Replace hardcoded error values with enums --- src/littlefs/__init__.py | 70 +++++++++++++++------------------------- 1 file changed, 26 insertions(+), 44 deletions(-) diff --git a/src/littlefs/__init__.py b/src/littlefs/__init__.py index 995545c..c86ce51 100644 --- a/src/littlefs/__init__.py +++ b/src/littlefs/__init__.py @@ -10,7 +10,7 @@ try: - __version__ = get_distribution('littlefs-python').version + __version__ = get_distribution("littlefs-python").version except DistributionNotFound: # Package not installed pass @@ -19,10 +19,11 @@ from .context import UserContext from .lfs import LFSStat + class LittleFS: """Littlefs file system""" - def __init__(self, context:Optional['UserContext']=None, mount=True, **kwargs) -> None: + def __init__(self, context: Optional["UserContext"] = None, mount=True, **kwargs) -> None: self.cfg = lfs.LFSConfig(context=context, **kwargs) self.fs = lfs.LFSFilesystem() @@ -38,7 +39,7 @@ def block_count(self) -> int: return self.fs.block_count @property - def context(self) -> 'UserContext': + def context(self) -> "UserContext": """User context of the file system""" return self.cfg.user_context @@ -68,18 +69,12 @@ def fs_grow(self, block_count: int) -> int: return lfs.fs_grow(self.fs, block_count) - def fs_stat(self) -> 'LFSFSStat': + def fs_stat(self) -> "LFSFSStat": """Get the status of the filesystem""" return lfs.fs_stat(self.fs) def open( - self, - fname: str, - mode='r', - buffering: int = -1, - encoding: str = None, - errors: str = None, - newline: str = None + self, fname: str, mode="r", buffering: int = -1, encoding: str = None, errors: str = None, newline: str = None ) -> IO: """Open a file. @@ -140,25 +135,17 @@ def open( exclusive_modes = (creating, reading, writing, appending) if sum(int(m) for m in exclusive_modes) > 1: - raise ValueError( - "must have exactly one of create/read/write/append mode" - ) + raise ValueError("must have exactly one of create/read/write/append mode") if binary: if encoding is not None: - raise ValueError( - "binary mode doesn't take an encoding argument" - ) + raise ValueError("binary mode doesn't take an encoding argument") if errors is not None: - raise ValueError( - "binary mode doesn't take an errors argument" - ) + raise ValueError("binary mode doesn't take an errors argument") if newline is not None: - raise ValueError( - "binary mode doesn't take a newline argument" - ) + raise ValueError("binary mode doesn't take a newline argument") if buffering == 1: msg = ( @@ -210,13 +197,7 @@ def open( if binary: return buffered - wrapped = io.TextIOWrapper( - buffered, - encoding, - errors, - newline, - line_buffering - ) + wrapped = io.TextIOWrapper(buffered, encoding, errors, newline, line_buffering) return wrapped @@ -232,7 +213,7 @@ def removeattr(self, path: str, typ: Union[str, bytes, int]) -> None: typ = _typ_to_uint8(typ) lfs.removeattr(self.fs, path, typ) - def listdir(self, path='.') -> List[str]: + def listdir(self, path=".") -> List[str]: """List directory content List the content of a directory. This function uses :meth:`scandir` @@ -247,7 +228,7 @@ def mkdir(self, path: str) -> int: try: return lfs.mkdir(self.fs, path) except errors.LittleFSError as e: - if e.code == -17: + if e.code == LittleFSError.Error.LFS_ERR_EXIST: msg = "[LittleFSError {:d}] Cannot create a file when that file already exists: '{:s}'.".format( e.code, path ) @@ -256,10 +237,10 @@ def mkdir(self, path: str) -> int: def makedirs(self, name: str, exist_ok=False): """Recursive directory creation function.""" - parts = [p for p in name.split('/') if p] - current_name = '' + parts = [p for p in name.split("/") if p] + current_name = "" for nr, part in enumerate(parts): - current_name += '/%s' % part + current_name += "/%s" % part try: self.mkdir(current_name) except FileExistsError as e: @@ -307,15 +288,15 @@ def removedirs(self, name): function tries to recursively remove all parent directories which are also empty. """ - parts = name.split('/') + parts = name.split("/") while parts: try: - name = '/'.join(parts) + name = "/".join(parts) if not name: break - self.remove('/'.join(parts)) + self.remove("/".join(parts)) except errors.LittleFSError as e: - if e.code == -39: + if e.code == LittleFSError.Error.LFS_ERR_NOTEMPTY: break raise e parts.pop() @@ -331,17 +312,17 @@ def rmdir(self, path: str) -> int: """ return self.remove(path) - def scandir(self, path='.') -> Iterator['LFSStat']: + def scandir(self, path=".") -> Iterator["LFSStat"]: """List directory content""" dh = lfs.dir_open(self.fs, path) info = lfs.dir_read(self.fs, dh) while info: - if info.name not in ['.', '..']: + if info.name not in [".", ".."]: yield info info = lfs.dir_read(self.fs, dh) lfs.dir_close(self.fs, dh) - def stat(self, path: str) -> 'LFSStat': + def stat(self, path: str) -> "LFSStat": """Get the status of a file or directory""" return lfs.stat(self.fs, path) @@ -374,7 +355,7 @@ def walk(self, top: str) -> Iterator[Tuple[str, List[str], List[str]]]: yield top, dirs, files for dirname in dirs: - newtop = '/'.join((top, dirname)).replace('//', '/') + newtop = "/".join((top, dirname)).replace("//", "/") yield from self.walk(newtop) @@ -453,13 +434,14 @@ def flush(self): super().flush() lfs.file_sync(self.fs, self.fh) + def _typ_to_uint8(typ): try: out = ord(typ) except TypeError: out = int(typ) - if not(0 <= out <= 255): + if not (0 <= out <= 255): raise ValueError(f"type must be in range [0, 255]") return out