Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

595 строки
20 KiB

# vim: fileencoding=utf-8
#
from subprocess import Popen, PIPE, STDOUT
from typing import Union, List
from io import TextIOWrapper
from os import path
from .tools import GenericFS, get_traceback_caller
from glob import glob
import os
import sys
import contextlib
import re
class FilesError(Exception):
pass
class PipeProcess():
def _get_stdout(self):
return PIPE
def close(self):
pass
@property
def shell_command(self):
return ''
class KeyboardInputProcess():
def _get_stdout(self):
return None
def close(self):
pass
@property
def shell_command(self):
return ''
class Process:
'''Класс-обертка для работы с процессами.'''
STDOUT = STDOUT
PIPE = PIPE
def __init__(self, command, *parameters, **kwargs):
if 'stdin' not in kwargs:
self._stdin = PipeProcess()
elif kwargs['stdin'] == PIPE:
self._stdin = PipeProcess()
elif kwargs['stdin'] is None:
self._stdin = KeyboardInputProcess()
else:
self._stdin = kwargs['stdin']
self._stdout = kwargs.get('stdout', PIPE)
self._stderr = kwargs.get('stderr', PIPE)
self._envdict = kwargs.get('envdict', os.environ.copy())
self._envdict['LANG'] = kwargs.get('lang', 'C')
self._timeout = kwargs.get('timeout', None)
self._cwd = kwargs.get('cwd', None)
self._command = get_program_path(command)
if not self._command:
raise FilesError("command not found '{}'".format(command))
self._command = [self._command, *parameters]
self._process = None
self._iterator = iter([])
# Флаги.
self._opened = False
self._is_read = False
self._readable = False
self._writable = False
self._readable_errors = False
# I/O обработчики.
self.stdin_handler = None
self.stdout_handler = None
self.stderr_handler = None
# Кэши.
self._output_cache = ''
self._error_cache = ''
def _get_stdout(self):
self._open_process()
return self.stdout_handler
def _get_stdin(self):
return self.stdin_handler
def _open_process(self):
'''Метод для открытия процесса.'''
try:
piped_stdin = self._stdin._get_stdout()
self._process = Popen(self._command,
stdout=self._stdout,
stdin=piped_stdin,
stderr=self._stderr,
cwd=self._cwd,
close_fds=True,
env=self._envdict)
if piped_stdin == PIPE:
self.stdin_handler = TextIOWrapper(self._process.stdin,
encoding='utf8')
self._writable = True
elif piped_stdin is not None:
self.stdin_handler = self._stdin._get_stdin()
self._writable = True
if self._stdout == PIPE:
self.stdout_handler = TextIOWrapper(self._process.stdout,
encoding='utf8')
self._readable = True
if self._stderr == PIPE:
self.stderr_handler = TextIOWrapper(self._process.stderr,
encoding='utf8')
self._readable_errors = True
self._opened = True
except Exception as error:
raise FilesError(f'Can not open process. Reason: {error}')
def close(self):
'''Метод для закрытия процесса.'''
if self._opened:
if self._process.stdin:
self.stdin_handler.close()
self._stdin.close()
self._opened = False
def write(self, data):
'''Метод для записи данных в stdin процесса.'''
if not self._opened:
self._open_process()
self._is_read = False
self._output_cache = ''
try:
if self._writable:
self.stdin_handler.write(data)
self.stdin_handler.flush()
else:
raise FilesError('Process stdin is not writable.')
except IOError as error:
raise FilesError(str(error))
def read(self):
'''Метод для чтения данных из stdout процесса.'''
if not self._opened and not self._writable:
self._open_process()
if not self._readable:
raise FilesError('Process is not readable.')
try:
if not self._is_read:
if self._writable:
self.close()
if self._readable:
self._output_cache = self.stdout_handler.read()
if self._readable_errors:
self._error_cache = self.stderr_handler.read()
self._process.poll()
self._is_read = True
return self._output_cache
except KeyboardInterrupt:
self.kill()
raise
def read_error(self):
'''Метод для чтения ошибок, появившихся при выполнении процесса, из его
stderr.'''
self.read()
if not self._error_cache:
try:
self._error_cache = self.stderr_handler.read()
except IOError:
self._error_cache = ''
return self._error_cache
def kill(self):
'''Метод для удаления процесса, если он работает.'''
if self._opened:
self._process.kill()
def read_lines(self):
return self.read().split('\n')
def __iter__(self):
if not self._iterator:
self._iterator = iter(self.read_lines())
return self._iterator
def next(self):
return next(self.__iter__(), None)
@property
def writable(self):
'''Метод для проверки возможности записи данных в во входной поток
процесса.'''
return self._writable
@property
def readable(self):
'''Метод для проверки возможности чтения вывода процесса.'''
return self._readable
@property
def readable_errors(self):
'''Метод для проверки возможности чтения ошибок.'''
return self._readable_errors
def return_code(self):
'''Метод возвращающий код возвращенный процессом.'''
self.read()
return self._process.returncode
@property
def shell_command(self):
'''Метод для получения эквивалентной консольной команды.'''
command = ' '.join(self._command)
previous_commands = self._stdin.shell_command
if previous_commands == '':
return command
else:
return ' | '.join([previous_commands, command])
def success(self):
'''Метод для проверки успешности выполнения процесса.'''
return self.return_code() == 0
def failed(self):
'''Метод для проверки неуспешности выполнения процесса.'''
return self.return_code() != 0
class ProgramPathCache:
'''Класс, для поиска и кэширования путей к исполнительным файлам различных
команд.'''
def __init__(self):
self._cache: dict = {}
def __call__(self, program_name: str, prefix: str = '/'):
program_base_name = path.basename(program_name)
PATH = os.environ['PATH']
PATH = PATH.split(':')
cache_key = (program_base_name, prefix)
if cache_key in self._cache:
self._cache[cache_key]
if program_name.startswith('/'):
if path.exists(join_paths(prefix, program_name)):
self._cache[cache_key] = program_name
return program_name
for program_name in (join_paths(bin_path, program_base_name)
for bin_path in PATH):
if path.exists(join_paths(prefix, program_name)):
self._cache[cache_key] = program_name
return program_name
return False
get_program_path = ProgramPathCache()
def check_command(*utils: List[str]):
'''Функция для проверки наличия той или иной команды системе.'''
output = []
for util in utils:
util_path = get_program_path(util)
if not util_path:
raise FilesError("Command not found '{}'".
format(os.path.basename(util)))
output.append(util)
if len(output) == 1:
return output[0]
else:
return output
def join_paths(*paths: List[str]) -> str:
'''Функция для объединения путей. Объединяет также абсолютные пути.'''
if len(paths) == 1:
return next(iter(paths))
paths_to_join = []
for _path in paths[1:]:
if _path.startswith('/'):
_path = _path.strip()[1:]
else:
_path = _path.strip()
if _path and _path != "/":
paths_to_join.append(_path)
output_path = path.join(paths[0], *paths_to_join)
return output_path
def read_link(file_path: str) -> str:
'''Функция для получения целевого пути символьной ссылки.'''
try:
if path.exists(file_path):
source_path = os.readlink(file_path)
return source_path
else:
return None
except (OSError, IOError) as error:
mod, lineno = get_traceback_caller(*sys.exc_info())
FilesError("can not read link: {}({}:{})".
format(str(error), mod, lineno))
def get_target_from_link(link_path: str, link_source: str,
chroot_path: str = '/') -> str:
'''Метод для получения целевого пути из целевого пути символьной ссылки
с учетом того, что целевой путь символьной ссылки может быть
относительным.'''
if os.path.isabs(link_source):
if chroot_path != '/':
return join_paths(chroot_path, link_source)
return link_source
else:
link_source = link_source.split('/')
link_dir = os.path.dirname(link_path).split('/')
if link_source[0] == '.':
link_source.pop()
else:
while link_source[0] == '..':
link_source.pop(0)
link_dir.pop(-1)
link_dir.extend(link_source)
return '/'.join(link_dir)
def read_file(file_path: str, binary: bool = False) -> Union[str, bytes]:
'''Функция для чтения файлов, возвращает текст файла.'''
try:
if path.exists(file_path):
with open(file_path, f'r{"b" if binary else ""}') as opened_file:
return opened_file.read()
except (OSError, IOError) as error:
mod, lineno = get_traceback_caller(*sys.exc_info())
raise FilesError("file read error, {0}({1}:{2})".
format(str(error), mod, lineno))
def grep_file(file_path, regexp, flags=0):
"""
Получить из файла данные по регулярному выражению
"""
data = read_file(file_path)
m = re.search(regexp, data, flags=flags)
if m:
return m.group()
return ""
def write_file(file_path):
'''Функция для открытия и записи файлов. Создает директории на пути к
целевому файлу если это необходимо. Возвращает файловый объект.'''
directory_path = path.dirname(file_path)
if not path.exists(directory_path):
os.makedirs(directory_path)
file_to_write = open(file_path, 'w')
return file_to_write
def read_file_lines(file_name, grab=False):
'''Функция для чтения файлов построчно.'''
try:
if path.exists(file_name):
for file_line in open(file_name, 'r'):
if grab:
file_line = file_line.strip()
if not file_line.startswith('#'):
yield file_line
else:
yield file_line.strip()
except (OSError, IOError):
pass
def quite_unlink(file_path):
try:
if path.lexists(file_path):
os.unlink(file_path)
except OSError:
pass
def list_directory(directory_path, fullpath=False, only_dir=False):
if not path.exists(directory_path):
return []
try:
if fullpath:
if only_dir:
return [node.path for node in os.scandir(directory_path)
if os.path.isdir(node.path)]
else:
return [node.path for node in os.scandir(directory_path)]
else:
if only_dir:
return [node.name for node in os.scandir(directory_path)
if os.path.isdir(node.path)]
else:
return os.listdir(directory_path)
except OSError:
return []
def get_directory_contents(path: str):
'''Функция для получения списка путей ко всем файлам и директориям,
содержащимся в указанной директории.'''
output = []
for entry in os.scandir(path):
try:
if entry.is_symlink():
output.append(entry.path)
elif entry.is_dir():
output.append(entry.path)
output.extend(get_directory_contents(entry.path))
elif entry.is_file():
output.append(entry.path)
except Exception:
continue
return output
def make_directory(directory_path, force=False):
try:
parent = os.path.split(path.normpath(directory_path))[0]
if not path.exists(parent):
make_directory(parent)
else:
if os.path.exists(directory_path):
if force and not os.path.isdir(directory_path):
os.remove(directory_path)
else:
return True
os.mkdir(directory_path)
return True
except (OSError, IOError):
return False
def check_directory_link(link_path, chroot_path='/'):
'''Функция для проверки наличия зацикливающихся ссылок и их корректности в
целом. В случае успешной проверки возвращает целевой путь ссылки.'''
link_target = read_link(link_path)
link_target = get_target_from_link(link_path, link_target,
chroot_path=chroot_path)
if link_target is None:
# Ссылка не существует.
raise FilesError('the source file does not exist')
if not os.path.isdir(link_target):
# Ссылка не на директорию.
raise FilesError('the source is not directory')
linked_path = os.path.abspath(link_target)
# Добавляем / к концу пути, чтобы показать, что это путь к директории.
if linked_path[-1] != '/':
linked_path = linked_path + '/'
# Пути, которые нужно проверить.
to_check = [linked_path]
# Целевые пути из встреченных ссылок.
linked_paths = {linked_path}
while to_check:
current_directory = to_check.pop()
for entry in os.scandir(current_directory):
# Обходим только директории и ссылки на директории.
if not entry.is_dir():
continue
if entry.is_symlink():
linked_path = read_link(entry.path)
linked_path = get_target_from_link(entry.path,
linked_path,
chroot_path=chroot_path)
if linked_path in linked_paths:
raise FilesError(
'the source directory contains cycled links')
linked_paths.add(linked_path)
to_check.append(linked_path)
else:
to_check.append(entry.path)
return link_target
class RealFS(GenericFS):
def __init__(self, prefix='/'):
self.prefix = prefix
if prefix == '/':
self.remove_prefix = lambda x: x
else:
self.remove_prefix = self._remove_prefix
def _remove_prefix(self, file_path):
prefix_length = len(self.prefix)
return file_path[:prefix_length]
def _get_path(self, file_path):
return join_paths(self.prefix, file_path)
def exists(self, file_path):
return os.path.lexists(self._get_path(file_path))
def read(self, file_path):
return read_file(self._get_path(file_path))
def glob(self, file_path):
for glob_path in glob(self._get_path(file_path)):
yield self.remove_prefix(glob_path)
def realpath(self, file_path):
return self.remove_prefix(path.realpath(file_path))
def write(self, file_path, data):
with write_file(file_path) as target_file:
target_file.write(data)
def listdir(self, file_path, full_path=False):
if full_path:
return [self.remove_prefix(listed_path)
for listed_path in list_directory(file_path,
full_path=full_path)]
else:
return list_directory(file_path, full_path=full_path)
def get_run_commands(not_chroot=False, chroot=None, uid=None, with_pid=False):
def get_cmdline(process_number):
cmdline_file = '/proc/{}/cmdline'.format(process_number)
try:
if uid is not None:
fstat = os.stat('/proc/{}'.format(process_number))
if fstat.st_uid != uid:
return ''
if path.exists(cmdline_file):
if not_chroot:
root_link = '/proc/{}/root'.format(process_number)
if os.readlink(root_link) != '/':
return ''
if chroot is not None:
root_link = '/proc/{}/root'.format(process_number)
if os.readlink(root_link) != chroot:
return ''
return read_file(cmdline_file).strip()
except Exception:
pass
return ''
if not os.access('/proc', os.R_OK):
return []
proc_directory = list_directory('/proc')
output = []
for file_name in proc_directory:
if file_name.isdigit():
cmdline = get_cmdline(file_name)
if cmdline:
if with_pid:
output.append((file_name, cmdline))
else:
output.append(cmdline)
return output
@contextlib.contextmanager
def stderr_devnull():
oldstderr = sys.stderr
sys.stderr = open(os.devnull, 'w')
try:
yield
finally:
sys.stderr = oldstderr