|
|
|
|
import os
|
|
|
|
|
import json
|
|
|
|
|
import socket
|
|
|
|
|
import logging
|
|
|
|
|
from typing import Union
|
|
|
|
|
from ..variables.loader import Datavars
|
|
|
|
|
from multiprocessing import Queue, Process
|
|
|
|
|
from ..commands.commands import CommandRunner, Command
|
|
|
|
|
# from time import sleep
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WorkerIOError(KeyboardInterrupt):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IOModule:
|
|
|
|
|
'''Класс модуля ввода/вывода для воркеров.'''
|
|
|
|
|
def __init__(self, socket_path: str):
|
|
|
|
|
self._sock = socket.socket(socket.AF_UNIX,
|
|
|
|
|
socket.SOCK_STREAM)
|
|
|
|
|
self._sock.bind(socket_path)
|
|
|
|
|
self._sock.listen()
|
|
|
|
|
self._connection = None
|
|
|
|
|
|
|
|
|
|
def input(self, msg: str) -> str:
|
|
|
|
|
'''Метод через который возможен ввод данных в скрипт.'''
|
|
|
|
|
input_request = json.dumps({"type": "input",
|
|
|
|
|
"msg": msg}) + '\0'
|
|
|
|
|
answer = None
|
|
|
|
|
while answer is None:
|
|
|
|
|
if not self._check_connection(self._connection):
|
|
|
|
|
self._connection = self._make_connection()
|
|
|
|
|
self._connection.sendall(input_request.encode())
|
|
|
|
|
answer = self._get()
|
|
|
|
|
return answer['data']
|
|
|
|
|
|
|
|
|
|
def set_debug(self, msg: str) -> None:
|
|
|
|
|
self.output(msg, level=logging.DEBUG)
|
|
|
|
|
|
|
|
|
|
def set_info(self, msg: str) -> None:
|
|
|
|
|
self.output(msg, level=logging.INFO)
|
|
|
|
|
|
|
|
|
|
def set_warning(self, msg: str) -> None:
|
|
|
|
|
self.output(msg, level=logging.WARNING)
|
|
|
|
|
|
|
|
|
|
def set_error(self, msg: str) -> None:
|
|
|
|
|
self.output(msg, level=logging.ERROR)
|
|
|
|
|
|
|
|
|
|
def set_critical(self, msg: str) -> None:
|
|
|
|
|
self.output(msg, level=logging.CRITICAL)
|
|
|
|
|
|
|
|
|
|
def output(self, msg: str, level: int = logging.INFO) -> None:
|
|
|
|
|
'''Метод для отправки серверу вывода с указанием уровня.'''
|
|
|
|
|
if not self._check_connection(self._connection):
|
|
|
|
|
self._connection = self._make_connection()
|
|
|
|
|
output_request = json.dumps({"type": "output",
|
|
|
|
|
"level": level,
|
|
|
|
|
"msg": msg}) + '\0'
|
|
|
|
|
self._connection.sendall(output_request.encode())
|
|
|
|
|
|
|
|
|
|
def send(self, data: dict) -> None:
|
|
|
|
|
'''Метод для отправки данных серверу.'''
|
|
|
|
|
if not self._check_connection(self._connection):
|
|
|
|
|
self._connection = self._make_connection()
|
|
|
|
|
data = json.dumps(data) + '\0'
|
|
|
|
|
self._connection.sendall(data.encode())
|
|
|
|
|
|
|
|
|
|
def receive(self) -> dict:
|
|
|
|
|
'''Метод для получения данных от сервера.'''
|
|
|
|
|
data = None
|
|
|
|
|
while data is None:
|
|
|
|
|
if not self._check_connection(self._connection):
|
|
|
|
|
self._connection = self._make_connection()
|
|
|
|
|
data = self._get()
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
def _get(self) -> Union[None, dict]:
|
|
|
|
|
'''Метод для считывания данных, возможно, поделенных на кадры.'''
|
|
|
|
|
try:
|
|
|
|
|
data = b''
|
|
|
|
|
while True:
|
|
|
|
|
chunk = self._connection.recv(1024)
|
|
|
|
|
if not chunk:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
data += chunk
|
|
|
|
|
if not data.endswith(b'\0'):
|
|
|
|
|
if b'\0' in data:
|
|
|
|
|
# Если после символа конца сообщения есть еще какие-то
|
|
|
|
|
# данные -- считаем их наличие ошибкой.
|
|
|
|
|
raise WorkerIOError("Unexpected message.")
|
|
|
|
|
continue
|
|
|
|
|
return json.loads(data[:-1].decode())
|
|
|
|
|
except ConnectionResetError:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _make_connection(self) -> socket.socket:
|
|
|
|
|
'''Метод для создания подключения.'''
|
|
|
|
|
connection, parent_address = self._sock.accept()
|
|
|
|
|
return connection
|
|
|
|
|
|
|
|
|
|
def _check_connection(self, connection: socket.socket) -> bool:
|
|
|
|
|
'''Метод для проверки соединения путем отправки на сокет пустого
|
|
|
|
|
сообщения.'''
|
|
|
|
|
if connection is None:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
connection.sendall(b'')
|
|
|
|
|
return True
|
|
|
|
|
except BrokenPipeError:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def __del__(self) -> None:
|
|
|
|
|
if self._connection is not None:
|
|
|
|
|
self._connection.close()
|
|
|
|
|
self._sock.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Worker:
|
|
|
|
|
def __init__(self, wid: int, command: Command, datavars: Datavars):
|
|
|
|
|
self._wid: int = wid
|
|
|
|
|
self._datavars: Datavars = datavars
|
|
|
|
|
|
|
|
|
|
self._socket_path: str = f"./worker_{self._wid}.sock"
|
|
|
|
|
if os.path.exists(self._socket_path):
|
|
|
|
|
os.remove(self._socket_path)
|
|
|
|
|
self._input_queue: list = []
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
io = IOModule(self._socket_path)
|
|
|
|
|
title = io.receive()
|
|
|
|
|
print(title['msg'])
|
|
|
|
|
while True:
|
|
|
|
|
msg = "What is your desire?"
|
|
|
|
|
print(">", msg)
|
|
|
|
|
answer = io.input(msg)
|
|
|
|
|
print(f'> {answer}')
|
|
|
|
|
if answer == "stop":
|
|
|
|
|
break
|
|
|
|
|
elif answer == "output":
|
|
|
|
|
msg = "What kind of output you wanna see?"
|
|
|
|
|
print(">", msg)
|
|
|
|
|
answer = io.input(msg)
|
|
|
|
|
io.set_info(answer)
|
|
|
|
|
else:
|
|
|
|
|
msg = "I am sorry, but I could not help you("
|
|
|
|
|
print(">", msg)
|
|
|
|
|
io.output(msg)
|
|
|
|
|
print('STOPPED')
|
|
|
|
|
|
|
|
|
|
def initialize(self, io: IOModule):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def run_command(self):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DeprecatedWorker:
|
|
|
|
|
def __init__(self, wid, loop, sockets_path: str = 'calculate/server/'):
|
|
|
|
|
self._wid = wid
|
|
|
|
|
self._event_loop = loop
|
|
|
|
|
|
|
|
|
|
# Создаем сокет для взаимодействия воркера и сервера.
|
|
|
|
|
socket_path = os.path.join(sockets_path, f'worker_{self._wid}')
|
|
|
|
|
if os.path.exists(socket_path):
|
|
|
|
|
os.remove(socket_path)
|
|
|
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
|
|
|
self._socket.bind(socket_path)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _get_output(output_queue: Queue) -> dict:
|
|
|
|
|
return output_queue.get()
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _main_loop(command, in_queue: Queue, out_queue: Queue):
|
|
|
|
|
data = in_queue.get()
|
|
|
|
|
print('\nworker for command:', command)
|
|
|
|
|
output = {"type": "log",
|
|
|
|
|
"level": "INFO",
|
|
|
|
|
"msg": f"recieved message {data['text']}"}
|
|
|
|
|
out_queue.put(output)
|
|
|
|
|
|
|
|
|
|
def run(self, command):
|
|
|
|
|
'''Метод для запуска процесса воркера с заданным '''
|
|
|
|
|
worker_process = Process(target=self._main_loop,
|
|
|
|
|
args=(command,
|
|
|
|
|
self._in_queue,
|
|
|
|
|
self._output_queue))
|
|
|
|
|
worker_process.start()
|