Source code for flask_fs.mongo

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import bisect
import io
import logging
import six

from os.path import splitext

from flask import current_app
from mongoengine.fields import BaseField
from werkzeug.datastructures import FileStorage

from .files import extension
from .images import make_thumbnail, resize, optimize

log = logging.getLogger(__name__)


[docs]class FileReference(object): '''Implements the FileField interface''' def __init__(self, fs=None, filename=None, upload_to=None, basename=None, instance=None, name=None): self.fs = fs self.upload_to = upload_to self._filename = filename self.basename = basename self._instance = instance self._name = name def to_mongo(self): return { 'filename': self.filename }
[docs] def save(self, wfs, filename=None): '''Save a Werkzeug FileStorage object''' if self.basename and not filename: ext = extension(filename or wfs.filename) filename = '.'.join([self.basename(self._instance), ext]) prefix = self.upload_to(self._instance) if callable(self.upload_to) else self.upload_to self.filename = self.fs.save(wfs, filename, prefix=prefix)
return self.filename @property def filename(self): return self._filename @filename.setter def filename(self, value): self._mark_as_changed() self._filename = value @property def url(self): if self.filename: return self.fs.url(self.filename) def __unicode__(self): return self.url or '' __str__ = __unicode__ def __nonzero__(self): return bool(self.filename) __bool__ = __nonzero__ def _mark_as_changed(self): if hasattr(self._instance, '_mark_as_changed'):
self._instance._mark_as_changed(self._name)
[docs]class ImageReference(FileReference): '''Implements the ImageField interface''' def __init__(self, original=None, max_size=None, thumbnail_sizes=None, thumbnails=None, bbox=None, optimize=None, **kwargs): super(ImageReference, self).__init__(**kwargs) self._original = original self.max_size = max_size self.thumbnails = thumbnails or {} self.bbox = bbox self.optimize = optimize self.thumbnail_sizes = thumbnail_sizes def to_mongo(self): data = super(ImageReference, self).to_mongo() if self._original: data['original'] = self._original if self.thumbnails: data['thumbnails'] = self.thumbnails if self.bbox: data['bbox'] = self.bbox return data
[docs] def save(self, file_or_wfs, filename=None, bbox=None, overwrite=None): '''Save a Werkzeug FileStorage object''' self._mark_as_changed() override = filename is not None filename = filename or getattr(file_or_wfs, 'filename') if self.basename and not override: basename = self.basename(self._instance) elif filename: basename = splitext(filename)[0] else: raise ValueError('Filename is required') ext = extension(filename) prefix = self.upload_to(self._instance) if callable(self.upload_to) else self.upload_to kwargs = {'prefix': prefix, 'overwrite': overwrite} if self.optimize is not None: should_optimize = self.optimize else: should_optimize = current_app.config['FS_IMAGES_OPTIMIZE'] def name(size=None): if size: return '.'.join(['-'.join([basename, str(size)]), ext]) else: return '.'.join([basename, ext]) if self.max_size: resized = resize(file_or_wfs, self.max_size) file_or_wfs.seek(0) if resized: self.original = self.fs.save(file_or_wfs, name('original'), **kwargs) self.filename = self.fs.save(resized, name(), **kwargs) else: self.filename = self.fs.save(file_or_wfs, name(), **kwargs) elif should_optimize: optimized = optimize(file_or_wfs) file_or_wfs.seek(0) self.original = self.fs.save(file_or_wfs, name('original'), **kwargs) self.filename = self.fs.save(optimized, name(), **kwargs) else: self.filename = self.fs.save(file_or_wfs, name(), **kwargs) if self.thumbnail_sizes: self.bbox = bbox for size in self.thumbnail_sizes: file_or_wfs.seek(0) thumbnail = make_thumbnail(file_or_wfs, size, self.bbox) self.thumbnails[str(size)] = self.fs.save(FileStorage(thumbnail), name(size), **kwargs)
return self.filename @property def original(self): return self._original or self.filename @original.setter def original(self, value): self._mark_as_changed() self._original = value
[docs] def thumbnail(self, size): '''Get the thumbnail filename for a given size''' if size in self.thumbnail_sizes: return self.thumbnails.get(str(size)) else:
raise ValueError('Unregistered thumbnail size {0}'.format(size))
[docs] def full(self, external=False): '''Get the full image URL in respect with ``max_size``'''
return self.fs.url(self.filename, external=external) if self.filename else None
[docs] def best_url(self, size=None, external=False): ''' Provide the best thumbnail for downscaling. If there is no match, provide the bigger if exists or the original ''' if not self.thumbnail_sizes: return self.url elif not size: self.thumbnail_sizes.sort() best_size = self.thumbnail_sizes[-1] else: self.thumbnail_sizes.sort() index = bisect.bisect_left(self.thumbnail_sizes, size) if index >= len(self.thumbnail_sizes): best_size = self.thumbnail_sizes[-1] else: best_size = self.thumbnail_sizes[index] filename = self.thumbnail(best_size)
return self.fs.url(filename, external=external) if filename else None
[docs] def rerender(self): ''' Rerender all derived images from the original. If optmization settings or expected sizes changed, they will be used for the new rendering. ''' with self.fs.open(self.original, 'rb') as f_img: img = io.BytesIO(f_img.read()) # Store the image in memory to avoid overwritting
self.save(img, filename=self.filename, bbox=self.bbox, overwrite=True)
__call__ = best_url
[docs]class FileField(BaseField): ''' Store reference to files in a given storage. ''' proxy_class = FileReference def __init__(self, fs=None, upload_to=None, basename=None, *args, **kwargs): self.fs = fs self.upload_to = upload_to self.basename = basename super(FileField, self).__init__(*args, **kwargs) def proxy(self, filename=None, instance=None, **kwargs): return self.proxy_class( fs=self.fs, filename=filename, upload_to=self.upload_to, basename=self.basename, instance=instance, name=self.name, **kwargs )
[docs] def to_python(self, value): if not isinstance(value, self.proxy_class): if isinstance(value, dict): value = self.proxy(**value) elif isinstance(value, six.text_type): value = self.proxy(filename=value)
return value def __set__(self, instance, value): if not isinstance(value, self.proxy_class): value = self.proxy(filename=value, instance=instance) return super(FileField, self).__set__(instance, value) def __get__(self, instance, owner): if instance is None: return self fileref = instance._data.get(self.name) if not isinstance(fileref, self.proxy_class): fileref = self.proxy(filename=fileref, instance=instance) instance._data[self.name] = fileref elif fileref._instance is None: fileref._instance = instance return fileref
[docs] def to_mongo(self, value): if not value: return None
return value.to_mongo()
[docs]class ImageField(FileField): ''' Store reference to images in a given Storage. Allow to automatically generate thumbnails or resized image. Original image always stay untouched. ''' proxy_class = ImageReference def __init__(self, max_size=None, thumbnails=None, optimize=None, *args, **kwargs): self.max_size = max_size self.thumbnail_sizes = thumbnails self.optimize = optimize super(ImageField, self).__init__(*args, **kwargs) def proxy(self, **kwargs): return super(ImageField, self).proxy(max_size=self.max_size, thumbnail_sizes=self.thumbnail_sizes, optimize=self.optimize,
**kwargs)