You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/mount.py

337 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2017 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import re
from .tools import SingletonParam
from . import files
from . import device
import errno
from contextlib import contextmanager
from .files import process, makeDirectory
from .tools import traverse
import tempfile
from functools import reduce
# Identity placeholder so that _() is defined at module level before any
# translation machinery is installed.
_ = lambda x: x
from ..cl_lang import setLocalTranslate
# NOTE(review): presumably installs the 'cl_lib3' translation for this module
# (it receives the module object itself) - confirm against cl_lang.
setLocalTranslate('cl_lib3', sys.modules[__name__])
def isMount(dn):
    """
    Look up `dn` (a device or a directory) in /etc/mtab.

    For ordinary mtab lines the first two fields (source and mount point)
    are compared with `dn`; when one of them matches, the *other* field is
    remembered, so a device yields its mount point and a mount point yields
    its source. The last matching line wins.

    For overlay lines the mount point and every lowerdir/upperdir/workdir
    are compared with `dn`: if the mount point itself matches, the string
    "overlay" is remembered; if only one of the layers matches, the overlay
    mount point is returned immediately.

    Returns an empty string when nothing matches.
    """

    def find_names(old_dn):
        """Yield the absolute path and, for LVM volumes, its mapper alias."""
        path = os.path.abspath(old_dn)
        yield path
        if path.startswith('/dev'):
            info = device.udev.get_device_info(name=path)
            if 'DM_VG_NAME' in info and 'DM_LV_NAME' in info:
                yield '/dev/mapper/{vg}-{lv}'.format(vg=info['DM_VG_NAME'],
                                                     lv=info['DM_LV_NAME'])

    def get_overlay_mounts(line):
        """Yield the mount point and all layer directories of an overlay."""
        fields = line.split(' ')
        yield fields[1]
        for layer in re.findall("(?:lowerdir=|upperdir=|workdir=)([^,]+)",
                                fields[3]):
            yield layer

    find_data = set(find_names(dn))
    ret = ""
    for line in files.readLinesFile('/etc/mtab'):
        fields = line.split(' ')
        if " overlay " not in line:
            if " " in line:
                mounts = set(fields[:2])
                if mounts & find_data:
                    others = mounts - find_data
                    # When both fields match (e.g. a path mounted onto
                    # itself) nothing is left over; popping the empty set
                    # would raise KeyError, so use the mount point instead.
                    ret = others.pop() if others else fields[1]
        else:
            mounts = set(get_overlay_mounts(line))
            dest = fields[1]
            if mounts & find_data:
                if dest in find_data:
                    ret = "overlay"
                else:
                    return dest
    return ret
class MountHelperError(Exception):
    """Base class for errors raised by the mount table readers."""
class MountHelperNotFound(MountHelperError):
    """Raised when a requested entry cannot be found in the mount table."""
class MountHelper:
    """
    Reader for fstab-formatted mount tables.

    The file named by `data_file` is parsed into `cache`, a list of rows of
    exactly six fields each, addressed by the NAME..PASSNO constants.
    `rotateCache` holds the same data transposed (one tuple per column).
    """
    data_file = '/etc/fstab'
    # Column indexes inside one parsed row.
    NAME, DIR, TYPE, OPTS, FREQ, PASSNO = range(6)

    def __init__(self, data_file=None, devs=()):
        """
        Parse the table immediately.

        data_file -- optional path overriding the class-level `data_file`
        devs      -- passed through to getUUIDDict()
        """
        from .device import getUUIDDict
        if data_file:
            self.data_file = data_file
        self.cache = []
        self.rotateCache = []
        # NOTE(review): presumably maps UUID=/LABEL= style identifiers to
        # device names - confirm against device.getUUIDDict.
        self.dictUUID = getUUIDDict(devs=devs)
        self.rebuildCache()

    def _readdata(self):
        """Return the whole content of `data_file` as text."""
        with open(self.data_file, 'r') as f:
            return f.read()

    def rebuildCache(self):
        """Re-read `data_file` and rebuild `cache` and `rotateCache`."""

        def pad(fields):
            # Force every row to exactly six fields.
            return fields[:6] + [""] * (6 - len(fields))

        def resolve(name):
            # Only filesystem paths can be resolved through symlinks.
            return os.path.realpath(name) if name.startswith('/') else name

        self.cache = [
            pad(line.split())
            for line in self._readdata().split('\n')
            if line.strip() and not line.lstrip().startswith("#")
        ]
        for row in self.cache:
            name = row[self.NAME]
            # Replace the written name by udev's DEVNAME when udev knows it.
            row[self.NAME] = device.udev.get_device_info(
                name=resolve(self.dictUUID.get(name, name))
            ).get('DEVNAME', name)
            # Swap entries get the literal "swap" as their mount point.
            if row[self.TYPE] == "swap":
                row[self.DIR] = "swap"
        self.rotateCache = list(zip(*self.cache))

    def getBy(self, what=DIR, where=NAME, _in=None, eq=None,
              contains=None, noteq=None, allentry=False):
        """
        Return column `what` of the rows whose column `where` matches.

        Exactly one condition is used, in this priority order:
        `eq` (equality), `_in` (membership), `contains` (substring),
        otherwise `noteq` (inequality; with the default None every row
        matches).

        With `allentry` the list of all matches is returned; otherwise the
        last match, or "" when there is none.
        """
        if eq is not None:
            def matches(value):
                return value == eq
        elif _in is not None:
            def matches(value):
                return value in _in
        elif contains is not None:
            def matches(value):
                return contains in value
        else:
            def matches(value):
                return value != noteq
        res = [row[what] for row in self.cache if matches(row[where])]
        if allentry:
            return res
        return res[-1] if res else ""

    def getFields(self, *fields):
        """
        Return a list of tuples holding the requested columns of every row.

        An empty table yields an empty list (the transposed cache has no
        columns to index in that case).
        """
        if not self.rotateCache:
            return []
        return list(zip(*[self.rotateCache[index] for index in fields]))

    def isReadonly(self, what=DIR, eq=None):
        """
        Tell whether the entry whose column `what` equals `eq` is read-only.

        Returns True for an "ro" option and False for an "rw" option of the
        first matching row that carries either. Raises MountHelperNotFound
        when no matching row has an explicit ro/rw option.
        """
        for row in self.cache:
            if row[what] != eq:
                continue
            opts = row[self.OPTS].split(',')
            if "ro" in opts:
                return True
            if "rw" in opts:
                return False
        raise MountHelperNotFound(eq)

    def writable(self, dn):
        """Return True when mount point `dn` is not read-only."""
        return not self.isReadonly(eq=dn)

    def readonly(self, dn):
        """Return True when mount point `dn` is read-only."""
        return self.isReadonly(eq=dn)

    def exists(self, dn):
        """Return True when `dn` occurs as a mount point or as a name."""
        return self.isExists(eq=dn) or self.isExists(what=self.NAME, eq=dn)

    def isExists(self, what=DIR, eq=None, noteq=None):
        """
        Return True when some row satisfies the condition on column `what`.

        `eq` (when not None) tests equality, otherwise `noteq` tests
        inequality.
        """
        if eq is not None:
            return any(row[what] == eq for row in self.cache)
        return any(row[what] != noteq for row in self.cache)
class FStab(MountHelper, metaclass=SingletonParam):
    """
    Data reader for /etc/fstab.

    NOTE(review): the SingletonParam metaclass presumably shares one
    instance per set of constructor parameters - confirm in tools.
    """
    data_file = '/etc/fstab'
class Mounts(MountHelper):
    """Data reader for /etc/mtab, parsed with the fstab field layout."""
    data_file = '/etc/mtab'
class DiskSpaceError(Exception):
    """Raised when the free space of a device or directory cannot be read."""
class DiskSpace:
    """Free space lookup built on the external `df` program."""

    def __init__(self):
        # Resolve the location of df once, at construction time.
        self.df_cmd = files.getProgPath('/bin/df')

    def get_free(self, dev=None, dn=None):
        """
        Return the fourth column of df's first data row as an integer.

        dev -- device to query; it must be mounted, and it replaces `dn`
        dn  -- path handed to df when no device is given

        df is invoked with "-B1". Raises DiskSpaceError when the device is
        not mounted, when df fails, or when its output does not consist of
        a header plus a six-column data row.
        """
        if dev:
            if not isMount(dev):
                raise DiskSpaceError(_("Device %s must be mounted") % dev)
            dn = dev
        df = files.process(self.df_cmd, dn, "-B1")
        if not df.success():
            raise DiskSpaceError(str(df.readerr()))
        output = df.read().strip()
        rows = output.split('\n')
        if len(rows) >= 2:
            columns = rows[1].split()
            if len(columns) == 6:
                return int(columns[3])
        raise DiskSpaceError(_("Wrong df output:\n%s") % output)
def commonPath(*paths):
    """
    Return the longest leading path shared by all `paths`.

    Every path is normalised with os.path.normpath and compared component
    by component; comparison stops at the first component on which the
    paths disagree. The result is the shared components joined by "/",
    so paths that share only the root yield "". A single path yields its
    normalised form; no paths yield "".
    """
    split_paths = [os.path.normpath(path).split('/') for path in paths]
    common = []
    for components in zip(*split_paths):
        first = components[0]
        # Every path must agree, and a later coincidental match after a
        # mismatch must not be counted as part of the prefix.
        if any(component != first for component in components[1:]):
            break
        common.append(first)
    return "/".join(common)
def childMounts(pathname):
    """
    List the /etc/mtab entries located at or below `pathname`.

    Returns a list of [source, mount point] pairs for every mtab line whose
    source or mount point has `pathname` as its common path. The literal
    name "none" is compared as is instead of being made absolute.

    Returns "" (an empty string, kept for compatibility with existing
    callers) when /etc/mtab is not readable.
    """
    if pathname != "none":
        absPath = os.path.abspath(pathname)
    else:
        absPath = pathname
    mtabFile = '/etc/mtab'
    if not os.access(mtabFile, os.R_OK):
        return ""
    found = []
    with open(mtabFile) as f:
        for line in f:
            fields = line.split(" ")
            # Blank or malformed lines have no mount point field.
            if len(fields) < 2:
                continue
            source, target = fields[0], fields[1]
            if (commonPath(absPath, source) == absPath
                    or commonPath(absPath, target) == absPath):
                found.append([source, target])
    return found
class MountError(Exception):
    """Raised when mounting or unmounting fails."""
def mount(source, target, fstype=None, options=None):
    """
    Mount `source` on `target` by running /bin/mount.

    fstype  -- passed as "-t<fstype>" when given
    options -- one option or a nested sequence of options, flattened with
               traverse() and passed as "-o<opt1>,<opt2>,..." when given

    Returns True on success and raises MountError, carrying the error
    output of the command, on failure.
    """
    if options is None:
        option_args = []
    else:
        option_args = ["-o%s" % ",".join(traverse([options]))]
    if fstype is None:
        type_args = []
    else:
        type_args = ["-t%s" % fstype]
    proc = process("/bin/mount", *(type_args + option_args + [source, target]))
    if not proc.success():
        raise MountError(
            _("Failed to mount {source} to {target}").format(
                source=source,
                target=target) +
            _(":") + str(proc.readerr()))
    return True
def umount(target):
    """
    Unmount `target` by running /bin/umount.

    Returns True on success and raises MountError, carrying the error
    output of the command, on failure.
    """
    proc = process("/bin/umount", target)
    if not proc.success():
        raise MountError(
            _("Failed to umount {target}").format(
                target=target) +
            _(":") + str(proc.readerr()))
    return True
def fuser_mountpoint(mp):
    """
    Check whether the mount point is busy.

    Runs "/bin/fuser -m <mp>" and returns whether the command succeeded.
    """
    return process("/bin/fuser", "-m", mp).success()
def try_umount(target):
    """
    Try to unmount the mount point.

    Returns the result of umount() on success and False when umount()
    raises MountError.
    """
    try:
        result = umount(target)
    except MountError:
        return False
    return result
class BtrfsError(Exception):
    """Raised when a btrfs device cannot be inspected or modified."""
class Btrfs:
    """
    Access to information about a btrfs block device.

    The device is mounted on a temporary directory below `check_path` for
    the duration of every query.
    """
    check_path = "/run/calculate/mount"

    def __init__(self, block):
        """Remember the block device; raise BtrfsError if it is missing."""
        self.block = block
        if not os.path.exists(block):
            raise BtrfsError(_("Device not found"))

    @contextmanager
    def mount(self):
        """
        Context manager yielding a directory where the device is mounted.

        A MountError raised inside the managed section (including the
        mount itself) is converted into BtrfsError. On exit the directory
        is unmounted if it is a mount point, and then removed.
        """
        mount_dir = None
        try:
            makeDirectory(self.check_path)
            mount_dir = tempfile.mkdtemp(prefix="btrfscheck-",
                                         dir=self.check_path)
            # Inside a method the bare name refers to the module-level
            # mount() function, not to this method.
            mount(self.block, mount_dir, "btrfs")
            yield mount_dir
        except MountError as err:
            reason = str(err)
            if "wrong fs type" not in reason:
                raise BtrfsError(reason)
            raise BtrfsError(_("{} is not btrfs").format(self.block))
        finally:
            if mount_dir:
                if os.path.ismount(mount_dir):
                    umount(mount_dir)
                os.rmdir(mount_dir)

    def get_compression(self, rel_dn):
        """
        Return the "btrfs.compression" attribute of `rel_dn` on the device.

        `rel_dn` is taken relative to the root of the filesystem. An empty
        string is returned when the path does not exist or the attribute is
        not set; any other attribute error raises BtrfsError.
        """
        relative = rel_dn.lstrip("/")
        with self.mount() as root:
            try:
                target = os.path.join(root, relative)
                if not os.path.exists(target):
                    return ""
                return files.xattr.get(target, "btrfs.compression")
            except files.XAttrError as err:
                if "No such attr" in str(err):
                    return ""
                raise BtrfsError(_("Failed to get btrfs compression"))

    def set_compression(self, rel_dn, value):
        """
        Set the "btrfs.compression" attribute of `rel_dn` to `value`.

        `rel_dn` is taken relative to the root of the filesystem and is
        created as a directory when it does not exist. Raises BtrfsError
        when the directory or the attribute cannot be written.
        """
        relative = rel_dn.lstrip("/")
        with self.mount() as root:
            try:
                target = os.path.join(root, relative)
                if not os.path.exists(target):
                    makeDirectory(target)
                return files.xattr.set(target, "btrfs.compression", value)
            except (IOError, files.XAttrError):
                raise BtrfsError(_("Failed to set btrfs compression"))

    @property
    def compression(self):
        """Compression attribute of the filesystem root."""
        return self.get_compression("")

    @compression.setter
    def compression(self, value):
        self.set_compression("", value)