import os import sys import json import struct import signal import asyncio import logging from multiprocessing import Process from multiprocessing.connection import Listener # import logging # import time from daemon import daemon from commands import Command from typing import Dict, Optional, List, Union, Tuple, Literal from asyncio.unix_events import _UnixSelectorEventLoop from asyncio import get_event_loop LOG_LEVELS = {logging.ERROR: "ERROR", logging.INFO: "INFO", logging.WARNING: "WARNING", logging.DEBUG: "DEBUG", logging.CRITICAL: "CRITICAL", } class WorkerException(Exception): pass class UnexpectedWorkerFinish(WorkerException): def __init__(self, reason: List[str], output: Optional[List[str]] = []): """Исключение кидаемое в случае, если воркер неожиданно умер.""" self.reason: List[str] = reason self.output: List[dict] = output class UnexpectedWorkerState(WorkerException): """Исключение кидаемое в случае, если состояние воркера не соответствует ожидаемому.""" def __init__(self, state: str, expected: str): self.state: str = state self.expected: str = expected class DaemonIsDead(WorkerException): """Исключение кидаемое в случае, если демон-воркер неожиданно оказался мертв.""" def __init__(self, info: str, output: Optional[List[str]] = []): self.info: str = info self.output: List[dict] = output class IncorrectConfiguration(WorkerException): """Исключение кидаемое при попытке запустить выполнение конфигурации, в которой есть ошибки.""" def __init__(self, errors: Dict[str, str]): self._errors = errors class WorkersManager: def __init__(self, loop: Optional[_UnixSelectorEventLoop] = None): if loop is None: self._loop = get_event_loop() else: self._loop = loop self._configs: Dict[int, WorkerProtocol] = {} self._execs: Dict[int, WorkerProtocol] = {} self._wids: List[int] = [] async def make_worker(self, command: Command) -> Tuple[Union[int, None], Union[str, None]]: """Метод для создания воркеров.""" wid = self._get_wid() try: # Создаем воркера и получаем интерфейс для взаимодействия с ним. worker_api = await self._create_worker(wid, command) self._configs[wid] = worker_api except Exception as error: if wid in self._configs: self._configs.pop(wid) return None, str(error) return wid, None async def configure_worker(self, wid: int, parameters: List[dict], reset: bool = False) -> Union[dict, None]: """Метод для модификации или сброса параметров конфигурации указанными значениями. """ if not parameters: return {} if wid not in self._configs: return None worker_api: WorkerProtocol = self._configs[wid] try: result = await worker_api.configure(parameters) except (UnexpectedWorkerFinish, DaemonIsDead): worker_api.kill() self._configs.pop(wid) raise return result async def cancel_worker_configuration(self, wid: int): """Метод для завершения конфигурации воркера и завершения его работы. """ if wid not in self._configs: return None worker_api: WorkerProtocol = self._configs[wid] output = await worker_api.cancel_configuration() worker_api.delete_socket() self._configs.pop(wid) return output def clean(self): """Метод для убийства всех работающих на данный момент демонов.""" # TODO доработать его логику так, чтобы при перезапуске сервера этот # метод никого не убивал. for wid, worker in {**self._configs, **self._execs}.items(): result = False try: result = worker.kill() finally: if result: print(f"[*] Worker {wid} is killed.") else: print(f"[x] Worker {wid} is not killed.") continue def get_config(self, wid: int): if wid in self._configs: return self._configs[wid] return None def get_execution(self, wid: int): if wid in self._execs: return self._execs[wid] return None def run_execution(self, wid: int) -> Tuple[int, Union[dict, None]]: '''Метод для запуска исполнения команды, используя указанную конфигурацию. Вывод: 0, None -- исполнение успешно запущено; 1, None -- конфигурация с данным wid не найдена; 2, errors -- конфигурация с данным wid содержит ошибки; 3, None -- исполнение с данным wid уже существует; ''' if wid not in self._configs: return 1, None if self._configs[wid].status > 0: # Берем ошибки, как-то. errors = {} return 2, errors if wid in self._execs: return 3, None # worker = self._configs.pop(wid) return 0, None async def _create_worker(self, wid: int, command: Command ) -> "WorkerProtocol": # Путь к временному файлу, в котором будет лежать pid демона-рабочего. pid_file_path: str = os.path.join(os.getcwd(), f"pid_file_{wid}") socket_path: str = os.path.join(os.getcwd(), f"worker_{wid}.sock") if os.path.exists(socket_path): os.remove(socket_path) process_runner = Process(target=daemonize_worker, args=(socket_path, wid, pid_file_path, command) ) process_runner.start() process_runner.join() daemon_pid = await self._get_daemon_pid(pid_file_path) self._delete_pid_file(pid_file_path) transport, worker_interface = await self._loop.create_unix_connection( lambda: WorkerProtocol(daemon_pid, socket_path), socket_path) return worker_interface def _get_wid(self): if not self._wids: worker_id = 0 else: worker_id = self._wids[-1] + 1 self._wids.append(worker_id) return worker_id async def _get_daemon_pid(self, pid_file_path: str) -> int: while True: if not os.path.exists(pid_file_path): # Если файла нет -- уступаем работу другим воркерам. await asyncio.sleep(0) with open(pid_file_path, "r") as pid_file: daemon_pid = int(pid_file.readline()) return daemon_pid def _delete_pid_file(self, pid_file_path: str) -> None: try: os.remove(pid_file_path) except OSError: pass def __repr__(self): configs = ["Configurations:"] configs.extend([f"wid={wid}" for wid in self._configs.keys()]) configs_repr = "\n\t".join(configs) execs = ["Executions:"] execs.extend([f"wid={wid}" for wid in self._execs.keys()]) execs_repr = "\n\t".join(execs) return f"{configs_repr}\n{execs_repr}" def daemonize_worker(socket_path: str, wid: int, pid_file: str, command: Command) -> None: '''Функция запускаемая в отдельном процессе и порождающая демона при помощи магии двойного форка.''' base_dir = os.getcwd() new_fork() os.chdir("/") os.setsid() os.umask(0) new_fork() # Создаем файл, с помощью которого будем передавать pid демона-рабочего. daemon_pid = str(os.getpid()) with open(pid_file, "w") as pid_file: pid_file.write(daemon_pid) with Listener(socket_path) as listener: daemon(listener, wid, base_dir, command) def new_fork() -> None: '''Функция для создания форка текущего процесса.''' try: middle_pid = os.fork() if middle_pid > 0: sys.exit(0) except OSError as error: print(f"FORK ERROR: {error.errno} - {str(error)}") sys.exit(1) class WorkerProtocol(asyncio.Protocol): """Класс протокола управления воркером.""" def __init__(self, pid: int, socket_path: str): self.loop = asyncio.get_running_loop() self._pid: int = pid self._socket_path: str = socket_path # Объект Event, предназначенный для остановки записи при переполнении # буффера записи в трaнспорте. self._can_write = asyncio.Event() self._can_write.clear() self._msg_queue = asyncio.Queue() # Контекст чтения данных воркера. self._buffer = bytearray() self._msg_size: int = 0 self._connection_closed: bool = True async def configure(self, parameters: List[dict], reset: bool = False) -> list: """Метод для конфигурации воркера указанными значениями параметров. Возвращает список ошибок, если таковые есть, или пустой список, если никаких ошибок нет. """ message = self._make_worker_message("config", int(reset), parameters) await self._write(message) response = await self._read() response = json.loads(response) return response["data"] async def cancel_configuration(self) -> List[dict]: """Метод для остановки конфигурации воркера и его работы в целом.""" stop_message: bytes = self._make_worker_message("finish", 1, None) await self._write(stop_message) response = await self._read() response = json.loads(response) return response["data"] def connection_made(self, transport: asyncio.Transport) -> None: self.transport = transport self._can_write.set() self._connection_closed: bool = False print("Connection made.") def connection_lost(self, exc: BaseException) -> None: if exc: print("Connection lost. Error:", str(exc)) else: print("Connection lost.") self._connection_closed: bool = True def data_received(self, data: bytes) -> None: self._buffer.extend(data) while True: if not self._msg_size: if len(self._buffer) < 4: return size_length = struct.calcsize(">i") self._msg_size, = struct.unpack(">i", self._buffer[:size_length]) self._buffer = self._buffer[size_length:] if len(self._buffer) >= self._msg_size: data = self._buffer[:self._msg_size] self._buffer = self._buffer[self._msg_size:] self._msg_size = 0 self._msg_queue.put_nowait(data.decode()) else: break def pause_writing(self) -> None: self._can_write.clear() def resume_writing(self) -> None: self._can_write.set() async def _read(self) -> str: if self._connection_closed and self._msg_queue.empty(): return None return await self._msg_queue.get() async def read_all(self) -> List[str]: if self._msg_queue.empty(): if self._connection_closed: return None else: return await self._msg_queue.get() output = [] while not self._msg_queue.empty(): output.append(self._msg_queue.get_nowait()) return output async def _write(self, data: Union[str, bytes]) -> None: await self._can_write.wait() if isinstance(data, str): data = data.encode() self.transport.write(struct.pack(">i", len(data))) self.transport.write(data) def _make_worker_message( self, state: Literal["config", "finish", "exec", "input"], status: int, data: Union[list, dict]) -> dict: return json.dumps({"state": state, "status": status, "data": data}).encode() def delete_socket(self) -> None: """Метод для удаления сокета.""" try: os.remove(self._socket_path) self.transport.close() self._connection_closed = True except OSError: pass def kill(self) -> bool: """Метод для убийства демона.""" print("GONNA KILLA DAEMON:", self._pid) if self.is_daemon_alive(): try: os.kill(self._pid, signal.SIGKILL) except OSError: return False else: print("ALREADY DEAD") return True def stop(self) -> bool: print("STOP DAEMON:", self._daemon_pid) if self.is_daemon_alive(): try: # Для остановки демона посылаем в него KeyboardInterrupt. os.kill(self._pid, signal.SIGINT) except OSError as error: print(f"ERROR: Failed stopping daemon PID={self._pid}. " f"Reason: {str(error)}") return False return True def is_daemon_alive(self) -> bool: try: os.kill(self._pid, 0) return True except OSError: return False