import logging
import os
import shutil
from pathlib import Path
from typing import Iterator, List, Tuple

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func
from sqlalchemy.orm import Session

from config.database import get_db
from config.settings import settings
from models.user import User
from models.album import Album, Media
from models.vlog import Vlog
from models.event import Event
from models.post import Post
from utils.security import get_admin_user

logger = logging.getLogger(__name__)

router = APIRouter()

# "type" value reported for a single orphaned file; every other orphan type
# produced by _iter_orphans is a whole directory.
_MEDIA_FILE = "media_file"


def get_directory_size(path):
    """Return the total size in bytes of all files under *path*, recursively.

    This is best-effort: if a directory cannot be read, whatever was counted
    before the error is returned instead of raising.
    """
    total = 0
    try:
        # The context manager closes the scandir iterator (and its file
        # descriptor) deterministically, even when an error interrupts the loop.
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.is_file():
                    total += entry.stat().st_size
                elif entry.is_dir():
                    total += get_directory_size(entry.path)
    except OSError:  # PermissionError is a subclass of OSError
        pass
    return total


def format_bytes(bytes):  # noqa: A002 - parameter name kept for compatibility
    """Format a byte count as a human readable string such as ``"1.50 MB"``.

    Units step by 1024 from B up to PB; the value is always shown with two
    decimals.
    """
    # Convert once: SQL aggregates may come back as Decimal on some database
    # backends, and Decimal cannot be divided by a float.
    size = float(bytes)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if size < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    return f"{size:.2f} PB"


def _get_user_or_404(db: Session, user_id: int) -> User:
    """Return the user with *user_id* or raise a 404 ``HTTPException``."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user


def _iter_orphans(db: Session, upload_path: Path) -> Iterator[Tuple[Path, str, int]]:
    """Yield ``(path, type, size_in_bytes)`` for every orphaned upload.

    An upload is orphaned when:
    * an ``albums/<id>`` directory has no matching ``Album`` row,
    * a file directly inside an existing album directory matches neither
      ``Media.file_path`` nor ``Media.thumbnail_path``,
    * an ``avatars/<id>`` or ``vlogs/<id>`` directory has no matching ``User``.

    Directories whose name is not an integer, or that fail with an OS error,
    are logged and skipped.
    """
    albums_root = upload_path / "albums"
    if albums_root.exists():
        for album_dir in albums_root.iterdir():
            if not album_dir.is_dir():
                continue
            try:
                album_id = int(album_dir.name)
                album = db.query(Album).filter(Album.id == album_id).first()
                if album is None:
                    # Entire album directory is orphaned
                    yield album_dir, "album_directory", get_directory_size(album_dir)
                    continue

                # Album exists: check its files individually
                for file_path in album_dir.glob("*"):
                    if not file_path.is_file():
                        continue
                    relative_path = f"/albums/{album_id}/{file_path.name}"
                    media = db.query(Media).filter(
                        (Media.file_path == relative_path) |
                        (Media.thumbnail_path == relative_path)
                    ).first()
                    if media is None:
                        yield file_path, _MEDIA_FILE, file_path.stat().st_size
            except (ValueError, OSError) as exc:
                logger.warning(
                    "Error processing album directory %s: %s", album_dir, exc
                )
                continue

    per_user_roots = (
        ("avatars", "avatar", "user_avatar_directory"),
        ("vlogs", "vlog", "user_vlog_directory"),
    )
    for category, label, orphan_type in per_user_roots:
        root = upload_path / category
        if not root.exists():
            continue
        for user_dir in root.iterdir():
            if not user_dir.is_dir():
                continue
            try:
                user_id = int(user_dir.name)
                user = db.query(User).filter(User.id == user_id).first()
                if user is None:
                    yield user_dir, orphan_type, get_directory_size(user_dir)
            except (ValueError, OSError) as exc:
                logger.warning(
                    "Error processing %s directory %s: %s", label, user_dir, exc
                )
                continue


@router.get("/dashboard")
async def get_admin_dashboard(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get admin dashboard information.

    Returns user counts, content counts and storage usage. Storage is measured
    on disk per upload category, next to the total of ``Media.file_size``
    recorded in the database.
    """
    # User statistics
    total_users = db.query(User).count()
    active_users = db.query(User).filter(User.is_active == True).count()  # noqa: E712
    admin_users = db.query(User).filter(User.is_admin == True).count()  # noqa: E712

    # Content statistics
    total_events = db.query(Event).count()
    total_posts = db.query(Post).count()
    total_vlogs = db.query(Vlog).count()
    total_media = db.query(Media).count()

    # Storage statistics
    upload_path = Path(settings.UPLOAD_PATH)
    total_storage = 0
    storage_breakdown = {}

    if upload_path.exists():
        # Calculate storage by category
        for category in ("avatars", "albums", "vlogs", "posts"):
            category_path = upload_path / category
            if category_path.exists():
                size = get_directory_size(category_path)
                storage_breakdown[category] = {
                    "bytes": size,
                    "formatted": format_bytes(size)
                }
                total_storage += size

    # Database storage (sum of the sizes recorded on Media rows)
    db_size = db.query(func.sum(Media.file_size)).scalar() or 0

    return {
        "users": {
            "total": total_users,
            "active": active_users,
            "admins": admin_users
        },
        "content": {
            "events": total_events,
            "posts": total_posts,
            "vlogs": total_vlogs,
            "media_files": total_media
        },
        "storage": {
            "total_bytes": total_storage,
            "total_formatted": format_bytes(total_storage),
            "breakdown": storage_breakdown,
            "database_tracked": format_bytes(db_size)
        }
    }


@router.get("/users")
async def get_all_users_admin(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get all users with admin details.

    Each entry carries the user's profile fields, the formatted total size of
    the media in albums they created, and their post/vlog/album counts.
    """
    users = db.query(User).all()

    user_list = []
    for user in users:
        # Storage used by media belonging to the user's albums
        user_media = db.query(func.sum(Media.file_size)).join(Album).filter(
            Album.creator_id == user.id
        ).scalar() or 0

        # Count the user's vlogs (rows, not the sum of their primary keys)
        user_vlogs = db.query(Vlog).filter(Vlog.author_id == user.id).count()

        user_list.append({
            "id": user.id,
            "email": user.email,
            "username": user.username,
            "full_name": user.full_name,
            "is_active": user.is_active,
            "is_admin": user.is_admin,
            "created_at": user.created_at,
            "attendance_rate": user.attendance_rate,
            "storage_used": format_bytes(user_media),
            "content_count": {
                "posts": db.query(Post).filter(Post.author_id == user.id).count(),
                "vlogs": user_vlogs,
                "albums": db.query(Album).filter(Album.creator_id == user.id).count()
            }
        })

    return user_list


@router.put("/users/{user_id}/toggle-active")
async def toggle_user_active(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Toggle user active status.

    Raises 400 when an admin targets their own account and 404 when the user
    does not exist.
    """
    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate your own account"
        )

    user = _get_user_or_404(db, user_id)

    # Capture the new value before committing so the response does not depend
    # on re-reading the instance after the commit.
    is_active = not user.is_active
    user.is_active = is_active
    db.commit()

    return {
        "message": f"User {'activated' if is_active else 'deactivated'} successfully",
        "is_active": is_active
    }


@router.put("/users/{user_id}/toggle-admin")
async def toggle_user_admin(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Toggle user admin status.

    Raises 400 when an admin targets their own account and 404 when the user
    does not exist.
    """
    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change your own admin status"
        )

    user = _get_user_or_404(db, user_id)

    is_admin = not user.is_admin
    user.is_admin = is_admin
    db.commit()

    return {
        "message": f"User admin status {'granted' if is_admin else 'revoked'} successfully",
        "is_admin": is_admin
    }


@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Delete a user and all their content.

    Raises 400 when an admin targets their own account and 404 when the user
    does not exist.
    """
    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account"
        )

    user = _get_user_or_404(db, user_id)

    # Delete the database record first (cascade will handle related records):
    # if the commit fails the user's files are still intact.
    db.delete(user)
    db.commit()

    # Then remove the user's files. This is best-effort: a directory that
    # cannot be removed is left behind for the orphan cleanup endpoints, which
    # report avatars/<id> and vlogs/<id> directories without a matching user.
    user_dirs = [
        Path(settings.UPLOAD_PATH) / "avatars" / str(user_id),
        Path(settings.UPLOAD_PATH) / "vlogs" / str(user_id)
    ]
    for dir_path in user_dirs:
        if dir_path.exists():
            try:
                shutil.rmtree(dir_path)
            except OSError as exc:
                logger.warning(
                    "Could not remove %s for deleted user %s: %s",
                    dir_path, user_id, exc
                )

    return {"message": "User deleted successfully"}


@router.get("/storage/cleanup")
async def cleanup_storage(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """List orphaned files and directories without deleting anything.

    Use the DELETE variant of this route to actually remove them. Raises 500
    when the scan fails unexpectedly.
    """
    upload_path = Path(settings.UPLOAD_PATH)

    try:
        orphans = list(_iter_orphans(db, upload_path))
    except Exception as e:
        logger.exception("Error during cleanup scan")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error during cleanup scan: {str(e)}"
        ) from e

    orphaned_files = [
        {"path": str(path), "type": orphan_type, "size": format_bytes(size)}
        for path, orphan_type, size in orphans
    ]
    total_orphaned_size = sum(size for _, _, size in orphans)

    return {
        "orphaned_files": orphaned_files,
        "total_orphaned": len(orphaned_files),
        "total_orphaned_size": format_bytes(total_orphaned_size),
        "message": f"Trouvé {len(orphaned_files)} fichier(s) orphelin(s) pour un total de {format_bytes(total_orphaned_size)}. Utilisez DELETE pour les supprimer."
    }


@router.delete("/storage/cleanup")
async def delete_orphaned_files(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Delete orphaned files.

    Only entries that were actually removed are reported; an entry that fails
    to delete is logged and skipped. Raises 500 when the scan itself fails
    unexpectedly.
    """
    upload_path = Path(settings.UPLOAD_PATH)
    deleted_files = []
    total_freed_space = 0

    try:
        # Finish scanning before deleting anything so directories are never
        # modified while they are being iterated.
        orphans = list(_iter_orphans(db, upload_path))

        for path, orphan_type, size in orphans:
            try:
                if orphan_type == _MEDIA_FILE:
                    path.unlink()
                else:
                    shutil.rmtree(path)
            except OSError as exc:
                logger.warning("Error deleting orphaned path %s: %s", path, exc)
                continue

            deleted_files.append({
                "path": str(path),
                "type": orphan_type,
                "size": format_bytes(size)
            })
            total_freed_space += size
    except Exception as e:
        logger.exception("Error during cleanup")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error during cleanup: {str(e)}"
        ) from e

    return {
        "message": "Cleanup completed successfully",
        "deleted_files": deleted_files,
        "total_deleted": len(deleted_files),
        "total_freed_space": format_bytes(total_freed_space),
        "details": f"Supprimé {len(deleted_files)} fichier(s) pour libérer {format_bytes(total_freed_space)}"
    }