# import time from typing import Union, List, Optional, Dict from multiprocessing.connection import Listener from commands import Command from io_module import IOModule def daemon(listener: Listener, wid: int, base_dir: str, command: Command): """Daemon main function.""" with IOModule(listener, command.id) as io: io.script = "worker" try: # int("lolkek") parameters = configure(io, get_default_parameters(command.parameters)) except KeyboardInterrupt: # Остановлено пользователем. send_finish(io) return except Exception as error: # Остановлено из-за ошибки. send_finish(io, errors=[str(error)]) return try: for name, script in command.scripts.items(): script_func = script[0] parameters = configure(io, dict()) io.set_info(f"Running script '{name}'") io.script = name script_func(io, parameters) io.script = "worker" for num in range(5): io.set_error(f"test_error_{num}") raise Exception("critical error LOL") io.set_error("there is no life after CRITICAL error") except Exception as error: io.set_error(str(error)) finally: io.set_info("Command is done.") io.send({"type": "finish", "msg": "finishing."}) for num in range(5): io.set_info(f"After finish message_{num}.") def configure(io: IOModule, parameters: dict) -> Union[None, dict]: # Копируем чтобы уже для тестов реализовать возможность сброса. processing_parameters = parameters.copy() while True: errors = [] server_message = io.receive() state = server_message["state"] status = server_message["status"] if state == "config": if status == 1: # Сброс параметров. processing_parameters = parameters.copy() values: list = server_message.get("data", None) if values is not None: # Модифицируем параметры. errors = modify_parameters(processing_parameters, values) elif state == "finish": if status == 0: # Заканчиваем конфигурацию и возвращаем полученные параметры # если статус 1. return parameters # Останавливаем конфигурацию и работу воркера если статус 1. raise KeyboardInterrupt() if errors: send_config_status(io, errors=errors) else: send_config_status(io) def modify_parameters(current: dict, new: list) -> None: errors = {} for parameter in new: parameter_id = parameter["id"] parameter_value = parameter["value"] if parameter_id in current: try: parameter_value = int(parameter_value) current[parameter_id] = parameter_value except ValueError as error: errors[parameter_id] = str(error) else: errors[parameter_id] = f'Parameter "{parameter_id}" is not found.' return errors def send_finish(io: IOModule, errors: Optional[List[str]] = []) -> None: io.send({"state": "finish", "status": int(bool(errors)), "data": errors}) def send_config_status(io: IOModule, errors: Optional[Dict[str, str]] = {}) -> None: io.send({"state": "config", "status": int(bool(errors)), "data": errors}) def get_default_parameters(parameters_description): parameters = {} for group in parameters_description: for parameter in group["parameters"]: parameters[parameter["id"]] = parameter["default"] return parameters