Files
digest_app/module_activity/views.py

569 lines
20 KiB
Python

import logging
from collections import defaultdict
from datetime import datetime, timedelta
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import Count, Prefetch, Q
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse_lazy
from django.views import generic
from django_datatables_view.base_datatable_view import BaseDatatableView
from module_iam import iam_constant
from module_iam.models import IAmPrincipal
from module_project import constants, date_utils
from module_project.utils import JsonResponseUtil
from .forms import (ChronicConditionForm, IntoleranceForm, PastTreatmentForm,
SymptomsForm)
from .models import (Bowel, ChronicCondition, Intolerance, MealRecord,
MealSymptomRecord, Medication, PastTreatment, Symptoms)
logger = logging.getLogger(__name__)
class BaseView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
resource = None
action = None
template_name = None
model = Intolerance
context_objext_name = "obj"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["page_name"] = self.page_name
context["principal_id"] = get_object_or_404(IAmPrincipal, id=self.kwargs.get("principal_id"))
return context
class BaseCreateOrUpdateView(LoginRequiredMixin, generic.View):
page_name = iam_constant.RESOURCE_MANAGE_USER
page_title = None
model = None
template_name = "module_activity/base_add.html"
form_class = None
success_url = None
error_message = "An error occurred while saving the data."
def get_success_message(self):
self.success_message = (
constants.RECORD_CREATED if not self.object else constants.RECORD_UPDATED
)
return self.success_message
def get_object(self):
pk = self.kwargs.get("pk")
return get_object_or_404(self.model, pk=pk) if pk else None
def get_context_data(self, **kwargs):
context = {
"page_name": self.page_name,
"page_title": self.page_title,
"principal_id": self.kwargs.get("principal_id"),
"operation": "Add" if not self.object else "Edit",
}
context.update(kwargs) # Include any additional context data passed to the view
return context
def get(self, request, *args, **kwargs):
self.object = self.get_object()
form = self.form_class(instance=self.object)
context = self.get_context_data(form=form)
return render(request, self.template_name, context=context)
def post(self, request, *args, **kwargs):
principal_id = kwargs.get("principal_id")
self.object = self.get_object()
form = self.form_class(request.POST, instance=self.object)
if not form.is_valid():
print(form.errors)
context = self.get_context_data(form=form)
return render(request, self.template_name, context=context)
form.save(principal_id=principal_id)
messages.success(self.request, self.get_success_message())
success_url = reverse_lazy(
self.success_url, kwargs={"principal_id": principal_id}
)
return redirect(success_url)
class BaseListJson(BaseDatatableView):
model = Intolerance
columns = ["id", "name", "duration", "active", "deleted"]
order_columns = ["id", "name", "duration", "active", "deleted"]
def get_initial_queryset(self):
principal_id = self.kwargs.get("principal_id")
deleted_flag = self.request.GET.get("deleted_flag", None)
return self.model.objects.filter(principal=principal_id, deleted=deleted_flag)
def filter_queryset(self, qs):
search_value = self.request.GET.get("search[value]", None)
if search_value:
qs = qs.filter(
Q(name__icontains=search_value) | Q(duration__icontains=search_value)
)
return qs
class BaseActionView(generic.View):
model = None
def post(self, request, *args, **kwargs):
if self.model is None:
raise NotImplementedError(
"Subclasses of BaseActionView must define a 'model' attribute."
)
action = request.POST.get("action") # 'archive', 'active', or 'unarchive'
ids = request.POST.getlist("ids[]") # List of user IDs to perform action on
active = request.POST.get("active")
print(f"arhive action {action} and id is {ids} and active data is {active}")
if action == "archive":
# Update 'deleted' field to True for the selected users
self.model.objects.filter(id__in=ids).update(deleted=True, active=False)
message = "Record archived successfully."
elif action == "active":
# Update 'active' field to True for the selected users
self.model.objects.filter(id__in=ids).update(active=active.capitalize())
message = "Record activated successfully."
elif action == "unarchive":
# Update 'deleted' field to False for the selected users
self.model.objects.filter(id__in=ids).update(deleted=False)
message = "Record unarchived successfully."
else:
return JsonResponseUtil.error(message="Invalid Action")
return JsonResponseUtil.success(message=message)
class BaseArchiveView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
template_name = None
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
data["principal_id"] = kwargs.get("principal_id")
data["page_name"] = self.page_name
return data
class IntoleranceView(BaseView):
model = Intolerance
template_name = "module_activity/intolerance_list.html"
class CreateOrUpdateIntoleranceView(BaseCreateOrUpdateView):
model = Intolerance
page_title = "Intolerance"
form_class = IntoleranceForm
success_url = "module_activity:intolerance"
class IntoleranceListJson(BaseListJson):
model = Intolerance
class IntoleranceActionView(BaseActionView):
model = Intolerance
class IntoleranceArchiveView(generic.TemplateView):
template_name = "module_activity/intolerance_archive_list.html"
class SymptomsView(BaseView):
model = Symptoms
template_name = "module_activity/symptoms_list.html"
class CreateOrUpdateSymptomsView(BaseCreateOrUpdateView):
model = Symptoms
page_title = "Symptoms"
form_class = SymptomsForm
success_url = "module_activity:symptoms"
class SymptomsListJson(BaseListJson):
model = Symptoms
class SymptomsActionView(BaseActionView):
model = Symptoms
class SymptomsArchiveView(generic.TemplateView):
template_name = "module_activity/symptoms_archive_list.html"
class PastTreatmentView(BaseView):
model = PastTreatment
template_name = "module_activity/past_treatment_list.html"
class CreateOrUpdatePastTreatmentView(BaseCreateOrUpdateView):
model = PastTreatment
page_title = "Past Treatment"
form_class = PastTreatmentForm
success_url = "module_activity:past_treatment"
class PastTreatmentListJson(BaseListJson):
model = PastTreatment
class PastTreatmentActionView(BaseActionView):
model = PastTreatment
class PastTreatmentArchiveView(generic.TemplateView):
template_name = "module_activity/past_treatment_archive_list.html"
class ChronicConditionView(BaseView):
model = ChronicCondition
template_name = "module_activity/chronic_conditon_list.html"
class CreateOrUpdateChronicConditionView(BaseCreateOrUpdateView):
model = ChronicCondition
page_title = "Chronic Conditon/Disease"
form_class = ChronicConditionForm
success_url = "module_activity:chronic_condition"
class ChronicConditionListJson(BaseListJson):
model = ChronicCondition
class ChronicConditionActionView(BaseActionView):
model = ChronicCondition
class ChronicConditionArchiveView(generic.TemplateView):
template_name = "module_activity/chronic_condition_archive_list.html"
class UserActivityRecordView(generic.View):
def serialize_record(self, record):
time_obj = datetime.strptime(str(record.time), "%H:%M:%S")
return {
"id": record.id,
"date": record.date,
"time": time_obj.strftime("%I:%M %p"),
}
def get(self, request, *args, **kwargs):
try:
principal_id = self.kwargs.get("principal_id")
date = request.GET.get("date")
print(
f"principal_id is {principal_id} data is {date} and type is {type(date)}"
)
if not date:
return JsonResponseUtil.error(message="Date parameter is missing")
try:
date_obj = datetime.strptime(date, "%Y-%m-%d").date()
except ValueError:
return JsonResponseUtil.error(message="Invalid date format")
# Retrieve data from different models
meal_records = MealRecord.objects.filter(
principal=principal_id, date=date_obj
)
medication_records = Medication.objects.filter(
principal=principal_id, date=date_obj
)
bowel_records = Bowel.objects.filter(principal=principal_id, date=date_obj)
meal_symptom_records = MealSymptomRecord.objects.filter(
principal=principal_id, date=date_obj
)
print(f"==================meal record {meal_records}")
# Prepare combined results
data = []
for record in meal_records:
data.append({"type": "Meal", **self.serialize_record(record)})
for record in medication_records:
data.append({"type": "Medication", **self.serialize_record(record)})
for record in bowel_records:
data.append({"type": "Bowel", **self.serialize_record(record)})
for record in meal_symptom_records:
data.append({"type": "Symptom", **self.serialize_record(record)})
all_records_sorted = sorted(data, key=lambda x: x["time"], reverse=True)
response_data = {
"recordsTotal": len(all_records_sorted),
"recordsFiltered": len(all_records_sorted),
"data": all_records_sorted,
}
return JsonResponse(response_data)
except Exception as e:
return JsonResponseUtil.error(message="Something went wrong", errors=str(e))
class MealDetialView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
template_name = "module_activity/meal_detail.html"
model = MealRecord
def get_record(self):
id = self.kwargs.get("pk")
meal_record = get_object_or_404(
self.model.objects.prefetch_related(
"food_records", "beverage_records", "food_ingredient_records"
),
id=id,
)
return meal_record
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["obj"] = self.get_record()
context["page_name"] = self.page_name
return context
class MedicationDetailView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
template_name = "module_activity/medication_detail.html"
model = Medication
def get_record(self):
id = self.kwargs.get("pk")
obj = get_object_or_404(self.model.objects.prefetch_related("medicines"), id=id)
return obj
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["obj"] = self.get_record()
context["page_name"] = self.page_name
return context
class BowelDetailView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
template_name = "module_activity/bowel_detail.html"
model = Bowel
def get_record(self):
id = self.kwargs.get("pk")
obj = get_object_or_404(self.model, id=id)
return obj
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["obj"] = self.get_record()
context["page_name"] = self.page_name
return context
class MealSymptomDetailView(generic.TemplateView):
page_name = iam_constant.RESOURCE_MANAGE_USER
template_name = "module_activity/meal_symptom_details.html"
model = MealSymptomRecord
def get_record(self):
pk = self.kwargs.get("pk")
obj = get_object_or_404(
MealSymptomRecord.objects.prefetch_related(
"symptoms_before_meal", "symptoms_after_meal"
),
id=pk,
)
return obj
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["obj"] = self.get_record()
context["page_name"] = self.page_name
return context
class ReportChartView(generic.View):
def get(self, request, *args, **kwargs):
current_year = int(self.request.GET.get("year"))
monthly_counts = {
'Meal': [0] * 12,
'Medication': [0] * 12,
'Symptoms': [0] * 12,
'Bowel': [0] * 12,
}
for month in range(1, 13):
start_date = datetime(current_year, month, 1)
end_date = datetime(current_year, month + 1, 1) if month < 12 else datetime(current_year + 1, 1, 1)
monthly_counts['Meal'][month - 1] = MealRecord.objects.filter(date__range=(start_date, end_date)).count()
monthly_counts['Medication'][month - 1] = Medication.objects.filter(date__range=(start_date, end_date)).count()
monthly_counts['Symptoms'][month - 1] = MealSymptomRecord.objects.filter(date__range=(start_date, end_date)).count()
monthly_counts['Bowel'][month - 1] = Bowel.objects.filter(date__range=(start_date, end_date)).count()
print(f"===========================================================data is {monthly_counts}")
return JsonResponseUtil.success(message=constants.SUCCESS, data=monthly_counts)
class ReportDataView(generic.View):
model = MealRecord
def get_user(self, *args, **kwargs):
print(f"user id is {self.kwargs.get('principal_id')}")
user = IAmPrincipal.objects.filter(id=self.kwargs.get("principal_id")).first()
print(f"user is {user}")
return user
def enough_records_exist(self, start_date, end_date):
min_days_required = 7
current_date = start_date
count = 0
obj = self.model.objects.filter(principal=self.get_user())
while current_date <= end_date:
if obj.filter(date=current_date).exists():
count += 1
if count >= min_days_required:
return True
else:
count = 0 # Reset count if record is missing for any day
current_date += timedelta(days=1)
return False
def get_top_food_avoid(self, start_date, end_date):
"""Get the top food to avoid."""
food_counts = defaultdict(int)
ingredient_counts = defaultdict(int)
beverage_counts = defaultdict(int)
symptom_records = MealSymptomRecord.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
)
for symptom_record in symptom_records:
closest_meal = (
MealRecord.objects.filter(
principal=symptom_record.principal, date__lte=symptom_record.date
)
.order_by("-date", "-time")
.first()
)
if closest_meal:
for food_record in closest_meal.food_records.all():
food_counts[food_record.name] += 1
for ingredient_record in closest_meal.food_ingredient_records.all():
ingredient_counts[ingredient_record.name] += 1
for beverage_record in closest_meal.beverage_records.all():
beverage_counts[beverage_record.beverage_type] += 1
# Sort the dictionaries by their values in descending order and getting only top 3 record
food_counts = dict(
sorted(food_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
ingredient_counts = dict(
sorted(ingredient_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
beverage_counts = dict(
sorted(beverage_counts.items(), key=lambda x: x[1], reverse=True)[:3]
)
food_avoid = next(iter(food_counts), None)
return food_avoid, food_counts, ingredient_counts, beverage_counts
def get_symptoms_frequency(self, start_date, end_date):
"""Get the frequency of symptoms."""
symptom_records = MealSymptomRecord.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
).annotate(
before_meal_count=Count("symptoms_before_meal"),
after_meal_count=Count("symptoms_after_meal"),
)
symptoms_frequency = defaultdict(int)
for record in symptom_records:
for symptom in record.symptoms_before_meal.all():
symptoms_frequency[symptom.name] += record.before_meal_count
for symptom in record.symptoms_after_meal.all():
symptoms_frequency[symptom.name] += record.after_meal_count
sorted_symptoms_frequency = dict(
sorted(symptoms_frequency.items(), key=lambda x: x[1], reverse=True)[:3]
)
return sorted_symptoms_frequency
def get_stool_type_counts(self, start_date, end_date):
"""Get the count of stool types."""
stool_type_counts = (
Bowel.objects.filter(
principal=self.get_user(), date__range=(start_date, end_date)
)
.values("stool_type")
.annotate(stool_type_count=Count("stool_type"))
)
stool_type_counts_dict = {
item["stool_type"]: item["stool_type_count"] for item in stool_type_counts
}
stool_type_counts_sort = dict(
sorted(stool_type_counts_dict.items(), key=lambda x: x[1], reverse=True)[:3]
)
highest_stool = next(iter(stool_type_counts_sort), None)
return stool_type_counts_sort, highest_stool
def get(self, request, *args, **kwargs):
date_range = request.GET.get("date_range")
start_date, end_date = date_utils.get_date_range(date_range)
print(f"start date is {start_date}, end_date is {end_date}")
print(f"is dats exist {self.enough_records_exist(start_date, end_date)}")
if not self.enough_records_exist(start_date, end_date):
print("report does not exist")
return JsonResponseUtil.success(
message="No report is generated. Minimum Previous 7 days of records required.", status=204
)
# Get top food to avoid
food_avoid, food_counts, ingredient_counts, beverage_counts = (
self.get_top_food_avoid(start_date, end_date)
)
# Get symptoms frequency
sorted_symptoms_frequency = self.get_symptoms_frequency(start_date, end_date)
# Get stool type counts
stool_type_counts_sort, highest_stool = self.get_stool_type_counts(
start_date, end_date
)
nested_json = {
"food_avoid": food_avoid,
"same_food_avoid": {
"food": food_counts,
"ingredient": ingredient_counts,
"beverage": beverage_counts,
},
"symptoms_frequency": sorted_symptoms_frequency,
"highest_stool": highest_stool,
"stool_type": stool_type_counts_sort,
}
print(f"nested_json data is {nested_json}")
return JsonResponseUtil.success(message=constants.SUCCESS, data=nested_json)