"""Authentication API views: registration, login, OTP, password reset and social sign-in."""

import logging
from datetime import datetime

import jwt
import requests
from django.conf import settings
from django.contrib.auth import authenticate
from django.utils import timezone
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.authentication import JWTAuthentication

from module_iam.models import (
    AppVersion,
    IAmPrincipal,
    IAmPrincipalOtp,
    IAmPrincipalSource,
    IAmPrincipalType,
)
from module_project import constants
from module_project.service import EmailService, SMSService
from module_project.utils import ApiResponse

from .serializers import (
    LoginSerializer,
    OtpVerificationSerializer,
    PasswordResetSerializer,
    RegistrationSerializer,
)
from .utils import (
    AuthService,
    GoogleAuthService,
    authticate_with_otp_and_passsword,
    generate_token_and_user_data,
    get_principal_by_email,
)

logger = logging.getLogger(__name__)

# Apple "Sign in with Apple" endpoints.
APPLE_TOKEN_URL = 'https://appleid.apple.com/auth/token'
APPLE_KEYS_URL = 'https://appleid.apple.com/auth/keys'
# Seconds to wait for Apple before giving up, so a stalled upstream cannot
# block a worker indefinitely.
APPLE_REQUEST_TIMEOUT = 10


def _join_name(*parts):
    """Join the non-empty name parts with single spaces ('' if none are set)."""
    return " ".join(part for part in parts if part)


class RegistrationView(APIView):
    """Create a new principal and return its token payload."""

    authentication_classes = []
    permission_classes = []
    model = IAmPrincipal
    serializer_class = RegistrationSerializer

    def post(self, request):
        """Validate the payload, persist the principal and issue tokens.

        Returns an ``ApiResponse.error`` with status 403 when validation or
        persistence fails, otherwise ``ApiResponse.success`` carrying the
        result of ``generate_token_and_user_data``.
        """
        # The request body contains the password, so it is deliberately
        # never written to stdout or the log.
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.REGISTRATION_FAIL,
                errors=serializer.errors,
            )

        try:
            instance = serializer.save()
            # timezone.now() is aware when USE_TZ is on and naive otherwise,
            # so it is correct under either setting.
            instance.last_login = timezone.now()
            instance.principal_type = IAmPrincipalType.get_principal_user()
            instance.principal_source = IAmPrincipalSource.get_principal_app()
            instance.save()
            token_data = generate_token_and_user_data(instance)
        except Exception as e:
            logger.exception("Registration failed")
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN, message=str(e), errors=str(e)
            )

        return ApiResponse.success(
            message=constants.REGISTRATION_SUCCESS, data=token_data
        )


class LoginView(APIView):
    """Authenticate a principal by email plus OTP and/or password."""

    authentication_classes = []
    permission_classes = []
    model = IAmPrincipal
    serializer_class = LoginSerializer

    def post(self, request):
        """Check the credentials, record the login and issue tokens.

        Reads ``email``, ``otp``, ``password`` and ``player_id`` from the
        request body. Any error response produced by
        ``authticate_with_otp_and_passsword`` is returned unchanged.
        """
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.LOGIN_FAIL,
                errors=serializer.errors,
            )

        email = request.data.get("email")
        otp = request.data.get("otp")
        password = request.data.get("password")
        player_id = request.data.get("player_id")

        try:
            principal = IAmPrincipal.objects.get(email=email)
        except IAmPrincipal.DoesNotExist:
            return ApiResponse.error(
                message=constants.INCORRECT_CREDENTIALS,
                errors=constants.INCORRECT_CREDENTIALS,
            )

        validation_result = authticate_with_otp_and_passsword(
            principal, otp=otp, password=password
        )
        if isinstance(validation_result, Response):
            # The helper signals failure by returning a ready-made response.
            logger.debug("Login rejected by credential validation")
            return validation_result

        try:
            principal.player_id = player_id
            principal.last_login = timezone.now()
            principal.save()
        except Exception as e:
            logger.exception("Could not record login")
            return ApiResponse.error(
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
                message=constants.INTERNAL_SERVER_ERROR,
                errors=str(e),
            )

        token_data = generate_token_and_user_data(principal)
        return ApiResponse.success(message=constants.LOGIN_SUCCESS, data=token_data)


class OtpRequestView(APIView):
    """Email a "Forget Password" OTP code to the principal."""

    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Create an OTP for the posted ``email`` and send it by email."""
        if "email" not in request.data:
            return ApiResponse.error(
                message=constants.EMAIL_REQUIRED, errors=constants.EMAIL_REQUIRED
            )

        email = request.data.get("email")
        principal = get_principal_by_email(email=email)
        if isinstance(principal, Response):
            # Lookup failed; the helper already built the error response.
            return principal

        try:
            otp_code = SMSService().create_otp(
                principal=principal, otp_purpose="Forget password"
            )
        except Exception as e:
            logger.exception("OTP creation failed")
            return ApiResponse.error(message=str(e), errors=str(e))

        email_service = EmailService(
            subject="Forget Password",
            to=principal.email,
            from_email=settings.EMAIL_HOST_USER,
        )

        try:
            email_service.load_template(
                "module_auth/email_template.html",
                context={"code": otp_code, "name": principal.first_name},
            )
            email_service.send()
        except Exception as e:
            logger.exception("Sending the OTP email failed")
            return ApiResponse.error(
                message=f"Error sending email: {str(e)}", errors=str(e)
            )

        return ApiResponse.success(message=constants.SUCCESS)


class OTPVerificationView(APIView):
    """Verify an OTP code for the principal with the given email."""

    authentication_classes = []
    permission_classes = []
    serializer_class = OtpVerificationSerializer

    def post(self, request):
        """Validate ``email``/``otp`` and report whether the OTP is accepted."""
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.VALIDATION_ERROR,
                errors=serializer.errors,
            )

        email = serializer.validated_data.get("email")
        otp = serializer.validated_data.get("otp")

        principal = get_principal_by_email(email=email)
        if isinstance(principal, Response):
            return principal

        validation_result = authticate_with_otp_and_passsword(principal, otp=otp)
        if isinstance(validation_result, Response):
            # The helper signals failure by returning a ready-made response.
            logger.debug("OTP verification rejected")
            return validation_result

        return ApiResponse.success(message=constants.SUCCESS)


class ForgetPasswordView(APIView):
    """Reset the authenticated principal's password after an OTP check."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = PasswordResetSerializer

    def post(self, request):
        """Apply ``PasswordResetSerializer`` if the latest OTP is still valid.

        NOTE(review): the OTP is looked up via the posted ``email`` while the
        serializer updates ``request.user`` -- confirm both are meant to refer
        to the same principal.
        """
        email = request.data.get("email")
        principal = get_principal_by_email(email=email)
        if isinstance(principal, Response):
            # Same convention as the other views: a Response means the lookup
            # failed, and it must not be passed on to the ORM filter below.
            return principal

        otp_instance = IAmPrincipalOtp.objects.filter(principal=principal).last()
        if not otp_instance or otp_instance.is_expired():
            return ApiResponse.error(message=constants.SESSION_EXPIRED)

        serializer = self.serializer_class(request.user, data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                status=status.HTTP_403_FORBIDDEN,
                message=constants.VALIDATION_ERROR,
                errors=serializer.errors,
            )

        try:
            serializer.save()
        except Exception as e:
            logger.exception("Password reset failed")
            return ApiResponse.error(message=str(e), errors=str(e))

        return ApiResponse.success(message=constants.SUCCESS)


class AccountDeactivateView(APIView):
    """Soft-delete the authenticated principal's account."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]

    def delete(self, request):
        """Mark the caller's principal as inactive and deleted."""
        try:
            user = IAmPrincipal.objects.get(id=request.user.id)
            user.is_active = False
            user.deleted = True
            user.save()
        except Exception as e:
            logger.exception("Account deactivation failed")
            return ApiResponse.error(
                message=constants.INTERNAL_SERVER_ERROR, errors=str(e)
            )

        return ApiResponse.success(message=constants.ACCOUNT_DEACTIVATED)


class GoogleSignin(APIView):
    """Sign in (creating the principal if needed) with a Google access token."""

    authentication_classes = []
    permission_classes = []

    def post(self, request):
        """Resolve the Google profile for ``access_token`` and issue tokens.

        Any failure (missing token, Google lookup error, database error) is
        reported as ``constants.FAILURE`` with the exception text in ``errors``.
        """
        try:
            access_token = request.data["access_token"]
            user_info = GoogleAuthService.get_user_info(access_token)
            email = user_info['email']

            # Look the principal up by the email Google reports, falling back
            # to the configured authentication backends.
            user = IAmPrincipal.objects.filter(email=email).first() or authenticate(
                email=email, password=None
            )

            if user is None:
                # First sign-in: create the principal. Name parts are
                # optional, so only the ones that are present are joined.
                user = IAmPrincipal.objects.create_user(
                    username=email,
                    email=email,
                    first_name=_join_name(
                        user_info.get('given_name'), user_info.get('family_name')
                    ),
                    last_login=timezone.now(),
                    principal_type=IAmPrincipalType.get_principal_user(),
                    principal_source=IAmPrincipalSource.get_principal_google(),
                )
                user.save()

            token_data = generate_token_and_user_data(user)
            return ApiResponse.success(message=constants.SUCCESS, data=token_data)

        except Exception as e:
            logger.exception("Google sign-in failed")
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))


class AppleSignin(APIView):
    """Sign in (creating the principal if needed) with an Apple authorization code."""

    authentication_classes = []
    permission_classes = []

    @staticmethod
    def _decode_id_token(id_token):
        """Return the claims of an Apple identity token after checking its signature.

        The signing key is fetched from Apple's published JWKS by the ``kid``
        in the token header. Audience, expiry and issued-at checks stay
        disabled, as they were before.
        """
        signing_key = jwt.PyJWKClient(APPLE_KEYS_URL).get_signing_key_from_jwt(
            id_token
        )
        return jwt.decode(
            id_token,
            signing_key.key,
            algorithms=['RS256'],
            options={
                'verify_aud': False,
                'verify_exp': False,
                'verify_iat': False,
            },
        )

    def post(self, request):
        """Exchange ``authorization_code`` with Apple and issue tokens.

        Any failure is reported as ``constants.FAILURE`` with details in
        ``errors``.
        """
        try:
            authorization_code = request.data['authorization_code']
            headers = {
                'Authorization': f"Bearer {settings.SOCIAL_AUTH_APPLE_CLIENT_SECRET}"
            }
            response = requests.post(
                APPLE_TOKEN_URL,
                data={
                    'client_id': settings.SOCIAL_AUTH_APPLE_CLIENT_ID,
                    # Apple documents client_secret as a form parameter; the
                    # Authorization header above is kept for compatibility.
                    'client_secret': settings.SOCIAL_AUTH_APPLE_CLIENT_SECRET,
                    'code': authorization_code,
                    'grant_type': 'authorization_code',
                    # NOTE(review): this is form-encoded as the string
                    # "False" -- confirm whether redirect_uri should be
                    # omitted or set to the registered URI.
                    'redirect_uri': False,
                },
                headers=headers,
                timeout=APPLE_REQUEST_TIMEOUT,
            )
            response_data = response.json()
            id_token = response_data.get('id_token')
            if not id_token:
                # Apple answered without an identity token (for example an
                # invalid or reused code); report its error if one was given.
                return ApiResponse.error(
                    message=constants.FAILURE,
                    errors=str(response_data.get('error', 'id_token missing')),
                )

            decoded = self._decode_id_token(id_token)

            email = decoded.get('email')
            if not email:
                # Without an email there is no key to find or create a principal.
                return ApiResponse.error(
                    message=constants.FAILURE, errors='email missing from id_token'
                )

            user = IAmPrincipal.objects.filter(email=email).first()
            if user is None:
                # NOTE(review): unlike GoogleSignin, no principal_type or
                # principal_source is set here -- confirm whether that is
                # intended.
                user = IAmPrincipal.objects.create_user(
                    username=email,
                    email=email,
                    first_name=_join_name(
                        decoded.get('given_name'), decoded.get('family_name')
                    ),
                )

            user.save()

            token_data = generate_token_and_user_data(user)
            return ApiResponse.success(message=constants.SUCCESS, data=token_data)
        except Exception as e:
            logger.exception("Apple sign-in failed")
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))


class VersionCheck(APIView):
    """Report upgrade flags for a client app version."""

    authentication_classes = []
    permission_classes = []

    def get(self, request, *args, **kwargs):
        """Return ``forceUpgrade``/``recommendUpgrade`` for ``appVersion``.

        Unknown versions get both flags set to ``False``.
        """
        app_version = request.GET.get('appVersion')

        # filter().first() yields None for an unknown version and does not
        # raise if the same version was stored more than once.
        version = AppVersion.objects.filter(version=app_version).first()

        if version:
            upgrade_flags = {
                'forceUpgrade': version.force_upgrade,
                'recommendUpgrade': version.recommend_upgrade,
            }
        else:
            upgrade_flags = {
                'forceUpgrade': False,
                'recommendUpgrade': False,
            }

        return ApiResponse.success(message=constants.SUCCESS, data=upgrade_flags)