Commit: Elsevier: processing

ErnestaP committed Nov 23, 2023
1 parent 4f71d05 · commit 306a642
Showing 24 changed files with 12,992 additions and 208 deletions.
9 changes: 8 additions & 1 deletion dags/common/cleanup.py
@@ -56,7 +56,14 @@ def clean_all_affiliations_for_author(data):
 
 
 def remove_unnecessary_fields(obj):
-    fieldnames = ["curated", "citeable", "files", "date_published", "source_file_path"]
+    fieldnames = [
+        "curated",
+        "citeable",
+        "files",
+        "date_published",
+        "source_file_path",
+        "local_files",
+    ]
     [obj.pop(field, None) for field in fieldnames]
     return obj
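A note on the cleanup change: with "local_files" added to the list, records keep no pointer to locally stored files after cleanup, just like the other bookkeeping fields. A minimal sketch of the behavior (the sample record is hypothetical):

```python
# Sketch of the updated remove_unnecessary_fields; the sample record is hypothetical.
def remove_unnecessary_fields(obj):
    fieldnames = [
        "curated",
        "citeable",
        "files",
        "date_published",
        "source_file_path",
        "local_files",
    ]
    # pop() with a None default tolerates records that lack a field
    [obj.pop(field, None) for field in fieldnames]
    return obj

record = {
    "dois": [{"value": "10.0000/example-doi"}],
    "local_files": ["extracted/some-file.xml"],  # previously survived cleanup
}
print(remove_unnecessary_fields(record))
# -> {'dois': [{'value': '10.0000/example-doi'}]}
```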
71 changes: 52 additions & 19 deletions dags/common/pull_ftp.py
@@ -1,5 +1,6 @@
 import base64
 import io
+import os
 import tarfile
 import zipfile
 from datetime import datetime
@@ -19,9 +20,10 @@ def migrate_files(
     s_ftp: SFTP_FTP_TYPE,
     repo: IRepository,
     logger: PrintLogger,
+    process_archives: bool = True,
 ):
     logger.msg("Processing files.", filenames=archives_names)
-    extracted_filenames = []
+    extracted_or_downloaded_filenames = []
     for archive_name in archives_names:
         logger.msg("Getting file from SFTP.", file=archive_name)
         file_bytes = s_ftp.get_file(archive_name)
@@ -32,26 +34,37 @@ def migrate_files(
             or ".tar" in archive_name
             and tarfile.is_tarfile(file_bytes)
         ):
-            for (archive_file_content, s3_filename) in process_archive(
-                file_bytes=file_bytes, file_name=archive_name
-            ):
-                repo.save(s3_filename, io.BytesIO(archive_file_content))
-                if repo.is_meta(s3_filename):
-                    extracted_filenames.append("extracted/" + s3_filename)
-            repo.save(archive_name, file_bytes)
+            if process_archives:
+                for (archive_file_content, s3_filename) in process_archive(
+                    file_bytes=file_bytes, file_name=archive_name
+                ):
+                    repo.save(s3_filename, io.BytesIO(archive_file_content))
+                    if repo.is_meta(s3_filename):
+                        extracted_or_downloaded_filenames.append(
+                            os.path.join("extracted/", s3_filename)
+                        )
+                repo.save(archive_name, file_bytes)
+            else:
+                extracted_or_downloaded_filenames.append(
+                    os.path.join("raw", archive_name)
+                )
+                repo.save(archive_name, file_bytes)
 
         else:
             logger.info(
                 "File is not zip or tar, processing the next one",
                 file_name=archive_name,
             )
             continue
 
-    return extracted_filenames
+    return extracted_or_downloaded_filenames
 
 
 def migrate_from_ftp(
-    s_ftp: SFTP_FTP_TYPE, repo: IRepository, logger: PrintLogger, **kwargs
+    s_ftp: SFTP_FTP_TYPE,
+    repo: IRepository,
+    logger: PrintLogger,
+    publisher=None,
+    **kwargs,
 ):
     params = kwargs["params"]
     force_pull_specific_files = (
@@ -66,10 +79,10 @@
     )
 
     if force_pull_all_files:
-        return _force_pull(s_ftp, repo, logger, **kwargs)
+        return _force_pull(s_ftp, repo, logger, publisher, **kwargs)
     elif force_pull_specific_files:
-        return _filenames_pull(s_ftp, repo, logger, **kwargs)
-    return _differential_pull(s_ftp, repo, logger, **kwargs)
+        return _filenames_pull(s_ftp, repo, logger, publisher, **kwargs)
+    return _differential_pull(s_ftp, repo, logger, publisher, **kwargs)
 
 
 def reprocess_files(repo: IRepository, logger: PrintLogger, **kwargs):
@@ -79,23 +92,36 @@ def reprocess_files(repo: IRepository, logger: PrintLogger, **kwargs):
     return _find_files_in_zip(filenames, repo)
 
 
-def _force_pull(s_ftp: SFTP_FTP_TYPE, repo: IRepository, logger: PrintLogger, **kwargs):
+def _force_pull(
+    s_ftp: SFTP_FTP_TYPE,
+    repo: IRepository,
+    logger: PrintLogger,
+    publisher: str,
+    **kwargs,
+):
     logger.msg("Force Pulling from SFTP.")
     excluded_directories = kwargs["params"]["excluded_directories"]
     filenames = s_ftp.list_files(excluded_directories=excluded_directories)
-    return migrate_files(filenames, s_ftp, repo, logger)
+    process_archives = publisher != "elsevier"
+    return migrate_files(
+        filenames, s_ftp, repo, logger, process_archives=process_archives
+    )
 
 
 def _filenames_pull(
     s_ftp: SFTP_FTP_TYPE,
     repo: IRepository,
     logger: PrintLogger,
+    publisher: str,
     **kwargs,
 ):
     filenames_pull_params = kwargs["params"]["filenames_pull"]
     filenames = filenames_pull_params["filenames"]
     logger.msg("Pulling specified filenames from SFTP")
-    return migrate_files(filenames, s_ftp, repo, logger)
+    process_archives = publisher != "elsevier"
+    return migrate_files(
+        filenames, s_ftp, repo, logger, process_archives=process_archives
+    )
 
 
 def _find_files_in_zip(filenames, repo: IRepository):
@@ -113,14 +139,21 @@ def _find_files_in_zip(filenames, repo: IRepository):
 
 
 def _differential_pull(
-    s_ftp: SFTP_FTP_TYPE, repo: IRepository, logger: PrintLogger, **kwargs
+    s_ftp: SFTP_FTP_TYPE,
+    repo: IRepository,
+    logger: PrintLogger,
+    publisher: str,
+    **kwargs,
 ):
     logger.msg("Pulling missing files only.")
     excluded_directories = kwargs["params"]["excluded_directories"]
    sftp_files = s_ftp.list_files(excluded_directories=excluded_directories)
     s3_files = repo.get_all_raw_filenames()
     diff_files = list(filter(lambda x: x not in s3_files, sftp_files))
-    return migrate_files(diff_files, s_ftp, repo, logger)
+    process_archives = publisher != "elsevier"
+    return migrate_files(
+        diff_files, s_ftp, repo, logger, process_archives=process_archives
+    )
 
 
 def trigger_file_processing(
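A note on the new process_archives switch: every pull helper now computes process_archives = publisher != "elsevier", so Elsevier archives are stored untouched and reported under raw/, while other publishers' archives are unpacked as before. The sketch below condenses that branch into something runnable; FakeRepo and store() are hypothetical stand-ins for the S3-backed IRepository and migrate_files, and only the branch logic mirrors the diff above:

```python
import io
import os
import zipfile

class FakeRepo:
    """Hypothetical in-memory stand-in for the S3-backed IRepository."""
    def __init__(self):
        self.saved = {}
    def save(self, name, buf):
        self.saved[name] = buf.getvalue()
    def is_meta(self, name):
        return name.endswith(".xml")

def store(archive_name, file_bytes, repo, process_archives=True):
    """Condensed version of the per-archive branch migrate_files now takes."""
    results = []
    if process_archives:
        # Non-Elsevier publishers: unpack members and record metadata files.
        with zipfile.ZipFile(file_bytes) as zf:
            for member in zf.namelist():
                repo.save(member, io.BytesIO(zf.read(member)))
                if repo.is_meta(member):
                    results.append(os.path.join("extracted/", member))
        repo.save(archive_name, file_bytes)
    else:
        # Elsevier: keep the archive as-is and report its raw path downstream.
        results.append(os.path.join("raw", archive_name))
        repo.save(archive_name, file_bytes)
    return results

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("dataset.xml", "<dataset/>")

print(store("batch.zip", io.BytesIO(buf.getvalue()), FakeRepo(), process_archives=False))
# -> ['raw/batch.zip']
print(store("batch.zip", io.BytesIO(buf.getvalue()), FakeRepo()))
# -> ['extracted/dataset.xml']
```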
23 changes: 14 additions & 9 deletions dags/common/utils.py
@@ -7,7 +7,6 @@
 import zipfile
 from ftplib import error_perm
 from io import StringIO
-from os.path import basename
 from stat import S_ISDIR, S_ISREG
 
 from airflow.models.dagrun import DagRun
@@ -111,7 +110,7 @@ def walk_ftp(ftp, remotedir, paths):
             walk_ftp(ftp=ftp, remotedir=entry, paths=paths)
         except error_perm:
             ftp.cwd("/")
-            paths.append(basename(entry))
+            paths.append(os.path.basename(entry))
 
 
 def construct_license(license_type, version, url=None):
@@ -203,28 +202,34 @@ def check_dagrun_state(dagrun: DagRun, not_allowed_states=[], allowed_states=[])
     return all(states_values)
 
 
-def process_zip_file(file_bytes, file_name):
+def process_zip_file(file_bytes, file_name, **kwargs):
     file_bytes.seek(0)
+    only_specific_file = kwargs.get("only_specific_file")
     with zipfile.ZipFile(file_bytes) as zip:
         for filename in zip.namelist():
-            file_prefix = ".".join(file_name.split(".")[:-1])
+            if only_specific_file and only_specific_file not in filename:
+                continue
             zip_file_content = zip.read(filename)
+            file_prefix = ".".join(file_name.split(".")[:-1])
             s3_filename = os.path.join(file_prefix, filename)
             yield (zip_file_content, s3_filename)
 
 
-def process_tar_file(file_bytes, file_name):
+def process_tar_file(file_bytes, file_name, **kwargs):
     file_bytes.seek(0)
+    only_specific_file = kwargs.get("only_specific_file")
     with tarfile.open(fileobj=file_bytes, mode="r") as tar:
         for filename in tar.getnames():
-            file_prefix = ".".join(file_name.split(".")[:-1])
+            if only_specific_file and only_specific_file not in filename:
+                continue
             tar_file_content = tar.extractfile(filename).read()
+            file_prefix = ".".join(file_name.split(".")[:-1])
             s3_filename = os.path.join(file_prefix, filename)
             yield (tar_file_content, s3_filename)
 
 
-def process_archive(file_bytes, file_name):
+def process_archive(file_bytes, file_name, **kwargs):
     if zipfile.is_zipfile(file_bytes):
-        return process_zip_file(file_bytes, file_name)
+        return process_zip_file(file_bytes, file_name, **kwargs)
     if tarfile.is_tarfile(file_bytes):
-        return process_tar_file(file_bytes, file_name)
+        return process_tar_file(file_bytes, file_name, **kwargs)
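To see the new only_specific_file keyword in isolation, here is process_zip_file exactly as changed above, exercised against a small in-memory archive (the member names are made up). process_tar_file applies the same filter, and process_archive simply forwards **kwargs to whichever handler matches:

```python
import io
import os
import zipfile

# process_zip_file as changed in this commit.
def process_zip_file(file_bytes, file_name, **kwargs):
    file_bytes.seek(0)
    only_specific_file = kwargs.get("only_specific_file")
    with zipfile.ZipFile(file_bytes) as zip:
        for filename in zip.namelist():
            # Skip archive members that do not match the requested name.
            if only_specific_file and only_specific_file not in filename:
                continue
            zip_file_content = zip.read(filename)
            file_prefix = ".".join(file_name.split(".")[:-1])
            s3_filename = os.path.join(file_prefix, filename)
            yield (zip_file_content, s3_filename)

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("dataset.xml", "<dataset/>")              # hypothetical member
    zf.writestr("S0000000000000001/main.pdf", "%PDF-1.4")  # hypothetical member

for content, s3_name in process_zip_file(buf, "vtex00000001.zip",
                                         only_specific_file="dataset.xml"):
    print(s3_name)
# -> vtex00000001/dataset.xml  (the PDF member is filtered out)
```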
64 changes: 20 additions & 44 deletions dags/elsevier/elsevier_file_processing.py
@@ -6,27 +6,25 @@
 from common.enhancer import Enhancer
 from common.enricher import Enricher
 from common.exceptions import EmptyOutputFromPreviousTask
-from common.repository import IRepository
 from common.utils import parse_without_names_spaces
-from elsevier.metadata_parser import ElsevierMetadataParser
 from elsevier.parser import ElsevierParser
-from elsevier.repository import ElsevierRepository
 from jsonschema import validate
 
 
 def parse_elsevier(**kwargs):
-    if "params" not in kwargs or "file" not in kwargs["params"]:
-        raise KeyError("There was no 'file' parameter. Exiting run.")
-    if "params" not in kwargs or "file_name" not in kwargs["params"]:
-        raise KeyError("There was 'file_name' parameter. Exiting run.")
-    encoded_xml = kwargs["params"]["file"]
-    file_name = kwargs["params"]["file_name"]
+    try:
+        encoded_xml = kwargs["params"]["file_content"]
+    except KeyError:
+        raise Exception("There was no 'file_content' parameter. Exiting run.")
     xml_bytes = base64.b64decode(encoded_xml)
     xml = parse_without_names_spaces(xml_bytes.decode("utf-8"))
     parser = ElsevierParser()
     parsed = parser.parse(xml)
-    parsed["source_file_path"] = file_name
-    return parsed
+    try:
+        metadata = kwargs["params"]["metadata"]
+    except KeyError:
+        raise Exception("Record is missing metadata. Exiting run.")
+    return {**parsed, **metadata}
 
 
 def enhance_elsevier(parsed_file):
@@ -37,21 +35,6 @@ def enrich_elsevier(enhanced_file):
     return Enricher()(enhanced_file)
 
 
-def elsevier_parse_metadata(enriched_file, repo):
-    file_path = enriched_file["source_file_path"]
-    dataset_file_path = file_path.split("/")[:3]
-    dataset_path_parts = dataset_file_path + ["dataset.xml"]
-    full_path = "/".join((dataset_path_parts))
-    file = repo.get_by_id(full_path)
-    xml_bytes = base64.b64decode(base64.b64encode(file.getvalue()).decode())
-    xml = parse_without_names_spaces(xml_bytes.decode("utf-8"))
-    parser = ElsevierMetadataParser(
-        file_path=file_path, doi=enriched_file["dois"][0]["value"]
-    )
-    metadata = parser.parse(xml)
-    return {**enriched_file, **metadata}
-
-
 def elsevier_validate_record(file_with_metadata):
     schema = requests.get(file_with_metadata["$schema"]).json()
     validate(file_with_metadata, schema)
@@ -64,34 +47,27 @@ def parse(**kwargs):
         return parse_elsevier(**kwargs)
 
     @task()
-    def parse_metadata(parsed_file, repo: IRepository = ElsevierRepository()):
+    def enhance(parsed_file):
         if parsed_file:
-            return elsevier_parse_metadata(parsed_file, repo)
-        raise EmptyOutputFromPreviousTask("parsed_file")
-
-    @task()
-    def enhance(parsed_file_with_metadata):
-        if parsed_file_with_metadata:
-            return parsed_file and enhance_elsevier(parsed_file_with_metadata)
+            return parsed_file and enhance_elsevier(parsed_file)
         raise EmptyOutputFromPreviousTask("parse_metadata")
 
     @task()
-    def enrich(enhanced_file_with_metadata):
-        if enhanced_file_with_metadata:
-            return enrich_elsevier(enhanced_file_with_metadata)
+    def enrich(enhanced_file):
+        if enhanced_file:
+            return enrich_elsevier(enhanced_file)
         raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata")
 
     @task()
-    def validate_record(enriched_file_with_metadata):
-        if enriched_file_with_metadata:
-            return elsevier_validate_record(enriched_file_with_metadata)
+    def validate_record(enriched_file):
+        if enriched_file:
+            return elsevier_validate_record(enriched_file)
        raise EmptyOutputFromPreviousTask("enriched_file_with_metadata")
 
     parsed_file = parse()
-    parsed_file_with_metadata = parse_metadata(parsed_file)
-    enhanced_file_with_metadata = enhance(parsed_file_with_metadata)
-    enriched_file_with_metadata = enrich(enhanced_file_with_metadata)
-    validate_record(enriched_file_with_metadata)
+    enhanced_file = enhance(parsed_file)
+    enriched_file = enrich(enhanced_file)
+    validate_record(enriched_file)
 
 
 Elsevier_file_processing = elsevier_process_file()
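A note on the reworked DAG: the parse_metadata task is gone, so parse now expects its params to carry both a base64-encoded 'file_content' and a ready-made 'metadata' dict, which it merges into the parsed record. A hypothetical sketch of that contract; parse_stub stands in for the real parse_elsevier, since ElsevierParser is not imported here:

```python
import base64

# Hypothetical params, shaped the way the reworked parse_elsevier expects them.
xml = "<article><title>Example</title></article>"
params = {
    "file_content": base64.b64encode(xml.encode("utf-8")).decode("utf-8"),
    "metadata": {"dois": [{"value": "10.0000/example-doi"}]},
}

def parse_stub(**kwargs):
    """Stand-in for parse_elsevier; ElsevierParser itself is not used here."""
    try:
        encoded_xml = kwargs["params"]["file_content"]
    except KeyError:
        raise Exception("There was no 'file_content' parameter. Exiting run.")
    xml_bytes = base64.b64decode(encoded_xml)
    parsed = {"title": "Example"}  # ElsevierParser().parse(xml) would build this
    try:
        metadata = kwargs["params"]["metadata"]
    except KeyError:
        raise Exception("Record is missing metadata. Exiting run.")
    # Metadata now arrives with the task params instead of being re-read from
    # dataset.xml, so the parsed record and the metadata are simply merged.
    return {**parsed, **metadata}

print(parse_stub(params=params))
# -> {'title': 'Example', 'dois': [{'value': '10.0000/example-doi'}]}
```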
16 changes: 10 additions & 6 deletions dags/elsevier/elsevier_pull_ftp.py
@@ -1,10 +1,12 @@
-import common.pull_ftp as pull_ftp
 import pendulum
 from airflow.decorators import dag, task
 from common.ftp_service import FTPService
+from common.pull_ftp import migrate_from_ftp as migrate_from_ftp_common
+from common.pull_ftp import reprocess_files
 from common.repository import IRepository
 from elsevier.repository import ElsevierRepository
 from elsevier.sftp_service import ElsevierSFTPService
+from elsevier.trigger_file_processing import trigger_file_processing_elsevier
 from structlog import get_logger


@@ -32,23 +34,25 @@ def migrate_from_ftp(
         and not params["filenames_pull"]["force_from_ftp"]
     )
     if specific_files:
-        specific_files_names = pull_ftp.reprocess_files(repo, logger, **kwargs)
+        specific_files_names = reprocess_files(repo, logger, **kwargs)
         return specific_files_names
 
     with sftp:
-        return pull_ftp.migrate_from_ftp(sftp, repo, logger, **kwargs)
+        return migrate_from_ftp_common(
+            sftp, repo, logger, publisher="elsevier", **kwargs
+        )
 
     @task()
     def trigger_file_processing(
         repo: IRepository = ElsevierRepository(),
         filenames=None,
     ):
-        return pull_ftp.trigger_file_processing(
+        return trigger_file_processing_elsevier(
             publisher="elsevier", repo=repo, logger=logger, filenames=filenames or []
         )
 
-    filenames = migrate_from_ftp()
-    trigger_file_processing(filenames=filenames)
+    archive_names = migrate_from_ftp()
+    trigger_file_processing(filenames=archive_names)


dag_taskflow = elsevier_pull_ftp()
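For reference, a hypothetical dag-run configuration for elsevier_pull_ftp, built only from the parameter keys visible in this diff; any other params the DAG declares, including whatever the truncated force-pull condition reads, are omitted:

```python
# Hypothetical trigger configuration; keys mirror the kwargs["params"] lookups
# visible in this diff.
dag_run_conf = {
    "excluded_directories": [],
    "filenames_pull": {
        "filenames": ["vtex00000001.zip"],  # made-up archive name
        # False here (with the other, truncated conditions met) takes the
        # reprocess_files path over copies already in S3; True would pull the
        # named archives from the Elsevier SFTP server again.
        "force_from_ftp": False,
    },
}
```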
[Diffs for the remaining 19 changed files are not rendered here.]
