Files
digest_app/module_notification/tasks.py

117 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
import itertools
from module_project.service import OneSignalService
from .models import InAppNotification
from module_iam.models import IAmPrincipal, IAmPrincipalType
from module_activity.models import MealRecord, Bowel, MealSymptomRecord, Medication
def notification_for_meal():
notification_message = "You havent filled your meal details yet."
current_date = datetime.now()
fifteen_days_ago = current_date - timedelta(days=20)
users = IAmPrincipal.objects.filter(
last_login__gte=fifteen_days_ago,
principal_type=IAmPrincipalType.get_principal_user(),
).values_list("id", flat=True)
meal_obj = MealRecord.objects.filter(date=current_date).values_list(
"principal", flat=True
)
# Remove IDs of users who have recorded meals for the current day
users_without_meals = set(users) - set(meal_obj)
print(f"user id {set(users)}")
print(
f"userwithoutmeal {users_without_meals}"
)
player_ids = list(
IAmPrincipal.objects.filter(
id__in=users_without_meals
).values_list("player_id", flat=True)
)
# removing none from list
player_ids = list(itertools.filterfalse(lambda x: x is None, player_ids))
notifications_to_create = []
if users_without_meals:
for user_id in users_without_meals:
message = notification_message
notifications_to_create.append(
InAppNotification(user_id=user_id, message=message)
)
# Bulk create notifications
if notifications_to_create:
InAppNotification.objects.bulk_create(notifications_to_create)
try:
notification = OneSignalService()
response = notification.send_notification(
headings="Meal",
contents=notification_message,
include_player_ids=player_ids,
)
except Exception as e:
print(f"Notification error in meal {str(e)}")
pass
def notification_for_medication():
notification_message = "You havent filled your medication details yet. "
current_date = datetime.now()
fifteen_days_ago = current_date - timedelta(days=20)
users = IAmPrincipal.objects.filter(
last_login__gte=fifteen_days_ago,
principal_type=IAmPrincipalType.get_principal_user(),
).values_list("id", flat=True)
medication_obj = Medication.objects.filter(date=current_date).values_list(
"principal", flat=True
)
# Remove IDs of users who have recorded medication for the current day
users_without_medications = set(users) - set(medication_obj)
print(f"user id {set(users)}")
print(
f"users_without_medication {users_without_medications}"
)
player_ids = list(
IAmPrincipal.objects.filter(
id__in=users_without_medications
).values_list("player_id", flat=True)
)
# removing none from list
player_ids = list(itertools.filterfalse(lambda x: x is None, player_ids))
notifications_to_create = []
if users_without_medications:
for user_id in users_without_medications:
message = notification_message
notifications_to_create.append(
InAppNotification(user_id=user_id, message=message)
)
# Bulk create notifications
if notifications_to_create:
InAppNotification.objects.bulk_create(notifications_to_create)
try:
notification = OneSignalService()
response = notification.send_notification(
headings="Medication",
contents=notification_message,
include_player_ids=player_ids,
)
except Exception as e:
print(f"Notification error in medication {str(e)}")
pass
def testing_cron():
print("cron is working ")