You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

200 lines
7.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import status, HTTPException
from fastapi.encoders import jsonable_encoder
from starlette.responses import JSONResponse
from starlette.requests import Request
from pydantic.main import ModelMetaclass
from pydantic import parse_obj_as
from typing import Dict, Union, List, _GenericAlias, Optional
from commands import Command
# from pprint import pprint
class ResponseStructure:
    """Data structure used to build a HATEOAS response to client requests.

    The resulting dictionary has up to three sections, each emitted only
    when it is non-empty: ``data`` (the payload), ``_links`` (links keyed
    by action name) and ``_embedded`` (embedded resources keyed by name).
    Every mutating method returns ``self`` so calls can be chained.
    """

    def __init__(self, base_url: str):
        """Create an empty structure.

        Args:
            base_url: Prefix prepended to relative link URIs. Leading and
                trailing slashes are stripped.
        """
        self._base_url: str = base_url.strip("/")
        self._data: Union[dict, list] = {}
        self._links: Dict[str, Dict[str, str]] = {}
        self._embedded: Dict[str, dict] = {}

    def get_dict(self) -> dict:
        """Return the response as a plain dict, omitting empty sections."""
        dictionary = {}
        if self._data:
            dictionary["data"] = self._data
        if self._links:
            dictionary["_links"] = self._links
        if self._embedded:
            dictionary["_embedded"] = self._embedded
        return dictionary

    def add_data(self, *args, **kwargs) -> "ResponseStructure":
        """Set and/or extend the payload.

        Args:
            *args: If given, the first positional argument replaces the
                current payload; any further positional arguments are
                ignored.
            **kwargs: Key/value pairs merged into the payload. The payload
                must be a dict at that point, otherwise ``.update`` fails.

        Returns:
            This instance.
        """
        if args:
            payload = args[0]
            # Copy dict payloads so the keyword merge below (now or on a later
            # call) never mutates the object owned by the caller.
            self._data = dict(payload) if isinstance(payload, dict) else payload
        if kwargs:
            self._data.update(kwargs)
        return self

    def embed(self, resource: str,
              content: Union[dict, list]) -> "ResponseStructure":
        """Store ``content`` under ``resource`` in the ``_embedded`` section."""
        self._embedded[resource] = content
        return self

    def add_link(self, action: str, uri: str,
                 templated: bool = False) -> "ResponseStructure":
        """Add a link for ``action`` or update the ``href`` of an existing one.

        Args:
            action: Key of the link inside ``_links``.
            uri: Target URI; resolved against the base URL when it does not
                already start with it.
            templated: When truthy, the link is marked with a ``templated``
                entry. A falsy value leaves an existing mark untouched.

        Returns:
            This instance.
        """
        # Reuse the existing section so that other keys already stored on it
        # (e.g. "templated") survive an href update.
        link_section = self._links.setdefault(action, {})
        link_section["href"] = self._get_uri(uri)
        if templated:
            link_section["templated"] = templated
        return self

    def _get_uri(self, uri: str) -> str:
        """Return ``uri`` unchanged if it starts with the base URL, otherwise
        join it to the base URL with exactly one slash between them."""
        if uri.startswith(self._base_url):
            return uri
        return f"{self._base_url}/{uri.strip('/')}"
class Commands:
    """Preliminary implementation of a container of command descriptions.

    Commands are indexed both by their ``id`` and by their console
    ``command`` string. The query methods return HATEOAS dictionaries
    built with ``ResponseStructure``.
    """

    def __init__(self, *commands: Command):
        """Index the given commands by ``id`` and by ``command``."""
        self._by_id: dict = {}
        self._by_command: dict = {}
        for command in commands:
            self._by_id[command.id] = command
            self._by_command[command.command] = command

    def get_commands(self, base_url: str) -> Dict[str, dict]:
        """Return short descriptions of all commands keyed by command id.

        Parameters are not embedded in this listing.
        """
        return {
            command.id: self._describe(command, base_url,
                                       by_id_flag="true",
                                       embed_parameters=False)
            for command in self._by_id.values()
        }

    def get_by_id(self, command_id: str,
                  base_url: str) -> Optional[Dict[str, Dict]]:
        """Return the full description of the command with ``command_id``,
        or ``None`` when no such command is registered."""
        command = self._by_id.get(command_id)
        if command is None:
            return None
        return self._describe(command, base_url,
                              by_id_flag="1", embed_parameters=True)

    def find_command(self, console_command: str,
                     base_url: str) -> Union[dict, None]:
        """Return the full description of the command whose console command
        string equals ``console_command``, or ``None`` when not found."""
        command = self._by_command.get(console_command)
        if command is None:
            return None
        return self._describe(command, base_url,
                              by_id_flag="1", embed_parameters=True)

    def get_parameters(self, command_id: str, base_url: str
                       ) -> Union[dict, None]:
        """Return the parameters structure of the command with
        ``command_id``, or ``None`` when no such command is registered."""
        command = self._by_id.get(command_id)
        if command is None:
            return None
        return self._get_parameters_data(command.parameters,
                                         command.id, base_url)

    def get_command_object(self, command_id: str) -> Optional[Command]:
        """Return the stored command object itself, or ``None`` if absent."""
        return self._by_id.get(command_id)

    def _describe(self, command: Command, base_url: str,
                  by_id_flag: str, embed_parameters: bool) -> dict:
        """Build the HATEOAS description shared by all command queries.

        Args:
            command: Command to describe.
            base_url: Base URL used to resolve the links.
            by_id_flag: Value of the ``by_id`` query argument in the ``self``
                link. The listing uses ``"true"`` while single lookups use
                ``"1"``; both spellings are kept so emitted links stay
                exactly as before.
            embed_parameters: Whether to embed the command parameters.
        """
        data = ResponseStructure(base_url)
        data.add_data(id=command.id,
                      title=command.title,
                      category=command.category,
                      command=command.command)
        data.add_link("self", f"/commands/{command.id}?by_id={by_id_flag}")
        data.add_link("parameters", f"/commands/{command.id}/parameters")
        data.add_link("configure", f"/configs/{command.id}")
        if embed_parameters:
            parameters_data = self._get_parameters_data(command.parameters,
                                                        command.id, base_url)
            data.embed("parameters", parameters_data)
        return data.get_dict()

    def _get_parameters_data(self, parameters: list, command_id: str,
                             base_url: str) -> dict:
        """Wrap ``parameters`` into a structure with a ``self`` link."""
        data = ResponseStructure(base_url)
        data.add_data(parameters)
        data.add_link("self", f"/commands/{command_id}/parameters")
        return data.get_dict()
def get_base_url(request: Request):
    """Return ``scheme://netloc`` of the request's base URL, without a
    trailing slash or path."""
    root = request.base_url
    return "{}://{}".format(root.scheme, root.netloc)
def get_command_not_found(command_id: str) -> HTTPException:
    """Build (without raising) a 404 exception for an unknown command id."""
    message = f'Command with id "{command_id}" is not found.'
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
def get_cl_command_not_found(console_command: str) -> HTTPException:
    """Build (without raising) a 404 exception for an unknown console
    command string."""
    message = f'Console command "{console_command}" is not found.'
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
def get_configuration_not_found(wid: int) -> HTTPException:
    """Build (without raising) a 404 exception for an unknown
    configuration id."""
    message = f"Configuration with id={wid} is not found."
    return HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                         detail=message)
def validate_response(data: Union[dict, list],
                      schema: Union[ModelMetaclass, _GenericAlias, type],
                      media_type: Optional[str] = None,
                      status_code: int = 200
                      ) -> JSONResponse:
    """Validate server response data against the given schema.

    Field aliases are taken into account, and the schema may be a plain
    type, a type from ``typing`` or a ``BaseModel`` (optionally with
    ``__root__``).

    Args:
        data: Data to validate.
        schema: A pydantic model class, or any other type accepted by
            ``parse_obj_as``.
        media_type: Media type passed through to the response.
        status_code: HTTP status code of the response.

    Returns:
        A ``JSONResponse`` whose content is the validated data encoded
        with ``by_alias=True``.

    Errors raised by the model constructor or by ``parse_obj_as`` are not
    caught here.
    """
    if isinstance(schema, ModelMetaclass):
        # ``__fields__`` also lists inherited fields, so a model that
        # inherits its ``__root__`` from a parent class is recognised too
        # (``__annotations__`` only covers the class's own annotations).
        if "__root__" in schema.__fields__:
            validated = schema(__root__=data)
        else:
            validated = schema(**data)
    else:
        # Anything that is not a model class is handed to parse_obj_as.
        # The previous isinstance(_GenericAlias, type) check left
        # ``validated`` unbound for schemas such as ``int | str``.
        validated = parse_obj_as(schema, data)
    return JSONResponse(content=jsonable_encoder(validated, by_alias=True),
                        media_type=media_type, status_code=status_code)