Files
LeDiscord/backend/utils/security.py
EvanChal 7ca168c34c
All checks were successful
Deploy to Development / build-and-deploy (push) Successful in 22s
Deploy to Production / build-and-deploy (push) Successful in 4s
fix
2026-01-28 22:31:17 +01:00

102 lines
3.9 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from config.settings import settings
from config.database import get_db
from models.user import User
from schemas.user import TokenData
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme avec auto_error=False pour pouvoir logger les erreurs
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generate a password hash."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception) -> TokenData:
"""Verify a JWT token and return the token data."""
try:
payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
user_id: int = payload.get("sub")
email: str = payload.get("email")
if user_id is None:
raise credentials_exception
token_data = TokenData(user_id=user_id, email=email)
return token_data
except JWTError:
raise credentials_exception
async def get_current_user(
token: Optional[str] = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""Get the current authenticated user."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Log si pas de token
if token is None:
print("❌ AUTH: No token provided in Authorization header")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated - no token provided",
headers={"WWW-Authenticate": "Bearer"},
)
# Log le token (premiers caractères seulement pour la sécurité)
print(f"🔐 AUTH: Token received, length={len(token)}, preview={token[:20]}...")
try:
token_data = verify_token(token, credentials_exception)
print(f"✅ AUTH: Token valid for user_id={token_data.user_id}")
except HTTPException as e:
print(f"❌ AUTH: Token validation failed - {e.detail}")
raise
user = db.query(User).filter(User.id == token_data.user_id).first()
if user is None:
print(f"❌ AUTH: User not found for id={token_data.user_id}")
raise credentials_exception
print(f"✅ AUTH: User authenticated: {user.username}")
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
"""Get the current active user."""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def get_admin_user(current_user: User = Depends(get_current_active_user)) -> User:
"""Get the current admin user."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user