You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/mount.py

327 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2017 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import re
from calculate.lib.utils.tools import SingletonParam
import files
import device
import xattr
import errno
from contextlib import contextmanager
from files import process, makeDirectory
from tools import traverse
import tempfile
# Identity placeholder so that `_` is a defined name in this module.
# NOTE(review): setLocalTranslate presumably rebinds `_` in this module to the
# translator of the 'cl_lib3' domain - confirm in calculate.lib.cl_lang.
_ = lambda x: x
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3', sys.modules[__name__])
def isMount(dn):
    """
    Return what ``dn`` is mounted with, according to /etc/mtab.

    ``dn`` may be a device or a mount point. For ordinary mtab entries the
    *other* field of the matching line is returned (the mount point for a
    device, the device for a mount point); if several lines match, the last
    one wins. For overlay entries:

    * if ``dn`` is one of the lowerdir/upperdir/workdir directories, the
      overlay mount point is returned immediately;
    * if ``dn`` is the overlay mount point itself, "overlay" is recorded as
      the result and scanning continues.

    Returns an empty string when no entry matches.
    """

    def find_names(path):
        # Every name the path may appear under in mtab: its absolute form
        # and, when udev reports LVM volume group / logical volume names,
        # the /dev/mapper/<vg>-<lv> alias.
        abs_path = os.path.abspath(path)
        yield abs_path
        if abs_path.startswith('/dev'):
            info = device.udev.get_device_info(name=abs_path)
            if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
                yield '/dev/mapper/{vg}-{lv}'.format(vg=info['DM_VG_NAME'],
                                                     lv=info['DM_LV_NAME'])

    names = set(find_names(dn))
    ret = ""
    for line in files.readLinesFile('/etc/mtab'):
        fields = line.split(' ')
        if " overlay " in line:
            dest = fields[1]
            # The mount point plus every directory the overlay is built from.
            related = set([dest])
            related.update(
                re.findall("(?:lowerdir=|upperdir=|workdir=)([^,]+)",
                           fields[3]))
            if related & names:
                if dest not in names:
                    return dest
                ret = "overlay"
        elif len(fields) >= 2:
            pair = set(fields[:2])
            if pair & names:
                others = pair - names
                # Both fields can already be in ``names`` (e.g. a directory
                # bind-mounted onto itself); fall back to the mount point
                # instead of popping from an empty set.
                ret = others.pop() if others else fields[1]
    return ret
class MountHelperError(Exception):
    """Base class for errors reported by MountHelper."""
class MountHelperNotFound(MountHelperError):
    """Raised when the requested entry cannot be resolved by MountHelper."""
class MountHelper(object):
    """Parsed, queryable view of an fstab-formatted file.

    Every non-empty, non-comment line of ``data_file`` becomes a list of
    exactly six strings addressed by the NAME, DIR, TYPE, OPTS, FREQ and
    PASSNO indexes. ``cache`` holds those rows and ``rotateCache`` holds the
    same data transposed (one tuple per column).
    """
    data_file = '/etc/fstab'
    # Column indexes of a parsed entry.
    NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(0, 6)

    def __init__(self, data_file=None, devs=()):
        """Read and parse ``data_file`` (class default when not given).

        ``devs`` is passed through to ``device.getUUIDDict``.
        """
        from device import getUUIDDict
        if data_file:
            self.data_file = data_file
        self.cache = []
        self.rotateCache = []
        # NOTE(review): presumably maps "UUID=..."-style names to device
        # paths - confirm in device.getUUIDDict.
        self.dictUUID = getUUIDDict(devs=devs)
        self.rebuildCache()

    def _readdata(self):
        """Return the whole content of ``data_file`` as a string."""
        with open(self.data_file, 'r') as f:
            return f.read()

    def rebuildCache(self):
        """Re-read ``data_file`` and rebuild ``cache`` and ``rotateCache``."""

        def setlen(ar):
            # Truncate or pad with "" so that every row has six fields.
            return ar[:6] + [""] * (6 - len(ar))

        def convert_dev(name):
            # Only absolute paths are resolved through symlinks.
            return os.path.realpath(name) if name.startswith('/') else name

        # Comprehensions keep ``cache`` a real list of lists on any Python
        # version (the rows are modified in place below).
        self.cache = [
            setlen([field.strip() for field in line.split()])
            for line in self._readdata().split('\n')
            if line.strip() and not line.lstrip().startswith("#")
        ]
        for data in self.cache:
            resolved = convert_dev(self.dictUUID.get(data[0], data[0]))
            # Use the DEVNAME reported by udev, keeping the original name
            # when udev does not provide one.
            data[0] = device.udev.get_device_info(
                name=resolved).get('DEVNAME', data[0])
            # Swap entries get the fixed mount point "swap".
            if data[2] == "swap":
                data[1] = "swap"
        self.rotateCache = list(zip(*self.cache))

    def getBy(self, what=DIR, where=NAME, _in=None, eq=None,
              contains=None, noteq=None, allentry=False):
        """Return column ``what`` of the entries whose column ``where`` matches.

        Exactly one condition is used, chosen in this order: ``eq``
        (equality), ``_in`` (membership), ``contains`` (substring), otherwise
        ``noteq`` (inequality; with the default None every entry matches).

        Returns the list of all matching values when ``allentry`` is true,
        otherwise the value of the last matching entry or "" if none match.
        """
        if eq is not None:
            matches = lambda row: row[where] == eq
        elif _in is not None:
            matches = lambda row: row[where] in _in
        elif contains is not None:
            matches = lambda row: contains in row[where]
        else:
            matches = lambda row: row[where] != noteq
        res = [row[what] for row in self.cache if matches(row)]
        if allentry:
            return res
        return res[-1] if res else ""

    def getFields(self, *fields):
        """Return a list of tuples holding the requested columns of every entry.

        Returns an empty list when there are no entries or no fields.
        """
        if not self.rotateCache:
            # No entries: there are no columns to index.
            return []
        return list(zip(*[self.rotateCache[index] for index in fields]))

    def isReadonly(self, what=DIR, eq=None):
        """Tell whether the entry whose column ``what`` equals ``eq`` is read-only.

        The first matching entry that lists "ro" or "rw" in its options
        decides the result. Raises MountHelperNotFound when no matching entry
        carries either option (including when nothing matches at all).
        """
        for data in self.cache:
            if data[what] == eq:
                opts = data[self.OPTS].split(',')
                if "ro" in opts:
                    return True
                elif "rw" in opts:
                    return False
        raise MountHelperNotFound(eq)

    def writable(self, dn):
        """Return True when mount point ``dn`` is not read-only."""
        return not self.isReadonly(eq=dn)

    def readonly(self, dn):
        """Return True when mount point ``dn`` is read-only."""
        return self.isReadonly(eq=dn)

    def exists(self, dn):
        """Return True when ``dn`` occurs as a mount point or as a device name."""
        return self.isExists(eq=dn) or self.isExists(what=self.NAME, eq=dn)

    def isExists(self, what=DIR, eq=None, noteq=None):
        """Return True when some entry satisfies the condition on column ``what``.

        With ``eq`` given the column must equal it; otherwise the column must
        differ from ``noteq``.
        """
        if eq is not None:
            return any(row[what] == eq for row in self.cache)
        return any(row[what] != noteq for row in self.cache)
class FStab(MountHelper):
    """MountHelper bound to /etc/fstab.

    Instance creation is controlled by the SingletonParam metaclass.
    """
    data_file = '/etc/fstab'
    __metaclass__ = SingletonParam
class Mounts(MountHelper):
    """MountHelper bound to /etc/mtab, parsed with the same fstab-style layout."""
    data_file = '/etc/mtab'
class DiskSpaceError(Exception):
    """Raised by DiskSpace when free space cannot be determined."""
class DiskSpace(object):
    """Query free disk space by running the ``df`` utility."""

    def __init__(self):
        # Path of the df executable as resolved by files.getProgPath.
        self.df_cmd = files.getProgPath('/bin/df')

    def get_free(self, dev=None, dn=None):
        """Return the integer in the fourth column of ``df -B1`` output.

        Either a device (``dev``, which must be mounted) or a directory
        (``dn``) is passed to df; ``dev`` takes precedence when both are
        given. In the usual df layout the fourth column is the available
        space, here in bytes because of ``-B1``.

        Raises DiskSpaceError when the device is not mounted, when df fails,
        or when the second output line does not have exactly six columns.
        """
        if dev:
            if not isMount(dev):
                raise DiskSpaceError(_("Device %s must be mounted") % dev)
            dn = dev
        df = files.process(self.df_cmd, dn, "-B1")
        if not df.success():
            raise DiskSpaceError(str(df.readerr()))
        data = df.read().strip()
        lines = data.split('\n')
        # The first line is the header; the second describes the filesystem.
        if len(lines) >= 2:
            cols = lines[1].split()
            if len(cols) == 6:
                return int(cols[3])
        raise DiskSpaceError(_("Wrong df output:\n%s") % data)
def commonPath(*paths):
    """Return the longest leading path shared by all of ``paths``.

    Each path is normalized with os.path.normpath and compared component by
    component; comparison stops at the first component on which the paths
    disagree. A single path yields its normalized form and no paths yield "".

    Note that the result is built by joining the shared components with "/",
    so two absolute paths that share only the root (e.g. "/" and "/a")
    produce "".
    """
    split_paths = [os.path.normpath(path).split('/') for path in paths]
    common = []
    for level in zip(*split_paths):
        # Every path must agree at this depth; later accidental matches
        # after a mismatch are not part of a common prefix.
        if any(part != level[0] for part in level[1:]):
            break
        common.append(level[0])
    return "/".join(common)
def childMounts(pathname):
    """Return the /etc/mtab entries located at or below ``pathname``.

    The result is a list of ``[source, target]`` pairs (the first two
    space-separated fields of each mtab line) for which either field has
    ``pathname`` as its leading path. ``pathname`` is made absolute unless it
    is the literal "none".

    Returns "" (not a list) when /etc/mtab is not readable; this is kept for
    compatibility with existing callers.
    """
    if pathname != "none":
        absPath = os.path.abspath(pathname)
    else:
        absPath = pathname
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    result = []
    with open(mtabFile) as f:
        for line in f:
            fields = line.split(" ")
            if len(fields) < 2:
                # Blank or malformed line: nothing to compare.
                continue
            source, target = fields[0], fields[1]
            if (commonPath(absPath, source) == absPath or
                    commonPath(absPath, target) == absPath):
                result.append([source, target])
    return result
class MountError(Exception):
    """Raised by mount() and umount() when the underlying command fails."""
def mount(source, target, fstype=None, options=None):
    """Mount ``source`` on ``target`` by running /bin/mount.

    ``fstype`` is passed as ``-t<fstype>`` and ``options`` (a string or a
    possibly nested sequence, flattened with ``traverse``) as a
    comma-joined ``-o<options>`` argument.

    Returns True on success; raises MountError carrying the command's error
    output on failure.
    """
    args = []
    if fstype is not None:
        args.append("-t%s" % fstype)
    if options is not None:
        args.append("-o%s" % ",".join(traverse([options])))
    args.extend((source, target))
    p = process("/bin/mount", *args)
    if not p.success():
        message = _("Failed to mount {source} to {target}").format(
            source=source, target=target)
        raise MountError(message + _(":") + str(p.readerr()))
    return True
def umount(target):
    """Unmount ``target`` by running /bin/umount.

    Returns True on success; raises MountError carrying the command's error
    output on failure.
    """
    p = process("/bin/umount", target)
    if not p.success():
        message = _("Failed to umount {target}").format(target=target)
        raise MountError(message + _(":") + str(p.readerr()))
    return True
class BtrfsError(Exception):
    """Raised by Btrfs when the device is missing or an operation fails."""
class Btrfs(object):
    """
    Retrieve and change information about a btrfs filesystem.

    The block device is mounted temporarily below ``check_path`` for every
    operation and unmounted again afterwards.
    """
    # Directory under which the temporary mount points are created.
    check_path = "/run/calculate/mount"

    def __init__(self, block):
        """Remember ``block``; raise BtrfsError if that path does not exist."""
        self.block = block
        if not os.path.exists(block):
            raise BtrfsError(_("Device not found"))

    @contextmanager
    def mount(self):
        """Context manager yielding a directory where the device is mounted.

        A fresh "btrfscheck-" directory is created under ``check_path`` and
        the device is mounted there as btrfs. On exit the directory is
        unmounted (if it is a mount point) and removed. A MountError is
        re-raised as BtrfsError; other exceptions propagate unchanged.
        """
        tmp_dir = None
        try:
            makeDirectory(self.check_path)
            tmp_dir = tempfile.mkdtemp(prefix="btrfscheck-",
                                       dir=self.check_path)
            mount(self.block, tmp_dir, "btrfs")
            yield tmp_dir
        except MountError as e:
            reason = str(e)
            if "wrong fs type" in reason:
                raise BtrfsError(_("{} is not btrfs").format(self.block))
            raise BtrfsError(reason)
        finally:
            # tmp_dir stays None when the directory was never created.
            if tmp_dir:
                if os.path.ismount(tmp_dir):
                    umount(tmp_dir)
                os.rmdir(tmp_dir)

    def get_compression(self, rel_dn):
        """Return the "btrfs.compression" xattr of ``rel_dn`` inside the filesystem.

        ``rel_dn`` is taken relative to the filesystem root. Returns "" when
        the path does not exist or the attribute is not set (ENODATA); any
        other IOError is reported as BtrfsError.
        """
        rel_dn = rel_dn.lstrip("/")
        with self.mount() as dn:
            try:
                full_dn = os.path.join(dn, rel_dn)
                if not os.path.exists(full_dn):
                    return ""
                return xattr.get(full_dn, "btrfs.compression")
            except IOError as e:
                if e.errno != errno.ENODATA:
                    raise BtrfsError(_("Failed to get btrfs compression"))
                return ""

    @property
    def compression(self):
        """Compression attribute of the filesystem root."""
        return self.get_compression("")

    def set_compression(self, rel_dn, value):
        """Set the "btrfs.compression" xattr of ``rel_dn`` to ``value``.

        The directory is created first when it does not exist. Returns the
        result of ``xattr.set``, or "" when the call fails with ENODATA; any
        other IOError is reported as BtrfsError.
        """
        rel_dn = rel_dn.lstrip("/")
        with self.mount() as dn:
            try:
                full_dn = os.path.join(dn, rel_dn)
                if not os.path.exists(full_dn):
                    makeDirectory(full_dn)
                return xattr.set(full_dn, "btrfs.compression", value)
            except IOError as e:
                if e.errno != errno.ENODATA:
                    raise BtrfsError(_("Failed to set btrfs compression"))
                return ""

    @compression.setter
    def compression(self, value):
        self.set_compression("", value)