from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from config.database import get_db from config.settings import settings from models.notification import PushSubscription from models.user import User from schemas.notification import PushSubscriptionCreate, VapidPublicKeyResponse from utils.security import get_current_active_user from utils.push_service import is_push_configured, send_push_to_user import base64 router = APIRouter() def get_vapid_public_key_for_web_push(): """ Convertit la clé publique VAPID au format attendu par Web Push. Les clés VAPID peuvent être au format: 1. Base64url brut (65 octets décodés) - Format attendu par Web Push 2. PEM/DER complet (commence par MFk...) - Doit être converti Web Push attend la clé publique non compressée (65 octets = 0x04 + X + Y) """ public_key = settings.VAPID_PUBLIC_KEY if not public_key: return None # Nettoyer la clé (enlever les éventuels espaces/newlines) public_key = public_key.strip() # Si la clé commence par MFk, c'est au format DER/SubjectPublicKeyInfo # On doit extraire les 65 derniers octets if public_key.startswith('MFk'): try: # Décoder le DER # Ajouter le padding si nécessaire padding = 4 - len(public_key) % 4 if padding != 4: public_key += '=' * padding der_bytes = base64.b64decode(public_key) # La clé publique non compressée est les 65 derniers octets # (SubjectPublicKeyInfo header + la clé) if len(der_bytes) >= 65: raw_key = der_bytes[-65:] # Ré-encoder en base64url sans padding return base64.urlsafe_b64encode(raw_key).rstrip(b'=').decode('ascii') except Exception as e: print(f"Erreur conversion clé VAPID: {e}") return public_key return public_key @router.get("/vapid-public-key", response_model=VapidPublicKeyResponse) async def get_vapid_public_key_endpoint(current_user: User = Depends(get_current_active_user)): """Get the VAPID public key for push notifications.""" public_key = get_vapid_public_key_for_web_push() if not public_key: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="VAPID keys not configured on server" ) return {"public_key": public_key} @router.post("/subscribe") async def subscribe_push( subscription: PushSubscriptionCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """Subscribe to push notifications.""" # Check if subscription already exists existing = db.query(PushSubscription).filter( PushSubscription.endpoint == subscription.endpoint ).first() if existing: # Update existing subscription existing.user_id = current_user.id existing.p256dh = subscription.keys.p256dh existing.auth = subscription.keys.auth db.commit() return {"message": "Subscription updated"} # Create new subscription new_sub = PushSubscription( user_id=current_user.id, endpoint=subscription.endpoint, p256dh=subscription.keys.p256dh, auth=subscription.keys.auth ) db.add(new_sub) db.commit() return {"message": "Subscribed successfully"} @router.delete("/unsubscribe") async def unsubscribe_push( endpoint: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """Unsubscribe from push notifications.""" db.query(PushSubscription).filter( PushSubscription.endpoint == endpoint ).delete() db.commit() return {"message": "Unsubscribed successfully"} @router.post("/test") async def test_push_notification( db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """Send a test push notification to the current user.""" if not is_push_configured(): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Push notifications not configured. Check VAPID keys." ) # Vérifier que l'utilisateur a au moins un abonnement sub_count = db.query(PushSubscription).filter( PushSubscription.user_id == current_user.id ).count() if sub_count == 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Aucun appareil enregistré pour les notifications push" ) # Envoyer la notification de test sent = send_push_to_user( db, current_user.id, "🔔 Test de notification", "Les notifications push fonctionnent correctement !", "/" ) return { "message": f"Notification de test envoyée à {sent} appareil(s)", "devices_registered": sub_count, "sent_successfully": sent } @router.get("/status") async def get_push_status( db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user) ): """Get the push notification configuration status.""" sub_count = db.query(PushSubscription).filter( PushSubscription.user_id == current_user.id ).count() return { "configured": is_push_configured(), "vapid_public_key_set": bool(settings.VAPID_PUBLIC_KEY), "vapid_private_key_set": bool(settings.VAPID_PRIVATE_KEY), "user_subscriptions": sub_count }