171 lines
5.5 KiB
Python
171 lines
5.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from config.database import get_db
|
|
from config.settings import settings
|
|
from models.notification import PushSubscription
|
|
from models.user import User
|
|
from schemas.notification import PushSubscriptionCreate, VapidPublicKeyResponse
|
|
from utils.security import get_current_active_user
|
|
from utils.push_service import is_push_configured, send_push_to_user
|
|
import base64
|
|
|
|
# Router on which the push-notification endpoints of this module are registered.
# NOTE(review): presumably included into the main application (with a URL prefix) elsewhere — confirm.
router = APIRouter()
|
|
|
|
|
|
def get_vapid_public_key_for_web_push():
    """Return the configured VAPID public key in the form Web Push expects.

    Web Push wants the raw uncompressed P-256 point (65 bytes: 0x04 + X + Y)
    encoded as unpadded base64url. The configured key may be either:

    1. already in that raw base64url form, in which case it is returned
       as-is (after stripping surrounding whitespace), or
    2. a base64 / base64url encoded DER SubjectPublicKeyInfo structure
       (recognisable by its ``MFk`` prefix), in which case the trailing
       65 bytes are extracted and re-encoded.

    Returns:
        The key as a string, or ``None`` when no key is configured (unset,
        empty, or whitespace only). If a ``MFk``-prefixed value cannot be
        converted, the stripped configured value is returned unchanged.
    """
    configured_key = settings.VAPID_PUBLIC_KEY
    if not configured_key:
        return None

    # Drop stray spaces / newlines around the configured value.
    configured_key = configured_key.strip()
    if not configured_key:
        return None

    # Anything not starting with "MFk" is assumed to be the raw key already.
    if not configured_key.startswith('MFk'):
        return configured_key

    # Accept both the standard and the URL-safe alphabet, with or without
    # padding: normalise to URL-safe and recompute the padding on a copy so
    # the fallback value below is never altered.
    normalized = configured_key.rstrip('=').replace('+', '-').replace('/', '_')
    padded = normalized + '=' * (-len(normalized) % 4)

    try:
        der_bytes = base64.urlsafe_b64decode(padded)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        import logging

        logging.getLogger(__name__).warning("Erreur conversion clé VAPID: %s", exc)
        return configured_key

    # In a P-256 SubjectPublicKeyInfo the uncompressed point is the last
    # 65 bytes; the 0x04 marker confirms we really sliced a raw point.
    raw_key = der_bytes[-65:]
    if len(raw_key) == 65 and raw_key[0] == 0x04:
        # Re-encode as base64url without padding.
        return base64.urlsafe_b64encode(raw_key).rstrip(b'=').decode('ascii')

    return configured_key
|
|
|
|
|
|
@router.get("/vapid-public-key", response_model=VapidPublicKeyResponse)
async def get_vapid_public_key_endpoint(current_user: User = Depends(get_current_active_user)):
    """Expose the VAPID public key that clients need to create a push subscription.

    The key is obtained from ``get_vapid_public_key_for_web_push``. The
    ``current_user`` dependency must resolve for the request to be served,
    although its value is not otherwise used here.

    Raises:
        HTTPException: 503 when the helper yields no key.
    """
    vapid_key = get_vapid_public_key_for_web_push()
    if vapid_key:
        return {"public_key": vapid_key}

    # No usable key: report the feature as unavailable.
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="VAPID keys not configured on server",
    )
|
|
|
|
@router.post("/subscribe")
async def subscribe_push(
    subscription: PushSubscriptionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Register or refresh a push subscription for the current user.

    Subscriptions are looked up by endpoint. When a row with the same
    endpoint already exists it is reassigned to the current user and its
    ``p256dh`` / ``auth`` keys are overwritten; otherwise a new row is
    inserted. The change is committed before returning.

    Returns:
        A dict with a ``message`` key saying whether the subscription was
        updated or newly created.
    """
    browser_keys = subscription.keys
    record = (
        db.query(PushSubscription)
        .filter(PushSubscription.endpoint == subscription.endpoint)
        .first()
    )

    if not record:
        # Unknown endpoint: store a fresh subscription row.
        db.add(
            PushSubscription(
                user_id=current_user.id,
                endpoint=subscription.endpoint,
                p256dh=browser_keys.p256dh,
                auth=browser_keys.auth,
            )
        )
        outcome = "Subscribed successfully"
    else:
        # Known endpoint: take it over for this user and refresh its keys.
        record.user_id = current_user.id
        record.p256dh = browser_keys.p256dh
        record.auth = browser_keys.auth
        outcome = "Subscription updated"

    db.commit()
    return {"message": outcome}
|
|
|
|
@router.delete("/unsubscribe")
async def unsubscribe_push(
    endpoint: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Remove every stored push subscription whose endpoint equals ``endpoint``.

    The deletion is committed before returning, and the same success message
    is returned whether or not any row matched.

    NOTE(review): the delete is keyed on the endpoint only and is not
    restricted to ``current_user`` — confirm this is intended.
    """
    matching_rows = db.query(PushSubscription).filter(
        PushSubscription.endpoint == endpoint
    )
    matching_rows.delete()
    db.commit()

    return {"message": "Unsubscribed successfully"}
|
|
|
|
|
|
@router.post("/test")
async def test_push_notification(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Send a test push notification to the current user's registered devices.

    Raises:
        HTTPException: 503 when ``is_push_configured()`` is false, 400 when
            the user has no stored subscription.

    Returns:
        A dict with a human-readable ``message``, the number of stored
        subscriptions (``devices_registered``) and the value returned by
        ``send_push_to_user`` (``sent_successfully`` — presumably the count
        of successful deliveries; verify against the push service).
    """
    notification_title = "🔔 Test de notification"
    notification_body = "Les notifications push fonctionnent correctement !"
    target_url = "/"

    if not is_push_configured():
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Push notifications not configured. Check VAPID keys.",
        )

    # The user needs at least one subscription to receive anything.
    registered_devices = (
        db.query(PushSubscription)
        .filter(PushSubscription.user_id == current_user.id)
        .count()
    )
    if not registered_devices:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Aucun appareil enregistré pour les notifications push",
        )

    # Fire the test notification.
    sent = send_push_to_user(
        db, current_user.id, notification_title, notification_body, target_url
    )

    return {
        "message": f"Notification de test envoyée à {sent} appareil(s)",
        "devices_registered": registered_devices,
        "sent_successfully": sent,
    }
|
|
|
|
|
|
@router.get("/status")
async def get_push_status(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Report whether push is configured and how many subscriptions the user has.

    Returns:
        A dict with ``configured`` (result of ``is_push_configured()``),
        ``vapid_public_key_set`` / ``vapid_private_key_set`` (truthiness of
        the corresponding settings) and ``user_subscriptions`` (number of
        stored subscriptions for the current user).
    """
    own_subscriptions = db.query(PushSubscription).filter(
        PushSubscription.user_id == current_user.id
    )
    subscription_total = own_subscriptions.count()

    push_ready = is_push_configured()
    has_public_key = bool(settings.VAPID_PUBLIC_KEY)
    has_private_key = bool(settings.VAPID_PRIVATE_KEY)

    return {
        "configured": push_ready,
        "vapid_public_key_set": has_public_key,
        "vapid_private_key_set": has_private_key,
        "user_subscriptions": subscription_total,
    }
|
|
|
|
|