forked from seb_vallee/BillManager
74 lines
2.8 KiB
Python
74 lines
2.8 KiB
Python
import bcrypt
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
from fastapi import Request, HTTPException, Depends
|
|
from fastapi.responses import RedirectResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from config import settings
|
|
from database import get_db
|
|
|
|
|
|
# ── Serializer de session ──────────────────────────────────────────────────────
|
|
|
|
# Module-wide serializer used to sign session payloads with the application
# secret key; tokens it produces are read back with a max-age check.
_serializer = URLSafeTimedSerializer(settings.secret_key)
|
|
|
|
# Name of the request cookie from which the session token is read.
COOKIE_NAME = "session"
|
|
|
|
|
|
def creer_session(user_id: int) -> str:
    """Build the signed session token for a user.

    The payload is a mapping with the single key ``user_id``; it is
    serialized by the module-level ``_serializer``.
    """
    payload = {"user_id": user_id}
    token = _serializer.dumps(payload)
    return token
|
|
|
|
|
|
def lire_session(token: str) -> int | None:
    """Decode a session token and return the user id it carries.

    Args:
        token: Signed token as produced by ``creer_session``.

    Returns:
        The value stored under ``user_id`` in the payload, or ``None`` when
        the signature is invalid, the token is older than
        ``settings.session_max_age``, the payload is not a mapping, or the
        ``user_id`` key is missing.
    """
    # Keep the try body limited to the call that can actually raise the
    # signature errors.
    try:
        data = _serializer.loads(token, max_age=settings.session_max_age)
    except (BadSignature, SignatureExpired):
        return None

    # A correctly signed token is not necessarily a session token: a payload
    # that is a string or a list would make ``data["user_id"]`` raise
    # TypeError, so check the shape before reading it.
    if not isinstance(data, dict):
        return None
    return data.get("user_id")
|
|
|
|
|
|
# ── Mots de passe ──────────────────────────────────────────────────────────────
|
|
|
|
def hasher_mot_de_passe(mdp: str) -> str:
    """Hash a plaintext password with bcrypt and a freshly generated salt.

    Args:
        mdp: Plaintext password.

    Returns:
        The bcrypt hash, decoded to ``str``.
    """
    # bcrypt only takes the first 72 bytes of a password into account.
    # Older releases of the library truncated silently, while recent ones
    # raise ValueError on longer input, so truncate explicitly to keep the
    # historical behaviour (and previously stored hashes) working.
    secret = mdp.encode()[:72]
    return bcrypt.hashpw(secret, bcrypt.gensalt()).decode()
|
|
|
|
|
|
def verifier_mot_de_passe(mdp: str, hash_: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        mdp: Plaintext password supplied by the user.
        hash_: Stored bcrypt hash, as text.

    Returns:
        ``True`` when the password matches the hash, ``False`` otherwise.
    """
    # bcrypt only takes the first 72 bytes of a password into account.
    # Older releases of the library truncated silently, while recent ones
    # raise ValueError on longer input, so truncate explicitly: an over-long
    # password is then compared the same way it has always been hashed.
    secret = mdp.encode()[:72]
    return bcrypt.checkpw(secret, hash_.encode())
|
|
|
|
|
|
# ── Dépendances FastAPI ────────────────────────────────────────────────────────
|
|
|
|
def get_current_user(request: Request, db: Session = Depends(get_db)):
    """Return the logged-in user, or raise a redirect to /login.

    The lookup itself (session cookie -> user id -> active ``User`` row) is
    delegated to ``redirect_if_not_logged``. When that lookup yields nothing,
    an ``HTTPException`` with status 302 and a ``Location: /login`` header
    is raised.
    """
    current = redirect_if_not_logged(request, db)
    if current:
        return current
    # Single exit for every failure case: no cookie, unreadable token, or no
    # matching active user.
    raise HTTPException(status_code=302, headers={"Location": "/login"})
|
|
|
|
|
|
def get_current_admin(user=Depends(get_current_user)):
    """Return the current user only when that user is an administrator.

    Raises an ``HTTPException`` with status 403 when ``user.is_admin`` is
    falsy.
    """
    if user.is_admin:
        return user
    raise HTTPException(status_code=403, detail="Accès réservé aux administrateurs.")
|
|
|
|
|
|
def redirect_if_not_logged(request: Request, db: Session):
    """Resolve the logged-in user from the session cookie without raising.

    Intended for routes that handle the "not logged in" case themselves
    rather than through ``Depends``. Returns the matching active ``User``
    row, or ``None`` when the cookie is absent, the token cannot be read, or
    no active user has that id.
    """
    from models import User

    cookie = request.cookies.get(COOKIE_NAME)
    # Only try to decode when a cookie was actually sent.
    uid = lire_session(cookie) if cookie else None
    if not uid:
        return None

    active_user = (
        db.query(User)
        .filter(User.id == uid, User.actif == True)  # noqa: E712 (SQL expression)
        .first()
    )
    return active_user
|