91 lines
3.2 KiB
Python
91 lines
3.2 KiB
Python
from decimal import Decimal
|
|
from django.conf import settings
|
|
import stripe
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from accounts.models import IAmPrincipal
|
|
from manage_coupons.models import Coupon
|
|
from manage_subscriptions.models import Subscription
|
|
import logging
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Configure the Stripe client at import time with the secret key taken from
# Django settings (settings.STRIPE_SECRET_KEY).
stripe.api_key = settings.STRIPE_SECRET_KEY
|
|
|
|
|
|
class WebhookService:
    """Wrap a Stripe webhook event payload and resolve what it references.

    On construction the event type and the event's ``data.object`` are read
    from the payload, and the metadata for that event is resolved once and
    cached on the instance. The ``get_*`` methods then look up database
    records (principal, subscription, coupon) or the paid amount from that
    cached state.
    """

    def __init__(self, webhook_data):
        """Store the payload and resolve its metadata.

        For ``invoice.payment_succeeded`` events that carry a subscription
        id, this calls ``stripe.Subscription.retrieve`` (see
        ``_fetch_metadata``), so construction may perform a network request.

        Args:
            webhook_data: Decoded webhook event mapping. Must contain
                ``"type"`` and ``"data"["object"]``.

        Raises:
            KeyError: If ``"type"`` or ``"data"["object"]`` is missing.
        """
        self._webhook_data = webhook_data
        self._event_type = webhook_data["type"]
        self._charge_data = webhook_data["data"]["object"]
        self._metadata = self._fetch_metadata()

    def _fetch_metadata(self):
        """Fetch metadata based on the event type.

        Returns:
            The event object's own metadata for
            ``checkout.session.completed``; the metadata of the retrieved
            Stripe subscription for ``invoice.payment_succeeded``; an empty
            dict for every other case. Never returns ``None``.
        """
        if self._event_type == "checkout.session.completed":
            # ``or {}`` also covers a key that is present but null, which the
            # ``.get(key, default)`` form would pass through as None.
            return self._charge_data.get("metadata") or {}

        if self._event_type == "invoice.payment_succeeded":
            subscription_id = self._charge_data.get("subscription")
            if subscription_id:
                subscription = stripe.Subscription.retrieve(subscription_id)
                return subscription.get("metadata") or {}

        return {}

    @property
    def event_type(self):
        """The ``"type"`` value of the webhook payload."""
        return self._event_type

    @property
    def charge_data(self):
        """The ``data.object`` mapping of the webhook payload."""
        return self._charge_data

    def _get_object_from_metadata(self, model, id_key):
        """Retrieve a model instance whose id is stored in the metadata.

        Args:
            model: Model class queried via ``model.objects.get(id=...)``.
            id_key: Metadata key holding the object's id.

        Returns:
            The matching instance, or ``None`` when the metadata has no
            (truthy) value under ``id_key``.

        Raises:
            ValueError: If the id is not an integer or no such object exists.
        """
        obj_id = self._metadata.get(id_key)
        if not obj_id:
            return None

        try:
            return model.objects.get(id=int(obj_id))
        except (ObjectDoesNotExist, ValueError) as e:
            logger.error("Invalid %s ID: %s", model.__name__, obj_id)
            raise ValueError(f"Invalid {model.__name__} ID: {obj_id}") from e

    def get_event_type(self):
        """Return the event type (method form of the ``event_type`` property)."""
        return self.event_type

    def get_principal(self):
        """Retrieve principal from metadata (key ``"principal"``)."""
        return self._get_object_from_metadata(IAmPrincipal, "principal")

    def get_subscription(self):
        """Retrieve subscription from metadata (key ``"subscription_id"``)."""
        return self._get_object_from_metadata(Subscription, "subscription_id")

    def get_coupon(self):
        """Retrieve coupon from metadata (key ``"couponCode"``).

        Returns:
            The matching ``Coupon``, or ``None`` when no coupon code is set.

        Raises:
            ValueError: If a coupon code is set but no such coupon exists.
        """
        coupon_code = self._metadata.get("couponCode")
        # Diagnostic output goes through the module logger, not stdout.
        logger.debug("get_coupon: coupon_code=%s", coupon_code)
        if not coupon_code:
            return None

        try:
            return Coupon.objects.get(coupon_code=coupon_code)
        except Coupon.DoesNotExist as e:
            logger.error("Invalid coupon code: %s", coupon_code)
            raise ValueError(f"Invalid coupon code: {coupon_code}") from e

    def get_final_amount(self):
        """Retrieve the amount after coupon discount, divided by 100.

        The raw value is read from ``amount_total`` for
        ``checkout.session.completed``, from ``amount_paid`` for
        ``invoice.payment_succeeded``, and otherwise from the
        ``finalAmount`` metadata entry. A missing or null value counts as 0.

        Returns:
            ``Decimal`` amount (raw value / 100).
        """
        if self.event_type == "checkout.session.completed":
            # ``or 0`` guards against a key that is present but null.
            return Decimal(self._charge_data.get("amount_total") or 0) / 100

        if self.event_type == "invoice.payment_succeeded":
            return Decimal(self._charge_data.get("amount_paid") or 0) / 100

        # Fallback: try to get the amount from metadata. For unhandled event
        # types the cached metadata is empty, so read the event object's own
        # metadata rather than looking up a nested "metadata" key.
        metadata = self._metadata or self._charge_data.get("metadata") or {}
        return Decimal(metadata.get("finalAmount") or 0) / 100
|