From e2f1199cf63b046e94bde2f4705a53a5912fdf78 Mon Sep 17 00:00:00 2001 From: Ed Summers Date: Thu, 21 Dec 2023 18:59:54 -0500 Subject: [PATCH] reformatted with black --- marctable/__init__.py | 16 +++++++- marctable/marc.py | 74 +++++++++++++++++++------------------ marctable/utils.py | 22 ++++++++--- test.py | 86 +++++++++++++++++++++++++++---------------- 4 files changed, 122 insertions(+), 76 deletions(-) diff --git a/marctable/__init__.py b/marctable/__init__.py index 497155d..17ad800 100644 --- a/marctable/__init__.py +++ b/marctable/__init__.py @@ -11,16 +11,26 @@ def cli() -> None: pass + def common_params(f: Callable) -> Callable: """ Decorator for specifying input/output arguments and rules. """ f = click.argument("outfile", type=click.File("w"), default="-")(f) f = click.argument("infile", type=click.File("rb"), default="-")(f) - f = click.option("--rule", "-r", "rules", multiple=True, help="Specify a rule for a field or field/subfield to extract, e.g. 245 or 245a")(f) - f = click.option("--batch", "-b", default=1000, help="Batch n records when converting")(f) + f = click.option( + "--rule", + "-r", + "rules", + multiple=True, + help="Specify a rule for a field or field/subfield to extract, e.g. 245 or 245a", + )(f) + f = click.option( + "--batch", "-b", default=1000, help="Batch n records when converting" + )(f) return f + @cli.command() @common_params def csv(infile: click.File, outfile: click.File, rules: list, batch: int) -> None: @@ -29,6 +39,7 @@ def csv(infile: click.File, outfile: click.File, rules: list, batch: int) -> Non """ to_csv(infile, outfile, rules=rules, batch=batch) + @cli.command() def yaml() -> None: """ @@ -36,5 +47,6 @@ def yaml() -> None: """ marctable.marc.main() + def main() -> None: cli() diff --git a/marctable/marc.py b/marctable/marc.py index fe56a96..ca13b4b 100644 --- a/marctable/marc.py +++ b/marctable/marc.py @@ -9,33 +9,28 @@ import yaml from bs4 import BeautifulSoup -_yaml_file = pathlib.Path(__file__).parent / 'marc.yaml' +_yaml_file = pathlib.Path(__file__).parent / "marc.yaml" _marc = yaml.safe_load(_yaml_file.open()) + class Subfield: - def __init__(self, code: str, name: str, repeatable: bool=False) -> None: + def __init__(self, code: str, name: str, repeatable: bool = False) -> None: self.code = code self.name = name self.repeatable = repeatable @classmethod def from_dict(_, d: dict): - return Subfield( - d.get('code'), - d.get('name'), - d.get('repeatable') - ) + return Subfield(d.get("code"), d.get("name"), d.get("repeatable")) def to_dict(self) -> dict: - return { - "code": self.code, - "name": self.name, - "repeatable": self.repeatable - } + return {"code": self.code, "name": self.name, "repeatable": self.repeatable} + class Field: - def __init__(self, tag: str, name: str, subfields: list[Subfield], - repeatable: False) -> None: + def __init__( + self, tag: str, name: str, subfields: list[Subfield], repeatable: False + ) -> None: self.tag = tag self.name = name self.subfields = subfields @@ -43,18 +38,18 @@ def __init__(self, tag: str, name: str, subfields: list[Subfield], def __str__(self) -> str: if len(self.subfields) > 0: - subfields = ': ' + (','.join([sf.code for sf in self.subfields])) + subfields = ": " + (",".join([sf.code for sf in self.subfields])) else: - subfields = '' + subfields = "" return f"{self.tag} {self.name}: {'R' if self.repeatable else 'NR'} {subfields}" @classmethod def from_dict(klass, d: dict): return Field( - tag=d.get('tag'), - name=d.get('name'), - repeatable=d.get('repeatable'), - subfields=[Subfield.from_dict(d) for d in d['subfields']] + tag=d.get("tag"), + name=d.get("name"), + repeatable=d.get("repeatable"), + subfields=[Subfield.from_dict(d) for d in d["subfields"]], ) def to_dict(self) -> dict: @@ -62,7 +57,7 @@ def to_dict(self) -> dict: "tag": self.tag, "name": self.name, "repeatable": self.repeatable, - "subfields": [sf.to_dict() for sf in self.subfields] + "subfields": [sf.to_dict() for sf in self.subfields], } def get_subfield(self, code: str) -> Subfield: @@ -71,6 +66,7 @@ def get_subfield(self, code: str) -> Subfield: return sf return None + class MARC: def __init__(self) -> None: self.fields = self._load() @@ -96,40 +92,44 @@ def _load(self) -> dict: def fields() -> Generator[Field, None, None]: - toc_url = 'https://www.loc.gov/marc/bibliographic/' + toc_url = "https://www.loc.gov/marc/bibliographic/" toc_doc = _soup(toc_url) - for group_link in toc_doc.select('.contentslist a'): - if re.match(r'^\d+', group_link.text): - group_url = urljoin(toc_url, group_link.attrs['href']) + for group_link in toc_doc.select(".contentslist a"): + if re.match(r"^\d+", group_link.text): + group_url = urljoin(toc_url, group_link.attrs["href"]) group_doc = _soup(group_url) - for field_link in group_doc.select('a'): - if field_link.text == 'Full': - field_url = urljoin(group_url, field_link.attrs['href']) + for field_link in group_doc.select("a"): + if field_link.text == "Full": + field_url = urljoin(group_url, field_link.attrs["href"]) if field := make_field(field_url): yield field + def make_field(url: str) -> Field: soup = _soup(url) - h1 = soup.select_one('h1', first=True).text.strip() - if m1 := re.match(r'^(\d+) - (.+) \((.+)\)$', h1): + h1 = soup.select_one("h1", first=True).text.strip() + if m1 := re.match(r"^(\d+) - (.+) \((.+)\)$", h1): tag, name, repeatable = m1.groups() # most pages put the subfield info in a list subfields = [] - for el in soup.select('table.subfields li'): - if m2 := re.match(r'^\$(.) - (.+) \((.+)\)$', el.text): + for el in soup.select("table.subfields li"): + if m2 := re.match(r"^\$(.) - (.+) \((.+)\)$", el.text): subfields.append(Subfield(m2.group(1), m2.group(2), m2.group(3) == "R")) # some pages use a different layout, of course if len(subfields) == 0: for el in soup.select('td[colspan="1"]'): - for text in el.text.split('$'): + for text in el.text.split("$"): text = text.strip() - if m2 := re.match(r'^(.) - (.+) \((.+)\)$', text): - subfields.append(Subfield(m2.group(1), m2.group(2), m2.group(3) == "R")) + if m2 := re.match(r"^(.) - (.+) \((.+)\)$", text): + subfields.append( + Subfield(m2.group(1), m2.group(2), m2.group(3) == "R") + ) return Field(tag, name, subfields, repeatable == "R") + # scrape the loc website for the marc fields def main() -> None: marc_fields = [] @@ -139,8 +139,10 @@ def main() -> None: # write out the collected data yaml.dump(marc_fields, sys.stdout, default_flow_style=False) + def _soup(url: str) -> BeautifulSoup: - return BeautifulSoup(requests.get(url).text, 'html.parser') + return BeautifulSoup(requests.get(url).text, "html.parser") + if __name__ == "__main__": main() diff --git a/marctable/utils.py b/marctable/utils.py index a30ed58..7c0093d 100644 --- a/marctable/utils.py +++ b/marctable/utils.py @@ -8,14 +8,20 @@ marc = MARC() -def to_dataframe(marc_input: typing.BinaryIO, rules: list=[]) -> DataFrame: + +def to_dataframe(marc_input: typing.BinaryIO, rules: list = []) -> DataFrame: """ Return a single DataFrame for the entire dataset. """ return next(dataframe_iter(marc_input, rules, batch=0)) -def to_csv(marc_input: typing.BinaryIO, csv_output: typing.TextIO, rules: - list=[], batch: int=1000) -> None: + +def to_csv( + marc_input: typing.BinaryIO, + csv_output: typing.TextIO, + rules: list = [], + batch: int = 1000, +) -> None: """ Convert MARC to CSV. """ @@ -23,7 +29,10 @@ def to_csv(marc_input: typing.BinaryIO, csv_output: typing.TextIO, rules: for df in dataframe_iter(marc_input, rules=rules, batch=batch): df.to_csv(csv_output, header=first_batch, index=False) -def dataframe_iter(marc_input: typing.BinaryIO, rules: list=[], batch: int = 1000) -> Generator[DataFrame, None, None]: + +def dataframe_iter( + marc_input: typing.BinaryIO, rules: list = [], batch: int = 1000 +) -> Generator[DataFrame, None, None]: """ Read MARC input and generate Pandas DataFrames for them in batches. """ @@ -85,7 +94,8 @@ def _stringify_field(field: pymarc.Field) -> str: if field.is_control_field(): return field.data else: - return ' '.join([sf.value for sf in field.subfields]) + return " ".join([sf.value for sf in field.subfields]) + def _mapping(rules: list) -> dict: """ @@ -109,6 +119,7 @@ def _mapping(rules: list) -> dict: return m + def _columns(mapping: dict) -> list: cols = [] for field_tag, subfields in mapping.items(): @@ -118,4 +129,3 @@ def _columns(mapping: dict) -> list: for sf in subfields: cols.append(f"F{field_tag}{sf}") return cols - diff --git a/test.py b/test.py index 61a0617..c0933b5 100644 --- a/test.py +++ b/test.py @@ -4,81 +4,103 @@ marc = MARC() + def test_marc() -> None: assert len(marc.fields) == 215 + def test_get_field() -> None: - assert marc.get_field('245') - assert not marc.get_field('abc') + assert marc.get_field("245") + assert not marc.get_field("abc") + def test_get_subfield() -> None: - assert marc.get_subfield('245', 'a').name =='Title' - assert marc.get_subfield('245', '-') is None + assert marc.get_subfield("245", "a").name == "Title" + assert marc.get_subfield("245", "-") is None + def test_non_repeatable_field() -> None: - f245 = marc.get_field('245') + f245 = marc.get_field("245") assert f245.tag == "245" assert f245.name == "Title Statement" assert f245.repeatable is False + def test_repeatable_field() -> None: - f650 = marc.get_field('650') + f650 = marc.get_field("650") assert f650.tag == "650" assert f650.name == "Subject Added Entry-Topical Term" assert f650.repeatable is True + def test_df() -> None: - df = to_dataframe(open('test-data/utf8.marc', 'rb')) + df = to_dataframe(open("test-data/utf8.marc", "rb")) assert len(df.columns) == 215 assert len(df) == 10612 - assert df.iloc[0]['F008'] == '000110s2000 ohu f m eng ' + assert df.iloc[0]["F008"] == "000110s2000 ohu f m eng " # 245 is not repeatable - assert df.iloc[0]['F245'] == 'Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore.' + assert ( + df.iloc[0]["F245"] + == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore." + ) # 650 is repeatable - assert df.iloc[0]['F650'] == ['Leak detectors.', 'Gas leakage.'] + assert df.iloc[0]["F650"] == ["Leak detectors.", "Gas leakage."] + def test_custom_fields_df() -> None: - df = to_dataframe(open('test-data/utf8.marc', 'rb'), rules=['245', '650']) + df = to_dataframe(open("test-data/utf8.marc", "rb"), rules=["245", "650"]) assert len(df) == 10612 # should only have two columns in the dataframe assert len(df.columns) == 2 - assert df.columns[0] == 'F245' - assert df.columns[1] == 'F650' - assert df.iloc[0]['F245'] == 'Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore.' - assert df.iloc[0]['F650'] == ['Leak detectors.', 'Gas leakage.'] + assert df.columns[0] == "F245" + assert df.columns[1] == "F650" + assert ( + df.iloc[0]["F245"] + == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore." + ) + assert df.iloc[0]["F650"] == ["Leak detectors.", "Gas leakage."] + def test_custom_subfields_df() -> None: - df = to_dataframe(open('test-data/utf8.marc', 'rb'), rules=['245a', '260c']) + df = to_dataframe(open("test-data/utf8.marc", "rb"), rules=["245a", "260c"]) assert len(df) == 10612 assert len(df.columns) == 2 - assert df.columns[0] == 'F245a' - assert df.columns[1] == 'F260c' + assert df.columns[0] == "F245a" + assert df.columns[1] == "F260c" # 245a is not repeatable - assert df.iloc[0]['F245a'] == 'Leak testing CD-ROM' + assert df.iloc[0]["F245a"] == "Leak testing CD-ROM" # 260c is repeatable - assert df.iloc[0]['F260c'] == ['c2000.'] + assert df.iloc[0]["F260c"] == ["c2000."] + def test_field_mapping() -> None: - m = _mapping(['245', '650']) - assert m['245'] is None - assert m['650'] is None + m = _mapping(["245", "650"]) + assert m["245"] is None + assert m["650"] is None + def test_field_subfield_mapping() -> None: - m = _mapping(['245a', '650ax', '260']) - assert set(m['245']) == set(['a']) - assert set(m['650']) == set(['a', 'x']) - assert m['260'] is None + m = _mapping(["245a", "650ax", "260"]) + assert set(m["245"]) == set(["a"]) + assert set(m["650"]) == set(["a", "x"]) + assert m["260"] is None + def test_batch() -> None: - dfs = dataframe_iter(open('test-data/utf8.marc', 'rb'), batch=1000) + dfs = dataframe_iter(open("test-data/utf8.marc", "rb"), batch=1000) df = next(dfs) assert type(df), pandas.DataFrame assert len(df) == 1000 + def test_to_csv() -> None: - to_csv(open('test-data/utf8.marc', 'rb'), open('test-data/utf8.csv', 'w'), batch=1000) - df = pandas.read_csv('test-data/utf8.csv') + to_csv( + open("test-data/utf8.marc", "rb"), open("test-data/utf8.csv", "w"), batch=1000 + ) + df = pandas.read_csv("test-data/utf8.csv") assert len(df) == 10622 assert len(df.columns) == 215 - assert df.iloc[0]['F245'] == 'Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore.' - + assert ( + df.iloc[0]["F245"] + == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore." + )