Files
goodtimes/manage_notifications/utils.py
2024-03-14 13:48:41 +05:30

125 lines
4.4 KiB
Python

import json
import requests
from onesignal_sdk.client import Client as OneSignalClient
from django.conf import settings
from .models import IAmPrincipalNotificationSettings, IAmPrincipal, PrincipalType
def onesignal_send_notification(
title, message, image_url="http://127.0.0.1:8000/", eligible_principals=None
):
onesignal_app_id = settings.ONE_SIGNAL_APP_ID
onesignal_rest_api_key = settings.ONE_SIGNAL_API_KEY
headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": f"Basic {onesignal_rest_api_key}",
}
if not eligible_principals:
return None # Or handle this scenario as needed
# Extract OneSignal player IDs
# player_ids = eligible_principals.values_list("player_id", flat=True)
# Construct the notification payload
data = {
"app_id": onesignal_app_id,
"headings": {"en": title},
"contents": {"en": message},
"include_player_ids": eligible_principals, # Include the filtered player IDs
# Add other optional notification data according to OneSignal documentation
}
if image_url:
# Include image URL if provided (requires additional OneSignal configuration)
data["large_icon"] = (
str(image_url.url)
)
# data = json.dumps(data)
print("Data: ", data)
# Send the notification request to OneSignal
response = requests.post(
"https://onesignal.com/api/v1/notifications", headers=headers, json=data
)
if response.status_code == 200:
print("Notification sent successfully!")
return True # Indicate success
else:
print(f"Error sending notification: {response.text}")
return False # Indicate failure
def send_notification(title, message, image_url=None, eligible_principals=None):
# Initialize OneSignal client
onesignal_client = OneSignalClient(
app_id=settings.ONE_SIGNAL_APP_ID, rest_api_key=settings.ONE_SIGNAL_API_KEY
)
if not eligible_principals:
return None # Or handle this scenario as needed
# Extract OneSignal player IDs
player_ids = eligible_principals.values_list("player_id", flat=True)
# Prepare notification payload
notification_payload = {
"headings": {"en": title},
"contents": {"en": message},
"include_player_ids": list(player_ids),
}
if image_url:
notification_payload["big_picture"] = image_url
# Send notification
response = onesignal_client.send_notification(notification_payload)
print(response.status_code, response.body)
return response
def get_eligible_principals_for_notification(push_notification):
principal_type = push_notification.principal_type
notification_category = push_notification.notification_category
if principal_type == PrincipalType.BOTH:
# If BOTH is selected, fetch users categorized under both EVENT_USER and EVENT_MANAGER
event_user_principals = IAmPrincipal.objects.filter(
principal_type__name=PrincipalType.EVENT_USER,
notifications_principal__notification_category=notification_category,
notifications_principal__is_enabled=True,
)
event_manager_principals = IAmPrincipal.objects.filter(
principal_type__name=PrincipalType.EVENT_MANAGER,
notifications_principal__notification_category=notification_category,
notifications_principal__is_enabled=True,
)
# Combine the QuerySets. Use | operator for OR query (union) and distinct() to avoid duplicates.
eligible_principals = (
event_user_principals | event_manager_principals
).distinct()
elif principal_type == PrincipalType.EVENT_USER:
# Fetch only EVENT_USER principals
eligible_principals = IAmPrincipal.objects.filter(
principal_type__name=PrincipalType.EVENT_USER,
notifications_principal__notification_category=notification_category,
notifications_principal__is_enabled=True,
)
elif principal_type == PrincipalType.EVENT_MANAGER:
# Fetch only EVENT_MANAGER principals
eligible_principals = IAmPrincipal.objects.filter(
principal_type__name=PrincipalType.EVENT_MANAGER,
notifications_principal__notification_category=notification_category,
notifications_principal__is_enabled=True,
)
return eligible_principals