You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

414 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
import json
import struct
import signal
import asyncio
import logging
from multiprocessing import Process
from multiprocessing.connection import Listener
# import logging
# import time
from daemon import daemon
from commands import Command
from typing import Dict, Optional, List, Union, Tuple, Literal
from asyncio.unix_events import _UnixSelectorEventLoop
from asyncio import get_event_loop
LOG_LEVELS = {logging.ERROR: "ERROR",
logging.INFO: "INFO",
logging.WARNING: "WARNING",
logging.DEBUG: "DEBUG",
logging.CRITICAL: "CRITICAL",
}
class WorkerException(Exception):
pass
class UnexpectedWorkerFinish(WorkerException):
def __init__(self, reason: List[str], output: Optional[List[str]] = []):
"""Исключение кидаемое в случае, если воркер неожиданно умер."""
self.reason: List[str] = reason
self.output: List[dict] = output
class UnexpectedWorkerState(WorkerException):
"""Исключение кидаемое в случае, если состояние воркера не соответствует
ожидаемому."""
def __init__(self, state: str, expected: str):
self.state: str = state
self.expected: str = expected
class DaemonIsDead(WorkerException):
"""Исключение кидаемое в случае, если демон-воркер неожиданно оказался
мертв."""
def __init__(self, info: str, output: Optional[List[str]] = []):
self.info: str = info
self.output: List[dict] = output
class IncorrectConfiguration(WorkerException):
"""Исключение кидаемое при попытке запустить выполнение конфигурации, в
которой есть ошибки."""
def __init__(self, errors: Dict[str, str]):
self._errors = errors
class WorkersManager:
def __init__(self, loop: Optional[_UnixSelectorEventLoop] = None):
if loop is None:
self._loop = get_event_loop()
else:
self._loop = loop
self._configs: Dict[int, WorkerProtocol] = {}
self._execs: Dict[int, WorkerProtocol] = {}
self._wids: List[int] = []
async def make_worker(self, command: Command) -> Tuple[Union[int, None],
Union[str, None]]:
"""Метод для создания воркеров."""
wid = self._get_wid()
try:
# Создаем воркера и получаем интерфейс для взаимодействия с ним.
worker_api = await self._create_worker(wid, command)
self._configs[wid] = worker_api
except Exception as error:
if wid in self._configs:
self._configs.pop(wid)
return None, str(error)
return wid, None
async def configure_worker(self, wid: int, parameters: List[dict],
reset: bool = False) -> Union[dict, None]:
"""Метод для модификации или сброса параметров конфигурации указанными
значениями.
"""
if not parameters:
return {}
if wid not in self._configs:
return None
worker_api: WorkerProtocol = self._configs[wid]
try:
result = await worker_api.configure(parameters)
except (UnexpectedWorkerFinish, DaemonIsDead):
worker_api.kill()
self._configs.pop(wid)
raise
return result
async def cancel_worker_configuration(self, wid: int):
"""Метод для завершения конфигурации воркера и завершения его работы.
"""
if wid not in self._configs:
return None
worker_api: WorkerProtocol = self._configs[wid]
output = await worker_api.cancel_configuration()
worker_api.delete_socket()
self._configs.pop(wid)
return output
def clean(self):
"""Метод для убийства всех работающих на данный момент демонов."""
# TODO доработать его логику так, чтобы при перезапуске сервера этот
# метод никого не убивал.
for wid, worker in {**self._configs, **self._execs}.items():
result = False
try:
result = worker.kill()
finally:
if result:
print(f"[*] Worker {wid} is killed.")
else:
print(f"[x] Worker {wid} is not killed.")
continue
def get_config(self, wid: int):
if wid in self._configs:
return self._configs[wid]
return None
def get_execution(self, wid: int):
if wid in self._execs:
return self._execs[wid]
return None
def run_execution(self, wid: int) -> Tuple[int, Union[dict, None]]:
'''Метод для запуска исполнения команды, используя указанную
конфигурацию.
Вывод:
0, None -- исполнение успешно запущено;
1, None -- конфигурация с данным wid не найдена;
2, errors -- конфигурация с данным wid содержит ошибки;
3, None -- исполнение с данным wid уже существует;
'''
if wid not in self._configs:
return 1, None
if self._configs[wid].status > 0:
# Берем ошибки, как-то.
errors = {}
return 2, errors
if wid in self._execs:
return 3, None
# worker = self._configs.pop(wid)
return 0, None
async def _create_worker(self, wid: int, command: Command
) -> "WorkerProtocol":
# Путь к временному файлу, в котором будет лежать pid демона-рабочего.
pid_file_path: str = os.path.join(os.getcwd(), f"pid_file_{wid}")
socket_path: str = os.path.join(os.getcwd(),
f"worker_{wid}.sock")
if os.path.exists(socket_path):
os.remove(socket_path)
process_runner = Process(target=daemonize_worker,
args=(socket_path, wid, pid_file_path,
command)
)
process_runner.start()
process_runner.join()
daemon_pid = await self._get_daemon_pid(pid_file_path)
self._delete_pid_file(pid_file_path)
transport, worker_interface = await self._loop.create_unix_connection(
lambda: WorkerProtocol(daemon_pid,
socket_path),
socket_path)
return worker_interface
def _get_wid(self):
if not self._wids:
worker_id = 0
else:
worker_id = self._wids[-1] + 1
self._wids.append(worker_id)
return worker_id
async def _get_daemon_pid(self, pid_file_path: str) -> int:
while True:
if not os.path.exists(pid_file_path):
# Если файла нет -- уступаем работу другим воркерам.
await asyncio.sleep(0)
with open(pid_file_path, "r") as pid_file:
daemon_pid = int(pid_file.readline())
return daemon_pid
def _delete_pid_file(self, pid_file_path: str) -> None:
try:
os.remove(pid_file_path)
except OSError:
pass
def __repr__(self):
configs = ["Configurations:"]
configs.extend([f"wid={wid}" for wid in self._configs.keys()])
configs_repr = "\n\t".join(configs)
execs = ["Executions:"]
execs.extend([f"wid={wid}" for wid in self._execs.keys()])
execs_repr = "\n\t".join(execs)
return f"{configs_repr}\n{execs_repr}"
def daemonize_worker(socket_path: str, wid: int, pid_file: str,
command: Command) -> None:
'''Функция запускаемая в отдельном процессе и порождающая демона при помощи
магии двойного форка.'''
base_dir = os.getcwd()
new_fork()
os.chdir("/")
os.setsid()
os.umask(0)
new_fork()
# Создаем файл, с помощью которого будем передавать pid демона-рабочего.
daemon_pid = str(os.getpid())
with open(pid_file, "w") as pid_file:
pid_file.write(daemon_pid)
with Listener(socket_path) as listener:
daemon(listener, wid, base_dir, command)
def new_fork() -> None:
'''Функция для создания форка текущего процесса.'''
try:
middle_pid = os.fork()
if middle_pid > 0:
sys.exit(0)
except OSError as error:
print(f"FORK ERROR: {error.errno} - {str(error)}")
sys.exit(1)
class WorkerProtocol(asyncio.Protocol):
"""Класс протокола управления воркером."""
def __init__(self, pid: int, socket_path: str):
self.loop = asyncio.get_running_loop()
self._pid: int = pid
self._socket_path: str = socket_path
# Объект Event, предназначенный для остановки записи при переполнении
# буффера записи в трспорте.
self._can_write = asyncio.Event()
self._can_write.clear()
self._msg_queue = asyncio.Queue()
# Контекст чтения данных воркера.
self._buffer = bytearray()
self._msg_size: int = 0
self._connection_closed: bool = True
async def configure(self, parameters: List[dict],
reset: bool = False) -> list:
"""Метод для конфигурации воркера указанными значениями параметров.
Возвращает список ошибок, если таковые есть, или пустой список, если
никаких ошибок нет.
"""
message = self._make_worker_message("config", int(reset), parameters)
await self._write(message)
response = await self._read()
response = json.loads(response)
return response["data"]
async def cancel_configuration(self) -> List[dict]:
"""Метод для остановки конфигурации воркера и его работы в целом."""
stop_message: bytes = self._make_worker_message("finish", 1, None)
await self._write(stop_message)
response = await self._read()
response = json.loads(response)
return response["data"]
def connection_made(self, transport: asyncio.Transport) -> None:
self.transport = transport
self._can_write.set()
self._connection_closed: bool = False
print("Connection made.")
def connection_lost(self, exc: BaseException) -> None:
if exc:
print("Connection lost. Error:", str(exc))
else:
print("Connection lost.")
self._connection_closed: bool = True
def data_received(self, data: bytes) -> None:
self._buffer.extend(data)
while True:
if not self._msg_size:
if len(self._buffer) < 4:
return
size_length = struct.calcsize(">i")
self._msg_size, = struct.unpack(">i",
self._buffer[:size_length])
self._buffer = self._buffer[size_length:]
if len(self._buffer) >= self._msg_size:
data = self._buffer[:self._msg_size]
self._buffer = self._buffer[self._msg_size:]
self._msg_size = 0
self._msg_queue.put_nowait(data.decode())
else:
break
def pause_writing(self) -> None:
self._can_write.clear()
def resume_writing(self) -> None:
self._can_write.set()
async def _read(self) -> str:
if self._connection_closed and self._msg_queue.empty():
return None
return await self._msg_queue.get()
async def read_all(self) -> List[str]:
if self._msg_queue.empty():
if self._connection_closed:
return None
else:
return await self._msg_queue.get()
output = []
while not self._msg_queue.empty():
output.append(self._msg_queue.get_nowait())
return output
async def _write(self, data: Union[str, bytes]) -> None:
await self._can_write.wait()
if isinstance(data, str):
data = data.encode()
self.transport.write(struct.pack(">i", len(data)))
self.transport.write(data)
def _make_worker_message(
self,
state: Literal["config", "finish", "exec", "input"],
status: int, data: Union[list, dict]) -> dict:
return json.dumps({"state": state,
"status": status,
"data": data}).encode()
def delete_socket(self) -> None:
"""Метод для удаления сокета."""
try:
os.remove(self._socket_path)
self.transport.close()
self._connection_closed = True
except OSError:
pass
def kill(self) -> bool:
"""Метод для убийства демона."""
print("GONNA KILLA DAEMON:", self._pid)
if self.is_daemon_alive():
try:
os.kill(self._pid, signal.SIGKILL)
except OSError:
return False
else:
print("ALREADY DEAD")
return True
def stop(self) -> bool:
print("STOP DAEMON:", self._daemon_pid)
if self.is_daemon_alive():
try:
# Для остановки демона посылаем в него KeyboardInterrupt.
os.kill(self._pid, signal.SIGINT)
except OSError as error:
print(f"ERROR: Failed stopping daemon PID={self._pid}. "
f"Reason: {str(error)}")
return False
return True
def is_daemon_alive(self) -> bool:
try:
os.kill(self._pid, 0)
return True
except OSError:
return False