# -*- coding: utf-8 -*- # Copyright 2016 Mir Calculate. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os from .files import readFile from collections import deque from collections.abc import Mapping from os import path import json from functools import wraps import fnmatch from . import tools class VFSError(Exception): pass class VFSErrorNotFound(VFSError): pass class VFSErrorExists(VFSError): pass class VFSErrorBadsymlink(VFSError): pass class VFSErrorCycliclink(VFSErrorBadsymlink): pass class VFSErrorNotImplemented(VFSError): pass class VFSErrorRestoreFailed(VFSError): pass class VFS(tools.GenericFs): """ Виртуальная файловая система """ def __init__(self, vfs=None): if isinstance(vfs, VFS): self.root = vfs.root else: self.root = VFSRoot() def get(self, fn): r = self.root for dn in (x for x in fn.split('/') if x): r = r[dn] return r def walk(self, sort=False): deq = deque((self.root,)) if sort: sortfunc = lambda z: sorted(z, key=lambda y: y.name, reverse=True) else: sortfunc = iter while deq: d = deq.pop() if isinstance(d, VFSDirectory): for x in sortfunc(d): deq.append(x) yield d def read(self, fn): return self.get(fn).read() def write(self, fn, data): return self.get(fn).write(data) def listdir(self, fn, fullpath=False): return self.get(fn).listdir(fullpath=fullpath) def lexists(self, fn): try: self.get(fn) return True except VFSErrorNotFound: return False def dereflink(self, obj): objs = [obj] while isinstance(obj, VFSLink): obj = obj.deref() if obj in objs: raise VFSErrorCycliclink(obj.fullname()) return obj def _glob(self, pathname): paths = deque(x for x in pathname.split("/") if x) pat = paths.popleft() SEPARATOR = (None, None) deq = deque([("/", self.root), SEPARATOR]) while deq: dirname, d = deq.popleft() if d is None: if paths: pat = paths.popleft() deq.append(SEPARATOR) continue else: break if d.isdir(): dirname = path.join(dirname, d.name) for x in d: if fnmatch.fnmatch(x.name, pat): if not paths: yield (path.join(dirname, x.name), x) else: deq.append((dirname, x)) def glob(self, pathname): return (x for x, y in self._glob(pathname)) def realglob(self, pathname): return [y.fullname() for x, y in self._glob(pathname)] def exists(self, fn): try: obj = self.get(fn) if isinstance(obj, VFSLink): obj = self.dereflink(obj) return True except (VFSErrorNotFound, VFSErrorBadsymlink): return False def realpath(self, fn): bestpath = fn obj = None try: obj = self.get(fn) if isinstance(obj, VFSLink): obj = self.dereflink(obj) return obj.fullname() except VFSErrorBadsymlink: if isinstance(obj, VFSLink): bestpath = obj.symlink() except VFSErrorNotFound: pass return path.normpath(bestpath) # serialization def dump(self): return self.root.dump() def restore(self, dumpdata): self.root = self.root.restore(dumpdata) class SafeVFS(VFS): """ Виртуатльная файловая система, не возвращающая ошибок при обращении к отсутстующему файлу """ def exists(self, fn): return self.lexists(fn) def read(self, fn): try: return super().read(fn) except VFSError: return "" def listdir(self, fn, fullpath=False): try: return super().listdir(fn, fullpath) except VFSError: return [] class VFSObject(): def __init__(self, name): self.parent = None self.name = name def dump(self): raise NotImplemented() def fullname(self): if self.parent: dirname = self.parent.fullname() dirname = "" if dirname == "/" else dirname return "%s/%s" % (dirname, self.name) else: return self.name def read(self): raise NotImplemented() def write(self, data): raise NotImplemented() def listdir(self, fullpath=False): raise NotImplemented() def isdir(self): return False @classmethod def restore(cls, data): for clsobj in (VFSFile, VFSLink, VFSDirectory, VFSRoot): obj = clsobj.restore(data) if obj is not None: return obj raise VFSErrorRestoreFailed(data) class VFSDirectory(VFSObject, Mapping): def __init__(self, name, *args): super().__init__(name) self.children = {} for o in args: self.append(o) def dump(self): return ['d', self.name, [x.dump() for x in self]] def isdir(self): return True @classmethod def restore(cls, data): if data[0] == "d": objtype, name, objdata = data d = cls(name) for x in objdata: d.append(VFSObject.restore(x)) return d def __contains__(self, item): return item.name in self.children def append(self, obj): if obj in self: raise VFSErrorExists("{dirn}/{name}".format(name=obj.name, dirn=self.fullname())) self.children[obj.name] = obj obj.parent = self def __iter__(self): return iter(self.children.values()) def __len__(self): return len(self.children) def __getitem__(self, item): if item == ".": return self if item == "..": return self.parent try: return self.children[item] except KeyError: raise VFSErrorNotFound(item) def listdir(self, fullpath=False): if fullpath: dirname = self.fullname() else: dirname = "" return [path.join(dirname, x.name) for x in self.children.values()] def read(self): raise VFSErrorNotImplemented("Directory does not support reading") def write(self, data): raise VFSErrorNotImplemented("Directory does not support writing") def __eq__(self, obj): return self is obj def __ne__(self, obj): return self is not obj class VFSRoot(VFSDirectory): def __init__(self, *args): super().__init__("/", *args) def __getitem__(self, item): if item == "..": return self return super().__getitem__(item) def dump(self): return ['/', self.name, [x.dump() for x in self]] @classmethod def restore(cls, data): if data[0] == "/": objtype, name, objdata = data d = cls() for x in objdata: d.append(VFSObject.restore(x)) return d class VFSFile(VFSObject): def __init__(self, name, data=""): super().__init__(name) self.data = data def dump(self): return ['r', self.name, self.read()] @classmethod def restore(cls, data): if data[0] == "r": objtype, name, objdata = data return cls(name, objdata) def read(self): return self.data def write(self, data): self.data = data def listdir(self, fullpath=False): raise VFSErrorNotImplemented("File does not support listing") def __getitem__(self, item): raise VFSErrorNotImplemented("File does not support listing") def detect_cyclic(f): @wraps(f) def wrapper(self, *args, **kw): if self.deref_start: raise VFSErrorCycliclink(self.fullname()) self.deref_start = True try: return f(self, *args, **kw) finally: self.deref_start = False return wrapper class VFSLink(VFSObject): def __init__(self, name, target): super().__init__(name) self.name = name self._target = target self.deref_start = False @classmethod def restore(cls, data): if data[0] == "l": objtype, name, objdata = data return cls(name, objdata) def dump(self): return ['l', self.name, self.symlink()] def symlink(self): return self._target def get_root(self): r = self.parent while r.parent: r = r.parent return r def deref(self): r = self.parent if r is None: raise VFSErrorBadsymlink(self._target) if self._target.startswith("/"): r = self.get_root() for dn in (x for x in self._target.split('/') if x): try: r = r[dn] except VFSErrorNotFound: raise VFSErrorBadsymlink(self._target) return r @detect_cyclic def _isdir(self): return self.deref().isdir() def isdir(self): try: return self._isdir() except VFSErrorBadsymlink: return False def __iter__(self): if self.isdir(): return iter(self.deref()) else: raise VFSErrorNotImplemented("Symlink is not directory") @detect_cyclic def read(self): return self.deref().read() @detect_cyclic def write(self, data): return self.deref().write(data) @detect_cyclic def listdir(self, fullpath=False): return self.deref().listdir() @detect_cyclic def __getitem__(self, item): return self.deref().__getitem__(item) class VFSImporter(): """ Имортёр файловой системы """ def __init__(self, dn, prefix="/"): self.importdn = dn self.prefix = prefix def importfs(self): vfs = VFS() dn = self.importdn r = vfs.root for dns in (x for x in path.relpath(dn, self.prefix).split('/') if x): d = VFSDirectory(dns) r.append(d) r = d for root, dnames, fnames in os.walk(dn): vroot = path.join("/", path.relpath(root, self.prefix)) r = vfs.get(vroot) remove_dirs = deque() remove_files = deque() for dn in dnames: fdn = path.join(root, dn) if path.islink(fdn): self.import_link(r, dn, fdn) else: self.import_directory(r, dn, fdn) if not self.filter_dirs(root, dn): remove_dirs.append(dn) for fn in fnames: ffn = path.join(root, fn) if path.islink(ffn): self.import_link(r, fn, ffn) else: self.import_file(r, fn, ffn) if not self.filter_dirs(root, dn): remove_dirs.append(dn) while remove_dirs: dnames.remove(remove_dirs.popleft()) while remove_files: fnames.remove(remove_files.popleft()) return vfs def filter_dirs(self, root, dn): return True def filter_files(self, root, fn): return True def import_link(self, dirobj, fn, ffn): target = os.readlink(ffn) dirobj.append(VFSLink(fn, target)) def import_file(self, dirobj, fn, ffn): dirobj.append(VFSFile(fn)) def import_directory(self, dirobj, dn, fdn): dirobj.append(VFSDirectory(dn)) class VFSDevfsImporter(VFSImporter): """ Импортер /dev """ def __init__(self, dn="/dev", prefix="/"): super().__init__(dn, prefix) def filter_dirs(self, root, dn): if dn == "shm": return False return True class VFSSysfsImporter(VFSImporter): """ Импортёр /sys """ def __init__(self, dn="/sys", prefix="/"): super().__init__(dn, prefix) def import_file(self, dirobj, fn, ffn): if fn in ("version", "vendor", "model", "board_name", "board_vendor", "refcnt", "boardname", "size", "removable", "type", "operstate"): dirobj.append(VFSFile(fn, readFile(ffn))) else: dirobj.append(VFSFile(fn)) class VFSJsonKeeper(): """ Сохранение и загрузыка виртуальной файловой системы """ @staticmethod def save(f, vfs): json.dump(vfs.dump(), f) @staticmethod def load(f): vfs = VFS() vfs.restore(json.load(f)) return vfs VFSKeeper = VFSJsonKeeper