50 lines
1.9 KiB
Python
50 lines
1.9 KiB
Python
"""Вспомогательные зависимости FastAPI."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.security import decode_access_token
|
|
from app.db.session import get_db
|
|
from app.models.user import User, UserRole
|
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
|
|
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
|
|
"""Находим пользователя по токену."""
|
|
|
|
try:
|
|
payload = decode_access_token(token)
|
|
except ValueError as exc:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
|
|
|
|
email: str | None = payload.get("sub")
|
|
if not email:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Неизвестный токен")
|
|
|
|
user = db.query(User).filter(User.email == email).first()
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Пользователь не найден")
|
|
|
|
return user
|
|
|
|
|
|
async def require_hr(current_user: User = Depends(get_current_user)) -> User:
|
|
"""Проверяем, что пользователь HR или администратор."""
|
|
|
|
if current_user.role not in {UserRole.HR, UserRole.ADMIN}:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Недостаточно прав")
|
|
return current_user
|
|
|
|
|
|
async def require_admin(current_user: User = Depends(get_current_user)) -> User:
|
|
"""Проверяем права администратора."""
|
|
|
|
if current_user.role != UserRole.ADMIN:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Только для администраторов")
|
|
return current_user
|