Merge pull request #75 from WDI-Ideas/development

Development
This commit is contained in:
BOBBY VISHWAKARMA
2024-07-25 00:54:18 +05:30
committed by GitHub
13 changed files with 573 additions and 9 deletions

View File

@@ -1,5 +1,7 @@
import random import random
import requests
import googlemaps import googlemaps
import tweepy
from django.conf import settings from django.conf import settings
from django.core.files.uploadedfile import UploadedFile from django.core.files.uploadedfile import UploadedFile
from django.core.mail import EmailMessage from django.core.mail import EmailMessage
@@ -745,7 +747,7 @@ class GoogleMapsservice:
def get_place_id_from_coordinates(self, latitude, longitude): def get_place_id_from_coordinates(self, latitude, longitude):
""" """
Get the place ID of the given coordinates. Get the place ID of the given coordinates.
:param latitude: Latitude of the location :param latitude: Latitude of the location
:param longitude: Longitude of the location :param longitude: Longitude of the location
:return: Place ID :return: Place ID
@@ -817,3 +819,268 @@ class GoogleMapsservice:
# queryset = queryset.order_by(preserved_order) # queryset = queryset.order_by(preserved_order)
return queryset return queryset
class TwitterAPI:
def __init__(self):
self.api_key = settings.TWITTER_API_KEY
self.api_secret_key = settings.TWITTER_API_SECRET_KEY
self.access_token = settings.TWITTER_ACCESS_TOKEN
self.access_token_secret = settings.TWITTER_ACCESS_TOKEN_SECRET
self.client, self.api = self._setup_api()
def _setup_api(self):
client = tweepy.Client(
consumer_key=self.api_key,
consumer_secret=self.api_secret_key,
access_token=self.access_token,
access_token_secret=self.access_token_secret,
)
auth = tweepy.OAuth1UserHandler(
self.api_key,
self.api_secret_key,
self.access_token,
self.access_token_secret,
)
api = tweepy.API(auth, wait_on_rate_limit=True)
return client, api
def post_text_tweet(self, caption):
tweet = self.client.create_tweet(text=caption)
return tweet
def post_image_with_caption(self, image_url, caption):
media = self.api.media_upload(image_url)
tweet = self.client.create_tweet(text=caption, media_ids=[media.media_id])
return tweet
class TwitterPoster:
def __init__(self, twitter_api):
self.twitter_api = twitter_api
def post_text_tweet(self, caption):
try:
tweet = self.twitter_api.post_text_tweet(caption)
return {'success': True, 'message': 'Tweet posted successfully!'}
except tweepy.TweepyException as e:
return {'success': False, 'message': f'Error posting tweet: {e}'}
def post_image_with_caption(self, image_url, caption):
try:
tweet = self.twitter_api.post_image_with_caption(image_url, caption)
return {'success': True, 'message': 'Tweet posted successfully!'}
except tweepy.TweepyException as e:
return {'success': False, 'message': f'Error posting tweet: {e}'}
class FacebookAPI:
def __init__(self):
self.app_id = settings.FACEBOOK_APP_ID
self.app_secret = settings.FACEBOOK_APP_SECRET
self.page_id = settings.FACEBOOK_PAGE_ID
self.page_access_token = None
def _get_short_lived_user_access_token(self):
try:
url = f"https://graph.facebook.com/oauth/access_token?grant_type=client_credentials&client_id={self.app_id}&client_secret={self.app_secret}"
response = requests.get(url)
response.raise_for_status()
print(f"short lived token {response.json()}")
return response.json()['access_token']
except requests.exceptions.RequestException as e:
print(f"Error getting short-lived user access token: {e}")
return None
def _get_long_lived_user_access_token(self, short_lived_token):
try:
url = f"https://graph.facebook.com/v20.0/oauth/access_token?grant_type=fb_exchange_token&client_id={self.app_id}&client_secret={self.app_secret}&fb_exchange_token={short_lived_token}"
response = requests.get(url)
response.raise_for_status()
print(f"long lived access token : {response.json()}")
return response.json()['access_token']
except requests.exceptions.RequestException as e:
print(f"Error getting long-lived user access token: {e}")
return None
def _get_page_access_token(self, long_lived_token):
url = f"https://graph.facebook.com/{self.page_id}?fields=access_token&access_token={long_lived_token}"
response = requests.get(url)
# response.raise_for_status()
print(f"page access token is {response.json()}")
# self.page_access_token = response.json()["access_token"]
def authenticate(self):
# short_lived_token = self._get_short_lived_user_access_token()
# if not short_lived_token:
# return False
# long_lived_token = self._get_long_lived_user_access_token(short_lived_token)
# if not long_lived_token:
# return False
# self._get_page_access_token(short_lived_token)
self.page_access_token = settings.FACEBOOK_ACCESS_TOKEN
return True
def post_photo(self, image_url, caption):
if not self.page_access_token:
print("Page access token not obtained. Call authenticate() first.")
return False
try:
url = f"https://graph.facebook.com/v20.0/{self.page_id}/photos"
params = {
"message": caption,
"url": image_url,
"access_token": self.page_access_token,
}
response = requests.post(url, params=params)
response.raise_for_status()
result = response.json()
if "id" not in result:
print(f"Error posting photo: {result}")
return False
print(f"Data posted successfully. Post Id: {result['id']}")
return True
except requests.exceptions.RequestException as e:
print(f"Error posting photo: {e}")
return False
class FacebookPoster:
def __init__(self, facebook_api):
self.facebook_api = facebook_api
def post_photo(self, image_url, caption):
if not self.facebook_api.authenticate():
print("Authentication failed. Please try again.")
return {'success': False, 'message': 'Error posting photo. Authenticate failed'}
result = self.facebook_api.post_photo(image_url, caption)
if not result:
return {'success': False, 'message': 'Error posting photo'}
return {'success': True, 'message': 'Photo posted successfully'}
# import requests
# app_id = "YOUR_APP_ID"
# app_secret = "YOUR_APP_SECRET"
# page_id = "YOUR_PAGE_ID" # You need to specify the page ID
# # Step 1: Get an App Access Token
# response = requests.get(f"https://graph.facebook.com/oauth/access_token?client_id={app_id}&client_secret={app_secret}&grant_type=client_credentials")
# app_access_token = response.json()["access_token"]
# # Step 2: Get a Page Access Token
# response = requests.get(f"https://graph.facebook.com/{page_id}?fields=access_token&access_token={app_access_token}")
# page_access_token = response.json()["access_token"]
# # Use the Page Access Token to query the Page node
# response = requests.get(f"https://graph.facebook.com/{page_id}?access_token={page_access_token}")
# page_data = response.json()
# print(page_data)
class InstagramAPI:
def __init__(self):
self.app_id = settings.FACEBOOK_APP_ID
self.app_secret = settings.FACEBOOK_APP_SECRET
self.page_id = settings.INSTAGRAM_PAGE_ID
self.page_access_token = None
def _get_short_lived_user_access_token(self):
try:
url = f"https://graph.facebook.com/oauth/access_token"
params = {
"grant_type": "client_credentials",
"client_id": self.app_id,
"client_secret": self.app_secret
}
response = requests.get(url, params=params)
response.raise_for_status()
print(f"Short-lived token: {response.json()}")
return response.json()['access_token']
except requests.exceptions.RequestException as e:
print(f"Error getting short-lived user access token: {e}")
return None
def _get_long_lived_user_access_token(self, short_lived_token):
try:
url = f"https://graph.facebook.com/v20.0/oauth/access_token"
params = {
"grant_type": "fb_exchange_token",
"client_id": self.app_id,
"client_secret": self.app_secret,
"fb_exchange_token": short_lived_token
}
response = requests.get(url, params=params)
response.raise_for_status()
print(f"Long-lived access token: {response.json()}")
return response.json()['access_token']
except requests.exceptions.RequestException as e:
print(f"Error getting long-lived user access token: {e}")
return None
def _get_page_access_token(self, long_lived_token):
try:
url = f"https://graph.facebook.com/{self.page_id}"
params = {
"fields": "access_token",
"access_token": long_lived_token
}
response = requests.get(url, params=params)
response.raise_for_status()
print(f"Page access token: {response.json()}")
return response.json()["access_token"]
except requests.exceptions.RequestException as e:
print(f"Error getting page access token: {e}")
return None
def authenticate(self):
# short_lived_token = self._get_short_lived_user_access_token()
# if not short_lived_token:
# return False
# long_lived_token = self._get_long_lived_user_access_token(short_lived_token)
# if not long_lived_token:
# return False
# self.page_access_token = self._get_page_access_token(long_lived_token)
self.page_access_token = settings.FACEBOOK_ACCESS_TOKEN
return True
def post_image_with_caption(self, image_path, caption):
image_path="https://admin.goodtimesltd.co.uk/static/img/goodtimes.png"
if not self.page_access_token:
print("Page access token not obtained. Call Authenticate() first.")
return False
try:
url = f"https://graph.facebook.com/v20.0/{self.page_id}/media"
params = {
"caption": caption,
"image_url": image_path,
"access_token": self.page_access_token
}
response = requests.post(url, data=params)
response.raise_for_status()
result = response.json()
print(f"Post image with caption result: {result['id']}")
url = f"https://graph.facebook.com/v20.0/{self.page_id}/media_publish"
params = {
"creation_id": result["id"],
"access_token": self.page_access_token
}
response = requests.post(url, params=params)
response.raise_for_status()
result = response.json()
return True
except requests.exceptions.RequestException as e:
print(f"Error posting photo on instagram: {e}")
return False
class InstagramPoster:
def __init__(self, instagram_api):
self.instagram_api = instagram_api
def post_image_with_caption(self, image_path, caption):
if not self.instagram_api.authenticate():
print("Instagram API authentication failed.")
return {'success': False, 'message': 'Error posting photo. Authenticate failed'}
result = self.instagram_api.post_image_with_caption(image_path, caption)
if not result:
return {'success': False, 'message': 'Error posting photo.'}
return {'success': True, 'message': 'Photo posted successfully'}

View File

@@ -339,3 +339,18 @@ PLACES_MAPS_API_KEY = env.str("GOOGLE_MAPS_API_KEY")
PLACES_MAP_WIDGET_HEIGHT = 480 PLACES_MAP_WIDGET_HEIGHT = 480
PLACES_MAP_OPTIONS = '{"center": { "lat": 38.971584, "lng": -95.235072 }, "zoom": 10}' PLACES_MAP_OPTIONS = '{"center": { "lat": 38.971584, "lng": -95.235072 }, "zoom": 10}'
PLACES_MARKER_OPTIONS = '{"draggable": true}' PLACES_MARKER_OPTIONS = '{"draggable": true}'
# twitter keys
TWITTER_API_KEY = env.str("TWITTER_API_KEY")
TWITTER_API_SECRET_KEY = env.str("TWITTER_API_SECRET_KEY")
TWITTER_ACCESS_TOKEN = env.str("TWITTER_ACCESS_TOKEN")
TWITTER_ACCESS_TOKEN_SECRET = env.str("TWITTER_ACCESS_TOKEN_SECRET")
# facebook keys
FACEBOOK_APP_ID = env.str("FACEBOOK_APP_ID")
FACEBOOK_APP_SECRET = env.str("FACEBOOK_APP_SECRET")
FACEBOOK_PAGE_ID = env.str("FACEBOOK_PAGE_ID")
FACEBOOK_ACCESS_TOKEN = env.str("FACEBOOK_ACCESS_TOKEN")
# Instagram Key
INSTAGRAM_PAGE_ID = env.str('INSTAGRAM_PAGE_ID')

View File

@@ -0,0 +1,31 @@
import os
from django.conf import settings
from django.core.management.base import BaseCommand
from goodtimes.services import FacebookAPI, FacebookPoster
from ...models import Event
class Command(BaseCommand):
help = 'Test facebook posting functionality'
def handle(self, *args, **kwargs):
event = Event.objects.get(id=20)
if not event:
self.stdout.write(self.style.ERROR("No event found."))
if not event.image:
self.stdout.write(self.style.ERROR("No image found."))
base_domain = settings.BASE_DOMAIN
image_path = f"{base_domain}{event.image.url}"
print(f"complete path of image {image_path}")
caption = f"{event.title}\nDuration: {event.start_date} to {event.end_date}\nAddress: {event.venue.address}"
facebook_api = FacebookAPI()
facebook_poster = FacebookPoster(facebook_api)
response = facebook_poster.post_photo(image_path, caption)
if response['success']:
self.stdout.write(self.style.SUCCESS(response['message']))
else:
self.stdout.write(self.style.ERROR(response['message']))

View File

@@ -0,0 +1,34 @@
import os
from django.conf import settings
from django.core.management.base import BaseCommand
from goodtimes.services import InstagramAPI, InstagramPoster
from ...models import Event
import urllib.request
class Command(BaseCommand):
help = 'Test Instagram posting functionality'
def handle(self, *args, **kwargs):
event = Event.objects.get(id=20)
if not event:
self.stdout.write(self.style.ERROR("No event found."))
if not event.image:
self.stdout.write(self.style.ERROR("No image found."))
# base_domain = settings.BASE_DOMAIN
# image_path = f"{base_domain}{event.image.url}"
image_path = event.image.url
print(f"complete path of image {image_path}")
caption = f"{event.title}\nDuration: {event.start_date} to {event.end_date}\nAddress: {event.venue.address}"
instagram_api = InstagramAPI()
instagram_poster = InstagramPoster(instagram_api)
response = instagram_poster.post_image_with_caption(image_path, caption)
if response['success']:
self.stdout.write(self.style.SUCCESS(response['message']))
else:
self.stdout.write(self.style.ERROR(response['message']))

View File

@@ -0,0 +1,28 @@
import os
from django.core.management.base import BaseCommand
from goodtimes.services import TwitterAPI, TwitterPoster
from ...models import Event
class Command(BaseCommand):
help = 'Test Twitter posting functionality'
def handle(self, *args, **kwargs):
event = Event.objects.get(id=19)
if not event:
self.stdout.write(self.style.ERROR("No event found."))
if not event.image:
self.stdout.write(self.style.ERROR("No image found."))
image_path = event.image.path
caption = f"{event.title}\nDuration: {event.start_date} to {event.end_date}\nAddress: {event.venue.address}"
twitter_api = TwitterAPI()
twitter_poster = TwitterPoster(twitter_api)
response = twitter_poster.post_image_with_caption(image_path, caption)
if response['success']:
self.stdout.write(self.style.SUCCESS(response['message']))
else:
self.stdout.write(self.style.ERROR(response['message']))

View File

@@ -99,4 +99,6 @@ urlpatterns = [
views.GenerateEventReportView.as_view(), views.GenerateEventReportView.as_view(),
name="generate_event_report", name="generate_event_report",
), ),
path("post-to-social-media/<int:id>/<str:platform>/", views.SocialMediaPostView.as_view(), name="social_media_post")
] ]

View File

@@ -1,5 +1,6 @@
from django.shortcuts import get_object_or_404, redirect, render from django.shortcuts import get_object_or_404, redirect, render
from accounts import resource_action from accounts import resource_action
from goodtimes.services import FacebookAPI, FacebookPoster, InstagramAPI, InstagramPoster, TwitterAPI, TwitterPoster
from goodtimes.utils import JsonResponseUtil from goodtimes.utils import JsonResponseUtil
from manage_events.api.serializers import VenueSerializer, VenueShortSerializer from manage_events.api.serializers import VenueSerializer, VenueShortSerializer
from manage_events.forms import ( from manage_events.forms import (
@@ -553,3 +554,52 @@ class GenerateEventReportView(generic.View):
response["Content-Disposition"] = f'attachment; filename="{filename}"' response["Content-Disposition"] = f'attachment; filename="{filename}"'
return response return response
class SocialMediaPostView(generic.View):
def get(self, request, *args, **kwargs):
platform = kwargs.get("platform")
event_id = kwargs.get("id")
print(platform, event_id)
errors = []
try:
event = Event.objects.get(id=event_id)
except Event.DoesNotExist:
errors.append("Event does not exist")
return JsonResponseUtil.error(message=errors, errors=errors)
if not event.active:
errors.append("Event is not active")
return JsonResponseUtil.error(message=errors, errors=errors)
caption = f"{event.title}\nDuration: {event.start_date} to {event.end_date}\nAddress: {event.venue.address}"
print(f"image url and caption is {caption}")
if platform in ['instagram', 'facebook', 'twitter', 'all']:
if platform in ['twitter', 'all']:
image_url = event.image.path
twitter_api = TwitterAPI()
twitter_poster = TwitterPoster(twitter_api)
result = twitter_poster.post_image_with_caption(image_url, caption)
if not result['success']:
errors.append(result['message'])
image_url = request.build_absolute_uri(event.image.url) # fb and insta require complete path with domain
if platform in ['facebook', 'all']:
facebook_api = FacebookAPI()
facebook_poster = FacebookPoster(facebook_api)
result = facebook_poster.post_photo(image_url, caption)
if not result["success"]:
errors.append(result["message"])
if platform in ['instagram', 'all']:
instagram_api = InstagramAPI()
instagram_poster = InstagramPoster(instagram_api)
result = instagram_poster.post_image_with_caption(image_url, caption)
if not result["success"]:
errors.append(result["message"])
if not errors:
return JsonResponseUtil.success(message='Post Successful')
return JsonResponseUtil.error(message=errors, errors=errors)

View File

@@ -69,8 +69,9 @@ sniffio==1.3.1
sqlparse==0.4.4 sqlparse==0.4.4
stripe==8.2.0 stripe==8.2.0
tqdm==4.66.2 tqdm==4.66.2
tweepy==4.14.0
Twisted==23.10.0 Twisted==23.10.0
# twisted-iocpsupport==1.0.4 twisted-iocpsupport==1.0.4
txaio==23.1.1 txaio==23.1.1
typing_extensions==4.9.0 typing_extensions==4.9.0
tzdata==2024.1 tzdata==2024.1

BIN
static/img/facebook.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.5 KiB

BIN
static/img/instagram.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

BIN
static/img/x_twitter.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -2,12 +2,48 @@
{% load static %} {% load static %}
{% block stylesheet %} {% block stylesheet %}
<!-- include required css cdn link through html here --> <!-- include required css cdn link through html here -->
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0-beta3/css/all.min.css" rel="stylesheet">
{% include "cdn_through_html/filepond_cdn_css.html" %} {% include "cdn_through_html/filepond_cdn_css.html" %}
{% include "cdn_through_html/quill_cdn_css.html" %} {% include "cdn_through_html/quill_cdn_css.html" %}
{% include "cdn_through_html/tagify_cdn_css.html" %} {% include "cdn_through_html/tagify_cdn_css.html" %}
{% include "cdn_through_html/sweetalert2_cdn_css.html" %}
{{form.media}} {{form.media}}
<style>
.social-circle {
display: flex;
justify-content: center;
align-items: center;
gap: 20px;
}
.logos{
text-align:center;
margin-bottom:-390px;
}
.x{
margin:5px;
cursor: pointer;
}
.x_twitter{
width:50px;
height:50px;
}
.forward_all{
width:50px;
height:50px;
}
.instalogo{
width:50px;
height:50px;
}
.facelogo{
width:50px;
height:50px;
}
</style>
{% endblock %} {% endblock %}
{% block content %} {% block content %}
@@ -25,15 +61,41 @@
{% endif %} {% endif %}
</div> </div>
<div class="card mb-3" style="border-radius: 20px; box-shadow: 0 4px 8px rgba(0,0,0,.1);"> <div class="row">
<div class="card-body"> <div class="col-md-8">
<h2 class="card-title">{{ event.brand.title }}</h2> <div class="card mb-3" style="border-radius: 20px; box-shadow: 0 4px 8px rgba(0,0,0,.1);">
<h3 class="card-title">{{ event.title }}</h3> <div class="card-body">
<p class="card-text"><strong>Description:</strong> {{ event.description }}</p> <h2 class="card-title">{{ event.brand.title }}</h2>
<p class="card-text"><strong>Created By:</strong> {{ event.created_by }}</p> <h3 class="card-title">{{ event.title }}</h3>
<p class="card-text"><strong>Description:</strong> {{ event.description }}</p>
<p class="card-text"><strong>Created By:</strong> {{ event.created_by }}</p>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card mb-3" style="border-radius: 20px; box-shadow: 0 4px 8px rgba(0,0,0,.1);">
<div class="card-body">
<div class="social-circle">
<a data-platform="instagram" class="social-icon" data-toggle="tooltip" title="Post to Instagram">
<img src="{% static 'img/instagram.png'%}" class="x instalogo">
</a>
<a data-platform="facebook" class="social-icon" data-toggle="tooltip" title="Post to Facebook">
<img src="{% static 'img/facebook.png'%}" class="x facelogo">
</a>
<a data-platform="twitter" class="social-icon" data-toggle="tooltip" title="Post to Twitter">
<img src="{% static 'img/x_twitter.png'%}" class="x x_twitter">
</a>
<a data-platform="all" class="social-icon" data-toggle="tooltip" title="Post to All">
<img src="{% static 'img/forward_all_icon.png'%}" class="x forward_all">
</a>
</div>
</div>
</div>
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col-md-6"> <div class="col-md-6">
<div class="card mb-3" style="border-radius: 20px; box-shadow: 0 4px 8px rgba(0,0,0,.1);"> <div class="card mb-3" style="border-radius: 20px; box-shadow: 0 4px 8px rgba(0,0,0,.1);">
@@ -145,4 +207,78 @@
</div> </div>
{% endblock content %} {% endblock content %}
{% block javascript %}
<!-- include required css cdn link through html here -->
{% include "cdn_through_html/sweetalert2_cdn_js.html" %}
<script>
$(document).ready(function() {
// Initialize tooltips
$('[data-toggle="tooltip"]').tooltip();
$(".social-icon").on("click", function(e) {
console.log("social icon clicked");
e.preventDefault();
var platform = $(this).data("platform");
var eventId = "{{ event.id }}";
var platform_url = "{% url 'manage_events:social_media_post' id=0 platform='dummy' %}"
.replace('/0/', '/' + eventId + '/')
.replace('dummy', platform);
console.log(platform_url);
Swal.fire({
title: "Are you sure you want to post this event on " + platform + "?",
showCancelButton: true,
confirmButtonText: 'Yes',
cancelButtonText: 'No',
allowOutsideClick: false
}).then((result) => {
if (result.isConfirmed) {
Swal.fire({
title: 'Posting...',
allowOutsideClick: false,
didOpen: () => {
Swal.showLoading();
}
});
$.ajax({
url: platform_url,
type: "GET",
success: function(response) {
console.log(response);
Swal.close();
if (response.status === 200) {
Swal.fire({
icon: "success",
title: "Success",
text: response.message
});
} else if (response.status === 403) {
Swal.fire({
icon: "error",
title: "Error",
text: response.message
});
}
},
error: function(xhr, status, error) {
Swal.close();
Swal.fire({
icon: "error",
title: "Error",
text: "Something went wrong. Please try again later."
});
}
});
}
});
});
});
</script>
{%endblock javascript%}