Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bulk operations #448

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Features
- Elasticsearch auto mapping from django models fields.
- Complex field type support (ObjectField, NestedField).
- Index fast using `parallel` indexing.
- Bulk operations support.
- Requirements

- Django >= 1.11
Expand Down
4 changes: 4 additions & 0 deletions django_elasticsearch_dsl/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .managers import ( # noqa
DjangoElasticsearchDslManagerMixin,
DjangoElasticsearchDslModelManager
)
123 changes: 123 additions & 0 deletions django_elasticsearch_dsl/db/managers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from django.db import models

from .utils import get_queryset_by_ids
from ..registries import registry


class DjangoElasticsearchDslManagerMixin(object):
"""Elasticsearch DSL manager mixin for processing mass work with objects.

Performs normalization by supported types and causes updating the
search engine appropriately.

It acts similarly to a signal processor.
"""
_registry = registry

def _normalize_results(self, result):
if isinstance(result, models.Model):
return [result]
elif isinstance(result, (list, models.QuerySet)):
return result
else:
raise TypeError(
"Incorrect results type. "
"Expected 'django.db.models.Model', "
"<class 'list'> or 'django.db.models.Queryset', "
"but got %s" % type(result)
)

def _handle_save(self, result):
"""Handle save.

Given a many model instances, update the objects in the index.
Update the related objects either.
"""
results = self._normalize_results(result)

self._registry.update(results)
self._registry.update_related(results, many=True)

def _handle_pre_delete(self, result):
"""Handle removing of objects from related models instances.

We need to do this before the real delete otherwise the relation
doesn't exist anymore, and we can't get the related models instances.
"""
results = self._normalize_results(result)

self._registry.delete_related(
results,
many=True,
raise_on_error=False,
)

def _handle_delete(self, result):
"""Handle delete.

Given a many model instances, delete the objects in the index.
"""
results = self._normalize_results(result)

self._registry.delete(
results,
raise_on_error=False,
)


class DjangoElasticsearchDslModelManager(models.QuerySet,
DjangoElasticsearchDslManagerMixin):
"""Django Elasticsearch Dsl model manager.

Working with possible bulk operations, updates documents accordingly.
"""

def bulk_create(self, objs, *args, **kwargs):
"""Bulk create.

Calls `handle_save` after saving is completed
"""
result = super().bulk_create(objs, *args, **kwargs)
self._handle_save(result)
return result

def bulk_update(self, objs, *args, **kwargs):
"""Bulk update.

Calls `handle_save` after saving is completed
"""
result = super().bulk_update(objs, *args, **kwargs)
self._handle_save(objs)
return result

def update(self, **kwargs):
"""Update.

Calls `handle_save` after saving is completed
"""
ids = list(self.values_list("id", flat=True))
result = super().update(**kwargs)
if not ids:
return result
self._handle_save(get_queryset_by_ids(self.model, ids))
return result

def delete(self):
"""Delete.

Calls `handle_pre_delete` before performing the deletion.

After deleting it causes `handle_delete`.
"""
objs = get_queryset_by_ids(
self.model,
list(self.values_list("id", flat=True))
)
self._handle_pre_delete(objs)
objs = list(objs)

result = super().delete()

self._handle_delete(objs)

return result
9 changes: 9 additions & 0 deletions django_elasticsearch_dsl/db/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import List

from django.db import models


def get_queryset_by_ids(model: models.Model, ids: List[int]):
return model.objects.filter(
id__in=ids
)
9 changes: 9 additions & 0 deletions django_elasticsearch_dsl/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ def get_value_from_instance(self, instance, field_value_to_ignore=None):

if instance == field_value_to_ignore:
return None
elif isinstance(field_value_to_ignore, models.QuerySet) and \
isinstance(instance, models.Model) and \
field_value_to_ignore.contains(instance):
return None
elif isinstance(field_value_to_ignore, models.QuerySet) and \
isinstance(instance, models.QuerySet):
instance = instance.exclude(
id__in=field_value_to_ignore.values_list("id", flat=True)
)

# convert lazy object like lazy translations to string
if isinstance(instance, Promise):
Expand Down
48 changes: 42 additions & 6 deletions django_elasticsearch_dsl/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ImproperlyConfigured
from django.db import models
from elasticsearch_dsl import AttrDict
from six import itervalues, iterkeys, iteritems

Expand Down Expand Up @@ -88,22 +89,47 @@ def register_document(self, document):
return document

def _get_related_doc(self, instance):
for model in self._related_models.get(instance.__class__, []):
instance_cls = self._get_cls_from_instance(instance)
for model in self._related_models.get(instance_cls, []):
for doc in self._models[model]:
if instance.__class__ in doc.django.related_models:
if instance_cls in doc.django.related_models:
yield doc

def _get_cls_from_instance(self, instance):
"""
Get class from instance.

Supports getting a class from a model, list, or queryset.
"""
if isinstance(instance, models.Model):
return instance.__class__
elif isinstance(instance, list):
return instance[0].__class__
elif isinstance(instance, models.QuerySet):
return instance.model
else:
return None

def update_related(self, instance, **kwargs):
"""
Update docs that have related_models.

The `many` flag has been introduced to handle mass updates of objects.
"""
if not DEDConfig.autosync_enabled():
return

many = kwargs.pop("many", False)
for doc in self._get_related_doc(instance):
doc_instance = doc()
try:
related = doc_instance.get_instances_from_related(instance)
if many:
related = doc_instance.get_instances_from_many_related(
self._get_cls_from_instance(instance),
instance
)
else:
related = doc_instance.get_instances_from_related(instance)
except ObjectDoesNotExist:
related = None

Expand All @@ -113,14 +139,23 @@ def update_related(self, instance, **kwargs):
def delete_related(self, instance, **kwargs):
"""
Remove `instance` from related models.

The `many` flag has been introduced to handle mass updates of objects.
"""
if not DEDConfig.autosync_enabled():
return

many = kwargs.pop("many", False)
for doc in self._get_related_doc(instance):
doc_instance = doc(related_instance_to_ignore=instance)
try:
related = doc_instance.get_instances_from_related(instance)
if many:
related = doc_instance.get_instances_from_many_related(
self._get_cls_from_instance(instance),
instance
)
else:
related = doc_instance.get_instances_from_related(instance)
except ObjectDoesNotExist:
related = None

Expand All @@ -135,8 +170,9 @@ def update(self, instance, **kwargs):
if not DEDConfig.autosync_enabled():
return

if instance.__class__ in self._models:
for doc in self._models[instance.__class__]:
instance_cls = self._get_cls_from_instance(instance)
if instance_cls in self._models:
for doc in self._models[instance_cls]:
if not doc.django.ignore_signals:
doc().update(instance, **kwargs)

Expand Down
14 changes: 11 additions & 3 deletions django_elasticsearch_dsl/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def handle_m2m_changed(self, sender, instance, action, **kwargs):
if action in ('post_add', 'post_remove', 'post_clear'):
self.handle_save(sender, instance)
elif action in ('pre_remove', 'pre_clear'):
self.handle_pre_delete(sender, instance)
self.handle_pre_delete(sender, instance, origin=kwargs['model']())

def handle_save(self, sender, instance, **kwargs):
"""Handle save.
Expand All @@ -62,15 +62,23 @@ def handle_pre_delete(self, sender, instance, **kwargs):
"""Handle removing of instance object from related models instance.
We need to do this before the real delete otherwise the relation
doesn't exists anymore and we can't get the related models instance.

Disabling distribution for deletion cases other
than deletion by entity.
"""
registry.delete_related(instance)
if isinstance(kwargs.get("origin"), models.Model):
registry.delete_related(instance)

def handle_delete(self, sender, instance, **kwargs):
"""Handle delete.

Given an individual model instance, delete the object from index.

Disabling distribution for deletion cases other
than deletion by entity.
"""
registry.delete(instance, raise_on_error=False)
if isinstance(kwargs.get("origin"), models.Model):
registry.delete(instance, raise_on_error=False)


class RealTimeSignalProcessor(BaseSignalProcessor):
Expand Down
Loading