
Commit

migrate from os.path to pathlib in subyt as suggested in #138
marc-portier committed Jan 17, 2025
1 parent c456e21 commit 90690ff
Showing 2 changed files with 86 additions and 90 deletions.
33 changes: 17 additions & 16 deletions sema/subyt/sinks.py
@@ -9,17 +9,18 @@
log = logging.getLogger(__name__)


-def assert_writable(file_path: str, force_output: bool = False):
+def assert_writable(path_name: str | Path, force_output: bool = False):
+    out_path = Path(path_name)
    if not force_output:
-        assert not os.path.isfile(
-            file_path
-        ), f"File to write '{file_path}' already exists"
-    parent_path = Path(file_path).parent.absolute()
-    if not os.path.exists(parent_path):
-        os.makedirs(parent_path)
-    assert os.access(
-        parent_path, os.W_OK
-    ), f"Can not write to folder '{parent_path}' for creating new files"
+        assert not out_path.exists(), (
+            f"File to write '{path_name}' already exists"
+        )
+    # ensure parent folder exists
+    parent_path = out_path.parent.absolute()
+    parent_path.mkdir(parents=True, exist_ok=True)
+    assert os.access(parent_path, os.W_OK), (
+        f"Can not write to folder '{parent_path}' for creating new files",
+    )


class SinkFactory:
@@ -74,17 +74,17 @@ def add(


class SingleFileSink(Sink):
-    def __init__(self, file_path: str, force_output: bool = False):
+    def __init__(self, path_name: str, force_output: bool = False):
        super().__init__()
-        assert_writable(file_path, force_output)
-        self._file_path = file_path
+        assert_writable(path_name, force_output)
+        self._file_path: Path = Path(path_name)
        self._force_output = force_output
-        if Path(file_path).exists():
-            self.mtimes = {file_path: os.stat(file_path).st_mtime}
+        if self._file_path.exists():
+            self.mtimes = {path_name: self._file_path.stat().st_mtime}

def __repr__(self):
return (
-            f"SingleFileSink('{str(Path(self._file_path).resolve())}', "
+            f"SingleFileSink('{str(self._file_path.resolve())}', "
f"{self._force_output})"
)
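
A minimal usage sketch of the reworked sink helpers follows (not part of the commit; "out/report.ttl" is an illustrative destination assumed not to exist yet, and the package is assumed to be installed):

# usage sketch, assuming "out/report.ttl" does not yet exist
from pathlib import Path

from sema.subyt.sinks import SingleFileSink, assert_writable

out_path = Path("out") / "report.ttl"
assert_writable(out_path)                  # now accepts str or Path, creates ./out/ if missing
sink = SingleFileSink(str(out_path), force_output=True)
print(repr(sink))                          # SingleFileSink('<resolved path>', True)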

143 changes: 69 additions & 74 deletions sema/subyt/sources.py
@@ -7,19 +7,19 @@
from typing import Callable

import requests
-import validators
from typeguard import check_type

+from sema.commons.clean.clean import check_valid_url

from .api import Source

log = logging.getLogger(__name__)


-def assert_readable(file_path):
-    assert os.path.isfile(
-        file_path
-    ), f"File to read '{file_path}' does not exist"
-    assert os.access(file_path, os.R_OK), f"Can not read '{file_path}'"
+def assert_readable(path_name: str | Path):
+    in_path = Path(path_name)
+    assert in_path.is_file(), f"File to read '{path_name}' does not exist"
+    assert os.access(in_path, os.R_OK), f"Can not read '{path_name}'"


def fname_from_cdisp(cdisp):
Expand All @@ -45,9 +45,9 @@ def _add(self, mime: str, sourceClass: Callable[[str], Source]) -> None:
self._register[mime] = sourceClass

def _find(self, mime: str):
-        assert (
-            mime in self._register
-        ), f"no Source class available for mime '{mime}'"
+        assert mime in self._register, (
+            f"no Source class available for mime '{mime}'",
+        )
return self._register[mime]

@staticmethod
@@ -87,43 +87,54 @@ def mime_from_identifier(identifier: str) -> str:
return mimetypes.guess_type(identifier)[0] # type: ignore

@staticmethod
-    def make_source(identifier: str) -> Source:
-        if validators.url(identifier):
+    def make_source(identifier: str | Path) -> Source:
+        # check for url
+        if check_valid_url(str(identifier)):
            mime: str = SourceFactory.mime_from_remote(identifier)  # type: ignore # noqa
            assert False, "TODO remote Source support - see issues #8"

-        # else
-        if os.path.isdir(identifier):
-            source = FolderSource(identifier)
+        # else get input types nicely split str vs Path
+        source_path: Path = Path(identifier)
+        identifier = str(identifier)
+        # check for folder source
+        source: Source = None
+        if source_path.is_dir():
+            source = FolderSource(source_path)
return source

-        # else
+        # else check for glob
if glob.has_magic(identifier):
source = GlobSource(identifier)
return source

-        # else
+        # else should be single file with source tuned to mime
        mime: str = SourceFactory.mime_from_identifier(identifier)
-        assert (
-            mime is not None
-        ), f"no valid mime derived from identifier '{identifier}'"
-        sourceClass: Callable[[str], Source] = SourceFactory.instance()._find(
-            mime
-        )
-        source: Source = sourceClass(identifier)
+        assert mime is not None, (
+            f"no valid mime derived from identifier '{identifier}'",
+        )
+
+        sourceClass: Callable[[str], Source] = None
+        sourceClass = SourceFactory.instance()._find(mime)
+        source = sourceClass(source_path)
return source


class CollectionSource(Source):
def __init__(self) -> None:
super().__init__()
-        self._collection_path = "."
-        self._sourcefiles = []
+        self._collection_path: Path = Path(".")
+        self._sourcefiles: list[Path] = []

+    def __repr__(self):
+        return f"{type(self).__name__}('{self._collection_path}')"
+
+    def _init_sourcefiles(self, source_paths: list[Path]):
+        self._sourcefiles = sorted(source_paths)
+        assert len(self._sourcefiles) > 0, (
+            f"{self} should have content files.",
+        )
+        self.mtimes = {str(p): p.stat().st_mtime for p in self._sourcefiles}
+        self._reset()

def _reset(self):
self._current_source = None
self._current_iter = None
@@ -138,9 +149,7 @@ def _nextSource(self):
self._ix += 1
if self._ix < len(self._sourcefiles):
self._current_source = SourceFactory.make_source(
-                os.path.join(
-                    self._collection_path, self._sourcefiles[self._ix]
-                )
+                self._sourcefiles[self._ix]
)
self._current_iter = self._current_source.__enter__()
else:
@@ -179,37 +188,28 @@ def __exit__(self):


class FolderSource(CollectionSource):
-    def __init__(self, folder_path):
+    def __init__(self, folder_path: Path):
        super().__init__()
-        self._collection_path = os.path.abspath(folder_path)
-        self._sourcefiles = sorted(
-            list(next(os.walk(self._collection_path), (None, None, []))[2])
-        )
-        assert (
-            len(self._sourcefiles) > 0
-        ), f"FolderSource '{self._collection_path}' should have content files."
-        self._reset()
-        self.mtimes = {}
-        for p in self._sourcefiles:
-            p = Path(self._collection_path) / Path(p)
-            self.mtimes.update({str(p): os.stat(p).st_mtime})
+        self._collection_path = folder_path.absolute()
+        self._init_sourcefiles([
+            f for f in self._collection_path.iterdir() if f.is_file()
+        ])

-    def __repr__(self):
-        return f"FolderSource('{self._collection_path}')"


class GlobSource(CollectionSource):
-    def __init__(self, pattern, pattern_root_dir="."):
+    def __init__(self, pattern: str, pattern_root_dir: str = "."):
        super().__init__()
-        self._collection_path = pattern_root_dir
-        self._sourcefiles = sorted(
-            [p for p in glob.glob(pattern) if os.path.isfile(p)]
-        )
-        assert (
-            len(self._sourcefiles) > 0
-        ), f"GlobSource '{self._collection_path}' should have content files."
-        self._reset()
-        self.mtimes = {}
-        for p in self._sourcefiles:
-            p = Path(self._collection_path) / Path(p)
-            self.mtimes.update({str(p): os.stat(p).st_mtime})
+        self._collection_path = Path(pattern_root_dir).absolute()
+        self._pattern: str = pattern
+        self._init_sourcefiles([
+            f for f in self._collection_path.glob(pattern) if f.is_file()
+        ])

+    def __repr__(self):
+        return f"GlobSource('{self._pattern}', '{self._collection_path}')"


try:
@@ -220,12 +220,11 @@ class CSVFileSource(Source):
Source producing iterator over data-set coming from CSV on file
"""

-    def __init__(self, csv_file_path):
+    def __init__(self, csv_file_path: Path):
        super().__init__()
        assert_readable(csv_file_path)
-        self._csv = csv_file_path
-        if Path(csv_file_path).exists():
-            self.mtimes = {csv_file_path: os.stat(csv_file_path).st_mtime}
+        self._csv: Path = csv_file_path.absolute()
+        self._init_source(self._csv)

def __enter__(self):
self._csvfile = open(self._csv, mode="r", encoding="utf-8-sig")
@@ -235,13 +234,13 @@ def __exit__(self):
self._csvfile.close()

def __repr__(self):
-        return f"CSVFileSource('{os.path.abspath(self._csv)}')"
+        return f"CSVFileSource('{self._csv!s}')"

SourceFactory.register("text/csv", CSVFileSource)
# wrong, yet useful mime for csv:
SourceFactory.register("application/csv", CSVFileSource)
except ImportError:
-    log.warn("Python CSV module not available -- disabling CSV support!")
+    log.warning("Python CSV module not available -- disabling CSV support!")


try:
@@ -252,14 +251,11 @@ class JsonFileSource(Source):
Source producing iterator over data-set coming from json on file
"""

-    def __init__(self, json_file_path):
+    def __init__(self, json_file_path: Path):
        super().__init__()
        assert_readable(json_file_path)
-        self._json = json_file_path
-        if Path(json_file_path).exists():
-            self.mtimes = {
-                json_file_path: os.stat(json_file_path).st_mtime
-            }
+        self._json = json_file_path.absolute()
+        self._init_source(self._json)

def __enter__(self):
# note this is loading everything in memory
@@ -283,11 +279,11 @@ def __exit__(self):
pass

def __repr__(self):
-        return f"JsonFileSource('{os.path.abspath(self._json)}')"
+        return f"JsonFileSource('{self._json!s}')"

SourceFactory.register("application/json", JsonFileSource)
except ImportError:
-    log.warn("Python JSON module not available -- disabling JSON support!")
+    log.warning("Python JSON module not available -- disabling JSON support!")


try:
@@ -298,12 +294,11 @@ class XMLFileSource(Source):
Source producing iterator over data-set coming from XML on file
"""

-    def __init__(self, xml_file_path):
+    def __init__(self, xml_file_path: Path):
        super().__init__()
        assert_readable(xml_file_path)
-        self._xml = xml_file_path
-        if Path(xml_file_path).exists():
-            self.mtimes = {xml_file_path: os.stat(xml_file_path).st_mtime}
+        self._xml: Path = xml_file_path.absolute()
+        self._init_source(self._xml)

def __enter__(self):
with open(self._xml, mode="r", encoding="utf-8-sig") as xmlfile:
@@ -324,11 +319,11 @@ def __exit__(self):
pass

def __repr__(self):
-        return f"XMLFileSource('{os.path.abspath(self._xml)}')"
+        return f"XMLFileSource('{self._xml}')"

SourceFactory.map("eml", "text/xml")
SourceFactory.register("text/xml", XMLFileSource)
# wrong, yet useful mime for xml:
SourceFactory.register("application/xml", XMLFileSource)
except ImportError:
-    log.warn("Python XML module not available -- disabling XML support!")
+    log.warning("Python XML module not available -- disabling XML support!")
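
For orientation, a minimal usage sketch of the reworked SourceFactory.make_source dispatch (not part of the commit; "data" and "data/**/*.xml" are illustrative inputs assumed to exist and to match at least one file):

# usage sketch, assuming ./data is a non-empty folder in the current working directory
from pathlib import Path

from sema.subyt.sources import SourceFactory

folder_src = SourceFactory.make_source(Path("data"))   # existing folder -> FolderSource
glob_src = SourceFactory.make_source("data/**/*.xml")  # glob pattern    -> GlobSource
# a plain file path such as Path("data/records.csv") would instead be routed
# by its guessed mime type to the registered single-file source (e.g. CSVFileSource)
print(repr(folder_src))
print(repr(glob_src))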
