|
|
import asyncio
|
|
|
|
|
|
# import logging
|
|
|
import uvicorn
|
|
|
|
|
|
from starlette.responses import JSONResponse
|
|
|
from starlette.requests import Request
|
|
|
|
|
|
from fastapi import FastAPI, status, HTTPException
|
|
|
from fastapi.encoders import jsonable_encoder
|
|
|
|
|
|
from pydantic.main import ModelMetaclass
|
|
|
from pydantic import parse_obj_as
|
|
|
|
|
|
from typing import _GenericAlias, Union, Optional
|
|
|
|
|
|
from commands import command_0, command_1, command_2
|
|
|
from utils import ResponseStructure, Commands
|
|
|
from worker import (
|
|
|
WorkersManager,
|
|
|
UnexpectedWorkerFinish,
|
|
|
UnexpectedWorkerState,
|
|
|
DaemonIsDead
|
|
|
)
|
|
|
|
|
|
from schemas import (
|
|
|
GetRootResponse,
|
|
|
GetCommandsResponse,
|
|
|
FoundCommandInfo,
|
|
|
GetCommandParametersResponse,
|
|
|
CreateConfigResponse,
|
|
|
ModifyConfigRequest,
|
|
|
ModifyConfigResponse,
|
|
|
ResetConfigRequest,
|
|
|
ResetConfigResponse,
|
|
|
StopConfigResponse,
|
|
|
CreateExecutionResponse
|
|
|
)
|
|
|
|
|
|
from pprint import pprint
|
|
|
|
|
|
|
|
|
# FastAPI application object; every route below is registered on it.
api = FastAPI(title="Calculate Server")

# NOTE(review): the event loop is captured here, at import time, before the
# ASGI server starts. Confirm WorkersManager ends up bound to the same loop
# that actually runs the request handlers.
workers = WorkersManager(loop=asyncio.get_event_loop())

# Command collection queried by the /commands and /configs routes.
commands = Commands(command_0, command_1, command_2)
|
|
|
|
|
|
|
|
|
@api.on_event("startup")
async def startup_event():
    """Run the PID-file check once when the application starts."""
    check_pid_files()
|
|
|
|
|
|
|
|
|
@api.on_event("shutdown")
async def shutdown_event():
    """Call ``workers.clean()`` when the application shuts down."""
    workers.clean()
|
|
|
|
|
|
|
|
|
@api.get("/", response_model=GetRootResponse, tags=["Home"])
async def get_primary_commands(request: Request):
    """Return the API entry point: server name/version and top-level links.

    The payload is validated against ``GetRootResponse`` and sent with the
    ``application/hal+json`` media type.
    """
    root = ResponseStructure(get_base_url(request))
    root.add_data(name="Calculate Server", version="0.1")

    # (relation, path, extra keyword arguments) for each root-level link,
    # registered in this order.
    link_specs = (
        ("commands", "/commands?gui={is_gui}", {"templated": True}),
        ("find_command", "/commands/{console_command}?gui={is_gui}",
         {"templated": True}),
        ("workers", "/workers", {}),
    )
    for relation, path, options in link_specs:
        root.add_link(relation, path, **options)

    return validate_response(root.get_dict(), GetRootResponse,
                             media_type="application/hal+json")
|
|
|
|
|
|
|
|
|
@api.get("/commands", response_model=GetCommandsResponse, tags=["Commands"])
async def get_available_commands(request: Request,
                                 gui: Optional[bool] = False):
    """Return the list of available commands as ``application/hal+json``.

    Args:
        request: incoming request, used only to derive the base URL.
        gui: accepted as a query parameter but not used by this handler.
    """
    base_url = get_base_url(request)
    available = commands.get_commands(base_url)
    return validate_response(available, GetCommandsResponse,
                             media_type="application/hal+json")
|
|
|
|
|
|
|
|
|
@api.get("/commands/{command}",
         response_model=FoundCommandInfo,
         tags=["Commands"])
async def find_command_data(command: str, request: Request,
                            gui: Optional[bool] = False,
                            by_id: Optional[bool] = False):
    """Look a command up either by id or by its console form.

    Args:
        command: command id when ``by_id`` is true, otherwise the console
            command string.
        request: incoming request, used only to derive the base URL.
        gui: accepted as a query parameter but not used by this handler.
        by_id: selects ``commands.get_by_id`` instead of
            ``commands.find_command``.

    Raises:
        HTTPException: 404 when the selected lookup returns ``None``.
    """
    base_url = get_base_url(request)

    # Choose the lookup and its matching "not found" error factory up front,
    # so the None check is written only once.
    if by_id:
        lookup, not_found = commands.get_by_id, get_command_not_found
    else:
        lookup, not_found = commands.find_command, get_cl_command_not_found

    found = lookup(command, base_url)
    if found is None:
        raise not_found(command)

    return validate_response(found, FoundCommandInfo,
                             media_type="application/hal+json")
|
|
|
|
|
|
|
|
|
@api.get("/commands/{command_id}/parameters",
         response_model=GetCommandParametersResponse,
         tags=["Commands"])
async def get_command_parameters(command_id: str, request: Request):
    """Return the parameters description of the command ``command_id``.

    The data is returned as-is; serialization is left to the route's
    ``response_model``.

    Raises:
        HTTPException: 404 when ``commands.get_parameters`` returns ``None``.
    """
    base_url = get_base_url(request)
    found = commands.get_parameters(command_id, base_url)
    if found is not None:
        return found
    raise get_command_not_found(command_id)
|
|
|
|
|
|
|
|
|
@api.post("/configs/{command_id}", response_model=CreateConfigResponse,
          tags=["Configurations"])
async def create_configuration(command_id: str, request: Request):
    """Create a worker for the command and return links for driving it.

    Raises:
        HTTPException: 404 when the command id is unknown; 500 (with the
            reported error as detail) when the worker could not be made.
    """
    command_object = commands.get_command_object(command_id)
    if command_object is None:
        raise get_command_not_found(command_id)

    print("Making worker")
    worker_id, failure = await workers.make_worker(command_object)
    if failure is not None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=failure)

    created = ResponseStructure(get_base_url(request))
    # Follow-up actions available for the freshly created configuration.
    follow_ups = (
        ("configure", f"/configs/{worker_id}/parameters"),
        ("execute", f"/execs/{worker_id}"),
        ("cancel", f"/configs/{worker_id}"),
    )
    for relation, path in follow_ups:
        created.add_link(relation, path)

    return validate_response(created.get_dict(), CreateConfigResponse)
|
|
|
|
|
|
|
|
|
@api.patch("/configs/{wid}/parameters", tags=["Configurations"])
async def modify_configuration_parameters(wid: int,
                                          parameters: ModifyConfigRequest,
                                          request: Request):
    """Apply the given parameter changes to configuration ``wid``.

    Delegates to ``change_config`` (without reset) and validates its output
    against ``ModifyConfigResponse``.
    """
    base_url = get_base_url(request)
    changed = await change_config(base_url, wid, parameters)
    return validate_response(changed, ModifyConfigResponse)
|
|
|
|
|
|
|
|
|
@api.put("/configs/{wid}/parameters", tags=["Configurations"])
async def reset_configuration_parameters(wid: int,
                                         parameters: ResetConfigRequest,
                                         request: Request):
    """Reset configuration ``wid`` using the given parameters.

    Delegates to ``change_config`` with ``reset=True`` and validates its
    output against ``ResetConfigResponse``.
    """
    base_url = get_base_url(request)
    changed = await change_config(base_url, wid, parameters, reset=True)
    return validate_response(changed, ResetConfigResponse)
|
|
|
|
|
|
|
|
|
async def change_config(base_url: str, wid: int,
                        parameters: Union[ModifyConfigRequest,
                                          ResetConfigRequest],
                        reset: bool = False) -> dict:
    """Send new parameter values to worker ``wid`` and build the response.

    Shared implementation of the PATCH (modify) and PUT (reset) handlers for
    ``/configs/{wid}/parameters``.

    Args:
        base_url: ``scheme://netloc`` prefix handed to ``ResponseStructure``.
        wid: id of the worker/configuration being changed.
        parameters: request model; its ``__root__`` value is what gets
            forwarded to the worker.
        reset: forwarded to ``workers.configure_worker``.

    Returns:
        The response dictionary produced by ``ResponseStructure.get_dict()``.

    Raises:
        HTTPException: 500 when the worker finishes unexpectedly, is in an
            unexpected state, or its daemon is dead; 404 when
            ``configure_worker`` returns ``None``.
    """
    values = parameters.dict()["__root__"]
    try:
        result = await workers.configure_worker(wid, values, reset=reset)
    # The HTTP errors below must be *raised*: a returned HTTPException would
    # be handed to the caller as if it were the response payload.
    except UnexpectedWorkerFinish as error:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=("Unexpected configuration finish. Reason:"
                    f" {error.reason[0]}")) from error
    except UnexpectedWorkerState as error:
        print("Unexpected worker state.")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=('Unexpected configuration state:'
                    f' "{error.state}". Expected:'
                    f' "{error.expected}"')) from error
    except DaemonIsDead as error:
        print("DAEMON IS DEAD")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Daemon is dead") from error

    print("RESULT:")
    pprint(result)
    if result is None:
        raise get_configuration_not_found(wid)

    resp = ResponseStructure(base_url)
    # Paths start with "/" like every other link built in this module.
    resp.add_link("configure", f"/configs/{wid}/parameters")
    resp.add_link("reset", f"/configs/{wid}/parameters")
    resp.add_link("cancel", f"/configs/{wid}")

    # An empty result offers the "execute" link; a non-empty one is attached
    # as response data instead.
    # NOTE(review): presumably a non-empty result describes parameters that
    # still need attention - confirm against WorkersManager.
    if not result:
        resp.add_link("execute", f"/execs/{wid}")
    else:
        resp.add_data(result)

    return resp.get_dict()
|
|
|
|
|
|
|
|
|
@api.delete("/configs/{wid}", response_model=StopConfigResponse,
            tags=["Configurations"])
async def stop_configuration(wid: int):
    """Cancel configuration ``wid`` and return an empty list.

    Raises:
        HTTPException: 404 when ``cancel_worker_configuration`` returns
            ``None``.
    """
    cancelled = await workers.cancel_worker_configuration(wid)
    if cancelled is None:
        raise get_configuration_not_found(wid)
    return []
|
|
|
|
|
|
|
|
|
@api.post("/execs/{wid}", tags=["Executions"])
async def start_command_execution(wid: int, request: Request):
    """Start execution for configuration ``wid`` and return follow-up links.

    Raises:
        HTTPException: 404 when ``run_execution`` reports a positive code,
            409 when it reports a negative one.
    """
    # NOTE(review): unlike the other WorkersManager calls in this module this
    # one is not awaited - confirm run_execution is a plain function.
    outcome = workers.run_execution(wid)
    if outcome > 0:
        raise get_configuration_not_found(wid)
    if outcome < 0:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail=f"Execution id = {wid} already exists.")

    started = ResponseStructure(get_base_url(request))
    follow_ups = (
        ("configure", f"/configs/{wid}/parameters"),
        ("reset", f"/configs/{wid}/parameters"),
        ("cancel", f"/configs/{wid}"),
    )
    for relation, path in follow_ups:
        started.add_link(relation, path)

    return validate_response(started.get_dict(), CreateExecutionResponse)
|
|
|
|
|
|
|
|
|
@api.delete("/execs/{wid}", tags=["Executions"])
async def stop_command_execution(wid: int):
    """Placeholder endpoint: always answers 501 Not Implemented."""
    not_implemented = HTTPException(
        status_code=status.HTTP_501_NOT_IMPLEMENTED,
        detail="'stop execution' is not implemented",
    )
    raise not_implemented
|
|
|
|
|
|
|
|
|
@api.get("/execs/{wid}/output", tags=["Executions"])
async def read_execution_output(wid: int):
    """Placeholder endpoint: always answers 501 Not Implemented.

    Raises:
        HTTPException: 501 on every call.
    """
    # The detail names this endpoint; it previously repeated the message of
    # the "stop execution" endpoint.
    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
                        detail="'read execution output' is not implemented")
|
|
|
|
|
|
|
|
|
@api.patch("/execs/{wid}/input", tags=["Executions"])
async def write_to_execution(wid: int):
    """Placeholder endpoint: always answers 501 Not Implemented."""
    not_implemented = HTTPException(
        status_code=status.HTTP_501_NOT_IMPLEMENTED,
        detail="'write to execution' is not implemented",
    )
    raise not_implemented
|
|
|
|
|
|
|
|
|
def check_pid_files(pid_files_dir: str = "./") -> None:
    """Stub for the startup PID-file check; currently does nothing.

    Args:
        pid_files_dir: accepted for the future implementation, not used yet.
    """
    return None
|
|
|
|
|
|
|
|
|
def get_base_url(request: Request):
    """Return ``scheme://netloc`` taken from the request's base URL.

    The result carries no path and no trailing slash.
    """
    origin = request.base_url
    return f"{origin.scheme}://{origin.netloc}"
|
|
|
|
|
|
|
|
|
def get_command_not_found(command_id: str) -> HTTPException:
    """Build (not raise) the 404 error for an unknown command id."""
    message = f'Command with id "{command_id}" is not found.'
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
|
|
|
|
|
|
|
|
|
def get_cl_command_not_found(console_command: str) -> HTTPException:
    """Build (not raise) the 404 error for an unknown console command."""
    message = f'Console command "{console_command}" is not found.'
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
|
|
|
|
|
|
|
|
|
def get_configuration_not_found(wid: int) -> HTTPException:
    """Build (not raise) the 404 error for an unknown configuration id."""
    message = f"Configuration with id={wid} is not found."
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
|
|
|
|
|
|
|
|
|
def validate_response(data: Union[dict, list],
                      schema: Union[ModelMetaclass, _GenericAlias, type],
                      media_type: Optional[str] = None,
                      status_code: int = 200
                      ) -> JSONResponse:
    """Validate response data against ``schema`` and wrap it in a response.

    Field aliases are honoured when encoding. The schema may be a pydantic
    model (including one with a custom ``__root__`` type), a plain type, or a
    ``typing`` construct.

    Args:
        data: payload to validate.
        schema: model class or type describing the expected payload.
        media_type: media type of the produced response, if any.
        status_code: HTTP status code of the produced response.

    Returns:
        A ``JSONResponse`` holding the validated, alias-encoded payload.
    """
    if isinstance(schema, ModelMetaclass):
        # Use the model's field map rather than __annotations__: the field
        # map is populated for subclasses too, while annotations are not
        # reliably inherited.
        if "__root__" in schema.__fields__:
            validated = schema(__root__=data)
        else:
            validated = schema(**data)
    else:
        # Anything that is not a model class goes through parse_obj_as, so
        # `validated` is always bound, including for annotation forms that
        # are neither `type` nor `_GenericAlias` instances.
        validated = parse_obj_as(schema, data)

    return JSONResponse(content=jsonable_encoder(validated, by_alias=True),
                        media_type=media_type, status_code=status_code)
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct-run entry point: serve `api` from the `server` module on
    # localhost with auto-reload enabled.
    uvicorn.run(
        "server:api",
        host="127.0.0.1",
        port=2007,
        reload=True,
    )
|