Files
LeDiscord/backend/api/routers/albums.py
2025-08-27 18:34:38 +02:00

465 lines
16 KiB
Python
Executable File

import logging
import mimetypes
import os
import uuid
from pathlib import Path
from typing import List, Optional

import magic
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form
from PIL import Image
from sqlalchemy.orm import Session

from config.database import get_db
from config.settings import settings
from models.album import Album, Media, MediaType, MediaLike
from models.user import User
from schemas.album import AlbumCreate, AlbumUpdate, AlbumResponse
from utils.security import get_current_active_user
from utils.settings_service import SettingsService
# Router that every album and media endpoint in this module registers on.
router = APIRouter()
@router.post("/", response_model=AlbumResponse)
async def create_album(
    album_data: AlbumCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Store a new album whose creator is the authenticated user.

    Returns the formatted album as produced by ``format_album_response``.
    """
    payload_fields = album_data.dict()
    new_album = Album(creator_id=current_user.id, **payload_fields)

    db.add(new_album)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(new_album)

    return format_album_response(new_album, db, current_user.id)
@router.get("/", response_model=List[AlbumResponse])
async def get_albums(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
    event_id: Optional[int] = None
):
    """Return all albums, newest first.

    When ``event_id`` is truthy only albums linked to that event are
    returned; a falsy value (``None`` or ``0``) disables the filter.
    """
    album_query = db.query(Album)
    if event_id:
        album_query = album_query.filter(Album.event_id == event_id)
    newest_first = album_query.order_by(Album.created_at.desc()).all()

    formatted = []
    for entry in newest_first:
        formatted.append(format_album_response(entry, db, current_user.id))
    return formatted
@router.get("/{album_id}", response_model=AlbumResponse)
async def get_album(
    album_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Return one album by id, or respond with 404 when it does not exist."""
    found = db.query(Album).filter(Album.id == album_id).first()
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Album not found"
        )

    viewer_id = current_user.id
    return format_album_response(found, db, viewer_id)
@router.put("/{album_id}", response_model=AlbumResponse)
async def update_album(
    album_id: int,
    album_update: AlbumUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Apply the fields explicitly set in the request to an existing album.

    Responds with 404 when the album does not exist and 403 when the caller
    is neither the album's creator nor an admin.
    """
    target = db.query(Album).filter(Album.id == album_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Album not found"
        )

    may_edit = target.creator_id == current_user.id or current_user.is_admin
    if not may_edit:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to update this album"
        )

    # Only fields the client actually sent are copied onto the model.
    changes = album_update.dict(exclude_unset=True)
    for attribute in changes:
        setattr(target, attribute, changes[attribute])

    db.commit()
    db.refresh(target)
    return format_album_response(target, db, current_user.id)
@router.delete("/{album_id}")
async def delete_album(
    album_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Delete an album and the files stored on disk for its media.

    Responds with 404 when the album does not exist and 403 when the caller
    is neither the album's creator nor an admin. File removal is best-effort:
    a file that cannot be removed is logged and does not fail the request.
    """
    album = db.query(Album).filter(Album.id == album_id).first()
    if not album:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Album not found"
        )
    if album.creator_id != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to delete this album"
        )

    # Remember the stored paths now; the ORM objects are gone after the delete.
    stored_paths = []
    for media in album.media:
        for stored in (media.file_path, media.thumbnail_path):
            if stored:
                stored_paths.append(stored)

    # Commit the database change first so a failed delete never leaves rows
    # pointing at files that were already removed.
    db.delete(album)
    db.commit()

    # Stored paths start with "/" and are relative to the upload root.
    upload_root = Path(settings.UPLOAD_PATH)
    log = logging.getLogger(__name__)
    for stored in stored_paths:
        # Each file is handled on its own so one failure does not skip the rest.
        try:
            os.remove(upload_root / stored.lstrip("/"))
        except FileNotFoundError:
            # Already gone; nothing left to clean up for this entry.
            continue
        except OSError as exc:
            log.warning("Could not remove media file %s: %s", stored, exc)

    return {"message": "Album deleted successfully"}
def _safe_media_extension(original_name: Optional[str], mime: str) -> str:
    """Return a filesystem-safe extension (with leading dot) for an upload.

    The extension of the client-supplied name is used when it has one, reduced
    to alphanumeric characters so it can never contain a path separator.
    Otherwise the extension is guessed from the detected MIME type; an empty
    string is returned when neither source yields one.
    """
    suffix = Path(original_name or "").suffix
    cleaned = "".join(ch for ch in suffix if ch.isalnum())[:10]
    if cleaned:
        return f".{cleaned}"
    return mimetypes.guess_extension(mime) or ""


def _discard_files(paths) -> None:
    """Best-effort removal of files written during an upload that failed."""
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            continue
        except OSError as exc:
            logging.getLogger(__name__).warning(
                "Could not remove partial upload %s: %s", path, exc
            )


@router.post("/{album_id}/media")
async def upload_media(
    album_id: int,
    files: List[UploadFile] = File(...),
    captions: Optional[List[str]] = Form(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Upload media files to an album.

    Each file is checked against the configured size limit and allowed types,
    written under ``<UPLOAD_PATH>/albums/<album_id>/`` with a generated name,
    and recorded as a ``Media`` row. Images also get their dimensions stored
    and a thumbnail (at most 800x800) written to the ``thumbnails``
    sub-directory; a thumbnail failure is logged and does not reject the file.

    ``captions`` are matched to ``files`` by position. If any file is rejected
    the whole request fails and files already written by this request are
    removed again.

    Raises:
        HTTPException: 404 if the album does not exist, 403 if the caller is
            neither creator nor admin, 400 if the album limit would be
            exceeded or a file type is unsupported, 413 if a file is too large.
    """
    album = db.query(Album).filter(Album.id == album_id).first()
    if not album:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Album not found"
        )
    if album.creator_id != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to add media to this album"
        )

    # Check max media per album limit
    max_media_per_album = SettingsService.get_setting(db, "max_media_per_album", 50)
    current_media_count = len(album.media)
    if current_media_count + len(files) > max_media_per_album:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Album would exceed maximum of {max_media_per_album} media files. Current: {current_media_count}, trying to add: {len(files)}"
        )

    # Create album directory
    album_dir = Path(settings.UPLOAD_PATH) / "albums" / str(album_id)
    album_dir.mkdir(parents=True, exist_ok=True)
    thumbnails_dir = album_dir / "thumbnails"
    thumbnails_dir.mkdir(exist_ok=True)

    log = logging.getLogger(__name__)
    uploaded_media = []
    # Everything written to disk by this request, for cleanup on failure.
    written_files = []
    try:
        for i, file in enumerate(files):
            contents = await file.read()
            file_size = len(contents)

            # Sniff the real type from the bytes. The size limit is looked up
            # with this value rather than the client-declared Content-Type,
            # which the uploader controls.
            mime = magic.from_buffer(contents, mime=True)

            max_size = SettingsService.get_max_upload_size(db, mime)
            if file_size > max_size:
                max_size_mb = max_size // (1024 * 1024)
                raise HTTPException(
                    status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                    detail=f"File {file.filename} exceeds maximum size of {max_size_mb}MB"
                )

            # Check file type
            if not SettingsService.is_file_type_allowed(db, mime):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"File {file.filename} has unsupported type: {mime}"
                )

            # Determine media type
            if mime.startswith('image/'):
                media_type = MediaType.IMAGE
            elif mime.startswith('video/'):
                media_type = MediaType.VIDEO
            else:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"File {file.filename} has unsupported type: {mime}"
                )

            # Generate a unique filename; the client name only contributes a
            # sanitised extension.
            filename = f"{uuid.uuid4()}{_safe_media_extension(file.filename, mime)}"
            file_path = album_dir / filename

            # Save file
            with open(file_path, "wb") as f:
                f.write(contents)
            written_files.append(file_path)

            # Process media
            thumbnail_path = None
            width = height = duration = None
            if media_type == MediaType.IMAGE:
                thumbnail_filename = f"thumb_{filename}"
                thumbnail_path_full = thumbnails_dir / thumbnail_filename
                try:
                    # The context manager closes the underlying file handle;
                    # the copy stays usable afterwards.
                    with Image.open(file_path) as img:
                        width, height = img.size
                        thumbnail = img.copy()
                    thumbnail.thumbnail((800, 800))
                    thumbnail.save(thumbnail_path_full, quality=95, optimize=True)
                    thumbnail_path = f"/albums/{album_id}/thumbnails/{thumbnail_filename}"
                    written_files.append(thumbnail_path_full)
                except Exception as exc:
                    # Thumbnailing is best-effort: keep the upload, drop any
                    # half-written thumbnail.
                    log.warning("Error processing image %s: %s", file_path, exc)
                    _discard_files([thumbnail_path_full])

            # Create media record
            media = Media(
                album_id=album_id,
                file_path=f"/albums/{album_id}/{filename}",
                thumbnail_path=thumbnail_path,
                media_type=media_type,
                caption=captions[i] if captions and i < len(captions) else None,
                file_size=file_size,
                width=width,
                height=height,
                duration=duration
            )
            db.add(media)
            uploaded_media.append(media)

        # Set first image as album cover if not set
        if not album.cover_image and uploaded_media:
            first_image = next((m for m in uploaded_media if m.media_type == MediaType.IMAGE), None)
            if first_image:
                album.cover_image = first_image.thumbnail_path or first_image.file_path

        db.commit()
    except Exception:
        # Nothing from this request was committed, so its files must not stay.
        db.rollback()
        _discard_files(written_files)
        raise

    # Update event cover image if this album is linked to an event
    if album.event_id:
        update_event_cover_from_album(album.event_id, db)

    return {
        "message": f"Successfully uploaded {len(uploaded_media)} files",
        "media_count": len(uploaded_media)
    }
@router.delete("/{album_id}/media/{media_id}")
async def delete_media(
    album_id: int,
    media_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Delete one media item from an album, including its files on disk.

    Responds with 404 when the media does not exist in that album and 403
    when the caller is neither the album's creator nor an admin. If the
    album cover points at the deleted media it is moved to another image of
    the album, or cleared when none is left. File removal is best-effort:
    failures are logged and do not fail the request.
    """
    media = db.query(Media).filter(
        Media.id == media_id,
        Media.album_id == album_id
    ).first()
    if not media:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Media not found"
        )
    album = media.album
    if album.creator_id != current_user.id and not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to delete this media"
        )

    # Remember the stored paths now; the row is gone after the commit.
    stored_paths = [p for p in (media.file_path, media.thumbnail_path) if p]

    # Do not leave the album cover pointing at a file that is about to vanish.
    if album.cover_image and album.cover_image in stored_paths:
        replacement = db.query(Media).filter(
            Media.album_id == album_id,
            Media.id != media_id,
            Media.media_type == MediaType.IMAGE
        ).order_by(Media.id).first()
        if replacement:
            album.cover_image = replacement.thumbnail_path or replacement.file_path
        else:
            album.cover_image = None

    # Commit the database change before touching the filesystem.
    db.delete(media)
    db.commit()

    # Stored paths start with "/" and are relative to the upload root.
    upload_root = Path(settings.UPLOAD_PATH)
    log = logging.getLogger(__name__)
    for stored in stored_paths:
        # Each file is handled on its own so one failure does not skip the other.
        try:
            os.remove(upload_root / stored.lstrip("/"))
        except FileNotFoundError:
            continue
        except OSError as exc:
            log.warning("Could not remove media file %s: %s", stored, exc)

    # Update event cover image if this album is linked to an event
    if album.event_id:
        update_event_cover_from_album(album.event_id, db)

    return {"message": "Media deleted successfully"}
@router.post("/{album_id}/media/{media_id}/like")
async def toggle_media_like(
    album_id: int,
    media_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Flip the current user's like on a media item.

    An existing like is removed and the counter decremented (never below
    zero); otherwise a like is created and the counter incremented. Responds
    with 404 when the media does not exist in the given album.
    """
    target = db.query(Media).filter(
        Media.id == media_id,
        Media.album_id == album_id
    ).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Media not found"
        )

    previous_like = db.query(MediaLike).filter(
        MediaLike.media_id == media_id,
        MediaLike.user_id == current_user.id
    ).first()

    if previous_like is None:
        # No like yet: create one.
        db.add(MediaLike(media_id=media_id, user_id=current_user.id))
        target.likes_count = target.likes_count + 1
        message = "Media liked"
    else:
        # Already liked: take it back.
        db.delete(previous_like)
        target.likes_count = max(0, target.likes_count - 1)
        message = "Like removed"
    db.commit()

    # The like totals drive the cover of a linked event, so refresh it.
    parent_album = target.album
    if parent_album.event_id:
        update_event_cover_from_album(parent_album.event_id, db)

    return {"message": message, "likes_count": target.likes_count}
def _format_media_entry(media: Media, users_by_id: dict, current_user_id: int) -> dict:
    """Build the response dict for one media item, including its likes.

    ``users_by_id`` maps user ids to already-loaded ``User`` rows; likes whose
    user is not in the mapping are left out of the ``likes`` list.
    """
    is_liked = False
    likes = []
    for like in media.likes:
        if like.user_id == current_user_id:
            is_liked = True
        user = users_by_id.get(like.user_id)
        if user:
            likes.append({
                "id": like.id,
                "user_id": user.id,
                "username": user.username,
                "full_name": user.full_name,
                "avatar_url": user.avatar_url,
                "created_at": like.created_at
            })
    return {
        "id": media.id,
        "file_path": media.file_path,
        "thumbnail_path": media.thumbnail_path,
        "media_type": media.media_type,
        "caption": media.caption,
        "file_size": media.file_size,
        "width": media.width,
        "height": media.height,
        "duration": media.duration,
        "likes_count": media.likes_count,
        "is_liked": is_liked,
        "likes": likes,
        "created_at": media.created_at
    }


def format_album_response(album: Album, db: Session, current_user_id: int) -> dict:
    """Format album response with additional information.

    Returns a dict with the album's own fields, the creator's name, the title
    of the linked event (if any), every media item with its likes, and
    ``top_media``: up to three media items ordered by like count.
    ``is_liked`` on each media item says whether ``current_user_id`` is among
    its likes.
    """
    # Get top media (most liked)
    top_media = db.query(Media).filter(
        Media.album_id == album.id
    ).order_by(Media.likes_count.desc()).limit(3).all()
    album_media = list(album.media)

    # Load every user referenced by a like with one query instead of one
    # query per like.
    liker_ids = set()
    for media in (*album_media, *top_media):
        liker_ids.update(like.user_id for like in media.likes)
    users_by_id = {}
    if liker_ids:
        liking_users = db.query(User).filter(User.id.in_(list(liker_ids))).all()
        users_by_id = {user.id: user for user in liking_users}

    formatted_media = [
        _format_media_entry(media, users_by_id, current_user_id)
        for media in album_media
    ]
    formatted_top_media = [
        _format_media_entry(media, users_by_id, current_user_id)
        for media in top_media
    ]

    return {
        "id": album.id,
        "title": album.title,
        "description": album.description,
        "creator_id": album.creator_id,
        "event_id": album.event_id,
        "cover_image": album.cover_image,
        "created_at": album.created_at,
        "updated_at": album.updated_at,
        # Guarded like event_title below: a missing creator must not crash.
        "creator_name": album.creator.full_name if album.creator else None,
        "media_count": len(album_media),
        "media": formatted_media,
        "top_media": formatted_top_media,
        "event_title": album.event.title if album.event else None
    }
def update_event_cover_from_album(event_id: int, db: Session):
    """Point an event's cover at the most liked image of its albums.

    Does nothing when the event does not exist or none of its albums holds
    an image. The thumbnail path is preferred over the full file path.
    """
    from models.event import Event

    event = db.query(Event).filter(Event.id == event_id).first()
    if event is None:
        return

    # Images from every album attached to this event, most liked first.
    event_images = db.query(Media).join(Album).filter(
        Album.event_id == event_id,
        Media.media_type == MediaType.IMAGE
    )
    winner = event_images.order_by(Media.likes_count.desc()).first()
    if winner is None:
        return

    event.cover_image = winner.thumbnail_path or winner.file_path
    db.commit()