Files
digest_app/module_activity/api/views.py
2024-03-29 18:50:31 +05:30

836 lines
30 KiB
Python

from datetime import datetime, timedelta

from django.db import transaction
from django.db.models import Count, Max, Min, Prefetch
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.authentication import JWTAuthentication

from module_iam.models import IAmPrincipal
from module_project import constants, date_utils
from module_project.service import OneSignalNotificationService
from module_project.utils import ApiResponse

from ..models import (
    Bowel,
    ChronicCondition,
    FoodIngredintDataset,
    Intolerance,
    MealRecord,
    MealSymptomRecord,
    Medication,
    PastTreatment,
    PrincipalHealthData,
    Symptoms,
)
from .serializers import (
    BowelSerializer,
    ChronicConditionSerializer,
    FoodDatasetSerializer,
    FoodIngredientDatasetSerializer,
    IAmPrincipalSerializer,
    IntoleranceSerializer,
    MealRecordSerializer,
    MealSymptomRecordSerializer,
    MedicationSerializer,
    PastTreatmentSerializer,
    PrincipalAndHealthSerializer,
    PrincipalHealthDataSerializer,
    SymptomsSerializer,
)
class ProfileAPIView(APIView):
    """Read and update the authenticated principal's profile and health data.

    ``GET`` returns the principal serialized with ``PrincipalAndHealthSerializer``.
    ``POST`` splits the payload into principal fields and health fields,
    validates both, and persists them inside a single database transaction.
    """

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    post_serializer_class = IAmPrincipalSerializer
    get_serializer_class = PrincipalAndHealthSerializer
    model = IAmPrincipal

    def get(self, request):
        """Return the requesting principal, or a 404 response if it is gone."""
        try:
            obj = self.model.objects.prefetch_related("health_data_principal").get(
                pk=request.user.pk
            )
        except self.model.DoesNotExist:
            return ApiResponse.error(
                status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
            )
        serializer = self.get_serializer_class(obj, context={"request": request})
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Update the principal and its health data from one flat payload.

        Keys listed in ``post_serializer_class.Meta.fields`` go to the
        principal serializer; every other key goes to the health serializer.
        Returns a failure response carrying the serializer errors when either
        part is invalid, or the exception text when persisting fails.
        """
        principal_fields = self.post_serializer_class.Meta.fields
        principal_data = {}
        health_data = {}
        for key, value in request.data.items():
            target = principal_data if key in principal_fields else health_data
            target[key] = value

        principal_serializer = self.post_serializer_class(
            instance=request.user, data=principal_data
        )
        health_serializer = PrincipalHealthDataSerializer(data=health_data)
        if not principal_serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=principal_serializer.errors
            )
        if not health_serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=health_serializer.errors
            )

        try:
            # Both writes succeed or fail together, so a principal is never
            # flagged as registered while its health data failed to save.
            with transaction.atomic():
                principal_instance = principal_serializer.save()
                principal_instance.register_complete = True
                principal_instance.save()
                health_data_instance, _ = PrincipalHealthData.objects.get_or_create(
                    principal=principal_instance
                )
                # Apply only the validated values: the raw payload may carry
                # keys the serializer does not declare, which must never be
                # assigned onto the model instance.
                health_serializer.update(
                    health_data_instance, health_serializer.validated_data
                )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(message=constants.SUCCESS)
class ProfileCompleteAPIView(APIView):
    """Flag the authenticated principal's registration as complete."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    model = IAmPrincipal

    def get(self, request):
        """Set ``register_complete`` to True for the requesting principal.

        NOTE(review): this GET changes state; the HTTP method is kept as-is
        because existing clients call it this way.
        """
        # The row count returned by update() is not needed.
        self.model.objects.filter(id=request.user.id).update(register_complete=True)
        return ApiResponse.success(message=constants.SUCCESS)
class DailyRecordAPIView(APIView):
    """List everything the authenticated principal logged on one date.

    Meals, medications, bowel movements and meal symptoms for the requested
    ``date`` query parameter are merged into one list, latest time first.
    """

    # The view reads ``request.user``, so it needs the same authentication
    # and permission classes as the other per-user views in this module.
    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]

    @staticmethod
    def _record_time(record):
        """Return ``record.time`` as an object that supports ``strftime``.

        Values that already offer ``strftime`` are returned unchanged; any
        other value is parsed from its ``HH:MM:SS`` string form.
        """
        value = record.time
        if hasattr(value, "strftime"):
            return value
        return datetime.strptime(str(value), "%H:%M:%S").time()

    def serialize_record(self, record):
        """Return the id, date and 12-hour formatted time of ``record``."""
        return {
            "id": record.id,
            "date": record.date,
            "time": self._record_time(record).strftime("%I:%M %p"),
        }

    def get(self, request):
        """Return the day's records, or a failure response for a bad ``date``."""
        date = request.GET.get("date")
        if not date:
            return ApiResponse.error(
                message=constants.FAILURE, errors="Date parameter is missing"
            )
        try:
            date_obj = datetime.strptime(date, "%Y-%m-%d").date()
        except ValueError:
            return ApiResponse.error(
                message=constants.FAILURE, errors="Invalid date format"
            )

        def for_day(model):
            # Non-deleted rows of ``model`` logged on the requested date.
            return model.objects.filter(date=date_obj, deleted=False)

        # Load the principal together with the day's rows of each related model.
        principal = IAmPrincipal.objects.prefetch_related(
            Prefetch(
                "meal_principal",
                queryset=for_day(MealRecord),
                to_attr="filtered_meal_record",
            ),
            Prefetch(
                "medication_principal",
                queryset=for_day(Medication),
                to_attr="filtered_medication",
            ),
            Prefetch(
                "bowel_principal",
                queryset=for_day(Bowel),
                to_attr="filtered_bowel",
            ),
            Prefetch(
                "meal_symptom_principal",
                queryset=for_day(MealSymptomRecord),
                to_attr="filtered_meal_symptom",
            ),
        ).get(id=request.user.id)

        labelled = (
            [("Symptom - Meal", rec) for rec in principal.filtered_meal_symptom]
            + [
                (f"Meal - {rec.meal_type}", rec)
                for rec in principal.filtered_meal_record
            ]
            + [("Medication", rec) for rec in principal.filtered_medication]
            + [("Bowel Movements", rec) for rec in principal.filtered_bowel]
        )
        # Sort on the real time value: the formatted "%I:%M %p" string does
        # not sort chronologically ("01:00 PM" < "09:00 AM" as text).
        labelled.sort(key=lambda pair: self._record_time(pair[1]), reverse=True)

        all_records_sorted = [
            {"type": label, **self.serialize_record(rec)} for label, rec in labelled
        ]
        return ApiResponse.success(message=constants.SUCCESS, data=all_records_sorted)
class IntoleranceListCreateAPIView(APIView):
    """List or replace the authenticated principal's intolerances."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = IntoleranceSerializer
    model = Intolerance

    def get(self, request):
        """Return the principal's active intolerance rows."""
        queryset = self.model.objects.filter(principal=request.user, active=True)
        serializer = self.serializer_class(queryset, many=True)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Replace all of the principal's intolerance rows with the posted list.

        Returns the saved rows, the serializer errors when the payload is
        invalid, or the exception text when persisting fails.
        """
        serializer = self.serializer_class(data=request.data, many=True)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            # Delete and re-create in one transaction so a failed save does
            # not leave the principal with their previous rows wiped.
            with transaction.atomic():
                self.model.objects.filter(principal=request.user).delete()
                instances = serializer.save(principal=request.user)
            saved_data_serializer = self.serializer_class(instances, many=True)
            return ApiResponse.success(
                message=constants.SUCCESS, data=saved_data_serializer.data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class SymptomsListCreateAPIView(APIView):
    """List or replace the authenticated principal's symptoms."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = SymptomsSerializer
    model = Symptoms

    def get(self, request):
        """Return the principal's active symptom rows."""
        queryset = self.model.objects.filter(principal=request.user, active=True)
        serializer = self.serializer_class(queryset, many=True)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Replace all of the principal's symptom rows with the posted list.

        Returns the saved rows, the serializer errors when the payload is
        invalid, or the exception text when persisting fails.
        """
        serializer = self.serializer_class(data=request.data, many=True)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            # Delete and re-create in one transaction so a failed save does
            # not leave the principal with their previous rows wiped.
            with transaction.atomic():
                self.model.objects.filter(principal=request.user).delete()
                instances = serializer.save(principal=request.user)
            saved_data_serializer = self.serializer_class(instances, many=True)
            return ApiResponse.success(
                message=constants.SUCCESS, data=saved_data_serializer.data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class PastTreatmentListCreateAPIView(APIView):
    """List or replace the authenticated principal's past treatments."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = PastTreatmentSerializer
    model = PastTreatment

    def get(self, request):
        """Return the principal's active past-treatment rows."""
        queryset = self.model.objects.filter(principal=request.user, active=True)
        serializer = self.serializer_class(queryset, many=True)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Replace all of the principal's past-treatment rows with the posted list.

        Returns the saved rows, the serializer errors when the payload is
        invalid, or the exception text when persisting fails.
        """
        serializer = self.serializer_class(data=request.data, many=True)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            # Delete and re-create in one transaction so a failed save does
            # not leave the principal with their previous rows wiped.
            with transaction.atomic():
                self.model.objects.filter(principal=request.user).delete()
                instances = serializer.save(principal=request.user)
            saved_data_serializer = self.serializer_class(instances, many=True)
            return ApiResponse.success(
                message=constants.SUCCESS, data=saved_data_serializer.data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class ChronicConditionListCreateAPIView(APIView):
    """List or replace the authenticated principal's chronic conditions."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = ChronicConditionSerializer
    model = ChronicCondition

    def get(self, request):
        """Return the principal's active chronic-condition rows."""
        queryset = self.model.objects.filter(principal=request.user, active=True)
        serializer = self.serializer_class(queryset, many=True)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Replace all of the principal's chronic-condition rows with the posted list.

        Returns the saved rows, the serializer errors when the payload is
        invalid, or the exception text when persisting fails.
        """
        serializer = self.serializer_class(data=request.data, many=True)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            # Delete and re-create in one transaction so a failed save does
            # not leave the principal with their previous rows wiped.
            with transaction.atomic():
                self.model.objects.filter(principal=request.user).delete()
                instances = serializer.save(principal=request.user)
            saved_data_serializer = self.serializer_class(instances, many=True)
            return ApiResponse.success(
                message=constants.SUCCESS, data=saved_data_serializer.data
            )
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
class MedicationAPIView(APIView):
    """Create, read, update and delete the principal's medication records."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = MedicationSerializer
    model = Medication

    def get_objects(self, pk):
        """Return the requesting principal's record ``pk``, or a 404 response.

        The lookup is scoped to ``self.request.user`` so one user cannot
        read, modify or delete another user's record by guessing its id.
        """
        try:
            return self.model.objects.get(pk=pk, principal=self.request.user)
        except self.model.DoesNotExist:
            return ApiResponse.error(
                status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
            )

    def get(self, request, pk):
        """Return the serialized record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Create a record owned by the requesting principal."""
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save(principal=request.user)
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def put(self, request, pk):
        """Fully update record ``pk`` from the request payload."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj, data=request.data)
        if not serializer.is_valid():
            # Report the validation errors, not the rejected input.
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        # NOTE(review): 201 on an update is unusual; kept because existing
        # clients may depend on it.
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def delete(self, request, pk):
        """Delete record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        try:
            obj.delete()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
        )
class BowelAPIView(APIView):
    """Create, read, update and delete the principal's bowel records."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = BowelSerializer
    model = Bowel

    def get_objects(self, pk):
        """Return the requesting principal's record ``pk``, or a 404 response.

        The lookup is scoped to ``self.request.user`` so one user cannot
        read, modify or delete another user's record by guessing its id.
        """
        try:
            return self.model.objects.get(pk=pk, principal=self.request.user)
        except self.model.DoesNotExist:
            return ApiResponse.error(
                status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
            )

    def get(self, request, pk):
        """Return the serialized record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Create a record owned by the requesting principal."""
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save(principal=request.user)
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def put(self, request, pk):
        """Fully update record ``pk`` from the request payload."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj, data=request.data)
        if not serializer.is_valid():
            # Report the validation errors, not the rejected input.
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        # NOTE(review): 201 on an update is unusual; kept because existing
        # clients may depend on it.
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def delete(self, request, pk):
        """Delete record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        try:
            obj.delete()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
        )
class MealSymptomAPIView(APIView):
    """Create, read, update and delete the principal's meal-symptom records."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = MealSymptomRecordSerializer
    model = MealSymptomRecord

    def get_objects(self, pk):
        """Return the requesting principal's record ``pk``, or a 404 response.

        The lookup is scoped to ``self.request.user`` so one user cannot
        read, modify or delete another user's record by guessing its id.
        """
        try:
            return self.model.objects.get(pk=pk, principal=self.request.user)
        except self.model.DoesNotExist:
            return ApiResponse.error(
                status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
            )

    def get(self, request, pk):
        """Return the serialized record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Create a record owned by the requesting principal."""
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save(principal=request.user)
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def put(self, request, pk):
        """Fully update record ``pk`` from the request payload."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj, data=request.data)
        if not serializer.is_valid():
            # Report the validation errors, not the rejected input.
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        # NOTE(review): 201 on an update is unusual; kept because existing
        # clients may depend on it.
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def delete(self, request, pk):
        """Delete record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        try:
            obj.delete()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
        )
class MealAPIView(APIView):
    """Create, read, update and delete the principal's meal records."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = MealRecordSerializer
    model = MealRecord

    def get_objects(self, pk):
        """Return the requesting principal's record ``pk``, or a 404 response.

        The lookup is scoped to ``self.request.user`` so one user cannot
        read, modify or delete another user's record by guessing its id.
        """
        try:
            return self.model.objects.get(pk=pk, principal=self.request.user)
        except self.model.DoesNotExist:
            return ApiResponse.error(
                status=status.HTTP_404_NOT_FOUND, message=constants.RECORD_NOT_FOUND
            )

    def get(self, request, pk):
        """Return the serialized record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)

    def post(self, request):
        """Create a record owned by the requesting principal."""
        serializer = self.serializer_class(data=request.data)
        if not serializer.is_valid():
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save(principal=request.user)
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def put(self, request, pk):
        """Fully update record ``pk`` from the request payload."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        serializer = self.serializer_class(obj, data=request.data)
        if not serializer.is_valid():
            # Report the validation errors, not the rejected input.
            return ApiResponse.error(
                message=constants.FAILURE, errors=serializer.errors
            )
        try:
            serializer.save()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        # NOTE(review): 201 on an update is unusual; kept because existing
        # clients may depend on it.
        return ApiResponse.success(
            status=status.HTTP_201_CREATED,
            message=constants.SUCCESS,
            data=serializer.data,
        )

    def delete(self, request, pk):
        """Delete record ``pk``."""
        obj = self.get_objects(pk)
        if isinstance(obj, Response):
            return obj
        try:
            obj.delete()
        except Exception as e:
            return ApiResponse.error(message=constants.FAILURE, errors=str(e))
        return ApiResponse.success(
            message=constants.RECORD_DELETED, status=status.HTTP_204_NO_CONTENT
        )
class FoodDataAPIView(APIView):
    """Unauthenticated listing of the food dataset, ordered by ``food_name``."""

    # Empty lists: no authentication or permission check is applied.
    authentication_classes = []
    permission_classes = []
    model = FoodIngredintDataset
    serializer_class = FoodDatasetSerializer

    def get(self, request):
        """Return every dataset row serialized with ``serializer_class``."""
        foods = self.model.objects.order_by("food_name")
        payload = self.serializer_class(foods, many=True).data
        return ApiResponse.success(message=constants.SUCCESS, data=payload)
class FoodIngredientSearchAPIView(APIView):
    """Return the distinct ingredients of the foods named in the query."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    model = FoodIngredintDataset
    serializer_class = FoodIngredientDatasetSerializer

    def get(self, request):
        """Collect the ingredients of every food matching a ``query`` value.

        ``query`` may be repeated in the query string; each value is matched
        exactly against ``food_name``. The response holds the de-duplicated
        ingredients under ``ingredients_list`` (in no particular order).
        """
        food_names = request.query_params.getlist("query")
        ingredient_rows = self.model.objects.filter(
            food_name__in=food_names
        ).values_list("ingredients", flat=True)

        unique_ingredients = set()
        for ingredients in ingredient_rows:
            # Rows without ingredients (None or empty) would break
            # ``set.update``, so skip them.
            # NOTE(review): assumes ``ingredients`` is a list-like field - confirm.
            if ingredients:
                unique_ingredients.update(ingredients)
        return ApiResponse.success(
            message=constants.SUCCESS,
            data={"ingredients_list": list(unique_ingredients)},
        )
class MealDateAPIView(APIView):
    """List the authenticated principal's meal records for one date."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = MealRecordSerializer
    model = MealRecord

    def get(self, request):
        """Return the meals logged on the ``date`` query parameter (YYYY-MM-DD).

        Returns a failure response when the parameter is missing or malformed.
        """
        date = request.GET.get("date")
        if not date:
            return ApiResponse.error(
                message=constants.FAILURE, errors="Date parameter is missing"
            )
        try:
            date_obj = datetime.strptime(date, "%Y-%m-%d").date()
        except ValueError:
            return ApiResponse.error(
                message=constants.FAILURE, errors="Invalid date format"
            )
        # Scope to the requesting principal; filtering on the date alone
        # would return every user's meals for that day.
        meals = self.model.objects.filter(principal=request.user, date=date_obj)
        serializer = self.serializer_class(meals, many=True)
        return ApiResponse.success(message=constants.SUCCESS, data=serializer.data)
from collections import defaultdict
class ReportAPIView(APIView):
    """Build a summary report of foods, symptoms and stool types for a period."""

    authentication_classes = [JWTAuthentication]
    permission_classes = [IsAuthenticated]
    model = MealRecord

    def get_user(self):
        """Return the principal making the current request."""
        return self.request.user

    @staticmethod
    def _as_date(value):
        """Return the calendar date of ``value``; plain dates pass through."""
        return value.date() if isinstance(value, datetime) else value

    @staticmethod
    def _top_three(counts):
        """Return the three highest-count items of ``counts``, highest first."""
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:3])

    def enough_records_exist(self, start_date, end_date):
        """Tell whether the range contains 7 consecutive days with a meal record.

        The range is walked one day at a time, both ends included. A day
        without a meal record for the current user resets the streak.

        :param start_date: first day of the range
        :param end_date: last day of the range
        :return: True once 7 consecutive recorded days are found, else False
        :rtype: bool
        """
        min_days_required = 7
        # One query for all recorded days instead of one query per day.
        # NOTE(review): assumes the model's ``date`` column stores plain
        # dates - confirm.
        recorded_days = set(
            self.model.objects.filter(
                principal=self.get_user(), date__range=(start_date, end_date)
            )
            .order_by()
            .values_list("date", flat=True)
            .distinct()
        )
        streak = 0
        current_date = start_date
        while current_date <= end_date:
            if self._as_date(current_date) in recorded_days:
                streak += 1
                if streak >= min_days_required:
                    return True
            else:
                streak = 0  # a missing day breaks the run
            current_date += timedelta(days=1)
        return False

    def get_top_food_avoid(self, start_date, end_date):
        """Count what was in the latest meal on or before each symptom record.

        For every symptom record in the range, the user's most recent meal
        dated on or before the symptom date is looked up, and its foods,
        ingredients and beverages are each counted once.

        NOTE(review): only the date is compared, so a meal later on the same
        day than the symptom can be selected - confirm this is intended.

        :return: ``(top_food, food_counts, ingredient_counts, beverage_counts)``
            where each counts dict holds at most the three highest entries
            and ``top_food`` is None when nothing was counted.
        """
        user = self.get_user()
        food_counts = defaultdict(int)
        ingredient_counts = defaultdict(int)
        beverage_counts = defaultdict(int)
        symptom_records = MealSymptomRecord.objects.filter(
            principal=user, date__range=(start_date, end_date)
        )

        # Symptom records sharing a date resolve to the same meal, so the
        # lookup is done once per date rather than once per record.
        meal_items_by_date = {}
        for symptom_record in symptom_records:
            day = symptom_record.date
            if day not in meal_items_by_date:
                closest_meal = (
                    MealRecord.objects.filter(principal=user, date__lte=day)
                    .order_by("-date", "-time")
                    .first()
                )
                if closest_meal is None:
                    meal_items_by_date[day] = ((), (), ())
                else:
                    meal_items_by_date[day] = (
                        [food.name for food in closest_meal.food_records.all()],
                        [
                            ingredient.name
                            for ingredient in closest_meal.food_ingredient_records.all()
                        ],
                        [
                            beverage.beverage_type
                            for beverage in closest_meal.beverage_records.all()
                        ],
                    )
            foods, ingredients, beverages = meal_items_by_date[day]
            for name in foods:
                food_counts[name] += 1
            for name in ingredients:
                ingredient_counts[name] += 1
            for beverage_type in beverages:
                beverage_counts[beverage_type] += 1

        food_counts = self._top_three(food_counts)
        ingredient_counts = self._top_three(ingredient_counts)
        beverage_counts = self._top_three(beverage_counts)
        food_avoid = next(iter(food_counts), None)
        return food_avoid, food_counts, ingredient_counts, beverage_counts

    def get_symptoms_frequency(self, start_date, end_date):
        """Return the three most frequent symptom names in the range.

        Each before-meal or after-meal symptom on a record counts once for
        that symptom's name. The previous version added a per-record
        ``Count`` annotation for every symptom; two ``Count`` annotations
        across two joins multiply each other, so those totals were inflated.
        """
        symptom_records = MealSymptomRecord.objects.filter(
            principal=self.get_user(), date__range=(start_date, end_date)
        ).prefetch_related("symptoms_before_meal", "symptoms_after_meal")

        symptoms_frequency = defaultdict(int)
        for record in symptom_records:
            for symptom in record.symptoms_before_meal.all():
                symptoms_frequency[symptom.name] += 1
            for symptom in record.symptoms_after_meal.all():
                symptoms_frequency[symptom.name] += 1
        return self._top_three(symptoms_frequency)

    def get_stool_type_counts(self, start_date, end_date):
        """Return the three most common stool types and the single top one.

        :return: ``(counts, highest)`` where ``counts`` maps stool type to
            its count, highest first, and ``highest`` is None when there are
            no bowel records in the range.
        """
        stool_type_counts = (
            Bowel.objects.filter(
                principal=self.get_user(), date__range=(start_date, end_date)
            )
            .values("stool_type")
            .annotate(stool_type_count=Count("stool_type"))
        )
        stool_type_counts_sort = self._top_three(
            {item["stool_type"]: item["stool_type_count"] for item in stool_type_counts}
        )
        highest_stool = next(iter(stool_type_counts_sort), None)
        return stool_type_counts_sort, highest_stool

    def get(self, request):
        """Return the report for the ``date_range`` query parameter.

        Responds with an error when the range does not contain 7 consecutive
        days of meal records.
        """
        date_range = request.GET.get("date_range")
        start_date, end_date = date_utils.get_date_range(date_range)
        if not self.enough_records_exist(start_date, end_date):
            return ApiResponse.error(
                message="No report is generated. Minimum Previous 7 days of records required."
            )

        food_avoid, food_counts, ingredient_counts, beverage_counts = (
            self.get_top_food_avoid(start_date, end_date)
        )
        sorted_symptoms_frequency = self.get_symptoms_frequency(start_date, end_date)
        stool_type_counts_sort, highest_stool = self.get_stool_type_counts(
            start_date, end_date
        )
        nested_json = {
            "food_avoid": food_avoid,
            "same_food_avoid": {
                "food": food_counts,
                "ingredient": ingredient_counts,
                "beverage": beverage_counts,
            },
            "symptoms_frequency": sorted_symptoms_frequency,
            "highest_stool": highest_stool,
            "stool_type": stool_type_counts_sort,
        }
        return ApiResponse.success(message=constants.SUCCESS, data=nested_json)