From 7ce8e1db9119e48e1dfbff39c0aae089af5f7919 Mon Sep 17 00:00:00 2001 From: ammirate Date: Fri, 15 Sep 2017 13:35:11 +0200 Subject: [PATCH] workflow: added new workflow for manual merger * define a new workflow for the manual merge use case * add new task functions specific for such use case * integration tests Signed-off-by: Antonio Cesarano --- .../workflows/actions/merge_approval.py | 14 +- .../modules/workflows/tasks/manual_merging.py | 129 ++++++++++++++++++ .../modules/workflows/workflows/__init__.py | 1 + .../workflows/workflows/manual_merge.py | 121 ++++++++++++++++ setup.py | 1 + .../fixtures/manual_merge_record.json | 87 ++++++++++++ .../fixtures/manual_merge_record_2.json | 83 +++++++++++ tests/integration/workflows/helpers/calls.py | 10 ++ .../workflows/test_manual_merge_workflow.py | 105 ++++++++++++++ 9 files changed, 550 insertions(+), 1 deletion(-) create mode 100644 inspirehep/modules/workflows/tasks/manual_merging.py create mode 100644 inspirehep/modules/workflows/workflows/manual_merge.py create mode 100644 tests/integration/workflows/fixtures/manual_merge_record.json create mode 100644 tests/integration/workflows/fixtures/manual_merge_record_2.json create mode 100644 tests/integration/workflows/test_manual_merge_workflow.py diff --git a/inspirehep/modules/workflows/actions/merge_approval.py b/inspirehep/modules/workflows/actions/merge_approval.py index 5bc410fd4c..b26c0cbc04 100644 --- a/inspirehep/modules/workflows/actions/merge_approval.py +++ b/inspirehep/modules/workflows/actions/merge_approval.py @@ -32,4 +32,16 @@ class MergeApproval(object): @staticmethod def resolve(obj, *args, **kwargs): """Resolve the action taken in the approval action.""" - pass + + obj.extra_data["approved"] = True + obj.extra_data["auto-approved"] = False + obj.remove_action() + obj.save() + + delayed = True + if obj.workflow.name == 'manual_merge': + # the manual merge wf should be sync + delayed = False + + obj.continue_workflow(delayed=delayed) + return True 
diff --git a/inspirehep/modules/workflows/tasks/manual_merging.py b/inspirehep/modules/workflows/tasks/manual_merging.py new file mode 100644 index 0000000000..f6917ddf6c --- /dev/null +++ b/inspirehep/modules/workflows/tasks/manual_merging.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. +# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>. +# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. 
+ +"""Tasks related to manual merge generic record.""" + +from __future__ import absolute_import, division, print_function + +from invenio_db import db + +from inspire_dojson.utils import get_record_ref +from inspire_json_merger.inspire_json_merger import inspire_json_merge + +from inspirehep.modules.workflows.tasks.merging import ( + insert_wf_record_source, +) +from inspirehep.modules.workflows.utils import with_debug_logging +from inspirehep.utils.record_getter import get_db_record + + +def _get_head_and_update(obj): + head = obj.extra_data['head'] + update = obj.extra_data['update'] + return head, update + + +@with_debug_logging +def merge_records(obj, eng): + """Merge the records whose ids are defined in the `obj` parameter and store + the merged record and relative conflicts in `obj.data` and + `obj.extra_data['conflicts']`. + """ + head, update = _get_head_and_update(obj) + + obj.data, obj.extra_data['conflicts'] = inspire_json_merge( + root={}, + head=head, + update=update, + head_source=obj.extra_data['head_source'] + ) + + +@with_debug_logging +def halt_for_approval(obj, eng): + """Stop the Workflow engine""" + eng.halt( + action="merge_approval", + msg='Manual Merge halted for curator approval.' + ) + + +@with_debug_logging +def edit_metadata_and_store(obj, eng): + """Replace the `head` record with the previously merged record and updates + some reference in order to delete the `update` record, linking it to the + new `head`. 
+ """ + + head = get_db_record('lit', obj.extra_data['head_control_number']) + update = get_db_record('lit', obj.extra_data['update_control_number']) + + head.clear() + head.update(obj.data) # head's content will be replaced by merged + update.merge(head) # update's uuid will point to head's uuid + update.delete() # mark update record as deleted + + # add schema contents to refer deleted record to the merged one + update['new_record'] = get_record_ref( + head['control_number'], + endpoint='record' + ) + _add_deleted_records(head, update) + + head.commit() + update.commit() + db.session.commit() + + +def _add_deleted_records(new_rec, deleted_rec): + """Mark `deleted_rec` as replaced by `new_rec` by adding its id to the + `deleted_records` list property. + """ + ref = get_record_ref(deleted_rec['control_number'], 'record') + new_rec.setdefault('deleted_records', []).append(ref) + + +def save_records_as_roots(obj, eng): + """Save `head` and `update` records in the Root table in the db if they + have different `sources`, otherwise only `head` is saved. 
+ """ + head, update = _get_head_and_update(obj) + + head_source = obj.extra_data['head_source'] + + insert_wf_record_source( + json=head, + source=head_source, + record_uuid=obj.extra_data['head_uuid'], + ) + + update_source = obj.extra_data['update_source'] + + # need to save just one root per source + if update_source != head_source: + insert_wf_record_source( + json=update, + source=update_source.lower(), + record_uuid=obj.extra_data['update_uuid'], + ) + obj.save() + db.session.commit() diff --git a/inspirehep/modules/workflows/workflows/__init__.py b/inspirehep/modules/workflows/workflows/__init__.py index 7e90703987..39b4f2e765 100644 --- a/inspirehep/modules/workflows/workflows/__init__.py +++ b/inspirehep/modules/workflows/workflows/__init__.py @@ -26,3 +26,4 @@ from .article import Article # noqa: F401 from .author import Author # noqa: F401 +from .manual_merge import ManualMerge # noqa: F401 diff --git a/inspirehep/modules/workflows/workflows/manual_merge.py b/inspirehep/modules/workflows/workflows/manual_merge.py new file mode 100644 index 0000000000..bc3589cce8 --- /dev/null +++ b/inspirehep/modules/workflows/workflows/manual_merge.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. +# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>. 
+# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +"""Workflow for manual merging generic records.""" + +from __future__ import absolute_import, division, print_function + +from invenio_workflows import start, workflow_object_class +from inspire_json_merger.inspire_json_merger import ( + get_head_source as merger_get_source +) + +from inspirehep.modules.workflows.tasks.manual_merging import ( + halt_for_approval, + edit_metadata_and_store, merge_records, save_records_as_roots, +) +from inspirehep.modules.workflows.tasks.merging import ( + get_head_source +) +from inspirehep.modules.workflows.errors import MergingError +from inspirehep.utils.record import get_source +from inspirehep.utils.record_getter import get_db_record, RecordGetterError + + +class ManualMerge(object): + name = 'MERGE' + data_type = '' + + workflow = ([ + merge_records, + halt_for_approval, + # when resume here, we expect the workflow_object contains the + # merged record accepted by a curator + save_records_as_roots, + edit_metadata_and_store, + ]) + + +def start_merger( + head_id, + update_id, + current_user_id=None, +): + """Start a new ManualMerge workflow to merge two records manually. + + Args: + head_id: the id of the first record to merge. This record is the one + that will be updated with the new information. + update_id: the id of the second record to merge. This record is the + one that is going to be deleted and replaced by `head`. + current_user_id: Id of the current user provided by the Flask app. + + Returns: + (int): the current workflow object's id. 
+ """ + data = { + 'pid_type': 'lit', # TODO: support + 'recid_head': head_id, + 'recid_update': update_id, + } + head = load_record_or_exception(head_id) + update = load_record_or_exception(update_id) + + workflow_object = workflow_object_class.create( + data=None, + id_user=current_user_id, + data_type='hep' + ) + + wf_id = workflow_object.id # to retrieve it later + workflow_object.extra_data.update(data) + + # preparing identifiers in order to make as few requests as possible later + head_source = get_head_source(head.id) or merger_get_source(head) + + update_source = get_source(update) + update_source = update_source if update_source else 'arxiv' + + workflow_object.extra_data['head_source'] = head_source.lower() + workflow_object.extra_data['update_source'] = update_source.lower() + + workflow_object.extra_data['head_control_number'] = head_id + workflow_object.extra_data['update_control_number'] = update_id + + workflow_object.extra_data['head_uuid'] = str(head.id) + workflow_object.extra_data['update_uuid'] = str(update.id) + + workflow_object.extra_data['head'] = head + workflow_object.extra_data['update'] = update + + workflow_object.save() + + start('manual_merge', object_id=wf_id) + + return wf_id + + +def load_record_or_exception(record_uuid): + try: + return get_db_record('lit', record_uuid) + except RecordGetterError: + raise MergingError('Cannot load record {} for merging'.format(record_uuid)) diff --git a/setup.py b/setup.py index 3ec01e3fbe..87bdfad1b3 100644 --- a/setup.py +++ b/setup.py @@ -242,6 +242,7 @@ 'invenio_workflows.workflows': [ 'article = inspirehep.modules.workflows.workflows:Article', 'author = inspirehep.modules.workflows.workflows:Author', + 'manual_merge = inspirehep.modules.workflows.workflows:ManualMerge', ], 'invenio_workflows_ui.actions': [ 'author_approval = inspirehep.modules.workflows.actions.author_approval:AuthorApproval', diff --git a/tests/integration/workflows/fixtures/manual_merge_record.json 
b/tests/integration/workflows/fixtures/manual_merge_record.json new file mode 100644 index 0000000000..efdbeb7808 --- /dev/null +++ b/tests/integration/workflows/fixtures/manual_merge_record.json @@ -0,0 +1,87 @@ +{ + "_private_notes": [ + { + "source": "source1", + "value": "value-foo" + } + ], + "preprint_date": "2017-05-06", + "acquisition_source": { + "source": "arXiv", + "datetime": "2017-05-11T08:50:25.184741", + "method": "hepcrawl", + "submission_number": "db9325b2362611e78bfd0242ac12000b" + }, + "accelerator_experiments": [ + { + "legacy_name": "CERN-LHC-ALICE" + } + ], + "license": [ + { + "url": "http://arxiv.org/licenses/nonexclusive-distrib/1.0/", + "license": "arXiv-1.0" + } + ], + "control_number": 123456, + "public_notes": [ + { + "source": "arXiv", + "value": "21 pages, 13 figures" + } + ], + "number_of_pages": 23, + "_files": [ + { + "key": "1705.02541.pdf", + "size": 2806666 + } + ], + "inspire_categories": [ + { + "source": "arxiv", + "term": "General Physics" + } + ], + "authors": [ + { + "affiliations": [], + "full_name": "Assis, M." + }, + { + "affiliations": [], + "full_name": "Jacobsen, J.L." + }, + { + "affiliations": [], + "full_name": "Jensen, I." + } + ], + "titles": [ + { + "source": "arXiv", + "title": "OLD: Analyticity of the Ising susceptibility: An interpretation" + } + ], + "$schema": "http://localhost:5000/schemas/records/hep.json", + "_collections": ["Literature"], + "document_type": [ + "thesis" + ], + "abstracts": [ + { + "source": "arXiv", + "value": "FooBAR - Let's assume this is an old record in the system. We discuss the implications of studies of partition function zeros and equimodular curves for the analytic properties of the Ising model on a square lattice in a magnetic field. In particular we consider the dense set of singularities in the susceptibility of the Ising model at $H=0$ found by Nickel and its relation to the analyticity of the field theory computations of Fonseca and Zamolodchikov." 
+ } + ], + "citeable": true, + "arxiv_eprints": [ + { + "categories": [ + "math-ph", + "cond-mat.stat-mech" + ], + "value": "1705.02541" + } + ] +} diff --git a/tests/integration/workflows/fixtures/manual_merge_record_2.json b/tests/integration/workflows/fixtures/manual_merge_record_2.json new file mode 100644 index 0000000000..fff5425d7c --- /dev/null +++ b/tests/integration/workflows/fixtures/manual_merge_record_2.json @@ -0,0 +1,83 @@ +{ + "_private_notes": [ + { + "source": "source1", + "value": "value1" + }, + { + "source": "source2", + "value": "value2" + } + ], + "preprint_date": "2017-05-06", + "acquisition_source": { + "source": "arXiv", + "datetime": "2017-05-11T08:50:25.184741", + "method": "hepcrawl", + "submission_number": "db9325b2362611e78bfd0242ac12000b" + }, + "accelerator_experiments": [ + { + "legacy_name": "CERN-LHC-CONFLICT" + }, + { + "legacy_name": "CERN-LHCb" + } + ], + "control_number": 654321, + "public_notes": [ + { + "source": "arXiv", + "value": "21 pages, 13 figures" + } + ], + "number_of_pages": 23, + "_files": [ + { + "key": "1705.02541.pdf", + "size": 2808888 + } + ], + "authors": [ + { + "affiliations": [], + "full_name": "Assis, M." + }, + { + "affiliations": [], + "full_name": "Jacobsen, J.L." + }, + { + "affiliations": [], + "full_name": "Jensen, I." + } + ], + "titles": [ + { + "source": "arXiv", + "title": "OLD: Analyticity of the Ising susceptibility: An interpretation" + } + ], + "$schema": "http://localhost:5000/schemas/records/hep.json", + "_collections": ["Literature"], + "document_type": [ + "book" + ], + "abstracts": [ + { + "source": "arXiv", + "value": "Let's assume this is an old record in the system. We discuss the implications of studies of partition function zeros and equimodular curves for the analytic properties of the Ising model on a square lattice in a magnetic field. 
In particular we consider the dense set of singularities in the susceptibility of the Ising model at $H=0$ found by Nickel and its relation to the analyticity of the field theory computations of Fonseca and Zamolodchikov." + } + ], + "citeable": true, + "arxiv_eprints": [ + { + "categories": [ + "math-ph", + "cond-mat.stat-mech", + "math.MP" + ], + "value": "1705.02541" + } + ] +} diff --git a/tests/integration/workflows/helpers/calls.py b/tests/integration/workflows/helpers/calls.py index 8ebe171d9e..05172f55c2 100644 --- a/tests/integration/workflows/helpers/calls.py +++ b/tests/integration/workflows/helpers/calls.py @@ -57,6 +57,16 @@ def do_resolve_workflow(app, workflow_id, action='accept_core'): ) +def do_resolve_manual_merge_wf(app, workflow_id): + """Solve the given workflow's conflicts. + """ + response = do_resolve_workflow( + app=app, + workflow_id=workflow_id + ) + assert response.status_code == 200 + + def do_accept_core(app, workflow_id): """Accepts the given workflow as core. diff --git a/tests/integration/workflows/test_manual_merge_workflow.py b/tests/integration/workflows/test_manual_merge_workflow.py new file mode 100644 index 0000000000..3d1041eb14 --- /dev/null +++ b/tests/integration/workflows/test_manual_merge_workflow.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# +# This file is part of INSPIRE. +# Copyright (C) 2014-2017 CERN. +# +# INSPIRE is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# INSPIRE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>. 
+# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +from __future__ import absolute_import, division, print_function + +import json +import os +import pytest + +from invenio_workflows import ObjectStatus, workflow_object_class + +from inspirehep.modules.workflows.tasks.merging import read_wf_record_source +from inspirehep.modules.workflows.workflows.manual_merge import start_merger +from inspirehep.modules.workflows.errors import MergingError +from inspirehep.utils.record import get_source +from inspirehep.utils.record_getter import get_db_record + +from calls import do_resolve_manual_merge_wf + + +def read_file(file_name): + base_dir = os.path.dirname(os.path.realpath(__file__)) + with open(os.path.join(base_dir, 'fixtures', file_name)) as f: + return json.loads(f.read()) + + +@pytest.fixture +def prepare_records(): + from inspirehep.modules.migrator.tasks import record_insert_or_replace # noqa: F401 + head = record_insert_or_replace(read_file('manual_merge_record.json')) + update = record_insert_or_replace(read_file('manual_merge_record_2.json')) + return head, update + + +def test_manual_merge_existing_records_with_conflicts(workflow_app, prepare_records): + head, update = prepare_records + head_id = head.id + update_id = update.id + + obj_id = start_merger( + head_id=head['control_number'], + update_id=update['control_number'], + current_user_id=1, + ) + + do_resolve_manual_merge_wf(workflow_app, obj_id) + + # retrieve it again, otherwise Detached Instance Error + obj = workflow_object_class.get(obj_id) + + assert obj.status == ObjectStatus.COMPLETED + assert obj.extra_data['conflicts'] != [] + assert obj.extra_data['approved'] is True + assert obj.extra_data['auto-approved'] is False + + last_root = read_wf_record_source(head_id, get_source(head)) + assert last_root.json == head + + head_source = 
obj.extra_data['head_source'] + update_source = obj.extra_data['update_source'] + + assert head_source == update_source + # since head and update have the same source, only head is saved as root + root_update = read_wf_record_source(update_id, update_source) + assert root_update is None + + # check that head's content has been replaced by merged + latest_record = get_db_record('lit', head['control_number']) + deleted_rec_ref = { + '$ref': 'http://localhost:5000/api/record/{}'.format( + update['control_number'] + ) + } + assert [deleted_rec_ref] == latest_record['deleted_records'] + + del latest_record['deleted_records'] + assert latest_record == obj.data # -> resulted merged record + + +def test_manual_merge_with_none_record(workflow_app): + with pytest.raises(MergingError): + start_merger( + head_id=987654, + update_id=123456789, + current_user_id=1, + )