Skip to content

Commit

Permalink
tasks: added merging functions. Closes inspirehep#1666 inspirehep#2322
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Cesarano <[email protected]>
Signed-off-by: Riccardo Candido <[email protected]>
  • Loading branch information
ammirate committed Sep 20, 2017
1 parent 750c30e commit 9f69a8c
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 2 deletions.
8 changes: 7 additions & 1 deletion inspirehep/modules/workflows/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@


class DownloadError(WorkflowsError):

"""Error representing a failed download in a workflow."""


class MissingHeadUUIDError(WorkflowsError):
"""Error representing the missing field `head_uuid`
This error should be triggered when there is no head_uuid property
inside the extra_data for a given workflow object."""
192 changes: 192 additions & 0 deletions inspirehep/modules/workflows/tasks/merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Tasks related to record merging."""

from __future__ import absolute_import, division, print_function

from invenio_db import db
from invenio_pidstore.resolver import PersistentIdentifier
from inspire_json_merger.inspire_json_merger import inspire_json_merge

from inspirehep.modules.records import RecordMetadata
from inspirehep.modules.records import InspireRecord
from inspirehep.utils.record import get_source

from inspirehep.modules.workflows.models import WorkflowsRecordSources

from inspirehep.modules.workflows.errors import MissingHeadUUIDError
from inspirehep.modules.workflows.utils import with_debug_logging


def put_matched_record_uuid_in_extra_data(obj):
"""Retrieve the UUID from the JSON record.
Retrieves the record UUID .
The uuid is stored in `obj.extra_data['head_uuid']`.
"""
try:
record_matched = obj.extra_data['record_matches']['records'][0]
match_id = record_matched['source']['control_number']
except Exception as e:
raise ValueError(
'Can not get control number from '
'matched record.\n{}'.format(e.message)
)
record_uuid = PersistentIdentifier.get('lit', match_id).object_uuid
obj.extra_data['head_uuid'] = str(record_uuid)


def get_head(obj):
"""Retrieve the head from the JSON record.
Retrieves the head JSON corresponding to the current matched record.
The head version is stored in `obj.extra_data['head_json']`.
It assumes that inside obj.extra_data there is head_uuid key populated
"""
head_uuid = obj.extra_data.get('head_uuid')
if not head_uuid:
raise MissingHeadUUIDError

entry = RecordMetadata.query.filter(
RecordMetadata.id == head_uuid
).one_or_none()

return entry.json if entry else {}


@with_debug_logging
def merge_articles(obj, eng):
"""Retrieve root, head, update and perform the merge.
- The workflow payload is overwritten by the merged record.
- The conflicts are stored in obj.extra_data['conflicts']. This variable
contains None if there aren't conflicts, otherwise a string made by
dumping a dictionary.
"""
root = get_root(obj)
put_matched_record_uuid_in_extra_data(obj)
head = get_head(obj)
obj.data, conflicts = inspire_json_merge(
root,
head,
obj.extra_data['new_root']
)
obj.extra_data['conflicts'] = conflicts if conflicts else None


def put_root_in_extra_data(obj, eng):
"""Save the workflow object payload in extra_data['new_root']
to make it available later.
"""
if not obj.extra_data.get('new_root'):
obj.extra_data['new_root'] = obj.data


def update_record(obj, eng):
"""Stores the merged record in the database.
When this function is called, it assumes:
- obj.extra_data['head_uuid'] is populated
- obj.data is populated with the json merged
"""
record = InspireRecord.get_record(obj.extra_data['head_uuid'])
record.clear()
record.update(obj.data)
record.commit()
obj.save()
db.session.commit()


def get_root(obj):
"""Retrieve the root JSON.
Retrieves the root JSON corresponding to the current matched record and the
provided source.
The root version is stored in `obj.extra_data['root_json']`.
Args:
obj(WorkflowObject): the current workflow object
Return:
(dict): the json of the entry if it exists, else an empty dict
"""
source = get_source(obj.data)
control_number = obj.data.get('control_number')
record_uuid = PersistentIdentifier.get('lit', control_number).object_uuid

return _read_wf_record_source(record_uuid, source)


def _read_wf_record_source(record_uuid, source):
entry = WorkflowsRecordSources.query.filter(
WorkflowsRecordSources.record_id == record_uuid,
WorkflowsRecordSources.source == source
).one_or_none()
return entry.json if entry else {}


@with_debug_logging
def store_root(obj, eng):
"""Stores the root record in the database.
When this function is called, it assumes:
- obj.extra_data['head_uuid'] is populated
- obj.data is populated with the json merged
"""
new_root = obj.extra_data['new_root']
record_uuid = obj.extra_data['head_uuid']
source = get_source(new_root)

_insert_wf_record_source(new_root, record_uuid, source)

# this line prevent emptying obj.extra_data
obj.save()
# Commit to DB before indexing
db.session.commit()


def _insert_wf_record_source(json, record_uuid, source):
"""
Stores the given json in the WorkflowRecordSource table in the db
Important: does not commit the session, in case some other operation
needs to be done before it
"""
record_source = WorkflowsRecordSources.query.filter(
WorkflowsRecordSources.record_id == record_uuid,
WorkflowsRecordSources.source == source
).one_or_none()
if record_source is None:
record_source = WorkflowsRecordSources(
source=source,
json=json,
record_id=record_uuid
)
else:
record_source.json = json
db.session.add(record_source)


def has_conflicts(obj, eng):
return obj.extra_data.get('conflicts') is not None
4 changes: 4 additions & 0 deletions inspirehep/modules/workflows/tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ def store_record(obj, *args, **kwargs):
# Create persistent identifier.
inspire_recid_minter(str(record.id), record)

# store head_uuid to store the root later
obj.extra_data['head_uuid'] = str(record.id)

# Commit any changes to record
record.commit()

# Dump any changes to record
obj.data = record.dumps()
obj.save()

# Commit to DB before indexing
db.session.commit()
Expand Down
1 change: 0 additions & 1 deletion inspirehep/modules/workflows/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

from .models import WorkflowsAudit


LOGGER = logging.getLogger(__name__)


Expand Down
67 changes: 67 additions & 0 deletions tests/integration/workflows/test_workflows_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

from __future__ import absolute_import, division, print_function

import pytest

from invenio_db import db

from inspirehep.modules.records.api import InspireRecord
from inspirehep.modules.workflows.tasks.merging import (
_read_wf_record_source,
_insert_wf_record_source
)


@pytest.fixture()
def dummy_record(workflow_app):
record = InspireRecord.create({
"$schema": "http://localhost:5000/schemas/records/hep.json",
"titles": [{
"title": "foo"
}],
"document_type": ["thesis"]
})
yield record
record._delete(force=True)


def test_wf_record_source_read_and_write(dummy_record):
record_uuid = dummy_record.id
original_root = {"title": "baz"}

_insert_wf_record_source(
json=original_root,
record_uuid=record_uuid,
source='arXiv'
)
db.session.commit()

retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='arXiv')
assert original_root == retrieved_root


def test_empty_root(dummy_record):
record_uuid = dummy_record.id
retrieved_root = _read_wf_record_source(record_uuid=record_uuid, source='Elsevier')
assert retrieved_root == {}
44 changes: 44 additions & 0 deletions tests/unit/workflows/test_workflows_tasks_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

from __future__ import absolute_import, division, print_function

from mocks import MockObj
import pytest


from inspirehep.modules.workflows.tasks.merging import get_head
from inspirehep.modules.workflows.errors import MissingHeadUUIDError


def test_get_head_empty_head_uuid():
workflow_obj = MockObj({}, {'head_uuid': ''})

with pytest.raises(MissingHeadUUIDError):
get_head(workflow_obj)


def test_get_head_no_head_uuid():
workflow_obj = MockObj({}, {})

with pytest.raises(MissingHeadUUIDError):
get_head(workflow_obj)

0 comments on commit 9f69a8c

Please sign in to comment.