Init project
This commit is contained in:
73
auth.py
Normal file
73
auth.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import bcrypt
|
||||
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
||||
from fastapi import Request, HTTPException, Depends
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from config import settings
|
||||
from database import get_db
|
||||
|
||||
|
||||
# ── Serializer de session ──────────────────────────────────────────────────────
|
||||
|
||||
_serializer = URLSafeTimedSerializer(settings.secret_key)
|
||||
|
||||
COOKIE_NAME = "session"
|
||||
|
||||
|
||||
def creer_session(user_id: int) -> str:
|
||||
return _serializer.dumps({"user_id": user_id})
|
||||
|
||||
|
||||
def lire_session(token: str) -> int | None:
|
||||
try:
|
||||
data = _serializer.loads(token, max_age=settings.session_max_age)
|
||||
return data["user_id"]
|
||||
except (BadSignature, SignatureExpired, KeyError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Mots de passe ──────────────────────────────────────────────────────────────
|
||||
|
||||
def hasher_mot_de_passe(mdp: str) -> str:
|
||||
return bcrypt.hashpw(mdp.encode(), bcrypt.gensalt()).decode()
|
||||
|
||||
|
||||
def verifier_mot_de_passe(mdp: str, hash_: str) -> bool:
|
||||
return bcrypt.checkpw(mdp.encode(), hash_.encode())
|
||||
|
||||
|
||||
# ── Dépendances FastAPI ────────────────────────────────────────────────────────
|
||||
|
||||
def get_current_user(request: Request, db: Session = Depends(get_db)):
|
||||
"""Retourne l'utilisateur connecté ou lève une redirection vers /login."""
|
||||
from models import User
|
||||
token = request.cookies.get(COOKIE_NAME)
|
||||
if not token:
|
||||
raise HTTPException(status_code=302, headers={"Location": "/login"})
|
||||
user_id = lire_session(token)
|
||||
if not user_id:
|
||||
raise HTTPException(status_code=302, headers={"Location": "/login"})
|
||||
user = db.query(User).filter(User.id == user_id, User.actif == True).first()
|
||||
if not user:
|
||||
raise HTTPException(status_code=302, headers={"Location": "/login"})
|
||||
return user
|
||||
|
||||
|
||||
def get_current_admin(user=Depends(get_current_user)):
|
||||
"""Retourne l'utilisateur uniquement s'il est admin."""
|
||||
if not user.is_admin:
|
||||
raise HTTPException(status_code=403, detail="Accès réservé aux administrateurs.")
|
||||
return user
|
||||
|
||||
|
||||
def redirect_if_not_logged(request: Request, db: Session):
|
||||
"""Variante utilisable hors Depends pour les routes avec gestion manuelle."""
|
||||
from models import User
|
||||
token = request.cookies.get(COOKIE_NAME)
|
||||
if not token:
|
||||
return None
|
||||
user_id = lire_session(token)
|
||||
if not user_id:
|
||||
return None
|
||||
return db.query(User).filter(User.id == user_id, User.actif == True).first()
|
||||
Reference in New Issue
Block a user