Commit 84bedef5 authored by Serge S. Koval's avatar Serge S. Koval

Merge pull request #284 from sumpfgottheit/multiple_primary_keys

Multiple Primary keys for sqla backend
parents e5295fe6 1c7b8137
......@@ -103,6 +103,48 @@ you can do something like this::
Check :doc:`api/mod_contrib_sqla` documentation for list of
configuration properties and methods.
Multiple Primary Keys
---------------------
Models with multiple primary keys have limited support, as a few pitfalls are waiting for you.
With using multiple primary keys, weak entities can be used with Flask-Admin.
Lets Model a car with it's tyres::
class Car(db.Model):
__tablename__ = 'cars'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
desc = db.Column(db.String(50))
def __unicode__(self):
return self.desc
class Tyre(db.Model):
__tablename__ = 'tyres'
car_id = db.Column(db.Integer, db.ForeignKey('cars.id'), primary_key=True)
tyre_id = db.Column(db.Integer, primary_key=True)
car = db.relationship('Car', backref='tyres')
desc = db.Column(db.String(50))
A specific tyre is identified by using the two primary key columns of the ``Tyre`` class, of which the ``car_id`` key
is itself a foreign key to the class ``Car``.
To be able to CRUD the ``Tyre`` class, two steps are necessary, when definig the AdminView::
class TyreAdmin(sqla.ModelView):
form_columns = ['car', 'tyre_id', 'desc']
The ``form_columns`` needs to be explizit, as per default only one primary key is displayed. When, like in this
example, one part of the key is a foreign key, do not include the foreign-key-columns here, but the
corresponding relationship.
When having multiple primary keys, **no** validation for uniqueness *prior* to saving of the object will be done. Saving
a model that violates a unique-constraint leads to an Sqlalchemy-Integrity-Error. In this case, ``Flask-Admin`` displays
a proper error message and you can change the data in the form. When the application has been started with ``debug=True``
the ``werkzeug`` debugger catches the exception and displays the stacktrace.
A standalone script with the Examples from above can be found in the examples directory.
Example
-------
......
SQLAlchemy model backend integration example.
\ No newline at end of file
SQLAlchemy model backend integration examples.
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext import admin
from flask.ext.admin.contrib import sqla
# Create application
app = Flask(__name__)
# Create dummy secrey key so we can use sessions
app.config['SECRET_KEY'] = '123456790'
# Create in-memory database
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.sqlite'
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
# Flask views
@app.route('/')
def index():
return '<a href="/admin/">Click me to get to Admin!</a>'
class Car(db.Model):
__tablename__ = 'cars'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
desc = db.Column(db.String(50))
def __unicode__(self):
return self.desc
class Tyre(db.Model):
__tablename__ = 'tyres'
car_id = db.Column(db.Integer, db.ForeignKey('cars.id'), primary_key=True)
tyre_id = db.Column(db.Integer, primary_key=True)
car = db.relationship('Car', backref='tyres')
desc = db.Column(db.String(50))
class CarAdmin(sqla.ModelView):
column_display_pk = True
form_columns = ['id', 'desc']
class TyreAdmin(sqla.ModelView):
column_display_pk = True
form_columns = ['car', 'tyre_id', 'desc']
if __name__ == '__main__':
# Create admin
admin = admin.Admin(app, 'Simple Models')
admin.add_view(CarAdmin(Car, db.session))
admin.add_view(TyreAdmin(Tyre, db.session))
# Create DB
db.create_all()
# Start app
app.run(debug=True)
......@@ -12,7 +12,7 @@ from flask.ext.admin._compat import iteritems
from .validators import Unique
from .fields import QuerySelectField, QuerySelectMultipleField, InlineModelFormList
from .tools import is_inherited_primary_key, get_column_for_current_model
from .tools import is_inherited_primary_key, get_column_for_current_model, has_multiple_pks
try:
# Field has better input parsing capabilities.
......@@ -166,6 +166,8 @@ class AdminModelConverter(ModelConverterBase):
if prop.key not in form_columns:
return None
# Current Unique Validator does not work with multicolumns-pks
if not has_multiple_pks(model):
kwargs['validators'].append(Unique(self.session,
model,
column))
......
from sqlalchemy import tuple_, or_, and_
from sqlalchemy.sql.operators import eq
from sqlalchemy.exc import DBAPIError
from ast import literal_eval
def parse_like_term(term):
if term.startswith('^'):
stmt = '%s%%' % term[1:]
......@@ -11,19 +16,27 @@ def parse_like_term(term):
def get_primary_key(model):
"""
Return primary key name from a model
Return primary key name from a model. If the primary key consists of multiple columns,
return the corresponding tuple
:param model:
Model class
"""
props = model._sa_class_manager.mapper.iterate_properties
pks = []
for p in props:
if hasattr(p, 'columns'):
for c in p.columns:
if c.primary_key:
return p.key
if hasattr(p, 'expression'): # expression = primary column or expression for this ColumnProperty
if p.expression.primary_key:
if is_inherited_primary_key(p):
pks.append(get_column_for_current_model(p).key)
else:
pks.append(p.columns[0].key)
if len(pks) == 1:
return pks[0]
elif len(pks) > 1:
return tuple(pks)
else:
return None
def is_inherited_primary_key(prop):
......@@ -61,3 +74,73 @@ def get_column_for_current_model(prop):
else:
return candidates[0]
def has_multiple_pks(model):
"""Return True, if the model has more than one primary key
"""
if not hasattr(model, '_sa_class_manager'):
raise TypeError('model must be a sqlalchemy mapped model')
pks = model._sa_class_manager.mapper.primary_key
return len(pks) > 1
def tuple_operator_in(model_pk, ids):
"""The tuple_ Operator only works on certain engines like MySQL or Postgresql. It does not work with sqlite.
The function returns an or_ - operator, that containes and_ - operators for every single tuple in ids.
Example::
model_pk = [ColumnA, ColumnB]
ids = ((1,2), (1,3))
tuple_operator(model_pk, ids) -> or_( and_( ColumnA == 1, ColumnB == 2), and_( ColumnA == 1, ColumnB == 3) )
The returning operator can be used within a filter(), as it is just an or_ operator
"""
l = []
for id in ids:
k = []
for i in range(len(model_pk)):
k.append(eq(model_pk[i],id[i]))
l.append(and_(*k))
if len(l)>=1:
return or_(*l)
else:
return None
def get_query_for_ids(modelquery, model, ids):
"""
Return a query object, that contains all entities of the given model for
the primary keys provided in the ids-parameter.
The ``pks`` parameter is a tuple, that contains the different primary key values,
that should be returned. If the primary key of the model consists of multiple columns
every entry of the ``pks`` parameter must be a tuple containing the columns-values in the
correct order, that make up the primary key of the model
If the model has multiple primary keys, the
`tuple_ <http://docs.sqlalchemy.org/en/latest/core/expression_api.html#sqlalchemy.sql.expression.tuple_>`_
operator will be used. As this operator does not work on certain databases,
notably on sqlite, a workaround function :func:`tuple_operator_in` is provided
that implements the same logic using OR and AND operations.
When having multiple primary keys, the pks are provided as a list of tuple-look-alike-strings,
``[u'(1, 2)', u'(1, 1)']``. These needs to be evaluated into real tuples, where
`Stackoverflow Question 3945856 <http://stackoverflow.com/questions/3945856/converting-string-to-tuple-and-adding-to-tuple>`_
pointed to `Literal Eval <http://docs.python.org/2/library/ast.html#ast.literal_eval>`_, which is now used.
"""
if has_multiple_pks(model):
model_pk = [getattr(model, pk_name).expression for pk_name in get_primary_key(model)]
ids = [literal_eval(id) for id in ids]
try:
query = modelquery.filter(tuple_(*model_pk).in_(ids))
# Only the execution of the query will tell us, if the tuple_
# operator really works
query.all()
except DBAPIError:
query = modelquery.filter(tuple_operator_in(model_pk, ids))
else:
model_pk = getattr(model, get_primary_key(model))
query = modelquery.filter(model_pk.in_(ids))
return query
......@@ -15,7 +15,7 @@ from flask.ext.admin._backwards import ObsoleteAttr
from flask.ext.admin.contrib.sqla import form, filters, tools
from .typefmt import DEFAULT_FORMATTERS
from .tools import is_inherited_primary_key, get_column_for_current_model
from .tools import is_inherited_primary_key, get_column_for_current_model, get_query_for_ids
class ModelView(BaseModelView):
"""
......@@ -287,14 +287,22 @@ class ModelView(BaseModelView):
def scaffold_pk(self):
"""
Return the primary key name from a model
PK can be a single value or a tuple if multiple PKs exist
"""
return tools.get_primary_key(self.model)
def get_pk_value(self, model):
"""
Return the PK value from a model object.
PK can be a single value or a tuple if multiple PKs exist
"""
try:
return getattr(model, self._primary_key)
except TypeError:
v = []
for attr in self._primary_key:
v.append(getattr(model, attr))
return tuple(v)
def scaffold_list_columns(self):
"""
......@@ -850,9 +858,8 @@ class ModelView(BaseModelView):
lazy_gettext('Are you sure you want to delete selected models?'))
def action_delete(self, ids):
try:
model_pk = getattr(self.model, self._primary_key)
query = self.get_query().filter(model_pk.in_(ids))
query = get_query_for_ids(self.get_query(), self.model, ids)
if self.fast_mass_delete:
count = query.delete(synchronize_session=False)
......
......@@ -14,7 +14,7 @@ from flask.ext.admin.helpers import get_form_data, validate_form_on_submit
from flask.ext.admin.tools import rec_getattr
from flask.ext.admin._backwards import ObsoleteAttr
from flask.ext.admin._compat import iteritems, as_unicode
from .helpers import prettify_name
from .helpers import prettify_name, get_mdict_item_or_list
class BaseModelView(BaseView, ActionsMixin):
......@@ -1104,7 +1104,7 @@ class BaseModelView(BaseView, ActionsMixin):
if not self.can_edit:
return redirect(return_url)
id = request.args.get('id')
id = get_mdict_item_or_list(request.args, 'id')
if id is None:
return redirect(return_url)
......@@ -1140,7 +1140,7 @@ class BaseModelView(BaseView, ActionsMixin):
if not self.can_delete:
return redirect(return_url)
id = request.args.get('id')
id = get_mdict_item_or_list(request.args, 'id')
if id is None:
return redirect(return_url)
......
......@@ -8,3 +8,26 @@ def prettify_name(name):
Name to prettify
"""
return name.replace('_', ' ').title()
def get_mdict_item_or_list(mdict, key):
"""
Return the value for the given key of the multidict.
A werkzeug.datastructures.multidict can have a single
value or a list of items. If there is only one item,
return only this item, else the whole list as a tuple
:param mdict: Multidict to search for the key
:type mdict: werkzeug.datastructures.multidict
:param key: key to look for
:return: the value for the key or None if the Key has not be found
"""
if hasattr(mdict, 'getlist'):
v = mdict.getlist(key)
if len(v) == 1:
return v[0]
elif len(v) == 0:
return None
else:
return tuple(v)
return None
......@@ -435,6 +435,42 @@ def test_non_int_pk():
data = rv.data.decode('utf-8')
ok_('test2' in data)
def test_multiple__pk():
# Test multiple primary keys - mix int and string together
app, db, admin = setup()
class Model(db.Model):
id = db.Column(db.Integer, primary_key=True)
id2 = db.Column(db.String(20), primary_key=True)
test = db.Column(db.String)
db.create_all()
view = CustomModelView(Model, db.session, form_columns=['id', 'id2', 'test'])
admin.add_view(view)
client = app.test_client()
rv = client.get('/admin/modelview/')
eq_(rv.status_code, 200)
rv = client.post('/admin/modelview/new/',
data=dict(id=1, id2='two', test='test3'))
eq_(rv.status_code, 302)
rv = client.get('/admin/modelview/')
eq_(rv.status_code, 200)
data = rv.data.decode('utf-8')
ok_('test3' in data)
rv = client.get('/admin/modelview/edit/?id=1&id=two')
eq_(rv.status_code, 200)
data = rv.data.decode('utf-8')
ok_('test3' in data)
# Correct order is mandatory -> fail here
rv = client.get('/admin/modelview/edit/?id=two&id=1')
eq_(rv.status_code, 302)
def test_form_columns():
app, db, admin = setup()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment