|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import socket
|
|
|
import signal
|
|
|
import logging
|
|
|
from multiprocessing import Process
|
|
|
# from asyncio import sleep
|
|
|
# import logging
|
|
|
# import time
|
|
|
from io_module import IOModule
|
|
|
from daemon import daemon
|
|
|
from commands import Command
|
|
|
|
|
|
from typing import Dict, Optional, List, Union, Tuple, Literal
|
|
|
|
|
|
from asyncio.unix_events import _UnixSelectorEventLoop
|
|
|
from asyncio import get_event_loop, sleep
|
|
|
|
|
|
from pprint import pprint
|
|
|
|
|
|
|
|
|
LOG_LEVELS = {logging.ERROR: "ERROR",
|
|
|
logging.INFO: "INFO",
|
|
|
logging.WARNING: "WARNING",
|
|
|
logging.DEBUG: "DEBUG",
|
|
|
logging.CRITICAL: "CRITICAL",
|
|
|
}
|
|
|
|
|
|
|
|
|
class WorkerException(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class UnexpectedWorkerFinish(WorkerException):
|
|
|
def __init__(self, reason: List[str], output: Optional[List[str]] = []):
|
|
|
"""Исключение кидаемое в случае, если воркер неожиданно умер."""
|
|
|
self.reason: List[str] = reason
|
|
|
self.output: List[dict] = output
|
|
|
|
|
|
|
|
|
class UnexpectedWorkerState(WorkerException):
|
|
|
"""Исключение кидаемое в случае, если состояние воркера не соответствует
|
|
|
ожидаемому."""
|
|
|
def __init__(self, state: str, expected: str):
|
|
|
self.state: str = state
|
|
|
self.expected: str = expected
|
|
|
|
|
|
|
|
|
class DaemonIsDead(WorkerException):
|
|
|
"""Исключение кидаемое в случае, если демон-воркер неожиданно оказался
|
|
|
мертв."""
|
|
|
def __init__(self, info: str, output: Optional[List[str]] = []):
|
|
|
self.info: str = info
|
|
|
self.output: List[dict] = output
|
|
|
|
|
|
|
|
|
class IncorrectConfiguration(WorkerException):
|
|
|
"""Исключение кидаемое при попытке запустить выполнение конфигурации, в
|
|
|
которой есть ошибки."""
|
|
|
def __init__(self, errors: Dict[str, str]):
|
|
|
self._errors = errors
|
|
|
|
|
|
|
|
|
class WorkersManager:
|
|
|
def __init__(self, loop: Optional[_UnixSelectorEventLoop] = None):
|
|
|
if loop is None:
|
|
|
self._loop = get_event_loop()
|
|
|
else:
|
|
|
self._loop = loop
|
|
|
|
|
|
self._configs: Dict[int, Worker] = {}
|
|
|
self._execs: Dict[int, Worker] = {}
|
|
|
self._wids: List[int] = []
|
|
|
|
|
|
async def make_worker(self, command: Command) -> Tuple[Union[int, None],
|
|
|
Union[str, None]]:
|
|
|
wid = self._get_wid()
|
|
|
try:
|
|
|
worker = Worker(wid, loop=self._loop)
|
|
|
await worker.create_daemon(command)
|
|
|
self._configs[wid] = worker
|
|
|
except Exception as error:
|
|
|
if wid in self._configs:
|
|
|
self._configs.pop(wid)
|
|
|
return None, str(error)
|
|
|
return wid, None
|
|
|
|
|
|
async def configure_worker(self, wid: int, parameters: List[dict],
|
|
|
reset: bool = False) -> Union[dict, None]:
|
|
|
"""Метод для модификации или сброса параметров конфигурации указанными
|
|
|
значениями."""
|
|
|
if not parameters:
|
|
|
return {}
|
|
|
if wid not in self._configs:
|
|
|
return None
|
|
|
|
|
|
worker: Worker = self._configs[wid]
|
|
|
try:
|
|
|
result = await worker.configure(parameters)
|
|
|
except (UnexpectedWorkerFinish, DaemonIsDead):
|
|
|
worker.kill()
|
|
|
self._configs.pop(wid)
|
|
|
raise
|
|
|
|
|
|
return result
|
|
|
|
|
|
async def cancel_worker_configuration(self, wid: int):
|
|
|
if wid not in self._configs:
|
|
|
return None
|
|
|
|
|
|
worker: Worker = self._configs[wid]
|
|
|
output = await worker.cancel_configuration()
|
|
|
self._configs.pop(wid)
|
|
|
|
|
|
return output
|
|
|
|
|
|
def clean(self):
|
|
|
"""Метод для убийства всех работающих на данный момент демонов."""
|
|
|
# TODO доработать его логику так, чтобы при перезапуске сервера этот
|
|
|
# метод никого не убивал.
|
|
|
print("KILLING WORKERS:")
|
|
|
for wid, worker in {**self._configs, **self._execs}.items():
|
|
|
result = False
|
|
|
try:
|
|
|
result = worker.kill()
|
|
|
finally:
|
|
|
if result:
|
|
|
print(f"[*] Worker {wid} is killed.")
|
|
|
else:
|
|
|
print(f"[x] Worker {wid} is not killed.")
|
|
|
continue
|
|
|
|
|
|
def get_config(self, wid: int):
|
|
|
if wid in self._configs:
|
|
|
return self._configs[wid]
|
|
|
return None
|
|
|
|
|
|
def get_execution(self, wid: int):
|
|
|
if wid in self._execs:
|
|
|
return self._execs[wid]
|
|
|
return None
|
|
|
|
|
|
def run_execution(self, wid: int) -> Tuple[int, Union[dict, None]]:
|
|
|
'''Метод для запуска исполнения команды, используя указанную
|
|
|
конфигурацию.
|
|
|
Вывод:
|
|
|
0, None -- исполнение успешно запущено;
|
|
|
1, None -- конфигурация с данным wid не найдена;
|
|
|
2, errors -- конфигурация с данным wid содержит ошибки;
|
|
|
3, None -- исполнение с данным wid уже существует;
|
|
|
'''
|
|
|
if wid not in self._configs:
|
|
|
return 1, None
|
|
|
if self._configs[wid].status > 0:
|
|
|
# Берем ошибки, как-то.
|
|
|
errors = {}
|
|
|
return 2, errors
|
|
|
if wid in self._execs:
|
|
|
return 3, None
|
|
|
|
|
|
worker = self._configs.pop(wid)
|
|
|
return 0, None
|
|
|
|
|
|
def _get_wid(self):
|
|
|
if not self._wids:
|
|
|
worker_id = 0
|
|
|
else:
|
|
|
worker_id = self._wids[-1] + 1
|
|
|
self._wids.append(worker_id)
|
|
|
return worker_id
|
|
|
|
|
|
def __repr__(self):
|
|
|
configs = ["Configurations:"]
|
|
|
configs.extend([f"wid={wid}" for wid in self._configs.keys()])
|
|
|
configs_repr = "\n\t".join(configs)
|
|
|
|
|
|
execs = ["Executions:"]
|
|
|
execs.extend([f"wid={wid}" for wid in self._execs.keys()])
|
|
|
execs_repr = "\n\t".join(execs)
|
|
|
|
|
|
return f"{configs_repr}\n{execs_repr}"
|
|
|
|
|
|
|
|
|
class Worker:
|
|
|
def __init__(self, wid: int,
|
|
|
loop: Optional[_UnixSelectorEventLoop] = None):
|
|
|
if loop is None:
|
|
|
self._loop = get_event_loop()
|
|
|
else:
|
|
|
self._loop = loop
|
|
|
|
|
|
self._wid: int = wid
|
|
|
self._socket_path: str = os.path.join(os.getcwd(),
|
|
|
f"worker_{self._wid}.sock")
|
|
|
if os.path.exists(self._socket_path):
|
|
|
os.remove(self._socket_path)
|
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
self._daemon_pid = None
|
|
|
|
|
|
# Контекст для чтения данных из сокета.
|
|
|
self._context: str = ""
|
|
|
|
|
|
self._state: str = ""
|
|
|
self._status: int = 0
|
|
|
|
|
|
@property
|
|
|
def state(self):
|
|
|
return self._state
|
|
|
|
|
|
@property
|
|
|
def status(self):
|
|
|
return self._status
|
|
|
|
|
|
async def configure(self, parameters: List[dict],
|
|
|
reset: bool = False) -> Union[dict, list]:
|
|
|
'''Метод для отправки в демона-воркера занчений параметров во время
|
|
|
конфигурации.'''
|
|
|
if self._state != "config":
|
|
|
raise UnexpectedWorkerState(self._state, "config")
|
|
|
|
|
|
message = self._make_worker_message("config", int(reset), parameters)
|
|
|
await self.send(message)
|
|
|
|
|
|
answers = await self.read()
|
|
|
|
|
|
print("WORKER'S ANSWERS")
|
|
|
pprint(answers)
|
|
|
|
|
|
config_answer = answers[0]
|
|
|
if config_answer["state"] == "finish":
|
|
|
raise UnexpectedWorkerFinish(config_answer["data"],
|
|
|
output=answers[1:])
|
|
|
return config_answer["data"]
|
|
|
|
|
|
async def cancel_configuration(self) -> List[dict]:
|
|
|
if self._state != "config":
|
|
|
raise UnexpectedWorkerState(self._state, "config")
|
|
|
|
|
|
stop_message = self._make_worker_message("finish", 1, None)
|
|
|
await self.send(stop_message)
|
|
|
|
|
|
output = await self.read()
|
|
|
|
|
|
print("OUTPUT")
|
|
|
pprint(output)
|
|
|
if output:
|
|
|
return output
|
|
|
return []
|
|
|
|
|
|
async def send(self, data: dict):
|
|
|
await self._loop.run_in_executor(None, self._send,
|
|
|
json.dumps(data) + '\0')
|
|
|
|
|
|
async def read(self):
|
|
|
messages = await self._loop.run_in_executor(None, self._read)
|
|
|
return messages
|
|
|
|
|
|
async def create_daemon(self, command: Command) -> None:
|
|
|
# Путь к временному файлу, в котором будет лежать pid демона-рабочего.
|
|
|
pid_file: str = os.path.join(os.getcwd(), f"PID_file_{self._wid}")
|
|
|
|
|
|
io = IOModule(self._socket_path, command.id)
|
|
|
|
|
|
runner_process = Process(target=daemonize_worker,
|
|
|
args=(io, self._wid, pid_file,
|
|
|
command)
|
|
|
)
|
|
|
runner_process.start()
|
|
|
self.sock.connect(self._socket_path)
|
|
|
runner_process.join()
|
|
|
|
|
|
self._daemon_pid = 0
|
|
|
while not self._daemon_pid:
|
|
|
await sleep(0.5)
|
|
|
self._daemon_pid = self._get_daemon_pid(pid_file)
|
|
|
self._delete_pid_file(pid_file)
|
|
|
|
|
|
print("DAEMON PID =", self._daemon_pid)
|
|
|
print("IS_ALIVE:", self.is_daemon_alive())
|
|
|
self._state = "config"
|
|
|
|
|
|
def kill(self) -> bool:
|
|
|
"""Метод для убийства демона."""
|
|
|
print("GONNA KILLA DAEMON:", self._daemon_pid)
|
|
|
if self.is_daemon_alive():
|
|
|
try:
|
|
|
os.kill(self._daemon_pid, signal.SIGKILL)
|
|
|
except OSError:
|
|
|
return False
|
|
|
else:
|
|
|
print("ALREADY DEAD")
|
|
|
# self._delete_socket()
|
|
|
self._daemon_pid = None
|
|
|
return True
|
|
|
|
|
|
def _stop_daemon(self) -> bool:
|
|
|
print("STOP DAEMON:", self._daemon_pid)
|
|
|
if self.is_daemon_alive():
|
|
|
try:
|
|
|
# Для остановки демона посылаем в него KeyboardInterrupt.
|
|
|
os.kill(self._daemon_pid, signal.SIGINT)
|
|
|
except OSError as error:
|
|
|
print(f"ERROR: Failed stopping daemon PID={self._daemon_pid}. "
|
|
|
f"Reason: {str(error)}")
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
def is_daemon_alive(self) -> bool:
|
|
|
try:
|
|
|
os.kill(self._daemon_pid, 0)
|
|
|
except OSError:
|
|
|
return False
|
|
|
else:
|
|
|
return True
|
|
|
|
|
|
def _send(self, data: str) -> bool:
|
|
|
try:
|
|
|
self.sock.sendall(data.encode())
|
|
|
except socket.error as error:
|
|
|
print(f"ERROR: can not send data to worker {self._wid}."
|
|
|
f" Reason: {str(error)}")
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
def _read(self) -> Union[dict, None]:
|
|
|
input_queue = []
|
|
|
while True:
|
|
|
try:
|
|
|
data = self.sock.recv(1024)
|
|
|
except Exception as error:
|
|
|
print(f"Connection is broken. Error: {str(error)}")
|
|
|
return None
|
|
|
raise DaemonIsDead(f"Connection is broken. "
|
|
|
f"Error: {str(error)}")
|
|
|
if data:
|
|
|
# Если данные получены делим их по нулевому байту.
|
|
|
data = data.decode().split('\0')
|
|
|
break
|
|
|
# Иначе считаем, что соединение нарушилось.
|
|
|
print("Connection is broken. Empty data.")
|
|
|
return None
|
|
|
raise DaemonIsDead("Connection is broken.")
|
|
|
|
|
|
if self._context:
|
|
|
self._context += data.pop(0)
|
|
|
# Если после взятия первого значения еще что-то есть в списке
|
|
|
# данных -- значит среди них был нулевой байт. И из значения
|
|
|
# контекста можно получить прочитанные данные.
|
|
|
if data:
|
|
|
request = json.loads(self._context)
|
|
|
input_queue.append(request)
|
|
|
self._context = ""
|
|
|
|
|
|
if data:
|
|
|
# Если данные еще есть в списке -- значит последнее значение в
|
|
|
# списке, даже если это пустая строка, является котекстом.
|
|
|
self._context = data.pop()
|
|
|
if data:
|
|
|
# Остальное добавляем в очередь.
|
|
|
for request in data:
|
|
|
input_queue.append(json.loads(request))
|
|
|
|
|
|
return input_queue
|
|
|
|
|
|
def _make_worker_message(self, state: Literal["config", "finish",
|
|
|
"exec", "input"],
|
|
|
status: int, data: Union[list, dict]) -> dict:
|
|
|
return {"state": state,
|
|
|
"status": status,
|
|
|
"data": data}
|
|
|
|
|
|
def _get_daemon_pid(self, pid_file_path: str) -> int:
|
|
|
if not os.path.exists(pid_file_path):
|
|
|
return 0
|
|
|
try:
|
|
|
with open(pid_file_path, "r") as pid_file:
|
|
|
daemon_pid = int(pid_file.readline())
|
|
|
return daemon_pid
|
|
|
except Exception:
|
|
|
return 0
|
|
|
|
|
|
def _delete_socket(self) -> None:
|
|
|
"""Метод для удаления сокета."""
|
|
|
try:
|
|
|
self.sock.close()
|
|
|
os.remove(self._socket_path)
|
|
|
except OSError:
|
|
|
pass
|
|
|
|
|
|
def _delete_pid_file(self, pid_file_path: str) -> None:
|
|
|
try:
|
|
|
os.remove(pid_file_path)
|
|
|
except OSError:
|
|
|
pass
|
|
|
|
|
|
|
|
|
def daemonize_worker(io: IOModule, wid: int, pid_file: str,
|
|
|
command: Command) -> None:
|
|
|
'''Функция запускаемая в отдельном процессе и порождающая демона при помощи
|
|
|
магии двойного форка.'''
|
|
|
base_dir = os.getcwd()
|
|
|
|
|
|
new_fork()
|
|
|
|
|
|
os.chdir("/")
|
|
|
os.setsid()
|
|
|
os.umask(0)
|
|
|
|
|
|
new_fork()
|
|
|
|
|
|
# Создаем файл, с помощью которого будем передавать pid демона-рабочего.
|
|
|
daemon_pid = str(os.getpid())
|
|
|
with open(pid_file, "w") as pid_file:
|
|
|
pid_file.write(daemon_pid)
|
|
|
|
|
|
daemon(io, wid, base_dir, command)
|
|
|
|
|
|
|
|
|
def new_fork() -> None:
|
|
|
'''Функция для создания форка текущего процесса.'''
|
|
|
try:
|
|
|
middle_pid = os.fork()
|
|
|
if middle_pid > 0:
|
|
|
sys.exit(0)
|
|
|
except OSError as error:
|
|
|
print(f"FORK ERROR: {error.errno} - {str(error)}")
|
|
|
sys.exit(1)
|