# vim: fileencoding=utf-8
#
"""Thin wrappers around ``subprocess.Popen`` for chaining external commands.

A :class:`Process` runs one external program.  Passing another
:class:`Process` as the ``stdin`` keyword argument connects the two so that
the upstream stdout feeds the downstream stdin, like a shell pipeline.
"""
from subprocess import Popen, PIPE
from io import TextIOWrapper
from os import path
import os


class FilesError(Exception):
    """Raised for every failure reported by this module."""


class PipeProcess:
    """Placeholder pipeline head: the next stage gets a fresh stdin pipe."""

    def _get_stdout(self):
        return PIPE

    def close(self):
        pass

    @property
    def shell_command(self):
        return ''


class KeyboardInputProcess:
    """Placeholder pipeline head: the next stage inherits the parent stdin."""

    def _get_stdout(self):
        return None

    def close(self):
        pass

    @property
    def shell_command(self):
        return ''


class Process:
    """One external command whose stdin/stdout/stderr are text streams.

    The child process is started lazily, on the first ``write()`` or
    ``read()`` (or when a downstream ``Process`` asks for this one's stdout).
    """

    def __init__(self, command, *parameters, **kwargs):
        """Resolve *command* and remember how the child should be started.

        Args:
            command: program name (looked up in ``PATH``) or absolute path.
            *parameters: command-line arguments passed to the program.
            **kwargs: optional settings:
                stdin: ``PIPE`` (default) for a writable pipe, ``None`` to
                    inherit the parent's stdin, or an upstream object
                    providing ``_get_stdout()``, ``_get_stdin()``,
                    ``close()`` and ``shell_command`` (e.g. a ``Process``).
                stdout, stderr: forwarded to ``Popen``; default ``PIPE``.
                envdict: environment for the child; defaults to a copy of
                    ``os.environ``.
                lang: value for the child's ``LANG`` variable (default 'C').
                timeout: stored on the instance; not consulted by any method
                    in this class.
                cwd: working directory for the child.

        Raises:
            FilesError: if *command* cannot be found.
        """
        stdin = kwargs.get('stdin', PIPE)
        if stdin == PIPE:
            self._stdin = PipeProcess()
        elif stdin is None:
            self._stdin = KeyboardInputProcess()
        else:
            self._stdin = stdin

        self._stdout = kwargs.get('stdout', PIPE)
        self._stderr = kwargs.get('stderr', PIPE)

        # Always work on a private copy: LANG is written below and that must
        # not leak into a dictionary owned by the caller.
        self._envdict = dict(kwargs.get('envdict', os.environ))
        self._envdict['LANG'] = kwargs.get('lang', 'C')

        self._timeout = kwargs.get('timeout', None)
        self._cwd = kwargs.get('cwd', None)

        self._command = _get_program_path(command)
        if not self._command:
            raise FilesError("command not found '{}'".format(command))
        self._command = [self._command, *parameters]

        self._process = None
        # None means "no iterator built yet".  An empty iterator object is
        # truthy, so it cannot serve as the "not built" marker.
        self._iterator = None

        # Flags.
        self._opened = False
        self._is_read = False
        self._readable = False
        self._writable = False
        self._readable_errors = False

        # I/O handlers.
        self.stdin_handler = None
        self.stdout_handler = None
        self.stderr_handler = None

        # Caches.
        self._output_cache = ''
        self._error_cache = ''

    def _get_stdout(self):
        """Start the child and return its stdout stream (pipeline hook)."""
        self._open_process()
        return self.stdout_handler

    def _get_stdin(self):
        """Return the stream used to feed this stage (pipeline hook)."""
        return self.stdin_handler

    def _open_process(self):
        """Start the child process and wrap its pipes as UTF-8 text streams.

        Raises:
            FilesError: if anything goes wrong while starting the child; the
                original exception is attached as ``__cause__``.
        """
        try:
            piped_stdin = self._stdin._get_stdout()
            self._process = Popen(self._command,
                                  stdout=self._stdout,
                                  stdin=piped_stdin,
                                  stderr=self._stderr,
                                  cwd=self._cwd,
                                  close_fds=True,
                                  env=self._envdict)

            if piped_stdin == PIPE:
                self.stdin_handler = TextIOWrapper(self._process.stdin,
                                                   encoding='utf8')
                self._writable = True
            elif piped_stdin is not None:
                # Fed by an upstream stage: writes go to that stage's input.
                self.stdin_handler = self._stdin._get_stdin()
                self._writable = True

            if self._stdout == PIPE:
                self.stdout_handler = TextIOWrapper(self._process.stdout,
                                                    encoding='utf8')
                self._readable = True

            if self._stderr == PIPE:
                self.stderr_handler = TextIOWrapper(self._process.stderr,
                                                    encoding='utf8')
                self._readable_errors = True

            self._opened = True
        except Exception as error:
            print('error:', error)
            raise FilesError('Can not open process.') from error

    def close(self):
        """Close the input side of the pipeline so the child sees EOF."""
        if self._opened:
            if self._process.stdin:
                self.stdin_handler.close()
            self._stdin.close()
            self._opened = False

    def write(self, data):
        """Send *data* to the child's stdin, starting the child if needed.

        Any output cached by an earlier ``read()`` is discarded.

        Raises:
            FilesError: if stdin is not writable or the write fails.
        """
        if not self._opened:
            self._open_process()

        # New input invalidates whatever was read before.
        self._is_read = False
        self._output_cache = ''
        self._iterator = None

        if not self._writable:
            raise FilesError('Process stdin is not writable.')
        try:
            self.stdin_handler.write(data)
            self.stdin_handler.flush()
        except IOError as error:
            raise FilesError(str(error)) from error

    def read(self):
        """Return everything the child wrote to stdout.

        The first call closes the input side, reads stdout (and stderr, when
        piped) to EOF and caches both; later calls return the cached text.

        Raises:
            FilesError: if stdout is not piped.
        """
        if not self._opened and not self._writable:
            self._open_process()

        if not self._readable:
            raise FilesError('Process is not readable.')

        try:
            if not self._is_read:
                if self._writable:
                    # The child cannot finish until it sees EOF on stdin.
                    self.close()
                self._output_cache = self.stdout_handler.read()
                if self._readable_errors:
                    self._error_cache = self.stderr_handler.read()
                self._process.poll()
                self._is_read = True
            return self._output_cache
        except KeyboardInterrupt:
            self.kill()
            raise

    def read_error(self):
        """Return everything the child wrote to stderr ('' if not piped)."""
        self.read()
        # Without a piped stderr there is no handler to read from.
        if not self._error_cache and self._readable_errors:
            try:
                self._error_cache = self.stderr_handler.read()
            except IOError:
                self._error_cache = ''
        return self._error_cache

    def kill(self):
        """Kill the child process if one has been started."""
        # Do not test self._opened here: read() clears that flag (through
        # close()) before it blocks, which is exactly when kill() is needed.
        if self._process is not None:
            self._process.kill()

    def read_lines(self):
        """Return the stdout text split on newline characters."""
        return self.read().split('\n')

    def __iter__(self):
        """Return a shared iterator over the lines of stdout."""
        if self._iterator is None:
            self._iterator = iter(self.read_lines())
        return self._iterator

    def next(self):
        """Return the next stdout line, or ``None`` when exhausted."""
        return next(self.__iter__(), None)

    @property
    def writable(self):
        return self._writable

    @property
    def readable(self):
        return self._readable

    @property
    def readable_errors(self):
        return self._readable_errors

    def return_code(self):
        """Read all output, wait for the child to exit, return its status."""
        self.read()
        # EOF on the pipes does not guarantee the exit status is available
        # yet; wait() does, so success()/failed() never see ``None``.
        return self._process.wait()

    @property
    def shell_command(self):
        """Shell-like rendering of this stage and every stage feeding it."""
        command = ' '.join(self._command)
        previous_commands = self._stdin.shell_command
        if previous_commands == '':
            return command
        return ' | '.join([previous_commands, command])

    def success(self):
        """Return True if the child exited with status 0."""
        return self.return_code() == 0

    def failed(self):
        """Return True if the child exited with a non-zero status."""
        return self.return_code() != 0


class ProgramPathCache:
    """Callable that resolves program names to paths and remembers hits."""

    def __init__(self):
        self._cache = {}

    def __call__(self, program_name, prefix='/'):
        """Return the path of *program_name*, or ``False`` if not found.

        An absolute *program_name* is accepted as-is when it exists under
        *prefix*; otherwise its base name is searched in every ``PATH``
        directory, again relative to *prefix*.

        Args:
            program_name: bare program name or absolute path.
            prefix: directory treated as the filesystem root for lookups.
        """
        # Key on the requested name (not its base name) so two different
        # absolute paths sharing a base name cannot answer for each other.
        cache_key = (program_name, prefix)
        if cache_key in self._cache:
            return self._cache[cache_key]

        if program_name.startswith('/'):
            if path.exists(join_paths(prefix, program_name)):
                self._cache[cache_key] = program_name
                return program_name

        program_base_name = path.basename(program_name)
        search_path = os.environ.get('PATH')
        directories = search_path.split(':') if search_path is not None else []
        for bin_path in directories:
            candidate = join_paths(bin_path, program_base_name)
            if path.exists(join_paths(prefix, candidate)):
                self._cache[cache_key] = candidate
                return candidate
        return False


_get_program_path = ProgramPathCache()


def join_paths(*paths):
    """Join path components, treating every later component as relative.

    One leading '/' is removed from each component after the first, and
    components that are blank (or reduce to '/') are skipped, so that
    ``join_paths('/root', '/usr/bin')`` gives ``'/root/usr/bin'``.  A single
    argument is returned unchanged.
    """
    if len(paths) == 1:
        return paths[0]

    relative_parts = []
    for part in paths[1:]:
        if part.startswith('/'):
            part = part[1:]
        if part.strip() and part != "/":
            relative_parts.append(part)
    return path.join(paths[0], *relative_parts)