|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import struct
|
|
|
import signal
|
|
|
import asyncio
|
|
|
import logging
|
|
|
from multiprocessing import Process
|
|
|
from multiprocessing.connection import Listener, Connection
|
|
|
# import logging
|
|
|
# import time
|
|
|
from .daemon import daemon
|
|
|
# from commands import Command
|
|
|
|
|
|
from typing import Dict, Optional, List, Union, Tuple, Literal
|
|
|
|
|
|
from asyncio.unix_events import _UnixSelectorEventLoop
|
|
|
from asyncio import get_event_loop
|
|
|
|
|
|
|
|
|
LOG_LEVELS = {logging.ERROR: "ERROR",
|
|
|
logging.INFO: "INFO",
|
|
|
logging.WARNING: "WARNING",
|
|
|
logging.DEBUG: "DEBUG",
|
|
|
logging.CRITICAL: "CRITICAL",
|
|
|
}
|
|
|
|
|
|
|
|
|
class WorkerException(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class UnexpectedWorkerFinish(WorkerException):
|
|
|
def __init__(self, reason: List[str], output: Optional[List[str]] = []):
|
|
|
"""Исключение кидаемое в случае, если воркер неожиданно умер."""
|
|
|
self.reason: List[str] = reason
|
|
|
self.output: List[dict] = output
|
|
|
|
|
|
|
|
|
class UnexpectedWorkerState(WorkerException):
|
|
|
"""Исключение кидаемое в случае, если состояние воркера не соответствует
|
|
|
ожидаемому."""
|
|
|
def __init__(self, state: str, expected: str):
|
|
|
self.state: str = state
|
|
|
self.expected: str = expected
|
|
|
|
|
|
|
|
|
class DaemonIsDead(WorkerException):
|
|
|
"""Исключение кидаемое в случае, если демон-воркер неожиданно оказался
|
|
|
мертв."""
|
|
|
def __init__(self, info: str, output: Optional[List[str]] = []):
|
|
|
self.info: str = info
|
|
|
self.output: List[dict] = output
|
|
|
|
|
|
|
|
|
class IncorrectConfiguration(WorkerException):
|
|
|
"""Исключение кидаемое при попытке запустить выполнение конфигурации, в
|
|
|
которой есть ошибки."""
|
|
|
def __init__(self, errors: Dict[str, str]):
|
|
|
self._errors = errors
|
|
|
|
|
|
|
|
|
class WorkersManager:
|
|
|
def __init__(self, loop: Optional[_UnixSelectorEventLoop] = None):
|
|
|
if loop is None:
|
|
|
self._loop = get_event_loop()
|
|
|
else:
|
|
|
self._loop = loop
|
|
|
|
|
|
self._configs: Dict[int, WorkerProtocol] = {}
|
|
|
self._execs: Dict[int, WorkerProtocol] = {}
|
|
|
self._wids: List[int] = []
|
|
|
|
|
|
async def make_worker(self, command) -> Tuple[Union[int, None],
|
|
|
Union[str, None]]:
|
|
|
"""Метод для создания воркеров."""
|
|
|
wid = self._get_wid()
|
|
|
try:
|
|
|
# Создаем воркера и получаем интерфейс для взаимодействия с ним.
|
|
|
worker_api = await self._create_worker(wid, command)
|
|
|
self._configs[wid] = worker_api
|
|
|
except Exception as error:
|
|
|
if wid in self._configs:
|
|
|
self._configs.pop(wid)
|
|
|
return None, str(error)
|
|
|
return wid, None
|
|
|
|
|
|
async def configure_worker(self, wid: int, parameters: List[dict],
|
|
|
reset: bool = False) -> Union[dict, None]:
|
|
|
"""Метод для модификации или сброса параметров конфигурации указанными
|
|
|
значениями.
|
|
|
"""
|
|
|
if not parameters:
|
|
|
return {}
|
|
|
if wid not in self._configs:
|
|
|
return None
|
|
|
|
|
|
worker_api: WorkerProtocol = self._configs[wid]
|
|
|
try:
|
|
|
result = await worker_api.configure(parameters)
|
|
|
except (UnexpectedWorkerFinish, DaemonIsDead):
|
|
|
worker_api.kill()
|
|
|
self._configs.pop(wid)
|
|
|
raise
|
|
|
|
|
|
return result
|
|
|
|
|
|
async def cancel_worker_configuration(self, wid: int):
|
|
|
"""Метод для завершения конфигурации воркера и завершения его работы.
|
|
|
"""
|
|
|
if wid not in self._configs:
|
|
|
return None
|
|
|
|
|
|
worker_api: WorkerProtocol = self._configs[wid]
|
|
|
output = await worker_api.cancel_configuration()
|
|
|
worker_api.delete_socket()
|
|
|
self._configs.pop(wid)
|
|
|
|
|
|
return output
|
|
|
|
|
|
def clean(self):
|
|
|
"""Метод для убийства всех работающих на данный момент демонов."""
|
|
|
# TODO доработать его логику так, чтобы при перезапуске сервера этот
|
|
|
# метод никого не убивал.
|
|
|
for wid, worker in {**self._configs, **self._execs}.items():
|
|
|
result = False
|
|
|
try:
|
|
|
result = worker.kill()
|
|
|
finally:
|
|
|
if result:
|
|
|
print(f"[*] Worker {wid} is killed.")
|
|
|
else:
|
|
|
print(f"[x] Worker {wid} is not killed.")
|
|
|
continue
|
|
|
|
|
|
def get_config(self, wid: int):
|
|
|
if wid in self._configs:
|
|
|
return self._configs[wid]
|
|
|
return None
|
|
|
|
|
|
def get_execution(self, wid: int):
|
|
|
if wid in self._execs:
|
|
|
return self._execs[wid]
|
|
|
return None
|
|
|
|
|
|
def run_execution(self, wid: int) -> Tuple[int, Union[dict, None]]:
|
|
|
'''Метод для запуска исполнения команды, используя указанную
|
|
|
конфигурацию.
|
|
|
Вывод:
|
|
|
0, None -- исполнение успешно запущено;
|
|
|
1, None -- конфигурация с данным wid не найдена;
|
|
|
2, errors -- конфигурация с данным wid содержит ошибки;
|
|
|
3, None -- исполнение с данным wid уже существует;
|
|
|
'''
|
|
|
if wid not in self._configs:
|
|
|
return 1, None
|
|
|
if self._configs[wid].status > 0:
|
|
|
# Берем ошибки, как-то.
|
|
|
errors = {}
|
|
|
return 2, errors
|
|
|
if wid in self._execs:
|
|
|
return 3, None
|
|
|
|
|
|
# worker = self._configs.pop(wid)
|
|
|
return 0, None
|
|
|
|
|
|
async def _create_worker(self, wid: int, command) -> "WorkerProtocol":
|
|
|
# Путь к временному файлу, в котором будет лежать pid демона-рабочего.
|
|
|
pid_file_path: str = os.path.join(os.getcwd(), f"pid_file_{wid}")
|
|
|
socket_path: str = os.path.join(os.getcwd(),
|
|
|
f"worker_{wid}.sock")
|
|
|
if os.path.exists(socket_path):
|
|
|
os.remove(socket_path)
|
|
|
|
|
|
process_runner = Process(target=daemonize_worker,
|
|
|
args=(socket_path, wid, pid_file_path,
|
|
|
command)
|
|
|
)
|
|
|
process_runner.start()
|
|
|
process_runner.join()
|
|
|
|
|
|
daemon_pid = await self._get_daemon_pid(pid_file_path)
|
|
|
self._delete_pid_file(pid_file_path)
|
|
|
|
|
|
transport, worker_interface = await self._loop.create_unix_connection(
|
|
|
lambda: WorkerProtocol(daemon_pid,
|
|
|
socket_path),
|
|
|
socket_path)
|
|
|
return worker_interface
|
|
|
|
|
|
def _get_wid(self):
|
|
|
if not self._wids:
|
|
|
worker_id = 0
|
|
|
else:
|
|
|
worker_id = self._wids[-1] + 1
|
|
|
self._wids.append(worker_id)
|
|
|
return worker_id
|
|
|
|
|
|
async def _get_daemon_pid(self, pid_file_path: str) -> int:
|
|
|
while True:
|
|
|
if not os.path.exists(pid_file_path):
|
|
|
# Если файла нет -- уступаем работу другим воркерам.
|
|
|
await asyncio.sleep(0)
|
|
|
with open(pid_file_path, "r") as pid_file:
|
|
|
daemon_pid = int(pid_file.readline())
|
|
|
return daemon_pid
|
|
|
|
|
|
def _delete_pid_file(self, pid_file_path: str) -> None:
|
|
|
try:
|
|
|
os.remove(pid_file_path)
|
|
|
except OSError:
|
|
|
pass
|
|
|
|
|
|
def __repr__(self):
|
|
|
configs = ["Configurations:"]
|
|
|
configs.extend([f"wid={wid}" for wid in self._configs.keys()])
|
|
|
configs_repr = "\n\t".join(configs)
|
|
|
|
|
|
execs = ["Executions:"]
|
|
|
execs.extend([f"wid={wid}" for wid in self._execs.keys()])
|
|
|
execs_repr = "\n\t".join(execs)
|
|
|
|
|
|
return f"{configs_repr}\n{execs_repr}"
|
|
|
|
|
|
|
|
|
def daemonize_worker(socket_path: str, wid: int, pid_file: str,
|
|
|
command) -> None:
|
|
|
'''Функция запускаемая в отдельном процессе и порождающая демона при помощи
|
|
|
магии двойного форка.'''
|
|
|
base_dir = os.getcwd()
|
|
|
|
|
|
new_fork()
|
|
|
|
|
|
os.chdir("/")
|
|
|
os.setsid()
|
|
|
os.umask(0)
|
|
|
|
|
|
new_fork()
|
|
|
|
|
|
# Создаем файл, с помощью которого будем передавать pid демона-рабочего.
|
|
|
daemon_pid = str(os.getpid())
|
|
|
with open(pid_file, "w") as pid_file:
|
|
|
pid_file.write(daemon_pid)
|
|
|
|
|
|
with Listener(socket_path) as listener:
|
|
|
daemon(listener, wid, base_dir, command)
|
|
|
|
|
|
|
|
|
def new_fork() -> None:
|
|
|
'''Функция для создания форка текущего процесса.'''
|
|
|
try:
|
|
|
middle_pid = os.fork()
|
|
|
if middle_pid > 0:
|
|
|
sys.exit(0)
|
|
|
except OSError as error:
|
|
|
print(f"FORK ERROR: {error.errno} - {str(error)}")
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
class WorkerProtocol(asyncio.Protocol):
|
|
|
"""Класс протокола управления воркером."""
|
|
|
def __init__(self, pid: int, socket_path: str):
|
|
|
self.loop = asyncio.get_running_loop()
|
|
|
|
|
|
self._pid: int = pid
|
|
|
self._socket_path: str = socket_path
|
|
|
|
|
|
# Объект Event, предназначенный для остановки записи при переполнении
|
|
|
# буффера записи в трaнспорте.
|
|
|
self._can_write = asyncio.Event()
|
|
|
self._can_write.clear()
|
|
|
|
|
|
self._msg_queue = asyncio.Queue()
|
|
|
|
|
|
# Контекст чтения данных воркера.
|
|
|
self._buffer = bytearray()
|
|
|
self._msg_size: int = 0
|
|
|
|
|
|
self._connection_closed: bool = True
|
|
|
|
|
|
async def configure(self, parameters: List[dict],
|
|
|
reset: bool = False) -> list:
|
|
|
"""Метод для конфигурации воркера указанными значениями параметров.
|
|
|
Возвращает список ошибок, если таковые есть, или пустой список, если
|
|
|
никаких ошибок нет.
|
|
|
"""
|
|
|
message = self._make_worker_message("config", int(reset), parameters)
|
|
|
await self._write(message)
|
|
|
response = await self._read()
|
|
|
response = json.loads(response)
|
|
|
|
|
|
return response["data"]
|
|
|
|
|
|
async def cancel_configuration(self) -> List[dict]:
|
|
|
"""Метод для остановки конфигурации воркера и его работы в целом."""
|
|
|
stop_message: bytes = self._make_worker_message("finish", 1, None)
|
|
|
await self._write(stop_message)
|
|
|
response = await self._read()
|
|
|
response = json.loads(response)
|
|
|
|
|
|
return response["data"]
|
|
|
|
|
|
def connection_made(self, transport: asyncio.Transport) -> None:
|
|
|
self.transport = transport
|
|
|
self._can_write.set()
|
|
|
self._connection_closed: bool = False
|
|
|
print("Connection made.")
|
|
|
|
|
|
def connection_lost(self, exc: BaseException) -> None:
|
|
|
if exc:
|
|
|
print("Connection lost. Error:", str(exc))
|
|
|
else:
|
|
|
print("Connection lost.")
|
|
|
self._connection_closed: bool = True
|
|
|
|
|
|
def data_received(self, data: bytes) -> None:
|
|
|
self._buffer.extend(data)
|
|
|
|
|
|
while True:
|
|
|
if not self._msg_size:
|
|
|
if len(self._buffer) < 4:
|
|
|
return
|
|
|
size_length = struct.calcsize(">i")
|
|
|
self._msg_size, = struct.unpack(">i",
|
|
|
self._buffer[:size_length])
|
|
|
self._buffer = self._buffer[size_length:]
|
|
|
|
|
|
if len(self._buffer) >= self._msg_size:
|
|
|
data = self._buffer[:self._msg_size]
|
|
|
self._buffer = self._buffer[self._msg_size:]
|
|
|
self._msg_size = 0
|
|
|
|
|
|
self._msg_queue.put_nowait(data.decode())
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
def pause_writing(self) -> None:
|
|
|
self._can_write.clear()
|
|
|
|
|
|
def resume_writing(self) -> None:
|
|
|
self._can_write.set()
|
|
|
|
|
|
async def _read(self) -> str:
|
|
|
if self._connection_closed and self._msg_queue.empty():
|
|
|
return None
|
|
|
return await self._msg_queue.get()
|
|
|
|
|
|
async def read_all(self) -> List[str]:
|
|
|
if self._msg_queue.empty():
|
|
|
if self._connection_closed:
|
|
|
return None
|
|
|
else:
|
|
|
return await self._msg_queue.get()
|
|
|
output = []
|
|
|
while not self._msg_queue.empty():
|
|
|
output.append(self._msg_queue.get_nowait())
|
|
|
return output
|
|
|
|
|
|
async def _write(self, data: Union[str, bytes]) -> None:
|
|
|
await self._can_write.wait()
|
|
|
if isinstance(data, str):
|
|
|
data = data.encode()
|
|
|
self.transport.write(struct.pack(">i", len(data)))
|
|
|
self.transport.write(data)
|
|
|
|
|
|
def _make_worker_message(
|
|
|
self,
|
|
|
state: Literal["config", "finish", "exec", "input"],
|
|
|
status: int, data: Union[list, dict]) -> dict:
|
|
|
return json.dumps({"state": state,
|
|
|
"status": status,
|
|
|
"data": data}).encode()
|
|
|
|
|
|
def delete_socket(self) -> None:
|
|
|
"""Метод для удаления сокета."""
|
|
|
try:
|
|
|
os.remove(self._socket_path)
|
|
|
self.transport.close()
|
|
|
self._connection_closed = True
|
|
|
except OSError:
|
|
|
pass
|
|
|
|
|
|
def kill(self) -> bool:
|
|
|
"""Метод для убийства демона."""
|
|
|
print("GONNA KILLA DAEMON:", self._pid)
|
|
|
if self.is_daemon_alive():
|
|
|
try:
|
|
|
os.kill(self._pid, signal.SIGKILL)
|
|
|
except OSError:
|
|
|
return False
|
|
|
else:
|
|
|
print("ALREADY DEAD")
|
|
|
return True
|
|
|
|
|
|
def stop(self) -> bool:
|
|
|
print("STOP DAEMON:", self._daemon_pid)
|
|
|
if self.is_daemon_alive():
|
|
|
try:
|
|
|
# Для остановки демона посылаем в него KeyboardInterrupt.
|
|
|
os.kill(self._pid, signal.SIGINT)
|
|
|
except OSError as error:
|
|
|
print(f"ERROR: Failed stopping daemon PID={self._pid}. "
|
|
|
f"Reason: {str(error)}")
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
def is_daemon_alive(self) -> bool:
|
|
|
try:
|
|
|
os.kill(self._pid, 0)
|
|
|
return True
|
|
|
except OSError:
|
|
|
return False
|
|
|
|
|
|
|
|
|
class WorkerIOError(KeyboardInterrupt):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class IOModule:
|
|
|
'''Класс модуля ввода/вывода для воркеров.'''
|
|
|
def __init__(self, listener: Listener, command_name: str):
|
|
|
self._listener: Listener = listener
|
|
|
self._connection: Union[Connection, None] = None
|
|
|
|
|
|
self._command: str = command_name
|
|
|
self._script: str = ""
|
|
|
|
|
|
@property
|
|
|
def command(self) -> str:
|
|
|
return self._command
|
|
|
|
|
|
@property
|
|
|
def script(self) -> str:
|
|
|
return self._script
|
|
|
|
|
|
@script.setter
|
|
|
def script(self, script: str) -> None:
|
|
|
self._script = script
|
|
|
|
|
|
def set_debug(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.DEBUG)
|
|
|
|
|
|
def set_info(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.INFO)
|
|
|
|
|
|
def set_warning(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.WARNING)
|
|
|
|
|
|
def set_error(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.ERROR)
|
|
|
|
|
|
def set_critical(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.CRITICAL)
|
|
|
|
|
|
def output(self, text: str, level: int = logging.INFO) -> None:
|
|
|
'''Метод для отправки серверу вывода с указанием уровня.'''
|
|
|
output_request = {"state": "exec",
|
|
|
"status": 0,
|
|
|
"data": {
|
|
|
"type": "message",
|
|
|
"logging": level,
|
|
|
"text": text,
|
|
|
"source": self._script
|
|
|
}
|
|
|
}
|
|
|
self.send(output_request)
|
|
|
|
|
|
def input(self, msg: str) -> str:
|
|
|
'''Метод через который возможен ввод данных в скрипт.'''
|
|
|
input_request = {"state": "input",
|
|
|
"status": 0,
|
|
|
"data": {"text": msg,
|
|
|
"source": f"{self.command}:{self.script}"}}
|
|
|
|
|
|
answer = None
|
|
|
while answer is None:
|
|
|
self.send(input_request)
|
|
|
answer = self.receive()
|
|
|
|
|
|
return answer['data']
|
|
|
|
|
|
def send(self, data: dict) -> None:
|
|
|
'''Метод для отправки данных серверу.'''
|
|
|
if self._connection is None:
|
|
|
raise WorkerIOError("No one is connected now.")
|
|
|
data = json.dumps(data).encode()
|
|
|
self._connection.send_bytes(data)
|
|
|
|
|
|
def receive(self) -> dict:
|
|
|
'''Метод для получения данных от сервера.'''
|
|
|
if self._connection is None:
|
|
|
raise WorkerIOError("No one is connected now.")
|
|
|
data: bytes = self._connection.recv_bytes()
|
|
|
return json.loads(data.decode())
|
|
|
|
|
|
def __enter__(self):
|
|
|
self._connection = self._listener.accept()
|
|
|
return self
|
|
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
|
|
if exc_type:
|
|
|
print("exc_type =", exc_type)
|
|
|
print("exc_value =", exc_value)
|
|
|
print("exc_traceback =", exc_traceback)
|
|
|
if exc_type is KeyboardInterrupt:
|
|
|
print("correctly")
|
|
|
if not self._connection.closed:
|
|
|
self._connection.close()
|