From c8c58ddce14cffe061bdedd5eb73f3ef9d4b1223 Mon Sep 17 00:00:00 2001 From: Konsta Vesterinen Date: Wed, 27 Jan 2016 16:49:55 +0200 Subject: [PATCH] Add Unique validator --- CHANGES.rst | 3 +- tests/test_phone_number_field.py | 2 +- tests/test_types.py | 6 +- tests/test_unique_validator.py | 375 +++++++++++++++++++++++++++++++ wtforms_alchemy/__init__.py | 8 +- wtforms_alchemy/validators.py | 90 ++++++++ 6 files changed, 476 insertions(+), 8 deletions(-) create mode 100644 tests/test_unique_validator.py create mode 100644 wtforms_alchemy/validators.py diff --git a/CHANGES.rst b/CHANGES.rst index 3dfc159..d41ae93 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,12 +4,13 @@ Changelog Here you can see the full list of changes between each WTForms-Alchemy release. -0.15.0 (2016-01-xx) +0.15.0 (2016-01-27) ^^^^^^^^^^^^^^^^^^^ - Moved GroupedQuerySelectField from WTForms-Components package to WTForms-Alchemy - Moved WeekdaysField from WTForms-Components package to WTForms-Alchemy - Moved PhoneNumberField from WTForms-Components package to WTForms-Alchemy +- Moved Unique validator from WTForms-Components package to WTForms-Alchemy 0.14.0 (2016-01-23) diff --git a/tests/test_phone_number_field.py b/tests/test_phone_number_field.py index b998630..29217e4 100644 --- a/tests/test_phone_number_field.py +++ b/tests/test_phone_number_field.py @@ -1,7 +1,7 @@ from wtforms import Form from tests import MultiDict -from wtforms_components import PhoneNumberField +from wtforms_alchemy import PhoneNumberField class TestPhoneNumberField(object): diff --git a/tests/test_types.py b/tests/test_types.py index 981d5eb..abc1167 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -28,19 +28,19 @@ EmailField, IntegerField, IntIntervalField, - PhoneNumberField, StringField, TimeField ) -from wtforms_components.fields.weekdays import WeekDaysField from tests import ModelFormTestCase from wtforms_alchemy import ( CountryField, ModelForm, null_or_unicode, + PhoneNumberField, SelectField, - UnknownTypeException + UnknownTypeException, + WeekDaysField ) from wtforms_alchemy.utils import ClassMap diff --git a/tests/test_unique_validator.py b/tests/test_unique_validator.py new file mode 100644 index 0000000..d8de2c3 --- /dev/null +++ b/tests/test_unique_validator.py @@ -0,0 +1,375 @@ +import sqlalchemy as sa +from pytest import mark, raises +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from wtforms import Form +from wtforms.ext.sqlalchemy.fields import QuerySelectField +from wtforms.fields import TextField + +from tests import MultiDict +from wtforms_alchemy import ModelForm, Unique + +base = declarative_base() + + +class Color(base): + __tablename__ = 'color' + id = sa.Column(sa.Integer, primary_key=True) + name = sa.Column(sa.Unicode(255), unique=True) + + +class User(base): + __tablename__ = 'event' + id = sa.Column(sa.Integer, primary_key=True) + name = sa.Column(sa.Unicode(255), unique=True) + email = sa.Column(sa.Unicode(255)) + favorite_color_id = sa.Column(sa.Integer, sa.ForeignKey(Color.id)) + favorite_color = relationship(Color) + + +class TestUniqueValidator(object): + def create_models(self): + # This is a hack so we can use our classes + # without initializing self first + self.base = base + + def setup_method(self, method): + self.engine = sa.create_engine('sqlite:///:memory:') + + self.base = declarative_base() + self.create_models() + + self.base.metadata.create_all(self.engine) + + Session = sa.orm.session.sessionmaker(bind=self.engine) + self.session = Session() + + def teardown_method(self, method): + self.session.close_all() + self.base.metadata.drop_all(self.engine) + self.engine.dispose() + + def _test_syntax(self, column, expected_dict): + class MyForm(ModelForm): + name = TextField() + email = TextField() + + validator = Unique( + column, + get_session=lambda: self.session + ) + form = MyForm() + if not hasattr(form, 'Meta'): + form.Meta = lambda: None + form.Meta.model = User + result = validator._syntaxes_as_tuples(form, form.name, column) + assert result == expected_dict + + def test_with_form_obj_unavailable(self): + class MyForm(Form): + name = TextField( + validators=[ + Unique(User.name, get_session=lambda: self.session) + ] + ) + + form = MyForm() + with raises(Exception) as e: + form.validate() + assert "Couldn't access Form._obj attribute" in str(e) + + @mark.parametrize(['column', 'expected_dict'], ( + (User.name, (('name', User.name),)), + ('name', (('name', User.name),)), + (('name', 'email'), (('name', User.name), ('email', User.email))), + ({'exampleName': User.name}, (('exampleName', User.name),)), + ( + (User.name, User.email), + (('name', User.name), ('email', User.email)) + ), + ( + (User.name, User.favorite_color), + (('name', User.name), ('favorite_color', User.favorite_color)) + ), + )) + def test_columns_as_tuples(self, column, expected_dict): + self._test_syntax(column, expected_dict) + + def test_columns_as_tuples_classical_mapping(self): + users = sa.Table( + 'users', + sa.MetaData(None), + sa.Column('name', sa.Unicode(255)) + ) + self._test_syntax( + users.c.name, + (('name', users.c.name),) + ) + + @mark.parametrize('column', ( + User.name, + {'name': User.name}, + (('name', User.name),) + )) + def test_raises_exception_if_improperly_configured(self, column): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + column, + )] + ) + with raises(Exception): + MyForm().validate() + + def test_raises_exception_string_if_improperly_configured(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + ('name', 'email'), + )] + ) + with raises(Exception): + MyForm().validate() + + def test_existing_name_collision(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + User.name, + get_session=lambda: self.session + )] + ) + + self.session.add(User(name=u'someone')) + self.session.commit() + + form = MyForm(MultiDict({'name': u'someone'})) + form.validate() + assert form.errors == {'name': [u'Already exists.']} + + def test_existing_name_collision_multiple(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.email], + get_session=lambda: self.session + )] + ) + email = TextField() + + self.session.add(User( + name=u'someone', + email=u'someone@example.com' + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'someone@example.com' + })) + form.validate() + assert form.errors == {'name': [u'Already exists.']} + + def test_works_with_flask_sqlalchemy_syntax(self, monkeypatch): + monkeypatch.setattr(User, 'query', self.session.query(User), False) + + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.email], + get_session=lambda: self.session + )] + ) + email = TextField() + + self.session.add(User( + name=u'someone', + email=u'someone@example.com' + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'someone@example.com' + })) + form.validate() + assert form.errors == {'name': [u'Already exists.']} + + def test_existing_name_collision_classical_mapping(self): + sa.Table( + 'user', + sa.MetaData(None), + sa.Column('name', sa.String(255)), + sa.Column('email', sa.String(255)) + ) + + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.email], + get_session=lambda: self.session + )] + ) + email = TextField() + + self.session.add(User( + name=u'someone', + email=u'someone@example.com' + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'someone@example.com' + })) + form.validate() + assert form.errors == {'name': [u'Already exists.']} + + def test_relationship_multiple_collision(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.favorite_color], + get_session=lambda: self.session + )] + ) + email = TextField() + favorite_color = QuerySelectField( + query_factory=lambda: self.session.query(Color).all(), + allow_blank=True + ) + + red_color = Color(name='red') + blue_color = Color(name='blue') + self.session.add(red_color) + self.session.add(blue_color) + self.session.add(User( + name=u'someone', + email=u'first.email@example.com', + favorite_color=red_color + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'second.email@example.com', + 'favorite_color': str(red_color.id) + })) + form.validate() + assert form.errors == {'name': [u'Already exists.']} + + def test_relationship_multiple_no_collision(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.favorite_color], + get_session=lambda: self.session + )] + ) + email = TextField() + favorite_color = QuerySelectField( + query_factory=lambda: self.session.query(Color).all(), + allow_blank=True + ) + + red_color = Color(name='red') + blue_color = Color(name='blue') + self.session.add(red_color) + self.session.add(blue_color) + self.session.add(User( + name=u'someone', + email=u'first.email@example.com', + favorite_color=red_color + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'second.email@example.com', + 'favorite_color': str(blue_color.id) + })) + form.validate() + assert form.errors == {} + + def test_without_obj_without_collision(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + User.name, + get_session=lambda: self.session + )] + ) + + self.session.add(User(name=u'someone else')) + self.session.commit() + + form = MyForm(MultiDict({'name': u'someone'})) + assert form.validate() + + def test_without_obj_without_collision_multiple(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + [User.name, User.email], + get_session=lambda: self.session + )] + ) + email = TextField() + + self.session.add(User( + name=u'someone', + email=u'someone@example.com' + )) + self.session.commit() + + form = MyForm(MultiDict({ + 'name': u'someone', + 'email': u'else@example.com' + })) + assert form.validate() + + def test_existing_name_no_collision(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + User.name, + get_session=lambda: self.session + )] + ) + + obj = User(name=u'someone') + self.session.add(obj) + + form = MyForm(MultiDict({'name': u'someone'}), obj=obj) + assert form.validate() + + def test_existing_name_no_collision_multiple(self): + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + (User.name, User.email), + get_session=lambda: self.session + )] + ) + email = TextField() + + obj = User(name=u'someone', email=u'hello@world.com') + self.session.add(obj) + + form = MyForm(MultiDict( + {'name': u'someone', 'email': 'hello@world.com'} + ), obj=obj) + assert form.validate() + + def test_supports_model_query_parameter(self): + User.query = self.session.query(User) + + class MyForm(ModelForm): + name = TextField( + validators=[Unique( + User.name, + )] + ) + + form = MyForm(MultiDict({'name': u'someone'})) + assert form.validate() diff --git a/wtforms_alchemy/__init__.py b/wtforms_alchemy/__init__.py index 8812869..e42f008 100644 --- a/wtforms_alchemy/__init__.py +++ b/wtforms_alchemy/__init__.py @@ -16,8 +16,7 @@ PhoneNumberField, SelectField, SelectMultipleField, - TimeRange, - Unique + TimeRange ) from .exc import ( @@ -31,8 +30,10 @@ GroupedQuerySelectField, ModelFieldList, ModelFormField, + PhoneNumberField, QuerySelectField, - QuerySelectMultipleField + QuerySelectMultipleField, + WeekDaysField ) from .generator import FormGenerator from .utils import ( @@ -42,6 +43,7 @@ null_or_int, null_or_unicode ) +from .validators import Unique # noqa __all__ = ( AttributeTypeException, diff --git a/wtforms_alchemy/validators.py b/wtforms_alchemy/validators.py new file mode 100644 index 0000000..81a99bb --- /dev/null +++ b/wtforms_alchemy/validators.py @@ -0,0 +1,90 @@ +from collections import Iterable, Mapping + +import six +from sqlalchemy import Column +from sqlalchemy.orm.attributes import InstrumentedAttribute +from wtforms import ValidationError + + +class Unique(object): + """Checks field values unicity against specified table fields. + + :param column: + InstrumentedAttribute object, eg. User.name, or + Column object, eg. user.c.name, or + a field name, eg. 'name' or + a tuple of InstrumentedAttributes, eg. (User.name, User.email) or + a dictionary mapping field names to InstrumentedAttributes, eg. + { + 'name': User.name, + 'email': User.email + } + :param get_session: + A function that returns a SQAlchemy Session. This parameter is not + needed if the given model supports Flask-SQLAlchemy styled query + parameter. + :param message: + The error message. + """ + field_flags = ('unique', ) + + def __init__(self, column, get_session=None, message=None): + self.column = column + self.message = message + self.get_session = get_session + + @property + def query(self): + self._check_for_session(self.model) + if self.get_session: + return self.get_session().query(self.model) + elif hasattr(self.model, 'query'): + return getattr(self.model, 'query') + else: + raise Exception( + 'Validator requires either get_session or Flask-SQLAlchemy' + ' styled query parameter' + ) + + def _check_for_session(self, model): + if not hasattr(model, 'query') and not self.get_session: + raise Exception('Could not obtain SQLAlchemy session.') + + def _syntaxes_as_tuples(self, form, field, column): + """Converts a set of different syntaxes into a tuple of tuples""" + if isinstance(column, six.string_types): + return ((column, getattr(form.Meta.model, column)),) + elif isinstance(column, Mapping): + return tuple( + (x[0], self._syntaxes_as_tuples(form, field, x[1])[0][1]) + for x in column.items() + ) + elif isinstance(column, Iterable): + return tuple( + self._syntaxes_as_tuples(form, field, x)[0] + for x in column + ) + elif isinstance(column, (Column, InstrumentedAttribute)): + return ((column.key, column),) + else: + raise TypeError("Invalid syntax for column") + + def __call__(self, form, field): + columns = self._syntaxes_as_tuples(form, field, self.column) + self.model = columns[0][1].class_ + query = self.query + for field_name, column in columns: + query = query.filter(column == form[field_name].data) + obj = query.first() + + if not hasattr(form, '_obj'): + raise Exception( + "Couldn't access Form._obj attribute. Either make your form " + "inherit WTForms-Alchemy ModelForm or WTForms-Components " + "ModelForm or make this attribute available in your form." + ) + + if obj and not form._obj == obj: + if self.message is None: + self.message = field.gettext(u'Already exists.') + raise ValidationError(self.message)