Files
digest_app/module_activity/api/views.py
2024-03-11 14:48:48 +05:30

796 lines
27 KiB
Python

from datetime import datetime, timedelta
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework_simplejwt.authentication import JWTAuthentication
from django.db.models import Prefetch, Count, Max, Min
from module_project import constants, date_utils
from module_project.utils import ApiResponse
from module_iam.models import IAmPrincipal
from ..models import (
PrincipalHealthData,
Intolerance,
Symptoms,
PastTreatment,
ChronicCondition,
Medication,
Bowel,
MealSymptomRecord,
MealRecord,
)
from .serializers import (
IntoleranceSerializer,
SymptomsSerializer,
PastTreatmentSerializer,
ChronicConditionSerializer,
MedicationSerializer,
BowelSerializer,
MealSymptomRecordSerializer,
MealRecordSerializer,
IAmPrincipalSerializer,
PrincipalHealthDataSerializer,
PrincipalAndHealthSerializer,
)
from module_project.service import OneSignalNotificationService
class ProfileAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
post_serializer_class = IAmPrincipalSerializer
get_serializer_class = PrincipalAndHealthSerializer
model = IAmPrincipal
def get(self, request):
try:
obj = self.model.objects.prefetch_related("health_data_principal").get(
pk=request.user.pk
)
serializer = self.get_serializer_class(obj, context={"request": request})
except self.model.DoesNotExist:
return ApiResponse.error(
status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
data = request.data.copy()
# Separate principal and health data
principal_data = {}
health_data = {}
for key, value in data.items():
if key in self.post_serializer_class.Meta.fields:
principal_data[key] = value
else:
health_data[key] = value
principal_serializer = self.post_serializer_class(
instance=request.user, data=principal_data
)
health_serializer = PrincipalHealthDataSerializer(data=health_data)
if not principal_serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=principal_serializer.errors
)
if not health_serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=health_serializer.errors
)
try:
# with transaction.atomic(): # Ensure atomicity of database operations
principal_instance = principal_serializer.save()
principal_instance.register_complete = True
principal_instance.save()
# Check if health data already exists for the principal
health_data_instance, created = PrincipalHealthData.objects.get_or_create(
principal=principal_instance
)
health_serializer.update(health_data_instance, health_data)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(message=constants.SUCCESS)
class ProfileCompleteAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
model = IAmPrincipal
def get(self, request):
user = IAmPrincipal.objects.filter(id=request.user.id).update(register_complete=True)
return ApiResponse.success(message=constants.SUCCESS)
class DailyRecordAPIView(APIView):
def serialize_record(self, record):
time_obj = datetime.strptime(str(record.time), "%H:%M:%S")
return {
"id": record.id,
"date": record.date,
"time": time_obj.strftime("%I:%M %p"),
# Add other fields as needed
}
def get(self, request):
date = request.GET.get("date")
# date = datetime.now().date()
if not date:
return ApiResponse.error(
message=constants.FAILURE, errors="Date parameter is missing"
)
try:
# Convert the date string to a datetime object
date_obj = datetime.strptime(date, "%Y-%m-%d").date()
except ValueError:
return ApiResponse.error(
message=constants.FAILURE, errors="Invalid date format"
)
# Define prefetch related queries for filtering the record of paticular date of all related models
meal_records_prefetch = Prefetch(
"meal_principal",
queryset=MealRecord.objects.filter(date=date, deleted=False),
to_attr="filtered_meal_record",
)
medication_prefetch = Prefetch(
"medication_principal",
queryset=Medication.objects.filter(date=date, deleted=False),
to_attr="filtered_medication",
)
bowel_prefetch = Prefetch(
"bowel_principal",
queryset=Bowel.objects.filter(date=date, deleted=False),
to_attr="filtered_bowel",
)
meal_symptom_prefetch = Prefetch(
"meal_symptom_principal",
queryset=MealSymptomRecord.objects.filter(date=date, deleted=False),
to_attr="filtered_meal_symptom",
)
# Query the IAmPrincipal table of principal with prefetch_related to retrieve related records
principal = IAmPrincipal.objects.prefetch_related(
meal_records_prefetch,
medication_prefetch,
bowel_prefetch,
meal_symptom_prefetch,
).get(id=request.user.id)
serialized_meal_records = [
{"type": "Meal", **self.serialize_record(record)}
for record in principal.filtered_meal_record
]
serialized_medication = [
{"type": "Medication", **self.serialize_record(record)}
for record in principal.filtered_medication
]
serialized_bowel = [
{"type": "Bowel Movements", **self.serialize_record(record)}
for record in principal.filtered_bowel
]
serialized_symptom = [
{"type": "Symptom - Meal", **self.serialize_record(record)}
for record in principal.filtered_meal_symptom
]
all_records = (
serialized_symptom
+ serialized_meal_records
+ serialized_medication
+ serialized_bowel
)
# all_records_sorted = sorted(all_records, key=lambda x: x["time"], reverse=True)
all_records_sorted = sorted(all_records, key=lambda x: x["time"], reverse=True)
return ApiResponse.success(message=constants.SUCCESS, data=all_records_sorted)
class IntoleranceListCreateAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = IntoleranceSerializer
model = Intolerance
def get(self, request):
obj = self.model.objects.filter(principal=request.user)
serializer = self.serializer_class(obj, many=True)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
print(f"request data for intolerance is {request.data}")
serializer = self.serializer_class(data=request.data, many=True)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
self.model.objects.filter(principal=request.user).delete()
instance = serializer.save(principal=request.user)
saved_data_serializer = self.serializer_class(instance, many=True)
return ApiResponse.success(
message=constants.SUCCESS, data=saved_data_serializer.data
)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class SymptomsListCreateAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = SymptomsSerializer
model = Symptoms
def get(self, request):
obj = self.model.objects.filter(principal=request.user)
serializer = self.serializer_class(obj, many=True)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
print(f"request data for Symptoms is {request.data}")
serializer = self.serializer_class(data=request.data, many=True)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
self.model.objects.filter(principal=request.user).delete()
instance = serializer.save(principal=request.user)
saved_data_serializer = self.serializer_class(instance, many=True)
return ApiResponse.success(
message=constants.SUCCESS, data=saved_data_serializer.data
)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class PastTreatmentListCreateAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = PastTreatmentSerializer
model = PastTreatment
def get(self, request):
obj = self.model.objects.filter(principal=request.user)
serializer = self.serializer_class(obj, many=True)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
print(f"request data for PastTreatment is {request.data}")
serializer = self.serializer_class(data=request.data, many=True)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
self.model.objects.filter(principal=request.user).delete()
instance = serializer.save(principal=request.user)
saved_data_serializer = self.serializer_class(instance, many=True)
return ApiResponse.success(
message=constants.SUCCESS, data=saved_data_serializer.data
)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class ChronicConditionListCreateAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = ChronicConditionSerializer
model = ChronicCondition
def get(self, request):
obj = self.model.objects.filter(principal=request.user)
serializer = self.serializer_class(obj, many=True)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
print(f"request data for PastTreatment is {request.data}")
serializer = self.serializer_class(data=request.data, many=True)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
self.model.objects.filter(principal=request.user).delete()
instance = serializer.save(principal=request.user)
saved_data_serializer = self.serializer_class(instance, many=True)
return ApiResponse.success(
message=constants.SUCCESS, data=saved_data_serializer.data
)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class MedicationAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = MedicationSerializer
model = Medication
def get_objects(self, pk):
try:
return self.model.objects.get(pk=pk)
except self.model.DoesNotExist:
return ApiResponse.error(
status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
)
def get(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
serializer = self.serializer_class(data=request.data)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
serializer.save(principal=request.user)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def put(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj, data=request.data)
if not serializer.is_valid():
return ApiResponse.error(message=constants.FAILURE, errors=serializer.data)
try:
serializer.save()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def delete(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
try:
obj.delete()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
)
class BowelAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = BowelSerializer
model = Bowel
def get_objects(self, pk):
try:
return self.model.objects.get(pk=pk)
except self.model.DoesNotExist:
return ApiResponse.error(
status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
)
def get(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
serializer = self.serializer_class(data=request.data)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
serializer.save(principal=request.user)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def put(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj, data=request.data)
if not serializer.is_valid():
return ApiResponse.error(message=constants.FAILURE, errors=serializer.data)
try:
serializer.save()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def delete(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
try:
obj.delete()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
)
class MealSymptomAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = MealSymptomRecordSerializer
model = MealSymptomRecord
def get_objects(self, pk):
try:
return self.model.objects.get(pk=pk)
except self.model.DoesNotExist:
return ApiResponse.error(
status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
)
def get(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
serializer = self.serializer_class(data=request.data)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
serializer.save(principal=request.user)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def put(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj, data=request.data)
if not serializer.is_valid():
return ApiResponse.error(message=constants.FAILURE, errors=serializer.data)
try:
serializer.save()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def delete(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
try:
obj.delete()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
)
class MealAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
serializer_class = MealRecordSerializer
model = MealRecord
def get_objects(self, pk):
try:
return self.model.objects.get(pk=pk)
except self.model.DoesNotExist:
return ApiResponse.error(
status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
)
def get(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj)
return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
def post(self, request):
serializer = self.serializer_class(data=request.data)
if not serializer.is_valid():
return ApiResponse.error(
message=constants.FAILURE, errors=serializer.errors
)
try:
serializer.save(principal=request.user)
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def put(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
serializer = self.serializer_class(obj, data=request.data)
if not serializer.is_valid():
return ApiResponse.error(message=constants.FAILURE, errors=serializer.data)
try:
serializer.save()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
status=status.HTTP_201_CREATED,
message=constants.SUCCESS,
data=serializer.data,
)
def delete(self, request, pk):
obj = self.get_objects(pk)
if isinstance(obj, Response):
return obj
try:
obj.delete()
except Exception as e:
return ApiResponse.error(message=constants.FAILURE, errors=str(e))
return ApiResponse.success(
message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
)
from collections import defaultdict
class ReportAPIView(APIView):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
model = MealRecord
def get_user(self):
return self.request.user
def enough_records_exist(self, start_date, end_date):
"""
Check if at least 7 days of records exist within the date range.
This method calculates the minimum date by subtracting the required number of days
(min_days_required - 1) from the end_date. It then filters the queryset based on the
principal and date range. If any objects exist within this range, the method returns True,
indicating that there are at least 7 days of records.
:param start_date: The initial date in the date range
:type start_date: datetime.date
:param end_date: The final date in the date range
:type end_date: datetime.date
:return: True if at least 7 days of records exist, False otherwise
:rtype: bool
"""
min_days_required = 7
current_date = start_date
count = 0
while current_date <= end_date:
if self.model.objects.filter(principal=self.get_user(), date=current_date).exists():
count += 1
if count >= min_days_required:
return True
else:
count = 0 # Reset count if record is missing for any day
current_date += timedelta(days=1)
return False
def get_top_food_avoid(self, start_date, end_date):
"""Get the top food to avoid."""
food_counts = defaultdict(int)
ingredient_counts = defaultdict(int)
beverage_counts = defaultdict(int)
symptom_records = MealSymptomRecord.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
)
for symptom_record in symptom_records:
closest_meal = (
MealRecord.objects.filter(
principal=symptom_record.principal, date__lte=symptom_record.date
)
.order_by("-date", "-time")
.first()
)
if closest_meal:
for food_record in closest_meal.food_records.all():
food_counts[food_record.name] += 1
for ingredient_record in closest_meal.food_ingredient_records.all():
ingredient_counts[ingredient_record.name] += 1
for beverage_record in closest_meal.beverage_records.all():
beverage_counts[beverage_record.beverage_type] += 1
# Sort the dictionaries by their values in descending order and getting only top 3 record
food_counts = dict(
sorted(food_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
ingredient_counts = dict(
sorted(ingredient_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
beverage_counts = dict(
sorted(beverage_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
food_avoid = next(iter(food_counts), None)
return food_avoid, food_counts, ingredient_counts, beverage_counts
def get_symptoms_frequency(self, start_date, end_date):
"""Get the frequency of symptoms."""
symptom_records = MealSymptomRecord.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
).annotate(
before_meal_count=Count("symptoms_before_meal"),
after_meal_count=Count("symptoms_after_meal"),
)
symptoms_frequency = defaultdict(int)
for record in symptom_records:
for symptom in record.symptoms_before_meal.all():
symptoms_frequency[symptom.name] += record.before_meal_count
for symptom in record.symptoms_after_meal.all():
symptoms_frequency[symptom.name] += record.after_meal_count
sorted_symptoms_frequency = dict(
sorted(symptoms_frequency.items(), key=lambda x: x[1], reverse=True)[:3]
)
return sorted_symptoms_frequency
def get_stool_type_counts(self, start_date, end_date):
"""Get the count of stool types."""
stool_type_counts = (
Bowel.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
)
.values("stool_type")
.annotate(stool_type_count=Count("stool_type"))
)
stool_type_counts_dict = {
item["stool_type"]: item["stool_type_count"] for item in stool_type_counts
}
stool_type_counts_sort = dict(
sorted(stool_type_counts_dict.items(), key=lambda x: x[1], reverse=True)[:3]
)
highest_stool = next(iter(stool_type_counts_sort), None)
return stool_type_counts_sort, highest_stool
def get(self, request):
date_range = request.GET.get("date_range")
start_date, end_date = date_utils.get_date_range(date_range)
print(f"start date is {start_date}, end_date is {end_date}")
print(f"is dats exist {self.enough_records_exist(start_date, end_date)}")
if not self.enough_records_exist(start_date, end_date):
return ApiResponse.error(
message="No report is generated. Minimum Previous 7 days of records required."
)
# Get top food to avoid
food_avoid, food_counts, ingredient_counts, beverage_counts = (
self.get_top_food_avoid(start_date, end_date)
)
# Get symptoms frequency
sorted_symptoms_frequency = self.get_symptoms_frequency(start_date, end_date)
# Get stool type counts
stool_type_counts_sort, highest_stool = self.get_stool_type_counts(
start_date, end_date
)
nested_json = {
"food_avoid": food_avoid,
"same_food_avoid": {
"food": food_counts,
"ingredient": ingredient_counts,
"beverage": beverage_counts,
},
"symptoms_frequency": sorted_symptoms_frequency,
"highest_stool": highest_stool,
"stool_type": stool_type_counts_sort,
}
print(f"nested_json data is {nested_json}")
return ApiResponse.success(message=constants.SUCCESS, data=nested_json)