initial commit - LeDiscord plateforme des copains

This commit is contained in:
EvanChal
2025-08-21 00:28:21 +02:00
commit b7a84a53aa
93 changed files with 16247 additions and 0 deletions

72
backend/utils/email.py Normal file
View File

@@ -0,0 +1,72 @@
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from config.settings import settings
def send_email(to_email: str, subject: str, body: str, html_body: str = None):
"""Send an email notification."""
if not settings.SMTP_USER or not settings.SMTP_PASSWORD:
print(f"Email configuration missing, skipping email to {to_email}")
return
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = settings.SMTP_FROM
msg['To'] = to_email
# Add plain text part
text_part = MIMEText(body, 'plain')
msg.attach(text_part)
# Add HTML part if provided
if html_body:
html_part = MIMEText(html_body, 'html')
msg.attach(html_part)
try:
with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.starttls()
server.login(settings.SMTP_USER, settings.SMTP_PASSWORD)
server.send_message(msg)
print(f"Email sent successfully to {to_email}")
except Exception as e:
print(f"Failed to send email to {to_email}: {e}")
def send_event_notification(to_email: str, event):
"""Send event notification email."""
subject = f"Nouvel événement: {event.title}"
body = f"""
Bonjour,
Un nouvel événement a été créé sur LeDiscord:
{event.title}
Date: {event.date.strftime('%d/%m/%Y à %H:%M')}
Lieu: {event.location or 'Non spécifié'}
{event.description or ''}
Connectez-vous pour indiquer votre présence: {settings.APP_URL}/events/{event.id}
À bientôt !
L'équipe LeDiscord
"""
html_body = f"""
<html>
<body style="font-family: Arial, sans-serif;">
<h2>Nouvel événement sur LeDiscord</h2>
<h3>{event.title}</h3>
<p><strong>Date:</strong> {event.date.strftime('%d/%m/%Y à %H:%M')}</p>
<p><strong>Lieu:</strong> {event.location or 'Non spécifié'}</p>
{f'<p>{event.description}</p>' if event.description else ''}
<a href="{settings.APP_URL}/events/{event.id}"
style="display: inline-block; padding: 10px 20px; background-color: #007bff; color: white; text-decoration: none; border-radius: 5px;">
Indiquer ma présence
</a>
</body>
</html>
"""
send_email(to_email, subject, body, html_body)

32
backend/utils/init_db.py Normal file
View File

@@ -0,0 +1,32 @@
from sqlalchemy.orm import Session
from config.database import SessionLocal
from config.settings import settings
from models.user import User
from utils.security import get_password_hash
def init_database():
"""Initialize database with default admin user."""
db = SessionLocal()
try:
# Check if admin user exists
admin = db.query(User).filter(User.email == settings.ADMIN_EMAIL).first()
if not admin:
# Create admin user
admin = User(
email=settings.ADMIN_EMAIL,
username="admin",
full_name="Administrator",
hashed_password=get_password_hash(settings.ADMIN_PASSWORD),
is_active=True,
is_admin=True
)
db.add(admin)
db.commit()
print(f"Admin user created: {settings.ADMIN_EMAIL}")
else:
print(f"Admin user already exists: {settings.ADMIN_EMAIL}")
except Exception as e:
print(f"Error initializing database: {e}")
db.rollback()
finally:
db.close()

View File

@@ -0,0 +1,136 @@
from sqlalchemy.orm import Session
from models.notification import Notification, NotificationType
from models.user import User
from models.post import Post
from models.vlog import Vlog
from models.album import Album
from models.event import Event
from datetime import datetime
class NotificationService:
"""Service for managing notifications."""
@staticmethod
def create_mention_notification(
db: Session,
mentioned_user_id: int,
author: User,
content_type: str,
content_id: int,
content_preview: str = None
):
"""Create a notification for a user mention."""
if mentioned_user_id == author.id:
return # Don't notify self
notification = Notification(
user_id=mentioned_user_id,
type=NotificationType.POST_MENTION,
title="Vous avez été mentionné",
message=f"{author.full_name} vous a mentionné dans un(e) {content_type}",
link=f"/{content_type}s/{content_id}",
is_read=False,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
return notification
@staticmethod
def create_event_notification(
db: Session,
user_id: int,
event: Event,
author: User
):
"""Create a notification for a new event."""
if user_id == author.id:
return # Don't notify creator
notification = Notification(
user_id=user_id,
type=NotificationType.EVENT_INVITATION,
title="Nouvel événement",
message=f"{author.full_name} a créé un nouvel événement : {event.title}",
link=f"/events/{event.id}",
is_read=False,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
return notification
@staticmethod
def create_album_notification(
db: Session,
user_id: int,
album: Album,
author: User
):
"""Create a notification for a new album."""
if user_id == author.id:
return # Don't notify creator
notification = Notification(
user_id=user_id,
type=NotificationType.NEW_ALBUM,
title="Nouvel album",
message=f"{author.full_name} a créé un nouvel album : {album.title}",
link=f"/albums/{album.id}",
is_read=False,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
return notification
@staticmethod
def create_vlog_notification(
db: Session,
user_id: int,
vlog: Vlog,
author: User
):
"""Create a notification for a new vlog."""
if user_id == author.id:
return # Don't notify creator
notification = Notification(
user_id=user_id,
type=NotificationType.NEW_VLOG,
title="Nouveau vlog",
message=f"{author.full_name} a publié un nouveau vlog : {vlog.title}",
link=f"/vlogs/{vlog.id}",
is_read=False,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
return notification
@staticmethod
def create_system_notification(
db: Session,
user_id: int,
title: str,
message: str,
link: str = None
):
"""Create a system notification."""
notification = Notification(
user_id=user_id,
type=NotificationType.SYSTEM,
title=title,
message=message,
link=link,
is_read=False,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
return notification

74
backend/utils/security.py Normal file
View File

@@ -0,0 +1,74 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from config.settings import settings
from config.database import get_db
from models.user import User
from schemas.user import TokenData
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generate a password hash."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception) -> TokenData:
"""Verify a JWT token and return the token data."""
try:
payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
user_id: int = payload.get("sub")
email: str = payload.get("email")
if user_id is None:
raise credentials_exception
token_data = TokenData(user_id=user_id, email=email)
return token_data
except JWTError:
raise credentials_exception
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
"""Get the current authenticated user."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = verify_token(token, credentials_exception)
user = db.query(User).filter(User.id == token_data.user_id).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
"""Get the current active user."""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def get_admin_user(current_user: User = Depends(get_current_active_user)) -> User:
"""Get the current admin user."""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

View File

@@ -0,0 +1,92 @@
from sqlalchemy.orm import Session
from models.settings import SystemSettings
from typing import Optional, Dict, Any
import json
class SettingsService:
"""Service for managing system settings."""
@staticmethod
def get_setting(db: Session, key: str, default: Any = None) -> Any:
"""Get a setting value by key."""
setting = db.query(SystemSettings).filter(SystemSettings.key == key).first()
if not setting:
return default
# Essayer de convertir en type approprié
value = setting.value
# Booléens
if value.lower() in ['true', 'false']:
return value.lower() == 'true'
# Nombres entiers
try:
return int(value)
except ValueError:
pass
# Nombres flottants
try:
return float(value)
except ValueError:
pass
# JSON
if value.startswith('{') or value.startswith('['):
try:
return json.loads(value)
except json.JSONDecodeError:
pass
# Liste séparée par des virgules
if ',' in value and not value.startswith('{') and not value.startswith('['):
return [item.strip() for item in value.split(',')]
return value
@staticmethod
def get_upload_limits(db: Session) -> Dict[str, Any]:
"""Get all upload-related settings."""
return {
"max_album_size_mb": SettingsService.get_setting(db, "max_album_size_mb", 100),
"max_vlog_size_mb": SettingsService.get_setting(db, "max_vlog_size_mb", 500),
"max_image_size_mb": SettingsService.get_setting(db, "max_image_size_mb", 10),
"max_video_size_mb": SettingsService.get_setting(db, "max_video_size_mb", 100),
"allowed_image_types": SettingsService.get_setting(db, "allowed_image_types",
["image/jpeg", "image/png", "image/gif", "image/webp"]),
"allowed_video_types": SettingsService.get_setting(db, "allowed_video_types",
["video/mp4", "video/mpeg", "video/quicktime", "video/webm"])
}
@staticmethod
def get_max_upload_size(db: Session, content_type: str) -> int:
"""Get max upload size for a specific content type."""
if content_type.startswith('image/'):
max_size_mb = SettingsService.get_setting(db, "max_image_size_mb", 10)
max_size_bytes = max_size_mb * 1024 * 1024
print(f"DEBUG - Image upload limit: {max_size_mb}MB = {max_size_bytes} bytes")
return max_size_bytes
elif content_type.startswith('video/'):
max_size_mb = SettingsService.get_setting(db, "max_video_size_mb", 100)
max_size_bytes = max_size_mb * 1024 * 1024
print(f"DEBUG - Video upload limit: {max_size_mb}MB = {max_size_bytes} bytes")
return max_size_bytes
else:
default_size = 10 * 1024 * 1024 # 10MB par défaut
print(f"DEBUG - Default upload limit: 10MB = {default_size} bytes")
return default_size
@staticmethod
def is_file_type_allowed(db: Session, content_type: str) -> bool:
"""Check if a file type is allowed."""
if content_type.startswith('image/'):
allowed_types = SettingsService.get_setting(db, "allowed_image_types",
["image/jpeg", "image/png", "image/gif", "image/webp"])
elif content_type.startswith('video/'):
allowed_types = SettingsService.get_setting(db, "allowed_video_types",
["video/mp4", "video/mpeg", "video/quicktime", "video/webm"])
else:
return False
return content_type in allowed_types

View File

@@ -0,0 +1,94 @@
import cv2
import os
from pathlib import Path
from PIL import Image
import tempfile
def generate_video_thumbnail(video_path: str, output_path: str, frame_time: float = 1.0) -> bool:
"""
Generate a thumbnail from a video at a specific time.
Args:
video_path: Path to the video file
output_path: Path where to save the thumbnail
frame_time: Time in seconds to extract the frame (default: 1 second)
Returns:
bool: True if successful, False otherwise
"""
try:
# Open video file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return False
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames == 0:
return False
# Calculate frame number for the specified time
frame_number = min(int(fps * frame_time), total_frames - 1)
# Set position to the specified frame
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
# Read the frame
ret, frame = cap.read()
if not ret:
return False
# Convert BGR to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Convert to PIL Image
pil_image = Image.fromarray(frame_rgb)
# Resize to thumbnail size (400x400)
pil_image.thumbnail((400, 400), Image.Resampling.LANCZOS)
# Save thumbnail
pil_image.save(output_path, "JPEG", quality=85, optimize=True)
# Release video capture
cap.release()
return True
except Exception as e:
print(f"Error generating thumbnail: {e}")
return False
def get_video_duration(video_path: str) -> float:
"""
Get the duration of a video file in seconds.
Args:
video_path: Path to the video file
Returns:
float: Duration in seconds, or 0 if error
"""
try:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return 0
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if fps > 0 and total_frames > 0:
return total_frames / fps
return 0
except Exception as e:
print(f"Error getting video duration: {e}")
return 0