"""Authentication helpers: password hashing, JWT handling and FastAPI dependencies."""

import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

from config.settings import settings
from config.database import get_db
from models.user import User
from schemas.user import TokenData

logger = logging.getLogger(__name__)

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# auto_error=False so that a missing token is handed to get_current_user as
# None, where the failure can be logged before the 401 is raised.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's CryptContext."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Token lifetime. When omitted (or falsy, e.g. a zero
            timedelta) ``settings.JWT_EXPIRATION_MINUTES`` is used instead.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)

    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    to_encode["exp"] = datetime.now(timezone.utc) + lifetime

    return jwt.encode(
        to_encode,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )


def verify_token(token: str, credentials_exception) -> TokenData:
    """Decode ``token`` and return its ``sub`` / ``email`` claims as TokenData.

    Args:
        token: The encoded JWT.
        credentials_exception: Exception instance to raise when the token
            cannot be decoded or carries no ``sub`` claim.

    Returns:
        A ``TokenData`` built from the ``sub`` and ``email`` claims.

    Raises:
        credentials_exception: On any ``JWTError`` or a missing ``sub`` claim.
    """
    try:
        payload = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError as exc:
        raise credentials_exception from exc

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    return TokenData(user_id=user_id, email=payload.get("email"))


async def get_current_user(
    token: Optional[str] = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the bearer token of the request to a ``User`` row.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``; None when absent.
        db: Database session used to look the user up.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when no token is supplied, when the token fails
            verification, or when no matching user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    if token is None:
        logger.warning("❌ AUTH: No token provided in Authorization header")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated - no token provided",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Only the length is logged: no part of the credential itself should
    # end up in log output.
    logger.debug("🔐 AUTH: Token received, length=%d", len(token))

    try:
        token_data = verify_token(token, credentials_exception)
    except HTTPException as exc:
        logger.warning("❌ AUTH: Token validation failed - %s", exc.detail)
        raise
    logger.debug("✅ AUTH: Token valid for user_id=%s", token_data.user_id)

    # NOTE(review): this is a synchronous query inside an async dependency;
    # confirm whether blocking the event loop here is acceptable.
    user = db.query(User).filter(User.id == token_data.user_id).first()
    if user is None:
        logger.warning("❌ AUTH: User not found for id=%s", token_data.user_id)
        raise credentials_exception

    logger.info("✅ AUTH: User authenticated: %s", user.username)
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user, rejecting inactive accounts.

    Raises:
        HTTPException: 400 when ``current_user.is_active`` is falsy.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


async def get_admin_user(current_user: User = Depends(get_current_active_user)) -> User:
    """Return the authenticated, active user, requiring admin rights.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )
    return current_user