first commit
This commit is contained in:
183
module_auth/api/utils.py
Normal file
183
module_auth/api/utils.py
Normal file
@@ -0,0 +1,183 @@
|
||||
from typing import Optional
|
||||
from module_project import constants
|
||||
from module_project.utils import ApiResponse
|
||||
from module_iam.models import IAmPrincipal, IAmPrincipalOtp
|
||||
from rest_framework_simplejwt.tokens import RefreshToken
|
||||
from django.core.exceptions import ValidationError
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_token_and_user_data(principal):
|
||||
"""
|
||||
Generate a token and user data based on an 'IAmPrincipal' object.
|
||||
|
||||
Args:
|
||||
principal (IAmPrincipal): The user object.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing token data and user information.
|
||||
"""
|
||||
refresh = RefreshToken.for_user(principal)
|
||||
data = {
|
||||
"access": str(refresh.access_token),
|
||||
"refresh": str(refresh),
|
||||
"first_name": principal.first_name,
|
||||
"phone_no": str(principal.phone_no),
|
||||
"complete": principal.register_complete,
|
||||
}
|
||||
return data
|
||||
|
||||
class AuthService:
|
||||
"""
|
||||
Provides authentication services for IAmPrincipal users.
|
||||
"""
|
||||
|
||||
def __init__(self, principal_model, otp_model=None):
|
||||
self.principal_model = principal_model
|
||||
self.otp_model = otp_model
|
||||
|
||||
def authenticate(self, principal_id: str, otp: Optional[str] = None, password: Optional[str] = None) -> None:
|
||||
"""
|
||||
Authenticates a principal using OTP and/or password.
|
||||
|
||||
Raises:
|
||||
AuthenticationError: If authentication fails.
|
||||
"""
|
||||
try:
|
||||
principal = self.get_principal(principal_id)
|
||||
self.validate_principal_status(principal)
|
||||
self.validate_credentials_provided(otp, password)
|
||||
|
||||
if otp:
|
||||
self.authenticate_with_otp(principal, otp)
|
||||
else:
|
||||
self.authenticate_with_password(principal, password)
|
||||
except ValidationError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.error(f"Authentication failed for principal {principal_id}: {str(e)}")
|
||||
raise ValidationError("Authentication failed") from e
|
||||
|
||||
def get_principal(self, principal_id: str) -> IAmPrincipal:
|
||||
"""Retrieves a principal instance by ID."""
|
||||
try:
|
||||
return self.principal_model.objects.get(pk=principal_id)
|
||||
except self.principal_model.DoesNotExist:
|
||||
raise ValidationError("Invalid principal ID")
|
||||
|
||||
def get_principal_by_email(self, email: str):
|
||||
"""Retrieves a principal instance by email."""
|
||||
try:
|
||||
return self.principal_model.objects.get(email=email)
|
||||
except self.principal_model.DoesNotExist:
|
||||
raise ValidationError(constants.EMAIL_NOT_REGISTERED)
|
||||
|
||||
def validate_principal_status(self, principal: IAmPrincipal) -> None:
|
||||
"""Validates that the principal is active."""
|
||||
if not principal.is_active:
|
||||
raise ValidationError(constants.ACCOUNT_DEACTIVATED)
|
||||
|
||||
def validate_credentials_provided(self, otp: Optional[str], password: Optional[str]) -> None:
|
||||
"""Ensures that at least one of OTP or password is provided."""
|
||||
if otp is None and password is None:
|
||||
raise ValidationError(constants.OTP_OR_PASSWORD_REQUIRED)
|
||||
|
||||
def authenticate_with_otp(self, principal: IAmPrincipal, otp: str) -> None:
|
||||
"""Authenticates using OTP."""
|
||||
if self.otp_model is None:
|
||||
raise ValidationError("OTP authentication is not supported")
|
||||
|
||||
otp_instance = self.otp_model.objects.filter(principal=principal, otp_code=otp).last()
|
||||
if not otp_instance:
|
||||
raise ValidationError(constants.OTP_INVALID)
|
||||
|
||||
if otp_instance.is_expired():
|
||||
raise ValidationError(constants.OTP_EXPIRED)
|
||||
|
||||
otp_instance.is_used = True
|
||||
otp_instance.save()
|
||||
|
||||
def authenticate_with_password(self, principal: IAmPrincipal, password: str) -> None:
|
||||
"""Authenticates using password."""
|
||||
if not principal.check_password(password):
|
||||
raise ValidationError(constants.INVALID_PASSWORD)
|
||||
|
||||
|
||||
def authticate_with_otp_and_passsword(principal: IAmPrincipal, otp=None, password=None):
|
||||
"""
|
||||
Authenticate a principal using OTP and/or Password.
|
||||
|
||||
Parameters:
|
||||
- principal (User): The principal object to authenticate.
|
||||
- otp (str, optional): One-Time Password (OTP). Default is None.
|
||||
- password (str, optional): User's password. Default is None.
|
||||
|
||||
Returns:
|
||||
None: Successful authentication.
|
||||
Response: Error response if authentication fails.
|
||||
|
||||
Example:
|
||||
```
|
||||
principal = User.objects.get(phone_no='8987546598')
|
||||
otp = request.data.get("otp")
|
||||
password = request.data.get("password")
|
||||
|
||||
result = authenticate_with_otp_and_password(principal, otp, password)
|
||||
if isinstance(result, Response):
|
||||
return result # Authentication failed, return error response
|
||||
else:
|
||||
# Authentication successful, proceed with authorized actions.
|
||||
```
|
||||
"""
|
||||
|
||||
if not principal.is_active:
|
||||
return ApiResponse.error(
|
||||
message=constants.ACCOUNT_DEACTIVATED, errors=constants.ACCOUNT_DEACTIVATED
|
||||
)
|
||||
|
||||
# Ensure that either OTP or password is provided
|
||||
if otp is None and password is None:
|
||||
return ApiResponse.error(
|
||||
message=constants.OTP_OR_PASSWORD_REQUIRED, errors=constants.OTP_OR_PASSWORD_REQUIRED
|
||||
)
|
||||
|
||||
if otp:
|
||||
otp_instance = IAmPrincipalOtp.objects.filter(
|
||||
principal=principal, otp_code=otp
|
||||
).last()
|
||||
|
||||
if not otp_instance:
|
||||
return ApiResponse.error(
|
||||
message=constants.OTP_INVALID, errors=constants.OTP_INVALID
|
||||
)
|
||||
|
||||
if otp_instance.is_expired():
|
||||
return ApiResponse.error(
|
||||
message=constants.OTP_EXPIRED, errors=constants.OTP_EXPIRED
|
||||
)
|
||||
|
||||
otp_instance.is_used = True
|
||||
otp_instance.save()
|
||||
|
||||
elif password:
|
||||
print(password)
|
||||
if not principal.check_password(password):
|
||||
return ApiResponse.error(
|
||||
message=constants.INVALID_PASSWORD, errors=constants.INVALID_PASSWORD
|
||||
)
|
||||
print("after passsowrd", password)
|
||||
|
||||
return None
|
||||
|
||||
def get_principal_by_email(email: str):
|
||||
try:
|
||||
principal = IAmPrincipal.objects.get(email=email)
|
||||
return principal
|
||||
except IAmPrincipal.DoesNotExist:
|
||||
error_response = {
|
||||
"message": constants.EMAIL_NOT_REGISTERED,
|
||||
"errors": constants.EMAIL_NOT_REGISTERED,
|
||||
}
|
||||
return ApiResponse.error(**error_response)
|
||||
Reference in New Issue
Block a user