|
|
import json
|
|
|
import logging
|
|
|
from multiprocessing.connection import Listener, Connection
|
|
|
from typing import Union
|
|
|
|
|
|
|
|
|
class WorkerIOError(KeyboardInterrupt):
|
|
|
pass
|
|
|
|
|
|
|
|
|
class IOModule:
|
|
|
'''Класс модуля ввода/вывода для воркеров.'''
|
|
|
def __init__(self, listener: Listener, command_name: str):
|
|
|
self._listener: Listener = listener
|
|
|
self._connection: Union[Connection, None] = None
|
|
|
|
|
|
self._command: str = command_name
|
|
|
self._script: str = ""
|
|
|
|
|
|
@property
|
|
|
def command(self) -> str:
|
|
|
return self._command
|
|
|
|
|
|
@property
|
|
|
def script(self) -> str:
|
|
|
return self._script
|
|
|
|
|
|
@script.setter
|
|
|
def script(self, script: str) -> None:
|
|
|
self._script = script
|
|
|
|
|
|
def set_debug(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.DEBUG)
|
|
|
|
|
|
def set_info(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.INFO)
|
|
|
|
|
|
def set_warning(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.WARNING)
|
|
|
|
|
|
def set_error(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.ERROR)
|
|
|
|
|
|
def set_critical(self, msg: str) -> None:
|
|
|
self.output(msg, level=logging.CRITICAL)
|
|
|
|
|
|
def output(self, text: str, level: int = logging.INFO) -> None:
|
|
|
'''Метод для отправки серверу вывода с указанием уровня.'''
|
|
|
output_request = {"state": "exec",
|
|
|
"status": 0,
|
|
|
"data": {
|
|
|
"type": "message",
|
|
|
"logging": level,
|
|
|
"text": text,
|
|
|
"source": self._script
|
|
|
}
|
|
|
}
|
|
|
self.send(output_request)
|
|
|
|
|
|
def input(self, msg: str) -> str:
|
|
|
'''Метод через который возможен ввод данных в скрипт.'''
|
|
|
input_request = {"state": "input",
|
|
|
"status": 0,
|
|
|
"data": {"text": msg,
|
|
|
"source": f"{self.command}:{self.script}"}}
|
|
|
|
|
|
answer = None
|
|
|
while answer is None:
|
|
|
self.send(input_request)
|
|
|
answer = self.receive()
|
|
|
|
|
|
return answer['data']
|
|
|
|
|
|
def send(self, data: dict) -> None:
|
|
|
'''Метод для отправки данных серверу.'''
|
|
|
if self._connection is None:
|
|
|
raise WorkerIOError("No one is connected now.")
|
|
|
data = json.dumps(data).encode()
|
|
|
self._connection.send_bytes(data)
|
|
|
|
|
|
def receive(self) -> dict:
|
|
|
'''Метод для получения данных от сервера.'''
|
|
|
if self._connection is None:
|
|
|
raise WorkerIOError("No one is connected now.")
|
|
|
data: bytes = self._connection.recv_bytes()
|
|
|
return json.loads(data.decode())
|
|
|
|
|
|
def __enter__(self):
|
|
|
self._connection = self._listener.accept()
|
|
|
return self
|
|
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
|
|
if exc_type:
|
|
|
print("exc_type =", exc_type)
|
|
|
print("exc_value =", exc_value)
|
|
|
print("exc_traceback =", exc_traceback)
|
|
|
if exc_type is KeyboardInterrupt:
|
|
|
print("correctly")
|
|
|
if not self._connection.closed:
|
|
|
self._connection.close()
|