Commit 1a4669df authored by Arthur Bressan's avatar Arthur Bressan

Implements a File Admin storage object to allow implementations for differents file back-ends

parent 7445f6a4
from datetime import datetime
import os
import os.path as op
import platform
import re
import shutil
from datetime import datetime
from operator import itemgetter
from werkzeug import secure_filename
from flask import flash, redirect, abort, request, send_file
......@@ -19,6 +19,102 @@ from flask_admin.actions import action, ActionsMixin
from flask_admin.babel import gettext, lazy_gettext
class LocalFileStorage(object):
def __init__(self, base_path):
self.base_path = as_unicode(base_path)
self.separator = os.sep
if not self.path_exists(self.base_path):
raise IOError('FileAdmin path "%s" does not exist or is not accessible' % self.base_path)
def get_base_path(self):
"""
Return base path. Override to customize behavior (per-user
directories, etc)
"""
return op.normpath(self.base_path)
def make_dir(self, path, directory):
"""
Creates a directory `directory` under the `path`
"""
os.mkdir(op.join(path, directory))
def get_files(self, path, directory):
"""
Gets a list of tuples representing the files in the `directory`
under the `path`
:param path:
The path up to the directory
:param directory:
The directory that will have its files listed
Each tuple represents a file and it should contain the file name,
the relative path, a flag signifying if it is a directory, the file
size in bytes and the time last modified in seconds since the epoch
"""
items = []
for f in os.listdir(directory):
fp = op.join(directory, f)
rel_path = op.join(path, f)
is_dir = self.is_dir(fp)
size = op.getsize(fp)
last_modified = op.getmtime(fp)
items.append((f, rel_path, is_dir, size, last_modified))
return items
def delete_tree(self, directory):
"""
Deletes the directory `directory` and all its files and subdirectories
"""
shutil.rmtree(directory)
def delete_file(self, file_path):
"""
Deletes the file located at `file_path`
"""
os.remove(file_path)
def path_exists(self, path):
"""
Check if `path` exists
"""
return op.exists(path)
def rename_path(self, src, dst):
"""
Renames `src` to `dst`
"""
os.rename(src, dst)
def is_dir(self, path):
"""
Check if `path` is a directory
"""
return op.isdir(path)
def send_file(self, file_path):
"""
Sends the file located at `file_path` to the user
"""
return send_file(file_path)
def save_file(self, path, file_data):
"""
Save uploaded file to the disk
:param path:
Path to save to
:param file_data:
Werkzeug `FileStorage` object
"""
file_data.save(path)
class FileAdmin(BaseView, ActionsMixin):
"""
Simple file-management interface.
......@@ -171,7 +267,8 @@ class FileAdmin(BaseView, ActionsMixin):
def __init__(self, base_path, base_url=None,
name=None, category=None, endpoint=None, url=None,
verify_path=True, menu_class_name=None, menu_icon_type=None, menu_icon_value=None):
verify_path=True, menu_class_name=None, menu_icon_type=None, menu_icon_value=None,
storage=LocalFileStorage, storage_args=None):
"""
Constructor.
......@@ -191,8 +288,12 @@ class FileAdmin(BaseView, ActionsMixin):
Verify if path exists. If set to `True` and path does not exist
will raise an exception.
"""
self.base_path = as_unicode(base_path)
if storage_args is None:
storage_args = {}
storage_args.setdefault('base_path', base_path)
self.base_url = base_url
self.storage = storage(**storage_args)
self.init_actions()
......@@ -208,10 +309,6 @@ class FileAdmin(BaseView, ActionsMixin):
not isinstance(self.editable_extensions, set)):
self.editable_extensions = set(self.editable_extensions)
# Check if path exists
if not op.exists(base_path):
raise IOError('FileAdmin path "%s" does not exist or is not accessible' % base_path)
super(FileAdmin, self).__init__(name, category, endpoint, url,
menu_class_name=menu_class_name, menu_icon_type=menu_icon_type,
menu_icon_value=menu_icon_value)
......@@ -232,7 +329,7 @@ class FileAdmin(BaseView, ActionsMixin):
Return base path. Override to customize behavior (per-user
directories, etc)
"""
return op.normpath(self.base_path)
return self.storage.get_base_path()
def get_base_url(self):
"""
......@@ -425,14 +522,14 @@ class FileAdmin(BaseView, ActionsMixin):
def save_file(self, path, file_data):
"""
Save uploaded file to the disk
Save uploaded file to the storage
:param path:
Path to save to
:param file_data:
Werkzeug `FileStorage` object
"""
file_data.save(path)
self.storage.save_file(path, file_data)
def validate_form(self, form):
"""
......@@ -493,12 +590,12 @@ class FileAdmin(BaseView, ActionsMixin):
path = ''
else:
path = op.normpath(path)
directory = op.normpath(op.join(base_path, path))
directory = op.normpath(self._separator.join([base_path, path]))
if not self.is_in_folder(base_path, directory):
abort(404)
if not op.exists(directory):
if not self.storage.path_exists(directory):
abort(404)
return base_path, directory, path
......@@ -592,17 +689,32 @@ class FileAdmin(BaseView, ActionsMixin):
pass
def _save_form_files(self, directory, path, form):
filename = op.join(directory,
secure_filename(form.upload.data.filename))
filename = self._separator.join([directory, secure_filename(form.upload.data.filename)])
if op.exists(filename):
secure_name = op.join(path, secure_filename(form.upload.data.filename))
if self.storage.path_exists(filename):
secure_name = self._separator.join([path, secure_filename(form.upload.data.filename)])
raise Exception(gettext('File "%(name)s" already exists.',
name=secure_name))
else:
self.save_file(filename, form.upload.data)
self.on_file_upload(directory, path, filename)
@property
def _separator(self):
return self.storage.separator
def _get_breadcrumbs(self, path):
"""
Returns a list of tuples with each tuple containing the folder and
the tree up to that folder when traversing down the `path`
"""
accumulator = []
breadcrumbs = []
for n in path.split(self._separator):
accumulator.append(n)
breadcrumbs.append((n, self._separator.join(accumulator)))
return breadcrumbs
@expose('/')
@expose('/b/<path:path>')
def index(self, path=None):
......@@ -619,7 +731,6 @@ class FileAdmin(BaseView, ActionsMixin):
# Get path and verify if it is valid
base_path, directory, path = self._normalize_path(path)
if not self.is_accessible_path(path):
flash(gettext('Permission denied.'), 'error')
return redirect(self._get_dir_url('.index'))
......@@ -629,18 +740,16 @@ class FileAdmin(BaseView, ActionsMixin):
# Parent directory
if directory != base_path:
parent_path = op.normpath(op.join(path, '..'))
parent_path = op.normpath(self._separator.join([path, '..']))
if parent_path == '.':
parent_path = None
items.append(('..', parent_path, True, 0, 0))
for f in os.listdir(directory):
fp = op.join(directory, f)
rel_path = op.join(path, f)
for item in self.storage.get_files(path, directory):
file_name, rel_path, is_dir, size, last_modified = item
if self.is_accessible_path(rel_path):
items.append((f, rel_path, op.isdir(fp), op.getsize(fp), op.getmtime(fp)))
items.append(item)
# Sort by name
items.sort(key=itemgetter(0))
......@@ -652,11 +761,7 @@ class FileAdmin(BaseView, ActionsMixin):
items.sort(key=lambda values: (values[0], values[1], values[2], values[3], datetime.fromtimestamp(values[4])), reverse=True)
# Generate breadcrumbs
accumulator = []
breadcrumbs = []
for n in path.split(os.sep):
accumulator.append(n)
breadcrumbs.append((n, op.join(*accumulator)))
breadcrumbs = self._get_breadcrumbs(path)
# Actions
actions, actions_confirmation = self.get_actions_list()
......@@ -729,7 +834,7 @@ class FileAdmin(BaseView, ActionsMixin):
base_url = urljoin(self.get_url('.index'), base_url)
return redirect(urljoin(base_url, path))
return send_file(directory)
return self.storage.send_file(directory)
@expose('/mkdir/', methods=('GET', 'POST'))
@expose('/mkdir/<path:path>', methods=('GET', 'POST'))
......@@ -757,7 +862,7 @@ class FileAdmin(BaseView, ActionsMixin):
if self.validate_form(form):
try:
os.mkdir(op.join(directory, form.name.data))
self.storage.make_dir(directory, form.name.data)
self.on_mkdir(directory, form.name.data)
flash(gettext('Successfully created directory: %(directory)s',
directory=form.name.data))
......@@ -775,6 +880,12 @@ class FileAdmin(BaseView, ActionsMixin):
return self.render(template, form=form, dir_url=dir_url,
header_text=gettext('Create Directory'))
def delete_file(self, file_path):
"""
Deletes the file located at `file_path`
"""
self.storage.delete_file(file_path)
@expose('/delete/', methods=('POST',))
def delete(self):
"""
......@@ -800,14 +911,13 @@ class FileAdmin(BaseView, ActionsMixin):
flash(gettext('Permission denied.'), 'error')
return redirect(self._get_dir_url('.index'))
if op.isdir(full_path):
if self.storage.is_dir(full_path):
if not self.can_delete_dirs:
flash(gettext('Directory deletion is disabled.'), 'error')
return redirect(return_url)
try:
self.before_directory_delete(full_path, path)
shutil.rmtree(full_path)
self.storage.delete_tree(full_path)
self.on_directory_delete(full_path, path)
flash(gettext('Directory "%(path)s" was successfully deleted.', path=path))
except Exception as ex:
......@@ -815,7 +925,7 @@ class FileAdmin(BaseView, ActionsMixin):
else:
try:
self.before_file_delete(full_path, path)
os.remove(full_path)
self.delete_file(full_path)
self.on_file_delete(full_path, path)
flash(gettext('File "%(name)s" was successfully deleted.', name=path))
except Exception as ex:
......@@ -848,7 +958,7 @@ class FileAdmin(BaseView, ActionsMixin):
flash(gettext('Permission denied.'), 'error')
return redirect(self._get_dir_url('.index'))
if not op.exists(full_path):
if not self.storage.path_exists(full_path):
flash(gettext('Path does not exist.'), 'error')
return redirect(return_url)
......@@ -856,8 +966,7 @@ class FileAdmin(BaseView, ActionsMixin):
try:
dir_base = op.dirname(full_path)
filename = secure_filename(form.name.data)
os.rename(full_path, op.join(dir_base, filename))
self.storage.rename_path(full_path, self._separator.join([dir_base, filename]))
self.on_rename(full_path, dir_base, filename)
flash(gettext('Successfully renamed "%(src)s" to "%(dst)s"',
src=op.basename(path),
......@@ -901,7 +1010,7 @@ class FileAdmin(BaseView, ActionsMixin):
flash(gettext('Permission denied.'), 'error')
return redirect(self._get_dir_url('.index'))
dir_url = self._get_dir_url('.index', os.path.dirname(path))
dir_url = self._get_dir_url('.index', op.dirname(path))
next_url = next_url or dir_url
form = self.edit_form()
......@@ -974,7 +1083,7 @@ class FileAdmin(BaseView, ActionsMixin):
if self.is_accessible_path(path):
try:
os.remove(full_path)
self.delete_file(full_path)
flash(gettext('File "%(name)s" was successfully deleted.', name=path))
except Exception as ex:
flash(gettext('Failed to delete file: %(name)s', name=ex), 'error')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment