Skip to content

Commit

Permalink
reformatted with black
Browse files Browse the repository at this point in the history
  • Loading branch information
edsu committed Dec 21, 2023
1 parent 41fb1ab commit e2f1199
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 76 deletions.
16 changes: 14 additions & 2 deletions marctable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,26 @@
def cli() -> None:
    """Root command group; subcommands register themselves via @cli.command()."""


def common_params(f: Callable) -> Callable:
    """
    Decorator for specifying input/output arguments and rules shared by the
    CLI subcommands.
    """
    # click decorators apply innermost-first, so arguments are declared in
    # reverse of their command-line order (infile then outfile)
    f = click.argument("outfile", type=click.File("w"), default="-")(f)
    f = click.argument("infile", type=click.File("rb"), default="-")(f)
    f = click.option(
        "--rule",
        "-r",
        "rules",
        multiple=True,
        help="Specify a rule for a field or field/subfield to extract, e.g. 245 or 245a",
    )(f)
    f = click.option(
        "--batch", "-b", default=1000, help="Batch n records when converting"
    )(f)
    return f


@cli.command()
@common_params
def csv(infile: click.File, outfile: click.File, rules: list, batch: int) -> None:
Expand All @@ -29,12 +39,14 @@ def csv(infile: click.File, outfile: click.File, rules: list, batch: int) -> Non
"""
to_csv(infile, outfile, rules=rules, batch=batch)


@cli.command()
def yaml() -> None:
    """
    Generate YAML for the MARC specification by scraping the Library of Congress.
    """
    marctable.marc.main()


def main() -> None:
    """Console-script entry point: hand control to the click group."""
    cli()
74 changes: 38 additions & 36 deletions marctable/marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,55 @@
import yaml
from bs4 import BeautifulSoup

# Bundled MARC schema, loaded once at import time.
_yaml_file = pathlib.Path(__file__).parent / "marc.yaml"
# read_text() closes the file immediately (the previous .open() leaked the
# handle until garbage collection)
_marc = yaml.safe_load(_yaml_file.read_text())


class Subfield:
    """
    A MARC subfield: its one-character code, display name, and whether it may
    repeat within a field.
    """

    def __init__(self, code: str, name: str, repeatable: bool = False) -> None:
        self.code = code
        self.name = name
        self.repeatable = repeatable

    @classmethod
    def from_dict(cls, d: dict) -> "Subfield":
        # conventional `cls` instead of `_`, and construct via cls so
        # subclasses round-trip; missing keys fall through as None (dict.get)
        return cls(d.get("code"), d.get("name"), d.get("repeatable"))

    def to_dict(self) -> dict:
        """Return a plain-dict form suitable for YAML serialization."""
        return {"code": self.code, "name": self.name, "repeatable": self.repeatable}


class Field:
def __init__(self, tag: str, name: str, subfields: list[Subfield],
repeatable: False) -> None:
def __init__(
self, tag: str, name: str, subfields: list[Subfield], repeatable: False
) -> None:
self.tag = tag
self.name = name
self.subfields = subfields
self.repeatable = repeatable

def __str__(self) -> str:
if len(self.subfields) > 0:
subfields = ': ' + (','.join([sf.code for sf in self.subfields]))
subfields = ": " + (",".join([sf.code for sf in self.subfields]))
else:
subfields = ''
subfields = ""
return f"{self.tag} {self.name}: {'R' if self.repeatable else 'NR'} {subfields}"

@classmethod
def from_dict(klass, d: dict):
return Field(
tag=d.get('tag'),
name=d.get('name'),
repeatable=d.get('repeatable'),
subfields=[Subfield.from_dict(d) for d in d['subfields']]
tag=d.get("tag"),
name=d.get("name"),
repeatable=d.get("repeatable"),
subfields=[Subfield.from_dict(d) for d in d["subfields"]],
)

def to_dict(self) -> dict:
return {
"tag": self.tag,
"name": self.name,
"repeatable": self.repeatable,
"subfields": [sf.to_dict() for sf in self.subfields]
"subfields": [sf.to_dict() for sf in self.subfields],
}

def get_subfield(self, code: str) -> Subfield:
Expand All @@ -71,6 +66,7 @@ def get_subfield(self, code: str) -> Subfield:
return sf
return None


class MARC:
def __init__(self) -> None:
self.fields = self._load()
Expand All @@ -96,40 +92,44 @@ def _load(self) -> dict:


def fields() -> Generator[Field, None, None]:
    """
    Crawl the LOC bibliographic format pages and yield a Field for each
    "Full" field-description page that parses.
    """
    toc_url = "https://www.loc.gov/marc/bibliographic/"
    toc_doc = _soup(toc_url)
    for group_link in toc_doc.select(".contentslist a"):
        # only the numbered groups (e.g. "01X-09X") link to field pages
        if re.match(r"^\d+", group_link.text):
            group_url = urljoin(toc_url, group_link.attrs["href"])
            group_doc = _soup(group_url)
            for field_link in group_doc.select("a"):
                if field_link.text == "Full":
                    field_url = urljoin(group_url, field_link.attrs["href"])
                    # make_field returns None when a page doesn't parse
                    if field := make_field(field_url):
                        yield field


def make_field(url: str) -> Field:
    """
    Scrape one LOC field-description page into a Field.

    Returns None (implicitly) when the page's h1 heading doesn't match the
    expected "TAG - Name (R/NR)" pattern.
    """
    soup = _soup(url)
    h1 = soup.select_one("h1", first=True).text.strip()
    if m1 := re.match(r"^(\d+) - (.+) \((.+)\)$", h1):
        tag, name, repeatable = m1.groups()

        # most pages put the subfield info in a list
        subfields = []
        for el in soup.select("table.subfields li"):
            if m2 := re.match(r"^\$(.) - (.+) \((.+)\)$", el.text):
                subfields.append(Subfield(m2.group(1), m2.group(2), m2.group(3) == "R"))

        # some pages use a different layout, of course
        if len(subfields) == 0:
            for el in soup.select('td[colspan="1"]'):
                for text in el.text.split("$"):
                    text = text.strip()
                    if m2 := re.match(r"^(.) - (.+) \((.+)\)$", text):
                        subfields.append(
                            Subfield(m2.group(1), m2.group(2), m2.group(3) == "R")
                        )

        return Field(tag, name, subfields, repeatable == "R")


# scrape the loc website for the marc fields
def main() -> None:
marc_fields = []
Expand All @@ -139,8 +139,10 @@ def main() -> None:
# write out the collected data
yaml.dump(marc_fields, sys.stdout, default_flow_style=False)


def _soup(url: str) -> BeautifulSoup:
    """Fetch url and return its HTML parsed with BeautifulSoup."""
    return BeautifulSoup(requests.get(url).text, "html.parser")


if __name__ == "__main__":
main()
22 changes: 16 additions & 6 deletions marctable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,31 @@

# Module-level schema registry shared by the conversion helpers below.
# NOTE(review): constructed at import time — presumably loads the bundled
# marc.yaml spec; confirm the import-time cost is acceptable.
marc = MARC()

def to_dataframe(
    marc_input: typing.BinaryIO, rules: typing.Optional[list] = None
) -> DataFrame:
    """
    Return a single DataFrame for the entire dataset.

    rules optionally limits extraction to the given field/subfield rules
    (e.g. "245" or "245a").
    """
    # None replaces the mutable default [] (shared across calls); batch=0
    # disables batching so the iterator yields exactly one DataFrame
    return next(dataframe_iter(marc_input, rules or [], batch=0))

def to_csv(
    marc_input: typing.BinaryIO,
    csv_output: typing.TextIO,
    rules: typing.Optional[list] = None,
    batch: int = 1000,
) -> None:
    """
    Convert MARC to CSV.

    rules optionally limits extraction to the given field/subfield rules;
    batch is how many records are converted per DataFrame chunk.
    """
    first_batch = True
    for df in dataframe_iter(marc_input, rules=rules or [], batch=batch):
        # NOTE(review): first_batch is never set to False, so the header row
        # is re-emitted for every batch; test_to_csv's expected row count
        # (10622 for 10612 records) currently encodes this behavior — fix the
        # flag and that expectation together.
        df.to_csv(csv_output, header=first_batch, index=False)

def dataframe_iter(marc_input: typing.BinaryIO, rules: list=[], batch: int = 1000) -> Generator[DataFrame, None, None]:

def dataframe_iter(
marc_input: typing.BinaryIO, rules: list = [], batch: int = 1000
) -> Generator[DataFrame, None, None]:
"""
Read MARC input and generate Pandas DataFrames for them in batches.
"""
def _stringify_field(field: pymarc.Field) -> str:
    """
    Flatten a pymarc Field to a single string: control fields return their
    raw data, variable fields join their subfield values with spaces.
    """
    if field.is_control_field():
        return field.data
    else:
        return " ".join([sf.value for sf in field.subfields])


def _mapping(rules: list) -> dict:
"""
Expand All @@ -109,6 +119,7 @@ def _mapping(rules: list) -> dict:

return m


def _columns(mapping: dict) -> list:
cols = []
for field_tag, subfields in mapping.items():
Expand All @@ -118,4 +129,3 @@ def _columns(mapping: dict) -> list:
for sf in subfields:
cols.append(f"F{field_tag}{sf}")
return cols

86 changes: 54 additions & 32 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,103 @@

# Shared MARC schema instance exercised by the assertions below.
marc = MARC()


def test_marc() -> None:
    # the scraped spec currently defines 215 bibliographic fields
    assert len(marc.fields) == 215


def test_get_field() -> None:
    # a real tag resolves, a bogus tag is falsy
    assert marc.get_field("245")
    assert not marc.get_field("abc")


def test_get_subfield() -> None:
    # known subfield resolves to its name; unknown code returns None
    assert marc.get_subfield("245", "a").name == "Title"
    assert marc.get_subfield("245", "-") is None


def test_non_repeatable_field() -> None:
    f245 = marc.get_field("245")
    assert f245.tag == "245"
    assert f245.name == "Title Statement"
    assert f245.repeatable is False


def test_repeatable_field() -> None:
    f650 = marc.get_field("650")
    assert f650.tag == "650"
    assert f650.name == "Subject Added Entry-Topical Term"
    assert f650.repeatable is True


def test_df() -> None:
    df = to_dataframe(open("test-data/utf8.marc", "rb"))
    assert len(df.columns) == 215
    assert len(df) == 10612
    # NOTE(review): runs of spaces inside this fixed-length 008 value may have
    # been collapsed in transit — verify against the original fixture
    assert df.iloc[0]["F008"] == "000110s2000 ohu f m eng "
    # 245 is not repeatable
    assert (
        df.iloc[0]["F245"]
        == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore."
    )
    # 650 is repeatable
    assert df.iloc[0]["F650"] == ["Leak detectors.", "Gas leakage."]


def test_custom_fields_df() -> None:
    df = to_dataframe(open("test-data/utf8.marc", "rb"), rules=["245", "650"])
    assert len(df) == 10612
    # should only have two columns in the dataframe
    assert len(df.columns) == 2
    assert df.columns[0] == "F245"
    assert df.columns[1] == "F650"
    assert (
        df.iloc[0]["F245"]
        == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore."
    )
    assert df.iloc[0]["F650"] == ["Leak detectors.", "Gas leakage."]


def test_custom_subfields_df() -> None:
    df = to_dataframe(open("test-data/utf8.marc", "rb"), rules=["245a", "260c"])
    assert len(df) == 10612
    assert len(df.columns) == 2
    assert df.columns[0] == "F245a"
    assert df.columns[1] == "F260c"
    # 245a is not repeatable
    assert df.iloc[0]["F245a"] == "Leak testing CD-ROM"
    # 260c is repeatable
    assert df.iloc[0]["F260c"] == ["c2000."]


def test_field_mapping() -> None:
    # field-only rules map to None (meaning "all subfields")
    m = _mapping(["245", "650"])
    assert m["245"] is None
    assert m["650"] is None


def test_field_subfield_mapping() -> None:
    m = _mapping(["245a", "650ax", "260"])
    # subfield rules collect their codes; a bare field rule stays None
    assert set(m["245"]) == {"a"}
    assert set(m["650"]) == {"a", "x"}
    assert m["260"] is None


def test_batch() -> None:
    dfs = dataframe_iter(open("test-data/utf8.marc", "rb"), batch=1000)
    df = next(dfs)
    # was `assert type(df), pandas.DataFrame` — an always-true assert with a
    # message, not a type check
    assert isinstance(df, pandas.DataFrame)
    assert len(df) == 1000


def test_to_csv() -> None:
    to_csv(
        open("test-data/utf8.marc", "rb"), open("test-data/utf8.csv", "w"), batch=1000
    )
    df = pandas.read_csv("test-data/utf8.csv")
    # NOTE(review): 10622 rows for 10612 source records — the 10 extras look
    # like header rows re-emitted once per batch by to_csv's never-cleared
    # first_batch flag; fix the flag and this expectation together
    assert len(df) == 10622
    assert len(df.columns) == 215
    assert (
        df.iloc[0]["F245"]
        == "Leak testing CD-ROM [computer file] / technical editors, Charles N. Jackson, Jr., Charles N. Sherlock ; editor, Patrick O. Moore."
    )

0 comments on commit e2f1199

Please sign in to comment.