Implement PRA fetcher #352

Draft · wants to merge 2 commits into base: main
239 changes: 239 additions & 0 deletions metadata_fetcher/fetchers/preservica_api_fetcher.py
@@ -0,0 +1,239 @@
import json
import re
from xml.etree import ElementTree

import requests
from requests.adapters import HTTPAdapter, Retry
from requests.auth import HTTPBasicAuth

from .Fetcher import Fetcher


class PreservicaApiFetcher(Fetcher):
BASE_URL: str = "https://us.preservica.com/api/entity/v6.0"

NAMESPACES: dict = {
"pra": "http://preservica.com/EntityAPI/v6.0",
"xip": "http://preservica.com/XIP/v6.0"
}

    def __init__(self, params: dict):
        """
        Parameters:
            params: dict
        """
super(PreservicaApiFetcher, self).__init__(params)

        # Tracks progress through the per-record sub-requests
self.record_index = 1
self.record_total = 0

# If `next_url` is a param, we know that this is not
# the fetch of the first page, so skip setting those
# attributes
if "next_url" in params:
for key in params:
setattr(self, key, params[key])
return

        credentials = params.get("harvest_data").get("harvest_extra_data")
        self.basic_auth_credentials = [v.strip() for v in credentials.split(",")]
self.internal_collection_id = \
re.search(
r"(?<=SO_)[0-9a-f-]+",
params.get("harvest_data").get("url")
).group(0)
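        # Hypothetical example (invented id): a harvest URL containing
        # "SO_a7350518-b200-4ad7-8357-0ce416e1d733" would yield
        # "a7350518-b200-4ad7-8357-0ce416e1d733" from the regex above.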

self.next_url = (
f"{self.BASE_URL}/structural-objects/"
f"{self.internal_collection_id}/children?max=1"
)

    def build_fetch_request(self) -> dict:
        """
        Generates arguments for `requests.get()`.

        Returns: dict
        """
return self.build_url_request(self.next_url)

    def aggregate_vernacular_content(self, response: str) -> str:
        """
        Kicks off the per-record request chain: children list -> object
        detail -> oai_dc fragment URL -> metadata document.

        TODO: at time of dev, `response` is a requests.Response, but will be
        a str when merged.

        Parameters:
            response: str

        Returns: str
        """

response_body = response.text

# Starting with a list of `information-objects` URLs
object_url = ElementTree.fromstring(response_body).\
find("pra:Children/pra:Child", self.NAMESPACES).text

# Getting an individual `information-object`, extracting the URL
# for the oai_dc metadata fragment
metadata_url = self.get_metadata_url_from_object(object_url)

self.record_total = 1 if metadata_url else 0

# Getting the metadata
return self.get_metadata_from_url(metadata_url)

    def get_object_type(self, response_body: str):
        """
        Determines whether a response describes an information object or a
        structural object.

        Parameters:
            response_body: str

        Returns: str or None
        """
        io_tag = ElementTree.fromstring(response_body).\
            find("xip:InformationObject", self.NAMESPACES)

        # `find()` returns an Element that is falsy when it has no children,
        # so compare against None explicitly.
        if io_tag is not None:
            return "information"

        so_tag = ElementTree.fromstring(response_body).\
            find("xip:StructuralObject", self.NAMESPACES)

        if so_tag is not None:
            return "structural"

    def get_information_response(self, response_body: str) -> str:
        """
        Takes an object response; if it describes a structural object,
        performs two additional requests to drill down to the underlying
        information object. This function is optimistic about the structure
        of the XML responses.

        Parameters:
            response_body: str

        Returns: str
        """
object_type = self.get_object_type(response_body)

if object_type == "information":
return response_body

path = "pra:AdditionalInformation/pra:Children"
url = ElementTree.fromstring(response_body).find(path, self.NAMESPACES).text

request = self.build_url_request(url)
response_body = self.build_retry_session().get(**request).text

path = "pra:Children/pra:Child"
url = ElementTree.fromstring(response_body).find(path, self.NAMESPACES).text

request = self.build_url_request(url)
return self.build_retry_session().get(**request).text

    def get_metadata_url_from_object(self, url: str):
        """
        Second request. It may return a structural object or an information
        object; with the former, a couple of additional requests have to
        take place.

        Parameters:
            url: str

        Returns: str or None
        """
request = self.build_url_request(url)
response_body = self.build_retry_session().get(**request).text

if self.get_object_type(response_body) == "structural":
response_body = self.get_information_response(response_body)

root = ElementTree.fromstring(response_body)

path = ".//pra:Fragment[@schema='http://www.openarchives.org/OAI/2.0/oai_dc/']"
fragment = root.find(path, self.NAMESPACES)

return fragment.text if fragment is not None else None

def get_metadata_from_url(self, url: str) -> str:
"""
Final request. It returns the metadata.

Parameters:
url: str

Returns: str
"""
print(
f"[{self.collection_id}]: Fetching record "
f"({self.record_index} of {self.record_total}) at {url}"
)

self.record_index += 1

request = self.build_url_request(url)
response = self.build_retry_session().get(**request)

return response.text

    def check_page(self, response: requests.Response) -> int:
        """
        Counts the child records in the fetched page and logs progress.

        Parameters:
            response: requests.Response

        Returns: int
        """
hits = len(ElementTree.fromstring(response.content).
findall(".//pra:Child", self.NAMESPACES))

print(
f"[{self.collection_id}]: Fetched page {self.write_page} "
f"at {response.url} with {hits} hits"
)

return hits

def increment(self, response: requests.Response):
"""
Sets the `next_url` to fetch and increments the page number.

Parameters:
response: requests.Response
"""
super(PreservicaApiFetcher, self).increment(response)

next_element = ElementTree.fromstring(response.content).\
find("pra:Paging/pra:Next", self.NAMESPACES)
self.next_url = next_element.text if next_element is not None else None

def json(self) -> str:
"""
Generates JSON for the next page of results.

Returns: str
"""
current_state = {
"harvest_type": self.harvest_type,
"basic_auth_credentials": self.basic_auth_credentials,
"collection_id": self.collection_id,
"internal_collection_id": self.internal_collection_id,
"next_url": self.next_url,
"write_page": self.write_page
}
if not self.next_url:
current_state.update({"finished": True})

return json.dumps(current_state)
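
    # The state above round-trips into __init__ via its "next_url" branch.
    # An illustrative payload for a follow-on page (all values invented):
    # {"harvest_type": "preservica", "basic_auth_credentials": ["user", "pw"],
    #  "collection_id": 466, "internal_collection_id": "a7350518-b200-...",
    #  "next_url": "https://us.preservica.com/api/entity/v6.0/structural-objects/...",
    #  "write_page": 1}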

    def build_url_request(self, url: str) -> dict:
        """
        Creates the dictionary of keyword arguments that this fetcher's
        `requests.get()` calls need.

Parameters:
url: str

Returns: dict
"""
return {"url": url, "auth": HTTPBasicAuth(*self.basic_auth_credentials)}

    @staticmethod
    def build_retry_session() -> requests.Session:
        """
        Creates and returns a requests session that retries failed requests.

        Returns: requests.Session
        """
session = requests.Session()
retries = Retry(total=3, backoff_factor=2)
session.mount("https://", HTTPAdapter(max_retries=retries))
return session
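
For context, a minimal sketch of how a harness might drive this fetcher page by page. The loop below is an assumption about the calling code, not part of this diff; it presumes the `Fetcher` base class populates `collection_id`, `harvest_type`, and `write_page` from `params`, as the methods above imply.

# Hypothetical driver loop (not part of this diff).
def run_harvest(params: dict):
    while True:
        fetcher = PreservicaApiFetcher(params)
        request = fetcher.build_fetch_request()
        response = fetcher.build_retry_session().get(**request)
        if fetcher.check_page(response) == 0:
            break
        # A real harness would persist the aggregated metadata here.
        fetcher.aggregate_vernacular_content(response)
        fetcher.increment(response)
        params = json.loads(fetcher.json())
        if params.get("finished"):
            break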
2 changes: 1 addition & 1 deletion metadata_mapper/mappers/mapper.py
@@ -283,7 +283,7 @@ def select_cmis_atom_id(self):
return self

def select_preservica_id(self):
-        calisphere_id = self.mapped_data.get("preservica_id", {}).get('$')
+        calisphere_id = self.source_metadata.get("entity_id")
self.legacy_couch_db_id = f"{self.collection_id}--{calisphere_id}"
return self

74 changes: 74 additions & 0 deletions metadata_mapper/mappers/preservica/preservica_mapper.py
@@ -0,0 +1,74 @@
import re
from xml.etree import ElementTree

from ..mapper import Record, Vernacular


class PreservicaRecord(Record):
BASE_URL = "https://oakland.access.preservica.com"

FILE_PREPEND = "sdb:digitalFile%7C"

def UCLDC_map(self):
entity_id = self.source_metadata.get("entity_id")

return {
"calisphere-id": entity_id,
"contributor": self.source_metadata.get("contributor"),
"coverage": self.source_metadata.get("spatial"),
"creator": self.source_metadata.get("creator"),
"date": self.source_metadata.get("date"),
"description": self.source_metadata.get("description"),
"format": self.source_metadata.get("format"),
"identifier": self.source_metadata.get("identifier"),
"isShownAt": (
f"{self.BASE_URL}/file/{self.FILE_PREPEND}{entity_id}/"
),
"isShownBy": (
f"{self.BASE_URL}/download/thumbnail/{self.FILE_PREPEND}{entity_id}"
),
"language": self.source_metadata.get("language"),
"publisher": self.source_metadata.get("publisher"),
"relation": self.source_metadata.get("relation"),
"rights": self.source_metadata.get("rights"),
"source": self.source_metadata.get("source"),
"state_located_in": {"stateLocatedIn": "California"},
"subject": self.source_metadata.get("subject"),
"title": self.source_metadata.get("title"),
"type": self.source_metadata.get("type"),
}
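
    # Hypothetical example: with an invented entity_id of
    # "a7350518-b200-4ad7-8357-0ce416e1d733", the mapping above yields
    #   isShownAt: https://oakland.access.preservica.com/file/sdb:digitalFile%7Ca7350518-b200-4ad7-8357-0ce416e1d733/
    #   isShownBy: https://oakland.access.preservica.com/download/thumbnail/sdb:digitalFile%7Ca7350518-b200-4ad7-8357-0ce416e1d733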


class PreservicaVernacular(Vernacular):
record_cls = PreservicaRecord

# TODO: consider putting this namespace mapping in a place that can be imported
# into both the mapper and fetcher
NAMESPACES: dict = {
"pra": "http://preservica.com/EntityAPI/v6.0",
"xip": "http://preservica.com/XIP/v6.0",
"oai_dc": "http://www.openarchives.org/OAI/2.0/oai_dc/"
}

    def parse(self, response_body):
        """
        We expect only one record per file for Preservica. Minor changes will
        need to be made if we begin importing more than one record per page.
        """
et = ElementTree.fromstring(response_body)
container = et.find(".//xip:MetadataContainer", self.NAMESPACES)

dc_record = container.find("xip:Content", self.NAMESPACES).\
find("oai_dc:dc", self.NAMESPACES)

record = {
"entity_id": container.find("xip:Entity", self.NAMESPACES).text,
}
for element in dc_record:
key = re.sub(r"{\S+}", "", element.tag) # Strip the namespace off the tag
value = element.text
if key not in record:
record[key] = []
record[key].append(value)

return self.get_records([record])
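
To illustrate the mapper's contract, a sketch of the input `parse()` expects and the record it builds. The XML is an invented, trimmed example of a Preservica metadata response, showing only the elements `parse()` actually reads.

# Invented, trimmed metadata response (hypothetical values throughout).
sample = """
<MetadataResponse xmlns="http://preservica.com/EntityAPI/v6.0">
  <MetadataContainer xmlns="http://preservica.com/XIP/v6.0">
    <Entity>a7350518-b200-4ad7-8357-0ce416e1d733</Entity>
    <Content>
      <oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
                 xmlns:dc="http://purl.org/dc/elements/1.1/">
        <dc:title>Sample title</dc:title>
        <dc:creator>Sample creator</dc:creator>
      </oai_dc:dc>
    </Content>
  </MetadataContainer>
</MetadataResponse>
"""

# parse(sample) would assemble this record before handing it to get_records():
# {
#     "entity_id": "a7350518-b200-4ad7-8357-0ce416e1d733",
#     "title": ["Sample title"],
#     "creator": ["Sample creator"],
# }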