From 190a8d9228b67bddd14b80eb8488c6325865ebee Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Sun, 9 Nov 2014 23:54:20 +0100 Subject: [PATCH 01/11] failing test --- tests/test_insert.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/test_insert.py b/tests/test_insert.py index 12b0bf04..aa16537b 100644 --- a/tests/test_insert.py +++ b/tests/test_insert.py @@ -1,5 +1,5 @@ import sqlalchemy as sa -from sqlalchemy_continuum import count_versions, versioning_manager +from sqlalchemy_continuum import count_versions, versioning_manager, Operation from tests import TestCase @@ -39,6 +39,18 @@ def test_multiple_consecutive_flushes(self): assert article.versions.count() == 1 assert article2.versions.count() == 1 + def test_multiple_flushes_store_operation_type(self): + """Test that after multiple flushes that affect a newly created object, + the insert operation type is commited + """ + article = self.Article(name=u"Article name") + self.session.add(article) + self.session.flush() + article.name = u"Changed my mind" + self.session.commit() + assert article.versions.count() == 1 + assert article.versions[0].operation_type == Operation.INSERT + class TestInsertWithDeferredColumn(TestCase): def create_models(self): From 091d1b7bcc11dc8808fc1e367b0282b000cc1143 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Sun, 9 Nov 2014 23:55:13 +0100 Subject: [PATCH 02/11] Fix erroneous set of Operation.UPDATE to newly created items --- sqlalchemy_continuum/operation.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sqlalchemy_continuum/operation.py b/sqlalchemy_continuum/operation.py index 72c72b2d..7cf45240 100644 --- a/sqlalchemy_continuum/operation.py +++ b/sqlalchemy_continuum/operation.py @@ -98,7 +98,17 @@ def add_update(self, target): del state_copy[rel_key] if state_copy: - self.add(Operation(target, Operation.UPDATE)) + key = self.format_key(target) + # if the object has already been added with an INSERT, + # then this is a modification within the same transaction and + # this is still an INSERT + if (target in self and + self[key].type == Operation.INSERT): + operation = Operation.INSERT + else: + operation = Operation.UPDATE + + self.add(Operation(target, operation)) def add_delete(self, target): - self.add(Operation(target, Operation.DELETE)) + self.add(Operation(target, Operation.DELETE)) \ No newline at end of file From a72ac1ecdb8de517379088ee1610757044ae13e6 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Sun, 9 Nov 2014 23:56:30 +0100 Subject: [PATCH 03/11] ignore pycharm files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index a015a2d6..9b3348a9 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,6 @@ nosetests.xml .mr.developer.cfg .project .pydevproject + +#Pycharm +.idea \ No newline at end of file From 82341e91c540b8f50c65dceb8e46b5aa1fdba8b9 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 10 Nov 2014 00:59:53 +0100 Subject: [PATCH 04/11] Tests for PK modifications within the same transaction Added tests on behavior when primary keys are modified within the same transaction but after a flush() --- tests/test_delete.py | 21 +++++++++++++++++++++ tests/test_insert.py | 25 ++++++++++++++++++++++--- tests/test_update.py | 22 ++++++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/tests/test_delete.py b/tests/test_delete.py index 3934d2d7..efc4f628 100644 --- a/tests/test_delete.py +++ b/tests/test_delete.py @@ -1,4 +1,6 @@ import sqlalchemy as sa +from sqlalchemy_continuum import Operation, version_class + from tests import TestCase @@ -25,6 +27,25 @@ def test_creates_versions_on_delete(self): assert versions[1].name == u'Some article' assert versions[1].content == u'Some content' + def test_modify_primary_key(self): + """Test that modifying the primary key within the same transaction + maintains correct delete behavior""" + article = self.Article(name=u'Article name') + self.session.add(article) + self.session.commit() + + article.name = u'Second name' + self.session.flush() + article.id += 1 + self.session.delete(article) + self.session.commit() + + ArticleVersion = version_class(self.Article) + versions_q = self.session.query(ArticleVersion)\ + .order_by(ArticleVersion.transaction_id) + assert versions_q.count() == 2 + assert versions_q[1].operation_type == Operation.DELETE + class TestDeleteWithDeferredColumn(TestCase): def create_models(self): diff --git a/tests/test_insert.py b/tests/test_insert.py index aa16537b..b12fdd3d 100644 --- a/tests/test_insert.py +++ b/tests/test_insert.py @@ -1,5 +1,6 @@ import sqlalchemy as sa -from sqlalchemy_continuum import count_versions, versioning_manager, Operation +from sqlalchemy_continuum import count_versions, versioning_manager, \ + Operation, version_class from tests import TestCase @@ -43,14 +44,32 @@ def test_multiple_flushes_store_operation_type(self): """Test that after multiple flushes that affect a newly created object, the insert operation type is commited """ - article = self.Article(name=u"Article name") + article = self.Article(name=u'Article name') self.session.add(article) self.session.flush() - article.name = u"Changed my mind" + article.name = u'Changed my mind' self.session.commit() assert article.versions.count() == 1 assert article.versions[0].operation_type == Operation.INSERT + def test_modify_primary_key(self): + """Test that modifying the primary key within the insert transaction + maintains correct insert behavior""" + article = self.Article(name=u'Article name') + self.session.add(article) + self.session.flush() + article.id += 1 + self.session.commit() + assert article.versions.count() == 1 + assert article.versions[-1].operation_type == Operation.INSERT + + # also check that no additional article versions have leaked... + ArticleVersion = version_class(self.Article) + versions_query = self.session.query(ArticleVersion)\ + .order_by(ArticleVersion.transaction_id) + assert versions_query.count() == 1 + assert versions_query[0].operation_type == Operation.INSERT + class TestInsertWithDeferredColumn(TestCase): def create_models(self): diff --git a/tests/test_update.py b/tests/test_update.py index 29e0e53c..e8bb2b99 100644 --- a/tests/test_update.py +++ b/tests/test_update.py @@ -1,4 +1,6 @@ import sqlalchemy as sa +from sqlalchemy_continuum import Operation, version_class + from tests import TestCase @@ -76,6 +78,26 @@ def test_multiple_updates_within_same_transaction(self): assert version.name == u'Some article' assert version.content == u'Updated content 2' + def test_modify_primary_key(self): + """Test that modifying the primary key within the same transaction + maintains correct update behavior""" + article = self.Article(name=u'Article name') + self.session.add(article) + self.session.commit() + + article.name = u'Second name' + self.session.flush() + article.id += 1 + self.session.commit() + + assert article.versions.count() == 1 + + ArticleVersion = version_class(self.Article) + versions_q = self.session.query(ArticleVersion)\ + .order_by(ArticleVersion.transaction_id) + assert versions_q.count() == 2 + assert versions_q[1].operation_type == Operation.UPDATE + class TestUpdateWithDefaultValues(TestCase): def create_models(self): From 8227d02a8ffa2e4676ec6a9f9ea18989ebfe3ed4 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 10 Nov 2014 01:56:19 +0100 Subject: [PATCH 05/11] Update internal dict keys when primary keys are modified mid-transaction Both operations and unit of work internal dicts use PKs to point to stored objects. If the PKs change mid-transaction, keys are now updated to the new correct values. --- sqlalchemy_continuum/operation.py | 21 +++++++++++++++++-- sqlalchemy_continuum/unit_of_work.py | 30 ++++++++++++++++++++++------ sqlalchemy_continuum/utils.py | 17 ++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/sqlalchemy_continuum/operation.py b/sqlalchemy_continuum/operation.py index 7cf45240..0dbaa902 100644 --- a/sqlalchemy_continuum/operation.py +++ b/sqlalchemy_continuum/operation.py @@ -6,8 +6,9 @@ import six import sqlalchemy as sa -from sqlalchemy_utils import identity +from sqlalchemy_utils import identity, get_primary_keys, has_changes +from .utils import commited_identity class Operation(object): INSERT = 0 @@ -98,6 +99,7 @@ def add_update(self, target): del state_copy[rel_key] if state_copy: + self._sanitize_keys(target) key = self.format_key(target) # if the object has already been added with an INSERT, # then this is a modification within the same transaction and @@ -111,4 +113,19 @@ def add_update(self, target): self.add(Operation(target, operation)) def add_delete(self, target): - self.add(Operation(target, Operation.DELETE)) \ No newline at end of file + self.add(Operation(target, Operation.DELETE)) + + def _sanitize_keys(self, target): + """The operations key for target may not be valid if this target is in + `self.objects` but its primary key has been modified. Check against that + and update the key. + """ + key = self.format_key(target) + mapper = sa.inspect(target).mapper + for pk in mapper.primary_key: + if has_changes(target, mapper.get_property_by_column(pk).key): + old_key = target.__class__, commited_identity(target) + if old_key in self.objects: + # replace old key with the new one + self.objects[key] = self.objects.pop(old_key) + break \ No newline at end of file diff --git a/sqlalchemy_continuum/unit_of_work.py b/sqlalchemy_continuum/unit_of_work.py index 49d89f48..daf5a021 100644 --- a/sqlalchemy_continuum/unit_of_work.py +++ b/sqlalchemy_continuum/unit_of_work.py @@ -1,14 +1,15 @@ from copy import copy import sqlalchemy as sa -from sqlalchemy_utils import get_primary_keys, identity +from sqlalchemy_utils import get_primary_keys, identity, has_changes from .operation import Operations from .utils import ( end_tx_column_name, version_class, is_session_modified, tx_column_name, - versioned_column_properties + versioned_column_properties, + commited_identity ) @@ -123,6 +124,25 @@ def create_transaction(self, session): session.add(self.current_transaction) return self.current_transaction + def _sanitize_obj_key(self, target): + """ + The key for target in `self.version_objs` may not be valid if its + primary key has been modified. Check against that and update the key. + """ + key = self._create_key(target, identity(target)) + mapper = sa.inspect(target).mapper + for pk in mapper.primary_key: + if has_changes(target, mapper.get_property_by_column(pk).key): + old_key = self._create_key(target, commited_identity(target)) + if old_key in self.version_objs: + # replace old key with the new one + self.version_objs[key] = self.version_objs.pop(old_key) + break + return key + + def _create_key(self, target, pks): + return version_class(target.__class__), (pks, self.current_transaction.id) + def get_or_create_version_object(self, target): """ Return version object for given parent object. If no version object @@ -130,12 +150,10 @@ def get_or_create_version_object(self, target): :param target: Parent object to create the version object for """ - version_cls = version_class(target.__class__) - version_id = identity(target) + (self.current_transaction.id, ) - version_key = (version_cls, version_id) + version_key = self._sanitize_obj_key(target) if version_key not in self.version_objs: - version_obj = version_cls() + version_obj = version_class(target.__class__)() self.version_objs[version_key] = version_obj self.version_session.add(version_obj) tx_column = self.manager.option( diff --git a/sqlalchemy_continuum/utils.py b/sqlalchemy_continuum/utils.py index e81a04f9..06aa477b 100644 --- a/sqlalchemy_continuum/utils.py +++ b/sqlalchemy_continuum/utils.py @@ -413,3 +413,20 @@ def changeset(obj): if new_value: data[prop.key] = [new_value, old_value] return data + + +def commited_identity(obj): + """Returns a tuple of the primary keys of the object without any + modifications that may have occured within the session + """ + old_pks = [] + obj_inspect = sa.inspect(obj) + mapper = obj_inspect.mapper + for column in get_primary_keys(obj).itervalues(): + old_pk = obj_inspect.attrs.get( + mapper.get_property_by_column(column).key).history.deleted + if old_pk: + old_pks.append(old_pk[0]) + else: + old_pks.append(getattr(obj, column.name)) + return tuple(old_pks) From e4c7bbe2854e5e20bec04eb69c101104c9695044 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Sun, 28 Dec 2014 03:03:32 +0200 Subject: [PATCH 06/11] Remove redundant function utils.commited_identity is InstanceState.identity --- sqlalchemy_continuum/operation.py | 4 +--- sqlalchemy_continuum/unit_of_work.py | 5 ++--- sqlalchemy_continuum/utils.py | 17 ----------------- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/sqlalchemy_continuum/operation.py b/sqlalchemy_continuum/operation.py index 0dbaa902..9d67cd55 100644 --- a/sqlalchemy_continuum/operation.py +++ b/sqlalchemy_continuum/operation.py @@ -8,8 +8,6 @@ import sqlalchemy as sa from sqlalchemy_utils import identity, get_primary_keys, has_changes -from .utils import commited_identity - class Operation(object): INSERT = 0 UPDATE = 1 @@ -124,7 +122,7 @@ def _sanitize_keys(self, target): mapper = sa.inspect(target).mapper for pk in mapper.primary_key: if has_changes(target, mapper.get_property_by_column(pk).key): - old_key = target.__class__, commited_identity(target) + old_key = target.__class__, sa.inspect(target).identity if old_key in self.objects: # replace old key with the new one self.objects[key] = self.objects.pop(old_key) diff --git a/sqlalchemy_continuum/unit_of_work.py b/sqlalchemy_continuum/unit_of_work.py index daf5a021..7e570d8e 100644 --- a/sqlalchemy_continuum/unit_of_work.py +++ b/sqlalchemy_continuum/unit_of_work.py @@ -8,8 +8,7 @@ version_class, is_session_modified, tx_column_name, - versioned_column_properties, - commited_identity + versioned_column_properties ) @@ -133,7 +132,7 @@ def _sanitize_obj_key(self, target): mapper = sa.inspect(target).mapper for pk in mapper.primary_key: if has_changes(target, mapper.get_property_by_column(pk).key): - old_key = self._create_key(target, commited_identity(target)) + old_key = self._create_key(target, sa.inspect(target).identity) if old_key in self.version_objs: # replace old key with the new one self.version_objs[key] = self.version_objs.pop(old_key) diff --git a/sqlalchemy_continuum/utils.py b/sqlalchemy_continuum/utils.py index 06aa477b..e81a04f9 100644 --- a/sqlalchemy_continuum/utils.py +++ b/sqlalchemy_continuum/utils.py @@ -413,20 +413,3 @@ def changeset(obj): if new_value: data[prop.key] = [new_value, old_value] return data - - -def commited_identity(obj): - """Returns a tuple of the primary keys of the object without any - modifications that may have occured within the session - """ - old_pks = [] - obj_inspect = sa.inspect(obj) - mapper = obj_inspect.mapper - for column in get_primary_keys(obj).itervalues(): - old_pk = obj_inspect.attrs.get( - mapper.get_property_by_column(column).key).history.deleted - if old_pk: - old_pks.append(old_pk[0]) - else: - old_pks.append(getattr(obj, column.name)) - return tuple(old_pks) From 6bca6e22c0e1cc2aeb44b42fc7ad79ed031b31ae Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 29 Dec 2014 18:14:06 +0200 Subject: [PATCH 07/11] Don't save versions when an object is created and deleted in the same trasaction Added tests for the create/delete and update/delete within the same transaction cases. Added new operation type STALE_DATA that marks an object's version entry as stale that should be removed. Added method to remove version object in the uow. --- sqlalchemy_continuum/operation.py | 10 ++++- .../plugins/transaction_changes.py | 3 +- sqlalchemy_continuum/unit_of_work.py | 39 +++++++++++++------ tests/test_delete.py | 32 +++++++++++++++ 4 files changed, 71 insertions(+), 13 deletions(-) diff --git a/sqlalchemy_continuum/operation.py b/sqlalchemy_continuum/operation.py index 9d67cd55..05728ffa 100644 --- a/sqlalchemy_continuum/operation.py +++ b/sqlalchemy_continuum/operation.py @@ -12,6 +12,7 @@ class Operation(object): INSERT = 0 UPDATE = 1 DELETE = 2 + STALE_VERSION = -1 def __init__(self, target, type): self.target = target @@ -111,7 +112,14 @@ def add_update(self, target): self.add(Operation(target, operation)) def add_delete(self, target): - self.add(Operation(target, Operation.DELETE)) + if target in self and \ + self[self.format_key(target)].type == Operation.INSERT: + # if the target's existing operation is INSERT, it is being + # deleted within the same transaction and no version entry + # should be persisted + self.add(Operation(target, Operation.STALE_VERSION)) + else: + self.add(Operation(target, Operation.DELETE)) def _sanitize_keys(self, target): """The operations key for target may not be valid if this target is in diff --git a/sqlalchemy_continuum/plugins/transaction_changes.py b/sqlalchemy_continuum/plugins/transaction_changes.py index cece8d5b..f665ca9a 100644 --- a/sqlalchemy_continuum/plugins/transaction_changes.py +++ b/sqlalchemy_continuum/plugins/transaction_changes.py @@ -107,6 +107,7 @@ def after_version_class_built(self, parent_cls, version_cls): primaryjoin=( self.model_class.transaction_id == transaction_column ), - foreign_keys=[self.model_class.transaction_id] + foreign_keys=[self.model_class.transaction_id], + passive_deletes='all' ) parent_cls.__versioned__['transaction_changes'] = self.model_class diff --git a/sqlalchemy_continuum/unit_of_work.py b/sqlalchemy_continuum/unit_of_work.py index 7e570d8e..0bac7789 100644 --- a/sqlalchemy_continuum/unit_of_work.py +++ b/sqlalchemy_continuum/unit_of_work.py @@ -2,7 +2,7 @@ import sqlalchemy as sa from sqlalchemy_utils import get_primary_keys, identity, has_changes -from .operation import Operations +from .operation import Operations, Operation from .utils import ( end_tx_column_name, version_class, @@ -168,6 +168,20 @@ def get_or_create_version_object(self, target): else: return self.version_objs[version_key] + def delete_version_object(self, target): + """ + Delete version object for `target` parent object, if a version object + exists. + + :param target: Parent object for which the version object should be + removed + """ + version_key = self._sanitize_obj_key(target) + version_obj = self.version_objs.pop(version_key, None) + if version_obj is not None: + self.version_session.delete(version_obj) + + def process_operation(self, operation): """ Process given operation object. The operation processing has x stages: @@ -181,18 +195,21 @@ def process_operation(self, operation): :param operation: Operation object """ target = operation.target - version_obj = self.get_or_create_version_object(target) - version_obj.operation_type = operation.type - self.assign_attributes(target, version_obj) + if operation.type == Operation.STALE_VERSION: + self.delete_version_object(target) + else: + version_obj = self.get_or_create_version_object(target) + version_obj.operation_type = operation.type + self.assign_attributes(target, version_obj) - self.manager.plugins.after_create_version_object( - self, target, version_obj - ) - if self.manager.option(target, 'strategy') == 'validity': - self.update_version_validity( - target, - version_obj + self.manager.plugins.after_create_version_object( + self, target, version_obj ) + if self.manager.option(target, 'strategy') == 'validity': + self.update_version_validity( + target, + version_obj + ) operation.processed = True def create_version_objects(self, session): diff --git a/tests/test_delete.py b/tests/test_delete.py index efc4f628..ae5f3c26 100644 --- a/tests/test_delete.py +++ b/tests/test_delete.py @@ -27,6 +27,38 @@ def test_creates_versions_on_delete(self): assert versions[1].name == u'Some article' assert versions[1].content == u'Some content' + def test_insert_delete_in_single_transaction(self): + """Test that when an object is created and then deleted within the + same transaction, no history entry is created. + """ + article = self.Article(name=u'Article name') + self.session.add(article) + self.session.flush() + + self.session.delete(article) + self.session.commit() + + ArticleVersion = version_class(self.Article) + assert self.session.query(ArticleVersion).count() == 0 + + def test_update_delete_in_single_transaction(self): + """Test that when an object is updated and then deleted within the + same transaction, the operation type DELETE is stored. + """ + article = self.Article(name=u'Article name') + self.session.add(article) + self.session.commit() + + article.name = u'Updated name' + self.session.flush() + self.session.delete(article) + self.session.commit() + + ArticleVersion = version_class(self.Article) + versions_query = self.session.query(self.ArticleVersion) + assert versions_query.count() == 2 + assert versions_query[1].operation_type == Operation.DELETE + def test_modify_primary_key(self): """Test that modifying the primary key within the same transaction maintains correct delete behavior""" From 73fe0dbc8ffe06c01733f11440298a58fb796a23 Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 29 Dec 2014 22:46:35 +0200 Subject: [PATCH 08/11] Update the postgres UPDATE trigger Modify the UPDATE trigger to: 1. Leave operation type as INSERT for existing versions whose operation type is INSERT. This fixes the missing INSERT version. 2. Use the OLD primary keys to check if a version already exists. This fixes behavior on primary key updates 3. Added documentation --- sqlalchemy_continuum/dialects/postgresql.py | 97 ++++++++++++++++++++- 1 file changed, 96 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_continuum/dialects/postgresql.py b/sqlalchemy_continuum/dialects/postgresql.py index f24d9077..cd97a5c9 100644 --- a/sqlalchemy_continuum/dialects/postgresql.py +++ b/sqlalchemy_continuum/dialects/postgresql.py @@ -29,6 +29,30 @@ WHERE NOT EXISTS (SELECT 1 FROM upsert); """ +update_upsert_cte_sql = """ +WITH upsert as +( + UPDATE {version_table_name} + SET {update_values} + WHERE + {transaction_column} = transaction_id_value + AND + ( + ({old_primary_key_criteria}) + OR + ({new_primary_key_criteria}) + ) + RETURNING * +) +INSERT INTO {version_table_name} +({transaction_column}, {operation_type_column}, {column_names}) +SELECT + transaction_id_value, + {operation_type}, + {insert_values} +WHERE NOT EXISTS (SELECT 1 FROM upsert); +""" + temporary_transaction_sql = """ CREATE TEMP TABLE IF NOT EXISTS {temporary_transaction_table} ({transaction_table_columns}) @@ -205,6 +229,8 @@ class UpsertSQL(SQLConstruct): 'primary_key_criteria': ' AND ', } + upsert_cte_sql = upsert_cte_sql + def __init__(self, *args, **kwargs): SQLConstruct.__init__(self, *args, **kwargs) @@ -267,7 +293,7 @@ def __str__(self): for key, join_operator in self.builders.items(): params[key] = join_operator.join(getattr(self, key)) - sql = upsert_cte_sql.format(**params) + sql = self.upsert_cte_sql.format(**params) return sql @@ -301,7 +327,37 @@ def build_mod_tracking_values(self): class UpdateUpsertSQL(UpsertSQL): + """ + AFTER UPDATE on parent_table: + exists (OLD.[pks] OR NEW.[pks], current_tx_id) in version_table? + (Normally we expect to find the OLD.[pks] in the version table. + However, the NEW.[pks] can already exist in the version table in the + following edge case: + the record with the OLD pks was deleted and a different record was updated + to highjack the OLD pks. In this case the value of OLD.[pks] is irrelevant) + No: + INSERT with operation_type = 1 + Yes: + we have one of the following scenarios: + if existing operation_type = 0 UPDATE with operation_type = 0 + (means that the record is new in this transaction but is being + updated. its version should remain with operation type INSERT) + if existing operation_type = 1 UPDATE with operation_type = 1 + (a 2nd update to the same object in the same transaction) + if existing operation_type = 2 UPDATE with operation_type = 1 + (this is the case described in the justification of why we check for + NEW.[pks]. This is also an UPDATE) + """ operation_type = 1 + upsert_cte_sql = update_upsert_cte_sql + + @property + def builders(self): + builders = super(UpdateUpsertSQL, self).builders.copy() + del builders['primary_key_criteria'] + builders.update(old_primary_key_criteria=' AND ', + new_primary_key_criteria=' AND ') + return builders def build_mod_tracking_values(self): return [ @@ -309,6 +365,45 @@ def build_mod_tracking_values(self): .format(c.name) for c in self.columns_without_pks ] + def build_new_primary_key_criteria(self): + return [ + '"{name}" = NEW."{name}"'.format(name=c.name) + for c in self.columns if c.primary_key + ] + + def build_old_primary_key_criteria(self): + return [ + '"{name}" = OLD."{name}"'.format(name=c.name) + for c in self.columns if c.primary_key + ] + + def build_update_values(self): + parent_columns = [ + '"{name}" = NEW."{name}"'.format(name=c.name) + for c in self.columns + ] + mod_columns = [] + if self.use_property_mod_tracking: + mod_columns = [ + '{0}_mod = {0}_mod OR OLD."{0}" IS DISTINCT FROM NEW."{0}"' + .format(c.name) + for c in self.columns_without_pks + ] + + operation_type_update = """{operation_type} = ( + CASE + WHEN {operation_type} = 2 THEN 1 + WHEN {operation_type} = 0 THEN 0 + ELSE 1 + END +)""".format(operation_type=self.operation_type_column_name) + + return ( + [operation_type_update] + + parent_columns + + mod_columns + ) + class ValiditySQL(SQLConstruct): @property From 998c8bdf4a9ba6f3c3d1c767b55ca5cebf5e6f6c Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 29 Dec 2014 23:09:15 +0200 Subject: [PATCH 09/11] Update the postgres DELETE trigger 1. Delete versions that already exist with an operation type of INSERT 2. Added documentation --- sqlalchemy_continuum/dialects/postgresql.py | 52 ++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_continuum/dialects/postgresql.py b/sqlalchemy_continuum/dialects/postgresql.py index cd97a5c9..5b86898d 100644 --- a/sqlalchemy_continuum/dialects/postgresql.py +++ b/sqlalchemy_continuum/dialects/postgresql.py @@ -53,6 +53,39 @@ WHERE NOT EXISTS (SELECT 1 FROM upsert); """ +delete_upsert_cte_sql = """ +WITH delete_stale as ( + DELETE FROM {version_table_name} + WHERE + {transaction_column} = transaction_id_value + AND + {primary_key_criteria} + AND + "operation_type" = 0 + RETURNING * +), upsert as +( + UPDATE {version_table_name} + SET {update_values} + WHERE + {transaction_column} = transaction_id_value + AND + {primary_key_criteria} + RETURNING * +) +INSERT INTO {version_table_name} +({transaction_column}, {operation_type_column}, {column_names}) +SELECT + transaction_id_value, + {operation_type}, + {insert_values} +WHERE + NOT EXISTS (SELECT 1 from delete_stale) + AND + NOT EXISTS (SELECT 1 FROM upsert); +""" + + temporary_transaction_sql = """ CREATE TEMP TABLE IF NOT EXISTS {temporary_transaction_table} ({transaction_table_columns}) @@ -298,7 +331,24 @@ def __str__(self): class DeleteUpsertSQL(UpsertSQL): + """ + AFTER DELETE on parent_table: + exists (OLD.[pks], current_tx_id) in version_table? + No: + INSERT with operation_type = 2 + Yes: + we have one of the following scenarios: + if existing operation_type = 0, DELETE version entry + (means that a new record has been created but now is being deleted + in the same transaction. No version should be created) + if existing operation_type = 1, UPDATE with operation_type = 2 + (an object has been updated but is now being deleted) + if existing operation_type == 2, UPDATE with operation_type = 2 + (not sure if this can happen, however a second DELETE of the same + PKs still results in a DELETE) + """ operation_type = 2 + upsert_cte_sql = delete_upsert_cte_sql def build_primary_key_criteria(self): return [ @@ -310,7 +360,7 @@ def build_mod_tracking_values(self): return ['True'] * len(self.columns_without_pks) def build_update_values(self): - return [ + return ['%s = 2' % self.operation_type_column_name] + [ '"{name}" = OLD."{name}"'.format(name=c.name) for c in self.columns ] From 454f957494327e8fb4a63618ab4acf74d3c3b1ce Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 29 Dec 2014 23:10:45 +0200 Subject: [PATCH 10/11] Document postgres INSERT trigger --- sqlalchemy_continuum/dialects/postgresql.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sqlalchemy_continuum/dialects/postgresql.py b/sqlalchemy_continuum/dialects/postgresql.py index 5b86898d..f6a0ed01 100644 --- a/sqlalchemy_continuum/dialects/postgresql.py +++ b/sqlalchemy_continuum/dialects/postgresql.py @@ -370,6 +370,16 @@ def build_values(self): class InsertUpsertSQL(UpsertSQL): + """ + AFTER INSERT on parent_table: + exists (NEW.[pks], current_tx_id) in version_table? + No: + INSERT with operation_type = 0 + Yes: + (means target was deleted and re-inserted in the same transaction, + so its actually an update) + UPDATE with operation_type = 1 + """ operation_type = 0 def build_mod_tracking_values(self): From 95123e68a391338f9855e79cbead0a5bf93f9dfe Mon Sep 17 00:00:00 2001 From: Dimitris Theodorou <dimitris.theodorou@gmail.com> Date: Mon, 29 Dec 2014 23:30:27 +0200 Subject: [PATCH 11/11] add comment in test --- tests/test_exotic_operation_combos.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_exotic_operation_combos.py b/tests/test_exotic_operation_combos.py index 8a24f947..3f25a816 100644 --- a/tests/test_exotic_operation_combos.py +++ b/tests/test_exotic_operation_combos.py @@ -40,6 +40,9 @@ def test_insert_deleted_and_flushed_object(self): assert article2.versions[1].operation_type == 1 def test_replace_deleted_object_with_update(self): + """Test that deleting an object and hijacking its primary key results + in turning the operation_type = 2 to an operation_type = 1 + """ article = self.Article() article.name = u'Some article' article.content = u'Some content'