Skip to content

Commit

Permalink
wip: index new groups in OpenSearch
Browse files Browse the repository at this point in the history
 * changed number of workers per celery task based on task type
 * closes inveniosoftware/invenio-app-rdm#2186
  • Loading branch information
TLGINO committed May 8, 2023
1 parent 66278b7 commit 9ad43e7
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 44 deletions.
6 changes: 6 additions & 0 deletions invenio_users_resources/records/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def parse_user_data(user):

def parse_role_data(role):
"""Parse the role's information into a dictionary."""
print("#" * 100)
print("role")
print(role)
print("#" * 100)

data = {
"id": role.name, # due to flask security exposing user id
"name": role.description,
Expand Down Expand Up @@ -213,6 +218,7 @@ def avatar_color(self):
def role(self):
"""Cache for the associated role object."""
role = self._role

if role is None:
if role is None and self.id is not None:
role = current_datastore.find_role(self.id)
Expand Down
24 changes: 16 additions & 8 deletions invenio_users_resources/records/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,23 @@ def post_commit(sender, session):
# DB operations are allowed here, not even lazy-loading of
# properties!
sid = id(session)
for user_id in current_db_change_history.updated_users[sid]:
reindex_user.delay(user_id)

for role_id in current_db_change_history.updated_roles[sid]:
reindex_group.delay(role_id)
user_role_id_reindex = current_db_change_history.updated_users[sid]
group_roles_reindex = current_db_change_history.updated_roles[sid]
user_role_id_unindex = current_db_change_history.deleted_users[sid]
group_role_id_unindex = current_db_change_history.deleted_roles[sid]

for user_id in current_db_change_history.deleted_users[sid]:
unindex_user.delay(user_id)
for role_id in current_db_change_history.deleted_roles[sid]:
unindex_group.delay(role_id)
reindex_user.delay(user_role_id_reindex)
reindex_group.delay(group_roles_reindex)
unindex_user.delay(user_role_id_unindex)
unindex_group.delay(group_role_id_unindex)

# for role_id in current_db_change_history.updated_roles[sid]:
# reindex_group.delay(role_id)

# for user_id in current_db_change_history.deleted_users[sid]:
# unindex_user.delay(user_id)
# for role_id in current_db_change_history.deleted_roles[sid]:
# unindex_group.delay(role_id)

current_db_change_history._clear_dirty_sets(session)
3 changes: 3 additions & 0 deletions invenio_users_resources/records/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,8 @@ def model_obj(self):
"""The actual model object behind this mock model."""
if self._model_obj is None:
name = self.data.get("id")
print("#" * 100)
print(name)
print("#" * 100)
self._model_obj = current_datastore.find_role(name)
return self._model_obj
4 changes: 4 additions & 0 deletions invenio_users_resources/services/groups/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ def hits(self):
if self._links_item_tpl:
projection["links"] = self._links_item_tpl.expand(self._identity, group)

print("#" * 100)
print(group)
print("#" * 100)

yield projection

def to_dict(self):
Expand Down
34 changes: 18 additions & 16 deletions invenio_users_resources/services/groups/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@


@shared_task(ignore_result=True)
def reindex_group(role_ids):
    """Reindex the given roles/groups.

    :param role_ids: Iterable of role IDs to reindex. A single ID (``str`` or
        ``int``) is also accepted and handled as a one-element batch, so
        callers that still pass one ID keep working.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(role_ids, (str, int)):
        role_ids = [role_ids]

    if not role_ids:
        return

    # The index and its existence do not depend on the individual role, so
    # they are looked up once per task instead of once per ID.
    index = current_groups_service.record_cls.index
    if not current_groups_service.indexer.exists(index):
        return

    for role_id in role_ids:
        try:
            group_agg = GroupAggregate.get_record(role_id)
            current_groups_service.indexer.index(group_agg)
        except search.exceptions.ConflictError as e:
            # Log and carry on: a conflict on one group should not stop the
            # remaining IDs from being processed.
            current_app.logger.warning(f"Could not reindex group {role_id}: {e}")


@shared_task(ignore_result=True)
def unindex_group(role_ids):
    """Unindex the given roles/groups.

    :param role_ids: Iterable of role IDs to remove from the index. A single
        ID (``str`` or ``int``) is also accepted and handled as a one-element
        batch, so callers that still pass one ID keep working.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(role_ids, (str, int)):
        role_ids = [role_ids]

    if not role_ids:
        return

    # The index and its existence do not depend on the individual role, so
    # they are looked up once per task instead of once per ID.
    index = current_groups_service.record_cls.index
    if not current_groups_service.indexer.exists(index):
        return

    for role_id in role_ids:
        try:
            group_agg = GroupAggregate.get_record(role_id)
            current_groups_service.indexer.delete(group_agg)
        except search.exceptions.ConflictError as e:
            # Log and carry on: a conflict on one group should not stop the
            # remaining IDs from being processed.
            current_app.logger.warning(f"Could not unindex group {role_id}: {e}")
42 changes: 22 additions & 20 deletions invenio_users_resources/services/users/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@


@shared_task(ignore_result=True)
def reindex_user(user_ids):
    """Reindex the given users.

    :param user_ids: Iterable of user IDs to reindex. A single ID (``str`` or
        ``int``) is also accepted and handled as a one-element batch, so
        callers that still pass one ID keep working.
    """
    # An int is not iterable and a string would be iterated per character.
    if isinstance(user_ids, (str, int)):
        user_ids = [user_ids]

    if not user_ids:
        return

    # The index and its existence do not depend on the individual user, so
    # they are looked up once per task instead of once per ID.
    index = current_users_service.record_cls.index
    if not current_users_service.indexer.exists(index):
        return

    for user_id in user_ids:
        try:
            user_agg = UserAggregate.get_record(user_id)
            current_users_service.indexer.index(user_agg)
            # trigger reindexing of related records
            send_change_notifications(
                "users", [(user_agg.id, str(user_agg.id), user_agg.revision_id)]
            )
        except search.exceptions.ConflictError as e:
            # Log and carry on: a conflict on one user should not stop the
            # remaining IDs from being processed.
            current_app.logger.warning(f"Could not reindex user {user_id}: {e}")


@shared_task(ignore_result=True)
def unindex_user(user_ids):
    """Delete the given users from the index.

    :param user_ids: Iterable of user IDs to remove from the index. A single
        ID (``str`` or ``int``) is also accepted and handled as a one-element
        batch, so callers that still pass one ID keep working.
    """
    # An int is not iterable and a string would be iterated per character.
    if isinstance(user_ids, (str, int)):
        user_ids = [user_ids]

    if not user_ids:
        return

    # The index and its existence do not depend on the individual user, so
    # they are looked up once per task instead of once per ID.
    index = current_users_service.record_cls.index
    if not current_users_service.indexer.exists(index):
        return

    for user_id in user_ids:
        try:
            user_agg = UserAggregate.get_record(user_id)
            current_users_service.indexer.delete(user_agg)
        except search.exceptions.ConflictError as e:
            # Log and carry on: a conflict on one user should not stop the
            # remaining IDs from being processed.
            current_app.logger.warning(f"Could not unindex user {user_id}: {e}")

0 comments on commit 9ad43e7

Please sign in to comment.