398 lines
14 KiB
Python
398 lines
14 KiB
Python
from datetime import datetime
|
|
|
|
import requests
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate
|
|
from rest_framework import status
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
|
|
|
from module_iam.models import (AppVersion, IAmPrincipal, IAmPrincipalOtp,
|
|
IAmPrincipalSource, IAmPrincipalType)
|
|
from module_project import constants
|
|
from module_project.service import EmailService, SMSService
|
|
from module_project.utils import ApiResponse
|
|
|
|
from .serializers import (LoginSerializer, OtpVerificationSerializer,
|
|
PasswordResetSerializer, RegistrationSerializer)
|
|
from .utils import (AuthService, GoogleAuthService,
|
|
authticate_with_otp_and_passsword, blacklist_token,
|
|
generate_token_and_user_data, get_principal_by_email)
|
|
|
|
|
|
class RegistrationView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
model = IAmPrincipal
|
|
serializer_class = RegistrationSerializer
|
|
|
|
def post(self, request):
|
|
serializer = self.serializer_class(data=request.data)
|
|
print(f"request data is {request.data}")
|
|
if not serializer.is_valid():
|
|
error_response = {
|
|
"status": status.HTTP_403_FORBIDDEN,
|
|
"message": constants.REGISTRATION_FAIL,
|
|
"errors": serializer.errors,
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
try:
|
|
instance = serializer.save()
|
|
instance.last_login = datetime.now()
|
|
instance.principal_type = IAmPrincipalType.get_principal_user()
|
|
instance.principal_source = IAmPrincipalSource.get_principal_app()
|
|
instance.save()
|
|
token_data = generate_token_and_user_data(instance)
|
|
except Exception as e:
|
|
return ApiResponse.error(
|
|
status=status.HTTP_403_FORBIDDEN, message=str(e), errors=str(e)
|
|
)
|
|
|
|
return ApiResponse.success(
|
|
message=constants.REGISTRATION_SUCCESS, data=token_data
|
|
)
|
|
|
|
|
|
class LoginView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
model = IAmPrincipal
|
|
serializer_class = LoginSerializer
|
|
|
|
def post(self, request):
|
|
serializer = self.serializer_class(data=request.data)
|
|
if not serializer.is_valid():
|
|
error_response = {
|
|
"status": status.HTTP_403_FORBIDDEN,
|
|
"message": constants.LOGIN_FAIL,
|
|
"errors": serializer.errors,
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
email = request.data.get("email")
|
|
otp = request.data.get("otp")
|
|
password = request.data.get("password")
|
|
player_id = request.data.get("player_id")
|
|
|
|
try:
|
|
principal = IAmPrincipal.objects.get(email=email)
|
|
except IAmPrincipal.DoesNotExist:
|
|
error_response = {
|
|
"message": constants.INCORRECT_CREDENTIALS,
|
|
"errors": constants.INCORRECT_CREDENTIALS,
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
validation_result = authticate_with_otp_and_passsword(
|
|
principal, otp=otp, password=password
|
|
)
|
|
print("pasword instance ", validation_result)
|
|
|
|
if isinstance(validation_result, Response):
|
|
print("Errror reponse")
|
|
return validation_result # Return the error response if validation fails
|
|
|
|
try:
|
|
principal.player_id = player_id
|
|
principal.last_login = datetime.now()
|
|
principal.save()
|
|
except Exception as e:
|
|
error_response = {
|
|
"status": status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
"message": constants.INTERNAL_SERVER_ERROR,
|
|
"errors": str(e),
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
token_data = generate_token_and_user_data(principal)
|
|
return ApiResponse.success(message=constants.LOGIN_SUCCESS, data=token_data)
|
|
|
|
|
|
class LogoutView(APIView):
|
|
authentication_classes = [JWTAuthentication]
|
|
permission_classes = [IsAuthenticated]
|
|
model = IAmPrincipal
|
|
|
|
def post(self, request):
|
|
token = request.data.get("refresh")
|
|
if not token:
|
|
return ApiResponse.error(message=constants.FAILURE, errors='Provide refresh token')
|
|
|
|
user = request.user
|
|
user.player_id = None
|
|
user.save()
|
|
|
|
blacklist_token(token)
|
|
|
|
return ApiResponse.success(message=constants.LOGOUT_SUCCESS)
|
|
|
|
|
|
class OtpRequestView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
|
|
def post(self, request):
|
|
if "email" not in request.data:
|
|
return ApiResponse.error(
|
|
message=constants.EMAIL_REQUIRED, errors=constants.EMAIL_REQUIRED
|
|
)
|
|
print(f"email auth username: {settings.EMAIL_HOST_USER}")
|
|
email = request.data.get("email")
|
|
|
|
principal = get_principal_by_email(email=email)
|
|
|
|
if isinstance(principal, Response):
|
|
return principal
|
|
|
|
try:
|
|
# auth_service = AuthService(IAmPrincipal)
|
|
# principal = auth_service.get_principal_by_email(request.data.get("email"))
|
|
|
|
otp_code = SMSService().create_otp(
|
|
principal=principal, otp_purpose="Password Reset Request"
|
|
)
|
|
|
|
except Exception as e:
|
|
return ApiResponse.error(message=str(e), errors=str(e))
|
|
|
|
email_service = EmailService(
|
|
subject="Password Reset Request",
|
|
to=principal.email,
|
|
from_email=settings.EMAIL_HOST_USER,
|
|
)
|
|
|
|
# Send the email
|
|
try:
|
|
email_service.load_template(
|
|
"module_auth/otp_email_template.html", context={"code": otp_code, "name": principal.first_name}
|
|
)
|
|
email_service.send()
|
|
except Exception as e:
|
|
return ApiResponse.error(
|
|
message=f"Error sending email: {str(e)}", errors=str(e)
|
|
)
|
|
|
|
return ApiResponse.success(message=constants.SUCCESS)
|
|
|
|
|
|
class OTPVerificationView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = OtpVerificationSerializer
|
|
|
|
def post(self, request):
|
|
serializer = self.serializer_class(data=request.data)
|
|
if not serializer.is_valid():
|
|
error_response = {
|
|
"status": status.HTTP_403_FORBIDDEN,
|
|
"message": constants.VALIDATION_ERROR,
|
|
"errors": serializer.errors,
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
email = serializer.validated_data.get("email")
|
|
otp = serializer.validated_data.get("otp")
|
|
|
|
principal = get_principal_by_email(email=email)
|
|
|
|
if isinstance(principal, Response):
|
|
return principal
|
|
|
|
validation_result = authticate_with_otp_and_passsword(principal, otp=otp)
|
|
print("pasword instance ", validation_result)
|
|
|
|
if isinstance(validation_result, Response):
|
|
print("Errror reponse")
|
|
return validation_result # Return the error response if validation fails
|
|
|
|
return ApiResponse.success(message=constants.SUCCESS)
|
|
|
|
|
|
class ForgetPasswordView(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
serializer_class = PasswordResetSerializer
|
|
|
|
def post(self, request):
|
|
email = request.data.get("email")
|
|
print("email for password reset", email)
|
|
principal = get_principal_by_email(email=email)
|
|
|
|
if isinstance(principal, Response):
|
|
return principal
|
|
|
|
otp_instance = IAmPrincipalOtp.objects.filter(principal=principal, is_used=True).last()
|
|
|
|
if not otp_instance:
|
|
return ApiResponse.error(message=constants.PASSWORD_RESET_SESSION_EXPIRE)
|
|
|
|
if otp_instance.is_expired():
|
|
return ApiResponse.error(message=constants.PASSWORD_RESET_SESSION_EXPIRE)
|
|
|
|
serializer = self.serializer_class(principal, data=request.data)
|
|
if not serializer.is_valid():
|
|
error_response = {
|
|
"status": status.HTTP_403_FORBIDDEN,
|
|
"message": constants.VALIDATION_ERROR,
|
|
"errors": serializer.errors,
|
|
}
|
|
return ApiResponse.error(**error_response)
|
|
|
|
try:
|
|
serializer.save()
|
|
except Exception as e:
|
|
return ApiResponse.error(message=str(e), errors=str(e))
|
|
|
|
return ApiResponse.success(message=constants.SUCCESS)
|
|
|
|
|
|
class AccountDeactivateView(APIView):
|
|
authentication_classes = [JWTAuthentication]
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
def delete(self, request):
|
|
try:
|
|
user = IAmPrincipal.objects.get(id=request.user.id)
|
|
user.is_active = False
|
|
user.deleted = True
|
|
user.save()
|
|
except Exception as e:
|
|
return ApiResponse.error(message=constants.INTERNAL_SERVER_ERROR, errors=str(e))
|
|
|
|
return ApiResponse.success(message=constants.ACCOUNT_DEACTIVATED)
|
|
|
|
|
|
class GoogleSignin(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
|
|
def post(self, request):
|
|
try:
|
|
access_token = request.data["access_token"]
|
|
player_id = request.data["player_id"]
|
|
user_info = GoogleAuthService.get_user_info(access_token)
|
|
|
|
print(f"User Info : {user_info} and player id is {player_id}")
|
|
|
|
# Authenticate user with the email provided by Google
|
|
user = IAmPrincipal.objects.filter(email=user_info['email']).first(
|
|
) or authenticate(email=user_info['email'], password=None)
|
|
|
|
if user:
|
|
# Update the player_id for the existing user
|
|
IAmPrincipal.objects.filter(email=user_info['email']).update(player_id=player_id)
|
|
else:
|
|
# Create a new user if not found
|
|
user = IAmPrincipal.objects.create_user(
|
|
username=user_info['email'],
|
|
email=user_info['email'],
|
|
first_name=f"{user_info['given_name']} {user_info['family_name']}",
|
|
last_login=datetime.now(),
|
|
player_id=player_id,
|
|
principal_type=IAmPrincipalType.get_principal_user(),
|
|
principal_source=IAmPrincipalSource.get_principal_google()
|
|
)
|
|
user.save()
|
|
|
|
token_data = generate_token_and_user_data(user)
|
|
|
|
# return Response({"token": token.key}, status=status.HTTP_200_OK)
|
|
return ApiResponse.success(
|
|
message=constants.SUCCESS, data=token_data
|
|
)
|
|
|
|
except Exception as e:
|
|
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
|
|
|
|
|
|
import jwt
|
|
|
|
|
|
class AppleSignin(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
|
|
def post(self, request):
|
|
try:
|
|
authorization_code = request.data['authorization_code']
|
|
headers = {
|
|
'Authorization': f"Bearer {settings.SOCIAL_AUTH_APPLE_CLIENT_SECRET}"
|
|
}
|
|
|
|
response = requests.post(
|
|
'https://appleid.apple.com/auth/token',
|
|
data={
|
|
'client_id': settings.SOCIAL_AUTH_APPLE_CLIENT_ID,
|
|
'code': authorization_code,
|
|
'grant_type': 'authorization_code',
|
|
'redirect_uri': False,
|
|
},
|
|
headers=headers,
|
|
)
|
|
|
|
response_data = response.json()
|
|
id_token = response_data.get('id_token')
|
|
|
|
decoded = jwt.decode(
|
|
id_token,
|
|
'',
|
|
algorithms=['ES256'],
|
|
options={
|
|
'verify_aud': False,
|
|
'verify_exp': False,
|
|
'verify_iat': False,
|
|
},
|
|
)
|
|
email = decoded.get('email')
|
|
full_name = f"{decoded.get('given_name')} {decoded.get('family_name')}"
|
|
if IAmPrincipal.objects.filter(email=email).exists():
|
|
user = IAmPrincipal.objects.get(email=email)
|
|
else:
|
|
user = IAmPrincipal.objects.create_user(
|
|
username=email,
|
|
email=email,
|
|
first_name=full_name,
|
|
)
|
|
user.save()
|
|
|
|
token_data = generate_token_and_user_data(user)
|
|
|
|
return ApiResponse.success(
|
|
message=constants.SUCCESS, data=token_data
|
|
)
|
|
|
|
except Exception as e:
|
|
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
|
|
|
|
|
|
class VersionCheck(APIView):
|
|
authentication_classes = []
|
|
permission_classes = []
|
|
model = AppVersion
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
app_version = request.GET.get('appVersion')
|
|
device_type = request.GET.get('deviceType')
|
|
|
|
if not app_version or not device_type:
|
|
return ApiResponse.error(message=constants.FAILURE, errors="App version and device type is required")
|
|
# Query the database to retrieve the upgrade flags based on the app version
|
|
try:
|
|
version = self.model.objects.get(version=app_version)
|
|
except self.model.DoesNotExist:
|
|
version = None
|
|
|
|
if version:
|
|
upgrade_flags = {
|
|
'forceUpgrade': version.force_upgrade,
|
|
'recommendUpgrade': version.recommend_upgrade,
|
|
}
|
|
else:
|
|
upgrade_flags = {
|
|
'forceUpgrade': False,
|
|
'recommendUpgrade': False,
|
|
}
|
|
return ApiResponse.success(message=constants.SUCCESS, data=upgrade_flags) |