445 lines
17 KiB
Python
445 lines
17 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
from typing import List
|
|
import os
|
|
from pathlib import Path
|
|
from config.database import get_db
|
|
from config.settings import settings
|
|
from models.user import User
|
|
from models.album import Album, Media
|
|
from models.vlog import Vlog
|
|
from models.event import Event
|
|
from models.post import Post
|
|
from utils.security import get_admin_user
|
|
|
|
router = APIRouter()
|
|
|
|
def get_directory_size(path):
|
|
"""Calculate total size of a directory."""
|
|
total = 0
|
|
try:
|
|
for entry in os.scandir(path):
|
|
if entry.is_file():
|
|
total += entry.stat().st_size
|
|
elif entry.is_dir():
|
|
total += get_directory_size(entry.path)
|
|
except (OSError, PermissionError):
|
|
pass
|
|
return total
|
|
|
|
def format_bytes(bytes):
|
|
"""Format bytes to human readable string."""
|
|
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
|
if bytes < 1024.0:
|
|
return f"{bytes:.2f} {unit}"
|
|
bytes /= 1024.0
|
|
return f"{bytes:.2f} PB"
|
|
|
|
@router.get("/dashboard")
|
|
async def get_admin_dashboard(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Get admin dashboard information."""
|
|
# User statistics
|
|
total_users = db.query(User).count()
|
|
active_users = db.query(User).filter(User.is_active == True).count()
|
|
admin_users = db.query(User).filter(User.is_admin == True).count()
|
|
|
|
# Content statistics
|
|
total_events = db.query(Event).count()
|
|
total_posts = db.query(Post).count()
|
|
total_vlogs = db.query(Vlog).count()
|
|
total_media = db.query(Media).count()
|
|
|
|
# Storage statistics
|
|
upload_path = Path(settings.UPLOAD_PATH)
|
|
total_storage = 0
|
|
storage_breakdown = {}
|
|
|
|
if upload_path.exists():
|
|
# Calculate storage by category
|
|
categories = ['avatars', 'albums', 'vlogs', 'posts']
|
|
for category in categories:
|
|
category_path = upload_path / category
|
|
if category_path.exists():
|
|
size = get_directory_size(category_path)
|
|
storage_breakdown[category] = {
|
|
"bytes": size,
|
|
"formatted": format_bytes(size)
|
|
}
|
|
total_storage += size
|
|
|
|
# Database storage
|
|
db_size = db.query(func.sum(Media.file_size)).scalar() or 0
|
|
|
|
return {
|
|
"users": {
|
|
"total": total_users,
|
|
"active": active_users,
|
|
"admins": admin_users
|
|
},
|
|
"content": {
|
|
"events": total_events,
|
|
"posts": total_posts,
|
|
"vlogs": total_vlogs,
|
|
"media_files": total_media
|
|
},
|
|
"storage": {
|
|
"total_bytes": total_storage,
|
|
"total_formatted": format_bytes(total_storage),
|
|
"breakdown": storage_breakdown,
|
|
"database_tracked": format_bytes(db_size)
|
|
}
|
|
}
|
|
|
|
@router.get("/users")
|
|
async def get_all_users_admin(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Get all users with admin details."""
|
|
users = db.query(User).all()
|
|
|
|
user_list = []
|
|
for user in users:
|
|
# Calculate user storage
|
|
user_media = db.query(func.sum(Media.file_size)).join(Album).filter(
|
|
Album.creator_id == user.id
|
|
).scalar() or 0
|
|
|
|
user_vlogs = db.query(func.sum(Vlog.id)).filter(
|
|
Vlog.author_id == user.id
|
|
).scalar() or 0
|
|
|
|
user_list.append({
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"username": user.username,
|
|
"full_name": user.full_name,
|
|
"is_active": user.is_active,
|
|
"is_admin": user.is_admin,
|
|
"created_at": user.created_at,
|
|
"attendance_rate": user.attendance_rate,
|
|
"storage_used": format_bytes(user_media),
|
|
"content_count": {
|
|
"posts": db.query(Post).filter(Post.author_id == user.id).count(),
|
|
"vlogs": user_vlogs,
|
|
"albums": db.query(Album).filter(Album.creator_id == user.id).count()
|
|
}
|
|
})
|
|
|
|
return user_list
|
|
|
|
@router.put("/users/{user_id}/toggle-active")
|
|
async def toggle_user_active(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Toggle user active status."""
|
|
if user_id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot deactivate your own account"
|
|
)
|
|
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
user.is_active = not user.is_active
|
|
db.commit()
|
|
|
|
return {
|
|
"message": f"User {'activated' if user.is_active else 'deactivated'} successfully",
|
|
"is_active": user.is_active
|
|
}
|
|
|
|
@router.put("/users/{user_id}/toggle-admin")
|
|
async def toggle_user_admin(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Toggle user admin status."""
|
|
if user_id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot change your own admin status"
|
|
)
|
|
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
user.is_admin = not user.is_admin
|
|
db.commit()
|
|
|
|
return {
|
|
"message": f"User admin status {'granted' if user.is_admin else 'revoked'} successfully",
|
|
"is_admin": user.is_admin
|
|
}
|
|
|
|
@router.delete("/users/{user_id}")
|
|
async def delete_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Delete a user and all their content."""
|
|
if user_id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot delete your own account"
|
|
)
|
|
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
# Delete user's files
|
|
user_dirs = [
|
|
Path(settings.UPLOAD_PATH) / "avatars" / str(user_id),
|
|
Path(settings.UPLOAD_PATH) / "vlogs" / str(user_id)
|
|
]
|
|
|
|
for dir_path in user_dirs:
|
|
if dir_path.exists():
|
|
import shutil
|
|
shutil.rmtree(dir_path)
|
|
|
|
# Delete user (cascade will handle related records)
|
|
db.delete(user)
|
|
db.commit()
|
|
|
|
return {"message": "User deleted successfully"}
|
|
|
|
@router.get("/storage/cleanup")
|
|
async def cleanup_storage(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Find and optionally clean up orphaned files."""
|
|
upload_path = Path(settings.UPLOAD_PATH)
|
|
orphaned_files = []
|
|
total_orphaned_size = 0
|
|
|
|
try:
|
|
# Check for orphaned media files
|
|
if (upload_path / "albums").exists():
|
|
for album_dir in (upload_path / "albums").iterdir():
|
|
if album_dir.is_dir():
|
|
try:
|
|
album_id = int(album_dir.name)
|
|
album = db.query(Album).filter(Album.id == album_id).first()
|
|
|
|
if not album:
|
|
# Entire album directory is orphaned
|
|
size = get_directory_size(album_dir)
|
|
orphaned_files.append({
|
|
"path": str(album_dir),
|
|
"type": "album_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_orphaned_size += size
|
|
else:
|
|
# Check individual files
|
|
for file_path in album_dir.glob("*"):
|
|
if file_path.is_file():
|
|
relative_path = f"/albums/{album_id}/{file_path.name}"
|
|
media = db.query(Media).filter(
|
|
(Media.file_path == relative_path) |
|
|
(Media.thumbnail_path == relative_path)
|
|
).first()
|
|
|
|
if not media:
|
|
size = file_path.stat().st_size
|
|
orphaned_files.append({
|
|
"path": str(file_path),
|
|
"type": "media_file",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_orphaned_size += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing album directory {album_dir}: {e}")
|
|
continue
|
|
|
|
# Check for orphaned avatar files
|
|
if (upload_path / "avatars").exists():
|
|
for user_dir in (upload_path / "avatars").iterdir():
|
|
if user_dir.is_dir():
|
|
try:
|
|
user_id = int(user_dir.name)
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
|
|
if not user:
|
|
size = get_directory_size(user_dir)
|
|
orphaned_files.append({
|
|
"path": str(user_dir),
|
|
"type": "user_avatar_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_orphaned_size += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing avatar directory {user_dir}: {e}")
|
|
continue
|
|
|
|
# Check for orphaned vlog files
|
|
if (upload_path / "vlogs").exists():
|
|
for user_dir in (upload_path / "vlogs").iterdir():
|
|
if user_dir.is_dir():
|
|
try:
|
|
user_id = int(user_dir.name)
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
|
|
if not user:
|
|
size = get_directory_size(user_dir)
|
|
orphaned_files.append({
|
|
"path": str(user_dir),
|
|
"type": "user_vlog_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_orphaned_size += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing vlog directory {user_dir}: {e}")
|
|
continue
|
|
|
|
return {
|
|
"orphaned_files": orphaned_files,
|
|
"total_orphaned": len(orphaned_files),
|
|
"total_orphaned_size": format_bytes(total_orphaned_size),
|
|
"message": f"Trouvé {len(orphaned_files)} fichier(s) orphelin(s) pour un total de {format_bytes(total_orphaned_size)}. Utilisez DELETE pour les supprimer."
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error during cleanup scan: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Error during cleanup scan: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/storage/cleanup")
|
|
async def delete_orphaned_files(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Delete orphaned files."""
|
|
upload_path = Path(settings.UPLOAD_PATH)
|
|
deleted_files = []
|
|
total_freed_space = 0
|
|
|
|
try:
|
|
# Check for orphaned media files
|
|
if (upload_path / "albums").exists():
|
|
for album_dir in (upload_path / "albums").iterdir():
|
|
if album_dir.is_dir():
|
|
try:
|
|
album_id = int(album_dir.name)
|
|
album = db.query(Album).filter(Album.id == album_id).first()
|
|
|
|
if not album:
|
|
# Entire album directory is orphaned
|
|
size = get_directory_size(album_dir)
|
|
import shutil
|
|
shutil.rmtree(album_dir)
|
|
deleted_files.append({
|
|
"path": str(album_dir),
|
|
"type": "album_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_freed_space += size
|
|
else:
|
|
# Check individual files
|
|
for file_path in album_dir.glob("*"):
|
|
if file_path.is_file():
|
|
relative_path = f"/albums/{album_id}/{file_path.name}"
|
|
media = db.query(Media).filter(
|
|
(Media.file_path == relative_path) |
|
|
(Media.thumbnail_path == relative_path)
|
|
).first()
|
|
|
|
if not media:
|
|
size = file_path.stat().st_size
|
|
file_path.unlink() # Delete the file
|
|
deleted_files.append({
|
|
"path": str(file_path),
|
|
"type": "media_file",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_freed_space += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing album directory {album_dir}: {e}")
|
|
continue
|
|
|
|
# Check for orphaned avatar files
|
|
if (upload_path / "avatars").exists():
|
|
for user_dir in (upload_path / "avatars").iterdir():
|
|
if user_dir.is_dir():
|
|
try:
|
|
user_id = int(user_dir.name)
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
|
|
if not user:
|
|
# User directory is orphaned
|
|
size = get_directory_size(user_dir)
|
|
import shutil
|
|
shutil.rmtree(user_dir)
|
|
deleted_files.append({
|
|
"path": str(user_dir),
|
|
"type": "user_avatar_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_freed_space += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing avatar directory {user_dir}: {e}")
|
|
continue
|
|
|
|
# Check for orphaned vlog files
|
|
if (upload_path / "vlogs").exists():
|
|
for user_dir in (upload_path / "vlogs").iterdir():
|
|
if user_dir.is_dir():
|
|
try:
|
|
user_id = int(user_dir.name)
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
|
|
if not user:
|
|
# User directory is orphaned
|
|
size = get_directory_size(user_dir)
|
|
import shutil
|
|
shutil.rmtree(user_dir)
|
|
deleted_files.append({
|
|
"path": str(user_dir),
|
|
"type": "user_vlog_directory",
|
|
"size": format_bytes(size)
|
|
})
|
|
total_freed_space += size
|
|
except (ValueError, OSError) as e:
|
|
print(f"Error processing vlog directory {user_dir}: {e}")
|
|
continue
|
|
|
|
return {
|
|
"message": "Cleanup completed successfully",
|
|
"deleted_files": deleted_files,
|
|
"total_deleted": len(deleted_files),
|
|
"total_freed_space": format_bytes(total_freed_space),
|
|
"details": f"Supprimé {len(deleted_files)} fichier(s) pour libérer {format_bytes(total_freed_space)}"
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error during cleanup: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Error during cleanup: {str(e)}"
|
|
)
|