Skip to content

Commit

Permalink
Replace hardcoded error values with enums
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianPugh committed Dec 2, 2023
1 parent 54e3831 commit bb0104d
Showing 1 changed file with 26 additions and 44 deletions.
70 changes: 26 additions & 44 deletions src/littlefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


try:
__version__ = get_distribution('littlefs-python').version
__version__ = get_distribution("littlefs-python").version
except DistributionNotFound:
# Package not installed
pass
Expand All @@ -19,10 +19,11 @@
from .context import UserContext
from .lfs import LFSStat


class LittleFS:
"""Littlefs file system"""

def __init__(self, context:Optional['UserContext']=None, mount=True, **kwargs) -> None:
def __init__(self, context: Optional["UserContext"] = None, mount=True, **kwargs) -> None:
self.cfg = lfs.LFSConfig(context=context, **kwargs)
self.fs = lfs.LFSFilesystem()

Expand All @@ -38,7 +39,7 @@ def block_count(self) -> int:
return self.fs.block_count

@property
def context(self) -> 'UserContext':
def context(self) -> "UserContext":
"""User context of the file system"""
return self.cfg.user_context

Expand Down Expand Up @@ -68,18 +69,12 @@ def fs_grow(self, block_count: int) -> int:

return lfs.fs_grow(self.fs, block_count)

def fs_stat(self) -> 'LFSFSStat':
def fs_stat(self) -> "LFSFSStat":
"""Get the status of the filesystem"""
return lfs.fs_stat(self.fs)

def open(
self,
fname: str,
mode='r',
buffering: int = -1,
encoding: str = None,
errors: str = None,
newline: str = None
self, fname: str, mode="r", buffering: int = -1, encoding: str = None, errors: str = None, newline: str = None
) -> IO:
"""Open a file.
Expand Down Expand Up @@ -140,25 +135,17 @@ def open(
exclusive_modes = (creating, reading, writing, appending)

if sum(int(m) for m in exclusive_modes) > 1:
raise ValueError(
"must have exactly one of create/read/write/append mode"
)
raise ValueError("must have exactly one of create/read/write/append mode")

if binary:
if encoding is not None:
raise ValueError(
"binary mode doesn't take an encoding argument"
)
raise ValueError("binary mode doesn't take an encoding argument")

if errors is not None:
raise ValueError(
"binary mode doesn't take an errors argument"
)
raise ValueError("binary mode doesn't take an errors argument")

if newline is not None:
raise ValueError(
"binary mode doesn't take a newline argument"
)
raise ValueError("binary mode doesn't take a newline argument")

if buffering == 1:
msg = (
Expand Down Expand Up @@ -210,13 +197,7 @@ def open(
if binary:
return buffered

wrapped = io.TextIOWrapper(
buffered,
encoding,
errors,
newline,
line_buffering
)
wrapped = io.TextIOWrapper(buffered, encoding, errors, newline, line_buffering)

return wrapped

Expand All @@ -232,7 +213,7 @@ def removeattr(self, path: str, typ: Union[str, bytes, int]) -> None:
typ = _typ_to_uint8(typ)
lfs.removeattr(self.fs, path, typ)

def listdir(self, path='.') -> List[str]:
def listdir(self, path=".") -> List[str]:
"""List directory content
List the content of a directory. This function uses :meth:`scandir`
Expand All @@ -247,7 +228,7 @@ def mkdir(self, path: str) -> int:
try:
return lfs.mkdir(self.fs, path)
except errors.LittleFSError as e:
if e.code == -17:
if e.code == LittleFSError.Error.LFS_ERR_EXIST:
msg = "[LittleFSError {:d}] Cannot create a file when that file already exists: '{:s}'.".format(
e.code, path
)
Expand All @@ -256,10 +237,10 @@ def mkdir(self, path: str) -> int:

def makedirs(self, name: str, exist_ok=False):
"""Recursive directory creation function."""
parts = [p for p in name.split('/') if p]
current_name = ''
parts = [p for p in name.split("/") if p]
current_name = ""
for nr, part in enumerate(parts):
current_name += '/%s' % part
current_name += "/%s" % part
try:
self.mkdir(current_name)
except FileExistsError as e:
Expand Down Expand Up @@ -307,15 +288,15 @@ def removedirs(self, name):
function tries to recursively remove all parent directories
which are also empty.
"""
parts = name.split('/')
parts = name.split("/")
while parts:
try:
name = '/'.join(parts)
name = "/".join(parts)
if not name:
break
self.remove('/'.join(parts))
self.remove("/".join(parts))
except errors.LittleFSError as e:
if e.code == -39:
if e.code == LittleFSError.Error.LFS_ERR_NOTEMPTY:
break
raise e
parts.pop()
Expand All @@ -331,17 +312,17 @@ def rmdir(self, path: str) -> int:
"""
return self.remove(path)

def scandir(self, path='.') -> Iterator['LFSStat']:
def scandir(self, path=".") -> Iterator["LFSStat"]:
"""List directory content"""
dh = lfs.dir_open(self.fs, path)
info = lfs.dir_read(self.fs, dh)
while info:
if info.name not in ['.', '..']:
if info.name not in [".", ".."]:
yield info
info = lfs.dir_read(self.fs, dh)
lfs.dir_close(self.fs, dh)

def stat(self, path: str) -> 'LFSStat':
def stat(self, path: str) -> "LFSStat":
"""Get the status of a file or directory"""
return lfs.stat(self.fs, path)

Expand Down Expand Up @@ -374,7 +355,7 @@ def walk(self, top: str) -> Iterator[Tuple[str, List[str], List[str]]]:

yield top, dirs, files
for dirname in dirs:
newtop = '/'.join((top, dirname)).replace('//', '/')
newtop = "/".join((top, dirname)).replace("//", "/")
yield from self.walk(newtop)


Expand Down Expand Up @@ -453,13 +434,14 @@ def flush(self):
super().flush()
lfs.file_sync(self.fs, self.fh)


def _typ_to_uint8(typ):
try:
out = ord(typ)
except TypeError:
out = int(typ)

if not(0 <= out <= 255):
if not (0 <= out <= 255):
raise ValueError(f"type must be in range [0, 255]")

return out

0 comments on commit bb0104d

Please sign in to comment.