#! env/bin/python3
"""Console client for the Calculate server.

Checks that the server at SERVER_DOMAIN is reachable, creates a worker for
the command given on the command line, relays the worker's output and input
prompts to the terminal, and finishes the worker when the session ends.
"""
import asyncio
import logging
import sys
from typing import Any, List, Optional, Tuple

import aiohttp

SERVER_DOMAIN = "http://127.0.0.1:2007"

# Prefixes printed in front of worker output, keyed by numeric logging level.
LOG_LEVELS = {
    logging.ERROR: "ERROR",
    logging.INFO: "INFO",
    logging.WARNING: "WARNING",
    logging.DEBUG: "DEBUG",
    logging.CRITICAL: "CRITICAL",
}


class UnexpectedStatusError(AssertionError):
    """Raised when the server answers with an HTTP status other than 200.

    Derives from AssertionError because these checks used to be ``assert``
    statements, so any handler written for the old failures still matches.
    Unlike ``assert``, the check is not removed when Python runs with ``-O``.
    """


async def _read_json(request: Any) -> Any:
    """Await a pending aiohttp request and return its decoded JSON body.

    Args:
        request: The object returned by ``client.get(...)`` or
            ``client.post(...)``; it is entered with ``async with``.

    Raises:
        UnexpectedStatusError: If the response status is not 200.
    """
    async with request as response:
        if response.status != 200:
            raise UnexpectedStatusError(
                f"{response.url} answered with HTTP {response.status}")
        return await response.json()


async def check_server(client: aiohttp.ClientSession) -> bool:
    """Return True if the server root answers 200 with {"status": "active"}.

    Prints the outcome either way. Any failure while probing (connection
    error, wrong status, undecodable body) is reported as "not connected"
    instead of being raised.
    """
    try:
        async with client.get(f"{SERVER_DOMAIN}/") as response:
            is_active = (response.status == 200
                         and (await response.json()) == {"status": "active"})
    except Exception:
        # Deliberately broad: this is a reachability probe, and every kind
        # of failure means the same thing to the user.
        is_active = False

    if is_active:
        print(f"Connected to the Calculate Server on {SERVER_DOMAIN}")
        return True
    print("Can not connect to the calculate server.")
    return False


async def get_commands(client: aiohttp.ClientSession) -> Any:
    """Return the decoded JSON body of ``GET /commands``."""
    return await _read_json(client.get(f"{SERVER_DOMAIN}/commands"))


async def create_worker(client: aiohttp.ClientSession, command: str,
                        arguments: List[str]) -> Any:
    """Create a worker for *command* and return its ``wid``.

    Posts ``{"arguments": arguments}`` to ``/workers/<command>`` and returns
    the ``"wid"`` field of the reply (presumably an int, matching the
    ``wid: int`` annotations of the other helpers; confirm against server).
    """
    data = await _read_json(
        client.post(f"{SERVER_DOMAIN}/workers/{command}",
                    json={"arguments": arguments}))
    return data["wid"]


async def start_worker(client: aiohttp.ClientSession, wid: int) -> Any:
    """Start worker *wid* and return the decoded JSON reply."""
    return await _read_json(
        client.post(f"{SERVER_DOMAIN}/workers/{wid}/start"))


async def get_worker_messages(client: aiohttp.ClientSession,
                              wid: int) -> Any:
    """Return the ``"data"`` field of ``GET /workers/<wid>/messages``."""
    reply = await _read_json(
        client.get(f"{SERVER_DOMAIN}/workers/{wid}/messages"))
    return reply["data"]


async def send_data_to_worker(client: aiohttp.ClientSession, wid: int,
                              data: Any) -> None:
    """Post ``{"data": data}`` to worker *wid*, retrying once on ClientOSError.

    The reply body is decoded but not used.

    Raises:
        aiohttp.ClientOSError: If the second attempt fails as well.
        UnexpectedStatusError: If the server answers with a non-200 status
            (not retried).
    """
    url = f"{SERVER_DOMAIN}/workers/{wid}/send"
    payload = {"data": data}
    try:
        await _read_json(client.post(url, json=payload))
    except aiohttp.ClientOSError:
        # One retry only. NOTE(review): presumably covers a kept-alive
        # connection dropped while the client was blocked in input();
        # confirm against the server's keep-alive settings.
        await _read_json(client.post(url, json=payload))


async def finish_worker(client: aiohttp.ClientSession, wid: int) -> Any:
    """Finish worker *wid* and return the decoded JSON reply."""
    return await _read_json(
        client.post(f"{SERVER_DOMAIN}/workers/{wid}/finish"))


async def _handle_message(client: aiohttp.ClientSession, wid: int,
                          message: Any) -> bool:
    """Act on one worker message; return True if it is the finish signal.

    "output" messages are printed with their level prefix, "input" messages
    prompt the user and send the answer back to the worker, and a "control"
    message with action "finish" ends the session. Other messages are
    ignored.
    """
    kind = message["type"]
    if kind == "output":
        level = message["level"]
        # Fall back to the raw level so an unlisted one cannot abort the
        # session with a KeyError.
        print(f"{LOG_LEVELS.get(level, str(level))}: {message['msg']}")
    elif kind == "input":
        print(message["msg"])
        # input() blocks the event loop; nothing else is scheduled here.
        input_data = input(">> ")
        await send_data_to_worker(client, wid, input_data)
    elif kind == "control" and message["action"] == "finish":
        return True
    return False


async def main() -> None:
    """Run one console session: validate the command, then drive a worker."""
    print("Calculate Console Client 0.0.1")
    connector = aiohttp.TCPConnector(force_close=False)
    async with aiohttp.ClientSession(connector=connector) as client:
        if not await check_server(client):
            return

        commands = await get_commands(client)
        command, command_args = get_console_command()
        problem = check_command(command, commands)
        if problem:
            print(problem)
            return

        wid = await create_worker(client, command, command_args)
        try:
            print("CREATED WORKER ID:", wid)
            status_data = await start_worker(client, wid)
            if status_data.get("status") == "error":
                print("ERROR:", status_data.get("description", "NONE"))
                return

            finished = False
            while not finished:
                # The rest of a batch is still processed after the finish
                # signal has been seen.
                for message in await get_worker_messages(client, wid):
                    if await _handle_message(client, wid, message):
                        finished = True
        finally:
            # Always release the worker, even on errors or early return.
            await finish_worker(client, wid)


def get_console_command() -> Tuple[Optional[str], List[str]]:
    """Split ``sys.argv`` into the command name and its arguments.

    ``sys.argv[0]`` is skipped only when it ends with "client.py"; otherwise
    it is itself taken as the command.

    Returns:
        ``(command, arguments)``; ``command`` is None when nothing is left
        after the optional skip.
    """
    offset = 1 if sys.argv[0].endswith("client.py") else 0
    words = sys.argv[offset:]
    if not words:
        return None, []
    return words[0], words[1:]


def check_command(command: Optional[str],
                  available_commands: List[str]) -> str:
    """Return an error message for an unusable command, or '' if it is fine."""
    if command is None:
        return "Command is not set."
    if command not in available_commands:
        return "Command is not available."
    return ''


if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running. Default SIGINT handling is kept so Ctrl-C
    # interrupts the blocking input() prompt immediately.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        print("\r<< Keyboard interrupt.")
    finally:
        loop.close()