You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

122 lines
4.1 KiB

# import time
from typing import Union, List, Optional, Dict
from multiprocessing.connection import Listener
from commands import Command
from io_module import IOModule
def daemon(listener: Listener, wid: int, base_dir: str, command: Command):
"""Daemon main function."""
with IOModule(listener, command.id) as io:
io.script = "worker"
try:
# int("lolkek")
parameters = configure(io,
get_default_parameters(command.parameters))
except KeyboardInterrupt:
# Остановлено пользователем.
send_finish(io)
return
except Exception as error:
# Остановлено из-за ошибки.
send_finish(io, errors=[str(error)])
return
try:
for name, script in command.scripts.items():
script_func = script[0]
parameters = configure(io, dict())
io.set_info(f"Running script '{name}'")
io.script = name
script_func(io, parameters)
io.script = "worker"
for num in range(5):
io.set_error(f"test_error_{num}")
raise Exception("critical error LOL")
io.set_error("there is no life after CRITICAL error")
except Exception as error:
io.set_error(str(error))
finally:
io.set_info("Command is done.")
io.send({"type": "finish", "msg": "finishing."})
for num in range(5):
io.set_info(f"After finish message_{num}.")
def configure(io: IOModule, parameters: dict) -> Union[None, dict]:
# Копируем чтобы уже для тестов реализовать возможность сброса.
processing_parameters = parameters.copy()
while True:
errors = []
server_message = io.receive()
state = server_message["state"]
status = server_message["status"]
if state == "config":
if status == 1:
# Сброс параметров.
processing_parameters = parameters.copy()
values: list = server_message.get("data", None)
if values is not None:
# Модифицируем параметры.
errors = modify_parameters(processing_parameters, values)
elif state == "finish":
if status == 0:
# Заканчиваем конфигурацию и возвращаем полученные параметры
# если статус 1.
return parameters
# Останавливаем конфигурацию и работу воркера если статус 1.
raise KeyboardInterrupt()
if errors:
send_config_status(io, errors=errors)
else:
send_config_status(io)
def modify_parameters(current: dict, new: list) -> None:
errors = {}
for parameter in new:
parameter_id = parameter["id"]
parameter_value = parameter["value"]
if parameter_id in current:
try:
parameter_value = int(parameter_value)
current[parameter_id] = parameter_value
except ValueError as error:
errors[parameter_id] = str(error)
else:
errors[parameter_id] = f'Parameter "{parameter_id}" is not found.'
return errors
def send_finish(io: IOModule, errors: Optional[List[str]] = []) -> None:
io.send({"state": "finish",
"status": int(bool(errors)),
"data": errors})
def send_config_status(io: IOModule,
errors: Optional[Dict[str, str]] = {}) -> None:
io.send({"state": "config",
"status": int(bool(errors)),
"data": errors})
def get_default_parameters(parameters_description):
parameters = {}
for group in parameters_description:
for parameter in group["parameters"]:
parameters[parameter["id"]] = parameter["default"]
return parameters