""" Service d'envoi de notifications push via Web Push (VAPID). """ from typing import Optional, Dict, Any from sqlalchemy.orm import Session from config.settings import settings from models.notification import PushSubscription # Import conditionnel de pywebpush try: from pywebpush import webpush, WebPushException WEBPUSH_AVAILABLE = True except ImportError: WEBPUSH_AVAILABLE = False print("⚠️ pywebpush non installé - Les notifications push sont désactivées") def is_push_configured() -> bool: """Vérifie si les notifications push sont configurées.""" return ( WEBPUSH_AVAILABLE and bool(settings.VAPID_PRIVATE_KEY) and bool(settings.VAPID_PUBLIC_KEY) ) def send_push_to_user( db: Session, user_id: int, title: str, body: str, link: str = "/", data: Optional[Dict[str, Any]] = None ) -> int: """ Envoie une notification push à tous les appareils d'un utilisateur. Args: db: Session de base de données user_id: ID de l'utilisateur title: Titre de la notification body: Corps de la notification link: Lien vers lequel rediriger data: Données supplémentaires Returns: Nombre de notifications envoyées avec succès """ if not is_push_configured(): return 0 # Récupérer tous les abonnements de l'utilisateur subscriptions = db.query(PushSubscription).filter( PushSubscription.user_id == user_id ).all() if not subscriptions: return 0 success_count = 0 failed_endpoints = [] import json payload = json.dumps({ "title": title, "body": body, "link": link, "data": data or {} }) vapid_claims = { "sub": settings.VAPID_CLAIMS_EMAIL } for sub in subscriptions: try: webpush( subscription_info={ "endpoint": sub.endpoint, "keys": { "p256dh": sub.p256dh, "auth": sub.auth } }, data=payload, vapid_private_key=settings.VAPID_PRIVATE_KEY, vapid_claims=vapid_claims ) success_count += 1 print(f"✅ Push envoyé à {sub.endpoint[:50]}...") except WebPushException as e: print(f"❌ Erreur push pour {sub.endpoint[:50]}...: {e}") # Si l'abonnement est expiré ou invalide, on le marque pour suppression if e.response and e.response.status_code in [404, 410]: failed_endpoints.append(sub.endpoint) except Exception as e: print(f"❌ Erreur inattendue push: {type(e).__name__}: {e}") # Supprimer les abonnements invalides if failed_endpoints: db.query(PushSubscription).filter( PushSubscription.endpoint.in_(failed_endpoints) ).delete(synchronize_session=False) db.commit() print(f"🗑️ Supprimé {len(failed_endpoints)} abonnements invalides") return success_count def send_push_to_users( db: Session, user_ids: list, title: str, body: str, link: str = "/", data: Optional[Dict[str, Any]] = None ) -> int: """ Envoie une notification push à plusieurs utilisateurs. Args: db: Session de base de données user_ids: Liste des IDs utilisateurs title: Titre de la notification body: Corps de la notification link: Lien vers lequel rediriger data: Données supplémentaires Returns: Nombre total de notifications envoyées avec succès """ total = 0 for user_id in user_ids: total += send_push_to_user(db, user_id, title, body, link, data) return total