Files
digest_app/module_auth/api/views.py

386 lines
13 KiB
Python
Raw Normal View History

from datetime import datetime

import requests
from django.conf import settings
from django.contrib.auth import authenticate
from django.utils import timezone
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.authentication import JWTAuthentication

from module_iam.models import (AppVersion, IAmPrincipal, IAmPrincipalOtp,
                               IAmPrincipalSource, IAmPrincipalType)
from module_project import constants
from module_project.service import EmailService, SMSService
from module_project.utils import ApiResponse

from .serializers import (AppVersionSerializer, LoginSerializer, OtpVerificationSerializer,
                          PasswordResetSerializer, RegistrationSerializer)
from .utils import (AuthService, GoogleAuthService,
                    authticate_with_otp_and_passsword, blacklist_token,
                    generate_token_and_user_data, get_principal_by_email)


class RegistrationView(APIView):
    """Public endpoint that registers a new principal and returns its tokens.

    Authentication and permission classes are empty, so the endpoint can be
    called without credentials.
    """

    authentication_classes = []
    permission_classes = []
    model = IAmPrincipal
    serializer_class = RegistrationSerializer

    def post(self, request):
        """Validate the payload, create the principal and issue tokens.

        Returns:
            ``ApiResponse.error`` with HTTP 403 when the serializer rejects
            the payload or when saving / token generation raises; otherwise
            ``ApiResponse.success`` carrying the value returned by
            ``generate_token_and_user_data``.
        """
        # The request payload is intentionally not printed: a registration
        # body may contain credentials and must not end up in stdout/logs.
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.REGISTRATION_FAIL,
                errors=serializer.errors,
            )

        try:
            instance = serializer.save()
            # timezone.now() is aware when USE_TZ is enabled and naive
            # otherwise, so it is always what a DateTimeField expects.
            instance.last_login = timezone.now()
            instance.principal_type = IAmPrincipalType.get_principal_user()
            instance.principal_source = IAmPrincipalSource.get_principal_app()
            instance.save()
            token_data = generate_token_and_user_data(instance)
        except Exception as e:
            # API boundary: report the failure to the client instead of
            # letting it surface as an unhandled 500.
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN, message=str(e), errors=str(e)
            )

        return ApiResponse.success(
            message=constants.REGISTRATION_SUCCESS, data=token_data
        )
2024-02-26 13:28:32 +05:30
class LoginView(APIView):
    """Public endpoint that logs a principal in with a password or an OTP."""

    authentication_classes = []
    permission_classes = []
    model = IAmPrincipal
    serializer_class = LoginSerializer

    def post(self, request):
        """Authenticate the principal identified by ``email`` and issue tokens.

        Reads ``email``, ``otp``, ``password`` and ``player_id`` from the
        request body. On success the principal's ``player_id`` and
        ``last_login`` are updated before the token payload is built.

        Returns:
            ``ApiResponse.error`` for an invalid payload (HTTP 403), an
            unknown email, or a failure while saving (HTTP 500); the error
            ``Response`` produced by ``authticate_with_otp_and_passsword``
            when validation fails; otherwise ``ApiResponse.success`` with the
            token data.
        """
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.LOGIN_FAIL,
                errors=serializer.errors,
            )

        email = request.data.get("email")
        otp = request.data.get("otp")
        password = request.data.get("password")
        player_id = request.data.get("player_id")

        try:
            principal = IAmPrincipal.objects.get(email=email)
        except IAmPrincipal.DoesNotExist:
            # Same message as a wrong password, so the response does not
            # single out the unknown-email case.
            return ApiResponse.error(
                message=constants.INCORRECT_CREDENTIALS,
                errors=constants.INCORRECT_CREDENTIALS,
            )

        validation_result = authticate_with_otp_and_passsword(
            principal, otp=otp, password=password
        )
        # The helper signals failure by returning a ready-made Response.
        # (Debug prints of the validation result were removed from this
        # authentication path.)
        if isinstance(validation_result, Response):
            return validation_result

        try:
            principal.player_id = player_id
            # timezone.now() matches the project's USE_TZ setting, unlike the
            # naive datetime.now().
            principal.last_login = timezone.now()
            principal.save()
        except Exception as e:
            return ApiResponse.error(
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                message=constants.INTERNAL_SERVER_ERROR,
                errors=str(e),
            )

        token_data = generate_token_and_user_data(principal)
        return ApiResponse.success(message=constants.LOGIN_SUCCESS, data=token_data)
class LogoutView(APIView):
    """Authenticated endpoint that logs the current principal out."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    model = IAmPrincipal

    def post(self, request):
        """Clear the caller's ``player_id`` and blacklist the refresh token.

        Expects the refresh token under the ``refresh`` key of the request
        body; responds with an error when it is missing or empty.
        """
        refresh_token = request.data.get("refresh")
        if refresh_token:
            principal = request.user
            principal.player_id = None
            principal.save()
            blacklist_token(refresh_token)
            return ApiResponse.success(message=constants.LOGOUT_SUCCESS)

        return ApiResponse.error(
            message=constants.FAILURE, errors='Provide refresh token'
        )
2024-02-26 13:28:32 +05:30
class OtpRequestView(APIView):
    """Public endpoint that emails a one-time password to a principal."""

    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Create an OTP for the principal with the given email and send it.

        Returns:
            ``ApiResponse.error`` when ``email`` is missing or empty, when
            OTP creation raises, or when the email cannot be rendered/sent;
            the ``Response`` returned by ``get_principal_by_email`` when the
            lookup fails; otherwise ``ApiResponse.success``.
        """
        email = request.data.get("email")
        # Treat an empty or null value like a missing key instead of passing
        # it on to the principal lookup.
        if not email:
            return ApiResponse.error(
                message=constants.EMAIL_REQUIRED, errors=constants.EMAIL_REQUIRED
            )

        principal = get_principal_by_email(email=email)
        # The helper returns a ready-made error Response on failure.
        if isinstance(principal, Response):
            return principal

        try:
            otp_code = SMSService().create_otp(
                principal=principal, otp_purpose="Password Reset Request"
            )
        except Exception as e:
            return ApiResponse.error(message=str(e), errors=str(e))

        email_service = EmailService(
            subject="Password Reset Request",
            to=principal.email,
            from_email=settings.EMAIL_HOST_USER,
        )

        # Render the template and send the email; both steps are reported to
        # the client through the same error response.
        try:
            email_service.load_template(
                "module_auth/otp_email_template.html",
                context={"code": otp_code, "name": principal.first_name},
            )
            email_service.send()
        except Exception as e:
            return ApiResponse.error(
                message=f"Error sending email: {str(e)}", errors=str(e)
            )

        return ApiResponse.success(message=constants.SUCCESS)
2024-02-26 13:28:32 +05:30
class OTPVerificationView(APIView):
    """Public endpoint that checks an OTP submitted for a given email."""

    authentication_classes = []
    permission_classes = []
    serializer_class = OtpVerificationSerializer

    def post(self, request):
        """Validate the payload, look up the principal and verify the OTP.

        Any helper that fails hands back a ready-made ``Response``, which is
        returned to the client unchanged.
        """
        payload = self.serializer_class(data=request.data)
        if not payload.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.VALIDATION_ERROR,
                errors=payload.errors,
            )

        cleaned = payload.validated_data
        email = cleaned.get("email")
        otp = cleaned.get("otp")

        principal = get_principal_by_email(email=email)
        if isinstance(principal, Response):
            return principal

        outcome = authticate_with_otp_and_passsword(principal, otp=otp)
        print("pasword instance ", outcome)
        if not isinstance(outcome, Response):
            return ApiResponse.success(message=constants.SUCCESS)

        # Verification failed: pass the helper's error response through.
        print("Errror reponse")
        return outcome
2024-02-26 13:28:32 +05:30
class ForgetPasswordView(APIView):
    """Public endpoint that applies a password reset for a principal."""

    authentication_classes = []
    permission_classes = []
    serializer_class = PasswordResetSerializer

    def post(self, request):
        """Reset the password when the principal has a usable OTP record.

        The most recent OTP row flagged ``is_used=True`` must exist and must
        not be expired; otherwise the reset session is reported as expired.
        """
        email = request.data.get("email")
        print("email for password reset", email)

        principal = get_principal_by_email(email=email)
        if isinstance(principal, Response):
            return principal

        used_otps = IAmPrincipalOtp.objects.filter(principal=principal, is_used=True)
        latest_otp = used_otps.last()
        # A missing record and an expired one yield the same response.
        if not latest_otp or latest_otp.is_expired():
            return ApiResponse.error(message=constants.PASSWORD_RESET_SESSION_EXPIRE)

        reset_form = self.serializer_class(principal, data=request.data)
        if not reset_form.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.VALIDATION_ERROR,
                errors=reset_form.errors,
            )

        try:
            reset_form.save()
        except Exception as exc:
            return ApiResponse.error(message=str(exc), errors=str(exc))
        else:
            return ApiResponse.success(message=constants.SUCCESS)
class AccountDeactivateView(APIView):
    """Authenticated endpoint that deactivates the caller's own account."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]

    def delete(self, request):
        """Mark the requesting principal as inactive and deleted.

        The row itself is kept; only the ``is_active`` and ``deleted`` flags
        are changed. Any exception is reported as an internal server error.
        """
        try:
            principal = IAmPrincipal.objects.get(id=request.user.id)
            principal.is_active, principal.deleted = False, True
            principal.save()
        except Exception as exc:
            return ApiResponse.error(
                message=constants.INTERNAL_SERVER_ERROR, errors=str(exc)
            )
        else:
            return ApiResponse.success(message=constants.ACCOUNT_DEACTIVATED)
class GoogleSignin(APIView):
    """Public endpoint that signs a principal in with a Google access token."""

    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Resolve the Google profile to a principal and issue tokens.

        Requires ``access_token`` and ``player_id`` in the request body. The
        profile is fetched with ``GoogleAuthService.get_user_info``; an
        existing principal with the same email is reused, otherwise a new
        one is created with the Google principal source.

        Returns:
            ``ApiResponse.success`` with the token data, or
            ``ApiResponse.error`` carrying the exception text when anything
            in the flow raises (including a missing request key).
        """
        try:
            access_token = request.data["access_token"]
            player_id = request.data["player_id"]
            # The profile is not printed: it contains personal data.
            user_info = GoogleAuthService.get_user_info(access_token)
            email = user_info['email']
            now = timezone.now()

            user = IAmPrincipal.objects.filter(email=email).first()
            if user:
                # Update the instance itself so the token payload built below
                # sees the new player_id rather than a stale in-memory copy.
                user.player_id = player_id
                user.last_login = now
                user.save()
            else:
                # Either name part may be absent from the profile; join only
                # the parts that are present instead of raising KeyError.
                name_parts = (
                    user_info.get('given_name'),
                    user_info.get('family_name'),
                )
                user = IAmPrincipal.objects.create_user(
                    username=email,
                    email=email,
                    first_name=" ".join(part for part in name_parts if part),
                    last_login=now,
                    player_id=player_id,
                    principal_type=IAmPrincipalType.get_principal_user(),
                    principal_source=IAmPrincipalSource.get_principal_google(),
                )

            # NOTE(review): is_active is not checked here, so a deactivated
            # account can presumably still sign in via Google - confirm the
            # intended behavior.
            token_data = generate_token_and_user_data(user)
            return ApiResponse.success(
                message=constants.SUCCESS, data=token_data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
import jwt
class AppleSignin(APIView):
    """Public endpoint that signs a principal in with Sign in with Apple."""

    authentication_classes = []
    permission_classes = []

    # Apple identity endpoints and the issuer expected in identity tokens.
    APPLE_TOKEN_URL = 'https://appleid.apple.com/auth/token'
    APPLE_KEYS_URL = 'https://appleid.apple.com/auth/keys'
    APPLE_ISSUER = 'https://appleid.apple.com'
    # Upper bound for the outbound token request so a stalled connection
    # cannot block the worker indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    def post(self, request):
        """Exchange an Apple authorization code and issue application tokens.

        Requires ``authorization_code`` in the request body. The code is
        exchanged at Apple's token endpoint, the returned identity token is
        verified against Apple's published signing keys, and the principal
        with the token's email is reused or created.

        Returns:
            ``ApiResponse.success`` with the token data, or
            ``ApiResponse.error`` carrying the exception text when any step
            raises.
        """
        try:
            authorization_code = request.data['authorization_code']

            # Apple expects the client secret as a form field, not as a
            # Bearer header. redirect_uri is omitted rather than sent as the
            # literal value False.
            response = requests.post(
                self.APPLE_TOKEN_URL,
                data={
                    'client_id': settings.SOCIAL_AUTH_APPLE_CLIENT_ID,
                    'client_secret': settings.SOCIAL_AUTH_APPLE_CLIENT_SECRET,
                    'code': authorization_code,
                    'grant_type': 'authorization_code',
                },
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response_data = response.json()
            id_token = response_data.get('id_token')
            if not id_token:
                # Surface Apple's error code (e.g. "invalid_grant") when present.
                raise ValueError(
                    response_data.get('error') or 'Apple did not return an id_token'
                )

            # Verify signature, audience, issuer and expiry against Apple's
            # published keys (RS256) instead of decoding with an empty key.
            signing_key = jwt.PyJWKClient(
                self.APPLE_KEYS_URL
            ).get_signing_key_from_jwt(id_token)
            decoded = jwt.decode(
                id_token,
                signing_key.key,
                algorithms=['RS256'],
                audience=settings.SOCIAL_AUTH_APPLE_CLIENT_ID,
                issuer=self.APPLE_ISSUER,
            )

            email = decoded.get('email')
            if not email:
                raise ValueError('Apple identity token does not contain an email')

            user = IAmPrincipal.objects.filter(email=email).first()
            if user is None:
                # Join only the name claims that are present, so a missing
                # claim no longer yields the literal string "None None".
                name_parts = (decoded.get('given_name'), decoded.get('family_name'))
                # NOTE(review): principal_source is left unset because no
                # Apple-specific source getter is visible in this module -
                # confirm which source value these accounts should carry.
                user = IAmPrincipal.objects.create_user(
                    username=email,
                    email=email,
                    first_name=" ".join(part for part in name_parts if part),
                    principal_type=IAmPrincipalType.get_principal_user(),
                )

            token_data = generate_token_and_user_data(user)
            return ApiResponse.success(
                message=constants.SUCCESS, data=token_data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class VersionCheck(APIView):
    """Authenticated endpoint that returns the latest app version record."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    model = AppVersion

    def get(self, request, *args, **kwargs):
        """Return the serialized last ``AppVersion`` row for ``deviceType``.

        ``deviceType`` is read from the query string; an error response is
        returned when it is missing or empty.
        """
        print("app version is called")

        device_type = request.GET.get('deviceType')
        if device_type:
            # Pick the last stored record for this device type.
            matching = self.model.objects.filter(device_type=device_type)
            latest = matching.last()
            # NOTE(review): `latest` is None when no record matches; the
            # serializer is still invoked in that case - confirm this is the
            # intended response for unknown device types.
            serialized = AppVersionSerializer(latest)
            return ApiResponse.success(
                message=constants.SUCCESS, data=serialized.data
            )

        return ApiResponse.error(
            message=constants.FAILURE, errors="device type is required"
        )