"""Authentication helpers: password hashing/validation and JWT handling."""

import hmac
import secrets
from hashlib import pbkdf2_hmac
from random import choices
from string import ascii_letters
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from jose import jwt

from ..schemas.tokens import TokenData
from ..schemas.users import UserData
from .users import get_user_by_username

from calculate.utils.files import Process


def make_secret_key() -> str:
    """Return a fresh random signing key as 64 hexadecimal characters.

    The key is built from 32 bytes of OS-provided randomness, i.e. the same
    shape as the output of ``openssl rand -hex 32``, but without depending
    on an external binary being installed at a fixed path.
    """
    return secrets.token_hex(32)


# The signing key is regenerated every time this module is imported, so
# tokens issued by a previous process stop validating after a restart.
SECRET_KEY = make_secret_key()
ALGORITHM = "HS256"

# Token lifetime used when the caller does not supply one.
DEFAULT_TOKEN_LIFETIME = timedelta(minutes=15)

# Number of PBKDF2 rounds applied when hashing passwords.
PBKDF2_ITERATIONS = 100_000


async def auth_user(username: str, password: str) -> Union[UserData, bool]:
    """Authenticate a user by username and plain-text password.

    Returns:
        The ``UserData`` built from the stored row when the password
        matches; ``False`` when the user is not found or the password does
        not match.
    """
    user_row = await get_user_by_username(username)
    if not user_row:
        return False

    user = UserData(**user_row)
    if not validate_password(password, user.password):
        return False
    return user


def decode_jwt(token: str) -> Union[TokenData, None]:
    """Decode *token* and extract the subject and expiry claims.

    Returns:
        ``TokenData`` with the ``sub`` and ``exp`` claims, or ``None`` when
        the token carries no ``sub`` claim.

    Raises:
        Any error raised by ``jwt.decode`` (for example for a bad signature)
        is not caught here and propagates to the caller.
    """
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    username: Optional[str] = payload.get("sub")
    expire = payload.get("exp")
    if username is None:
        return None
    return TokenData(username=username, expire=expire)


def create_access_token(data: dict,
                        expires_delta: Optional[timedelta] = None):
    """Create a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to put into the token; the dict is copied, not mutated.
        expires_delta: Token lifetime. When omitted (or falsy, e.g. a zero
            ``timedelta``) the default lifetime of 15 minutes is used.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else DEFAULT_TOKEN_LIFETIME
    # Timezone-aware "now": ``datetime.utcnow()`` is deprecated and yields a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_salt(length=12):
    """Return a random string of ASCII letters to be used as a hashing salt.

    Args:
        length: Number of characters in the salt.
    """
    # ``secrets`` draws from the OS CSPRNG; the ``random`` module is not
    # intended for security-sensitive values.
    return "".join(secrets.choice(ascii_letters) for _ in range(length))


def validate_password(password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored ``salt$hash`` string.

    Args:
        password: Password supplied by the user.
        hashed_password: Stored value in the form ``<salt>$<hex digest>``.

    Returns:
        ``True`` when hashing *password* with the stored salt reproduces the
        stored digest; ``False`` otherwise, including when *hashed_password*
        does not contain the ``$`` separator.
    """
    salt, separator, db_hash = hashed_password.partition("$")
    if not separator:
        # Malformed stored value: treat as a failed check instead of
        # letting an unpacking error escape.
        return False

    hashed = hash_password(password, salt)
    # Constant-time comparison; bytes are used so that non-ASCII garbage in
    # the stored value cannot make ``compare_digest`` raise.
    return hmac.compare_digest(hashed.encode(), db_hash.encode())


def hash_password(password: str, salt: Optional[str]) -> str:
    """Return the hex PBKDF2-HMAC-SHA256 digest of *password* with *salt*.

    Args:
        password: Plain-text password.
        salt: Salt string; when ``None`` a new one is generated.

    Returns:
        The derived key as a hexadecimal string (the salt is not included).
    """
    if salt is None:
        # NOTE(review): the generated salt is not returned, so a digest
        # produced this way cannot be re-verified later -- callers should
        # pass an explicit salt from get_salt() and store it with the hash.
        salt = get_salt()
    enc = pbkdf2_hmac("sha256", password.encode(), salt.encode(),
                      PBKDF2_ITERATIONS)
    return enc.hex()