From 631ae792778952ecbab1f20b67caa0b29fae0a46 Mon Sep 17 00:00:00 2001 From: paritosh18 Date: Mon, 13 Apr 2026 15:35:39 +0530 Subject: [PATCH] add websocket support with connection management and chat functionality --- prisma/schema.prisma | 17 +++ serverless.yml | 8 ++ serverless/functions/host.yml | 30 ----- serverless/functions/user.yml | 30 ----- serverless/functions/websocket.yml | 64 ++++++++++ src/common/middlewares/jwt/authForAny.ts | 78 ++++++++++++ src/common/services/websocket.service.ts | 58 +++++++++ src/modules/host/handlers/chat/getMessages.ts | 77 ------------ src/modules/host/handlers/chat/sendMessage.ts | 78 ------------ src/modules/user/handlers/chat/getMessages.ts | 77 ------------ src/modules/user/handlers/chat/sendMessage.ts | 78 ------------ src/modules/websocket/handlers/connect.ts | 55 +++++++++ src/modules/websocket/handlers/default.ts | 17 +++ src/modules/websocket/handlers/disconnect.ts | 26 ++++ src/modules/websocket/handlers/getMessages.ts | 99 ++++++++++++++++ src/modules/websocket/handlers/sendMessage.ts | 111 ++++++++++++++++++ 16 files changed, 533 insertions(+), 370 deletions(-) create mode 100644 serverless/functions/websocket.yml create mode 100644 src/common/middlewares/jwt/authForAny.ts create mode 100644 src/common/services/websocket.service.ts delete mode 100644 src/modules/host/handlers/chat/getMessages.ts delete mode 100644 src/modules/host/handlers/chat/sendMessage.ts delete mode 100644 src/modules/user/handlers/chat/getMessages.ts delete mode 100644 src/modules/user/handlers/chat/sendMessage.ts create mode 100644 src/modules/websocket/handlers/connect.ts create mode 100644 src/modules/websocket/handlers/default.ts create mode 100644 src/modules/websocket/handlers/disconnect.ts create mode 100644 src/modules/websocket/handlers/getMessages.ts create mode 100644 src/modules/websocket/handlers/sendMessage.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index a2e4799..a8ea901 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -81,6 +81,7 @@ model User { activitySortings ActivitySorting[] sentActivityMessages ActivityMessages[] @relation("ActivityMessageSender") receivedActivityMessages ActivityMessages[] @relation("ActivityMessageReceiver") + chatConnections ChatConnections[] @@map("users") @@schema("usr") @@ -734,6 +735,22 @@ model Token { @@schema("usr") } +model ChatConnections { + id Int @id @default(autoincrement()) + userXid Int @map("user_xid") + user User @relation(fields: [userXid], references: [id], onDelete: Cascade) + activityXid Int? @map("activity_xid") + connectionId String @unique @map("connection_id") @db.VarChar(200) + isActive Boolean @default(true) @map("is_active") + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @updatedAt @map("updated_at") + deletedAt DateTime? @map("deleted_at") + + @@index([userXid, activityXid]) + @@map("chat_connections") + @@schema("usr") +} + //HOST MODELS model HostHeader { diff --git a/serverless.yml b/serverless.yml index ec8f4ef..afeec56 100644 --- a/serverless.yml +++ b/serverless.yml @@ -34,6 +34,8 @@ provider: binaryMediaTypes: - '*/*' minimumCompressionSize: 1024 + websocketsApiName: minglar-ws-${sls:stage} + websocketsApiRouteSelectionExpression: $request.body.action environment: DATABASE_URL: ${env:DATABASE_URL} @@ -81,6 +83,11 @@ provider: Resource: - 'arn:aws:s3:::${env:S3_BUCKET_NAME}' - 'arn:aws:s3:::${env:S3_BUCKET_NAME}/*' + - Effect: Allow + Action: + - execute-api:ManageConnections + Resource: + - 'arn:aws:execute-api:${self:provider.region}:*:*/*/@connections/*' build: esbuild: @@ -151,6 +158,7 @@ functions: - ${file(./serverless/functions/minglaradmin.yml)} - ${file(./serverless/functions/prepopulate.yml)} - ${file(./serverless/functions/user.yml)} + - ${file(./serverless/functions/websocket.yml)} plugins: - serverless-offline diff --git a/serverless/functions/host.yml b/serverless/functions/host.yml index 2a029b4..ba81f1d 100644 --- a/serverless/functions/host.yml +++ b/serverless/functions/host.yml @@ -585,33 +585,3 @@ submitPQAnswer: - httpApi: path: /Activity_Hub/OnBoarding/submit-pq-answer method: patch - -sendHostChatMessage: - handler: src/modules/host/handlers/chat/sendMessage.handler - memorySize: 384 - package: - patterns: - - 'src/modules/host/**' - - ${file(./serverless/patterns/base.yml):pattern1} - - ${file(./serverless/patterns/base.yml):pattern2} - - ${file(./serverless/patterns/base.yml):pattern3} - - ${file(./serverless/patterns/base.yml):pattern4} - events: - - httpApi: - path: /host/chat/send-message - method: post - -getHostChatMessages: - handler: src/modules/host/handlers/chat/getMessages.handler - memorySize: 384 - package: - patterns: - - 'src/modules/host/**' - - ${file(./serverless/patterns/base.yml):pattern1} - - ${file(./serverless/patterns/base.yml):pattern2} - - ${file(./serverless/patterns/base.yml):pattern3} - - ${file(./serverless/patterns/base.yml):pattern4} - events: - - httpApi: - path: /host/chat/messages - method: get diff --git a/serverless/functions/user.yml b/serverless/functions/user.yml index 33bb60f..6c15442 100644 --- a/serverless/functions/user.yml +++ b/serverless/functions/user.yml @@ -557,33 +557,3 @@ getMatchingBucketInterestedActivities: - httpApi: path: /itinerary/get-matching-bucket-interested-activities method: post - -sendUserChatMessage: - handler: src/modules/user/handlers/chat/sendMessage.handler - memorySize: 384 - package: - patterns: - - 'src/modules/user/**' - - ${file(./serverless/patterns/base.yml):pattern1} - - ${file(./serverless/patterns/base.yml):pattern2} - - ${file(./serverless/patterns/base.yml):pattern3} - - ${file(./serverless/patterns/base.yml):pattern4} - events: - - httpApi: - path: /chat/send-message - method: post - -getUserChatMessages: - handler: src/modules/user/handlers/chat/getMessages.handler - memorySize: 384 - package: - patterns: - - 'src/modules/user/**' - - ${file(./serverless/patterns/base.yml):pattern1} - - ${file(./serverless/patterns/base.yml):pattern2} - - ${file(./serverless/patterns/base.yml):pattern3} - - ${file(./serverless/patterns/base.yml):pattern4} - events: - - httpApi: - path: /chat/messages - method: get diff --git a/serverless/functions/websocket.yml b/serverless/functions/websocket.yml new file mode 100644 index 0000000..eb2fedc --- /dev/null +++ b/serverless/functions/websocket.yml @@ -0,0 +1,64 @@ +websocketConnect: + handler: src/modules/websocket/handlers/connect.handler + memorySize: 256 + package: + patterns: + - 'src/modules/websocket/**' + - 'src/common/**' + - ${file(./serverless/patterns/base.yml):pattern3} + - ${file(./serverless/patterns/base.yml):pattern4} + events: + - websocket: + route: $connect + +websocketDisconnect: + handler: src/modules/websocket/handlers/disconnect.handler + memorySize: 256 + package: + patterns: + - 'src/modules/websocket/**' + - 'src/common/**' + - ${file(./serverless/patterns/base.yml):pattern3} + - ${file(./serverless/patterns/base.yml):pattern4} + events: + - websocket: + route: $disconnect + +websocketDefault: + handler: src/modules/websocket/handlers/default.handler + memorySize: 256 + package: + patterns: + - 'src/modules/websocket/**' + - 'src/common/**' + - ${file(./serverless/patterns/base.yml):pattern3} + - ${file(./serverless/patterns/base.yml):pattern4} + events: + - websocket: + route: $default + +websocketSendMessage: + handler: src/modules/websocket/handlers/sendMessage.handler + memorySize: 384 + package: + patterns: + - 'src/modules/websocket/**' + - 'src/common/**' + - ${file(./serverless/patterns/base.yml):pattern3} + - ${file(./serverless/patterns/base.yml):pattern4} + events: + - websocket: + route: sendMessage + +websocketGetMessages: + handler: src/modules/websocket/handlers/getMessages.handler + memorySize: 384 + package: + patterns: + - 'src/modules/websocket/**' + - 'src/common/**' + - ${file(./serverless/patterns/base.yml):pattern3} + - ${file(./serverless/patterns/base.yml):pattern4} + events: + - websocket: + route: getMessages diff --git a/src/common/middlewares/jwt/authForAny.ts b/src/common/middlewares/jwt/authForAny.ts new file mode 100644 index 0000000..13115df --- /dev/null +++ b/src/common/middlewares/jwt/authForAny.ts @@ -0,0 +1,78 @@ +import jwt from 'jsonwebtoken'; +import httpStatus from 'http-status'; +import ApiError from '../../utils/helper/ApiError'; +import config from '../../../config/config'; +import { ROLE } from '@/common/utils/constants/common.constant'; +import { prisma } from '../../database/prisma.client'; + +interface DecodedToken { + id?: number; + sub?: string | number; + role?: string; + iat: number; + exp: number; +} + +export async function verifyAnyToken( + token: string +): Promise<{ id: number; roleXid: number; role?: string }> { + if (!token) { + throw new ApiError(httpStatus.UNAUTHORIZED, 'Please authenticate'); + } + + try { + const decoded = jwt.verify(token, config.jwt.secret) as unknown as DecodedToken; + + const userId = decoded.id ?? (decoded.sub ? Number(decoded.sub) : null); + if (!userId) { + throw new ApiError(httpStatus.UNAUTHORIZED, 'Invalid token payload'); + } + + const user = await prisma.user.findUnique({ + where: { id: userId }, + include: { role: true }, + }); + + const latestToken = await prisma.token.findFirst({ + where: { userXid: userId }, + orderBy: { id: 'desc' }, + }); + + if (latestToken?.isBlackListed === true) { + throw new ApiError(401, 'This session is expired. Please login.'); + } + + if (!user) { + throw new ApiError(httpStatus.UNAUTHORIZED, 'User not found'); + } + + if (user.isActive === false) { + throw new ApiError( + httpStatus.FORBIDDEN, + 'Your account is deactivated by admin.' + ); + } + + if (user.roleXid !== ROLE.USER && user.roleXid !== ROLE.HOST) { + throw new ApiError(httpStatus.FORBIDDEN, 'Access denied.'); + } + + return { id: user.id, roleXid: user.roleXid || 0, role: user.role?.roleName }; + } catch (error) { + if (error instanceof jwt.TokenExpiredError) { + throw new ApiError( + httpStatus.UNAUTHORIZED, + 'Your session has expired. Please log in again.' + ); + } + + if (error instanceof ApiError) { + throw error; + } + + throw new ApiError( + httpStatus.FORBIDDEN, + 'Invalid or expired authentication token.' + ); + } +} diff --git a/src/common/services/websocket.service.ts b/src/common/services/websocket.service.ts new file mode 100644 index 0000000..6e41b86 --- /dev/null +++ b/src/common/services/websocket.service.ts @@ -0,0 +1,58 @@ +import { PrismaClient } from '@prisma/client'; + +export class WebSocketService { + constructor(private prisma: PrismaClient) {} + + async connect(params: { + connectionId: string; + userXid: number; + activityXid?: number | null; + }) { + const { connectionId, userXid, activityXid } = params; + return this.prisma.chatConnections.upsert({ + where: { connectionId }, + create: { + connectionId, + userXid, + activityXid: activityXid ?? null, + isActive: true, + }, + update: { + userXid, + activityXid: activityXid ?? null, + isActive: true, + deletedAt: null, + }, + }); + } + + async disconnect(connectionId: string) { + return this.prisma.chatConnections.updateMany({ + where: { connectionId }, + data: { + isActive: false, + deletedAt: new Date(), + }, + }); + } + + async getConnectionById(connectionId: string) { + return this.prisma.chatConnections.findFirst({ + where: { connectionId, isActive: true }, + }); + } + + async getConnectionsForUser(params: { + userXid: number; + activityXid?: number | null; + }) { + const { userXid, activityXid } = params; + return this.prisma.chatConnections.findMany({ + where: { + userXid, + isActive: true, + ...(activityXid ? { activityXid } : {}), + }, + }); + } +} diff --git a/src/modules/host/handlers/chat/getMessages.ts b/src/modules/host/handlers/chat/getMessages.ts deleted file mode 100644 index 1f5bd31..0000000 --- a/src/modules/host/handlers/chat/getMessages.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { - APIGatewayProxyEvent, - APIGatewayProxyResult, - Context, -} from 'aws-lambda'; -import { prismaClient } from '../../../../common/database/prisma.lambda.service'; -import { verifyHostToken } from '../../../../common/middlewares/jwt/authForHost'; -import { ChatService } from '../../../../common/services/chat.service'; -import { safeHandler } from '../../../../common/utils/handlers/safeHandler'; -import ApiError from '../../../../common/utils/helper/ApiError'; - -const chatService = new ChatService(prismaClient); - -export const handler = safeHandler( - async ( - event: APIGatewayProxyEvent, - context?: Context - ): Promise => { - const token = - event.headers['x-auth-token'] || event.headers['X-Auth-Token']; - if (!token) { - throw new ApiError( - 400, - 'This is a protected route. Please provide a valid token.' - ); - } - - const userInfo = await verifyHostToken(token); - const hostUserId = Number(userInfo.id); - - if (!hostUserId || isNaN(hostUserId)) { - throw new ApiError(400, 'Invalid host user ID'); - } - - const activityXidParam = - event.queryStringParameters?.activityXid || - event.queryStringParameters?.activity_xid; - const otherUserXidParam = - event.queryStringParameters?.otherUserXid || - event.queryStringParameters?.other_user_xid || - event.queryStringParameters?.userXid || - event.queryStringParameters?.user_xid; - const limitParam = event.queryStringParameters?.limit; - - const activityXid = Number(activityXidParam); - const otherUserXid = Number(otherUserXidParam); - const limit = limitParam ? Number(limitParam) : undefined; - - if (!activityXid || isNaN(activityXid)) { - throw new ApiError(400, 'Valid activityXid is required'); - } - - if (!otherUserXid || isNaN(otherUserXid)) { - throw new ApiError(400, 'Valid otherUserXid is required'); - } - - const messages = await chatService.getMessages({ - activityXid, - userXid: hostUserId, - otherUserXid, - limit, - }); - - return { - statusCode: 200, - headers: { - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*', - }, - body: JSON.stringify({ - success: true, - message: 'Messages retrieved successfully', - data: messages, - }), - }; - } -); diff --git a/src/modules/host/handlers/chat/sendMessage.ts b/src/modules/host/handlers/chat/sendMessage.ts deleted file mode 100644 index a5d2adf..0000000 --- a/src/modules/host/handlers/chat/sendMessage.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { - APIGatewayProxyEvent, - APIGatewayProxyResult, - Context, -} from 'aws-lambda'; -import { prismaClient } from '../../../../common/database/prisma.lambda.service'; -import { verifyHostToken } from '../../../../common/middlewares/jwt/authForHost'; -import { ChatService } from '../../../../common/services/chat.service'; -import { safeHandler } from '../../../../common/utils/handlers/safeHandler'; -import ApiError from '../../../../common/utils/helper/ApiError'; - -const chatService = new ChatService(prismaClient); - -export const handler = safeHandler( - async ( - event: APIGatewayProxyEvent, - context?: Context - ): Promise => { - const token = - event.headers['x-auth-token'] || event.headers['X-Auth-Token']; - if (!token) { - throw new ApiError( - 400, - 'This is a protected route. Please provide a valid token.' - ); - } - - const userInfo = await verifyHostToken(token); - const hostUserId = Number(userInfo.id); - - if (!hostUserId || isNaN(hostUserId)) { - throw new ApiError(400, 'Invalid host user ID'); - } - - let body: any; - try { - body = JSON.parse(event.body || '{}'); - } catch { - throw new ApiError(400, 'Invalid JSON in request body'); - } - - const activityXid = Number(body.activityXid ?? body.activity_xid); - const receiverXid = Number( - body.receiverXid ?? body.receivedXid ?? body.received_xid - ); - const message = body.message; - const status = body.status; - - if (!activityXid || isNaN(activityXid)) { - throw new ApiError(400, 'Valid activityXid is required'); - } - - if (!receiverXid || isNaN(receiverXid)) { - throw new ApiError(400, 'Valid receiverXid is required'); - } - - const result = await chatService.sendMessage({ - activityXid, - senderXid: hostUserId, - receiverXid, - message, - status, - }); - - return { - statusCode: 201, - headers: { - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*', - }, - body: JSON.stringify({ - success: true, - message: 'Message sent successfully', - data: result, - }), - }; - } -); diff --git a/src/modules/user/handlers/chat/getMessages.ts b/src/modules/user/handlers/chat/getMessages.ts deleted file mode 100644 index 10eb48b..0000000 --- a/src/modules/user/handlers/chat/getMessages.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { - APIGatewayProxyEvent, - APIGatewayProxyResult, - Context, -} from 'aws-lambda'; -import { prismaClient } from '../../../../common/database/prisma.lambda.service'; -import { verifyUserToken } from '../../../../common/middlewares/jwt/authForUser'; -import { ChatService } from '../../../../common/services/chat.service'; -import { safeHandler } from '../../../../common/utils/handlers/safeHandler'; -import ApiError from '../../../../common/utils/helper/ApiError'; - -const chatService = new ChatService(prismaClient); - -export const handler = safeHandler( - async ( - event: APIGatewayProxyEvent, - context?: Context - ): Promise => { - const token = - event.headers['x-auth-token'] || event.headers['X-Auth-Token']; - if (!token) { - throw new ApiError( - 400, - 'This is a protected route. Please provide a valid token.' - ); - } - - const userInfo = await verifyUserToken(token); - const userId = Number(userInfo.id); - - if (!userId || isNaN(userId)) { - throw new ApiError(400, 'Invalid user ID'); - } - - const activityXidParam = - event.queryStringParameters?.activityXid || - event.queryStringParameters?.activity_xid; - const otherUserXidParam = - event.queryStringParameters?.otherUserXid || - event.queryStringParameters?.other_user_xid || - event.queryStringParameters?.userXid || - event.queryStringParameters?.user_xid; - const limitParam = event.queryStringParameters?.limit; - - const activityXid = Number(activityXidParam); - const otherUserXid = Number(otherUserXidParam); - const limit = limitParam ? Number(limitParam) : undefined; - - if (!activityXid || isNaN(activityXid)) { - throw new ApiError(400, 'Valid activityXid is required'); - } - - if (!otherUserXid || isNaN(otherUserXid)) { - throw new ApiError(400, 'Valid otherUserXid is required'); - } - - const messages = await chatService.getMessages({ - activityXid, - userXid: userId, - otherUserXid, - limit, - }); - - return { - statusCode: 200, - headers: { - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*', - }, - body: JSON.stringify({ - success: true, - message: 'Messages retrieved successfully', - data: messages, - }), - }; - } -); diff --git a/src/modules/user/handlers/chat/sendMessage.ts b/src/modules/user/handlers/chat/sendMessage.ts deleted file mode 100644 index fa5122a..0000000 --- a/src/modules/user/handlers/chat/sendMessage.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { - APIGatewayProxyEvent, - APIGatewayProxyResult, - Context, -} from 'aws-lambda'; -import { prismaClient } from '../../../../common/database/prisma.lambda.service'; -import { verifyUserToken } from '../../../../common/middlewares/jwt/authForUser'; -import { ChatService } from '../../../../common/services/chat.service'; -import { safeHandler } from '../../../../common/utils/handlers/safeHandler'; -import ApiError from '../../../../common/utils/helper/ApiError'; - -const chatService = new ChatService(prismaClient); - -export const handler = safeHandler( - async ( - event: APIGatewayProxyEvent, - context?: Context - ): Promise => { - const token = - event.headers['x-auth-token'] || event.headers['X-Auth-Token']; - if (!token) { - throw new ApiError( - 400, - 'This is a protected route. Please provide a valid token.' - ); - } - - const userInfo = await verifyUserToken(token); - const userId = Number(userInfo.id); - - if (!userId || isNaN(userId)) { - throw new ApiError(400, 'Invalid user ID'); - } - - let body: any; - try { - body = JSON.parse(event.body || '{}'); - } catch { - throw new ApiError(400, 'Invalid JSON in request body'); - } - - const activityXid = Number(body.activityXid ?? body.activity_xid); - const receiverXid = Number( - body.receiverXid ?? body.receivedXid ?? body.received_xid - ); - const message = body.message; - const status = body.status; - - if (!activityXid || isNaN(activityXid)) { - throw new ApiError(400, 'Valid activityXid is required'); - } - - if (!receiverXid || isNaN(receiverXid)) { - throw new ApiError(400, 'Valid receiverXid is required'); - } - - const result = await chatService.sendMessage({ - activityXid, - senderXid: userId, - receiverXid, - message, - status, - }); - - return { - statusCode: 201, - headers: { - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*', - }, - body: JSON.stringify({ - success: true, - message: 'Message sent successfully', - data: result, - }), - }; - } -); diff --git a/src/modules/websocket/handlers/connect.ts b/src/modules/websocket/handlers/connect.ts new file mode 100644 index 0000000..e0dc957 --- /dev/null +++ b/src/modules/websocket/handlers/connect.ts @@ -0,0 +1,55 @@ +import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; +import { prismaClient } from '../../../common/database/prisma.lambda.service'; +import { verifyAnyToken } from '../../../common/middlewares/jwt/authForAny'; +import { WebSocketService } from '../../../common/services/websocket.service'; +import { safeHandler } from '../../../common/utils/handlers/safeHandler'; +import ApiError from '../../../common/utils/helper/ApiError'; + +const wsService = new WebSocketService(prismaClient); + +export const handler = safeHandler( + async ( + event: APIGatewayProxyEvent, + context?: Context + ): Promise => { + const connectionId = event.requestContext.connectionId; + if (!connectionId) { + throw new ApiError(400, 'Missing connection ID'); + } + + const token = + event.queryStringParameters?.token || + event.queryStringParameters?.['x-auth-token'] || + event.headers?.['x-auth-token'] || + event.headers?.['X-Auth-Token']; + + if (!token) { + throw new ApiError(401, 'Token is required for WebSocket connection'); + } + + const userInfo = await verifyAnyToken(token); + + const activityXidParam = + event.queryStringParameters?.activityXid || + event.queryStringParameters?.activity_xid; + const activityXid = activityXidParam ? Number(activityXidParam) : null; + + if (activityXidParam && (!activityXid || isNaN(activityXid))) { + throw new ApiError(400, 'Invalid activityXid'); + } + + await wsService.connect({ + connectionId, + userXid: userInfo.id, + activityXid, + }); + + return { + statusCode: 200, + body: JSON.stringify({ + success: true, + message: 'Connected', + }), + }; + } +); diff --git a/src/modules/websocket/handlers/default.ts b/src/modules/websocket/handlers/default.ts new file mode 100644 index 0000000..49fdf5f --- /dev/null +++ b/src/modules/websocket/handlers/default.ts @@ -0,0 +1,17 @@ +import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; +import { safeHandler } from '../../../common/utils/handlers/safeHandler'; + +export const handler = safeHandler( + async ( + event: APIGatewayProxyEvent, + context?: Context + ): Promise => { + return { + statusCode: 200, + body: JSON.stringify({ + success: true, + message: 'Default route', + }), + }; + } +); diff --git a/src/modules/websocket/handlers/disconnect.ts b/src/modules/websocket/handlers/disconnect.ts new file mode 100644 index 0000000..5b0c746 --- /dev/null +++ b/src/modules/websocket/handlers/disconnect.ts @@ -0,0 +1,26 @@ +import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; +import { prismaClient } from '../../../common/database/prisma.lambda.service'; +import { WebSocketService } from '../../../common/services/websocket.service'; +import { safeHandler } from '../../../common/utils/handlers/safeHandler'; + +const wsService = new WebSocketService(prismaClient); + +export const handler = safeHandler( + async ( + event: APIGatewayProxyEvent, + context?: Context + ): Promise => { + const connectionId = event.requestContext.connectionId; + if (connectionId) { + await wsService.disconnect(connectionId); + } + + return { + statusCode: 200, + body: JSON.stringify({ + success: true, + message: 'Disconnected', + }), + }; + } +); diff --git a/src/modules/websocket/handlers/getMessages.ts b/src/modules/websocket/handlers/getMessages.ts new file mode 100644 index 0000000..4192cc6 --- /dev/null +++ b/src/modules/websocket/handlers/getMessages.ts @@ -0,0 +1,99 @@ +import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; +import AWS from 'aws-sdk'; +import { prismaClient } from '../../../common/database/prisma.lambda.service'; +import { ChatService } from '../../../common/services/chat.service'; +import { WebSocketService } from '../../../common/services/websocket.service'; +import { safeHandler } from '../../../common/utils/handlers/safeHandler'; +import ApiError from '../../../common/utils/helper/ApiError'; + +const chatService = new ChatService(prismaClient); +const wsService = new WebSocketService(prismaClient); + +function getApiGatewayEndpoint(event: APIGatewayProxyEvent): string { + const domainName = event.requestContext.domainName; + const stage = event.requestContext.stage; + const isLocal = + domainName?.includes('localhost') || domainName?.includes('127.0.0.1'); + const protocol = isLocal ? 'http' : 'https'; + return `${protocol}://${domainName}/${stage}`; +} + +async function postToConnection( + api: AWS.ApiGatewayManagementApi, + connectionId: string, + payload: unknown +) { + await api + .postToConnection({ + ConnectionId: connectionId, + Data: Buffer.from(JSON.stringify(payload)), + }) + .promise(); +} + +export const handler = safeHandler( + async ( + event: APIGatewayProxyEvent, + context?: Context + ): Promise => { + const connectionId = event.requestContext.connectionId; + if (!connectionId) { + throw new ApiError(400, 'Missing connection ID'); + } + + let body: any; + try { + body = JSON.parse(event.body || '{}'); + } catch { + throw new ApiError(400, 'Invalid JSON in request body'); + } + + const activityXid = Number(body.activityXid ?? body.activity_xid); + const otherUserXid = Number(body.otherUserXid ?? body.other_user_xid); + const limit = body.limit ? Number(body.limit) : undefined; + + if (!activityXid || isNaN(activityXid)) { + throw new ApiError(400, 'Valid activityXid is required'); + } + + if (!otherUserXid || isNaN(otherUserXid)) { + throw new ApiError(400, 'Valid otherUserXid is required'); + } + + const connection = await wsService.getConnectionById(connectionId); + if (!connection) { + throw new ApiError(401, 'Unauthorized WebSocket connection'); + } + + const userXid = connection.userXid; + + const messages = await chatService.getMessages({ + activityXid, + userXid, + otherUserXid, + limit, + }); + + const endpoint = getApiGatewayEndpoint(event); + const api = new AWS.ApiGatewayManagementApi({ endpoint }); + const payload = { type: 'messages', data: messages }; + + try { + await postToConnection(api, connectionId, payload); + } catch (err: any) { + if (err?.statusCode === 410) { + await wsService.disconnect(connectionId); + } else { + throw err; + } + } + + return { + statusCode: 200, + body: JSON.stringify({ + success: true, + message: 'Messages sent', + }), + }; + } +); diff --git a/src/modules/websocket/handlers/sendMessage.ts b/src/modules/websocket/handlers/sendMessage.ts new file mode 100644 index 0000000..8f977e2 --- /dev/null +++ b/src/modules/websocket/handlers/sendMessage.ts @@ -0,0 +1,111 @@ +import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; +import AWS from 'aws-sdk'; +import { prismaClient } from '../../../common/database/prisma.lambda.service'; +import { ChatService } from '../../../common/services/chat.service'; +import { WebSocketService } from '../../../common/services/websocket.service'; +import { safeHandler } from '../../../common/utils/handlers/safeHandler'; +import ApiError from '../../../common/utils/helper/ApiError'; + +const chatService = new ChatService(prismaClient); +const wsService = new WebSocketService(prismaClient); + +function getApiGatewayEndpoint(event: APIGatewayProxyEvent): string { + const domainName = event.requestContext.domainName; + const stage = event.requestContext.stage; + const isLocal = + domainName?.includes('localhost') || domainName?.includes('127.0.0.1'); + const protocol = isLocal ? 'http' : 'https'; + return `${protocol}://${domainName}/${stage}`; +} + +async function postToConnection( + api: AWS.ApiGatewayManagementApi, + connectionId: string, + payload: unknown +) { + await api + .postToConnection({ + ConnectionId: connectionId, + Data: Buffer.from(JSON.stringify(payload)), + }) + .promise(); +} + +export const handler = safeHandler( + async ( + event: APIGatewayProxyEvent, + context?: Context + ): Promise => { + const connectionId = event.requestContext.connectionId; + if (!connectionId) { + throw new ApiError(400, 'Missing connection ID'); + } + + let body: any; + try { + body = JSON.parse(event.body || '{}'); + } catch { + throw new ApiError(400, 'Invalid JSON in request body'); + } + + const activityXid = Number(body.activityXid ?? body.activity_xid); + const receiverXid = Number( + body.receiverXid ?? body.receivedXid ?? body.received_xid + ); + const message = body.message; + + if (!activityXid || isNaN(activityXid)) { + throw new ApiError(400, 'Valid activityXid is required'); + } + + if (!receiverXid || isNaN(receiverXid)) { + throw new ApiError(400, 'Valid receiverXid is required'); + } + + const connection = await wsService.getConnectionById(connectionId); + if (!connection) { + throw new ApiError(401, 'Unauthorized WebSocket connection'); + } + + const senderXid = connection.userXid; + + const saved = await chatService.sendMessage({ + activityXid, + senderXid, + receiverXid, + message, + }); + + const endpoint = getApiGatewayEndpoint(event); + const api = new AWS.ApiGatewayManagementApi({ endpoint }); + + const [receiverConnections, senderConnections] = await Promise.all([ + wsService.getConnectionsForUser({ userXid: receiverXid, activityXid }), + wsService.getConnectionsForUser({ userXid: senderXid, activityXid }), + ]); + + const targets = [...receiverConnections, ...senderConnections]; + const payload = { type: 'message', data: saved }; + + for (const target of targets) { + try { + await postToConnection(api, target.connectionId, payload); + } catch (err: any) { + if (err?.statusCode === 410) { + await wsService.disconnect(target.connectionId); + continue; + } + throw err; + } + } + + return { + statusCode: 200, + body: JSON.stringify({ + success: true, + message: 'Message sent', + data: saved, + }), + }; + } +);