Commit 959e866b authored by Serge S. Koval's avatar Serge S. Koval

Basic FileUploadField for models

parent d6447e8b
......@@ -9,6 +9,7 @@ pyenv
build
source/_static*
source/_templates*
flask_admin/tests/tmp
dist/*
make.bat
venv
......
......@@ -3,6 +3,7 @@ from wtforms.fields.core import UnboundField
from .fields import *
from .widgets import *
from .upload import *
class BaseForm(form.Form):
......
import os
import os.path as op
from werkzeug import secure_filename
from werkzeug.datastructures import FileStorage
from jinja2 import escape
from wtforms import ValidationError, fields
from wtforms.widgets import HTMLString, html_params
from wtforms.fields.core import _unset_value
from flask.ext.admin.babel import gettext
__all__ = ['FileUploadInput', 'FileUploadField', 'namefn_keep_filename']
# Widgets
class FileUploadInput(object):
"""
Renders a file input chooser field.
"""
template = ('<input %(text)s><input %(file)s>')
def __call__(self, field, **kwargs):
kwargs.setdefault('id', field.id)
return HTMLString(self.template % {
'text': html_params(type='text',
value=kwargs.get('value')),
'file': html_params(type='file',
**kwargs)
})
# Fields
class FileUploadField(fields.TextField):
"""
Customizable file-upload field
"""
widget = FileUploadInput()
def __init__(self, label=None, validators=None,
path=None, namefn=None, endpoint='static', allowed_extensions=None,
**kwargs):
if not path:
raise ValueError('FileUploadField field requires target path.')
self.path = path
self.namefn = namefn or namefn_keep_filename
self.endpoint = endpoint
self.allowed_extensions = allowed_extensions
self._should_delete = False
super(FileUploadField, self).__init__(label, validators, **kwargs)
def is_file_allowed(self, filename):
if not self.allowed_extensions:
return True
return ('.' in filename and
filename.rsplit('.', 1)[1] in self.allowed_extensions)
def pre_validate(self, form):
if isinstance(self.data, FileStorage) and not self.is_file_allowed(self.data.filename):
raise ValidationError(gettext('Invalid file extension'))
def process(self, formdata, data=_unset_value):
if formdata:
marker = '_%s-delete' % self.name
if marker in formdata:
self._should_delete = True
return super(FileUploadField, self).process(formdata, data)
def populate_obj(self, obj, name):
field = getattr(obj, name, None)
if field:
# If field should be deleted, clean it up
if self._should_delete:
self._delete_file(field)
return
if isinstance(self.data, FileStorage):
if field:
self._delete_file(field)
filename = self.namefn(obj, self.data)
self._save_file(self.data, filename)
setattr(obj, name, filename)
def _delete_file(self, filename):
path = op.join(self.path, filename)
os.remove(path)
def _save_file(self, data, filename):
data.save(op.join(self.path, filename))
# Helpers
def namefn_keep_filename(obj, file_data):
return secure_filename(file_data.filename)
import os
import os.path as op
from StringIO import StringIO
from nose.tools import eq_, ok_
from flask import Flask
from flask.ext.admin import form, helpers
def _create_temp():
path = op.join(op.dirname(__file__), 'tmp')
if not op.exists(path):
os.mkdir(path)
return path
def _remove_testfiles(path):
try:
os.remove(op.join(path, 'test1.txt'))
os.remove(op.join(path, 'test2.txt'))
except:
pass
def test_upload_field():
app = Flask(__name__)
path = _create_temp()
class TestForm(form.BaseForm):
upload = form.FileUploadField('Upload', path=path)
class Dummy(object):
pass
my_form = TestForm()
eq_(my_form.upload.path, path)
_remove_testfiles(path)
dummy = Dummy()
# Check upload
with app.test_request_context(method='POST', data={'upload': (StringIO('Hello World'), 'test1.txt')}):
my_form = TestForm(helpers.get_form_data())
ok_(my_form.validate())
my_form.populate_obj(dummy)
eq_(dummy.upload, 'test1.txt')
ok_(op.exists(op.join(path, 'test1.txt')))
# Check replace
with app.test_request_context(method='POST', data={'upload': (StringIO('Hello World'), 'test2.txt')}):
my_form = TestForm(helpers.get_form_data())
ok_(my_form.validate())
my_form.populate_obj(dummy)
eq_(dummy.upload, 'test2.txt')
ok_(not op.exists(op.join(path, 'test1.txt')))
ok_(op.exists(op.join(path, 'test2.txt')))
# Check delete
with app.test_request_context(method='POST', data={'_upload-delete': 'checked'}):
my_form = TestForm(helpers.get_form_data())
ok_(my_form.validate())
my_form.populate_obj(dummy)
ok_(not op.exists(op.join(path, 'test2.txt')))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment