Robustness improvements:

* refactor error handling through common transaction() method
* maintain consistent index file on disk in the face of errors
* implement locking for multithreaded use
This commit is contained in:
Jarno Seppänen 2014-05-14 22:35:58 +03:00
parent eb35116c65
commit 6018e19158
2 changed files with 162 additions and 199 deletions

358
gpgfs.py
View File

@ -10,6 +10,8 @@ import struct
import time import time
from cStringIO import StringIO from cStringIO import StringIO
import gpgstore import gpgstore
from contextlib import contextmanager
from threading import Lock
magic = 'GPGFS1\n' magic = 'GPGFS1\n'
@ -23,11 +25,15 @@ class Entry:
for k,v in kwargs.iteritems(): for k,v in kwargs.iteritems():
setattr(self, k, v) setattr(self, k, v)
# entry types:
ENT_FILE = 0
ENT_DIR = 1
def read_index(store, path): def read_index(store, path):
if not store.exists(path):
now = time.time()
root = Entry(children={}, nlink=3, size=0,
mode=stat.S_IFDIR | 0755,
mtime=now, ctime=now)
write_index(store, path, root)
log.info('created %s', path)
return root
data = store.get(path) data = store.get(path)
buf = StringIO(data) buf = StringIO(data)
if buf.read(len(magic)) != magic: if buf.read(len(magic)) != magic:
@ -60,13 +66,20 @@ def write_dict(fd, dct):
buf.write('E') buf.write('E')
children.append(val) children.append(val)
elif isinstance(val, (int, long)): elif isinstance(val, (int, long)):
buf.write('I') if val < 2**32:
buf.write(struct.pack('<I', val)) buf.write('I')
buf.write(struct.pack('<I', val))
else:
buf.write('L')
buf.write(struct.pack('<Q', val))
elif isinstance(val, float):
buf.write('F')
buf.write(struct.pack('<d', val))
elif isinstance(val, str): elif isinstance(val, str):
buf.write('S') buf.write('B')
write_atom(buf, val) write_atom(buf, val)
elif isinstance(val, unicode): elif isinstance(val, unicode):
buf.write('U') buf.write('S')
write_atom(buf, val.encode('utf8')) write_atom(buf, val.encode('utf8'))
else: else:
raise TypeError, type(val) raise TypeError, type(val)
@ -85,8 +98,10 @@ def read_dict(fd):
if tag == 'D': val = read_dict(fd) if tag == 'D': val = read_dict(fd)
elif tag == 'E': val = Entry(**read_dict(fd)) elif tag == 'E': val = Entry(**read_dict(fd))
elif tag == 'I': val = struct.unpack('<I', buf.read(4))[0] elif tag == 'I': val = struct.unpack('<I', buf.read(4))[0]
elif tag == 'S': val = read_atom(buf) elif tag == 'L': val = struct.unpack('<Q', buf.read(8))[0]
elif tag == 'U': val = read_atom(buf).decode('utf8') elif tag == 'F': val = struct.unpack('<d', buf.read(8))[0]
elif tag == 'B': val = read_atom(buf)
elif tag == 'S': val = read_atom(buf).decode('utf8')
else: raise TypeError, tag else: raise TypeError, tag
dct[key] = val dct[key] = val
return dct return dct
@ -114,6 +129,9 @@ class LoggingMixIn:
except OSError, e: except OSError, e:
ret = str(e) ret = str(e)
raise raise
except:
log.exception('unhandled error in %s:', op)
raise
finally: finally:
rtxt = repr(ret) rtxt = repr(ret)
if op=='read': if op=='read':
@ -133,15 +151,8 @@ class GpgFs(LoggingMixIn, Operations):
#self.cache = cache #self.cache = cache
self.store = gpgstore.GpgStore(self.encroot, keyid) self.store = gpgstore.GpgStore(self.encroot, keyid)
self.index_path = 'index' self.index_path = 'index'
if os.path.exists(self.encroot + '/' + self.index_path): self.root = read_index(self.store, self.index_path)
self.root = read_index(self.store, self.index_path) self.txlock = Lock()
else:
self.root = Entry(type=ENT_DIR, children={},
st_mode=0755,
st_mtime=int(time.time()),
st_ctime=int(time.time()))
self._write_index()
log.info('created %s', self.index_path)
self.fd = 0 self.fd = 0
self._clear_write_cache() self._clear_write_cache()
@ -171,87 +182,83 @@ class GpgFs(LoggingMixIn, Operations):
self.write_len = 0 self.write_len = 0
self.write_dirty = False self.write_dirty = False
@contextmanager
def transaction(self):
paths = {'old': None, 'new': None}
def putx(data, old_path = None):
paths['new'] = self.store.put(data)
paths['old'] = old_path
return paths['new']
with self.txlock:
try:
yield putx
# commit
write_index(self.store, self.index_path, self.root)
if paths['old']:
self.store.delete(paths['old'])
except:
# rollback
try:
log.warning('starting rollback')
self.root = read_index(self.store, self.index_path)
if paths['new']:
self.store.delete(paths['new'])
log.warning('rollback done')
except:
log.exception('rollback failed')
raise
def chmod(self, path, mode): def chmod(self, path, mode):
# sanitize mode (clear setuid/gid/sticky bits) # sanitize mode (clear setuid/gid/sticky bits)
mode &= 0777 mode &= 0777
ent = self._find(path) with self.transaction():
if ent.type == ENT_DIR: ent = self._find(path)
prev_mode = ent.st_mode ent.mode = mode | (ent.mode & 0170000)
ent.st_mode = mode
try:
self._write_index()
except:
ent.st_mode = prev_mode
raise
else:
encpath = self.encroot + '/' + ent.path
os.chmod(encpath, mode)
def chown(self, path, uid, gid): def chown(self, path, uid, gid):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
def create(self, path, mode): def create(self, path, mode):
dir, path = self._find(path, parent=True) mode &= 0777
if path in dir.children: mode |= stat.S_IFREG
raise FuseOSError(errno.EEXIST) with self.transaction() as putx:
# FIXME mode parent, name = self._find(path, parent=True)
encpath = self.store.put('') if name in parent.children:
prev_mtime = dir.st_mtime raise FuseOSError(errno.EEXIST)
dir.children[path] = Entry(type=ENT_FILE, path=encpath, st_size=0) now = time.time()
log.debug('new path %s => %s', path, encpath) encpath = putx('')
dir.st_mtime = int(time.time()) parent.children[name] = Entry(mode=mode, encpath=encpath, size=0,
try: nlink=1, ctime=now, mtime=now)
self._write_index() parent.mtime = now
except: log.debug('new path %s => %s', path, encpath)
try: self.store.delete(encpath) self.fd += 1
except: pass return self.fd
del dir.children[path]
dir.st_mtime = prev_mtime
raise
self.fd += 1
return self.fd
def flush(self, path, fh): def flush(self, path, fh):
if not self.write_dirty: if not self.write_dirty:
log.debug('nothing to flush') log.debug('nothing to flush')
return 0 return 0
buf = ''.join(self.write_buf) with self.transaction() as putx:
self.write_buf = [buf] buf = ''.join(self.write_buf)
ent = self._find(self.write_path) self.write_buf = [buf]
prev_size = ent.st_size ent = self._find(self.write_path)
prev_path = ent.path ent.size = len(buf)
ent.st_size = len(buf) ent.encpath = putx(buf, ent.encpath)
ent.path = self.store.put(buf) self.write_dirty = False
try: log.debug('flushed %d bytes to %s', len(buf), self.write_path)
self._write_index() return 0
except:
self.store.delete(ent.path)
ent.st_size = prev_size
ent.path = prev_path
raise
self.store.delete(prev_path)
self.write_dirty = False
log.debug('flushed %d bytes to %s', len(buf), self.write_path)
return 0
def fsync(self, path, datasync, fh): def fsync(self, path, datasync, fh):
self.flush(path, fh) self.flush(path, fh)
return 0 return 0
def getattr(self, path, fh = None): def getattr(self, path, fh = None):
ent = self._find(path) # don't do full blown transaction
if ent.type == ENT_DIR: with self.txlock:
return dict(st_mode = stat.S_IFDIR | ent.st_mode, ent = self._find(path)
st_size = len(ent.children), return dict(st_mode = ent.mode, st_size = ent.size,
st_ctime = ent.st_ctime, st_mtime = ent.st_mtime, st_ctime = ent.ctime, st_mtime = ent.mtime,
st_atime = 0, st_nlink = 3) st_atime = 0, st_nlink = ent.nlink)
# ensure st_size is up-to-date
self.flush(path, 0)
encpath = self.encroot + '/' + ent.path
s = os.stat(encpath)
return dict(st_mode = s.st_mode, st_size = ent.st_size,
st_atime = s.st_atime, st_mtime = s.st_mtime,
st_ctime = s.st_ctime, st_nlink = s.st_nlink)
def getxattr(self, path, name, position = 0): def getxattr(self, path, name, position = 0):
raise FuseOSError(errno.ENODATA) # ENOATTR raise FuseOSError(errno.ENODATA) # ENOATTR
@ -260,21 +267,16 @@ class GpgFs(LoggingMixIn, Operations):
return [] return []
def mkdir(self, path, mode): def mkdir(self, path, mode):
dir, path = self._find(path, parent=True) mode &= 0777
if path in dir.children: mode |= stat.S_IFDIR
raise FuseOSError(errno.EEXIST) with self.transaction():
prev_mtime = dir.st_mtime parent, name = self._find(path, parent=True)
dir.children[path] = Entry(type=ENT_DIR, children={}, if name in parent.children:
st_mode=(mode & 0777), raise FuseOSError(errno.EEXIST)
st_mtime=int(time.time()), now = time.time()
st_ctime=int(time.time())) parent.children[name] = Entry(children={}, mode=mode, nlink=2,
dir.st_mtime = int(time.time()) size=0, mtime=now, ctime=now)
try: parent.mtime = now
self._write_index()
except:
del dir.children[path]
dir.st_mtime = prev_mtime
raise
def open(self, path, flags): def open(self, path, flags):
return 0 return 0
@ -282,13 +284,13 @@ class GpgFs(LoggingMixIn, Operations):
def read(self, path, size, offset, fh): def read(self, path, size, offset, fh):
self.flush(path, 0) self.flush(path, 0)
ent = self._find(path) ent = self._find(path)
assert ent.type == ENT_FILE assert ent.mode & stat.S_IFREG
data = self.store.get(ent.path) data = self.store.get(ent.encpath)
return data[offset:offset + size] return data[offset:offset + size]
def readdir(self, path, fh): def readdir(self, path, fh):
dir = self._find(path) dirent = self._find(path)
return ['.', '..'] + list(dir.children) return ['.', '..'] + list(dirent.children)
def readlink(self, path): def readlink(self, path):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
@ -301,53 +303,38 @@ class GpgFs(LoggingMixIn, Operations):
self._clear_write_cache() self._clear_write_cache()
if new.startswith(old): if new.startswith(old):
raise FuseOSError(errno.EINVAL) raise FuseOSError(errno.EINVAL)
old_dir, old_name = self._find(old, parent=True) with self.transaction():
if old_name not in old_dir.children: old_dir, old_name = self._find(old, parent=True)
raise FuseOSError(errno.ENOENT) if old_name not in old_dir.children:
new_dir, new_name = self._find(new, parent=True) raise FuseOSError(errno.ENOENT)
prev_ent = new_dir.children.get(new_name) new_dir, new_name = self._find(new, parent=True)
if prev_ent: old_ent = old_dir.children[old_name]
if prev_ent.type == ENT_DIR: new_ent = new_dir.children.get(new_name)
if old_dir[old_name].type != ENT_DIR: if new_ent:
raise FuseOSError(errno.EISDIR) if new_ent.mode & stat.S_IFDIR:
if prev_ent.children: if not old_ent.mode & stat.S_IFDIR:
raise FuseOSError(errno.ENOTEMPTY) raise FuseOSError(errno.EISDIR)
elif old_dir[old_name].type == ENT_DIR: if new_ent.children:
raise FuseOSError(errno.ENOTDIR) raise FuseOSError(errno.ENOTEMPTY)
prev_old_mtime = old_dir.st_mtime elif old_ent.mode & stat.S_IFDIR:
prev_new_mtime = new_dir.st_mtime raise FuseOSError(errno.ENOTDIR)
new_dir.children[new_name] = old_dir.children.pop(old_name) new_dir.children[new_name] = old_dir.children.pop(old_name)
old_dir.st_mtime = new_dir.st_mtime = int(time.time()) old_dir.mtime = new_dir.mtime = time.time()
try: if new_ent != None and new_ent.mode & stat.S_IFREG:
self._write_index() self.store.delete(new_ent.encpath)
except:
old_dir.children[old_name] = new_dir.children.pop(new_name)
if prev_ent:
new_dir.children[new_name] = prev_ent
old_dir.st_mtime = prev_old_mtime
new_dir.st_mtime = prev_new_mtime
raise
if prev_ent and prev_ent.type == ENT_FILE:
os.remove(self.encroot + '/' + prev_ent.path)
def rmdir(self, path): def rmdir(self, path):
parent, path = self._find(path, parent=True) with self.transaction():
if path not in parent.children: parent, name = self._find(path, parent=True)
raise FuseOSError(errno.ENOENT) if name not in parent.children:
ent = parent.children[path] raise FuseOSError(errno.ENOENT)
if ent.type != ENT_DIR: ent = parent.children[name]
raise FuseOSError(errno.ENOTDIR) if not ent.mode & stat.S_IFDIR:
if ent.children: raise FuseOSError(errno.ENOTDIR)
raise FuseOSError(errno.ENOTEMPTY) if ent.children:
prev_mtime = parent.st_mtime raise FuseOSError(errno.ENOTEMPTY)
del parent.children[path] del parent.children[name]
parent.st_mtime = int(time.time()) parent.mtime = time.time()
try:
self._write_index()
except:
parent.children[path] = ent
parent.st_mtime = prev_mtime
raise
def setxattr(self, path, name, value, options, position = 0): def setxattr(self, path, name, value, options, position = 0):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
@ -361,69 +348,42 @@ class GpgFs(LoggingMixIn, Operations):
def truncate(self, path, length, fh = None): def truncate(self, path, length, fh = None):
self.flush(path, 0) self.flush(path, 0)
self._clear_write_cache() self._clear_write_cache()
ent = self._find(path) with self.transaction() as putx:
if length == 0: ent = self._find(path)
buf = '' if length == 0:
else: buf = ''
buf = self.store.get(ent.path) else:
buf = buf[:length] buf = self.store.get(ent.encpath)
prev_size = ent.st_size buf = buf[:length]
prev_path = ent.path ent.encpath = putx(buf, ent.encpath)
ent.st_size = length ent.size = length
ent.path = self.store.put(buf)
try:
self._write_index()
except:
os.remove(ent.path)
ent.st_size = prev_size
ent.path = prev_path
raise
self.store.delete(prev_path)
def unlink(self, path): def unlink(self, path):
if self.write_path == path: with self.transaction():
# no need to flush afterwards if self.write_path == path:
self._clear_write_cache() # no need to flush afterwards
dir, name = self._find(path, parent=True) self._clear_write_cache()
if name not in dir.children: parent, name = self._find(path, parent=True)
raise FuseOSError(errno.ENOENT) if name not in parent.children:
ent = dir.children[name] raise FuseOSError(errno.ENOENT)
encpath = self.encroot + '/' + ent.path ent = parent.children.pop(name)
del dir.children[name] parent.mtime = time.time()
prev_mtime = dir.st_mtime self.store.delete(ent.encpath)
dir.st_mtime = int(time.time())
try:
self._write_index()
except:
dir.children[name] = ent
dir.st_mtime = prev_mtime
raise
os.remove(encpath)
def utimens(self, path, times = None): def utimens(self, path, times = None):
ent = self._find(path) if times is None:
if ent.type == ENT_DIR: mtime = time.time()
prev_mtime = ent.st_mtime
if times is None:
ent.st_mtime = int(time.time())
else:
ent.st_mtime = times[1]
try:
self._write_index()
except:
ent.st_mtime = prev_mtime
raise
else: else:
# flush may mess with mtime mtime = times[1]
self.flush(path, 0) with self.transaction():
encpath = self.encroot + '/' + ent.path ent = self._find(path)
os.utime(encpath, times) ent.mtime = mtime
def write(self, path, data, offset, fh): def write(self, path, data, offset, fh):
ent = self._find(path)
if path != self.write_path: if path != self.write_path:
self.flush(self.write_path, None) self.flush(self.write_path, None)
buf = self.store.get(ent.path) ent = self._find(path)
buf = self.store.get(ent.encpath)
self.write_buf = [buf] self.write_buf = [buf]
self.write_len = len(buf) self.write_len = len(buf)
self.write_path = path self.write_path = path

View File

@ -60,3 +60,6 @@ class GpgStore(object):
def delete(self, path): def delete(self, path):
os.remove(self.encroot + '/' + path) os.remove(self.encroot + '/' + path)
log.debug('deleted %s' % path) log.debug('deleted %s' % path)
def exists(self, path):
return os.path.exists(self.encroot + '/' + path)