Commit 33cd4b94 authored by Serge S. Koval's avatar Serge S. Koval

Simple pymongo backend

parent 728fbf69
import pymongo
from bson.objectid import ObjectId
from flask import Flask
from flask.ext import admin, wtf
from flask.ext.admin.form import Select2Widget
from flask.ext.admin.contrib.pymongo import ModelView, filters
from flask.ext.admin.model import fields
# Create application
app = Flask(__name__)
# Create dummy secrey key so we can use sessions
app.config['SECRET_KEY'] = '123456790'
# Create models
conn = pymongo.Connection()
db = conn.test
# User admin
class InnerForm(wtf.Form):
name = wtf.TextField('Name')
test = wtf.TextField('Test')
class UserForm(wtf.Form):
name = wtf.TextField('Name')
email = wtf.TextField('Email')
password = wtf.TextField('Password')
# Inner form
inner = fields.InlineFormField(InnerForm)
# Form list
form_list = fields.InlineFieldList(fields.InlineFormField(InnerForm))
class UserView(ModelView):
list_columns = ('name', 'email', 'password')
sortable_columns = ('name', 'email', 'password')
form = UserForm
# Tweet view
class TweetForm(wtf.Form):
name = wtf.TextField('Name')
user_id = wtf.SelectField('User', widget=Select2Widget())
text = wtf.TextField('Text')
class TweetView(ModelView):
list_columns = ('name', 'user_name', 'text')
sortable_columns = ('name', 'text')
column_filters = (filters.FilterEqual('name', 'Name'),
filters.FilterNotEqual('name', 'Name'),
filters.FilterLike('name', 'Name'),
filters.FilterNotLike('name', 'Name'))
form = TweetForm
def get_list(self, *args, **kwargs):
count, data = super(TweetView, self).get_list(*args, **kwargs)
# Grab user names
query = {'_id': {'$in': [x['user_id'] for x in data]}}
users = db.user.find(query, fields=('name',))
# Contribute user names to the models
users_map = dict((x['_id'], x['name']) for x in users)
for item in data:
item['user_name'] = users_map.get(item['user_id'])
return count, data
# Contribute list of user choices to the forms
def _feed_user_choices(self, form):
users = db.user.find(fields=('name',))
form.user_id.choices = [(str(x['_id']), x['name']) for x in users]
return form
def create_form(self):
form = super(TweetView, self).create_form()
return self._feed_user_choices(form)
def edit_form(self, obj):
form = super(TweetView, self).edit_form(obj)
return self._feed_user_choices(form)
# Correct user_id reference before saving
def on_model_change(self, form, model):
user_id = model.get('user_id')
if isinstance(user_id, basestring):
model['user_id'] = ObjectId(user_id)
return model
# Flask views
@app.route('/')
def index():
return '<a href="/admin/">Click me to get to Admin!</a>'
if __name__ == '__main__':
# Create admin
admin = admin.Admin(app, 'Simple Models')
# Add views
admin.add_view(UserView(db.user, 'User'))
admin.add_view(TweetView(db.tweet, 'Tweets'))
# Start app
app.debug = True
app.run('0.0.0.0', 8000)
from flask.ext.admin.babel import gettext from flask.ext.admin.babel import gettext
from flask.ext.admin.model import filters from flask.ext.admin.model import filters
from .tools import parse_like_term from .tools import parse_like_term
......
...@@ -92,8 +92,6 @@ class FilterConverter(filters.BaseFilterConverter): ...@@ -92,8 +92,6 @@ class FilterConverter(filters.BaseFilterConverter):
numeric = (FilterEqual, FilterNotEqual, FilterGreater, FilterSmaller) numeric = (FilterEqual, FilterNotEqual, FilterGreater, FilterSmaller)
def convert(self, type_name, column, name): def convert(self, type_name, column, name):
#print type_name, column, name
if type_name in self.converters: if type_name in self.converters:
return self.converters[type_name](column, name) return self.converters[type_name](column, name)
......
try:
import pymongo
except ImportError:
raise Exception('Please install pymongo in order to use peewee integration')
from .view import ModelView
import re
from flask.ext.admin.babel import gettext
from flask.ext.admin.model import filters
from .tools import parse_like_term
class BasePyMongoFilter(filters.BaseFilter):
"""
Base pymongo filter.
"""
def __init__(self, column, name, options=None, data_type=None):
"""
Constructor.
:param column:
Document field name
:param name:
Display name
:param options:
Fixed set of options
:param data_type:
Client data type
"""
super(BasePyMongoFilter, self).__init__(name, options, data_type)
self.column = column
# Common filters
class FilterEqual(BasePyMongoFilter):
def apply(self, query, value):
query.append({self.column: value})
return query
def operation(self):
return gettext('equals')
class FilterNotEqual(BasePyMongoFilter):
def apply(self, query, value):
query.append({self.column: {'$ne': value}})
return query
def operation(self):
return gettext('not equal')
class FilterLike(BasePyMongoFilter):
def apply(self, query, value):
regex = parse_like_term(value)
query.append({self.column: {'$regex': regex}})
return query
def operation(self):
return gettext('contains')
class FilterNotLike(BasePyMongoFilter):
def apply(self, query, value):
regex = parse_like_term(value)
query.append({self.column: {'$not': re.compile(regex)}})
return query
def operation(self):
return gettext('not contains')
class FilterGreater(BasePyMongoFilter):
def apply(self, query, value):
query.append({self.column: {'$gt': value}})
return query
def operation(self):
return gettext('greater than')
class FilterSmaller(BasePyMongoFilter):
def apply(self, query, value):
query.append({self.column: {'$lt': value}})
return query
def operation(self):
return gettext('smaller than')
# Customized type filters
class BooleanEqualFilter(FilterEqual, filters.BaseBooleanFilter):
pass
class BooleanNotEqualFilter(FilterNotEqual, filters.BaseBooleanFilter):
pass
def parse_like_term(term):
"""
Parse search term into (operation, term) tuple
:param term:
Search term
"""
if term.startswith('^'):
return '^%s' % term[1]
elif term.startswith('='):
return '^%s$' % term[1:]
return '%s' % term
import logging
import pymongo
from bson.objectid import ObjectId
from flask import flash
from jinja2 import contextfunction
from flask.ext.admin.babel import gettext, ngettext, lazy_gettext
from flask.ext.admin.model import BaseModelView
from flask.ext.admin.actions import action
from .filters import BasePyMongoFilter
class ModelView(BaseModelView):
"""
MongoEngine model scaffolding.
"""
column_filters = None
"""
Collection of the column filters.
Should contain instances of
:class:`flask.ext.admin.contrib.pymongo.filters.BasePyMongoFilter`
classes.
For example::
class MyModelView(BaseModelView):
column_filters = (BooleanEqualFilter(User.name, 'Name'),)
"""
def __init__(self, coll, name,
category=None, endpoint=None, url=None):
"""
Constructor
:param coll:
MongoDB collection object
:param name:
Display name
:param category:
Display category
:param endpoint:
Endpoint
:param url:
Custom URL
"""
self._search_fields = []
if endpoint is None:
endpoint = ('%sview' % coll.name).lower()
super(ModelView, self).__init__(None, name, category, endpoint, url)
self.coll = coll
def scaffold_pk(self):
return '_id'
def get_pk_value(self, model):
"""
Return primary key value from the model instance
:param model:
Model instance
"""
return model.get('_id')
def scaffold_list_columns(self):
"""
Scaffold list columns
"""
raise NotImplemented()
def scaffold_sortable_columns(self):
"""
Return sortable columns dictionary (name, field)
"""
return []
def init_search(self):
"""
Init search
"""
if self.searchable_columns:
for p in self.searchable_columns:
if not isinstance(p, basestring):
raise ValueError('Expected string')
# TODO: Validation?
self._search_fields.append(p)
return bool(self._search_fields)
def scaffold_filters(self, attr):
"""
Return filter object(s) for the field
:param name:
Either field name or field instance
"""
raise NotImplemented()
def is_valid_filter(self, filter):
"""
Validate if it is valid MongoEngine filter
:param filter:
Filter object
"""
return isinstance(filter, BasePyMongoFilter)
def scaffold_form(self):
raise NotImplemented()
@contextfunction
def get_list_value(self, context, model, name):
"""
Returns value to be displayed in list view
:param context:
:py:class:`jinja2.runtime.Context`
:param model:
Model instance
:param name:
Field name
"""
column_fmt = self.list_formatters.get(name)
if column_fmt is not None:
return column_fmt(context, model, name)
value = model.get(name)
type_fmt = self.list_type_formatters.get(type(value))
if type_fmt is not None:
value = type_fmt(value)
return value
def get_list(self, page, sort_column, sort_desc, search, filters,
execute=True):
"""
Get list of objects from MongoEngine
:param page:
Page number
:param sort_column:
Sort column
:param sort_desc:
Sort descending
:param search:
Search criteria
:param filters:
List of applied fiters
:param execute:
Run query immediately or not
"""
query = {}
# Filters
if self._filters:
data = []
for flt, value in filters:
f = self._filters[flt]
data = f.apply(data, value)
if data:
if len(data) == 1:
query = data[0]
else:
query['$AND'] = data
# TODO: Search
# Get count
count = self.coll.find(query).count()
# Sorting
sort_by = None
if sort_column:
sort_by = [(sort_column, pymongo.DESCENDING if sort_desc else pymongo.ASCENDING)]
# Pagination
skip = None
if page is not None:
skip = page * self.page_size
results = self.coll.find(query, sort=sort_by, skip=skip, limit=self.page_size)
if execute:
results = list(results)
return count, results
def get_one(self, id):
"""
Return single model instance by ID
:param id:
Model ID
"""
# TODO: Validate if it is valid ID
return self.coll.find_one({'_id': ObjectId(id)})
def edit_form(self, obj):
"""
Create edit form from the MongoDB document
"""
return self._edit_form_class(**obj)
def create_model(self, form):
"""
Create model helper
:param form:
Form instance
"""
try:
model = form.data
self.on_model_change(form, model)
self.coll.insert(model)
return True
except Exception, ex:
flash(gettext('Failed to create model. %(error)s', error=str(ex)),
'error')
logging.exception('Failed to create model')
return False
def update_model(self, form, model):
"""
Update model helper
:param form:
Form instance
:param model:
Model instance to update
"""
try:
model.update(form.data)
self.on_model_change(form, model)
pk = self.get_pk_value(model)
self.coll.update({'_id': pk}, model)
return True
except Exception, ex:
flash(gettext('Failed to update model. %(error)s', error=str(ex)),
'error')
logging.exception('Failed to update model')
return False
def delete_model(self, model):
"""
Delete model helper
:param model:
Model instance
"""
try:
pk = self.get_pk_value(model)
if not pk:
raise ValueError('Document does not have _id')
self.on_model_delete(model)
self.coll.remove({'_id': pk})
return True
except Exception, ex:
flash(gettext('Failed to delete model. %(error)s', error=str(ex)),
'error')
logging.exception('Failed to delete model')
return False
# Default model actions
def is_action_allowed(self, name):
# Check delete action permission
if name == 'delete' and not self.can_delete:
return False
return super(ModelView, self).is_action_allowed(name)
@action('delete',
lazy_gettext('Delete'),
lazy_gettext('Are you sure you want to delete selected models?'))
def action_delete(self, ids):
try:
count = 0
# TODO: Optimize me
for pk in ids:
self.coll.remove({'_id': ObjectId(pk)})
count += 1
flash(ngettext('Model was successfully deleted.',
'%(count)s models were successfully deleted.',
count,
count=count))
except Exception, ex:
flash(gettext('Failed to delete models. %(error)s', error=str(ex)),
'error')
...@@ -91,7 +91,9 @@ class Select2Widget(widgets.Select): ...@@ -91,7 +91,9 @@ class Select2Widget(widgets.Select):
work. work.
""" """
def __call__(self, field, **kwargs): def __call__(self, field, **kwargs):
if field.allow_blank and not self.multiple: allow_blank = getattr(field, 'allow_blank', False)
if allow_blank and not self.multiple:
kwargs['data-role'] = u'select2blank' kwargs['data-role'] = u'select2blank'
else: else:
kwargs['data-role'] = u'select2' kwargs['data-role'] = u'select2'
...@@ -106,7 +108,7 @@ class Select2Field(fields.SelectField): ...@@ -106,7 +108,7 @@ class Select2Field(fields.SelectField):
You must include select2.js, form.js and select2 stylesheet for it to You must include select2.js, form.js and select2 stylesheet for it to
work. work.
""" """
widget = Select2Widget widget = Select2Widget()
class DatePickerWidget(widgets.TextInput): class DatePickerWidget(widgets.TextInput):
......
...@@ -85,3 +85,10 @@ class InlineModelFormField(FormField): ...@@ -85,3 +85,10 @@ class InlineModelFormField(FormField):
for name, field in self.form._fields.iteritems(): for name, field in self.form._fields.iteritems():
if name != self._pk: if name != self._pk:
field.populate_obj(obj, name) field.populate_obj(obj, name)
class InlineFormField(FormField):
"""
Inline version of the ``FormField`` widget.
"""
widget = InlineFormWidget()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment