From 01c46d599374ed1397242dddf15d2ff28143f662 Mon Sep 17 00:00:00 2001 From: Martin Lettry Date: Mon, 26 Jun 2023 11:47:18 +0200 Subject: [PATCH] indexing: index groups and users in OpenSearch * use bulk indexing instead of iterative single indexing * closes https://github.com/inveniosoftware/invenio-users-resources/issues/76 --- invenio_users_resources/records/hooks.py | 19 ++++++++------- .../services/groups/tasks.py | 14 +++++------ .../services/users/tasks.py | 24 ++++++++++++------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/invenio_users_resources/records/hooks.py b/invenio_users_resources/records/hooks.py index bcfd20a..71b0e03 100644 --- a/invenio_users_resources/records/hooks.py +++ b/invenio_users_resources/records/hooks.py @@ -50,15 +50,18 @@ def post_commit(sender, session): # DB operations are allowed here, not even lazy-loading of # properties! sid = id(session) + if current_db_change_history.sessions.get(sid): - for user_id in current_db_change_history.sessions[sid].updated_users: - reindex_user.delay(user_id) + user_ids_updated = list(current_db_change_history.sessions[sid].updated_users) + reindex_user.delay(user_ids_updated) + + role_ids_updated = list(current_db_change_history.sessions[sid].updated_roles) + reindex_group.delay(role_ids_updated) - for role_id in current_db_change_history.sessions[sid].updated_roles: - reindex_group.delay(role_id) + # ------------------- - for user_id in current_db_change_history.sessions[sid].deleted_users: - unindex_user.delay(user_id) + user_ids_deleted = list(current_db_change_history.sessions[sid].deleted_users) + unindex_user.delay(user_ids_deleted) - for role_id in current_db_change_history.sessions[sid].deleted_roles: - unindex_group.delay(role_id) + role_ids_deleted = list(current_db_change_history.sessions[sid].deleted_roles) + unindex_group.delay(role_ids_deleted) diff --git a/invenio_users_resources/services/groups/tasks.py b/invenio_users_resources/services/groups/tasks.py index 78b379b..f2527cc 100644 --- a/invenio_users_resources/services/groups/tasks.py +++ b/invenio_users_resources/services/groups/tasks.py @@ -18,24 +18,22 @@ @shared_task(ignore_result=True) -def reindex_group(role_id): +def reindex_group(role_ids): """Reindex the given user.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.index(group_agg) + current_groups_service.indexer.bulk_index(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex groups: {e}") @shared_task(ignore_result=True) -def unindex_group(role_id): +def unindex_group(role_ids): """Unindex the given role/group.""" index = current_groups_service.record_cls.index if current_groups_service.indexer.exists(index): try: - group_agg = GroupAggregate.get_record(role_id) - current_groups_service.indexer.delete(group_agg) + current_groups_service.indexer.bulk_delete(role_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex group {role_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex groups: {e}") diff --git a/invenio_users_resources/services/users/tasks.py b/invenio_users_resources/services/users/tasks.py index a6be922..ed83c0f 100644 --- a/invenio_users_resources/services/users/tasks.py +++ b/invenio_users_resources/services/users/tasks.py @@ -19,28 +19,34 @@ @shared_task(ignore_result=True) -def reindex_user(user_id): +def reindex_user(user_ids): """Reindex the given user.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.index(user_agg) + user_agg = { + user_id: UserAggregate.get_record(user_id) for user_id in user_ids + } + current_users_service.indexer.bulk_index(user_ids) + # trigger reindexing of related records send_change_notifications( - "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)] + "users", + [ + (user_agg[user_id].id, str(user_agg[user_id].id), user_agg[user_id].revision_id) + for user_id in user_ids + ], ) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not reindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-reindex users: {e}") @shared_task(ignore_result=True) -def unindex_user(user_id): +def unindex_user(user_ids): """Delete the given user from the index.""" index = current_users_service.record_cls.index if current_users_service.indexer.exists(index): try: - user_agg = UserAggregate.get_record(user_id) - current_users_service.indexer.delete(user_agg) + current_users_service.indexer.bulk_delete(user_ids) except search.exceptions.ConflictError as e: - current_app.logger.warn(f"Could not unindex user {user_id}: {e}") + current_app.logger.warn(f"Could not bulk-unindex users: {e}")