Commit b98bb7a3 authored by Serge S. Koval's avatar Serge S. Koval

Minor sqla refactoring, sqla tests.

parent f44613ed
from sqlalchemy.orm.exc import NoResultFound
from wtforms import ValidationError, fields, validators
from wtforms.ext.sqlalchemy.orm import converts, ModelConverter
from wtforms.ext.sqlalchemy.fields import QuerySelectField, QuerySelectMultipleField
from flask.ext.adminex import form
class Unique(object):
"""Checks field value unicity against specified table field.
:param get_session:
A function that return a SQAlchemy Session.
:param model:
The model to check unicity against.
:param column:
The unique column.
:param message:
The error message.
"""
field_flags = ('unique', )
def __init__(self, db_session, model, column, message=None):
self.db_session = db_session
self.model = model
self.column = column
self.message = message
def __call__(self, form, field):
try:
obj = (self.db_session.query(self.model)
.filter(self.column == field.data).one())
if not hasattr(form, '_obj') or not form._obj == obj:
if self.message is None:
self.message = field.gettext(u'Already exists.')
raise ValidationError(self.message)
except NoResultFound:
pass
class AdminModelConverter(ModelConverter):
"""
SQLAlchemy model to form converter
"""
def __init__(self, view):
super(AdminModelConverter, self).__init__()
self.view = view
def _get_label(self, name, field_args):
if 'label' in field_args:
return field_args['label']
if self.view.rename_columns:
return self.view.rename_columns.get(name)
return None
def convert(self, model, mapper, prop, field_args):
kwargs = {
'validators': [],
'filters': []
}
if field_args:
kwargs.update(field_args)
if hasattr(prop, 'direction'):
remote_model = prop.mapper.class_
local_column = prop.local_remote_pairs[0][0]
kwargs.update({
'allow_blank': local_column.nullable,
'label': self._get_label(prop.key, kwargs),
'query_factory': lambda: self.view.session.query(remote_model)
})
if local_column.nullable:
kwargs['validators'].append(validators.Optional())
else:
kwargs['validators'].append(validators.Required())
if prop.direction.name == 'MANYTOONE':
return QuerySelectField(widget=form.ChosenSelectWidget(),
**kwargs)
elif prop.direction.name == 'ONETOMANY':
# Skip backrefs
if not local_column.foreign_keys and self.view.hide_backrefs:
return None
return QuerySelectMultipleField(
widget=form.ChosenSelectWidget(multiple=True),
**kwargs)
elif prop.direction.name == 'MANYTOMANY':
return QuerySelectMultipleField(
widget=form.ChosenSelectWidget(multiple=True),
**kwargs)
else:
# Ignore pk/fk
if hasattr(prop, 'columns'):
column = prop.columns[0]
if column.foreign_keys or column.primary_key:
return None
# If field is unique, validate it
if column.unique:
kwargs['validators'].append(Unique(self.view.session,
model,
column))
if not column.nullable:
kwargs['validators'].append(validators.Required())
# Apply label
kwargs['label'] = self._get_label(prop.key, kwargs)
return super(AdminModelConverter, self).convert(model,
mapper,
prop,
kwargs)
@converts('Date')
def convert_date(self, field_args, **extra):
field_args['widget'] = form.DatePickerWidget()
return fields.DateField(**field_args)
@converts('DateTime')
def convert_datetime(self, field_args, **extra):
field_args['widget'] = form.DateTimePickerWidget()
return fields.DateTimeField(**field_args)
@converts('Time')
def convert_time(self, field_args, **extra):
return form.TimeField(**field_args)
from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import subqueryload from sqlalchemy.orm import subqueryload
from sqlalchemy.sql.expression import desc from sqlalchemy.sql.expression import desc
from sqlalchemy import or_ from sqlalchemy import or_
from wtforms.ext.sqlalchemy.orm import model_form
from wtforms import ValidationError, fields, validators
from wtforms.ext.sqlalchemy.orm import model_form, converts, ModelConverter
from wtforms.ext.sqlalchemy.fields import QuerySelectField, QuerySelectMultipleField
from flask import flash from flask import flash
from flask.ext.adminex import form from flask.ext.adminex.form import BaseForm
from flask.ext.adminex.model import BaseModelView from flask.ext.adminex.model import BaseModelView
from flask.ext.adminex.ext.sqlamodel import filters, tools
class Unique(object):
"""Checks field value unicity against specified table field.
:param get_session:
A function that return a SQAlchemy Session.
:param model:
The model to check unicity against.
:param column:
The unique column.
:param message:
The error message.
"""
field_flags = ('unique', )
def __init__(self, db_session, model, column, message=None):
self.db_session = db_session
self.model = model
self.column = column
self.message = message
def __call__(self, form, field):
try:
obj = (self.db_session.query(self.model)
.filter(self.column == field.data).one())
if not hasattr(form, '_obj') or not form._obj == obj:
if self.message is None:
self.message = field.gettext(u'Already exists.')
raise ValidationError(self.message)
except NoResultFound:
pass
class AdminModelConverter(ModelConverter):
"""
SQLAlchemy model to form converter
"""
def __init__(self, view):
super(AdminModelConverter, self).__init__()
self.view = view
def _get_label(self, name, field_args):
if 'label' in field_args:
return field_args['label']
if self.view.rename_columns:
return self.view.rename_columns.get(name)
return None
def convert(self, model, mapper, prop, field_args):
kwargs = {
'validators': [],
'filters': []
}
if field_args:
kwargs.update(field_args)
if hasattr(prop, 'direction'):
remote_model = prop.mapper.class_
local_column = prop.local_remote_pairs[0][0]
kwargs.update({
'allow_blank': local_column.nullable,
'label': self._get_label(prop.key, kwargs),
'query_factory': lambda: self.view.session.query(remote_model)
})
if local_column.nullable:
kwargs['validators'].append(validators.Optional())
else:
kwargs['validators'].append(validators.Required())
if prop.direction.name == 'MANYTOONE':
return QuerySelectField(widget=form.ChosenSelectWidget(),
**kwargs)
elif prop.direction.name == 'ONETOMANY':
# Skip backrefs
if not local_column.foreign_keys and self.view.hide_backrefs:
return None
return QuerySelectMultipleField(
widget=form.ChosenSelectWidget(multiple=True),
**kwargs)
elif prop.direction.name == 'MANYTOMANY':
return QuerySelectMultipleField(
widget=form.ChosenSelectWidget(multiple=True),
**kwargs)
else:
# Ignore pk/fk
if hasattr(prop, 'columns'):
column = prop.columns[0]
if column.foreign_keys or column.primary_key:
return None
# If field is unique, validate it
if column.unique:
kwargs['validators'].append(Unique(self.view.session,
model,
column))
if not column.nullable:
kwargs['validators'].append(validators.Required())
# Apply label
kwargs['label'] = self._get_label(prop.key, kwargs)
return super(AdminModelConverter, self).convert(model,
mapper,
prop,
kwargs)
@converts('Date')
def convert_date(self, field_args, **extra):
field_args['widget'] = form.DatePickerWidget()
return fields.DateField(**field_args)
@converts('DateTime')
def convert_datetime(self, field_args, **extra):
field_args['widget'] = form.DateTimePickerWidget()
return fields.DateTimeField(**field_args)
@converts('Time') from flask.ext.adminex.ext.sqlamodel import form, filters, tools
def convert_time(self, field_args, **extra):
return form.TimeField(**field_args)
class ModelView(BaseModelView): class ModelView(BaseModelView):
...@@ -441,11 +308,11 @@ class ModelView(BaseModelView): ...@@ -441,11 +308,11 @@ class ModelView(BaseModelView):
Create form from the model. Create form from the model.
""" """
return model_form(self.model, return model_form(self.model,
form.BaseForm, BaseForm,
only=self.form_columns, only=self.form_columns,
exclude=self.excluded_form_columns, exclude=self.excluded_form_columns,
field_args=self.form_args, field_args=self.form_args,
converter=AdminModelConverter(self)) converter=form.AdminModelConverter(self))
def scaffold_auto_joins(self): def scaffold_auto_joins(self):
""" """
......
from nose.tools import eq_, ok_ from nose.tools import eq_, ok_, raises
from flask import Flask from flask import Flask
from flask.helpers import get_flashed_messages from flask.helpers import get_flashed_messages
...@@ -23,6 +23,12 @@ class Form(wtf.Form): ...@@ -23,6 +23,12 @@ class Form(wtf.Form):
col3 = wtf.TextField() col3 = wtf.TextField()
class SimpleFilter(filters.BaseFilter):
def apply(self, query):
query._applied = True
return query
class MockModelView(base.BaseModelView): class MockModelView(base.BaseModelView):
def __init__(self, model, name=None, category=None, endpoint=None, url=None, def __init__(self, model, name=None, category=None, endpoint=None, url=None,
**kwargs): **kwargs):
...@@ -57,8 +63,8 @@ class MockModelView(base.BaseModelView): ...@@ -57,8 +63,8 @@ class MockModelView(base.BaseModelView):
def init_search(self): def init_search(self):
return bool(self.searchable_columns) return bool(self.searchable_columns)
def scaffold_filters(self): def scaffold_filters(self, name):
return None return [SimpleFilter(name)]
def scaffold_sortable_columns(self): def scaffold_sortable_columns(self):
return ['col1', 'col2', 'col3'] return ['col1', 'col2', 'col3']
...@@ -189,6 +195,14 @@ def test_permissions(): ...@@ -189,6 +195,14 @@ def test_permissions():
eq_(rv.status_code, 302) eq_(rv.status_code, 302)
@raises(Exception)
def test_no_pk():
app, admin = setup()
view = MockModelView(Model, scaffold_pk=lambda: None)
admin.add_view(view)
def test_templates(): def test_templates():
app, admin = setup() app, admin = setup()
...@@ -260,3 +274,27 @@ def test_searchable_columns(): ...@@ -260,3 +274,27 @@ def test_searchable_columns():
admin.add_view(view) admin.add_view(view)
eq_(view._search_supported, True) eq_(view._search_supported, True)
# TODO: Make calls with search
def test_column_filters():
app, admin = setup()
view = MockModelView(Model, column_filters=['col1', 'col2'])
admin.add_view(view)
eq_(len(view._filters), 2)
eq_(view._filters[0].name, 'col1')
eq_(view._filters[1].name, 'col2')
eq_(view._filter_names, ['col1', 'col2'])
# TODO: Make calls with filters
def test_form():
# TODO: form_columns
# TODO: excluded_form_columns
# TODO: form_args
pass
from nose.tools import eq_, ok_, raises
from flask import Flask
from flask.ext import wtf
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.adminex import Admin
from flask.ext.adminex.ext.sqlamodel import ModelView
class CustomModelView(ModelView):
def __init__(self, model, session,
name=None, category=None, endpoint=None, url=None,
**kwargs):
for k, v in kwargs.iteritems():
setattr(self, k, v)
super(CustomModelView, self).__init__(model, session,
name, category,
endpoint, url)
def create_models(db):
class Model1(db.Model):
id = db.Column(db.Integer, primary_key=True)
test1 = db.Column(db.String(20))
test2 = db.Column(db.Unicode(20))
test3 = db.Column(db.Text)
test4 = db.Column(db.UnicodeText)
db.create_all()
return Model1
def setup():
app = Flask(__name__)
app.config['SECRET_KEY'] = '1'
app.config['CSRF_ENABLED'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///'
db = SQLAlchemy(app)
admin = Admin(app)
return app, db, admin
def test_model():
app, db, admin = setup()
Model1 = create_models(db)
db.create_all()
view = CustomModelView(Model1, db.session)
admin.add_view(view)
eq_(view.model, Model1)
eq_(view.name, 'Model1')
eq_(view.endpoint, 'model1view')
eq_(view._primary_key, 'id')
eq_(view._sortable_columns, dict(test1='test1',
test2='test2',
test3='test3',
test4='test4'))
ok_(view._create_form_class is not None)
ok_(view._edit_form_class is not None)
eq_(view._search_supported, False)
eq_(view._filters, None)
# Verify form
eq_(view._create_form_class.test1.field_class, wtf.TextField)
eq_(view._create_form_class.test2.field_class, wtf.TextField)
eq_(view._create_form_class.test3.field_class, wtf.TextAreaField)
eq_(view._create_form_class.test4.field_class, wtf.TextAreaField)
# Make some test clients
client = app.test_client()
rv = client.get('/admin/model1view/')
eq_(rv.status_code, 200)
rv = client.get('/admin/model1view/new/')
eq_(rv.status_code, 200)
rv = client.post('/admin/model1view/new/',
data=dict(test1='test1large', test2='test2'))
eq_(rv.status_code, 302)
model = db.session.query(Model1).first()
eq_(model.test1, 'test1large')
eq_(model.test2, 'test2')
eq_(model.test3, '')
eq_(model.test4, '')
rv = client.get('/admin/model1view/')
eq_(rv.status_code, 200)
ok_('test1large' in rv.data)
url = '/admin/model1view/edit/%d/' % model.id
rv = client.get(url)
eq_(rv.status_code, 200)
rv = client.post(url,
data=dict(test1='test1small', test2='test2large'))
eq_(rv.status_code, 302)
model = db.session.query(Model1).first()
eq_(model.test1, 'test1small')
eq_(model.test2, 'test2large')
eq_(model.test3, '')
eq_(model.test4, '')
url = '/admin/model1view/delete/%d/' % model.id
rv = client.post(url)
eq_(rv.status_code, 302)
eq_(db.session.query(Model1).count(), 0)
@raises(Exception)
def test_no_pk():
app, db, admin = setup()
class Model(db.Model):
test = db.Column(db.Integer)
view = CustomModelView(Model)
admin.add_view(view)
def test_list_columns():
app, db, admin = setup()
Model1 = create_models(db)
view = CustomModelView(Model1, db.session,
list_columns=['test1', 'test3'],
rename_columns=dict(test1='Column1'))
admin.add_view(view)
eq_(len(view._list_columns), 2)
eq_(view._list_columns, [('test1', 'Column1'), ('test3', 'Test3')])
client = app.test_client()
rv = client.get('/admin/model1view/')
ok_('Column1' in rv.data)
ok_('Test2' not in rv.data)
def test_exclude_columns():
app, db, admin = setup()
Model1 = create_models(db)
view = CustomModelView(Model1, db.session,
excluded_list_columns=['test2', 'test4'])
admin.add_view(view)
eq_(view._list_columns, [('test1', 'Test1'), ('test3', 'Test3')])
def test_searchable_columns():
app, db, admin = setup()
Model1 = create_models(db)
view = CustomModelView(Model1, db.session,
searchable_columns=['test1', 'test2'])
admin.add_view(view)
eq_(view._search_supported, True)
eq_(len(view._search_fields), 2)
ok_(isinstance(view._search_fields[0], db.Column))
ok_(isinstance(view._search_fields[1], db.Column))
eq_(view._search_fields[0].name, 'test1')
eq_(view._search_fields[1].name, 'test2')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment