Source code for flask_fs.backends.swift
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import io
import logging
from contextlib import contextmanager
from dateutil import parser
import swiftclient
from . import BaseBackend
log = logging.getLogger(__name__)
[docs]class SwiftBackend(BaseBackend):
'''
An OpenStack Swift backend
Expect the following settings:
- `authurl`: The Swift Auth URL
- `user`: The Swift user in
- `key`: The user API Key
'''
def __init__(self, name, config):
super(SwiftBackend, self).__init__(name, config)
self.conn = swiftclient.Connection(
user=config.user,
key=config.key,
authurl=config.authurl
)
self.conn.put_container(self.name)
def exists(self, filename):
try:
self.conn.head_object(self.name, filename)
return True
except swiftclient.ClientException:
return False
@contextmanager
def open(self, filename, mode='r', encoding='utf8'):
if 'r' in mode:
obj = self.read(filename)
yield io.BytesIO(obj) if 'b' in mode else io.StringIO(obj.decode(encoding))
else: # mode == 'w'
f = io.BytesIO() if 'b' in mode else io.StringIO()
yield f
self.write(filename, f.getvalue())
def read(self, filename):
_, data = self.conn.get_object(self.name, filename)
return data
def write(self, filename, content):
self.conn.put_object(self.name, filename, contents=content)
def delete(self, filename):
if self.exists(filename):
self.conn.delete_object(self.name, filename)
else:
headers, items = self.conn.get_container(self.name, path=filename)
for i in items:
self.conn.delete_object(self.name, i['name'])
def copy(self, filename, target):
dest = '/'.join((self.name, target))
self.conn.copy_object(self.name, filename, destination=dest)
def list_files(self):
headers, items = self.conn.get_container(self.name)
for i in items:
yield i['name']
def get_metadata(self, filename):
data = self.conn.head_object(self.name, filename)
return {
'checksum': 'md5:{0}'.format(data['etag']),
'size': int(data['content-length']),
'mime': data['content-type'],
'modified': parser.parse(data['last-modified']),
}