You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/files.py

1790 lines
57 KiB

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2008-2017 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import select
import random
import shutil
import string
from hashlib import sha256
import os
from os import path
import subprocess
from subprocess import Popen, PIPE, STDOUT
import stat
from shutil import copytree, rmtree, copy2
import re
import sys
from itertools import *
import tarfile
from . import tools
import fcntl
from .tools import ignore
from glob import glob
from contextlib import contextmanager
import signal
from functools import reduce
_ = lambda x: x
from ..cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3', sys.modules[__name__])
@contextmanager
def timeout(seconds):
def timeout_handler(signum, frame):
pass
original_handler = signal.signal(signal.SIGALRM, timeout_handler)
try:
signal.alarm(seconds)
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)
try:
from magic import (open as type_file, MAGIC_NONE, MAGIC_CONTINUE,
MAGIC_MIME_TYPE, MAGIC_COMPRESS,
MAGIC_MIME_ENCODING, MAGIC_SYMLINK)
except ImportError as e:
try:
from magic import (open as type_file, NONE as MAGIC_NONE,
CONTINUE as MAGIC_CONTNUE,
MIME_TYPE as MAGIC_MIME_TYPE,
COMPRESS as MAGIC_COMPRESS,
MIME_ENCODING as MIME_ENCODING,
SYMLINK as MAGIC_SYMLINK)
except (ImportError, AttributeError):
type_file = None
MAGIC_NONE = 0
MAGIC_SYMLINK = 0x002
MAGIC_COMPRESS = 0x004
MAGIC_CONTINUE = 0x020
MAGIC_MIME_TYPE = 0x010
MAGIC_MIME_ENCODING = 0x400
from ..cl_lang import setLocalTranslate
setLocalTranslate('cl_lib3', sys.modules[__name__])
reSearch = lambda pattern, listar: [x.groups() for x in map(pattern.search, listar) if x]
class FilesError(Exception):
pass
class StdoutableProcess():
def __init__(self):
self.pipe = None
def get_stdout(self):
return PIPE
def close(self):
pass
class KeyboardInputProcess(StdoutableProcess):
def get_stdout(self):
return None
class PipeProcess(StdoutableProcess):
def get_stdout(self):
return PIPE
class ProcessTimeout(FilesError):
pass
class process(StdoutableProcess):
"""Execute system command by Popen
Examples:
execute program and get result:
if process("/bin/gzip","/boot/somefile").success():
print "Gzip success"
unzip and process unzip data by cpio (list files):
processGzip = process("/bin/gzip","-dc","/boot/initrd")
processCpio = process("/bin/cpio","-tf",stdin=processGzip)
filelist = processCpio.readlines()
execute command and send data:
processGrub = process("/sbin/grub")
processGrub.write("root (hd0,0)\n")
processGrub.write("setup (hd0)\n")
processGrub.write("quit\n")
isok = processGrub.success()
union stdout and stderr:
process("/bin/ls","/",stderr=STDOUT)
result to stdout:
process("/bin/ls",stdout=None)
get data from keyboard:
process("/bin/cat",stdin=None)
"""
STDOUT = STDOUT
PIPE = PIPE
def __init__(self, command, *params, **kwarg):
super().__init__()
if "stdin" not in kwarg:
stdin = PipeProcess()
else:
if kwarg["stdin"] is None:
stdin = KeyboardInputProcess()
else:
stdin = kwarg["stdin"]
self.stdout = kwarg.get("stdout", PIPE)
self.envdict = kwarg.get("envdict", os.environ.copy())
self.envdict["LANG"] = kwarg.get('lang', 'C')
self.langc = "langc" in kwarg
self.timeout = kwarg.get("timeout", None)
self.stderr = kwarg.get("stderr", PIPE)
self.cwd = kwarg.get("cwd", None)
self.command = getProgPath(command)
if not self.command:
raise FilesError(_("Command not found '%s'") %
command)
self.command = [self.command] + list(params)
self.stdin = stdin
self.iter = None
self.cacheresult = None
self.cacheerr = None
self.communicated = False
self._cachedata = []
def _open(self):
"""Open pipe if it not open"""
if not self.pipe:
self.pipe = Popen(self.command,
stdout=self.stdout,
stdin=self.stdin.get_stdout(),
stderr=self.stderr,
cwd=self.cwd,
close_fds=True,
env=self.envdict)
def get_stdout(self):
"""Get current stdout"""
if self.pipe:
self.close()
self._open()
self.cacheresult = ""
return self.pipe.stdout
def write(self, data):
"""Write to process stdin"""
self._open()
try:
self.pipe.stdin.write(data if isinstance(data, bytes) else data.encode("UTF-8"))
self.pipe.stdin.flush()
except IOError as e:
raise FilesError(str(e))
def close(self):
"""Close stdin"""
if self.pipe:
self.pipe.poll()
if self.pipe.stdin:
self.pipe.stdin.close()
self.pipe.stdin = None
if self.stdin:
self.stdin.close()
self.stdin = None
def readerr(self):
self.read()
if self.cacheerr is None:
try:
self.cacheerr = self.pipe.stderr.read()
except IOError:
self.cacheerr = b""
return self.cacheerr.decode("UTF-8")
def readByLine(self):
_cacheerr = []
try:
self._open()
if self.cacheresult is None:
_stdout = self.pipe.stdout.fileno()
_stderr = self.pipe.stderr.fileno()
reads = [_stdout, _stderr]
self.pipe.stdin.close()
self.pipe.stdin = None
while True:
ret = select.select(reads, [], [], self.timeout)
if not ret[0]:
self.kill()
raise ProcessTimeout()
for fd in ret[0]:
if fd == _stdout:
s = self.pipe.stdout.readline()
yield s.decode("UTF-8")
self._cachedata.append(s)
if fd == _stderr:
s = self.pipe.stderr.readline()
_cacheerr.append(s)
if self.pipe.poll() is not None:
s = " "
while True:
s = self.pipe.stdout.readline()
if not s:
break
yield s.decode("UTF-8")
self._cachedata.append(s)
while True:
s = self.pipe.stderr.readline()
if not s:
break
yield s.decode("UTF-8")
_cacheerr.append(s)
break
except KeyboardInterrupt:
self.kill()
raise KeyboardInterrupt
finally:
if self.cacheresult is None:
self.cacheresult = b'\n'.join(self._cachedata)
self.cacheerr = b''.join(_cacheerr)
self.close()
def read(self):
"""Read all data"""
try:
self._open()
if self.cacheresult is None:
if self.timeout is None:
self.cacheresult, self.cacheerr = self.pipe.communicate()
else:
for line in self.readByLine():
pass
try:
return self.cacheresult.decode(encoding="UTF-8")
except UnicodeDecodeError as e:
return self.cacheresult.decode(encoding="UTF-16")
except KeyboardInterrupt:
self.kill()
raise KeyboardInterrupt
finally:
self.close()
def readlines(self):
"""Read lines"""
return self.read().split('\n')
def __iter__(self):
"""Get iterator"""
if not self.iter:
self.iter = iter(self.readlines())
return self.iter
def kill(self):
"""Kill this process"""
if self.pipe:
self.pipe.kill()
def next(self):
"""Next string from stdout"""
return next(self.__iter__())
def __next__(self):
"""Next string from stdout"""
return next(self.__iter__())
def returncode(self):
"""Get return code"""
self.read()
return self.pipe.returncode
def success(self):
"""Success or not"""
return self.returncode() == 0
def failed(self):
"""Failed or not"""
return self.returncode() != 0
def getModeFile(nameFile, mode="all"):
"""Выдает информацию о файле
mode=="all"
права файла, владелец, группа файла
mode=="mode"
права файла
mode=="owner"
владелец, группа файла
"""
fst = os.lstat(nameFile)
if mode == "all":
return stat.S_IMODE(fst.st_mode), fst.st_uid, fst.st_gid
if mode == "mode":
return stat.S_IMODE(fst.st_mode)
if mode == "owner":
return fst.st_uid, fst.st_gid
def chown(fn, uid=None, gid=None):
"""
Поменять uid и gid файла если текущие отличаются
:param fn: файл
:param uid: UID или None
:param gid: GID или None
"""
f_stat = os.stat(fn)
if uid is None:
uid = f_stat.st_uid
if gid is None:
gid = f_stat.st_gid
if uid != f_stat.st_uid or gid != f_stat.st_gid:
os.lchown(fn, uid, gid)
class FilePermission():
"""
Права на файл
"""
UserWrite = 0o200
UserRead = 0o400
UserExecute = 0o100
UserAll = UserWrite | UserRead | UserExecute
GroupWrite = 0o20
GroupRead = 0o40
GroupExecute = 0o10
GroupAll = GroupWrite | GroupRead | GroupExecute
OtherWrite = 0o2
OtherRead = 0o4
OtherExecute = 0o1
OtherAll = OtherWrite | OtherRead | OtherExecute
SetUid = 0o4000
SetGid = 0o2000
StickyBit = 0o1000
def chmod(fn, mode):
"""
Поменять права на файл если текущие права отличаются
:param fn: файл
:param mode: права (0755)
"""
f_stat = os.stat(fn)
if f_stat.st_mode != mode:
os.chmod(fn, mode)
def chownR(directory, uid, gid):
"""Recusive chown"""
def chownPaths(rootPath, listPath, uid, gid):
for chPath in listPath:
chownPath = path.join(rootPath, chPath)
statInfo = os.lstat(chownPath)[stat.ST_MODE]
if stat.S_ISLNK(statInfo):
os.lchown(chownPath, uid, gid)
else:
os.chown(chownPath, uid, gid)
for root, dirs, files in os.walk(directory):
# меняем владельца директории
os.chown(root, uid, gid)
# Меняем владельца директорий
chownPaths(root, dirs, uid, gid)
# Меняем владельца файлов
chownPaths(root, files, uid, gid)
return True
class UnixUtilityFile:
def __init__(self, flags):
self.flags = flags
def load(self):
pass
def setflags(self, flags):
self.flags = flags
def close(self):
pass
def errno(self):
pass
def _get_cmd_by_flags(self, flags):
"""
Получить команду для выполнения по флагам
"""
paramdict = {MAGIC_CONTINUE: "-k",
MAGIC_SYMLINK: "-L",
MAGIC_MIME_TYPE: "--mime-type",
MAGIC_MIME_ENCODING: "--mime-encoding",
MAGIC_COMPRESS: "-z"}
appendParam = [paramdict[x] for x
in sorted(paramdict.keys()) if flags & x]
fileCmd = getProgPath('/usr/bin/file')
return [fileCmd, '-b'] + appendParam
def file(self, filename):
if path.exists(filename):
processFile = process(
*(self._get_cmd_by_flags(self.flags) + [filename]))
if processFile.success():
return processFile.read().rstrip()
return None
def buffer(self, data):
processFile = Popen(
self._get_cmd_by_flags(self.flags) + ["-"],
stdout=PIPE, stdin=PIPE, stderr=None)
try:
out, err = processFile.communicate(input=data)
if processFile.returncode == 0:
return out.rstrip()
finally:
try:
processFile.terminate()
except OSError:
pass
return None
class typeFile():
"""Получение типа файла"""
def __init__(self, magic=MAGIC_MIME_ENCODING | MAGIC_MIME_TYPE):
if type_file is None:
self.magicObject = UnixUtilityFile(MAGIC_NONE)
else:
self.magicObject = type_file(MAGIC_NONE)
self.magicObject.load()
self.magicObject.setflags(magic)
self.magic = magic
def __del__(self):
"""Закрываем magic"""
self.magicObject.close()
def getMTypeBuf_(self, buf):
try:
ret = self.magicObject.buffer(buf).decode('utf-8')
except UnicodeDecodeError:
ret = self.magicObject.buffer(buf)
return None
return ret
def getMTypeBuf(self, buf):
self.magicObject.setflags(self.magic & (~(self.magic&0x4)))
rdtype = self.getMTypeBuf_(buf)
if not self.magic & 0x4:
return rdtype
if "LZ4" in rdtype:
arch_cmd = '/usr/bin/lz4'
elif "XZ" in rdtype:
arch_cmd = '/usr/bin/xz'
elif "bzip2" in rdtype:
arch_cmd = '/usr/bin/bzip2'
elif "gzip" in rdtype:
arch_cmd = '/bin/gzip'
elif "Zstandard" in rdtype:
arch_cmd = '/usr/bin/zstd'
else:
return rdtype
self.magicObject.setflags(self.magic)
with open(os.devnull, 'w') as DEVNULL:
gz = Popen([arch_cmd, "-dc"], stdout=PIPE, stderr=DEVNULL,
stdin=PIPE, close_fds=True)
try:
return "{} ({})".format(
self.getMTypeBuf_(gz.communicate(input=buf)[0]),
rdtype)
finally:
try:
gz.terminate()
except OSError:
pass
def getMType(self, filename):
"""Информация о типе файла"""
try:
return self.magicObject.file(filename)
except UnicodeDecodeError:
try:
return self.magicObject.file(filename.decode('utf-8'))
except UnicodeDecodeError:
return None
# fix for kernel 3.7.7 (bad work samba)
if ret is None and self.magicObject.errno() == 5:
r, w = os.pipe()
devnull = os.open(os.devnull, os.O_WRONLY)
cat = subprocess.Popen(['/bin/cat', filename], stdout=w,
stderr=devnull, close_fds=True)
ret = self.magicObject.descriptor(r)
os.close(w)
os.close(devnull)
cat.wait()
return ret
def isBinary(self, filename):
"""является ли файл бинарным"""
mime = self.getMType(filename)
if mime:
if mime.startswith('text'):
return False
else:
return True
return None
class processProgress(process):
"""
Generator like process progress
"""
def __init__(self, command, *params, **kwarg):
process.__init__(self, command, *params, **kwarg)
self.readsize = kwarg.get("readsize", 10)
self.delimeter = re.compile(b"\n")
self.init(**kwarg)
self._cachedata = []
self.buf = b""
def init(self, *args, **kwarg):
pass
def processInit(self):
pass
def processEnd(self):
pass
def processString(self, strdata):
self._cachedata.append(strdata)
return strdata
def processBuffer(self, buf):
return b""
def progress(self):
try:
res = self.processInit()
if res is not None:
yield res
self._open()
if self.cacheresult is None:
self.buf = b""
fd = self.pipe.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
hasData = True
while hasData or self.pipe.poll() is None:
hasData = False
try:
part = self.pipe.stdout.read(self.readsize)
if not part:
continue
except IOError as er:
# re-raise not "Resource temporary unavailable"
if er.errno != 11:
raise
continue
hasData = True
if self.buf:
self.buf += part
else:
self.buf = part
res = self.processBuffer(part)
if res:
yield res
while self.delimeter.search(self.buf):
strdata, self.buf = self.delimeter.split(self.buf, 1)
res = self.processString(strdata)
if res:
yield res
res = self.processEnd()
if res is not None:
yield res
except KeyboardInterrupt:
self.pipe.kill()
raise
finally:
if self._cachedata:
self.cacheresult = b"\n".join((x if isinstance(x, bytes)
else x.encode("UTF-8") for x in self._cachedata))
else:
self.cacheresult = b""
def countFiles(dirpath, onefilesystem=True):
"""
Count files in specified dirpath
"""
num = 1
for dirpath, dirnames, filenames in os.walk(dirpath):
num += len(set(dirnames) | set(filenames))
if onefilesystem:
mountDirs = (x for x in dirnames if path.ismount(path.join(dirpath, x)))
for dirname in mountDirs:
dirnames.remove(dirname)
return num
def getFilesCount(directory):
"""Alias for compatibility"""
return countFiles(directory, onefilesystem=False)
def copyDir(srcDir, destDir):
"""Копируем директорию srcDir в destDir
При копировании сохраняются владелец, группа, права
"""
def ignoreFile(pathname, names):
"""Игнорирование сокетов при копировании"""
ignore = []
for name in names:
if stat.S_ISSOCK(os.lstat(path.join(pathname, name))[stat.ST_MODE]):
ignore.append(name)
return ignore
copytree(srcDir, destDir, ignore=ignoreFile)
return True
def removeDir(rmDir):
"""Рекурсивное удаление директории"""
rmtree(rmDir)
return True
def clearDirectory(dn):
"""
Очистить содержимое директории
:param dn:
:return:
"""
for dn in listDirectory(dn, fullPath=True):
removeDir(dn)
def check_rw(dn):
"""
Проверить досупена ли указанная директория на запись
:param dn:
:return:
"""
def _random_string(l=10):
return "".join(
random.choice(string.ascii_letters) for i in range(0, l))
mark_file = None
while mark_file is None or path.exists(mark_file):
mark_file = path.join(dn, "_mark_%s" % _random_string())
try:
open(mark_file, 'w').close()
os.unlink(mark_file)
return True
except (OSError, IOError):
return False
def quite_unlink(fn):
"""
Проверить существует ли файл и удалить его, не поднимать Exception
если не удалось удалить
:param fn:
:return:
"""
with ignore(OSError):
if path.lexists(fn):
os.unlink(fn)
def removeFileWithEmptyDirectory(rmName, stopDirectory="/"):
"""
Удалить файл и все пустые родительские директории
:param rmName: путь до файла или директории
"""
if rmName != stopDirectory and path.exists(rmName):
if path.isdir(rmName):
if not listDirectory(rmName, fullPath=True):
os.rmdir(rmName)
removeFileWithEmptyDirectory(path.dirname(rmName),
stopDirectory=stopDirectory)
else:
os.unlink(rmName)
removeFileWithEmptyDirectory(path.dirname(rmName),
stopDirectory=stopDirectory)
def pathJoin(*paths):
"""Складывает пути, в отличии от os.path.join, складывает абсолютные пути"""
if len(paths) == 1:
return paths[0]
return reduce(path.join, [x for x
in (y.startswith("/") and y[1:] or y for y
in paths[1:]) if x and x != "/"], paths[0])
def listDirectory(directory, fullPath=False, onlyDir=False):
"""Get files from directory, if it exists"""
#just to make sure:
if isinstance(directory, bytes):
directory = directory.decode("UTF-8")
if not path.exists(directory):
return []
try:
if fullPath:
if onlyDir:
return [x for x in [path.join(directory, z) for z in os.listdir(directory)] if path.isdir(x)]
else:
return [path.join(directory, x) for x in os.listdir(directory)]
else:
if onlyDir:
return [x for x in os.listdir(directory) if path.isdir(path.join(directory, x))]
else:
return os.listdir(directory)
except OSError:
pass
return []
def makeDirectory(pathname, force=False):
"""
Создать каталог вместе с вышестоящими. force - используется для удаления
файла, который является частью пути.
Возвращает False при ошибке"""
try:
parent = path.split(path.normpath(pathname))[0]
if not path.exists(parent):
makeDirectory(parent)
else:
if path.exists(pathname):
if force and not path.isdir(pathname):
os.unlink(pathname)
else:
return True
os.mkdir(pathname)
return True
except (OSError, IOError):
return False
def readLinesFile(filename, grab=False, binary=False):
"""Read file by line"""
mode = 'rb' if binary else 'r'
encoding = None if binary else 'utf-8'
startsymbol = b'#' if binary else "#"
line_break = b'\n' if binary else '\n'
try:
if path.exists(filename):
for line in open(filename, mode, encoding=encoding):
if grab:
line = line.strip()
if not line.startswith(startsymbol):
yield line
else:
yield line.rstrip(line_break)
except (OSError, IOError):
pass
# finally:
# raise StopIteration
def isEmptyFile(filename, grab=False):
return not any(readLinesFile(filename, grab=grab))
def grepFile(filename, regexp, flags=0):
"""
Получить из файла данные по регулярному выражению
"""
data = readFile(filename)
match = re.search(regexp, data, flags=flags)
if match:
return match.group()
return ""
def readFile(filename, *, encoding='utf-8', binary=False):
"""
Прочитать целый файл или вернуть пустую строку в случае ошибки
"""
try:
if path.exists(filename):
mode = 'rb' if binary else 'r'
encoding = None if binary else encoding
with open(filename, mode, encoding=encoding) as f:
tmp = f.read()
return tmp
except (OSError, IOError) as e:
mod, lno = tools.get_traceback_caller(*sys.exc_info())
sys.stderr.write("WARNING: file read error, {}({}:{})\n".format(
str(e), mod, lno))
sys.stderr.flush()
return "" if not binary else b""
def readFileEx(filename, tailbyte=None, headbyte=None, grab=False):
"""
Прочитать целый файл или вернуть пустую строку.
tailbyte: прочитать только последнее указанное количество байт
headbyte: прочитать только первое указанное количество байт
grab: отбросить пустые строки и комментарии (#)
"""
try:
if path.exists(filename):
with open(filename, 'rb') as f:
if grab:
filterfunc = lambda s: b"\n".join(
x for x in s.split(b"\n")
if not x.startswith(b"#") and x.strip())
else:
filterfunc = lambda x: x
if tailbyte:
seeksize = max(0, os.stat(filename).st_size - tailbyte)
if seeksize:
f.seek(seeksize)
if headbyte:
return filterfunc(f.read(headbyte))
else:
return filterfunc(f.read())
except (OSError, IOError):
mod, lno = tools.get_traceback_caller(*sys.exc_info())
sys.stderr.write("WARNING: file read error, {}({}:{})\n".format(
str(e), mod, lno))
sys.stderr.flush()
return b""
def writeFile(filename, *, encoding='utf-8', binary=False):
"""
Открыть файл на запись и создать необходимые каталоги
"""
dn = path.dirname(filename)
if not path.exists(dn):
os.makedirs(dn)
mode = 'wb' if binary else 'w'
encoding = None if binary else encoding
return open(filename, mode, encoding=encoding)
class GetProgPath():
"""Get full path of program or False"""
cache = {}
def __call__(self, progname, prefix="/"):
baseprogname = path.basename(progname)
key = (baseprogname, prefix)
if key in self.cache:
return self.cache[key]
bindir = ('/bin', '/sbin', '/usr/sbin', '/usr/bin')
if progname.startswith('/'):
if path.exists(pathJoin(prefix, progname)):
self.cache[key] = progname
return progname
for progname in (pathJoin(x, baseprogname) for x in bindir):
if path.exists(pathJoin(prefix, progname)):
self.cache[key] = progname
return progname
return False
# env = {"LANG":"C"}
# env.update(os.environ.items() + [("PATH",common.getpathenv())] +\
# env.items())
# res = runOsCommand("which %s"%baseprogname,env_dict=env)
# if path.isabs(progname) and path.exists(progname):
# self.cache[baseprogname] = progname
# return self.cache[baseprogname]
# elif res[0] == 0:
# self.cache[baseprogname] = res[1][0].strip()
# return self.cache[baseprogname]
# else:
# return False
getProgPath = GetProgPath()
def checkUtils(*utils):
"""Check utils, exit if it not found and return fullpath"""
retval = []
for util in utils:
utilPath = getProgPath(util)
if not utilPath:
raise FilesError(_("Command not found '%s'") %
path.basename(util))
retval.append(utilPath)
if len(retval) == 1:
return retval[0]
else:
return retval
def getCmdLineParam(paramName):
"""Get value of param /proc/cmdline. If param not found then empty.
"""
cmdLine = '/proc/cmdline'
paramName = "%s=" % paramName
params = [x.partition('=')[2] for x
in readFile(cmdLine).split(' ') if x.startswith(paramName)]
if params:
return params[-1]
else:
return ""
def tarLinks(rootpath, archpath, skip=()):
"""
Поместить все симлинки в архив
"""
links = []
lenprefix = len(path.normpath(rootpath)) + 1
if path.exists(archpath):
os.unlink(archpath)
# create arch
tar = tarfile.open(archpath, "w:bz2")
# find links
if skip:
reSkip = re.compile("|".join((x.replace("*", ".*") for x in skip))).search
else:
reSkip = lambda x: False
for link in filterfalse(reSkip,
find(rootpath, filetype=FindFileType.SymbolicLink,
onefilesystem=True)):
ti = tar.gettarinfo(link)
ti.name = link[lenprefix:]
tar.addfile(ti)
links.append(link)
tar.close()
return links
def getch():
"""
Get char from keyboard
"""
import sys, tty, termios
try:
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
except Exception:
return ""
class FindFileType():
"""
Типы файлов для find
"""
RegularFile = "f"
Directory = "d"
SymbolicLink = "l"
Block = "b"
Character = "c"
NamedPipe = "p"
def find(directory, onefilesystem=False, filetype=None,
fullpath=True, depth=None, downfilter=None):
"""
Поиск файлов в каталоге
:param directory: каталог в котором производится поиск
:param onefilesystem: искать только в текущей файловой системе
:param filetype: тип файла (FindFileType)
:param fullpath: выдавать полный путь
:param depth: глубина поиска
:param downfilter: фильтр поиска
:return: генератор списка файлов
"""
directory = path.normpath(directory)
ldirectory = len(directory)
if downfilter or onefilesystem or depth:
_downfilter = lambda dp: (
(depth is None or dp[ldirectory:].count('/') < depth) and
(not downfilter or downfilter(dp)) and
(not onefilesystem or not path.ismount(dp)))
else:
_downfilter = None
if filetype:
def dirfilter(dp):
link = path.islink(dp)
return ((FindFileType.SymbolicLink in filetype and link)
or (FindFileType.Directory in filetype and not link))
_dirfilter = dirfilter
else:
_dirfilter = None
_dirfullpath = lambda root, fn: path.join(root, fn)
if filetype:
def filefilter(fp):
link = path.islink(fp)
return ((FindFileType.SymbolicLink in filetype and link)
or (FindFileType.RegularFile in filetype and not link))
_filefilter = filefilter
else:
_filefilter = None
_filefullpath = lambda root, fn: path.join(root, fn)
if directory.endswith('/'):
clearlen = len(directory)
else:
clearlen = len(directory) + 1
shortpath = lambda fp: fp[clearlen:]
for root, directories, files in os.walk(directory):
for fn in files:
fp = _filefullpath(root, fn)
if not _filefilter or _filefilter(fp):
yield fp if fullpath else shortpath(fp)
for dn in directories[:]:
dp = _dirfullpath(root, dn)
if not _dirfilter or _dirfilter(dp):
yield dp if fullpath else shortpath(dp)
if _downfilter and not _downfilter(dp):
directories.remove(dn)
def isEmpty(dn):
"""
Проверить пустая ли директория
:param dn: директория
:return:
"""
return not listDirectory(dn)
def copyWithPath(fn, dest, prefix=None):
"""
Копировать файл в dst с созданием каталогов
dest: каталог для копирования
fn: имя файла
prefix: префикс исходного файла
Пример:
copyWithPath("/etc/conf.d/net", "/tmp/conf", prefix="/etc")
/etc/conf.d/net -> /tmp/conf/conf.d/net
"""
if prefix:
destFile = pathJoin(dest, fn[len(prefix):])
else:
destFile = pathJoin(dest, path.basename(fn))
try:
destDir = path.dirname(destFile)
if not path.exists(destDir):
os.makedirs(destDir)
copy2(fn, destFile)
except Exception:
raise FilesError(
_("Failed to copy '%(src)s' to '%(dst)s'") %
{'src': fn,
'dst': destFile})
def getLoopFromPath(directory):
"""
Получить loop, использующие файлы в данной директории
"""
losetup = getProgPath('losetup')
p = process(losetup, '-a')
rePattern = re.compile('(^/dev/loop[^:]+)[^(]+\(([^)]+)\).*')
return [x[0] for x in reSearch(rePattern, p.readlines())
if x[1].startswith(directory)]
def getMdRaidDevices():
"""
Получить словарь: какой mdraid какие использует устройства
"""
reMdInfo = re.compile('^(\w+)\s+:\s+active.*?raid\d+\s+(.*)')
return dict(((x[0], [y.partition('[')[0] for y in x[1].split()]) for x
in reSearch(reMdInfo, readLinesFile('/proc/mdstat'))))
class PercentProgress(processProgress):
"""
Объект выдает прогресс, ища в выводе \d+%
Args:
part: количество прогрессов в программе
delimeter: разделители строк по умолчанию \\n и \\r
cachefilter: фильтр вывода программы (регулярная строка)
startpart: используется для вывода процентов прогрессбар
с определенного места (0 по умолчанию)
end: конечный прогрессбар (по умолчанию)
atty: для получения данных создается pty
"""
def init(self, *args, **kwargs):
self.rePerc = re.compile(b"(\d+(?:\.\d+)?)%", re.S)
self.part = kwargs.get("part", 1)
if self.part < 1:
self.part = 1
self.add_offset = 100 // self.part
self.offset = 0 + kwargs.get("startpart", 0) * self.add_offset
self.is_end = kwargs.get("end", True)
self.stderr = STDOUT
self.delimeter = re.compile(bytes("[%s]" % kwargs.get("delimeter", "\n\r"), encoding="UTF-8"))
self.cachedata = re.compile(bytes(kwargs.get("cachefilter",
"((?:\[31;01m\*|\[33;01m\*|"
"Bad Option|No space left|FATAL ERROR|SYNTAX:|mkisofs:|"
"error:|warning:|fatal:).*)"), encoding="UTF-8"))
self.atty = kwargs.get("atty", False)
self.alldata = b""
def _open(self):
self.master, self.slave = None, None
if self.atty:
if not self.pipe:
self.master, self.slave = os.openpty()
import termios, struct, fcntl
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
s = struct.pack('HHHH', 25, 80, 0, 0)
fcntl.ioctl(self.slave, TIOCSWINSZ, s)
self.pipe = Popen(self.command,
stdout=self.slave,
stderr=self.slave,
cwd=self.cwd,
close_fds=True,
env=self.envdict)
self.pipe.stdout = os.fdopen(self.master, 'rb')
else:
process._open(self)
def processInit(self):
self.percent = 0
self.showval = 0
if not self.offset:
return 0
def processEnd(self):
if not self.slave is None:
os.close(self.slave)
if self.is_end:
self.percent = 100
return 100
def processString(self, strdata):
self.alldata += strdata
match = self.rePerc.search(strdata)
resSearch = self.cachedata.search(strdata)
if resSearch:
self._cachedata.append(
re.sub(b"\[31;01m\*|\[33;01m\*|\[.*?m",
b"", resSearch.group(1)))
if match:
percent = int(float(match.group(1)))
if percent < self.percent:
self.offset += self.add_offset
percent /= self.part
if percent != self.percent:
self.percent = percent
showval = min(99, self.percent + self.offset)
if showval != self.showval:
self.showval = showval
return self.showval
def set_active_tty(ttynum):
"""
Установить активный терминал
"""
chvt = getProgPath("/usr/bin/chvt")
if chvt:
os.system('chvt %d &>/dev/null' % ttynum)
def get_active_tty():
"""
Получить активный терминал
"""
getvt = getProgPath("/usr/bin/fgconsole")
if getvt:
p = process(getvt)
if p.success():
return p.read().strip()
return None
class scanDirectory():
"""Класс для cканирования директории"""
def processingFile(self, pathname, prefix):
"""Обработка в случае файла"""
return True
def processingDirectory(self, pathname, prefix):
"""Обработка в случае директории если возвращаем None то пропуск дир."""
return True
def scanningDirectory(self, scanDir, skipFile=(), skipDir=(),
prefix=None, flagDir=False):
"""Сканирование и обработка шаблонов в директории scanDir"""
ret = True
if not prefix:
prefix = path.join(scanDir, "")[:-1]
if not flagDir:
# проверка корневой директории
retDir = self.processingDirectory(scanDir, scanDir)
if retDir is None:
return None
elif retDir is False:
return False
if flagDir or stat.S_ISDIR(os.lstat(scanDir)[stat.ST_MODE]):
for absPath in sorted(listDirectory(scanDir, fullPath=True)):
relPath = absPath.split(prefix)[1]
stInfo = os.lstat(absPath)
statInfo = stInfo[stat.ST_MODE]
if stat.S_ISREG(statInfo):
# Обработка файла
if relPath in skipFile:
continue
if not self.processingFile(absPath, prefix):
return False
elif stat.S_ISDIR(statInfo):
# Обработка директории
if relPath in skipDir:
continue
retDir = self.processingDirectory(absPath, prefix)
if retDir is None:
continue
elif retDir is False:
return False
ret = self.scanningDirectory(absPath, skipFile,
skipDir, prefix, True)
if ret is False:
return False
return ret
def calculate_exclude(base_dn, exclude=(), include=()):
"""
Вычисление исключений с использованием фильтров include
"""
# корректируем входные данные, чтобы не учитывался первый /
exclude = [x[1:] if x.startswith('/') else x for x in exclude]
include = [x[1:] if x.startswith('/') else x for x in include]
# перебираем все исключения
for exclude_dn in exclude:
# ищем фильтры для текущего исключения
f_include = [x for x in include
if x == exclude_dn or x.startswith("%s/" % exclude_dn)]
# если фильтра нет - то исключение добавляется полностью
if not f_include:
yield exclude_dn
else:
# пребираем все файлы в указанно исключении
for root, dirs, files in os.walk(path.join(base_dn, exclude_dn)):
rel_root = root[len(base_dn) + 1:]
# если файл указан в фильтре, пропускаем его
for fn in (path.join(rel_root, x) for x in files):
if fn not in f_include:
yield fn
filtered_dn = []
for dn in dirs:
rel_dn = path.join(rel_root, dn)
# если директория не используется как часть фильтра - возвращем её
if all(not x.startswith("%s/" % rel_dn) and x != rel_dn
for x in f_include):
filtered_dn.append(dn)
yield rel_dn
elif rel_dn in f_include:
filtered_dn.append(dn)
# удаляем из обрабатываемых дирекоторий не использующиеся в фильтре
# или указанную конечную директорию
for dn in filtered_dn:
dirs.remove(dn)
def get_free_dirname(dn):
"""
Возвращает имя указанной директорий если оно отсутствует либо добавляются
цифры
:param dn:
:return:
"""
if not path.exists(dn):
return dn
base_dn, name = path.split(dn)
for n in range(0, 9999):
dn = path.join(base_dn, "%s.%02d" % (name, n))
if not path.exists(dn):
return dn
raise FilesError(_("No directory name available for %s") % dn)
def dir_sync(src, dst):
"""
Скопировать файлы из src в dst, заменяя одинаковые файлы
:return:
"""
if not path.isdir(src):
raise FilesError(_("%s is not directory") % src)
if path.lexists(dst):
dst_stat = os.lstat(dst)
if stat.S_ISDIR(dst_stat.st_mode):
for fn in listDirectory(src):
src_fn = path.join(src, fn)
dst_fn = path.join(dst, fn)
fn_stat = os.lstat(src_fn)
if stat.S_ISDIR(fn_stat.st_mode):
dir_sync(src_fn, dst_fn)
continue
if path.lexists(dst_fn):
dst_stat = os.lstat(dst_fn)
if stat.S_ISDIR(dst_stat.st_mode):
shutil.rmtree(dst_fn)
else:
os.unlink(dst_fn)
shutil.copy2(src_fn, dst_fn)
return
else:
os.unlink(dst)
shutil.copytree(src, dst)
def tar_directory(dn, archfile):
"""
Заархивировать указанную директорию
:param dn:
:param archfile:
:return:
"""
try:
with tarfile.open(archfile, "w:bz2") as tar:
for x in listDirectory(dn, fullPath=True):
tar.add(x, path.basename(x))
except (IOError, OSError) as e:
raise FilesError(_("Failed to create tarball: %s") % str(e))
def tar_xz_directory(dn, archfile):
"""
Заархивировать указанную директорю в xz формате
:param dn:
:param archfile:
:return:
"""
try:
tar = "/bin/tar"
p = process(tar, "-cJf", archfile, "-C", dn, *listDirectory(dn))
if p.failed():
raise FilesError(_("Failed to create tarball: %s")
% str(p.readerr()))
except (IOError, OSError) as e:
raise FilesError(_("Failed to create tarball: %s") % str(e))
def tar_list(file_list, archfile, prefix='/'):
"""
Заархивировать указанный список
:param file_list:
:param archfile:
:param prefix:
:return:
"""
try:
with tarfile.open(archfile, "w:bz2") as tar:
for fn in (pathJoin(prefix, x) for x in file_list):
tar.add(fn, path.relpath(fn, prefix))
except (IOError, OSError) as e:
raise FilesError(_("Failed to create tarball: %s") % str(e))
def tar_unpack(archfile, target_dn):
"""
Распаковать архив в указанную директорию
:param archfile:
:param target_dn:
:return:
"""
try:
with tarfile.open(archfile, "r:bz2") as tar:
for item in tar:
tar.extract(item, target_dn)
os.utime(path.join(target_dn, item.path),
(item.mtime, item.mtime))
except (tarfile.TarError, IOError, OSError) as e:
raise FilesError(_("Failed to create tarball: %s") % str(e))
class RsyncOptions():
Xattrs = "--xattrs"
KeepDirLinks = "--keep-dirlinks"
OneFileSystem = "--one-file-system"
Archive = "--archive"
FilesFromStdin = "--files-from=-"
@classmethod
def Chown(cls, uid, gid):
return "--chown={0}:{1}".format(uid, gid)
# опции используемые при "скрывании пакета"
# --keep-dirlinks решает проблему восстановления "скрытых" файлов
HidePackageOpts = (Xattrs, KeepDirLinks, OneFileSystem, Archive)
def rsync_files(source_dn, target_dn, *file_list, **kw):
"""
Перенести файлы file_list из prefix в target_dn с сохранением путей и
аттрибутов
:param file_list:
:param target_dn:
:param prefix:
:return:
"""
opts = kw.get('opts', ())
rsync = getProgPath('/usr/bin/rsync')
source_dn = "%s/" % path.normpath(source_dn)
target_dn = "%s/" % path.normpath(target_dn)
if file_list:
params = list(opts) + [RsyncOptions.FilesFromStdin, source_dn,
target_dn]
else:
params = list(opts) + [source_dn, target_dn]
p = process(rsync, *params, stderr=PIPE)
if file_list:
for fn in file_list:
p.write("%s\n" % fn)
if p.failed():
raise FilesError(_("Failed to rsync {src} to {dst}: {error}").format(
src=source_dn, dst=target_dn,
error=p.readerr()))
def xz(data, decompress=False):
xz_prog = getProgPath('/usr/bin/xz')
p = Popen([xz_prog, "-dc" if decompress else "-zc"],
stdin=PIPE, stderr=PIPE, stdout=PIPE, close_fds=True)
data, errors = p.communicate(input=data)
p.poll()
if p.returncode == 0:
return data
else:
raise FilesError(errors)
class XZStreamRead():
"""
XZ объект для чтения данных для tarfile
"""
def __init__(self, data):
self._data = xz(data, decompress=True)
self.pos = 0
def tell(self):
return self.pos
def seek(self, n=0):
self.pos = n
def read(self, size=None):
if size is None:
try:
return self._data[self.pos:]
finally:
self.pos = len(self._data)
try:
return self._data[self.pos:self.pos+size]
finally:
self.pos += min(size,len(self._data))
@contextmanager
def xztaropen(fn):
f = None
try:
f = tarfile.TarFile(mode="r", fileobj=XZStreamRead(readFile(fn, binary=True)))
yield f
finally:
if f:
f.close()
def sha256sum(*fns):
"""
Вычислить контрольную сумму у перечисленных файлов
:param fns:
:return:
"""
kb = 1024
hasher = sha256()
for fn in fns:
with open(fn, "rb") as f:
b = True
while b:
b = f.read(512 * kb)
hasher.update(b)
return hasher.hexdigest()
def forceFileRename(source_fn, target_fn):
"""
Принудительно перенести файл, даже если назначение занято
:param source_fn:
:param target_fn:
:return:
"""
if path.exists(target_fn) and not path.isfile(target_fn):
if path.isdir(target_fn):
removeDir(target_fn)
else:
os.unlink(target_fn)
os.rename(source_fn, target_fn)
def move_files(source, target):
"""
Переменстить все файлы из source в target вместе со структурой
:param source:
:param target:
:return:
"""
for fn in find(source, filetype=FindFileType.RegularFile,
fullpath=False):
source_fn = path.join(source, fn)
target_fn = path.join(target, fn)
target_dn = path.dirname(target_fn)
# принудительно создать каталог, удалив файлы являющиейся частью пути
makeDirectory(target_dn, force=True)
forceFileRename(source_fn, target_fn)
clearDirectory(source)
class RCSError(Exception):
pass
class DirectoryRCS():
"""
Объект оперирует тремя каталогами, в котором хранит:
фиксированную версию данных
изменения для фиксирования данных (без учёта удаления файлов)
рабочая версия данных
"""
def __init__(self, workdir, indexdir, fixeddir):
self.workdir = workdir
self.indexdir = indexdir
self.fixeddir = fixeddir
def is_clear(self):
"""
Изменения есть только в workdir или в fixed
:return:
"""
return (isEmpty(self.indexdir) and isEmpty(self.workdir) and
not isEmpty(self.fixeddir))
def is_worked(self):
"""
Есть непроиндексированные изменения
:return:
"""
return not isEmpty(self.workdir)
def not_prepared(self):
return (isEmpty(self.indexdir) and not isEmpty(self.workdir) and
isEmpty(self.fixeddir))
def fixing(self):
"""
Зафиксировать все изменения
:return:
"""
for dn in (self.indexdir, self.workdir):
if not isEmpty(dn):
self.movedata(dn, self.fixeddir)
if path.exists(dn):
removeDir(dn)
def indexing(self):
"""
Зафиксировать рабочие изменения в индекс
:return:
"""
self.movedata(self.workdir, self.indexdir)
def movedata(self, source, target):
"""
Поместить все work изменения в index
:return:
"""
if not path.exists(target) and path.exists(source):
try:
os.rename(source, target)
except OSError as e:
raise RCSError(_("Failed to rename data: %s") % str(e))
makeDirectory(source)
else:
try:
move_files(source, target)
except FilesError as e:
raise RCSError(_("Failed to move data: %s") % str(e))
class RealFs(tools.GenericFs):
"""
Файловая система, предоставляющая доступ к реальным данным
"""
def __init__(self, prefix="/"):
self.prefix = prefix
if prefix == "/":
self.remove_prefix = lambda x: x
else:
self.remove_prefix = self._remove_prefix
def _remove_prefix(self, dn):
return dn[:len(self.prefix)]
def _path(self, dn):
return pathJoin(self.prefix, dn)
def exists(self, fn):
return path.lexists(self._path(fn))
def read(self, fn, *, encoding="UTF-8", binary=False):
return readFile(self._path(fn), encoding=encoding, binary=binary)
def glob(self, dn):
for fn in glob(self._path(dn)):
yield self.remove_prefix(fn)
def realpath(self, fn):
return self.remove_prefix(path.realpath(fn))
def write(self, fn, data, *, encoding="UTF-8", binary=False):
with writeFile(fn, encoding=encoding, binary=binary) as f:
f.write(data)
def listdir(self, dn, fullpath=False):
if fullpath:
return [self.remove_prefix(x)
for x in listDirectory(dn, fullPath=True)]
else:
return listDirectory(dn, fullPath=False)
def checkDigestFile(digestfile):
"""Check digest by digestfile"""
reEntry = re.compile(r"# (\S+) HASH\n(\S+) (\S+)", re.S)
result = []
for alg, hashdata, filename in reEntry.findall(readFile(digestfile)):
if hasattr(hashlib, alg.lower()):
hashobj = getattr(hashlib, alg.lower())
filename = path.join(path.dirname(digestfile), filename)
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
digest = hashobj(f.read())
result.append((alg,
digest.hexdigest().upper() == hashdata.upper()))
return result
def getParentPids(pid):
"""
Получить ID всех родительских процессов
"""
pid = str(pid)
while pid != "0":
statFile = "/proc/%s/stat" % pid
try:
data = readFile(statFile).split()
if len(data) > 4:
pid = data[3]
yield pid
continue
except (OSError,IOError) as e:
pass
break
def getRunCommands(not_chroot=False, chroot=None, uid=None, withpid=False,
with_lxc=False):
"""List run program"""
def get_mnt(pid):
try:
return os.readlink("/proc/%s/ns/mnt" %pid).strip()
except Exception:
return ""
main_mnt = get_mnt("1")
def getCmd(procNum):
cmdLineFile = '/proc/%s/cmdline' % procNum
try:
if uid is not None:
fstat = os.stat("/proc/%s"%procNum)
if fstat.st_uid != uid:
return ""
if path.exists(cmdLineFile):
if not with_lxc:
if get_mnt(procNum) != main_mnt:
return ""
if not_chroot:
root_link = '/proc/%s/root' % procNum
if os.readlink(root_link) != "/":
return ""
if chroot is not None:
root_link = '/proc/%s/root' % procNum
if os.readlink(root_link) != chroot:
return ""
return readFile(cmdLineFile).strip()
except Exception:
pass
return ""
if not os.access('/proc', os.R_OK):
return []
if withpid:
return [x for x in [(y, getCmd(y)) for y in listDirectory('/proc') if y.isdigit()] if x[1]]
else:
return [x for x in [getCmd(y) for y in listDirectory('/proc') if y.isdigit()] if x]
class XAttrError(Exception):
pass
class xattr():
getfattr = '/bin/getfattr'
setfattr = '/bin/setfattr'
@classmethod
def get(cls, dn, attrname):
p = process(cls.getfattr, "-n", attrname, "--only-values", dn)
if p.success():
return p.read()
err = p.readerr()
raise XAttrError(err.partition(":")[2].strip())
@classmethod
def set(cls, dn, attrname, value):
p = process(cls.setfattr, "-n", attrname, "-v", value, dn)
if not p.success():
err = p.readerr()
raise XAttrError(err.rpartition(":")[2].strip())