# vim: fileencoding=utf-8 # from subprocess import Popen, PIPE, STDOUT from typing import Union, List from io import TextIOWrapper from os import path from .tools import GenericFS, get_traceback_caller from glob import glob import os import sys import contextlib import re class FilesError(Exception): pass class PipeProcess(): def _get_stdout(self): return PIPE def close(self): pass @property def shell_command(self): return '' class KeyboardInputProcess(): def _get_stdout(self): return None def close(self): pass @property def shell_command(self): return '' class Process: '''Класс-обертка для работы с процессами.''' STDOUT = STDOUT PIPE = PIPE def __init__(self, command, *parameters, **kwargs): if 'stdin' not in kwargs: self._stdin = PipeProcess() elif kwargs['stdin'] == PIPE: self._stdin = PipeProcess() elif kwargs['stdin'] is None: self._stdin = KeyboardInputProcess() else: self._stdin = kwargs['stdin'] self._stdout = kwargs.get('stdout', PIPE) self._stderr = kwargs.get('stderr', PIPE) self._envdict = kwargs.get('envdict', os.environ.copy()) self._envdict['LANG'] = kwargs.get('lang', 'C') self._timeout = kwargs.get('timeout', None) self._cwd = kwargs.get('cwd', None) self._command = get_program_path(command) if not self._command: raise FilesError("command not found '{}'".format(command)) self._command = [self._command, *parameters] self._process = None self._iterator = iter([]) # Флаги. self._opened = False self._is_read = False self._readable = False self._writable = False self._readable_errors = False # I/O обработчики. self.stdin_handler = None self.stdout_handler = None self.stderr_handler = None # Кэши. self._output_cache = '' self._error_cache = '' def _get_stdout(self): self._open_process() return self.stdout_handler def _get_stdin(self): return self.stdin_handler def _open_process(self): '''Метод для открытия процесса.''' try: piped_stdin = self._stdin._get_stdout() self._process = Popen(self._command, stdout=self._stdout, stdin=piped_stdin, stderr=self._stderr, cwd=self._cwd, close_fds=True, env=self._envdict) if piped_stdin == PIPE: self.stdin_handler = TextIOWrapper(self._process.stdin, encoding='utf8') self._writable = True elif piped_stdin is not None: self.stdin_handler = self._stdin._get_stdin() self._writable = True if self._stdout == PIPE: self.stdout_handler = TextIOWrapper(self._process.stdout, encoding='utf8') self._readable = True if self._stderr == PIPE: self.stderr_handler = TextIOWrapper(self._process.stderr, encoding='utf8') self._readable_errors = True self._opened = True except Exception as error: raise FilesError(f'Can not open process. Reason: {error}') def close(self): '''Метод для закрытия процесса.''' if self._opened: if self._process.stdin: self.stdin_handler.close() self._stdin.close() self._opened = False def write(self, data): '''Метод для записи данных в stdin процесса.''' if not self._opened: self._open_process() self._is_read = False self._output_cache = '' try: if self._writable: self.stdin_handler.write(data) self.stdin_handler.flush() else: raise FilesError('Process stdin is not writable.') except IOError as error: raise FilesError(str(error)) def read(self): '''Метод для чтения данных из stdout процесса.''' if not self._opened and not self._writable: self._open_process() if not self._readable: raise FilesError('Process is not readable.') try: if not self._is_read: if self._writable: self.close() if self._readable: self._output_cache = self.stdout_handler.read() if self._readable_errors: self._error_cache = self.stderr_handler.read() self._process.poll() self._is_read = True return self._output_cache except KeyboardInterrupt: self.kill() raise def read_error(self): '''Метод для чтения ошибок, появившихся при выполнении процесса, из его stderr.''' self.read() if not self._error_cache: try: self._error_cache = self.stderr_handler.read() except IOError: self._error_cache = '' return self._error_cache def kill(self): '''Метод для удаления процесса, если он работает.''' if self._opened: self._process.kill() def read_lines(self): return self.read().split('\n') def __iter__(self): if not self._iterator: self._iterator = iter(self.read_lines()) return self._iterator def next(self): return next(self.__iter__(), None) @property def writable(self): '''Метод для проверки возможности записи данных в во входной поток процесса.''' return self._writable @property def readable(self): '''Метод для проверки возможности чтения вывода процесса.''' return self._readable @property def readable_errors(self): '''Метод для проверки возможности чтения ошибок.''' return self._readable_errors def return_code(self): '''Метод возвращающий код возвращенный процессом.''' self.read() return self._process.returncode @property def shell_command(self): '''Метод для получения эквивалентной консольной команды.''' command = ' '.join(self._command) previous_commands = self._stdin.shell_command if previous_commands == '': return command else: return ' | '.join([previous_commands, command]) def success(self): '''Метод для проверки успешности выполнения процесса.''' return self.return_code() == 0 def failed(self): '''Метод для проверки неуспешности выполнения процесса.''' return self.return_code() != 0 class ProgramPathCache: '''Класс, для поиска и кэширования путей к исполнительным файлам различных команд.''' def __init__(self): self._cache: dict = {} def __call__(self, program_name: str, prefix: str = '/'): program_base_name = path.basename(program_name) PATH = os.environ['PATH'] PATH = PATH.split(':') cache_key = (program_base_name, prefix) if cache_key in self._cache: self._cache[cache_key] if program_name.startswith('/'): if path.exists(join_paths(prefix, program_name)): self._cache[cache_key] = program_name return program_name for program_name in (join_paths(bin_path, program_base_name) for bin_path in PATH): if path.exists(join_paths(prefix, program_name)): self._cache[cache_key] = program_name return program_name return False get_program_path = ProgramPathCache() def check_command(*utils: List[str]): '''Функция для проверки наличия той или иной команды системе.''' output = [] for util in utils: util_path = get_program_path(util) if not util_path: raise FilesError("Command not found '{}'". format(os.path.basename(util))) output.append(util) if len(output) == 1: return output[0] else: return output def join_paths(*paths: List[str]) -> str: '''Функция для объединения путей. Объединяет также абсолютные пути.''' if len(paths) == 1: return next(iter(paths)) paths_to_join = [] for _path in paths[1:]: if _path.startswith('/'): _path = _path.strip()[1:] else: _path = _path.strip() if _path and _path != "/": paths_to_join.append(_path) output_path = path.join(paths[0], *paths_to_join) return output_path def read_link(file_path: str) -> str: '''Функция для получения целевого пути символьной ссылки.''' try: if path.exists(file_path): source_path = os.readlink(file_path) return source_path else: return None except (OSError, IOError) as error: mod, lineno = get_traceback_caller(*sys.exc_info()) FilesError("can not read link: {}({}:{})". format(str(error), mod, lineno)) def get_target_from_link(link_path: str, link_source: str, chroot_path: str = '/') -> str: '''Метод для получения целевого пути из целевого пути символьной ссылки с учетом того, что целевой путь символьной ссылки может быть относительным.''' if os.path.isabs(link_source): if chroot_path != '/': return join_paths(chroot_path, link_source) return link_source else: link_source = link_source.split('/') link_dir = os.path.dirname(link_path).split('/') if link_source[0] == '.': link_source.pop() else: while link_source[0] == '..': link_source.pop(0) link_dir.pop(-1) link_dir.extend(link_source) return '/'.join(link_dir) def read_file(file_path: str, binary: bool = False) -> Union[str, bytes]: '''Функция для чтения файлов, возвращает текст файла.''' try: if path.exists(file_path): with open(file_path, f'r{"b" if binary else ""}') as opened_file: return opened_file.read() except (OSError, IOError) as error: mod, lineno = get_traceback_caller(*sys.exc_info()) raise FilesError("file read error, {0}({1}:{2})". format(str(error), mod, lineno)) def grep_file(file_path, regexp, flags=0): """ Получить из файла данные по регулярному выражению """ data = read_file(file_path) m = re.search(regexp, data, flags=flags) if m: return m.group() return "" def write_file(file_path): '''Функция для открытия и записи файлов. Создает директории на пути к целевому файлу если это необходимо. Возвращает файловый объект.''' directory_path = path.dirname(file_path) if not path.exists(directory_path): os.makedirs(directory_path) file_to_write = open(file_path, 'w') return file_to_write def read_file_lines(file_name, grab=False): '''Функция для чтения файлов построчно.''' try: if path.exists(file_name): for file_line in open(file_name, 'r'): if grab: file_line = file_line.strip() if not file_line.startswith('#'): yield file_line else: yield file_line.strip() except (OSError, IOError): pass def quite_unlink(file_path): try: if path.lexists(file_path): os.unlink(file_path) except OSError: pass def list_directory(directory_path, fullpath=False, only_dir=False): if not path.exists(directory_path): return [] try: if fullpath: if only_dir: return [node.path for node in os.scandir(directory_path) if os.path.isdir(node.path)] else: return [node.path for node in os.scandir(directory_path)] else: if only_dir: return [node.name for node in os.scandir(directory_path) if os.path.isdir(node.path)] else: return os.listdir(directory_path) except OSError: return [] def get_directory_contents(path: str): '''Функция для получения списка путей ко всем файлам и директориям, содержащимся в указанной директории.''' output = [] for entry in os.scandir(path): try: if entry.is_symlink(): output.append(entry.path) elif entry.is_dir(): output.append(entry.path) output.extend(get_directory_contents(entry.path)) elif entry.is_file(): output.append(entry.path) except Exception: continue return output def make_directory(directory_path, force=False): try: parent = os.path.split(path.normpath(directory_path))[0] if not path.exists(parent): make_directory(parent) else: if os.path.exists(directory_path): if force and not os.path.isdir(directory_path): os.remove(directory_path) else: return True os.mkdir(directory_path) return True except (OSError, IOError): return False def check_directory_link(link_path, chroot_path='/'): '''Функция для проверки наличия зацикливающихся ссылок и их корректности в целом. В случае успешной проверки возвращает целевой путь ссылки.''' link_target = read_link(link_path) link_target = get_target_from_link(link_path, link_target, chroot_path=chroot_path) if link_target is None: # Ссылка не существует. raise FilesError('the source file does not exist') if not os.path.isdir(link_target): # Ссылка не на директорию. raise FilesError('the source is not directory') linked_path = os.path.abspath(link_target) # Добавляем / к концу пути, чтобы показать, что это путь к директории. if linked_path[-1] != '/': linked_path = linked_path + '/' # Пути, которые нужно проверить. to_check = [linked_path] # Целевые пути из встреченных ссылок. linked_paths = {linked_path} while to_check: current_directory = to_check.pop() for entry in os.scandir(current_directory): # Обходим только директории и ссылки на директории. if not entry.is_dir(): continue if entry.is_symlink(): linked_path = read_link(entry.path) linked_path = get_target_from_link(entry.path, linked_path, chroot_path=chroot_path) if linked_path in linked_paths: raise FilesError( 'the source directory contains cycled links') linked_paths.add(linked_path) to_check.append(linked_path) else: to_check.append(entry.path) return link_target class RealFS(GenericFS): def __init__(self, prefix='/'): self.prefix = prefix if prefix == '/': self.remove_prefix = lambda x: x else: self.remove_prefix = self._remove_prefix def _remove_prefix(self, file_path): prefix_length = len(self.prefix) return file_path[:prefix_length] def _get_path(self, file_path): return join_paths(self.prefix, file_path) def exists(self, file_path): return os.path.lexists(self._get_path(file_path)) def read(self, file_path): return read_file(self._get_path(file_path)) def glob(self, file_path): for glob_path in glob(self._get_path(file_path)): yield self.remove_prefix(glob_path) def realpath(self, file_path): return self.remove_prefix(path.realpath(file_path)) def write(self, file_path, data): with write_file(file_path) as target_file: target_file.write(data) def listdir(self, file_path, full_path=False): if full_path: return [self.remove_prefix(listed_path) for listed_path in list_directory(file_path, full_path=full_path)] else: return list_directory(file_path, full_path=full_path) def get_run_commands(not_chroot=False, chroot=None, uid=None, with_pid=False): def get_cmdline(process_number): cmdline_file = '/proc/{}/cmdline'.format(process_number) try: if uid is not None: fstat = os.stat('/proc/{}'.format(process_number)) if fstat.st_uid != uid: return '' if path.exists(cmdline_file): if not_chroot: root_link = '/proc/{}/root'.format(process_number) if os.readlink(root_link) != '/': return '' if chroot is not None: root_link = '/proc/{}/root'.format(process_number) if os.readlink(root_link) != chroot: return '' return read_file(cmdline_file).strip() except Exception: pass return '' if not os.access('/proc', os.R_OK): return [] proc_directory = list_directory('/proc') output = [] for file_name in proc_directory: if file_name.isdigit(): cmdline = get_cmdline(file_name) if cmdline: if with_pid: output.append((file_name, cmdline)) else: output.append(cmdline) return output @contextlib.contextmanager def stderr_devnull(): oldstderr = sys.stderr sys.stderr = open(os.devnull, 'w') try: yield finally: sys.stderr = oldstderr