Store metadata in an sqlite database

This commit is contained in:
Jarno Seppänen 2014-05-07 22:59:54 +03:00
parent eb35116c65
commit 02add3c977
2 changed files with 225 additions and 283 deletions

463
gpgfs.py
View File

@ -6,12 +6,10 @@ import stat
import os import os
import sys import sys
import logging import logging
import struct
import time import time
from cStringIO import StringIO
import gpgstore import gpgstore
import sqlite3
magic = 'GPGFS1\n' from contextlib import contextmanager
log = logging.getLogger('gpgfs') log = logging.getLogger('gpgfs')
@ -23,82 +21,6 @@ class Entry:
for k,v in kwargs.iteritems(): for k,v in kwargs.iteritems():
setattr(self, k, v) setattr(self, k, v)
# entry types:
ENT_FILE = 0
ENT_DIR = 1
def read_index(store, path):
data = store.get(path)
buf = StringIO(data)
if buf.read(len(magic)) != magic:
raise IOError, 'index parse error: %s' % path
read_atom(buf)
root = Entry(**read_dict(buf))
return root
def write_index(store, path, root):
buf = StringIO()
buf.write(magic)
header = ''
write_atom(buf, header)
write_dict(buf, root)
store.put(buf.getvalue(), path=path)
def write_dict(fd, dct):
# breadth-first
children = []
buf = StringIO()
if not isinstance(dct, dict):
dct = dct.__dict__
for key in dct:
write_atom(buf, key.encode('utf8'))
val = dct[key]
if isinstance(val, dict):
buf.write('D')
children.append(val)
elif isinstance(val, Entry):
buf.write('E')
children.append(val)
elif isinstance(val, (int, long)):
buf.write('I')
buf.write(struct.pack('<I', val))
elif isinstance(val, str):
buf.write('S')
write_atom(buf, val)
elif isinstance(val, unicode):
buf.write('U')
write_atom(buf, val.encode('utf8'))
else:
raise TypeError, type(val)
write_atom(fd, buf.getvalue())
for c in children:
write_dict(fd, c)
def read_dict(fd):
dct = {}
buf = read_atom(fd)
buflen = len(buf)
buf = StringIO(buf)
while buf.tell() < buflen:
key = read_atom(buf).decode('utf8')
tag = buf.read(1)
if tag == 'D': val = read_dict(fd)
elif tag == 'E': val = Entry(**read_dict(fd))
elif tag == 'I': val = struct.unpack('<I', buf.read(4))[0]
elif tag == 'S': val = read_atom(buf)
elif tag == 'U': val = read_atom(buf).decode('utf8')
else: raise TypeError, tag
dct[key] = val
return dct
def write_atom(fd, atom):
assert isinstance(atom, str)
fd.write(struct.pack('<I', len(atom)))
fd.write(atom)
def read_atom(fd):
return fd.read(struct.unpack('<I', fd.read(4))[0])
class LoggingMixIn: class LoggingMixIn:
def __call__(self, op, path, *args): def __call__(self, op, path, *args):
@ -120,50 +42,80 @@ class LoggingMixIn:
rtxt = rtxt[:10] rtxt = rtxt[:10]
log.debug('<- %s %s', op, rtxt) log.debug('<- %s %s', op, rtxt)
def _rollback_quietly(cur):
    """Issue ROLLBACK on *cur*; log instead of raising if that fails.

    Kept in its own function so that a failed ROLLBACK never replaces the
    exception the caller is about to re-raise.
    """
    try:
        cur.execute('ROLLBACK')
    except sqlite3.Error:
        log.exception("rollback failed")


@contextmanager
def transaction(cur, active=True):
    """Run the ``with`` body inside an exclusive SQLite transaction.

    :param cur: cursor (or connection) whose ``execute`` is used to issue
        BEGIN EXCLUSIVE / COMMIT / ROLLBACK
    :param active: when false, the body runs with no transaction handling
        at all, so a caller that already holds a transaction can reuse
        code that normally opens its own

    Any exception raised by the body rolls the transaction back and is
    re-raised unchanged.  If COMMIT fails with sqlite3.OperationalError the
    error is logged, the transaction is rolled back and FuseOSError(EIO)
    is raised.
    """
    if not active:
        yield
        return
    cur.execute('BEGIN EXCLUSIVE')
    try:
        yield
    except:
        # Deliberately bare: whatever interrupted the body, the transaction
        # must not stay open; the original exception is re-raised as is.
        _rollback_quietly(cur)
        raise
    try:
        cur.execute('COMMIT')
    except sqlite3.OperationalError:
        log.exception("transaction failed")
        # A failed COMMIT can leave the transaction open; without this the
        # next BEGIN EXCLUSIVE on the same connection would fail.
        _rollback_quietly(cur)
        raise FuseOSError(errno.EIO)
class GpgFs(LoggingMixIn, Operations): class GpgFs(LoggingMixIn, Operations):
#class GpgFs(Operations): #class GpgFs(Operations):
def __init__(self, encroot, keyid): def __init__(self, encroot, mountpoint, keyid):
''' '''
:param encroot: Encrypted root directory :param encroot: Encrypted root directory
''' '''
self.encroot = encroot.rstrip('/') self.encroot = encroot.rstrip('/')
assert os.path.exists(self.encroot) assert os.path.exists(self.encroot)
assert os.path.isdir(self.encroot) assert os.path.isdir(self.encroot)
#self.cache = cache
self.store = gpgstore.GpgStore(self.encroot, keyid) self.store = gpgstore.GpgStore(self.encroot, keyid)
self.index_path = 'index' self.index_path = 'index'
if os.path.exists(self.encroot + '/' + self.index_path): self.dbpath = '.gpgfs.db'
self.root = read_index(self.store, self.index_path) self.mountpoint = mountpoint
else:
self.root = Entry(type=ENT_DIR, children={},
st_mode=0755,
st_mtime=int(time.time()),
st_ctime=int(time.time()))
self._write_index()
log.info('created %s', self.index_path)
self.fd = 0 self.fd = 0
self._clear_write_cache() self._clear_write_cache()
def _write_index(self): def _find(self, path, parent=False, **kwargs):
write_index(self.store, self.index_path, self.root)
def _find(self, path, parent=False):
assert path.startswith('/') assert path.startswith('/')
if path == '/': names = path[1:].split('/')
return self.root
node = self.root
path = path[1:].split('/')
if parent: if parent:
basename = path[-1] basename = names[-1]
path = path[:-1] path = names[:-1]
for name in path: sql = 'JOIN entry e{i} ON e{i}.parent_id=e{j}.id AND e{i}.name=?'
if name not in node.children: joins = [{i:i+1, j:i} for i in range(len(names))]
joins = '\n'.join(sql.format(j) for j in joins)
sql = """
SELECT e{i}.* FROM entry e0
{joins}
WHERE e0.name='' AND e0.parent_id=0
""".format(joins=joins, i=len(names)-1)
cur = self.db.execute(sql, names)
if cur.rowcount != 1:
if 'default' in kwargs:
return kwargs['default']
raise FuseOSError(errno.ENOENT) raise FuseOSError(errno.ENOENT)
node = node.children[name] ent = cur.fetchone()
if parent: if parent:
return node, basename return ent, basename
return node return ent
def _put(self, path, data, transaction_=True):
    """Store *data* as the new content of *path* and update its entry row.

    :param path: absolute path inside the mounted filesystem
    :param data: complete new file content
    :param transaction_: pass False when the caller already holds a
        transaction on ``self.db``
    :returns: the value returned by ``self.store.put`` (recorded in the
        ``encpath`` column)

    The blob is written first.  If looking up or updating the entry fails,
    the freshly written blob is deleted again and the error is re-raised.
    """
    if path[1:] == self.dbpath:
        # The database file itself is stored under the fixed index path.
        encpath = self.store.put(data, self.index_path)
    else:
        encpath = self.store.put(data)
    with transaction(self.db, transaction_):
        try:
            ent = self._find(path)
            sql = "UPDATE entry SET size=?, encpath=?, mtime=? WHERE id=?"
            # Rows come from sqlite3.Row, which supports key access but
            # not attribute access.
            self.db.execute(sql, [len(data), encpath, time.time(), ent['id']])
        except:
            # Bare on purpose: always drop the orphaned blob, then re-raise.
            self.store.delete(encpath)
            raise
    old_encpath = ent['encpath']
    # Skip the delete when the blob was rewritten under the same path
    # (the fixed index path); otherwise the new data would be removed.
    if old_encpath is not None and old_encpath != encpath:
        self.store.delete(old_encpath)
    return encpath
def _clear_write_cache(self): def _clear_write_cache(self):
self.write_path = None self.write_path = None
@ -171,43 +123,80 @@ class GpgFs(LoggingMixIn, Operations):
self.write_len = 0 self.write_len = 0
self.write_dirty = False self.write_dirty = False
def _init_db(self, db):
sql = """
CREATE TABLE entry (
id INT PRIMARY KEY,
name TEXT NOT NULL,
parent_id INT NOT NULL,
encpath TEXT UNIQUE,
mode INT NOT NULL,
nlink INT,
size INT,
mtime FLOAT,
ctime FLOAT,
UNIQUE (name, parent_id),
FOREIGN KEY(parent_id) REFERENCES entry(id)
)"""
db.execute(sql)
db.execute('BEGIN EXCLUSIVE')
sql = """
INSERT INTO entry (id, name, parent_id, mode,
nlink, size, mtime, ctime)
VALUES (?,?,?,?,?,?,?,?)"""
now = time.time()
db.execute(sql, [0, '', 0, stat.S_IFDIR | 0755,
3, 0, now, now])
db.execute('COMMIT')
def init(self, path):
    """Open the metadata database; create its schema on first use.

    The *path* argument is not used; the database location is built from
    ``self.mountpoint`` and ``self.dbpath``.  The schema is created only
    when the store does not yet contain ``self.index_path``.
    """
    needs_schema = not self.store.exists(self.index_path)
    dbfile = self.mountpoint + '/' + self.dbpath
    log.debug('opening %s', dbfile)
    # isolation_level=None: autocommit mode, transactions are issued
    # explicitly by the code using the cursor.
    self.dbconn = sqlite3.connect(dbfile, isolation_level=None)
    self.dbconn.row_factory = sqlite3.Row
    self.db = self.dbconn.cursor()
    if not needs_schema:
        return
    self._init_db(self.db)
    log.info('created %s', dbfile)
def destroy(self, path):
    """Release the database cursor and the connection opened by ``init``.

    The *path* argument is not used.
    """
    try:
        self.db.close()
    finally:
        # Closing only the cursor would leave the connection open.
        self.dbconn.close()
def access(self, path, amode):
    """Report *path* as accessible if it can be looked up.

    *amode* is not inspected.  Any error raised by ``self._find`` for the
    path propagates to the caller; otherwise 0 is returned.
    """
    # Only the lookup matters here; its result is discarded.
    self._find(path)
    return 0
def chmod(self, path, mode): def chmod(self, path, mode):
# sanitize mode (clear setuid/gid/sticky bits) # sanitize mode (clear setuid/gid/sticky bits)
mode &= 0777 mode &= 0777
with transaction(self.db):
ent = self._find(path) ent = self._find(path)
if ent.type == ENT_DIR: mode |= (ent.mode & 0170000)
prev_mode = ent.st_mode self.db.execute('UPDATE entry SET mode=? WHERE id=?', [mode, ent.id])
ent.st_mode = mode if not self.db.rowcount:
try: raise FuseOSError(errno.ENOENT)
self._write_index()
except:
ent.st_mode = prev_mode
raise
else:
encpath = self.encroot + '/' + ent.path
os.chmod(encpath, mode)
def chown(self, path, uid, gid): def chown(self, path, uid, gid):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
def create(self, path, mode): def create(self, path, mode):
dir, path = self._find(path, parent=True) mode &= 0777
if path in dir.children: mode |= stat.S_IFREG
raise FuseOSError(errno.EEXIST) with transaction(self.db):
# FIXME mode parent, name = self._find(path, parent=True)
encpath = self.store.put('') sql = """
prev_mtime = dir.st_mtime INSERT INTO entry (name, parent_id, mode, nlink, ctime)
dir.children[path] = Entry(type=ENT_FILE, path=encpath, st_size=0) VALUES (?,?,?,?,?)
log.debug('new path %s => %s', path, encpath) """
dir.st_mtime = int(time.time()) now = time.time()
try: try:
self._write_index() self.db.execute(sql, [name, parent.id, mode, 1, now])
except: except sqlite3.IntegrityError:
try: self.store.delete(encpath) raise FuseOSError(errno.EEXIST)
except: pass self._put(path, '', transaction_=False)
del dir.children[path] sql = "UPDATE entry SET mtime=? WHERE id=?"
dir.st_mtime = prev_mtime self.db.execute(sql, [now, parent.id])
raise
self.fd += 1 self.fd += 1
return self.fd return self.fd
@ -217,19 +206,7 @@ class GpgFs(LoggingMixIn, Operations):
return 0 return 0
buf = ''.join(self.write_buf) buf = ''.join(self.write_buf)
self.write_buf = [buf] self.write_buf = [buf]
ent = self._find(self.write_path) self._put(self.write_path, buf)
prev_size = ent.st_size
prev_path = ent.path
ent.st_size = len(buf)
ent.path = self.store.put(buf)
try:
self._write_index()
except:
self.store.delete(ent.path)
ent.st_size = prev_size
ent.path = prev_path
raise
self.store.delete(prev_path)
self.write_dirty = False self.write_dirty = False
log.debug('flushed %d bytes to %s', len(buf), self.write_path) log.debug('flushed %d bytes to %s', len(buf), self.write_path)
return 0 return 0
@ -239,19 +216,11 @@ class GpgFs(LoggingMixIn, Operations):
return 0 return 0
def getattr(self, path, fh = None): def getattr(self, path, fh = None):
with transaction(self.db):
ent = self._find(path) ent = self._find(path)
if ent.type == ENT_DIR: return dict(st_mode = ent.mode, st_size = ent.size,
return dict(st_mode = stat.S_IFDIR | ent.st_mode, st_ctime = ent.ctime, st_mtime = ent.mtime,
st_size = len(ent.children), st_atime = 0, st_nlink = ent.nlink)
st_ctime = ent.st_ctime, st_mtime = ent.st_mtime,
st_atime = 0, st_nlink = 3)
# ensure st_size is up-to-date
self.flush(path, 0)
encpath = self.encroot + '/' + ent.path
s = os.stat(encpath)
return dict(st_mode = s.st_mode, st_size = ent.st_size,
st_atime = s.st_atime, st_mtime = s.st_mtime,
st_ctime = s.st_ctime, st_nlink = s.st_nlink)
def getxattr(self, path, name, position = 0): def getxattr(self, path, name, position = 0):
raise FuseOSError(errno.ENODATA) # ENOATTR raise FuseOSError(errno.ENODATA) # ENOATTR
@ -260,21 +229,23 @@ class GpgFs(LoggingMixIn, Operations):
return [] return []
def mkdir(self, path, mode): def mkdir(self, path, mode):
dir, path = self._find(path, parent=True) mode &= 0777
if path in dir.children: mode |= stat.S_IFDIR
raise FuseOSError(errno.EEXIST) with transaction(self.db):
prev_mtime = dir.st_mtime parent, name = self._find(path, parent=True)
dir.children[path] = Entry(type=ENT_DIR, children={}, sql = """
st_mode=(mode & 0777), INSERT INTO entry
st_mtime=int(time.time()), (name, type, parent_id, mode, nlink, size, mtime, ctime)
st_ctime=int(time.time())) VALUES (?,?,?,?,?,?,?,?)
dir.st_mtime = int(time.time()) """
now = time.time()
try: try:
self._write_index() self.db.execute(sql, [name, parent.id,
except: mode, 2, 0, now, now])
del dir.children[path] except sqlite3.IntegrityError:
dir.st_mtime = prev_mtime raise FuseOSError(errno.EEXIST)
raise sql = "UPDATE entry SET mtime=? WHERE id=?"
self.db.execute(sql, [now, parent.id])
def open(self, path, flags): def open(self, path, flags):
return 0 return 0
@ -282,13 +253,18 @@ class GpgFs(LoggingMixIn, Operations):
def read(self, path, size, offset, fh): def read(self, path, size, offset, fh):
self.flush(path, 0) self.flush(path, 0)
ent = self._find(path) ent = self._find(path)
assert ent.type == ENT_FILE assert ent.mode & stat.S_IFREG
data = self.store.get(ent.path) try:
data = self.store.get(ent.encpath)
except IOError:
raise FuseOSError(errno.ENOENT)
return data[offset:offset + size] return data[offset:offset + size]
def readdir(self, path, fh): def readdir(self, path, fh):
dir = self._find(path) dirent = self._find(path)
return ['.', '..'] + list(dir.children) sql = "SELECT name FROM entry WHERE parent_id=?"
self.db.execute(sql, [dirent.id])
return ['.', '..'] + [name for name, in self.db]
def readlink(self, path): def readlink(self, path):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
@ -301,53 +277,43 @@ class GpgFs(LoggingMixIn, Operations):
self._clear_write_cache() self._clear_write_cache()
if new.startswith(old): if new.startswith(old):
raise FuseOSError(errno.EINVAL) raise FuseOSError(errno.EINVAL)
old_dir, old_name = self._find(old, parent=True) with transaction(self.db):
if old_name not in old_dir.children: old_ent = self._find(old)
raise FuseOSError(errno.ENOENT) new_ent = self._find(new, default=None)
new_dir, new_name = self._find(new, parent=True) old_parent, old_name = self._find(old, parent=True)
prev_ent = new_dir.children.get(new_name) new_parent, new_name = self._find(new, parent=True)
if prev_ent: if new_ent != None:
if prev_ent.type == ENT_DIR: if new_ent.mode & stat.S_IFDIR:
if old_dir[old_name].type != ENT_DIR: if not old_ent.mode & stat.S_IFDIR:
raise FuseOSError(errno.EISDIR) raise FuseOSError(errno.EISDIR)
if prev_ent.children: sql = "SELECT COUNT(*) FROM entry WHERE parent_id=?"
self.db.execute(sql, [new_ent.id])
if self.db.fetchone()[0]:
raise FuseOSError(errno.ENOTEMPTY) raise FuseOSError(errno.ENOTEMPTY)
elif old_dir[old_name].type == ENT_DIR: elif old_ent.mode & stat.S_IFDIR:
raise FuseOSError(errno.ENOTDIR) raise FuseOSError(errno.ENOTDIR)
prev_old_mtime = old_dir.st_mtime sql = "DELETE FROM entry WHERE id=?"
prev_new_mtime = new_dir.st_mtime self.db.execute(sql, [new_ent.id])
new_dir.children[new_name] = old_dir.children.pop(old_name) sql = "UPDATE entry SET parent_id=? WHERE id=?"
old_dir.st_mtime = new_dir.st_mtime = int(time.time()) self.db.execute(sql, [new_parent.id, old_ent.id])
try: sql = "UPDATE entry SET mtime=? WHERE id IN (?,?)"
self._write_index() self.db.execute(sql, [time.time(), old_parent.id, new_parent.id])
except: if new_ent != None and new_ent.mode & stat.S_IFREG:
old_dir.children[old_name] = new_dir.children.pop(new_name) self.store.delete(new_ent.encpath)
if prev_ent:
new_dir.children[new_name] = prev_ent
old_dir.st_mtime = prev_old_mtime
new_dir.st_mtime = prev_new_mtime
raise
if prev_ent and prev_ent.type == ENT_FILE:
os.remove(self.encroot + '/' + prev_ent.path)
def rmdir(self, path): def rmdir(self, path):
parent, path = self._find(path, parent=True) with transaction(self.db):
if path not in parent.children: ent = self._find(path)
raise FuseOSError(errno.ENOENT) if not ent.mode & stat.S_IFDIR:
ent = parent.children[path]
if ent.type != ENT_DIR:
raise FuseOSError(errno.ENOTDIR) raise FuseOSError(errno.ENOTDIR)
if ent.children: sql = "SELECT COUNT(*) FROM entry WHERE parent_id=?"
self.db.execute(sql, [ent.id])
if self.db.fetchone()[0]:
raise FuseOSError(errno.ENOTEMPTY) raise FuseOSError(errno.ENOTEMPTY)
prev_mtime = parent.st_mtime sql = "DELETE FROM entry WHERE id=?"
del parent.children[path] self.db.execute(sql, [ent.id])
parent.st_mtime = int(time.time()) sql = "UPDATE entry SET mtime=? WHERE id=?"
try: self.db.execute(sql, [time.time(), ent.parent_id])
self._write_index()
except:
parent.children[path] = ent
parent.st_mtime = prev_mtime
raise
def setxattr(self, path, name, value, options, position = 0): def setxattr(self, path, name, value, options, position = 0):
raise FuseOSError(errno.ENOSYS) raise FuseOSError(errno.ENOSYS)
@ -361,69 +327,42 @@ class GpgFs(LoggingMixIn, Operations):
def truncate(self, path, length, fh = None): def truncate(self, path, length, fh = None):
self.flush(path, 0) self.flush(path, 0)
self._clear_write_cache() self._clear_write_cache()
with transaction(self.db):
ent = self._find(path) ent = self._find(path)
if length == 0: if length == 0:
buf = '' buf = ''
else: else:
buf = self.store.get(ent.path) buf = self.store.get(ent.encpath)
buf = buf[:length] buf = buf[:length]
prev_size = ent.st_size self._put(path, buf, transaction_=False)
prev_path = ent.path
ent.st_size = length
ent.path = self.store.put(buf)
try:
self._write_index()
except:
os.remove(ent.path)
ent.st_size = prev_size
ent.path = prev_path
raise
self.store.delete(prev_path)
def unlink(self, path): def unlink(self, path):
with transaction(self.db):
if self.write_path == path: if self.write_path == path:
# no need to flush afterwards # no need to flush afterwards
self._clear_write_cache() self._clear_write_cache()
dir, name = self._find(path, parent=True) ent = self._find(path)
if name not in dir.children: sql = "DELETE FROM entry WHERE id=?"
raise FuseOSError(errno.ENOENT) self.db.execute(sql, [ent.id])
ent = dir.children[name] sql = "UPDATE entry SET mtime=? WHERE id=?"
encpath = self.encroot + '/' + ent.path self.db.execute(sql, [time.time(), ent.parent_id])
del dir.children[name] self.store.delete(ent.encpath)
prev_mtime = dir.st_mtime
dir.st_mtime = int(time.time())
try:
self._write_index()
except:
dir.children[name] = ent
dir.st_mtime = prev_mtime
raise
os.remove(encpath)
def utimens(self, path, times = None): def utimens(self, path, times = None):
ent = self._find(path)
if ent.type == ENT_DIR:
prev_mtime = ent.st_mtime
if times is None: if times is None:
ent.st_mtime = int(time.time()) mtime = time.time()
else: else:
ent.st_mtime = times[1] mtime = times[1]
try: with transaction(self.db):
self._write_index() ent = self._find(path)
except: sql = "UPDATE entry SET mtime=? WHERE id=?"
ent.st_mtime = prev_mtime self.db.execute(sql, [mtime, ent.id])
raise
else:
# flush may mess with mtime
self.flush(path, 0)
encpath = self.encroot + '/' + ent.path
os.utime(encpath, times)
def write(self, path, data, offset, fh): def write(self, path, data, offset, fh):
ent = self._find(path)
if path != self.write_path: if path != self.write_path:
self.flush(self.write_path, None) self.flush(self.write_path, None)
buf = self.store.get(ent.path) ent = self._find(path)
buf = self.store.get(ent.encpath)
self.write_buf = [buf] self.write_buf = [buf]
self.write_len = len(buf) self.write_len = len(buf)
self.write_path = path self.write_path = path
@ -445,5 +384,5 @@ if __name__ == '__main__':
logpath = os.path.join(os.path.dirname(__file__), 'gpgfs.log') logpath = os.path.join(os.path.dirname(__file__), 'gpgfs.log')
log.addHandler(logging.FileHandler(logpath, 'w')) log.addHandler(logging.FileHandler(logpath, 'w'))
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
fs = GpgFs(sys.argv[2], sys.argv[1]) fs = GpgFs(sys.argv[2], sys.argv[3], sys.argv[1])
FUSE(fs, sys.argv[3], foreground=True) FUSE(fs, sys.argv[3], foreground=True)

View File

@ -60,3 +60,6 @@ class GpgStore(object):
def delete(self, path): def delete(self, path):
os.remove(self.encroot + '/' + path) os.remove(self.encroot + '/' + path)
log.debug('deleted %s' % path) log.debug('deleted %s' % path)
def exists(self, path):
    """Return whether *path*, taken relative to ``self.encroot``, exists."""
    full_path = self.encroot + '/' + path
    return os.path.exists(full_path)