Files
digest_app/module_activity/views.py

738 lines
26 KiB
Python

import csv
import logging
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db.models import Count, Prefetch, Q
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse_lazy
from django.views import generic
from django_datatables_view.base_datatable_view import BaseDatatableView
from module_iam import iam_constant
from module_iam.models import IAmPrincipal
from module_project import constants, date_utils
from module_project.utils import JsonResponseUtil
from .forms import (ChronicConditionForm, IntoleranceForm, PastTreatmentForm,
SymptomsForm, UploadFileForm)
from .models import (Bowel, ChronicCondition, FoodIngredintDataset,
Intolerance, MealRecord, MealSymptomRecord, Medication,
PastTreatment, Symptoms)
logger = logging.getLogger(__name__)
class BaseView(generic.TemplateView):
    """Base template view for pages that are scoped to one principal.

    Subclasses supply ``template_name`` (and optionally ``model``). The
    context always carries the page name and the principal looked up from
    the ``principal_id`` URL kwarg.
    """

    page_name = iam_constant.RESOURCE_MANAGE_USER
    resource = None
    action = None
    template_name = None
    model = None
    # NOTE(review): the attribute name is spelled "objext" in the original and
    # is kept unchanged because it is part of the class's public surface.
    context_objext_name = "obj"

    def get_context_data(self, **kwargs):
        """Extend the template context with ``page_name`` and the principal.

        Responds with 404 when no ``IAmPrincipal`` matches the URL kwarg.
        Note that the ``principal_id`` context key holds the principal
        object itself, not the raw id.
        """
        context = super().get_context_data(**kwargs)
        context["page_name"] = self.page_name
        principal = get_object_or_404(
            IAmPrincipal, id=self.kwargs.get("principal_id")
        )
        context["principal_id"] = principal
        return context
class BaseCreateOrUpdateView(LoginRequiredMixin, generic.View):
    """Shared add/edit view for the per-principal record types.

    Subclasses set ``model``, ``form_class``, ``page_title`` and
    ``success_url`` (a URL name that takes a ``principal_id`` kwarg). When
    the URL carries a ``pk`` the matching record is edited; otherwise a new
    record is created.
    """

    page_name = iam_constant.RESOURCE_MANAGE_USER
    page_title = None
    model = None
    template_name = "module_activity/base_add.html"
    form_class = None
    success_url = None
    error_message = "An error occurred while saving the data."

    def get_success_message(self):
        """Return (and remember) the flash message for a create or an update."""
        self.success_message = (
            constants.RECORD_UPDATED if self.object else constants.RECORD_CREATED
        )
        return self.success_message

    def get_object(self):
        """Return the record named by the ``pk`` URL kwarg, or ``None``.

        Responds with 404 when a ``pk`` is given but no record matches.
        """
        pk = self.kwargs.get("pk")
        if not pk:
            return None
        return get_object_or_404(self.model, pk=pk)

    def get_context_data(self, **kwargs):
        """Build the template context; extra keyword arguments are merged in."""
        context = {
            "page_name": self.page_name,
            "page_title": self.page_title,
            "principal_id": self.kwargs.get("principal_id"),
            "operation": "Edit" if self.object else "Add",
        }
        context.update(kwargs)
        return context

    def get(self, request, *args, **kwargs):
        """Render the form, bound to the existing record when editing."""
        self.object = self.get_object()
        form = self.form_class(instance=self.object)
        return render(
            request, self.template_name, context=self.get_context_data(form=form)
        )

    def post(self, request, *args, **kwargs):
        """Validate and save the form, then redirect to ``success_url``.

        An invalid form is re-rendered with its errors. The form's ``save``
        is called with the ``principal_id`` from the URL.
        """
        principal_id = kwargs.get("principal_id")
        self.object = self.get_object()
        form = self.form_class(request.POST, instance=self.object)

        if not form.is_valid():
            # Log through the module logger instead of printing to stdout.
            logger.info(
                "%s failed validation: %s",
                type(form).__name__,
                form.errors.as_json(),
            )
            return render(
                request, self.template_name, context=self.get_context_data(form=form)
            )

        form.save(principal_id=principal_id)
        messages.success(self.request, self.get_success_message())
        success_url = reverse_lazy(
            self.success_url, kwargs={"principal_id": principal_id}
        )
        return redirect(success_url)
class BaseListJson(BaseDatatableView):
    """Datatable JSON source for the name/duration record types.

    Subclasses normally only override ``model``.
    """

    model = Intolerance
    columns = ["id", "name", "duration", "active", "deleted"]
    order_columns = ["id", "name", "duration", "active", "deleted"]

    def get_initial_queryset(self):
        """Return the principal's rows that match the ``deleted_flag`` query param.

        The flag is passed to the ORM exactly as received (``None`` when the
        parameter is absent).
        """
        return self.model.objects.filter(
            principal=self.kwargs.get("principal_id"),
            deleted=self.request.GET.get("deleted_flag", None),
        )

    def filter_queryset(self, qs):
        """Narrow *qs* to rows whose name or duration contains the search text."""
        term = self.request.GET.get("search[value]", None)
        if not term:
            return qs
        name_match = Q(name__icontains=term)
        duration_match = Q(duration__icontains=term)
        return qs.filter(name_match | duration_match)
class BaseActionView(generic.View):
    """Bulk archive / unarchive / (de)activate endpoint for one model.

    Expects a POST with ``action`` (``archive``, ``active`` or
    ``unarchive``), ``ids[]`` (the record ids) and, for ``active``, an
    ``active`` flag. Subclasses must set ``model``.
    """

    model = None

    # Accepted spellings of the ``active`` flag, compared case-insensitively.
    _TRUE_VALUES = frozenset({"true", "1"})
    _FALSE_VALUES = frozenset({"false", "0"})

    @classmethod
    def _parse_flag(cls, raw):
        """Return True/False for a recognised flag string, else ``None``."""
        if raw is None:
            return None
        value = raw.strip().lower()
        if value in cls._TRUE_VALUES:
            return True
        if value in cls._FALSE_VALUES:
            return False
        return None

    def post(self, request, *args, **kwargs):
        """Apply the requested bulk action and return a JSON result.

        Raises:
            NotImplementedError: if the subclass did not set ``model``.
        """
        if self.model is None:
            raise NotImplementedError(
                "Subclasses of BaseActionView must define a 'model' attribute."
            )

        action = request.POST.get("action")
        ids = request.POST.getlist("ids[]")
        raw_active = request.POST.get("active")
        logger.debug(
            "Bulk action %r on %s ids=%s active=%r",
            action,
            self.model.__name__,
            ids,
            raw_active,
        )

        # NOTE(review): rows are selected by id only, not by principal;
        # confirm that authorisation is enforced elsewhere.
        records = self.model.objects.filter(id__in=ids)

        if action == "archive":
            records.update(deleted=True, active=False)
            message = "Record archived successfully."
        elif action == "active":
            # A missing or unrecognised flag used to raise and surface as a
            # server error; report it as a client error instead.
            active = self._parse_flag(raw_active)
            if active is None:
                return JsonResponseUtil.error(message="Invalid value for 'active'")
            records.update(active=active)
            message = (
                "Record activated successfully."
                if active
                else "Record deactivated successfully."
            )
        elif action == "unarchive":
            records.update(deleted=False)
            message = "Record unarchived successfully."
        else:
            return JsonResponseUtil.error(message="Invalid Action")

        return JsonResponseUtil.success(message=message)
class BaseArchiveView(generic.TemplateView):
    """Template view base that exposes ``principal_id`` and ``page_name``."""

    page_name = iam_constant.RESOURCE_MANAGE_USER
    template_name = None

    def get_context_data(self, **kwargs):
        """Add the ``principal_id`` kwarg and the page name to the context."""
        context = super().get_context_data(**kwargs)
        context.update(
            {
                "principal_id": kwargs.get("principal_id"),
                "page_name": self.page_name,
            }
        )
        return context
class PopulateFoodIngredientView(generic.View):
    """Upload an Excel workbook and load it into the food/ingredient dataset.

    The workbook must have ``Food`` and ``Ingredients`` columns, with the
    ingredients given as a comma-separated list. Foods whose (capitalised)
    name already exists are skipped.
    """

    # NOTE(review): this view has no login mixin; confirm access is
    # restricted elsewhere (middleware or URL configuration).
    page_name = iam_constant.RESOURCE_MANAGE_DASHBOARD
    resource = iam_constant.RESOURCE_MANAGE_DASHBOARD
    template_name = "module_activity/food_ingredient_form.html"
    model = FoodIngredintDataset
    form_class = UploadFileForm
    success_url = reverse_lazy("module_iam:dashboard")
    error_message = "An error occurred while saving the data."
    required_columns = ("Food", "Ingredients")

    def get_context_data(self, **kwargs):
        """Build the template context; extra keyword arguments are merged in."""
        context = {
            "page_name": self.page_name,
            "operation": "Add",
        }
        context.update(kwargs)
        return context

    def get(self, request, *args, **kwargs):
        """Render an empty upload form."""
        return self._render_form(request, self.form_class())

    def _render_form(self, request, form):
        """Render the upload template with *form* in the context."""
        return render(
            request, self.template_name, context=self.get_context_data(form=form)
        )

    def _fail(self, request, form):
        """Flash the generic error message and show the form again."""
        messages.error(request, self.error_message)
        return self._render_form(request, form)

    def post(self, request, *args, **kwargs):
        """Import the uploaded workbook and redirect to ``success_url``.

        An invalid form, an unreadable workbook or a workbook without the
        required columns re-renders the form rather than raising.
        """
        form = self.form_class(request.POST, request.FILES)
        if not form.is_valid():
            logger.info("Upload form failed validation: %s", form.errors.as_json())
            return self._render_form(request, form)

        try:
            df = pd.read_excel(request.FILES["file"])
        except Exception:
            # Boundary handler: the reader can raise many different errors
            # for a malformed upload; log the cause and report a failure.
            logger.exception("Could not read the uploaded food/ingredient workbook")
            return self._fail(request, form)

        missing = [name for name in self.required_columns if name not in df.columns]
        if missing:
            logger.warning("Uploaded workbook lacks required columns: %s", missing)
            return self._fail(request, form)

        for _, row in df.iterrows():
            food_cell = row["Food"]
            ingredients_cell = row["Ingredients"]
            # Blank cells arrive as NaN; such rows carry nothing to import.
            if pd.isna(food_cell) or pd.isna(ingredients_cell):
                continue

            # str() guards against numeric cells, which have no .strip().
            food_name = str(food_cell).strip().capitalize()
            if not food_name:
                continue
            ingredients = [
                part.strip().capitalize()
                for part in str(ingredients_cell).split(",")
                if part.strip()  # drop empties from stray or trailing commas
            ]

            if self.model.objects.filter(food_name=food_name).exists():
                continue
            logger.debug("Adding food %s with ingredients %s", food_name, ingredients)
            self.model(food_name=food_name, ingredients=ingredients).save()

        messages.success(self.request, constants.RECORD_CREATED)
        return redirect(self.success_url)
class IntoleranceView(BaseView):
    """Intolerance list page for one principal."""

    template_name = "module_activity/intolerance_list.html"
    model = Intolerance
class CreateOrUpdateIntoleranceView(BaseCreateOrUpdateView):
    """Add or edit an ``Intolerance`` record through ``IntoleranceForm``."""

    page_title = "Intolerance"
    model = Intolerance
    form_class = IntoleranceForm
    success_url = "module_activity:intolerance"
class IntoleranceListJson(BaseListJson):
    """Datatable JSON source for ``Intolerance`` rows."""

    model = Intolerance
class IntoleranceActionView(BaseActionView):
    """Bulk archive/activate/unarchive endpoint for ``Intolerance`` rows."""

    model = Intolerance
class IntoleranceArchiveView(generic.TemplateView):
    """Renders the archived-intolerance list template."""

    # NOTE(review): extends TemplateView directly rather than BaseArchiveView,
    # so ``page_name`` is not placed in the context — confirm this is intended.
    template_name = "module_activity/intolerance_archive_list.html"
class SymptomsView(BaseView):
    """Symptoms list page for one principal."""

    template_name = "module_activity/symptoms_list.html"
    model = Symptoms
class CreateOrUpdateSymptomsView(BaseCreateOrUpdateView):
    """Add or edit a ``Symptoms`` record through ``SymptomsForm``."""

    page_title = "Symptoms"
    model = Symptoms
    form_class = SymptomsForm
    success_url = "module_activity:symptoms"
class SymptomsListJson(BaseListJson):
    """Datatable JSON source for ``Symptoms`` rows."""

    model = Symptoms
class SymptomsActionView(BaseActionView):
    """Bulk archive/activate/unarchive endpoint for ``Symptoms`` rows."""

    model = Symptoms
class SymptomsArchiveView(generic.TemplateView):
    """Renders the archived-symptoms list template."""

    # NOTE(review): extends TemplateView directly rather than BaseArchiveView,
    # so ``page_name`` is not placed in the context — confirm this is intended.
    template_name = "module_activity/symptoms_archive_list.html"
class PastTreatmentView(BaseView):
    """Past-treatment list page for one principal."""

    template_name = "module_activity/past_treatment_list.html"
    model = PastTreatment
class CreateOrUpdatePastTreatmentView(BaseCreateOrUpdateView):
    """Add or edit a ``PastTreatment`` record through ``PastTreatmentForm``."""

    page_title = "Past Treatment"
    model = PastTreatment
    form_class = PastTreatmentForm
    success_url = "module_activity:past_treatment"
class PastTreatmentListJson(BaseListJson):
    """Datatable JSON source for ``PastTreatment`` rows."""

    model = PastTreatment
class PastTreatmentActionView(BaseActionView):
    """Bulk archive/activate/unarchive endpoint for ``PastTreatment`` rows."""

    model = PastTreatment
class PastTreatmentArchiveView(generic.TemplateView):
    """Renders the archived past-treatment list template."""

    # NOTE(review): extends TemplateView directly rather than BaseArchiveView,
    # so ``page_name`` is not placed in the context — confirm this is intended.
    template_name = "module_activity/past_treatment_archive_list.html"
class ChronicConditionView(BaseView):
    """Chronic-condition list page for one principal."""

    # The template file name is spelled "conditon"; it is a runtime path and
    # must match the file on disk.
    template_name = "module_activity/chronic_conditon_list.html"
    model = ChronicCondition
class CreateOrUpdateChronicConditionView(BaseCreateOrUpdateView):
    """Add or edit a ``ChronicCondition`` record through ``ChronicConditionForm``."""

    page_title = "Chronic Conditon/Disease"
    model = ChronicCondition
    form_class = ChronicConditionForm
    success_url = "module_activity:chronic_condition"
class ChronicConditionListJson(BaseListJson):
    """Datatable JSON source for ``ChronicCondition`` rows."""

    model = ChronicCondition
class ChronicConditionActionView(BaseActionView):
    """Bulk archive/activate/unarchive endpoint for ``ChronicCondition`` rows."""

    model = ChronicCondition
class ChronicConditionArchiveView(generic.TemplateView):
    """Renders the archived chronic-condition list template."""

    # NOTE(review): extends TemplateView directly rather than BaseArchiveView,
    # so ``page_name`` is not placed in the context — confirm this is intended.
    template_name = "module_activity/chronic_condition_archive_list.html"
class MealView(BaseView):
    """Meal list page for one principal."""

    template_name = "module_activity/meal_list.html"
class MealListJsonView(BaseDatatableView):
    """Datatable JSON source for a principal's ``MealRecord`` rows."""

    model = MealRecord
    columns = ["id", "date", "time", "meal_type"]
    order_columns = ["id", "date", "time", "meal_type"]

    def get_initial_queryset(self):
        """Return the principal's rows that match the ``deleted_flag`` query param."""
        principal_id = self.kwargs.get("principal_id")
        deleted_flag = self.request.GET.get("deleted_flag", None)
        return self.model.objects.filter(principal=principal_id, deleted=deleted_flag)

    def ordering(self, qs):
        """Order *qs* by the column and direction the table requested.

        The base-class ordering is applied first. It is then replaced by the
        requested single-column order when ``order[0][dir]`` is ``asc`` or
        ``desc`` and ``order[0][column]`` names a valid column. A missing,
        non-numeric or out-of-range column leaves the base ordering in place
        instead of raising.
        """
        qs = super().ordering(qs)

        direction = self.request.GET.get("order[0][dir]")
        if direction not in ("asc", "desc"):
            return qs

        try:
            # The posted index is shifted down by one; presumably the table
            # has a leading non-data column — confirm against the template.
            column_index = int(self.request.GET.get("order[0][column]")) - 1
        except (TypeError, ValueError):
            return qs
        if not 0 <= column_index < len(self.order_columns):
            return qs

        field = self.order_columns[column_index]
        return qs.order_by(field if direction == "asc" else "-" + field)
class MedicationView(BaseView):
    """Medication list page for one principal."""

    template_name = "module_activity/medication_list.html"
class MedicationListJsonView(BaseDatatableView):
    """Datatable JSON source for a principal's ``Medication`` rows."""

    model = Medication
    columns = ["id", "date", "time"]
    order_columns = ["id", "date", "time"]

    def get_initial_queryset(self):
        """Return the principal's rows that match the ``deleted_flag`` query param."""
        principal_id = self.kwargs.get("principal_id")
        deleted_flag = self.request.GET.get("deleted_flag", None)
        return self.model.objects.filter(principal=principal_id, deleted=deleted_flag)

    def ordering(self, qs):
        """Order *qs* by the column and direction the table requested.

        *qs* is returned unchanged unless ``order[0][dir]`` is ``asc`` or
        ``desc`` and ``order[0][column]`` names a valid column. A missing,
        non-numeric or out-of-range column no longer raises.
        """
        direction = self.request.GET.get("order[0][dir]")
        if direction not in ("asc", "desc"):
            return qs

        try:
            # The posted index is shifted down by one; presumably the table
            # has a leading non-data column — confirm against the template.
            column_index = int(self.request.GET.get("order[0][column]")) - 1
        except (TypeError, ValueError):
            return qs
        if not 0 <= column_index < len(self.order_columns):
            return qs

        field = self.order_columns[column_index]
        return qs.order_by(field if direction == "asc" else "-" + field)
class BowelView(BaseView):
    """Bowel-record list page for one principal."""

    template_name = "module_activity/bowel_list.html"
class BowelListJsonView(BaseDatatableView):
    """Datatable JSON source for a principal's ``Bowel`` rows."""

    model = Bowel
    columns = ["id", "date", "time", "stool_type"]
    order_columns = ["id", "date", "time", "stool_type"]

    def get_initial_queryset(self):
        """Return the principal's rows that match the ``deleted_flag`` query param."""
        principal_id = self.kwargs.get("principal_id")
        deleted_flag = self.request.GET.get("deleted_flag", None)
        return self.model.objects.filter(principal=principal_id, deleted=deleted_flag)

    def ordering(self, qs):
        """Order *qs* by the column and direction the table requested.

        *qs* is returned unchanged unless ``order[0][dir]`` is ``asc`` or
        ``desc`` and ``order[0][column]`` names a valid column. A missing,
        non-numeric or out-of-range column no longer raises.
        """
        direction = self.request.GET.get("order[0][dir]")
        if direction not in ("asc", "desc"):
            return qs

        try:
            # The posted index is shifted down by one; presumably the table
            # has a leading non-data column — confirm against the template.
            column_index = int(self.request.GET.get("order[0][column]")) - 1
        except (TypeError, ValueError):
            return qs
        if not 0 <= column_index < len(self.order_columns):
            return qs

        field = self.order_columns[column_index]
        return qs.order_by(field if direction == "asc" else "-" + field)
class MealSymptomsView(BaseView):
    """Meal-symptom list page for one principal."""

    template_name = "module_activity/meal_symptoms_list.html"
class MealSymptomsListJsonView(BaseDatatableView):
    """Datatable JSON source for a principal's ``MealSymptomRecord`` rows."""

    model = MealSymptomRecord
    columns = ["id", "date", "time"]
    order_columns = ["id", "date", "time"]

    def get_initial_queryset(self):
        """Return the principal's rows that match the ``deleted_flag`` query param."""
        principal_id = self.kwargs.get("principal_id")
        deleted_flag = self.request.GET.get("deleted_flag", None)
        return self.model.objects.filter(principal=principal_id, deleted=deleted_flag)

    def ordering(self, qs):
        """Order *qs* by the column and direction the table requested.

        *qs* is returned unchanged unless ``order[0][dir]`` is ``asc`` or
        ``desc`` and ``order[0][column]`` names a valid column. A missing,
        non-numeric or out-of-range column no longer raises.
        """
        direction = self.request.GET.get("order[0][dir]")
        if direction not in ("asc", "desc"):
            return qs

        try:
            # The posted index is shifted down by one; presumably the table
            # has a leading non-data column — confirm against the template.
            column_index = int(self.request.GET.get("order[0][column]")) - 1
        except (TypeError, ValueError):
            return qs
        if not 0 <= column_index < len(self.order_columns):
            return qs

        field = self.order_columns[column_index]
        return qs.order_by(field if direction == "asc" else "-" + field)
class UserActivityRecordView(generic.View):
    """JSON feed combining a principal's meal, medication, bowel and symptom logs."""

    def serialize_record(self, record):
        """Return ``id``, ``date`` and a 12-hour formatted ``time`` for *record*."""
        # Drop any fractional seconds so values such as "12:30:45.123456"
        # still match the format below.
        clock = str(record.time).split(".")[0]
        time_obj = datetime.strptime(clock, "%H:%M:%S")
        return {
            "id": record.id,
            "date": record.date,
            "time": time_obj.strftime("%I:%M %p"),
        }

    def get(self, request, *args, **kwargs):
        """Return every activity record in ``date_range``, newest first.

        Responds with an error payload when ``date_range`` is missing or
        when anything fails while building the response.
        """
        try:
            principal_id = self.kwargs.get("principal_id")
            date_range = request.GET.get("date_range")
            if not date_range:
                return JsonResponseUtil.error(message="Date range parameter is missing")
            start_date, end_date = date_utils.get_date_range(date_range)

            sources = (
                ("Meal", MealRecord),
                ("Medication", Medication),
                ("Bowel", Bowel),
                ("Symptom", MealSymptomRecord),
            )

            keyed_rows = []
            for label, model in sources:
                records = model.objects.filter(
                    principal=principal_id, date__range=(start_date, end_date)
                )
                for record in records:
                    # Sort on the raw date and 24-hour time. The displayed
                    # "%I:%M %p" string does not sort chronologically
                    # ("01:00 PM" sorts before "09:00 AM").
                    # Assumes str() of both fields is ISO-like — TODO confirm.
                    sort_key = (str(record.date), str(record.time))
                    row = {"type": label, **self.serialize_record(record)}
                    keyed_rows.append((sort_key, row))

            keyed_rows.sort(key=lambda pair: pair[0], reverse=True)
            all_records_sorted = [row for _, row in keyed_rows]

            response_data = {
                "recordsTotal": len(all_records_sorted),
                "recordsFiltered": len(all_records_sorted),
                "data": all_records_sorted,
            }
            return JsonResponse(response_data)
        except Exception as e:
            # Boundary handler: keep the JSON error contract but record the
            # traceback so the failure is not lost.
            logger.exception("Failed to build the activity record list")
            return JsonResponseUtil.error(message="Something went wrong", errors=str(e))
class MealDetialView(generic.TemplateView):
    """Detail page for a single ``MealRecord``."""

    page_name = iam_constant.RESOURCE_MANAGE_USER
    template_name = "module_activity/meal_detail.html"
    model = MealRecord

    def get_record(self):
        """Return the record for the ``pk`` URL kwarg with related rows prefetched.

        Responds with 404 when no record matches.
        """
        queryset = self.model.objects.prefetch_related(
            "food_records", "beverage_records", "food_ingredient_records"
        )
        return get_object_or_404(queryset, id=self.kwargs.get("pk"))

    def get_context_data(self, **kwargs):
        """Expose the record as ``obj`` together with the page name."""
        context = super().get_context_data(**kwargs)
        context["obj"] = self.get_record()
        context["page_name"] = self.page_name
        return context
class MedicationDetailView(generic.TemplateView):
    """Detail page for a single ``Medication`` record."""

    page_name = iam_constant.RESOURCE_MANAGE_USER
    template_name = "module_activity/medication_detail.html"
    model = Medication

    def get_record(self):
        """Return the record for the ``pk`` URL kwarg with its medicines prefetched.

        Responds with 404 when no record matches.
        """
        queryset = self.model.objects.prefetch_related("medicines")
        return get_object_or_404(queryset, id=self.kwargs.get("pk"))

    def get_context_data(self, **kwargs):
        """Expose the record as ``obj`` together with the page name."""
        context = super().get_context_data(**kwargs)
        context["obj"] = self.get_record()
        context["page_name"] = self.page_name
        return context
class BowelDetailView(generic.TemplateView):
    """Detail page for a single ``Bowel`` record."""

    page_name = iam_constant.RESOURCE_MANAGE_USER
    template_name = "module_activity/bowel_detail.html"
    model = Bowel

    def get_record(self):
        """Return the record for the ``pk`` URL kwarg.

        Responds with 404 when no record matches.
        """
        pk = self.kwargs.get("pk")
        obj = get_object_or_404(self.model, id=pk)
        # Debug trace goes through the module logger instead of stdout.
        logger.debug("Loaded bowel record %s", obj)
        return obj

    def get_context_data(self, **kwargs):
        """Expose the record as ``obj`` together with the page name."""
        context = super().get_context_data(**kwargs)
        context["obj"] = self.get_record()
        context["page_name"] = self.page_name
        return context
class MealSymptomDetailView(generic.TemplateView):
    """Detail page for a single ``MealSymptomRecord``."""

    page_name = iam_constant.RESOURCE_MANAGE_USER
    template_name = "module_activity/meal_symptom_details.html"
    model = MealSymptomRecord

    def get_record(self):
        """Return the record for the ``pk`` URL kwarg with both symptom sets prefetched.

        Responds with 404 when no record matches.
        """
        queryset = MealSymptomRecord.objects.prefetch_related(
            "symptoms_before_meal", "symptoms_after_meal"
        )
        return get_object_or_404(queryset, id=self.kwargs.get("pk"))

    def get_context_data(self, **kwargs):
        """Expose the record as ``obj`` together with the page name."""
        context = super().get_context_data(**kwargs)
        context["obj"] = self.get_record()
        context["page_name"] = self.page_name
        return context
class ReportChartView(generic.View):
    """JSON endpoint with per-month record counts for one calendar year."""

    def get(self, request, *args, **kwargs):
        """Return twelve monthly counts for each record type.

        The year comes from the ``year`` query parameter. A missing,
        non-numeric or out-of-range value yields an error payload instead
        of an unhandled exception.
        """
        try:
            current_year = int(request.GET.get("year"))
        except (TypeError, ValueError):
            current_year = None
        # datetime() accepts years 1..9999, and December needs year + 1.
        if current_year is None or not 1 <= current_year < 9999:
            return JsonResponseUtil.error(message="Invalid or missing 'year' parameter")

        series = {
            "Meal": MealRecord,
            "Medication": Medication,
            "Symptoms": MealSymptomRecord,
            "Bowel": Bowel,
        }
        monthly_counts = {label: [0] * 12 for label in series}

        for month in range(1, 13):
            start_date = datetime(current_year, month, 1)
            if month < 12:
                end_date = datetime(current_year, month + 1, 1)
            else:
                end_date = datetime(current_year + 1, 1, 1)

            for label, model in series.items():
                # Half-open interval [start, end). An inclusive range would
                # also count the first day of the next month, so those
                # records would be counted in two months.
                monthly_counts[label][month - 1] = model.objects.filter(
                    date__gte=start_date, date__lt=end_date
                ).count()

        return JsonResponseUtil.success(message=constants.SUCCESS, data=monthly_counts)
class ReportDataView(generic.View):
    """JSON report of likely trigger foods, symptom frequency and stool types.

    The report covers the ``date_range`` query parameter for the principal
    in the ``principal_id`` URL kwarg. It is only produced when meals were
    logged on at least seven consecutive days inside that range.
    """

    model = MealRecord

    def get_user(self, *args, **kwargs):
        """Return the principal named by the URL, or ``None`` when not found.

        The lookup is cached on the view instance (Django builds a new
        instance per request), so the helpers below share one query.
        """
        if not hasattr(self, "_principal"):
            self._principal = IAmPrincipal.objects.filter(
                id=self.kwargs.get("principal_id")
            ).first()
            logger.debug(
                "Resolved principal %s -> %s",
                self.kwargs.get("principal_id"),
                self._principal,
            )
        return self._principal

    @staticmethod
    def _top_three(counts):
        """Return the three highest-count items of *counts*, highest first."""
        ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:3])

    def enough_records_exist(self, start_date, end_date):
        """Return True when some 7 consecutive days in the range each have a meal."""
        min_days_required = 7
        user_meals = self.model.objects.filter(principal=self.get_user())

        streak = 0
        current_date = start_date
        while current_date <= end_date:
            if user_meals.filter(date=current_date).exists():
                streak += 1
                if streak >= min_days_required:
                    return True
            else:
                streak = 0  # a day without a meal breaks the run
            current_date += timedelta(days=1)
        return False

    def get_top_food_avoid(self, start_date, end_date):
        """Count foods, ingredients and beverages of the meal nearest each symptom.

        For every symptom record in the range, the principal's latest meal
        dated on or before the symptom date is looked up and its items are
        counted.

        Returns:
            ``(food_avoid, food_counts, ingredient_counts, beverage_counts)``.
            Each ``*_counts`` is a top-three dict and ``food_avoid`` is the
            most frequent food name, or ``None``.
        """
        food_counts = defaultdict(int)
        ingredient_counts = defaultdict(int)
        beverage_counts = defaultdict(int)

        symptom_records = MealSymptomRecord.objects.filter(
            principal=self.get_user(), date__range=(start_date, end_date)
        )
        for symptom_record in symptom_records:
            closest_meal = (
                MealRecord.objects.filter(
                    principal=symptom_record.principal,
                    date__lte=symptom_record.date,
                )
                .order_by("-date", "-time")
                .first()
            )
            if not closest_meal:
                continue
            for food_record in closest_meal.food_records.all():
                food_counts[food_record.name] += 1
            for ingredient_record in closest_meal.food_ingredient_records.all():
                ingredient_counts[ingredient_record.name] += 1
            for beverage_record in closest_meal.beverage_records.all():
                beverage_counts[beverage_record.beverage_type] += 1

        food_counts = self._top_three(food_counts)
        ingredient_counts = self._top_three(ingredient_counts)
        beverage_counts = self._top_three(beverage_counts)

        food_avoid = next(iter(food_counts), None)
        return food_avoid, food_counts, ingredient_counts, beverage_counts

    def get_symptoms_frequency(self, start_date, end_date):
        """Return the three most heavily weighted symptom names in the range.

        NOTE(review): each symptom is weighted by the number of symptoms in
        its record rather than by 1. That weighting is kept from the
        original — confirm it is intended.
        """
        symptom_records = (
            MealSymptomRecord.objects.filter(
                principal=self.get_user(), date__range=(start_date, end_date)
            )
            .annotate(
                # distinct=True is required here: two Count() aggregates over
                # different many-to-many joins otherwise multiply each other.
                before_meal_count=Count("symptoms_before_meal", distinct=True),
                after_meal_count=Count("symptoms_after_meal", distinct=True),
            )
            # Load both symptom sets up front instead of two queries per record.
            .prefetch_related("symptoms_before_meal", "symptoms_after_meal")
        )

        symptoms_frequency = defaultdict(int)
        for record in symptom_records:
            for symptom in record.symptoms_before_meal.all():
                symptoms_frequency[symptom.name] += record.before_meal_count
            for symptom in record.symptoms_after_meal.all():
                symptoms_frequency[symptom.name] += record.after_meal_count

        return self._top_three(symptoms_frequency)

    def get_stool_type_counts(self, start_date, end_date):
        """Return ``(top-three stool type counts, most frequent stool type)``."""
        stool_type_counts = (
            Bowel.objects.filter(
                principal=self.get_user(), date__range=(start_date, end_date)
            )
            .values("stool_type")
            .annotate(stool_type_count=Count("stool_type"))
        )
        counts_by_type = {
            item["stool_type"]: item["stool_type_count"] for item in stool_type_counts
        }
        stool_type_counts_sort = self._top_three(counts_by_type)
        highest_stool = next(iter(stool_type_counts_sort), None)
        return stool_type_counts_sort, highest_stool

    def get(self, request, *args, **kwargs):
        """Build the report, or explain why none is available.

        Returns an error payload when ``date_range`` is missing, and a 202
        success payload when fewer than seven consecutive days of meals
        exist in the range.
        """
        date_range = request.GET.get("date_range")
        if not date_range:
            return JsonResponseUtil.error(message="Date range parameter is missing")

        start_date, end_date = date_utils.get_date_range(date_range)
        logger.debug("Report range %s .. %s", start_date, end_date)

        # Evaluated once: the check runs one query per day in the range.
        if not self.enough_records_exist(start_date, end_date):
            logger.debug("Not enough consecutive meal records for a report")
            return JsonResponseUtil.success(
                message="No report is generated. Minimum Previous 7 days of records required.",
                status=202,
            )

        food_avoid, food_counts, ingredient_counts, beverage_counts = (
            self.get_top_food_avoid(start_date, end_date)
        )
        sorted_symptoms_frequency = self.get_symptoms_frequency(start_date, end_date)
        stool_type_counts_sort, highest_stool = self.get_stool_type_counts(
            start_date, end_date
        )

        nested_json = {
            "food_avoid": food_avoid,
            "same_food_avoid": {
                "food": food_counts,
                "ingredient": ingredient_counts,
                "beverage": beverage_counts,
            },
            "symptoms_frequency": sorted_symptoms_frequency,
            "highest_stool": highest_stool,
            "stool_type": stool_type_counts_sort,
        }
        logger.debug("Report payload: %s", nested_json)
        return JsonResponseUtil.success(message=constants.SUCCESS, data=nested_json)