From b7e37ddc90c675f83825b1294aa03bf618a97c43 Mon Sep 17 00:00:00 2001 From: ammirate Date: Tue, 12 Sep 2017 15:26:05 +0200 Subject: [PATCH] tasks: added merging task Signed-off-by: Antonio Cesarano Signed-off-by: Riccardo Candido * new task including all merging utils needed in the workflow * unit and integration tests for those functions This closes #1666 and also closes #2322. --- inspirehep/modules/workflows/errors.py | 8 +- inspirehep/modules/workflows/tasks/merging.py | 191 ++++++++++++++++++ inspirehep/modules/workflows/tasks/upload.py | 4 + inspirehep/modules/workflows/utils.py | 1 - inspirehep/utils/record.py | 5 + .../workflows/test_task_merging.py | 68 +++++++ .../workflows/test_workflows_tasks_merging.py | 44 ++++ 7 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 inspirehep/modules/workflows/tasks/merging.py create mode 100644 tests/integration/workflows/test_task_merging.py create mode 100644 tests/unit/workflows/test_workflows_tasks_merging.py diff --git a/inspirehep/modules/workflows/errors.py b/inspirehep/modules/workflows/errors.py index b5ecf21865..7e706df524 100644 --- a/inspirehep/modules/workflows/errors.py +++ b/inspirehep/modules/workflows/errors.py @@ -26,5 +26,11 @@ class DownloadError(WorkflowsError): - """Error representing a failed download in a workflow.""" + + +class MissingHeadUUIDError(WorkflowsError): + """Error representing the missing field `head_uuid` + + This error should be triggered when there is no head_uuid property + inside the extra_data for a given workflow object.""" diff --git a/inspirehep/modules/workflows/tasks/merging.py b/inspirehep/modules/workflows/tasks/merging.py new file mode 100644 index 0000000000..db62705d09 --- /dev/null +++ b/inspirehep/modules/workflows/tasks/merging.py @@ -0,0 +1,191 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. 
+# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>. +# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +"""Tasks related to record merging.""" + +from __future__ import absolute_import, division, print_function + +from invenio_db import db +from invenio_pidstore.resolver import PersistentIdentifier +from inspire_json_merger.inspire_json_merger import inspire_json_merge + +from inspirehep.modules.records import RecordMetadata +from inspirehep.modules.records.api import InspireRecord +from inspirehep.utils.record import get_source + +from inspirehep.modules.workflows.models import WorkflowsRecordSources + +from inspirehep.modules.workflows.errors import MissingHeadUUIDError +from inspirehep.modules.workflows.utils import with_debug_logging + + +def put_matched_record_uuid_in_extra_data(obj): + """Store the UUID of the first matched record in extra_data. + + Resolves the control number of the first matched record to its UUID. + The uuid is stored in `obj.extra_data['head_uuid']`. 
+ """ + try: + record_matched = obj.extra_data['record_matches']['records'][0] + match_id = record_matched['source']['control_number'] + except Exception as e: + raise ValueError( + 'Can not get control number from ' + 'matched record.\n{}'.format(e.message) + ) + record_uuid = PersistentIdentifier.get('lit', match_id).object_uuid + obj.extra_data['head_uuid'] = str(record_uuid) + + +def get_head(obj): + """Retrieve the JSON of the head record. + + Retrieves the head JSON corresponding to the current matched record. + The head JSON is returned, or an empty dict if no record has that UUID. + Raises MissingHeadUUIDError if extra_data['head_uuid'] is unset or empty. + """ + head_uuid = obj.extra_data.get('head_uuid') + if not head_uuid: + raise MissingHeadUUIDError + + entry = RecordMetadata.query.filter( + RecordMetadata.id == head_uuid + ).one_or_none() + + return entry.json if entry else {} + + +@with_debug_logging +def merge_articles(obj, eng): + """Retrieve root, head, update and perform the merge. + + - The workflow payload is overwritten by the merged record. + + - The conflicts are stored in obj.extra_data['conflicts']. This variable + contains None if there aren't conflicts, otherwise a string made by + dumping a dictionary. + """ + root = get_root(obj) + put_matched_record_uuid_in_extra_data(obj) + head = get_head(obj) + obj.data, conflicts = inspire_json_merge( + root, + head, + obj.extra_data['new_root'] + ) + obj.extra_data['conflicts'] = conflicts if conflicts else None + + +def put_root_in_extra_data(obj, eng): + """Save the workflow object payload in extra_data['new_root'] + to make it available later, unless it is already set. + """ + if not obj.extra_data.get('new_root'): + obj.extra_data['new_root'] = obj.data + + +def update_record(obj, eng): + """Stores the merged record in the database. 
+ + When this function is called, it assumes: + - obj.extra_data['head_uuid'] is populated + - obj.data is populated with the json merged + """ + record = InspireRecord.get_record(obj.extra_data['head_uuid']) + record.clear() + record.update(obj.data) + record.commit() + obj.save() + db.session.commit() + + +def get_root(obj): + """Retrieve the root JSON. + + Retrieves the root JSON corresponding to the current matched record and the + provided source. + The root JSON is returned; nothing is written to `obj.extra_data`. + + Args: + obj(WorkflowObject): the current workflow object + + Return: + (dict): the json of the entry if it exists, else an empty dict + """ + source = get_source(obj.data) + control_number = obj.data.get('control_number') + record_uuid = PersistentIdentifier.get('lit', control_number).object_uuid + + return _read_wf_record_source(record_uuid, source) + + +def _read_wf_record_source(record_uuid, source): + entry = WorkflowsRecordSources.query.filter( + WorkflowsRecordSources.record_id == record_uuid, + WorkflowsRecordSources.source == source + ).one_or_none() + return entry.json if entry else {} + + +@with_debug_logging +def store_root(obj, eng): + """Stores the root record in the database. 
+ + When this function is called, it assumes: + - obj.extra_data['head_uuid'] is populated + - obj.extra_data['new_root'] is populated with the root to store + """ + new_root = obj.extra_data['new_root'] + record_uuid = obj.extra_data['head_uuid'] + source = get_source(new_root) + + _insert_wf_record_source(new_root, record_uuid, source) + + # this line prevents emptying obj.extra_data + obj.save() + # Commit to DB before indexing + db.session.commit() + + +def _insert_wf_record_source(json, record_uuid, source): + """Stores the given json in the WorkflowsRecordSources table in the db + + Important: does not commit the session, in case some other operation + needs to be done before it + """ + record_source = WorkflowsRecordSources.query.filter( + WorkflowsRecordSources.record_id == record_uuid, + WorkflowsRecordSources.source == source + ).one_or_none() + if record_source is None: + record_source = WorkflowsRecordSources( + source=source, + json=json, + record_id=record_uuid + ) + else: + record_source.json = json + db.session.add(record_source) + + +def has_conflicts(obj, eng): + return obj.extra_data.get('conflicts') is not None diff --git a/inspirehep/modules/workflows/tasks/upload.py b/inspirehep/modules/workflows/tasks/upload.py index 5d2cbaeb49..3454377f26 100644 --- a/inspirehep/modules/workflows/tasks/upload.py +++ b/inspirehep/modules/workflows/tasks/upload.py @@ -52,11 +52,15 @@ def store_record(obj, *args, **kwargs): # Create persistent identifier. 
inspire_recid_minter(str(record.id), record) + # store head_uuid to store the root later + obj.extra_data['head_uuid'] = str(record.id) + # Commit any changes to record record.commit() # Dump any changes to record obj.data = record.dumps() + obj.save() # Commit to DB before indexing db.session.commit() diff --git a/inspirehep/modules/workflows/utils.py b/inspirehep/modules/workflows/utils.py index 63a7b4884f..1873f60334 100644 --- a/inspirehep/modules/workflows/utils.py +++ b/inspirehep/modules/workflows/utils.py @@ -36,7 +36,6 @@ from .models import WorkflowsAudit - LOGGER = logging.getLogger(__name__) diff --git a/inspirehep/utils/record.py b/inspirehep/utils/record.py index 54d3edb1f1..8da86fedbf 100644 --- a/inspirehep/utils/record.py +++ b/inspirehep/utils/record.py @@ -108,3 +108,8 @@ def is_complete(publication_info): if is_complete(publication_info): return False return had_at_least_one_journal_title + + +def get_source(record): + """Return the first available source of a record.""" + return get_value(record, 'acquisition_source.source') diff --git a/tests/integration/workflows/test_task_merging.py b/tests/integration/workflows/test_task_merging.py new file mode 100644 index 0000000000..ade2727163 --- /dev/null +++ b/tests/integration/workflows/test_task_merging.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. +# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. 
If not, see <http://www.gnu.org/licenses/>. +# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +from __future__ import absolute_import, division, print_function + +import pytest + +from invenio_db import db + +from inspirehep.modules.records.api import InspireRecord +from inspirehep.modules.workflows.tasks.merging import ( + _read_wf_record_source, + _insert_wf_record_source +) + + +@pytest.fixture() +def dummy_record(workflow_app): + record = InspireRecord.create({ + "$schema": "http://localhost:5000/schemas/records/hep.json", + "_collections": ['Literature'], + "titles": [{ + "title": "foo" + }], + "document_type": ["thesis"] + }) + yield record + record._delete(force=True) + + +def test_wf_record_source_read_and_write(dummy_record): + record_uuid = dummy_record.id + original_root = {"title": "baz"} + + _insert_wf_record_source( + json=original_root, + record_uuid=record_uuid, + source='arXiv' + ) + db.session.commit() + + retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='arXiv') + assert original_root == retrieved_root + + +def test_empty_root(dummy_record): + record_uuid = dummy_record.id + retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='Elsevier') + assert retrieved_root == {} diff --git a/tests/unit/workflows/test_workflows_tasks_merging.py b/tests/unit/workflows/test_workflows_tasks_merging.py new file mode 100644 index 0000000000..b7ea078416 --- /dev/null +++ b/tests/unit/workflows/test_workflows_tasks_merging.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. +# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>. +# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +from __future__ import absolute_import, division, print_function + +from mocks import MockObj +import pytest + + +from inspirehep.modules.workflows.tasks.merging import get_head +from inspirehep.modules.workflows.errors import MissingHeadUUIDError + + +def test_get_head_empty_head_uuid(): + workflow_obj = MockObj({}, {'head_uuid': ''}) + + with pytest.raises(MissingHeadUUIDError): + get_head(workflow_obj) + + +def test_get_head_no_head_uuid(): + workflow_obj = MockObj({}, {}) + + with pytest.raises(MissingHeadUUIDError): + get_head(workflow_obj)