import logging from typing import Optional import requests from django.core.exceptions import ValidationError from rest_framework_simplejwt.tokens import RefreshToken, TokenError from rest_framework_simplejwt.exceptions import TokenError from module_iam.models import IAmPrincipal, IAmPrincipalOtp from module_project import constants from module_project.utils import ApiResponse logger = logging.getLogger(__name__) def generate_token_and_user_data(principal): """ Generate a token and user data based on an 'IAmPrincipal' object. Args: principal (IAmPrincipal): The user object. Returns: dict: A dictionary containing token data and user information. """ refresh = RefreshToken.for_user(principal) data = { "access": str(refresh.access_token), "refresh": str(refresh), "complete": principal.register_complete, } return data def blacklist_token(token): try: RefreshToken(token).blacklist() print("token is blacklisted") except TokenError: print("error occurs") pass class GoogleAuthService(): @staticmethod def get_user_info(access_token): headers = {'Authorization': f'Bearer {access_token}'} response = requests.get( 'https://www.googleapis.com/oauth2/v3/userinfo', headers=headers, ) user_info = response.json() return user_info class AuthService: """ Provides authentication services for IAmPrincipal users. """ def __init__(self, principal_model, otp_model=None): self.principal_model = principal_model self.otp_model = otp_model def authenticate(self, principal_id: str, otp: Optional[str] = None, password: Optional[str] = None) -> None: """ Authenticates a principal using OTP and/or password. Raises: AuthenticationError: If authentication fails. """ try: principal = self.get_principal(principal_id) self.validate_principal_status(principal) self.validate_credentials_provided(otp, password) if otp: self.authenticate_with_otp(principal, otp) else: self.authenticate_with_password(principal, password) except ValidationError as e: raise e except Exception as e: logger.error(f"Authentication failed for principal {principal_id}: {str(e)}") raise ValidationError("Authentication failed") from e def get_principal(self, principal_id: str) -> IAmPrincipal: """Retrieves a principal instance by ID.""" try: return self.principal_model.objects.get(pk=principal_id) except self.principal_model.DoesNotExist: raise ValidationError("Invalid principal ID") def get_principal_by_email(self, email: str): """Retrieves a principal instance by email.""" try: return self.principal_model.objects.get(email=email) except self.principal_model.DoesNotExist: raise ValidationError(constants.EMAIL_NOT_REGISTERED) def validate_principal_status(self, principal: IAmPrincipal) -> None: """Validates that the principal is active.""" if not principal.is_active: raise ValidationError(constants.ACCOUNT_DEACTIVATED) def validate_credentials_provided(self, otp: Optional[str], password: Optional[str]) -> None: """Ensures that at least one of OTP or password is provided.""" if otp is None and password is None: raise ValidationError(constants.OTP_OR_PASSWORD_REQUIRED) def authenticate_with_otp(self, principal: IAmPrincipal, otp: str) -> None: """Authenticates using OTP.""" if self.otp_model is None: raise ValidationError("OTP authentication is not supported") otp_instance = self.otp_model.objects.filter(principal=principal, otp_code=otp).last() if not otp_instance: raise ValidationError(constants.OTP_INVALID) if otp_instance.is_expired(): raise ValidationError(constants.OTP_EXPIRED) otp_instance.is_used = True otp_instance.save() def authenticate_with_password(self, principal: IAmPrincipal, password: str) -> None: """Authenticates using password.""" if not principal.check_password(password): raise ValidationError(constants.INVALID_PASSWORD) def authticate_with_otp_and_passsword(principal: IAmPrincipal, otp=None, password=None): """ Authenticate a principal using OTP and/or Password. Parameters: - principal (User): The principal object to authenticate. - otp (str, optional): One-Time Password (OTP). Default is None. - password (str, optional): User's password. Default is None. Returns: None: Successful authentication. Response: Error response if authentication fails. Example: ``` principal = User.objects.get(phone_no='8987546598') otp = request.data.get("otp") password = request.data.get("password") result = authenticate_with_otp_and_password(principal, otp, password) if isinstance(result, Response): return result # Authentication failed, return error response else: # Authentication successful, proceed with authorized actions. ``` """ if not principal.is_active: return ApiResponse.error( message=constants.ACCOUNT_DEACTIVATED, errors=constants.ACCOUNT_DEACTIVATED ) # Ensure that either OTP or password is provided if otp is None and password is None: return ApiResponse.error( message=constants.OTP_OR_PASSWORD_REQUIRED, errors=constants.OTP_OR_PASSWORD_REQUIRED ) if otp: otp_instance = IAmPrincipalOtp.objects.filter( principal=principal, otp_code=otp, is_used=False ).last() if not otp_instance: return ApiResponse.error( message=constants.OTP_INVALID, errors=constants.OTP_INVALID ) if otp_instance.is_expired(): return ApiResponse.error( message=constants.OTP_EXPIRED, errors=constants.OTP_EXPIRED ) otp_instance.is_used = True otp_instance.save() elif password: print(password) if not principal.check_password(password): return ApiResponse.error( message=constants.INCORRECT_CREDENTIALS, errors=constants.INCORRECT_CREDENTIALS ) print("after passsowrd", password) return None def get_principal_by_email(email: str): try: principal = IAmPrincipal.objects.get(email=email) return principal except IAmPrincipal.DoesNotExist: error_response = { "message": constants.EMAIL_NOT_REGISTERED, "errors": constants.EMAIL_NOT_REGISTERED, } return ApiResponse.error(**error_response)