Commit 57ff13b7 authored by Florian Sachs's avatar Florian Sachs

Merge branch 'multiple_primary_keys'

parents 563e441a 1c7b8137
from sqlalchemy import tuple_, or_, and_
from sqlalchemy.sql.operators import eq
from sqlalchemy.exc import DBAPIError
from ast import literal_eval
def parse_like_term(term): def parse_like_term(term):
if term.startswith('^'): if term.startswith('^'):
stmt = '%s%%' % term[1:] stmt = '%s%%' % term[1:]
...@@ -77,3 +82,65 @@ def has_multiple_pks(model): ...@@ -77,3 +82,65 @@ def has_multiple_pks(model):
raise TypeError('model must be a sqlalchemy mapped model') raise TypeError('model must be a sqlalchemy mapped model')
pks = model._sa_class_manager.mapper.primary_key pks = model._sa_class_manager.mapper.primary_key
return len(pks) > 1 return len(pks) > 1
def tuple_operator_in(model_pk, ids):
"""The tuple_ Operator only works on certain engines like MySQL or Postgresql. It does not work with sqlite.
The function returns an or_ - operator, that containes and_ - operators for every single tuple in ids.
Example::
model_pk = [ColumnA, ColumnB]
ids = ((1,2), (1,3))
tuple_operator(model_pk, ids) -> or_( and_( ColumnA == 1, ColumnB == 2), and_( ColumnA == 1, ColumnB == 3) )
The returning operator can be used within a filter(), as it is just an or_ operator
"""
l = []
for id in ids:
k = []
for i in range(len(model_pk)):
k.append(eq(model_pk[i],id[i]))
l.append(and_(*k))
if len(l)>=1:
return or_(*l)
else:
return None
def get_query_for_ids(modelquery, model, ids):
"""
Return a query object, that contains all entities of the given model for
the primary keys provided in the ids-parameter.
The ``pks`` parameter is a tuple, that contains the different primary key values,
that should be returned. If the primary key of the model consists of multiple columns
every entry of the ``pks`` parameter must be a tuple containing the columns-values in the
correct order, that make up the primary key of the model
If the model has multiple primary keys, the
`tuple_ <http://docs.sqlalchemy.org/en/latest/core/expression_api.html#sqlalchemy.sql.expression.tuple_>`_
operator will be used. As this operator does not work on certain databases,
notably on sqlite, a workaround function :func:`tuple_operator_in` is provided
that implements the same logic using OR and AND operations.
When having multiple primary keys, the pks are provided as a list of tuple-look-alike-strings,
``[u'(1, 2)', u'(1, 1)']``. These needs to be evaluated into real tuples, where
`Stackoverflow Question 3945856 <http://stackoverflow.com/questions/3945856/converting-string-to-tuple-and-adding-to-tuple>`_
pointed to `Literal Eval <http://docs.python.org/2/library/ast.html#ast.literal_eval>`_, which is now used.
"""
if has_multiple_pks(model):
model_pk = [getattr(model, pk_name).expression for pk_name in get_primary_key(model)]
ids = [literal_eval(id) for id in ids]
try:
query = modelquery.filter(tuple_(*model_pk).in_(ids))
# Only the execution of the query will tell us, if the tuple_
# operator really works
query.all()
except DBAPIError:
query = modelquery.filter(tuple_operator_in(model_pk, ids))
else:
model_pk = getattr(model, get_primary_key(model))
query = modelquery.filter(model_pk.in_(ids))
return query
...@@ -15,7 +15,7 @@ from flask.ext.admin._backwards import ObsoleteAttr ...@@ -15,7 +15,7 @@ from flask.ext.admin._backwards import ObsoleteAttr
from flask.ext.admin.contrib.sqla import form, filters, tools from flask.ext.admin.contrib.sqla import form, filters, tools
from .typefmt import DEFAULT_FORMATTERS from .typefmt import DEFAULT_FORMATTERS
from .tools import is_inherited_primary_key, get_column_for_current_model from .tools import is_inherited_primary_key, get_column_for_current_model, get_query_for_ids
class ModelView(BaseModelView): class ModelView(BaseModelView):
""" """
...@@ -858,9 +858,8 @@ class ModelView(BaseModelView): ...@@ -858,9 +858,8 @@ class ModelView(BaseModelView):
lazy_gettext('Are you sure you want to delete selected models?')) lazy_gettext('Are you sure you want to delete selected models?'))
def action_delete(self, ids): def action_delete(self, ids):
try: try:
model_pk = getattr(self.model, self._primary_key)
query = self.get_query().filter(model_pk.in_(ids)) query = get_query_for_ids(self.get_query(), self.model, ids)
if self.fast_mass_delete: if self.fast_mass_delete:
count = query.delete(synchronize_session=False) count = query.delete(synchronize_session=False)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment