From 9ad43e7b29cddc4e88fa2ae46432956d39096a95 Mon Sep 17 00:00:00 2001 From: Martin Lettry Date: Sat, 6 May 2023 12:00:47 +0200 Subject: [PATCH] wip: index new groups in OpenSearch * changed number of workers per celery task based on task type * closes https://github.com/inveniosoftware/invenio-app-rdm/issues/2186 --- invenio_users_resources/records/api.py | 6 +++ invenio_users_resources/records/hooks.py | 24 +++++++---- invenio_users_resources/records/models.py | 3 ++ .../services/groups/results.py | 4 ++ .../services/groups/tasks.py | 34 ++++++++------- .../services/users/tasks.py | 42 ++++++++++--------- 6 files changed, 69 insertions(+), 44 deletions(-) diff --git a/invenio_users_resources/records/api.py b/invenio_users_resources/records/api.py index 5947b86..cfe34ad 100644 --- a/invenio_users_resources/records/api.py +++ b/invenio_users_resources/records/api.py @@ -52,6 +52,11 @@ def parse_user_data(user): def parse_role_data(role): """Parse the role's information into a dictionary.""" + print("#" * 100) + print("role") + print(role) + print("#" * 100) + data = { "id": role.name, # due to flask security exposing user id "name": role.description, @@ -213,6 +218,7 @@ def avatar_color(self): def role(self): """Cache for the associated role object.""" role = self._role + if role is None: if role is None and self.id is not None: role = current_datastore.find_role(self.id) diff --git a/invenio_users_resources/records/hooks.py b/invenio_users_resources/records/hooks.py index 615d742..254dcee 100644 --- a/invenio_users_resources/records/hooks.py +++ b/invenio_users_resources/records/hooks.py @@ -52,15 +52,23 @@ def post_commit(sender, session): # DB operations are allowed here, not even lazy-loading of # properties! sid = id(session) - for user_id in current_db_change_history.updated_users[sid]: - reindex_user.delay(user_id) - for role_id in current_db_change_history.updated_roles[sid]: - reindex_group.delay(role_id) + user_role_id_reindex = current_db_change_history.updated_users[sid] + group_roles_reindex = current_db_change_history.updated_roles[sid] + user_role_id_unindex = current_db_change_history.deleted_users[sid] + group_role_id_unindex = current_db_change_history.deleted_roles[sid] - for user_id in current_db_change_history.deleted_users[sid]: - unindex_user.delay(user_id) - for role_id in current_db_change_history.deleted_roles[sid]: - unindex_group.delay(role_id) + reindex_user.delay(user_role_id_reindex) + reindex_group.delay(group_roles_reindex) + unindex_user.delay(user_role_id_unindex) + unindex_group.delay(group_role_id_unindex) + + # for role_id in current_db_change_history.updated_roles[sid]: + # reindex_group.delay(role_id) + + # for user_id in current_db_change_history.deleted_users[sid]: + # unindex_user.delay(user_id) + # for role_id in current_db_change_history.deleted_roles[sid]: + # unindex_group.delay(role_id) current_db_change_history._clear_dirty_sets(session) diff --git a/invenio_users_resources/records/models.py b/invenio_users_resources/records/models.py index 1b89a0b..80ae9ad 100644 --- a/invenio_users_resources/records/models.py +++ b/invenio_users_resources/records/models.py @@ -94,5 +94,8 @@ def model_obj(self): """The actual model object behind this mock model.""" if self._model_obj is None: name = self.data.get("id") + print("#" * 100) + print(name) + print("#" * 100) self._model_obj = current_datastore.find_role(name) return self._model_obj diff --git a/invenio_users_resources/services/groups/results.py b/invenio_users_resources/services/groups/results.py index d71f4ed..92960ea 100644 --- a/invenio_users_resources/services/groups/results.py +++ b/invenio_users_resources/services/groups/results.py @@ -110,6 +110,10 @@ def hits(self): if self._links_item_tpl: projection["links"] = self._links_item_tpl.expand(self._identity, group) + print("#" * 100) + print(group) + print("#" * 100) + yield projection def to_dict(self): diff --git a/invenio_users_resources/services/groups/tasks.py b/invenio_users_resources/services/groups/tasks.py index 78b379b..c705551 100644 --- a/invenio_users_resources/services/groups/tasks.py +++ b/invenio_users_resources/services/groups/tasks.py @@ -18,24 +18,26 @@ @shared_task(ignore_result=True) -def reindex_group(role_id): +def reindex_group(role_ids): """Reindex the given user.""" - index = current_groups_service.record_cls.index - if current_groups_service.indexer.exists(index): - try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.index(group_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex group {role_id}: {e}") + for role_id in role_ids: + index = current_groups_service.record_cls.index + if current_groups_service.indexer.exists(index): + try: + group_agg = GroupAggregate.get_record(role_id) + current_groups_service.indexer.index(group_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not reindex group {role_id}: {e}") @shared_task(ignore_result=True) -def unindex_group(role_id): +def unindex_group(role_ids): """Unindex the given role/group.""" - index = current_groups_service.record_cls.index - if current_groups_service.indexer.exists(index): - try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.delete(group_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex group {role_id}: {e}") + for role_id in role_ids: + index = current_groups_service.record_cls.index + if current_groups_service.indexer.exists(index): + try: + group_agg = GroupAggregate.get_record(role_id) + current_groups_service.indexer.delete(group_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not unindex group {role_id}: {e}") diff --git a/invenio_users_resources/services/users/tasks.py b/invenio_users_resources/services/users/tasks.py index a6be922..1e976bc 100644 --- a/invenio_users_resources/services/users/tasks.py +++ b/invenio_users_resources/services/users/tasks.py @@ -19,28 +19,30 @@ @shared_task(ignore_result=True) -def reindex_user(user_id): +def reindex_user(user_ids): """Reindex the given user.""" - index = current_users_service.record_cls.index - if current_users_service.indexer.exists(index): - try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.index(user_agg) - # trigger reindexing of related records - send_change_notifications( - "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] - ) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex user {user_id}: {e}") + for user_id in user_ids: + index = current_users_service.record_cls.index + if current_users_service.indexer.exists(index): + try: + user_agg = UserAggregate.get_record(user_id) + current_users_service.indexer.index(user_agg) + # trigger reindexing of related records + send_change_notifications( + "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] + ) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not reindex user {user_id}: {e}") @shared_task(ignore_result=True) -def unindex_user(user_id): +def unindex_user(user_ids): """Delete the given user from the index.""" - index = current_users_service.record_cls.index - if current_users_service.indexer.exists(index): - try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.delete(user_agg) - except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex user {user_id}: {e}") + for user_id in user_ids: + index = current_users_service.record_cls.index + if current_users_service.indexer.exists(index): + try: + user_agg = UserAggregate.get_record(user_id) + current_users_service.indexer.delete(user_agg) + except search.exceptions.ConflictError as e: + current_app.logger.warn(f"Could not unindex user {user_id}: {e}")