Skip to content

Commit

Permalink
Merge pull request #3005 from jacquerie/integrate-pr-2904
Browse files Browse the repository at this point in the history
workflows: add manual_merge workflow
  • Loading branch information
jacquerie authored Nov 29, 2017
2 parents 97cf582 + 5b1a7e2 commit 8d9e7dc
Show file tree
Hide file tree
Showing 19 changed files with 875 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Create the ``workflow_record_sources`` table."""

from __future__ import absolute_import, division, print_function

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from sqlalchemy_utils.types import JSONType, UUIDType


revision = 'cb5153afd839'
down_revision = 'fddb3cfe7a9c'
branch_labels = ()
depends_on = '862037093962'


def upgrade():
"""Upgrade database."""
op.create_table(
'workflows_record_sources',
sa.Column(
'source',
sa.Text,
default='',
nullable=False,
),
sa.Column(
'record_id',
UUIDType,
sa.ForeignKey('records_metadata.id', ondelete='CASCADE'),
nullable=False,
),
sa.PrimaryKeyConstraint('record_id', 'source'),
sa.Column(
'json',
JSONType().with_variant(
postgresql.JSON(none_as_null=True),
'postgresql',
),
default=lambda: dict(),
),
)


def downgrade():
"""Downgrade database."""
op.drop_table('workflows_record_sources')
25 changes: 25 additions & 0 deletions inspirehep/modules/editor/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from inspirehep.modules.pidstore.utils import get_pid_type_from_endpoint
from inspirehep.modules.tools import authorlist
from inspirehep.modules.workflows.workflows.manual_merge import start_merger
from inspirehep.utils.record_getter import get_db_record
from inspirehep.utils.references import (
get_refextract_kbs_path,
Expand Down Expand Up @@ -216,6 +217,30 @@ def get_rt_queues():
return jsonify(tickets.get_queues())


@blueprint.route('/manual_merge', methods=['POST'])
@editor_use_api_permission.require(http_exception=403)
def manual_merge():
"""Start a manual merge workflow on two records.
Todo:
The following two assertions must be replaced with proper access
control checks, as currently any curator who has access to the
editor API can merge any two records, even if they are not among
those who can see or edit them.
"""
assert request.json['head_recid']
assert request.json['update_recid']

workflow_object_id = start_merger(
request.json['head_recid'],
request.json['update_recid'],
current_user.get_id(),
)

return jsonify(workflow_object_id=workflow_object_id)


def _simplify_ticket_response(ticket):
return dict(
id=ticket['Id'],
Expand Down
12 changes: 11 additions & 1 deletion inspirehep/modules/workflows/actions/merge_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,14 @@ class MergeApproval(object):
@staticmethod
def resolve(obj, *args, **kwargs):
"""Resolve the action taken in the approval action."""
pass
obj.extra_data['approved'] = True
obj.extra_data['auto-approved'] = False
obj.remove_action()
obj.save()

delayed = True
if obj.workflow.name == 'manual_merge':
delayed = False

obj.continue_workflow(delayed=delayed)
return True
5 changes: 5 additions & 0 deletions inspirehep/modules/workflows/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@
class DownloadError(WorkflowsError):

"""Error representing a failed download in a workflow."""


class MergeError(WorkflowsError):

"""Error representing a failed merge in a workflow."""
29 changes: 29 additions & 0 deletions inspirehep/modules/workflows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

from datetime import datetime

from sqlalchemy.dialects import postgresql
from sqlalchemy_utils.types import JSONType, UUIDType

from invenio_db import db


Expand Down Expand Up @@ -76,3 +79,29 @@ class WorkflowsPendingRecord(db.Model):
nullable=False,
)
record_id = db.Column(db.Integer, nullable=False)


class WorkflowsRecordSources(db.Model):

__tablename__ = 'workflows_record_sources'
__table_args__ = (
db.PrimaryKeyConstraint('record_id', 'source'),
)

source = db.Column(
db.Text,
default='',
nullable=False,
)
record_id = db.Column(
UUIDType,
db.ForeignKey('records_metadata.id', ondelete='CASCADE'),
nullable=False,
)
json = db.Column(
JSONType().with_variant(
postgresql.JSON(none_as_null=True),
'postgresql',
),
default=lambda: dict(),
)
171 changes: 171 additions & 0 deletions inspirehep/modules/workflows/tasks/manual_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Tasks related to manual merging."""

from __future__ import absolute_import, division, print_function

import json

from invenio_db import db

from inspire_dojson.utils import get_record_ref
from inspire_json_merger.api import merge
from inspirehep.modules.workflows.utils import insert_wf_record_source
from inspirehep.modules.workflows.utils import with_debug_logging
from inspirehep.utils.record_getter import get_db_record


@with_debug_logging
def merge_records(obj, eng):
"""Perform a manual merge.
Merges two records stored in the workflow object as the content of the
``head`` and ``update`` keys, and stores the result in ``obj.data``.
Also stores the eventual conflicts in ``obj.extra_data['conflicts']``.
Because this is a manual merge we assume that the two records have no
common ancestor, so ``root`` is the empty dictionary.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
head, update = obj.extra_data['head'], obj.extra_data['update']
head_source = obj.extra_data['head_source']

merged, conflicts = merge(
root={},
head=head,
update=update,
head_source=head_source,
)

obj.data = merged
obj.extra_data['conflicts'] = [json.loads(el.to_json()) for el in conflicts]
obj.save()


@with_debug_logging
def halt_for_merge_approval(obj, eng):
"""Wait for curator approval.
Pauses the workflow using the ``merge_approval`` action, which is resolved
whenever the curator says that the conflicts have been solved.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
eng.halt(
action='merge_approval',
msg='Manual Merge halted for curator approval.',
)


@with_debug_logging
def save_roots(obj, eng):
"""Save ``head`` and ``update`` in the ``WorkflowRecordSources`` table.
Note:
The ``update`` is saved if and only if they have different sources,
because there can only be one root per source for a given record.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
head, update = obj.extra_data['head'], obj.extra_data['update']
head_source, update_source = obj.extra_data['head_source'], obj.extra_data['update_source']
head_uuid, update_uuid = obj.extra_data['head_uuid'], obj.extra_data['update_uuid']

obj.save() # XXX: otherwise obj.extra_data will be wiped by a db session commit below.

if update_source != head_source:
insert_wf_record_source(
json=update,
record_uuid=update_uuid,
source=update_source,
)

insert_wf_record_source(
json=head,
record_uuid=head_uuid,
source=head_source,
)


@with_debug_logging
def store_records(obj, eng):
"""Store the records involved in the manual merge.
Performs the following steps:
1. Updates the ``head`` so that it contains the result of the merge.
2. Marks the ``update`` as merged with the ``head`` and deletes it.
3. Populates the ``deleted_records`` and ``new_record`` keys in,
respectively, ``head`` and ``update`` so that they contain a JSON
reference to each other.
Todo:
The last step should be performed by the ``merge`` method itself.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
head_control_number = obj.extra_data['head_control_number']
update_control_number = obj.extra_data['update_control_number']

head = get_db_record('lit', head_control_number)
update = get_db_record('lit', update_control_number)

# 1. Updates the head so that it contains the result of the merge.
head.clear()
head.update(obj.data)
# 2. Marks the update as merged with the head and deletes it.
update.merge(head)
update.delete()
# 3. Populates the deleted_records and new_record keys.
update['new_record'] = get_record_ref(head_control_number, 'literature')
update_ref = get_record_ref(update_control_number, 'literature')
head.setdefault('deleted_records', []).append(update_ref)

head.commit()
update.commit()
db.session.commit()
50 changes: 50 additions & 0 deletions inspirehep/modules/workflows/tasks/merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Tasks related to record merging."""

from __future__ import absolute_import, division, print_function

from inspirehep.modules.workflows.models import WorkflowsRecordSources


def get_head_source(head_uuid):
"""Return the right source for the record having uuid=``uuid``.
Args:
head_uuid(string): the uuid of the record to get the source
Return:
(string):
* ``publisher`` if there is at least a non arxiv root
* ``arxiv`` if there are no publisher roots and an arxiv root
* None if there are no root records
"""
roots_sources = set(
r.source for r in
WorkflowsRecordSources.query.filter_by(record_id=head_uuid).all()
)

if not roots_sources:
return None

return 'arxiv' if 'arxiv' in roots_sources else 'publisher'
Loading

0 comments on commit 8d9e7dc

Please sign in to comment.