Files
goodtimes/accounts/api/views.py
2024-05-27 16:03:41 +05:30

971 lines
36 KiB
Python

import json
from django.db import transaction
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.utils import timezone
import jwt
from rest_framework import status
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from django.conf import settings
import requests
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
# from .authenticate import authticate_with_otp_and_passsword
from accounts.models import (
AppVersion,
IAmPrincipal,
IAmPrincipalOtp,
IAmPrincipalSource,
IAmPrincipalType,
# IAmPrincipalKYCDetails,
)
from manage_events.models import PrincipalPreference
from manage_referrals.models import ReferralCode, ReferralRecord
from goodtimes import constants
from goodtimes.services import SMSError, SMSService, EmailService
# from nifty11_project.services import SMSError, SMSService
from goodtimes.utils import ApiResponse
from accounts.resource_action import (
PRINCIPAL_TYPE_EVENT_USER,
PRINCIPAL_TYPE_EVENT_MANAGER,
PRINCIPAL_TYPE_FREE_USER,
)
from .serializers import (
AppVersionSerializer,
EmailSerializers,
# RegistrationPasswordSerializer,
# PhoneSerializer,
EmailSerializer,
PlayerIDSerializer,
RegistrationPasswordSerializer,
RegistrationSerializer,
ReferralCodeSerializer,
ReferralRecordSerializer,
ProfileSerializer,
PasswordResetSerializer,
# PrincipalKYCDetailsSerializer,
)
from .utils import (
generate_token_and_user_data,
authticate_with_otp_and_passsword,
get_principal_by_email,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework.response import Response
class RegistrationEmailView(APIView):
authentication_classes = []
permission_classes = []
model = IAmPrincipal
@transaction.atomic
def post(self, request):
serializer = EmailSerializer(data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.REGISTRATION_FAIL,
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
email = serializer.validated_data.get("email")
principal_type = serializer.validated_data.get("principal_type")
print("principal_type: ", principal_type)
principal = None # Declare user variable outside of try-except blocks
# default_email = f"nifty{phone_no[-10:]}@gmail.com"
default_password = f"GTMES{email[::-1]}"
try:
principal = self.model.objects.get(email=email)
principal_type = IAmPrincipalType.get_principal_type(principal_type)
print(f"principal_type in try {principal_type}")
if principal.email_verified:
return ApiResponse.error(
message=constants.EMAIL_EXISTS,
errors=constants.EMAIL_EXISTS,
status=status.HTTP_403_FORBIDDEN,
)
# Update the existing user (unverified) with a default email and password
# principal.email = email
principal.principal_type = principal_type
principal.username = email
principal.set_password(default_password)
principal.save()
except self.model.DoesNotExist:
# Phone number doesn't exist, create a new user
principal = self.model.objects.create(email=email, username=email)
principal_type = IAmPrincipalType.get_principal_type(principal_type)
principal.principal_type = principal_type
principal.set_password(f"GTMES{email[::-1]}")
principal.save()
except Exception as e:
return ApiResponse.error(message=str(e), errors=str(e))
try:
# Send OTP to the user
otp = SMSService().create_otp(
principal=principal, opt_purpose="registration"
)
# Create an instance of the EmailService
email_service = EmailService(
subject="Good Times - OTP",
to=[email],
from_email=settings.EMAIL_HOST_USER,
)
email_service.load_template(
"otp/otp.html", context={"OTP": otp, "action": "Register"}
)
# Send the email
email_service.send()
print("Email sent successfully!")
except SMSError as e:
return ApiResponse.error(message=e.message, errors=e.message)
return ApiResponse.success(
message=constants.OTP_SENT, data=int(otp)
) # this will change
class RegistrationDetailsView(APIView):
authentication_classes = []
permission_classes = []
@transaction.atomic
def post(self, request):
"""
Handle User Registration and Referral.
This view processes user registration, validates data, creates new users, and tracks referrals if a referral code is provided. It returns success or error responses based on the registration outcome.
Parameters:
- request (HttpRequest): HTTP request object with registration data.
Returns:
- ApiResponse: Registration success or error response.
Process:
1. Extract data and validate registration.
2. Create a new user and OTP.
3. Generate referral codes for users.
4. Track referrals if a referral code is provided and not alreay register thorugh referral.
5. Return a response indicating success or error.
"""
# Extract the phone number from the request data
# phone_no = request.data.get("phone_no")
email = request.data.get("email")
print("email: ", email)
referral_code = request.data.get("referral_code")
player_id = request.data.get("player_id")
print("referral_code", referral_code)
# Filter the user instance by phone number through reusable function
principal = get_principal_by_email(email)
if isinstance(principal, Response):
return principal # returning error here as it is error instance
# Validate the incoming data using the serializer
serializer = RegistrationSerializer(principal, data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.VALIDATION_ERROR,
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
# Save the principal and related OTP
try:
instance = serializer.save()
instance.register_complete = True
if player_id:
instance.player_id = player_id
instance.save()
except Exception as e:
print("instance: E-", e)
error_response = {
"status": status.HTTP_500_INTERNAL_SERVER_ERROR,
"message": constants.INTERNAL_SERVER_ERROR,
"errors": str(e),
}
return ApiResponse.error(**error_response)
# generating referrall_code of the player and merchant
try:
ReferralCode.create_referral_code_for_user_manager(
principal=principal, principal_type=principal.principal_type
)
except Exception as e:
print("ReferralCode: E-", e)
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.FAILURE,
"errors": str(e),
}
return ApiResponse.error(**error_response)
if referral_code:
already_register_through_referral = ReferralRecord.objects.filter(
referred_principal=instance
).exists()
if not already_register_through_referral:
try:
whos_referral_code = ReferralCode.objects.get(
referral_code=referral_code
)
except Exception as e:
print("whos_referral_code: E-", e)
error_response = {
"status": status.HTTP_404_NOT_FOUND,
"message": constants.RECORD_NOT_FOUND,
"errors": str(e),
}
return ApiResponse.error(**error_response)
# principal_type = IAmPrincipalType.objects
ReferralRecord.objects.create(
referrer_principal=whos_referral_code.principal, # principal id of the User who invited
referred_principal=instance, # principal id of the User who join through invitation
principal_type=whos_referral_code.principal_type, # principal type of the user who invited
is_completed=True,
)
token_data = generate_token_and_user_data(principal)
token_data["type"] = str(principal.principal_type)
return ApiResponse.success(
message="Details Added Successfully.", data=token_data
)
class RegistrationPasswordView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request):
email = request.data.get("email")
# type = request.data.get("type")
# Filter the user instance by phone number through reusable function
principal = get_principal_by_email(email)
if isinstance(principal, Response):
return principal # returning error here as it is error instance
serializer = RegistrationPasswordSerializer(principal, data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.VALIDATION_ERROR,
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
try:
principal = serializer.save()
# principal.register_complete = True
principal.save()
except Exception as e:
error_response = {
"status": status.HTTP_500_INTERNAL_SERVER_ERROR,
"message": constants.INTERNAL_SERVER_ERROR,
"errors": str(e),
}
return ApiResponse.error(**error_response)
token_data = generate_token_and_user_data(principal)
token_data["type"] = str(principal.principal_type)
return ApiResponse.success(
message="Password updated successfully.", data=token_data
)
class OtpRequestView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
serializer = EmailSerializers(data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.VALIDATION_ERROR,
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
email = serializer.validated_data.get("email")
# Filter the user instance by phone number through reusable function
principal = get_principal_by_email(email)
if isinstance(principal, Response):
return principal # returning error here as it is error instance
try:
otp = SMSService().create_otp(principal=principal, opt_purpose="login")
# Create an instance of the EmailService
email_service = EmailService(
subject="Good Times - OTP",
to=[email],
from_email=settings.EMAIL_HOST_USER,
)
email_service.load_template(
"otp/otp.html", context={"OTP": otp, "action": "Login"}
)
# Send the email
email_service.send()
except SMSError as e:
return ApiResponse.error(message=e.message, errors=e.message)
return ApiResponse.success(message=constants.OTP_SENT, data=int(otp))
class OTPVerificationView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request):
serializer = EmailSerializers(data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": "Validation error",
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
email = serializer.validated_data.get("email")
otp = request.data.get("otp")
# Filter the user instance by phon
# ]
# 4\\\\\\\\\\\\\\\\\\\\\\\7e number through reusable function
principal = get_principal_by_email(email)
if isinstance(principal, Response):
return principal # returning error here as it is error instance
validation_result = authticate_with_otp_and_passsword(principal, otp=otp)
if isinstance(validation_result, Response):
return validation_result # Return the error response if validation fails
try:
principal.email_verified = True # set the phone_verified to true
principal.save()
except Exception as e:
error_response = {
"status": status.HTTP_500_INTERNAL_SERVER_ERROR,
"message": constants.INTERNAL_SERVER_ERROR,
"errors": str(e),
}
return ApiResponse.error(**error_response)
return ApiResponse.success(message=constants.OTP_VERIFIED)
class LoginView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
serializer = EmailSerializers(data=request.data)
if not serializer.is_valid():
return Response(
{
"message": "Validation error",
"errors": serializer.errors,
},
status=status.HTTP_400_BAD_REQUEST,
)
email = serializer.validated_data.get("email")
otp = request.data.get("otp")
password = request.data.get("password")
principal = get_principal_by_email(email)
if isinstance(principal, Response):
return principal # If get_principal_by_email returns a Response object, it's an error response.
if not principal.is_active:
return Response(
{
"message": constants.ACCOUNT_DEACTIVATED,
"errors": constants.ACCOUNT_DEACTIVATED,
},
status=status.HTTP_403_FORBIDDEN,
)
if not otp and not password:
return Response(
{
"message": constants.OTP_OR_PASSWORD_REQUIRED,
"errors": constants.OTP_OR_PASSWORD_REQUIRED,
},
status=status.HTTP_400_BAD_REQUEST,
)
if otp:
return self._process_otp_login(principal, otp)
elif password:
return self._process_password_login(principal, password)
return Response(
{"message": constants.LOGIN_FAIL}, status=status.HTTP_400_BAD_REQUEST
)
def _process_otp_login(self, principal, otp):
otp_instance = IAmPrincipalOtp.objects.filter(
principal=principal, otp_code=otp
).last()
if not otp_instance or otp_instance.is_expired():
return Response(
{
"message": (
constants.OTP_INVALID
if not otp_instance
else constants.OTP_EXPIRED
),
"errors": (
constants.OTP_INVALID
if not otp_instance
else constants.OTP_EXPIRED
),
},
status=status.HTTP_400_BAD_REQUEST,
)
otp_instance.is_used = True
otp_instance.save()
principal.email_verified = True
principal.save()
return self._login_success(principal)
def _process_password_login(self, principal, password):
if not principal.check_password(password):
return Response(
{
"message": constants.INVALID_PASSWORD,
"errors": constants.INVALID_PASSWORD,
},
status=status.HTTP_400_BAD_REQUEST,
)
return self._login_success(principal)
def _login_success(self, principal):
try:
# principal.email_verified = True
principal.last_login = timezone.localtime(timezone.now())
principal.save()
except Exception as e:
return Response(
{
"message": constants.INTERNAL_SERVER_ERROR,
"errors": str(e),
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
token_data = generate_token_and_user_data(principal)
token_data["preference"] = PrincipalPreference.objects.filter(
principal=principal
).exists()
return Response(
{
"message": constants.LOGIN_SUCCESS,
"data": token_data,
},
status=status.HTTP_200_OK,
)
class ProfileView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
model = IAmPrincipal
serializer = ProfileSerializer
def get_object(self):
return self.request.user
def get(self, request, *args, **kwargs):
instance = self.get_object()
context = {"request": request}
# context = {"principal_type": kwargs.get("principal_type"), "request": request}
serializer = self.serializer(instance, context=context)
success_response = {
"status": status.HTTP_200_OK,
"message": constants.SUCCESS,
"data": serializer.data,
}
return ApiResponse.success(**success_response)
def post(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.serializer(
instance, data=request.data, context={"request": request}
)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.VALIDATION_ERROR,
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
try:
serializer.save()
except Exception as e:
return ApiResponse.error(
message=constants.INTERNAL_SERVER_ERROR, errors=str(e)
)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
class ProfilePasswordResetView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
model = IAmPrincipal
serializer = PasswordResetSerializer
def post(self, request, *args, **kwargs):
current_password = request.data.get("current_password")
principal_obj = request.user
print(f"current password is {current_password}")
if current_password is None:
return ApiResponse.error(
message="Current password is required",
errors="Current password is required",
)
if not principal_obj.check_password(current_password):
return ApiResponse.error(
message="Invalid current password.", errors="Invalid current password."
)
serializer = self.serializer(instance=principal_obj, data=request.data)
if not serializer.is_valid():
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": "Validation error",
"errors": serializer.errors,
}
return ApiResponse.error(**error_response)
try:
serializer.save()
except Exception as e:
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.INTERNAL_SERVER_ERROR,
"errors": str(e),
}
return ApiResponse.error(**error_response)
return ApiResponse.success(message=constants.SUCCESS)
# class KycDocumentView(APIView):
# authentication_classes = [JWTAuthentication]
# permission_classes = [IsAuthenticated]
# model = IAmPrincipalKYCDetails
# serializer = PrincipalKYCDetailsSerializer
# def get_object(self):
# try:
# return self.model.objects.get(principal=self.request.user)
# except self.model.DoesNotExist:
# return None
# def get(self, request, *args, **kwargs):
# instance = self.get_object()
# serializer = self.serializer(context={"request": request})
# if instance is not None:
# serializer.instance = instance # Update that instance if record exist
# success_response = {
# "status": status.HTTP_200_OK,
# "message": constants.SUCCESS,
# "data": serializer.data,
# }
# return ApiResponse.success(**success_response)
# def post(self, request, *args, **kwargs):
# instance = self.get_object()
# serializer = self.serializer(
# data=request.data, context={"request": request}
# ) # passing request context to update the principal from serializer
# if instance is not None:
# serializer.instance = instance # Update that instance if record exist
# if not serializer.is_valid():
# error_response = {
# "status": status.HTTP_400_BAD_REQUEST,
# "message": constants.VALIDATION_ERROR,
# "errors": serializer.errors,
# }
# return ApiResponse.error(**error_response)
# try:
# serializer.save()
# except Exception as e:
# return ApiResponse.error(
# message=constants.INTERNAL_SERVER_ERROR, errors=str(e)
# )
# return ApiResponse.success(message=constants.SUCCESS)
class GoogleLoginAPIView(APIView):
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
access_token = request.data.get("access_token")
principal_type = request.data.get("principal_type")
referral_code = request.data.get("referral_code")
if not access_token or not principal_type:
return Response(
{"error": "Access token & Principal Type is required"},
status=status.HTTP_400_BAD_REQUEST,
)
principal_type_instance = IAmPrincipalType.objects.filter(
name=principal_type
).first()
if principal_type_instance is None:
return Response(
{"error": "No Principal Type Exists"},
status=status.HTTP_400_BAD_REQUEST,
)
principal_info = self.get_google_user_data(access_token)
print("principal_info: ", principal_info)
# Check if there was an error in fetching user data
if "error" in principal_info:
error_message = principal_info.get("error", {}).get(
"error_description", "An error occurred while fetching user data."
)
return Response(
{"error": error_message},
status=principal_info.get("status", status.HTTP_400_BAD_REQUEST),
)
if not principal_info:
return Response(
{"error": "Failed to fetch user details or invalid access token"},
status=status.HTTP_400_BAD_REQUEST,
)
email = principal_info.get("email")
if not email:
return Response(
{"error": "Email is required but not provided."},
status=status.HTTP_400_BAD_REQUEST,
)
print("email: ", email)
with transaction.atomic():
google_source, _ = IAmPrincipalSource.objects.get_or_create(name="google")
principal = IAmPrincipal.objects.filter(email=email).first()
if principal:
if not principal.is_active:
return Response(
{
"message": constants.ACCOUNT_DEACTIVATED,
"errors": constants.ACCOUNT_DEACTIVATED,
},
status=status.HTTP_403_FORBIDDEN,
)
# Existing user: Update necessary fields
principal.first_name = principal_info.get(
"given_name", principal.first_name
)
principal.last_name = principal_info.get(
"family_name", principal.last_name
)
principal.email_verified = True
principal.principal_source = google_source
# Update any other fields that might change on each login
principal.save()
print("Updated existing user")
message = "Already Registered and Verified User"
else:
defaults = {
"first_name": principal_info.get("given_name", ""),
"last_name": principal_info.get("family_name", ""),
"email": email,
"register_complete": True,
"email_verified": True,
"username": email, # Or generate a unique username if necessary
"principal_source": google_source,
}
defaults["principal_type"] = principal_type_instance
principal = IAmPrincipal(**defaults)
default_password = f"GTMES{email[::-1]}"
principal.set_password(default_password)
principal.save()
print("Created new user")
message = "Details updated"
try:
ReferralCode.create_referral_code_for_user_manager(
principal=principal, principal_type=principal.principal_type
)
except Exception as e:
print("ReferralCode: E-", e)
error_response = {
"status": status.HTTP_400_BAD_REQUEST,
"message": constants.FAILURE,
"errors": str(e),
}
return ApiResponse.error(**error_response)
if referral_code:
already_register_through_referral = ReferralRecord.objects.filter(
referred_principal=principal
).exists()
if not already_register_through_referral:
try:
whos_referral_code = ReferralCode.objects.get(
referral_code=referral_code
)
except Exception as e:
print("whos_referral_code: E-", e)
error_response = {
"status": status.HTTP_404_NOT_FOUND,
"message": constants.RECORD_NOT_FOUND,
"errors": str(e),
}
return ApiResponse.error(**error_response)
# principal_type = IAmPrincipalType.objects
ReferralRecord.objects.create(
referrer_principal=whos_referral_code.principal, # principal id of the User who invited
referred_principal=principal, # principal id of the User who join through invitation
principal_type=whos_referral_code.principal_type, # principal type of the user who invited
is_completed=True,
)
token_data = generate_token_and_user_data(principal)
token_data["type"] = str(principal.principal_type)
token_data["preference"] = PrincipalPreference.objects.filter(
principal=principal
).exists()
return ApiResponse.success(message=message, data=token_data)
def get_google_user_data(self, access_token):
user_info_endpoint = "https://www.googleapis.com/oauth2/v3/userinfo"
response = requests.get(
user_info_endpoint, params={"access_token": access_token}
)
if response.status_code == 200:
return response.json()
if response.status_code != 200:
try:
error_details = response.json()
except ValueError: # Includes simplejson.decoder.JSONDecodeError
error_details = {
"error": "Failed to decode JSON response from Google API.",
"status": response.status_code,
}
return {
"error": error_details,
"status": response.status_code,
}
# Apple's public keys URL
APPLE_PUBLIC_KEYS_URL = "https://appleid.apple.com/auth/keys"
# Your client ID
AUDIENCE = "com.app.goodTimes"
@csrf_exempt
@require_http_methods(["POST"])
def decode_apple_token(request):
try:
data = request.POST
identity_token = data.get("token")
if not identity_token:
return JsonResponse({"error": "Token is required"}, status=400)
principal_type = data.get("principal_type")
principal_type_instance = IAmPrincipalType.objects.filter(
name=principal_type
).first()
if principal_type_instance is None:
return JsonResponse(
{"error": "No Principal Type Exists"},
status=status.HTTP_400_BAD_REQUEST,
content_type="application/json",
)
# Fetch Apple's public keys
# Note: You might want to cache these keys and update them periodically rather than fetching them with every request
apple_keys_response = requests.get(APPLE_PUBLIC_KEYS_URL)
apple_keys = apple_keys_response.json()
# Decode the token
# Note: This is a simplified example; you should handle the selection of the key and any potential errors during decoding
header_data = jwt.get_unverified_header(identity_token)
kid = header_data["kid"]
apple_key = next((key for key in apple_keys["keys"] if key["kid"] == kid), None)
if apple_key is None:
return JsonResponse({"error": "Invalid key ID"}, status=400)
# Construct the public key
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(apple_key))
# Decode the token
decoded = jwt.decode(
identity_token, public_key, algorithms=["RS256"], audience=AUDIENCE
)
# Check if there was an error in fetching user data
if "error" in decoded:
error_message = decoded.get("error", {}).get(
"error_description", "An error occurred while fetching user data."
)
return JsonResponse(
{"error": error_message},
status=decoded.get("status", status.HTTP_400_BAD_REQUEST),
)
print("decoded: ", decoded)
email = decoded.get("email")
apple_id = decoded.get("sub")
print("apple_id: ", apple_id)
# Checking if essential values are present and valid
if not apple_id:
# Return an error response if either 'email' or 'sub' is missing or empty
return JsonResponse(
{"error": "The token is missing required information."},
status=status.HTTP_400_BAD_REQUEST,
)
with transaction.atomic():
apple_source, _ = IAmPrincipalSource.objects.get_or_create(name="apple")
principal = IAmPrincipal.objects.filter(apple_id=apple_id).first()
print("principal: ", principal)
if principal:
principal.email_verified = True
principal.principal_source = apple_source
principal.apple_id = apple_id
# Update any other fields that might change on each login
principal.save()
print("Updated existing user")
message = "Already Registered and Verified User"
else:
defaults = {
"email": f"{apple_id}@gmail.com",
"register_complete": True,
"email_verified": True,
"username": apple_id, # Or generate a unique username if necessary
"principal_source": apple_source,
}
defaults["principal_type"] = principal_type_instance
defaults["apple_id"] = apple_id
principal = IAmPrincipal(**defaults)
default_password = f"SEMTG{apple_id[::-1]}"
principal.set_password(default_password)
principal.save()
print("Created new user")
message = "Registered Successfully"
token_data = generate_token_and_user_data(principal)
token_data["type"] = str(principal.principal_type)
return JsonResponse({"message": message, "data": token_data}, status=200)
# return JsonResponse(decoded)
except Exception as e:
return JsonResponse({"error": str(e)}, status=400)
class IAmPrincipalPlayerIDAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
def post(self, request, *args, **kwargs):
serializer = PlayerIDSerializer(data=request.data)
print("serializer: ", serializer)
if serializer.is_valid():
principal = request.user
principal.player_id = serializer.validated_data["player_id"]
principal.save()
return ApiResponse.success(
status=status.HTTP_200_OK,
message=constants.SUCCESS,
data=serializer.data,
)
return ApiResponse.error(
status=status.HTTP_400_BAD_REQUEST,
message=constants.FAILURE,
errors=serializer.errors,
)
class SoftDeletePrincipalAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
"""
Soft delete an IAmPrincipal account.
"""
def delete(self, request, format=None):
principal = request.user
if not principal.is_active: # Check if already deleted
return ApiResponse.error(
status=status.HTTP_400_BAD_REQUEST,
message="Account already deleted.",
errors="Account already deleted.",
)
principal.is_active = False
principal.save()
return ApiResponse.success(
status=status.HTTP_200_OK,
message="Account has been successfully deleted.",
data="Account has been successfully deleted.",
)
class VersionCheck(APIView):
authentication_classes = []
permission_classes = []
model = AppVersion
def get(self, request, *args, **kwargs):
device_type = request.GET.get('type')
if not device_type:
return ApiResponse.error(message=constants.FAILURE, errors="device type is required")
# Query the database to retrieve the upgrade flags based on the app version
version = self.model.objects.filter(app_type=device_type).last()
version_data = AppVersionSerializer(version)
return ApiResponse.success(message=constants.SUCCESS, data=version_data.data)