diff --git a/inspirehep/modules/workflows/errors.py b/inspirehep/modules/workflows/errors.py
index b5ecf21865..7e706df524 100644
--- a/inspirehep/modules/workflows/errors.py
+++ b/inspirehep/modules/workflows/errors.py
@@ -26,5 +26,11 @@
class DownloadError(WorkflowsError):
-
"""Error representing a failed download in a workflow."""
+
+
+class MissingHeadUUIDError(WorkflowsError):
+    """Error representing the missing field `head_uuid`.
+
+    This error should be triggered when there is no `head_uuid` property
+    inside the `extra_data` of a given workflow object.
+    """
diff --git a/inspirehep/modules/workflows/tasks/merging.py b/inspirehep/modules/workflows/tasks/merging.py
new file mode 100644
index 0000000000..e3cb5f6a85
--- /dev/null
+++ b/inspirehep/modules/workflows/tasks/merging.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of INSPIRE.
+# Copyright (C) 2014-2017 CERN.
+#
+# INSPIRE is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# INSPIRE is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
+#
+# In applying this license, CERN does not waive the privileges and immunities
+# granted to it by virtue of its status as an Intergovernmental Organization
+# or submit itself to any jurisdiction.
+
+"""Tasks related to record merging."""
+
+from __future__ import absolute_import, division, print_function
+
+from invenio_db import db
+from invenio_pidstore.resolver import PersistentIdentifier
+from inspire_json_merger.inspire_json_merger import inspire_json_merge
+
+from inspirehep.modules.records import RecordMetadata
+from inspirehep.modules.records.api import InspireRecord
+from inspirehep.utils.record import get_source
+
+from inspirehep.modules.workflows.models import WorkflowsRecordSources
+
+from inspirehep.modules.workflows.errors import MissingHeadUUIDError
+from inspirehep.modules.workflows.utils import with_debug_logging
+
+
+def put_matched_record_uuid_in_extra_data(obj):
+    """Store the UUID of the first matched record in the workflow object.
+
+    Reads the control number of the first entry in
+    `obj.extra_data['record_matches']['records']`, resolves it to a record
+    UUID through the `lit` persistent identifier, and stores that UUID as a
+    string in `obj.extra_data['head_uuid']`.
+
+    Args:
+        obj: the current workflow object.
+
+    Raises:
+        ValueError: if the control number of the matched record cannot be
+            read from `obj.extra_data`.
+    """
+    try:
+        record_matched = obj.extra_data['record_matches']['records'][0]
+        match_id = record_matched['source']['control_number']
+    except (KeyError, IndexError, TypeError) as e:
+        # Format the exception itself instead of reading `e.message`, which
+        # was deprecated in Python 2.6 and no longer exists in Python 3.
+        raise ValueError(
+            'Can not get control number from '
+            'matched record.\n{}'.format(e)
+        )
+    record_uuid = PersistentIdentifier.get('lit', match_id).object_uuid
+    obj.extra_data['head_uuid'] = str(record_uuid)
+
+
+def get_head(obj):
+    """Return the JSON of the head record of the workflow object.
+
+    The record is looked up in `RecordMetadata` by the UUID found in
+    `obj.extra_data['head_uuid']`, which must already be populated.
+
+    Args:
+        obj: the current workflow object.
+
+    Return:
+        the json of the entry if it exists, else an empty dict.
+
+    Raises:
+        MissingHeadUUIDError: if `head_uuid` is missing or empty.
+    """
+    uuid = obj.extra_data.get('head_uuid')
+    if not uuid:
+        raise MissingHeadUUIDError
+
+    query = RecordMetadata.query.filter(RecordMetadata.id == uuid)
+    matched = query.one_or_none()
+    if matched:
+        return matched.json
+    return {}
+
+
+@with_debug_logging
+def merge_articles(obj, eng):
+    """Merge the root, the head and the update of the current workflow.
+
+    The root comes from `get_root`, the head from `get_head` (after the
+    UUID of the matched record has been stored in `obj.extra_data`), and
+    the update from `obj.extra_data['new_root']`.
+
+    - The workflow payload (`obj.data`) is overwritten by the merged record.
+
+    - `obj.extra_data['conflicts']` receives the conflicts returned by the
+      merger, or None when that value is empty.
+    """
+    root = get_root(obj)
+    put_matched_record_uuid_in_extra_data(obj)
+    head = get_head(obj)
+    update = obj.extra_data['new_root']
+
+    merged, conflicts = inspire_json_merge(root, head, update)
+    obj.data = merged
+    obj.extra_data['conflicts'] = conflicts or None
+
+
+def put_root_in_extra_data(obj, eng):
+    """Keep a copy of the workflow payload under extra_data['new_root'].
+
+    The payload is only stored when `new_root` is missing or empty, so a
+    value saved earlier is never overwritten.
+    """
+    if obj.extra_data.get('new_root'):
+        return
+    obj.extra_data['new_root'] = obj.data
+
+
+def update_record(obj, eng):
+    """Overwrite the head record with the workflow payload and commit.
+
+    When this function is called, it assumes:
+    - obj.extra_data['head_uuid'] is populated
+    - obj.data is populated with the merged json
+    """
+    head_uuid = obj.extra_data['head_uuid']
+    head_record = InspireRecord.get_record(head_uuid)
+
+    # Replace the whole content of the record, not just the changed keys.
+    head_record.clear()
+    head_record.update(obj.data)
+    head_record.commit()
+
+    obj.save()
+    db.session.commit()
+
+
+def get_root(obj):
+    """Retrieve the root JSON.
+
+    Looks up the record whose `lit` persistent identifier is the
+    `control_number` of the workflow payload, and returns the root stored
+    for that record and for the source of the payload.
+
+    Args:
+        obj(WorkflowObject): the current workflow object
+
+    Return:
+        (dict): the json of the entry if it exists, else an empty dict
+    """
+    source = get_source(obj.data)
+    pid = PersistentIdentifier.get('lit', obj.data.get('control_number'))
+
+    return _read_wf_record_source(pid.object_uuid, source)
+
+
+def _read_wf_record_source(record_uuid, source):
+    """Return the json stored for a record and source, or an empty dict."""
+    query = WorkflowsRecordSources.query.filter(
+        WorkflowsRecordSources.record_id == record_uuid,
+        WorkflowsRecordSources.source == source
+    )
+    found = query.one_or_none()
+    if not found:
+        return {}
+    return found.json
+
+
+@with_debug_logging
+def store_root(obj, eng):
+    """Store the new root of the workflow in the database.
+
+    When this function is called, it assumes:
+    - obj.extra_data['head_uuid'] is populated
+    - obj.extra_data['new_root'] is populated
+    """
+    root = obj.extra_data['new_root']
+    head_uuid = obj.extra_data['head_uuid']
+
+    _insert_wf_record_source(root, head_uuid, get_source(root))
+
+    # saving the object prevents obj.extra_data from being emptied
+    obj.save()
+    # Commit to DB before indexing
+    db.session.commit()
+
+
+def _insert_wf_record_source(json, record_uuid, source):
+ """Store the given json in the WorkflowsRecordSources table in the db.
+
+ Looks for the row matching `record_uuid` and `source`; builds a new one
+ when there is none, otherwise replaces the json of the existing row.
+
+ Important: does not commit the session, in case some other operation
+ needs to be done before it.
+ """
+ record_source = WorkflowsRecordSources.query.filter(
+ WorkflowsRecordSources.record_id == record_uuid,
+ WorkflowsRecordSources.source == source
+ ).one_or_none()
+ if record_source is None:
+ record_source = WorkflowsRecordSources(
+ source=source,
+ json=json,
+ record_id=record_uuid
+ )
+ else:
+ record_source.json = json
+ db.session.add(record_source)
+
+
+def has_conflicts(obj, eng):
+    """Tell whether the workflow object holds merge conflicts.
+
+    Returns True when `obj.extra_data['conflicts']` exists and is not None.
+    """
+    conflicts = obj.extra_data.get('conflicts')
+    return conflicts is not None
diff --git a/inspirehep/modules/workflows/tasks/upload.py b/inspirehep/modules/workflows/tasks/upload.py
index 5d2cbaeb49..3454377f26 100644
--- a/inspirehep/modules/workflows/tasks/upload.py
+++ b/inspirehep/modules/workflows/tasks/upload.py
@@ -52,11 +52,15 @@ def store_record(obj, *args, **kwargs):
# Create persistent identifier.
inspire_recid_minter(str(record.id), record)
+ # store head_uuid to store the root later
+ obj.extra_data['head_uuid'] = str(record.id)
+
# Commit any changes to record
record.commit()
# Dump any changes to record
obj.data = record.dumps()
+ obj.save()
# Commit to DB before indexing
db.session.commit()
diff --git a/inspirehep/modules/workflows/utils.py b/inspirehep/modules/workflows/utils.py
index 63a7b4884f..1873f60334 100644
--- a/inspirehep/modules/workflows/utils.py
+++ b/inspirehep/modules/workflows/utils.py
@@ -36,7 +36,6 @@
from .models import WorkflowsAudit
-
LOGGER = logging.getLogger(__name__)
diff --git a/inspirehep/utils/record.py b/inspirehep/utils/record.py
index 54d3edb1f1..8da86fedbf 100644
--- a/inspirehep/utils/record.py
+++ b/inspirehep/utils/record.py
@@ -108,3 +108,8 @@ def is_complete(publication_info):
if is_complete(publication_info):
return False
return had_at_least_one_journal_title
+
+
+def get_source(record):
+    """Return the value found at `acquisition_source.source` in a record."""
+    source = get_value(record, 'acquisition_source.source')
+    return source
diff --git a/tests/integration/workflows/test_workflows_merging.py b/tests/integration/workflows/test_workflows_merging.py
new file mode 100644
index 0000000000..1caf8d534c
--- /dev/null
+++ b/tests/integration/workflows/test_workflows_merging.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of INSPIRE.
+# Copyright (C) 2014-2017 CERN.
+#
+# INSPIRE is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# INSPIRE is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
+#
+# In applying this license, CERN does not waive the privileges and immunities
+# granted to it by virtue of its status as an Intergovernmental Organization
+# or submit itself to any jurisdiction.
+
+from __future__ import absolute_import, division, print_function
+
+import pytest
+
+from invenio_db import db
+
+from inspirehep.modules.records.api import InspireRecord
+from inspirehep.modules.workflows.tasks.merging import (
+ _read_wf_record_source,
+ _insert_wf_record_source
+)
+
+
+@pytest.fixture()
+def dummy_record(workflow_app):
+    """Yield a minimal hep record, force-deleted again on teardown."""
+    metadata = {
+        "$schema": "http://localhost:5000/schemas/records/hep.json",
+        "titles": [{"title": "foo"}],
+        "document_type": ["thesis"]
+    }
+    created = InspireRecord.create(metadata)
+    yield created
+    created._delete(force=True)
+
+
+def test_wf_record_source_read_and_write(dummy_record):
+    """A root inserted for a record and source can be read back."""
+    uuid = dummy_record.id
+    expected = {"title": "baz"}
+
+    _insert_wf_record_source(json=expected, record_uuid=uuid, source='arXiv')
+    db.session.commit()
+
+    result = _read_wf_record_source(record_uuid=uuid, source='arXiv')
+    assert expected == result
+
+
+def test_empty_root(dummy_record):
+    """Reading a source that was never stored gives an empty dict."""
+    result = _read_wf_record_source(
+        record_uuid=dummy_record.id,
+        source='Elsevier'
+    )
+    assert result == {}
diff --git a/tests/unit/workflows/test_workflows_tasks_merging.py b/tests/unit/workflows/test_workflows_tasks_merging.py
new file mode 100644
index 0000000000..b7ea078416
--- /dev/null
+++ b/tests/unit/workflows/test_workflows_tasks_merging.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of INSPIRE.
+# Copyright (C) 2014-2017 CERN.
+#
+# INSPIRE is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# INSPIRE is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
+#
+# In applying this license, CERN does not waive the privileges and immunities
+# granted to it by virtue of its status as an Intergovernmental Organization
+# or submit itself to any jurisdiction.
+
+from __future__ import absolute_import, division, print_function
+
+from mocks import MockObj
+import pytest
+
+
+from inspirehep.modules.workflows.tasks.merging import get_head
+from inspirehep.modules.workflows.errors import MissingHeadUUIDError
+
+
+def test_get_head_empty_head_uuid():
+    """An empty `head_uuid` is treated as missing."""
+    extra_data = {'head_uuid': ''}
+    obj = MockObj({}, extra_data)
+
+    with pytest.raises(MissingHeadUUIDError):
+        get_head(obj)
+
+
+def test_get_head_no_head_uuid():
+    """An absent `head_uuid` key raises MissingHeadUUIDError."""
+    obj = MockObj({}, {})
+
+    with pytest.raises(MissingHeadUUIDError):
+        get_head(obj)