# ============================================================
# app/core/security.py - JWT & Password Management
# ============================================================

import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
from passlib.context import CryptContext

from app.config import settings

logger = logging.getLogger(__name__)

# ============================================================
# Password Hashing
# ============================================================

# Single shared hashing context; the bcrypt work factor comes from settings.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__rounds=settings.BCRYPT_ROUNDS,
)


def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.

    Raises:
        Exception: Any error raised by the hashing backend is logged and
            re-raised unchanged.
    """
    try:
        return pwd_context.hash(password)
    except Exception as e:
        logger.error("Error hashing password: %s", e)
        raise


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password supplied by the caller.
        hashed_password: Previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify``; ``False`` if verification
        raises for any reason (the error is logged, not propagated).
    """
    try:
        return pwd_context.verify(plain_password, hashed_password)
    except Exception as e:
        # Deliberately fail closed: a malformed hash counts as a mismatch.
        logger.error("Error verifying password: %s", e)
        return False


# ============================================================
# JWT Token Management
# ============================================================

def create_token(
    data: Dict[str, Any],
    expires_in_days: Optional[int] = None,
    expires_in_minutes: Optional[int] = None,
) -> str:
    """Create a signed JWT from ``data`` with an optional expiration.

    The payload is a shallow copy of ``data``; the caller's dict is not
    modified. The token is signed with ``settings.JWT_SECRET`` using
    ``settings.JWT_ALGORITHM``.

    Args:
        data: Payload claims to encode.
        expires_in_days: Token lifetime in days. Takes precedence over
            ``expires_in_minutes`` when both are given.
        expires_in_minutes: Token lifetime in minutes.

    Returns:
        Encoded JWT token.

    Raises:
        Exception: Any error raised while building or encoding the token is
            logged and re-raised unchanged.

    Note:
        Only ``None`` means "no expiration". An explicit ``0`` sets ``exp``
        to the current time instead of silently producing a token that
        never expires.
    """
    try:
        to_encode = data.copy()

        # Compare against None rather than relying on truthiness, so that a
        # lifetime of 0 is not mistaken for "no expiration requested".
        if expires_in_days is not None:
            lifetime = timedelta(days=expires_in_days)
        elif expires_in_minutes is not None:
            lifetime = timedelta(minutes=expires_in_minutes)
        else:
            lifetime = None  # No expiration: the "exp" claim is omitted.

        if lifetime is not None:
            to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})

        encoded_jwt = jwt.encode(
            to_encode,
            settings.JWT_SECRET,
            algorithm=settings.JWT_ALGORITHM,
        )

        logger.info(
            "Token created for user: %s", data.get("user_id", "unknown")
        )
        return encoded_jwt
    except Exception as e:
        logger.error("Error creating token: %s", e)
        raise


def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify and decode a JWT.

    Args:
        token: JWT token string.

    Returns:
        The decoded payload, or ``None`` if the token is expired, invalid,
        or decoding fails for any other reason. Failures are logged and
        never raised to the caller.
    """
    try:
        payload = jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
        )
        return payload
    except jwt.ExpiredSignatureError:
        logger.warning("Token has expired")
        return None
    except jwt.InvalidTokenError as e:
        logger.warning("Invalid token: %s", e)
        return None
    except Exception as e:
        logger.error("Error verifying token: %s", e)
        return None


# ============================================================
# Token Creation Helpers
# ============================================================

def create_login_token(
    user_id: str,
    email: Optional[str],
    phone: Optional[str],
    role: str,
) -> str:
    """Create a login token tagged ``type="access"``.

    The lifetime is ``settings.JWT_LOGIN_EXPIRY_DAYS`` days.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        email: Value for the ``email`` claim (may be ``None``).
        phone: Value for the ``phone`` claim (may be ``None``).
        role: Value for the ``role`` claim.

    Returns:
        Encoded JWT token.
    """
    return create_token(
        data={
            "user_id": user_id,
            "email": email,
            "phone": phone,
            "role": role,
            "type": "access",
        },
        expires_in_days=settings.JWT_LOGIN_EXPIRY_DAYS,
    )


def create_reset_token(identifier: str) -> str:
    """Create a password reset token.

    The token carries ``purpose="password_reset"`` and ``type="reset"``;
    its lifetime is ``settings.JWT_RESET_EXPIRY_MINUTES`` minutes.

    Args:
        identifier: Value stored in the ``identifier`` claim.

    Returns:
        Encoded JWT token.
    """
    return create_token(
        data={
            "identifier": identifier,
            "purpose": "password_reset",
            "type": "reset",
        },
        expires_in_minutes=settings.JWT_RESET_EXPIRY_MINUTES,
    )


def create_signup_token(
    user_id: str,
    email: Optional[str],
    phone: Optional[str],
    role: str,
) -> str:
    """Create a signup verification token tagged ``type="signup"``.

    The lifetime is fixed at 10 minutes.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        email: Value for the ``email`` claim (may be ``None``).
        phone: Value for the ``phone`` claim (may be ``None``).
        role: Value for the ``role`` claim.

    Returns:
        Encoded JWT token.
    """
    return create_token(
        data={
            "user_id": user_id,
            "email": email,
            "phone": phone,
            "role": role,
            "type": "signup",
        },
        expires_in_minutes=10,
    )


# ============================================================
# Token Validation Helpers
# ============================================================

def decode_reset_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only as a password reset token.

    Args:
        token: JWT token string.

    Returns:
        The decoded payload when ``verify_token`` succeeds and the payload's
        ``purpose`` claim equals ``"password_reset"``; otherwise ``None``
        (a warning is logged).
    """
    payload = verify_token(token)
    # An empty or missing payload and a wrong purpose are rejected alike.
    if not payload or payload.get("purpose") != "password_reset":
        logger.warning("Invalid or expired reset token")
        return None
    return payload


def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only as an access token.

    Args:
        token: JWT token string.

    Returns:
        The decoded payload when ``verify_token`` succeeds and the payload's
        ``type`` claim equals ``"access"``; otherwise ``None`` (a warning is
        logged).
    """
    payload = verify_token(token)
    # Reject reset/signup tokens so they cannot be used as login tokens.
    if not payload or payload.get("type") != "access":
        logger.warning("Invalid or expired access token")
        return None
    return payload