Commit 1b06aa63 authored by Serge S. Koval's avatar Serge S. Koval

Peewee backend (WIP)

parent 5bec9db7
Flask-Admin is maintained by Serge S. Koval and various contributors:
Development Lead
````````````````
- Serge S. Koval <serge.koval+github@gmail.com>
Patches and Suggestions
```````````````````````
- Priit Laes <plaes@plaes.org>
- Andy Wilson <wilson.andrew.j+github@gmail.com>
- Mikhail Krivushin <krivushinme@gmail.com>
- Dmitry Medvinsky <dmedvinsky@gmail.com>
- Salem Harrache <salem@harrache.info>
- F. Gabriel Gosselin
- Leonardinius <leonids.maslovs@galeoconsulting.com>
from flask import Flask
import peewee
from flask.ext import admin
from flask.ext.admin.contrib import peeweemodel
app = Flask(__name__)
app.config['SECRET_KEY'] = '123456790'
db = peewee.SqliteDatabase('test.sqlite', check_same_thread=False)
class BaseModel(peewee.Model):
class Meta:
database = db
class User(BaseModel):
username = peewee.CharField(max_length=80)
email = peewee.CharField(max_length=120)
def __unicode__(self):
return self.username
class Post(BaseModel):
title = peewee.CharField(max_length=120)
text = peewee.TextField(null=False)
date = peewee.DateTimeField()
user = peewee.ForeignKeyField(User)
def __unicode__(self):
return self.title
class PostAdmin(peeweemodel.ModelView):
# Visible columns in the list view
#list_columns = ('title', 'user')
excluded_list_columns = ['text']
# List of columns that can be sorted. For 'user' column, use User.username as
# a column.
#sortable_columns = ('title', ('user', User.username), 'date')
#searchable_columns = ('title', User.username)
searchable_columns = ('title',)
column_filters = ('title',
'date')
@app.route('/')
def index():
return '<a href="/admin/">Click me to get to Admin!</a>'
if __name__ == '__main__':
admin = admin.Admin(app, 'Peewee Models')
admin.add_view(peeweemodel.ModelView(User))
admin.add_view(PostAdmin(Post))
try:
User.create_table()
Post.create_table()
except:
pass
app.debug = True
app.run('0.0.0.0', 8000)
from flask.ext.admin.babel import gettext
from flask.ext.admin.model import filters
from peewee import Q
def parse_like_term(term):
if term.startswith('^'):
stmt = '%s%%' % term[1:]
elif term.startswith('='):
stmt = term[1:]
else:
stmt = '%%%s%%' % term
return stmt
class BasePeeweeFilter(filters.BaseFilter):
"""
Base SQLAlchemy filter.
"""
def __init__(self, column, name, options=None, data_type=None):
"""
Constructor.
`column`
Model field
`name`
Display name
`options`
Fixed set of options
`data_type`
Client data type
"""
super(BasePeeweeFilter, self).__init__(name, options, data_type)
self.column = column
# Common filters
class FilterEqual(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__eq' % self.column.name
return query.where(**{stmt: value})
def operation(self):
return gettext('equals')
class FilterNotEqual(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__neq' % self.column.name
return query.where(**{stmt: value})
def operation(self):
return gettext('not equal')
class FilterLike(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__icontains' % self.column.name
val = parse_like_term(value)
return query.where(**{stmt: val})
def operation(self):
return gettext('contains')
class FilterNotLike(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__icontains' % self.column.name
val = parse_like_term(value)
node = ~Q(**{stmt: val})
return query.where(node)
def operation(self):
return gettext('not contains')
class FilterGreater(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__gt' % self.column.name
return query.where(**{stmt: value})
def operation(self):
return gettext('greater than')
class FilterSmaller(BasePeeweeFilter):
def apply(self, query, value):
stmt = '%s__lt' % self.column.name
return query.where(**{stmt: value})
def operation(self):
return gettext('smaller than')
# Customized type filters
class BooleanEqualFilter(FilterEqual, filters.BaseBooleanFilter):
pass
class BooleanNotEqualFilter(FilterNotEqual, filters.BaseBooleanFilter):
pass
# Base peewee filter field converter
class FilterConverter(filters.BaseFilterConverter):
strings = (FilterEqual, FilterNotEqual, FilterLike, FilterNotLike)
numeric = (FilterEqual, FilterNotEqual, FilterGreater, FilterSmaller)
def convert(self, type_name, column, name):
print type_name, column, name
if type_name in self.converters:
return self.converters[type_name](column, name)
return None
@filters.convert('CharField', 'TextField')
def conv_string(self, column, name):
return [f(column, name) for f in self.strings]
@filters.convert('BooleanField')
def conv_bool(self, column, name):
return [BooleanEqualFilter(column, name),
BooleanNotEqualFilter(column, name)]
@filters.convert('IntegerField', 'DecimalField', 'FloatField')
def conv_int(self, column, name):
return [f(column, name) for f in self.numeric]
@filters.convert('DateField')
def conv_date(self, column, name):
return [f(column, name, data_type='datepicker') for f in self.numeric]
@filters.convert('DateTimeField')
def conv_datetime(self, column, name):
return [f(column, name, data_type='datetimepicker')
for f in self.numeric]
from wtforms import fields
from peewee import DateTimeField, DateField, TimeField
from wtfpeewee.orm import ModelConverter
from flask.ext.admin import form
class CustomModelConverter(ModelConverter):
def __init__(self, additional=None):
super(CustomModelConverter, self).__init__(additional)
self.converters[DateTimeField] = self.handle_datetime
self.converters[DateField] = self.handle_date
self.converters[TimeField] = self.handle_time
def handle_date(self, model, field, **kwargs):
kwargs['widget'] = form.DatePickerWidget()
return field.name, fields.DateField(**kwargs)
def handle_datetime(self, model, field, **kwargs):
kwargs['widget'] = form.DateTimePickerWidget()
return field.name, fields.DateTimeField(**kwargs)
def handle_time(self, model, field, **kwargs):
return field.name, form.TimeField(**kwargs)
from flask import flash
from flask.ext.admin import form
from flask.ext.admin.babel import gettext, ngettext, lazy_gettext
from flask.ext.admin.model import BaseModelView
from peewee import PrimaryKeyField, ForeignKeyField, Field, CharField, TextField, Q
from wtfpeewee.orm import model_form
from flask.ext.admin.contrib.peeweemodel import filters
from .form import CustomModelConverter
class ModelView(BaseModelView):
column_filters = None
"""
Collection of the column filters.
Can contain either field names or instances of :class:`flask.ext.admin.contrib.sqlamodel.filters.BaseFilter` classes.
For example::
class MyModelView(BaseModelView):
column_filters = ('user', 'email')
or::
class MyModelView(BaseModelView):
column_filters = (BooleanEqualFilter(User.name, 'Name'))
"""
filter_converter = filters.FilterConverter()
"""
Field to filter converter.
Override this attribute to use non-default converter.
"""
def __init__(self, model, name=None,
category=None, endpoint=None, url=None):
super(ModelView, self).__init__(model, name, category, endpoint, url)
self._primary_key = self.scaffold_pk()
def _get_model_fields(self, model=None):
if model is None:
model = self.model
return model._meta.get_sorted_fields()
def scaffold_pk(self):
for n, f in self._get_model_fields():
if type(f) == PrimaryKeyField:
return n
return None
def get_pk_value(self, model):
return getattr(model, self._primary_key)
def scaffold_list_columns(self):
columns = []
for n, f in self._get_model_fields():
# Filter by name
if (self.excluded_list_columns and
n in self.excluded_list_columns):
continue
# Verify type
field_class = type(f)
if field_class == ForeignKeyField:
columns.append(n)
elif field_class != PrimaryKeyField:
columns.append(n)
return columns
def scaffold_sortable_columns(self):
columns = dict()
for n, f in self._get_model_fields():
if type(f) != PrimaryKeyField:
columns[n] = f
return columns
def init_search(self):
self._search_fields = []
if self.searchable_columns:
for p in self.searchable_columns:
if isinstance(p, basestring):
p = getattr(self.model, p)
field_type = type(p)
# Check type
if (field_type != CharField and
field_type != TextField):
raise Exception('Can only search on text columns. ' +
'Failed to setup search for "%s"' % p)
# Try to find reference from this model to the field
if p.model != self.model:
path = self._find_field(self.model, p, set())
if path is None:
raise Exception('Can not find relation path from the %s' +
'to the %s.%s' % (self.model, p.model, p.name))
self._search_fields.append(path)
else:
self._search_fields.append(p.name)
return bool(self._search_fields)
def _find_field(self, model, field, visited, path=None):
def make_path(n):
if path:
return '%s__%s' % (path, n)
else:
return n
for n, p in self._get_model_fields(model):
if p.model == model and p.name == field.name:
return make_path(n)
if type(p) == ForeignKeyField:
if p.to not in visited:
visited.add(p.to)
result = self._find_field(p.to, field, visited,
make_path(n))
if result is not None:
return result
return None
def scaffold_filters(self, name):
if isinstance(name, basestring):
attr = getattr(self.model, name, None)
else:
attr = name
if attr is None:
raise Exception('Failed to find field for filter: %s' % name)
if not isinstance(name, basestring):
visible_name = self.get_column_name(attr.name)
else:
visible_name = self.get_column_name(name)
type_name = type(attr).__name__
flt = self.filter_converter.convert(type_name,
attr,
visible_name)
if flt:
# TODO: Related table search
pass
return flt
def is_valid_filter(self, filter):
return isinstance(filter, filters.BasePeeweeFilter)
def scaffold_form(self):
return model_form(self.model,
base_class=form.BaseForm,
only=self.form_columns,
exclude=self.excluded_form_columns,
field_args=self.form_args,
converter=CustomModelConverter())
def get_list(self, page, sort_column, sort_desc, search, filters,
execute=True):
query = self.model.select()
# Search
if self._search_supported and search:
terms = search.split(' ')
for term in terms:
if not term:
continue
stmt = None
for field in self._search_fields:
flt = '%s__icontains' % field
q = Q(**{flt: term})
#print flt, term
if stmt is None:
stmt = q
else:
stmt |= q
query = query.where(stmt)
# Filters
if self._filters:
for flt, value in filters:
query = self._filters[flt].apply(query, value)
# Get count
count = query.count()
# Apply sorting
if sort_column is not None:
sort_field = self._sortable_columns[sort_column]
if isinstance(sort_field, basestring):
query = query.order_by((sort_field, sort_desc and 'desc' or 'asc'))
elif isinstance(sort_field, Field):
query = query.order_by((sort_column, sort_desc and 'desc' or 'asc'))
# Pagination
if page is not None:
query = query.offset(page * self.page_size)
query = query.limit(self.page_size)
if execute:
query = query.execute()
return count, query
def get_one(self, id):
return self.model.get(**{self._primary_key: id})
def create_model(self, form):
try:
model = self.model()
form.populate_obj(model)
model.save()
return True
except Exception, ex:
flash(gettext('Failed to create model. %(error)s', error=str(ex)), 'error')
return False
def update_model(self, form, model):
"""
Update model from form.
`form`
Form instance
"""
try:
form.populate_obj(model)
model.save()
return True
except Exception, ex:
flash(gettext('Failed to update model. %(error)s', error=str(ex)), 'error')
return False
def delete_model(self, model):
try:
model.delete_instance(recursive=True)
return True
except Exception, ex:
flash(gettext('Failed to delete model. %(error)s', error=str(ex)), 'error')
return False
"""
Useful form fields for use with SQLAlchemy ORM.
"""
import operator
from wtforms import widgets
from wtforms.fields import SelectFieldBase
from wtforms.validators import ValidationError
try:
from sqlalchemy.orm.util import identity_key
has_identity_key = True
except ImportError:
has_identity_key = False
__all__ = (
'QuerySelectField', 'QuerySelectMultipleField',
)
class QuerySelectField(SelectFieldBase):
"""
Will display a select drop-down field to choose between ORM results in a
sqlalchemy `Query`. The `data` property actually will store/keep an ORM
model instance, not the ID. Submitting a choice which is not in the query
will result in a validation error.
This field only works for queries on models whose primary key column(s)
have a consistent string representation. This means it mostly only works
for those composed of string, unicode, and integer types. For the most
part, the primary keys will be auto-detected from the model, alternately
pass a one-argument callable to `get_pk` which can return a unique
comparable key.
The `query` property on the field can be set from within a view to assign
a query per-instance to the field. If the property is not set, the
`query_factory` callable passed to the field constructor will be called to
obtain a query.
Specify `get_label` to customize the label associated with each option. If
a string, this is the name of an attribute on the model object to use as
the label text. If a one-argument callable, this callable will be passed
model instance and expected to return the label text. Otherwise, the model
object's `__str__` or `__unicode__` will be used.
If `allow_blank` is set to `True`, then a blank choice will be added to the
top of the list. Selecting this choice will result in the `data` property
being `None`. The label for this blank choice can be set by specifying the
`blank_text` parameter.
"""
widget = widgets.Select()
def __init__(self, label=None, validators=None, query_factory=None,
get_pk=None, get_label=None, allow_blank=False,
blank_text=u'', **kwargs):
super(QuerySelectField, self).__init__(label, validators, **kwargs)
self.query_factory = query_factory
if get_pk is None:
if not has_identity_key:
raise Exception('The sqlalchemy identity_key function could not be imported.')
self.get_pk = get_pk_from_identity
else:
self.get_pk = get_pk
if get_label is None:
self.get_label = lambda x: x
elif isinstance(get_label, basestring):
self.get_label = operator.attrgetter(get_label)
else:
self.get_label = get_label
self.allow_blank = allow_blank
self.blank_text = blank_text
self.query = None
self._object_list = None
def _get_data(self):
if self._formdata is not None:
for pk, obj in self._get_object_list():
if pk == self._formdata:
self._set_data(obj)
break
return self._data
def _set_data(self, data):
self._data = data
self._formdata = None
data = property(_get_data, _set_data)
def _get_object_list(self):
if self._object_list is None:
query = self.query or self.query_factory()
get_pk = self.get_pk
self._object_list = list((unicode(get_pk(obj)), obj) for obj in query)
return self._object_list
def iter_choices(self):
if self.allow_blank:
yield (u'__None', self.blank_text, self.data is None)
for pk, obj in self._get_object_list():
yield (pk, self.get_label(obj), obj == self.data)
def process_formdata(self, valuelist):
if valuelist:
if self.allow_blank and valuelist[0] == u'__None':
self.data = None
else:
self._data = None
self._formdata = valuelist[0]
def pre_validate(self, form):
if not self.allow_blank or self.data is not None:
for pk, obj in self._get_object_list():
if self.data == obj:
break
else:
raise ValidationError(self.gettext(u'Not a valid choice'))
class QuerySelectMultipleField(QuerySelectField):
"""
Very similar to QuerySelectField with the difference that this will
display a multiple select. The data property will hold a list with ORM
model instances and will be an empty list when no value is selected.
If any of the items in the data list or submitted form data cannot be
found in the query, this will result in a validation error.
"""
widget = widgets.Select(multiple=True)
def __init__(self, label=None, validators=None, default=None, **kwargs):
if default is None:
default = []
super(QuerySelectMultipleField, self).__init__(label, validators, default=default, **kwargs)
self._invalid_formdata = False
def _get_data(self):
formdata = self._formdata
if formdata is not None:
data = []
for pk, obj in self._get_object_list():
if not formdata:
break
elif pk in formdata:
formdata.remove(pk)
data.append(obj)
if formdata:
self._invalid_formdata = True
self._set_data(data)
return self._data
def _set_data(self, data):
self._data = data
self._formdata = None
data = property(_get_data, _set_data)
def iter_choices(self):
for pk, obj in self._get_object_list():
yield (pk, self.get_label(obj), obj in self.data)
def process_formdata(self, valuelist):
self._formdata = set(valuelist)
def pre_validate(self, form):
if self._invalid_formdata:
raise ValidationError(self.gettext(u'Not a valid choice'))
elif self.data:
obj_list = list(x[1] for x in self._get_object_list())
for v in self.data:
if v not in obj_list:
raise ValidationError(self.gettext('Not a valid choice'))
def get_pk_from_identity(obj):
cls, key = identity_key(instance=obj)
return u':'.join(unicode(x) for x in key)
...@@ -158,7 +158,7 @@ class AdminModelConverter(ModelConverterBase): ...@@ -158,7 +158,7 @@ class AdminModelConverter(ModelConverterBase):
return fields.TextField(**field_args) return fields.TextField(**field_args)
@converts('Text', 'UnicodeText', @converts('Text', 'UnicodeText',
'types.LargeBinary', 'types.Binary') 'sqlalchemy.types.LargeBinary', 'sqlalchemy.types.Binary')
def conv_Text(self, field_args, **extra): def conv_Text(self, field_args, **extra):
self._string_common(field_args=field_args, **extra) self._string_common(field_args=field_args, **extra)
return fields.TextAreaField(**field_args) return fields.TextAreaField(**field_args)
......
from sqlalchemy.orm.exc import NoResultFound
from wtforms import ValidationError
class Unique(object):
"""Checks field value unicity against specified table field.
:param get_session:
A function that return a SQAlchemy Session.
:param model:
The model to check unicity against.
:param column:
The unique column.
:param message:
The error message.
"""
field_flags = ('unique', )
def __init__(self, db_session, model, column, message=None):
self.db_session = db_session
self.model = model
self.column = column
self.message = message
def __call__(self, form, field):
try:
obj = (self.db_session.query(self.model)
.filter(self.column == field.data).one())
if not hasattr(form, '_obj') or not form._obj == obj:
if self.message is None:
self.message = field.gettext(u'Already exists.')
raise ValidationError(self.message)
except NoResultFound:
pass
import inspect
from flask.ext.admin.form import BaseForm
def converts(*args):
def _inner(func):
func._converter_for = frozenset(args)
return func
return _inner
class ModelConverterBase(object):
def __init__(self, converters=None, use_mro=True):
self.use_mro = use_mro
if not converters:
converters = {}
for name in dir(self):
obj = getattr(self, name)
if hasattr(obj, '_converter_for'):
for classname in obj._converter_for:
converters[classname] = obj
self.converters = converters
def get_converter(self, column):
if self.use_mro:
types = inspect.getmro(type(column.type))
else:
types = [type(column.type)]
# Search by module + name
for col_type in types:
type_string = '%s.%s' % (col_type.__module__, col_type.__name__)
if type_string in self.converters:
return self.converters[type_string]
# Search by name
for col_type in types:
if col_type.__name__ in self.converters:
return self.converters[col_type.__name__]
return None
def get_form(self, model, base_class=BaseForm,
only=None, exclude=None,
field_args=None):
raise NotImplemented()
...@@ -78,6 +78,8 @@ var AdminFilters = function(element, filters_element, adminForm, operations, opt ...@@ -78,6 +78,8 @@ var AdminFilters = function(element, filters_element, adminForm, operations, opt
addFilter(name, operations[name]); addFilter(name, operations[name]);
$('button', $root).show(); $('button', $root).show();
return false;
}); });
$('.filter-op', $root).change(changeOperation); $('.filter-op', $root).change(changeOperation);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment