from ..variables.loader import Datavars from ..commands.commands import Command from ..logging import dictLogConfig from logging.config import dictConfig from typing import ( Callable, Optional, NoReturn ) from logging import getLogger from fastapi import FastAPI from .worker import Worker import importlib import uvicorn import asyncio import os # TODO # 1. Разобраться с описанием команд как ресурсов и со всем, что от них зависит. # 2. Разобраться с объектами воркеров. И способом их функционирования. class Server: def __init__(self, socket_path: str = './input.sock', datavars_path: str = 'calculate/vars/', commands_path: str = 'calculate/commands'): self._app = FastAPI() self._socket_path = socket_path self._event_loop = asyncio.get_event_loop() # Конфигурируем логгирование. dictConfig(dictLogConfig) self._logger = getLogger("main") self.log_msg = {'DEBUG': self._logger.debug, 'INFO': self._logger.info, 'WARNING': self._logger.warning, 'ERROR': self._logger.error, 'CRITICAL': self._logger.critical} self._datavars = Datavars(variables_path=datavars_path, logger=self._logger) # Словарь описаний команд. self._commands = self._get_commands_list(commands_path) # Словарь CID и экземпляров команд, передаваемых воркерам. self._commands_instances = {} # Словарь WID и экземпляров процессов-воркеров, передаваемых воркерам. self._workers = {} # Соответствие путей обработчикам запросов для HTTP-метода GET. self._add_routes(self._app.get, {"/": self._get_root, "/commands": self._get_commands, "/commands/{cid}": self._get_command, "/workers/{wid}": self._get_worker}) self._add_routes(self._app.post, {"/commands/{command_id}": self._post_command}) def _get_commands_list(self, commands_path: str) -> list: '''Метод для получения совокупности описаний команд.''' output = {} package = ".".join(commands_path.split("/")) for entry in os.scandir(commands_path): if (not entry.name.endswith('.py') or entry.name in {"commands.py", "__init__.py"}): continue module_name = entry.name[:-3] try: module = importlib.import_module("{}.{}".format(package, module_name)) for obj in dir(module): if type(module.__getattribute__(obj)) == Command: command_object = module.__getattribute__(obj) output[command_object.id] = command_object except Exception: continue return output # Обработчики запросов серверу. async def _get_root(self) -> dict: '''Обработчик корневых запросов.''' return {'msg': 'root msg'} async def _get_commands(self) -> dict: '''Обработчик, отвечающий на запросы списка команд.''' response = {} for command_id, command_object in self._commands.items(): response.update({command_id: {"title": command_object.title, "category": command_object.category, "icon": command_object.icon, "command": command_object.command}}) return response async def _get_command(self, cid: int) -> dict: '''Обработчик запросов списка команд.''' if cid not in self._commands_instances: # TODO добавить какую-то обработку ошибки. pass return {'id': cid, 'name': f'command_{cid}'} async def _get_worker(self, wid: int): '''Тестовый обработчик.''' self._make_worker(wid=wid) worker = self._workers[wid] worker.run(None) await worker.send({"text": "INFO"}) data = await worker.get() if data['type'] == 'log': self.log_msg[data['level']](data['msg']) return data async def _post_command(self, command_id: str) -> int: if command_id not in self._commands: # TODO добавить какую-то обработку ошибки. pass return # Обработчики сообщений воркеров. # Вспомогательные методы. def _add_routes(self, method: Callable, routes: dict) -> NoReturn: '''Метод для добавления методов.''' for path, handler in routes.items(): router = method(path) router(handler) def _make_worker(self, wid: Optional[int] = None): '''Метод для создания воркера для команды.''' if wid is not None: self._workers[wid] = Worker(wid, self._event_loop) return wid elif not self._workers: self._workers[0] = Worker(0, self._event_loop) return 0 else: wid = max(self._workers.keys()) + 1 self._workers[wid] = Worker(wid, self._event_loop) return wid def _make_command(self, command_id: str) -> int: '''Метод для создания команды по ее описанию.''' command_description = self._commands[command] @property def app(self): return self._app def run(self): '''Метод для запуска сервера.''' # Выгружаем список команд. uvicorn.run(self._app, uds=self._socket_path) if __name__ == '__main__': server = Server() server.run()