"""Authentication routes: user registration and login."""

from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from config.database import get_db
from config.settings import settings
from models.user import User
from schemas.user import UserCreate, UserResponse, Token
from utils.security import verify_password, get_password_hash, create_access_token

router = APIRouter()

# NOTE(review): both handlers are ``async def`` but call the synchronous
# SQLAlchemy ``Session`` API directly; confirm this is intended, since those
# calls run on the event loop rather than in FastAPI's threadpool.


def _token_response(user: User) -> dict:
    """Build the payload returned by both ``/register`` and ``/login``.

    The access token carries the user's id (as a string) under ``sub`` and
    the user's email under ``email``.
    """
    access_token = create_access_token(
        data={"sub": str(user.id), "email": user.email}
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": user,
    }


def _ensure_identity_available(db: Session, user_data: UserCreate) -> None:
    """Raise HTTP 400 if the requested email or username is already stored.

    The email is checked first, so a request that collides on both fields
    reports the email conflict.
    """
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken",
        )


@router.post("/register", response_model=Token)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Register a new user and return an access token for the new account.

    Args:
        user_data: Requested email, username, full name and plain password.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token``, ``token_type`` ("bearer") and ``user``.

    Raises:
        HTTPException: 403 when the ``enable_registration`` setting is falsy
            or the user count has reached the ``max_users`` setting; 400 when
            the email or username is already in use.
        IntegrityError: Re-raised when the commit fails for a reason other
            than a duplicate email or username.
    """
    # Kept as a function-level import, as in the original code
    # (presumably to avoid a circular import; confirm before moving it).
    from utils.settings_service import SettingsService

    # Check if registration is enabled
    if not SettingsService.get_setting(db, "enable_registration", True):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Registration is currently disabled",
        )

    # Check if max users limit is reached
    max_users = SettingsService.get_setting(db, "max_users", 50)
    current_users_count = db.query(User).count()
    if current_users_count >= max_users:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Maximum number of users reached",
        )

    _ensure_identity_available(db, user_data)

    # Create new user; only the password hash is stored.
    user = User(
        email=user_data.email,
        username=user_data.username,
        full_name=user_data.full_name,
        hashed_password=get_password_hash(user_data.password),
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # The pre-checks and the insert are separate statements, so another
        # request can store the same email/username in between. Roll back so
        # the session is usable, then report the conflict as the usual 400.
        # Any other integrity failure is re-raised unchanged.
        db.rollback()
        _ensure_identity_available(db, user_data)
        raise
    db.refresh(user)

    return _token_response(user)


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Login with email and password.

    Args:
        form_data: OAuth2 password form; its ``username`` field carries the
            user's email.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token``, ``token_type`` ("bearer") and ``user``.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify; 400 when the matched user is not active.
    """
    # Find user by email (username field is used for email in OAuth2PasswordRequestForm)
    user = db.query(User).filter(User.email == form_data.username).first()

    # NOTE(review): verify_password is skipped when no user matches, so the
    # response time may differ between unknown and known emails.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    return _token_response(user)