Skip to content

Commit

Permalink
tasks: added merging task
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Cesarano <[email protected]>
Signed-off-by: Riccardo Candido <[email protected]>

* new task including all merging utils needed in the workflow
* unit and integration tests for those functions

This close #1666 and also close #2322.
  • Loading branch information
ammirate authored and kaplun committed Oct 23, 2017
1 parent e12277d commit 4fcdd9d
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 3 deletions.
8 changes: 7 additions & 1 deletion inspirehep/modules/workflows/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@


class DownloadError(WorkflowsError):

"""Error representing a failed download in a workflow."""


class MissingHeadUUIDError(WorkflowsError):
"""Error representing the missing field `head_uuid`
This error should be triggered when there is no head_uuid property
inside the extra_data for a given workflow object."""
193 changes: 193 additions & 0 deletions inspirehep/modules/workflows/tasks/merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Tasks related to record merging."""

from __future__ import absolute_import, division, print_function

import json

from invenio_db import db
from invenio_pidstore.resolver import PersistentIdentifier
from inspire_json_merger.inspire_json_merger import inspire_json_merge

from inspirehep.modules.records import RecordMetadata
from inspirehep.modules.records.api import InspireRecord
from inspirehep.utils.record import get_source

from inspirehep.modules.workflows.models import WorkflowsRecordSources


from inspirehep.modules.workflows.errors import MissingHeadUUIDError
from inspirehep.modules.workflows.utils import with_debug_logging


def put_matched_record_uuid_in_extra_data(obj):
"""Save in extra_data the key `head_uuid`, with the UUID of the matched
record from ElasticSearch.
"""
match_id = _get_match_recid(obj)
record_uuid = PersistentIdentifier.get('lit', match_id).object_uuid
obj.extra_data['head_uuid'] = str(record_uuid)


def get_head(obj):
"""Retrieve the head from the JSON record.
Retrieves the head JSON corresponding to the current matched record.
The head version is stored in `obj.extra_data['head_json']`.
It assumes that inside obj.extra_data there is head_uuid key populated
"""
head_uuid = obj.extra_data.get('head_uuid')
if not head_uuid:
raise MissingHeadUUIDError

entry = RecordMetadata.query.filter(
RecordMetadata.id == head_uuid
).one_or_none()

return entry.json if entry else {}


@with_debug_logging
def merge_articles(obj, eng):
"""Retrieve root, head, update and perform the merge.
- The workflow payload is overwritten by the merged record.
- The conflicts are stored in obj.extra_data['conflicts']. This variable
contains None if there aren't conflicts, otherwise a string made by
dumping a dictionary.
"""
root = get_root(obj)
put_matched_record_uuid_in_extra_data(obj)
head = get_head(obj)
obj.data, conflicts = inspire_json_merge(
root,
head,
obj.extra_data['new_root']
)

obj.extra_data['conflicts'] = [
json.loads(c.to_json()) for c in conflicts
] if conflicts else None


def put_root_in_extra_data(obj, eng):
"""Save the workflow object payload in extra_data['new_root']
to make it available later.
"""
if not obj.extra_data.get('new_root'):
obj.extra_data['new_root'] = obj.data


def update_record(obj, eng):
"""Stores the merged record in the database.
When this function is called, it assumes:
- obj.extra_data['head_uuid'] is populated
- obj.data is populated with the json merged
"""
record = InspireRecord.get_record(obj.extra_data['head_uuid'])
record.clear()
record.update(obj.data)
record.commit()
obj.save()
db.session.commit()


def get_root(obj):
"""Retrieve the root JSON.
Retrieves the root JSON corresponding to the current matched record and the
provided source.
The root version is stored in `obj.extra_data['root_json']`.
Args:
obj(WorkflowObject): the current workflow object
Return:
(dict): the json of the entry if it exists, else an empty dict
"""
source = get_source(obj.data)
rec_id = _get_match_recid(obj)
record_uuid = PersistentIdentifier.get('lit', rec_id).object_uuid
return _read_wf_record_source(record_uuid, source)


def _get_match_recid(obj):
"""Return the first matched record by `inspire-matcher`
"""
return obj.extra_data['record_matches'][0]


def _read_wf_record_source(record_uuid, source):
entry = WorkflowsRecordSources.query.filter(
WorkflowsRecordSources.record_id == record_uuid,
WorkflowsRecordSources.source == source
).one_or_none()
return entry.json if entry else {}


@with_debug_logging
def store_root(obj, eng):
"""Stores the root record in the database.
When this function is called, it assumes:
- obj.extra_data['head_uuid'] is populated
- obj.data is populated with the json merged
"""
new_root = obj.extra_data['new_root']
record_uuid = obj.extra_data['head_uuid']
source = get_source(new_root)

_insert_wf_record_source(new_root, record_uuid, source)

# this line prevent emptying obj.extra_data
obj.save()
# Commit to DB before indexing
db.session.commit()


def _insert_wf_record_source(json, record_uuid, source):
"""Stores the given json in the WorkflowRecordSource table in the db
Important: does not commit the session, in case some other operation
needs to be done before it
"""
record_source = WorkflowsRecordSources.query.filter(
WorkflowsRecordSources.record_id == record_uuid,
WorkflowsRecordSources.source == source
).one_or_none()
if record_source is None:
record_source = WorkflowsRecordSources(
source=source,
json=json,
record_id=record_uuid
)
else:
record_source.json = json
db.session.add(record_source)


def has_conflicts(obj, eng):
return obj.extra_data.get('conflicts') is not None
5 changes: 5 additions & 0 deletions inspirehep/modules/workflows/tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ def store_record(obj, *args, **kwargs):
# Create persistent identifier.
created_pid = inspire_recid_minter(str(record.id), record).pid_value

# store head_uuid to store the root later
obj.extra_data['head_uuid'] = str(record.id)

# Commit any changes to record
record.commit()

obj.save()

# Commit to DB before indexing
db.session.commit()

Expand Down
1 change: 0 additions & 1 deletion inspirehep/modules/workflows/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

from ..models import WorkflowsAudit


LOGGER = logging.getLogger(__name__)


Expand Down
1 change: 0 additions & 1 deletion inspirehep/modules/workflows/workflows/article.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
refextract,
submission_fulltext_download,
)

from inspirehep.modules.workflows.tasks.classifier import (
classify_paper,
filter_core_keywords,
Expand Down
5 changes: 5 additions & 0 deletions inspirehep/utils/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,8 @@ def is_complete(publication_info):
if is_complete(publication_info):
return False
return had_at_least_one_journal_title


def get_source(record):
"""Return the first available source of a record."""
return get_value(record, 'acquisition_source.source')
68 changes: 68 additions & 0 deletions tests/integration/workflows/test_task_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

from __future__ import absolute_import, division, print_function

import pytest

from invenio_db import db

from inspirehep.modules.records.api import InspireRecord
from inspirehep.modules.workflows.tasks.merging import (
_read_wf_record_source,
_insert_wf_record_source
)


@pytest.fixture()
def dummy_record(workflow_app):
record = InspireRecord.create({
"$schema": "http://localhost:5000/schemas/records/hep.json",
"_collections": ['Literature'],
"titles": [{
"title": "foo"
}],
"document_type": ["thesis"]
})
yield record
record._delete(force=True)


def test_wf_record_source_read_and_write(dummy_record):
record_uuid = dummy_record.id
original_root = {"title": "baz"}

_insert_wf_record_source(
json=original_root,
record_uuid=record_uuid,
source='arXiv'
)
db.session.commit()

retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='arXiv')
assert original_root == retrieved_root


def test_empty_root(dummy_record):
record_uuid = dummy_record.id
retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='Elsevier')
assert retrieved_root == {}
44 changes: 44 additions & 0 deletions tests/unit/workflows/test_workflows_tasks_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

from __future__ import absolute_import, division, print_function

from mocks import MockObj
import pytest


from inspirehep.modules.workflows.tasks.merging import get_head
from inspirehep.modules.workflows.errors import MissingHeadUUIDError


def test_get_head_empty_head_uuid():
workflow_obj = MockObj({}, {'head_uuid': ''})

with pytest.raises(MissingHeadUUIDError):
get_head(workflow_obj)


def test_get_head_no_head_uuid():
workflow_obj = MockObj({}, {})

with pytest.raises(MissingHeadUUIDError):
get_head(workflow_obj)

0 comments on commit 4fcdd9d

Please sign in to comment.