Files
LeDiscord/backend/api/routers/admin.py
2025-08-27 18:34:38 +02:00

445 lines
17 KiB
Python
Executable File

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import List
import os
from pathlib import Path
from config.database import get_db
from config.settings import settings
from models.user import User
from models.album import Album, Media
from models.vlog import Vlog
from models.event import Event
from models.post import Post
from utils.security import get_admin_user
# Router on which the admin endpoints defined below are registered.
router = APIRouter()
def get_directory_size(path):
    """Return the total size in bytes of all files under ``path``.

    Sub-directories are walked recursively. The scan is best-effort: a
    directory that cannot be listed contributes 0, and an entry that cannot
    be inspected is skipped without abandoning the rest of its directory.

    Args:
        path: Directory to measure (``str`` or path-like object).

    Returns:
        Sum, in bytes, of the sizes of the files that could be read.
    """
    total = 0
    try:
        # The context manager closes the scandir handle deterministically
        # instead of leaving it to garbage collection.
        with os.scandir(path) as entries:
            for entry in entries:
                try:
                    if entry.is_file():
                        total += entry.stat().st_size
                    elif entry.is_dir():
                        total += get_directory_size(entry.path)
                except OSError:
                    # One unreadable entry must not hide its siblings.
                    # PermissionError is a subclass of OSError.
                    continue
    except OSError:
        # Missing, unreadable or non-directory path: deliberately counted
        # as empty so callers always get a number back.
        pass
    return total
def format_bytes(bytes):
    """Render a byte count as a string with two decimals and a unit suffix.

    The value is divided by 1024 until it drops below 1024, walking through
    B, KB, MB, GB and TB; anything still larger is expressed in PB.

    Args:
        bytes: Size to format. The name shadows the builtin but is kept
            because it is part of the call signature.

    Returns:
        A string such as ``"1.50 KB"``.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    value = bytes
    index = 0
    while index < len(units):
        if value < 1024.0:
            return f"{value:.2f} {units[index]}"
        value = value / 1024.0
        index += 1
    return f"{value:.2f} PB"
@router.get("/dashboard")
async def get_admin_dashboard(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Build the summary shown on the admin dashboard.

    The response has three sections:

    * ``users``: total, active and admin user counts.
    * ``content``: number of events, posts, vlogs and media rows.
    * ``storage``: bytes found on disk under the upload directory (overall
      and per category folder that exists), plus ``database_tracked``, the
      formatted sum of ``Media.file_size``.

    Args:
        db: Database session provided by ``get_db``.
        current_user: Resolved through ``get_admin_user``; not read here.

    Returns:
        A dict with the ``users``, ``content`` and ``storage`` sections.
    """
    # --- user statistics ---
    user_stats = {
        "total": db.query(User).count(),
        "active": db.query(User).filter(User.is_active == True).count(),  # noqa: E712
        "admins": db.query(User).filter(User.is_admin == True).count(),  # noqa: E712
    }

    # --- content statistics ---
    content_stats = {
        "events": db.query(Event).count(),
        "posts": db.query(Post).count(),
        "vlogs": db.query(Vlog).count(),
        "media_files": db.query(Media).count(),
    }

    # --- storage measured on disk, one entry per existing category folder ---
    uploads_root = Path(settings.UPLOAD_PATH)
    disk_total = 0
    per_category = {}
    if uploads_root.exists():
        for name in ('avatars', 'albums', 'vlogs', 'posts'):
            folder = uploads_root / name
            if not folder.exists():
                continue
            folder_bytes = get_directory_size(folder)
            per_category[name] = {
                "bytes": folder_bytes,
                "formatted": format_bytes(folder_bytes),
            }
            disk_total += folder_bytes

    # --- storage as recorded in the database (None when there are no rows) ---
    tracked_bytes = db.query(func.sum(Media.file_size)).scalar() or 0

    storage_stats = {
        "total_bytes": disk_total,
        "total_formatted": format_bytes(disk_total),
        "breakdown": per_category,
        "database_tracked": format_bytes(tracked_bytes),
    }

    return {
        "users": user_stats,
        "content": content_stats,
        "storage": storage_stats,
    }
@router.get("/users")
async def get_all_users_admin(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return every user together with admin-only details.

    For each user the entry contains the basic account fields, the
    formatted size of the media stored in albums they created
    (``storage_used``) and the number of posts, vlogs and albums they own
    (``content_count``).

    Args:
        db: Database session provided by ``get_db``.
        current_user: Resolved through ``get_admin_user``; not read here.

    Returns:
        A list with one dict per user.
    """
    users = db.query(User).all()
    user_list = []
    for user in users:
        # Bytes of media belonging to albums created by this user; the
        # aggregate is None when there is no matching row.
        user_media = db.query(func.sum(Media.file_size)).join(Album).filter(
            Album.creator_id == user.id
        ).scalar() or 0
        # Number of vlogs authored by the user. This must be a row count:
        # summing Vlog.id would add up primary keys, not count vlogs.
        user_vlogs = db.query(Vlog).filter(
            Vlog.author_id == user.id
        ).count()
        user_list.append({
            "id": user.id,
            "email": user.email,
            "username": user.username,
            "full_name": user.full_name,
            "is_active": user.is_active,
            "is_admin": user.is_admin,
            "created_at": user.created_at,
            "attendance_rate": user.attendance_rate,
            "storage_used": format_bytes(user_media),
            "content_count": {
                "posts": db.query(Post).filter(Post.author_id == user.id).count(),
                "vlogs": user_vlogs,
                "albums": db.query(Album).filter(Album.creator_id == user.id).count()
            }
        })
    return user_list
@router.put("/users/{user_id}/toggle-active")
async def toggle_user_active(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Flip the ``is_active`` flag of the user identified by ``user_id``.

    Args:
        user_id: Id of the account to activate or deactivate.
        db: Database session provided by ``get_db``.
        current_user: The admin making the request.

    Returns:
        A dict with a confirmation ``message`` and the new ``is_active``
        value.

    Raises:
        HTTPException: 400 when the admin targets their own account,
            404 when no user has the given id.
    """
    targets_self = user_id == current_user.id
    if targets_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate your own account",
        )

    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    target.is_active = not target.is_active
    db.commit()

    # Read the flag back from the instance after the commit.
    now_active = target.is_active
    outcome = 'activated' if now_active else 'deactivated'
    return {
        "message": f"User {outcome} successfully",
        "is_active": now_active,
    }
@router.put("/users/{user_id}/toggle-admin")
async def toggle_user_admin(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Flip the ``is_admin`` flag of the user identified by ``user_id``.

    Args:
        user_id: Id of the account whose admin right is granted or revoked.
        db: Database session provided by ``get_db``.
        current_user: The admin making the request.

    Returns:
        A dict with a confirmation ``message`` and the new ``is_admin``
        value.

    Raises:
        HTTPException: 400 when the admin targets their own account,
            404 when no user has the given id.
    """
    targets_self = user_id == current_user.id
    if targets_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change your own admin status",
        )

    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    target.is_admin = not target.is_admin
    db.commit()

    # Read the flag back from the instance after the commit.
    now_admin = target.is_admin
    outcome = 'granted' if now_admin else 'revoked'
    return {
        "message": f"User admin status {outcome} successfully",
        "is_admin": now_admin,
    }
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Delete a user, then remove their avatar and vlog upload directories.

    The database row is deleted and committed before anything is removed
    from disk. File removal cannot be undone, so it must not happen for a
    user whose deletion ends up being rolled back.

    Args:
        user_id: Id of the account to delete.
        db: Database session provided by ``get_db``.
        current_user: The admin making the request.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the admin targets their own account,
            404 when no user has the given id.
    """
    # shutil is not imported at module level, so the dependency stays local.
    import shutil

    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account"
        )
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Delete the user first. Related records are expected to be removed by
    # cascade rules declared on the models (not visible here - confirm).
    try:
        db.delete(user)
        db.commit()
    except Exception:
        # Leave the session usable and the files untouched, then let the
        # original error propagate.
        db.rollback()
        raise

    # Only now remove the user's files. A failure here leaves an orphaned
    # directory, which the storage cleanup endpoints can pick up later, so
    # it is logged rather than turned into an error for a user who is
    # already gone.
    upload_root = Path(settings.UPLOAD_PATH)
    user_dirs = [
        upload_root / "avatars" / str(user_id),
        upload_root / "vlogs" / str(user_id)
    ]
    for dir_path in user_dirs:
        if not dir_path.exists():
            continue
        try:
            shutil.rmtree(dir_path)
        except OSError as exc:
            print(f"Error deleting user directory {dir_path}: {exc}")

    return {"message": "User deleted successfully"}
@router.get("/storage/cleanup")
async def cleanup_storage(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Report files and directories in the upload tree that have no owner.

    This handler only scans; nothing is removed from disk. It inspects:

    * ``albums/<id>``: the whole directory is reported when no album has
      that id. Otherwise each file directly inside it is reported when no
      media row references it through ``file_path`` or ``thumbnail_path``.
    * ``avatars/<id>`` and ``vlogs/<id>``: the directory is reported when
      no user has that id.

    A directory whose name is not an integer, or that raises ``OSError``
    while being inspected, is logged with ``print`` and skipped.

    Args:
        db: Database session provided by ``get_db``.
        current_user: Resolved through ``get_admin_user``; not read here.

    Returns:
        A dict with the list of orphans, their count, their formatted
        total size and a summary message.

    Raises:
        HTTPException: 500 when the scan fails with any other error.
    """
    uploads_root = Path(settings.UPLOAD_PATH)
    findings = []
    orphaned_bytes = 0

    def report(location, kind, size):
        """Add one orphan to the result list and to the running total."""
        nonlocal orphaned_bytes
        findings.append({
            "path": str(location),
            "type": kind,
            "size": format_bytes(size)
        })
        orphaned_bytes += size

    try:
        # --- album directories and the media files inside them ---
        albums_root = uploads_root / "albums"
        if albums_root.exists():
            for album_dir in albums_root.iterdir():
                if not album_dir.is_dir():
                    continue
                try:
                    album_id = int(album_dir.name)
                    known_album = db.query(Album).filter(Album.id == album_id).first()
                    if not known_album:
                        # No such album: the entire directory is orphaned.
                        report(album_dir, "album_directory", get_directory_size(album_dir))
                        continue
                    # Album exists: check each file against the media table.
                    for candidate in album_dir.glob("*"):
                        if not candidate.is_file():
                            continue
                        relative_path = f"/albums/{album_id}/{candidate.name}"
                        is_file_match = Media.file_path == relative_path
                        is_thumb_match = Media.thumbnail_path == relative_path
                        referenced = db.query(Media).filter(
                            is_file_match | is_thumb_match
                        ).first()
                        if referenced:
                            continue
                        report(candidate, "media_file", candidate.stat().st_size)
                except (ValueError, OSError) as exc:
                    print(f"Error processing album directory {album_dir}: {exc}")
                    continue

        # --- per-user avatar directories ---
        avatars_root = uploads_root / "avatars"
        if avatars_root.exists():
            for user_dir in avatars_root.iterdir():
                if not user_dir.is_dir():
                    continue
                try:
                    owner_id = int(user_dir.name)
                    owner = db.query(User).filter(User.id == owner_id).first()
                    if owner:
                        continue
                    report(user_dir, "user_avatar_directory", get_directory_size(user_dir))
                except (ValueError, OSError) as exc:
                    print(f"Error processing avatar directory {user_dir}: {exc}")
                    continue

        # --- per-user vlog directories ---
        vlogs_root = uploads_root / "vlogs"
        if vlogs_root.exists():
            for user_dir in vlogs_root.iterdir():
                if not user_dir.is_dir():
                    continue
                try:
                    owner_id = int(user_dir.name)
                    owner = db.query(User).filter(User.id == owner_id).first()
                    if owner:
                        continue
                    report(user_dir, "user_vlog_directory", get_directory_size(user_dir))
                except (ValueError, OSError) as exc:
                    print(f"Error processing vlog directory {user_dir}: {exc}")
                    continue

        found = len(findings)
        size_label = format_bytes(orphaned_bytes)
        return {
            "orphaned_files": findings,
            "total_orphaned": found,
            "total_orphaned_size": size_label,
            "message": f"Trouvé {found} fichier(s) orphelin(s) pour un total de {size_label}. Utilisez DELETE pour les supprimer."
        }
    except Exception as exc:
        print(f"Error during cleanup scan: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error during cleanup scan: {str(exc)}"
        )
@router.delete("/storage/cleanup")
async def delete_orphaned_files(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Remove files and directories in the upload tree that have no owner.

    The following are deleted from disk:

    * ``albums/<id>`` directories for which no album has that id.
    * Files directly inside an existing album's directory that no media
      row references through ``file_path`` or ``thumbnail_path``.
    * ``avatars/<id>`` and ``vlogs/<id>`` directories for which no user
      has that id.

    A directory whose name is not an integer, or that raises ``OSError``
    while being processed, is logged with ``print`` and skipped. An item is
    listed in the response only after its removal call returned.

    Args:
        db: Database session provided by ``get_db``.
        current_user: Resolved through ``get_admin_user``; not read here.

    Returns:
        A dict with the removed items, their count, the formatted amount of
        space freed and a summary message.

    Raises:
        HTTPException: 500 when the cleanup fails with any other error.
    """
    import shutil

    uploads_root = Path(settings.UPLOAD_PATH)
    removed = []
    freed_bytes = 0

    def record(location, kind, size):
        """Add one removed item to the result list and the running total."""
        nonlocal freed_bytes
        removed.append({
            "path": str(location),
            "type": kind,
            "size": format_bytes(size)
        })
        freed_bytes += size

    try:
        # --- album directories and the media files inside them ---
        albums_root = uploads_root / "albums"
        if albums_root.exists():
            for album_dir in albums_root.iterdir():
                if not album_dir.is_dir():
                    continue
                try:
                    album_id = int(album_dir.name)
                    known_album = db.query(Album).filter(Album.id == album_id).first()
                    if not known_album:
                        # No such album: drop the entire directory. The
                        # size is measured before the removal.
                        reclaimed = get_directory_size(album_dir)
                        shutil.rmtree(album_dir)
                        record(album_dir, "album_directory", reclaimed)
                        continue
                    # Album exists: check each file against the media table.
                    for candidate in album_dir.glob("*"):
                        if not candidate.is_file():
                            continue
                        relative_path = f"/albums/{album_id}/{candidate.name}"
                        is_file_match = Media.file_path == relative_path
                        is_thumb_match = Media.thumbnail_path == relative_path
                        referenced = db.query(Media).filter(
                            is_file_match | is_thumb_match
                        ).first()
                        if referenced:
                            continue
                        reclaimed = candidate.stat().st_size
                        candidate.unlink()  # Delete the file
                        record(candidate, "media_file", reclaimed)
                except (ValueError, OSError) as exc:
                    print(f"Error processing album directory {album_dir}: {exc}")
                    continue

        # --- per-user avatar directories ---
        avatars_root = uploads_root / "avatars"
        if avatars_root.exists():
            for user_dir in avatars_root.iterdir():
                if not user_dir.is_dir():
                    continue
                try:
                    owner_id = int(user_dir.name)
                    owner = db.query(User).filter(User.id == owner_id).first()
                    if owner:
                        continue
                    # User directory is orphaned
                    reclaimed = get_directory_size(user_dir)
                    shutil.rmtree(user_dir)
                    record(user_dir, "user_avatar_directory", reclaimed)
                except (ValueError, OSError) as exc:
                    print(f"Error processing avatar directory {user_dir}: {exc}")
                    continue

        # --- per-user vlog directories ---
        vlogs_root = uploads_root / "vlogs"
        if vlogs_root.exists():
            for user_dir in vlogs_root.iterdir():
                if not user_dir.is_dir():
                    continue
                try:
                    owner_id = int(user_dir.name)
                    owner = db.query(User).filter(User.id == owner_id).first()
                    if owner:
                        continue
                    # User directory is orphaned
                    reclaimed = get_directory_size(user_dir)
                    shutil.rmtree(user_dir)
                    record(user_dir, "user_vlog_directory", reclaimed)
                except (ValueError, OSError) as exc:
                    print(f"Error processing vlog directory {user_dir}: {exc}")
                    continue

        removed_count = len(removed)
        freed_label = format_bytes(freed_bytes)
        return {
            "message": "Cleanup completed successfully",
            "deleted_files": removed,
            "total_deleted": removed_count,
            "total_freed_space": freed_label,
            "details": f"Supprimé {removed_count} fichier(s) pour libérer {freed_label}"
        }
    except Exception as exc:
        print(f"Error during cleanup: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error during cleanup: {str(exc)}"
        )