Files
LeDiscord/backend/api/routers/events.py
EvanChal dfeaecce73
Some checks failed
Deploy to Development / build-and-deploy (push) Failing after 20s
fix+feat(everything): lot of things
2026-01-25 22:14:48 +01:00

443 lines
16 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from typing import List
from datetime import datetime
from config.database import get_db
from models.event import Event, EventParticipation, ParticipationStatus
from models.user import User
from models.notification import Notification, NotificationType
from schemas.event import EventCreate, EventUpdate, EventResponse, ParticipationUpdate
from utils.security import get_current_active_user
from utils.email import send_event_notification
router = APIRouter()
@router.post("/", response_model=EventResponse)
async def create_event(
event_data: EventCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Create a new event."""
event_dict = event_data.dict(exclude={'invited_user_ids'})
event = Event(
**event_dict,
creator_id=current_user.id
)
db.add(event)
db.commit()
db.refresh(event)
# Gérer les invitations selon le type d'événement
if event.is_private:
# Événement privé : inviter uniquement les utilisateurs sélectionnés
invited_user_ids = event_data.invited_user_ids or []
# Toujours inclure le créateur
if current_user.id not in invited_user_ids:
invited_user_ids.append(current_user.id)
for user_id in invited_user_ids:
user = db.query(User).filter(User.id == user_id, User.is_active == True).first()
if user:
participation = EventParticipation(
event_id=event.id,
user_id=user.id,
status=ParticipationStatus.PENDING
)
db.add(participation)
# Create notification
if user.id != current_user.id:
notification = Notification(
user_id=user.id,
type=NotificationType.EVENT_INVITATION,
title=f"Invitation à un événement privé: {event.title}",
message=f"{current_user.full_name} vous a invité à un événement privé",
link=f"/events/{event.id}"
)
db.add(notification)
# Send email notification
try:
send_event_notification(user.email, event)
except:
pass
else:
# Événement public : inviter tous les utilisateurs actifs
users = db.query(User).filter(User.is_active == True).all()
for user in users:
participation = EventParticipation(
event_id=event.id,
user_id=user.id,
status=ParticipationStatus.PENDING
)
db.add(participation)
# Create notification
if user.id != current_user.id:
notification = Notification(
user_id=user.id,
type=NotificationType.EVENT_INVITATION,
title=f"Nouvel événement: {event.title}",
message=f"{current_user.full_name} a créé un nouvel événement",
link=f"/events/{event.id}"
)
db.add(notification)
# Send email notification
try:
send_event_notification(user.email, event)
except:
pass
db.commit()
return format_event_response(event, db)
@router.get("/", response_model=List[EventResponse])
async def get_events(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
upcoming: bool = None
):
"""Get all events, optionally filtered by upcoming status."""
query = db.query(Event).options(joinedload(Event.creator))
if upcoming is True:
# Only upcoming events
query = query.filter(Event.date >= datetime.utcnow())
elif upcoming is False:
# Only past events
query = query.filter(Event.date < datetime.utcnow())
# If upcoming is None, return all events
events = query.order_by(Event.date.desc()).all()
# Filtrer les événements privés : ne montrer que ceux où l'utilisateur est invité
filtered_events = []
for event in events:
if event.is_private:
# Vérifier si l'utilisateur est invité (a une participation)
participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event.id,
EventParticipation.user_id == current_user.id
).first()
if participation or event.creator_id == current_user.id or current_user.is_admin:
filtered_events.append(event)
else:
# Événement public : visible par tous
filtered_events.append(event)
return [format_event_response(event, db) for event in filtered_events]
@router.get("/upcoming", response_model=List[EventResponse])
async def get_upcoming_events(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Get only upcoming events."""
events = db.query(Event).options(joinedload(Event.creator)).filter(
Event.date >= datetime.utcnow()
).order_by(Event.date).all()
# Filtrer les événements privés
filtered_events = []
for event in events:
if event.is_private:
participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event.id,
EventParticipation.user_id == current_user.id
).first()
if participation or event.creator_id == current_user.id or current_user.is_admin:
filtered_events.append(event)
else:
filtered_events.append(event)
return [format_event_response(event, db) for event in filtered_events]
@router.get("/past", response_model=List[EventResponse])
async def get_past_events(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Get only past events."""
events = db.query(Event).options(joinedload(Event.creator)).filter(
Event.date < datetime.utcnow()
).order_by(Event.date.desc()).all()
# Filtrer les événements privés
filtered_events = []
for event in events:
if event.is_private:
participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event.id,
EventParticipation.user_id == current_user.id
).first()
if participation or event.creator_id == current_user.id or current_user.is_admin:
filtered_events.append(event)
else:
filtered_events.append(event)
return [format_event_response(event, db) for event in filtered_events]
@router.get("/{event_id}", response_model=EventResponse)
async def get_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Get a specific event."""
event = db.query(Event).filter(Event.id == event_id).first()
if not event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
# Vérifier l'accès pour les événements privés
if event.is_private:
participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event.id,
EventParticipation.user_id == current_user.id
).first()
if not participation and event.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this private event"
)
return format_event_response(event, db)
@router.put("/{event_id}", response_model=EventResponse)
async def update_event(
event_id: int,
event_update: EventUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Update an event."""
event = db.query(Event).filter(Event.id == event_id).first()
if not event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
if event.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to update this event"
)
for field, value in event_update.dict(exclude_unset=True).items():
setattr(event, field, value)
db.commit()
db.refresh(event)
return format_event_response(event, db)
@router.delete("/{event_id}")
async def delete_event(
event_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Delete an event."""
event = db.query(Event).filter(Event.id == event_id).first()
if not event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
if event.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to delete this event"
)
db.delete(event)
db.commit()
return {"message": "Event deleted successfully"}
@router.post("/{event_id}/invite", response_model=EventResponse)
async def invite_users_to_event(
event_id: int,
user_ids: List[int],
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Invite users to a private event."""
event = db.query(Event).filter(Event.id == event_id).first()
if not event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
# Vérifier que l'événement est privé et que l'utilisateur est le créateur ou admin
if not event.is_private:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This endpoint is only for private events"
)
if event.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the event creator can invite users"
)
# Inviter les utilisateurs
for user_id in user_ids:
# Vérifier si l'utilisateur existe et est actif
user = db.query(User).filter(User.id == user_id, User.is_active == True).first()
if not user:
continue
# Vérifier si l'utilisateur n'est pas déjà invité
existing_participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event_id,
EventParticipation.user_id == user_id
).first()
if not existing_participation:
participation = EventParticipation(
event_id=event.id,
user_id=user.id,
status=ParticipationStatus.PENDING
)
db.add(participation)
# Créer une notification
notification = Notification(
user_id=user.id,
type=NotificationType.EVENT_INVITATION,
title=f"Invitation à un événement privé: {event.title}",
message=f"{current_user.full_name} vous a invité à un événement privé",
link=f"/events/{event.id}"
)
db.add(notification)
# Envoyer un email
try:
send_event_notification(user.email, event)
except:
pass
db.commit()
return format_event_response(event, db)
@router.put("/{event_id}/participation", response_model=EventResponse)
async def update_participation(
event_id: int,
participation_update: ParticipationUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Update user participation status for an event."""
event = db.query(Event).filter(Event.id == event_id).first()
if not event:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Event not found"
)
# Pour les événements privés, vérifier que l'utilisateur est invité
if event.is_private:
participation_check = db.query(EventParticipation).filter(
EventParticipation.event_id == event_id,
EventParticipation.user_id == current_user.id
).first()
if not participation_check and event.creator_id != current_user.id and not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not invited to this private event"
)
participation = db.query(EventParticipation).filter(
EventParticipation.event_id == event_id,
EventParticipation.user_id == current_user.id
).first()
if not participation:
# Create new participation if it doesn't exist
participation = EventParticipation(
event_id=event_id,
user_id=current_user.id,
status=participation_update.status
)
db.add(participation)
else:
participation.status = participation_update.status
participation.response_date = datetime.utcnow()
db.commit()
# Update user attendance rate
update_user_attendance_rate(current_user, db)
event = db.query(Event).filter(Event.id == event_id).first()
return format_event_response(event, db)
def format_event_response(event: Event, db: Session) -> dict:
"""Format event response with participation counts."""
participations = []
present_count = absent_count = maybe_count = pending_count = 0
# Get creator user directly
creator = db.query(User).filter(User.id == event.creator_id).first()
for p in event.participations:
user = db.query(User).filter(User.id == p.user_id).first()
participations.append({
"user_id": user.id,
"username": user.username,
"full_name": user.full_name,
"avatar_url": user.avatar_url,
"status": p.status,
"response_date": p.response_date
})
if p.status == ParticipationStatus.PRESENT:
present_count += 1
elif p.status == ParticipationStatus.ABSENT:
absent_count += 1
elif p.status == ParticipationStatus.MAYBE:
maybe_count += 1
else:
pending_count += 1
return {
"id": event.id,
"title": event.title,
"description": event.description,
"location": event.location,
"date": event.date,
"end_date": event.end_date,
"creator_id": event.creator_id,
"cover_image": event.cover_image,
"is_private": event.is_private if event.is_private is not None else False,
"created_at": event.created_at,
"updated_at": event.updated_at,
"creator_name": creator.full_name if creator else "Unknown",
"creator_avatar": creator.avatar_url if creator else None,
"participations": participations,
"present_count": present_count,
"absent_count": absent_count,
"maybe_count": maybe_count,
"pending_count": pending_count
}
def update_user_attendance_rate(user: User, db: Session):
"""Update user attendance rate based on past events."""
past_participations = db.query(EventParticipation).join(Event).filter(
EventParticipation.user_id == user.id,
Event.date < datetime.utcnow(),
EventParticipation.status.in_([ParticipationStatus.PRESENT, ParticipationStatus.ABSENT])
).all()
if past_participations:
present_count = sum(1 for p in past_participations if p.status == ParticipationStatus.PRESENT)
user.attendance_rate = (present_count / len(past_participations)) * 100
db.commit()