add websocket support with connection management and chat functionality

This commit is contained in:
paritosh18
2026-04-13 15:35:39 +05:30
parent dcb2259c7d
commit 631ae79277
16 changed files with 533 additions and 370 deletions

View File

@@ -1,77 +0,0 @@
import {
APIGatewayProxyEvent,
APIGatewayProxyResult,
Context,
} from 'aws-lambda';
import { prismaClient } from '../../../../common/database/prisma.lambda.service';
import { verifyHostToken } from '../../../../common/middlewares/jwt/authForHost';
import { ChatService } from '../../../../common/services/chat.service';
import { safeHandler } from '../../../../common/utils/handlers/safeHandler';
import ApiError from '../../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const token =
event.headers['x-auth-token'] || event.headers['X-Auth-Token'];
if (!token) {
throw new ApiError(
400,
'This is a protected route. Please provide a valid token.'
);
}
const userInfo = await verifyHostToken(token);
const hostUserId = Number(userInfo.id);
if (!hostUserId || isNaN(hostUserId)) {
throw new ApiError(400, 'Invalid host user ID');
}
const activityXidParam =
event.queryStringParameters?.activityXid ||
event.queryStringParameters?.activity_xid;
const otherUserXidParam =
event.queryStringParameters?.otherUserXid ||
event.queryStringParameters?.other_user_xid ||
event.queryStringParameters?.userXid ||
event.queryStringParameters?.user_xid;
const limitParam = event.queryStringParameters?.limit;
const activityXid = Number(activityXidParam);
const otherUserXid = Number(otherUserXidParam);
const limit = limitParam ? Number(limitParam) : undefined;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!otherUserXid || isNaN(otherUserXid)) {
throw new ApiError(400, 'Valid otherUserXid is required');
}
const messages = await chatService.getMessages({
activityXid,
userXid: hostUserId,
otherUserXid,
limit,
});
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: JSON.stringify({
success: true,
message: 'Messages retrieved successfully',
data: messages,
}),
};
}
);

View File

@@ -1,78 +0,0 @@
import {
APIGatewayProxyEvent,
APIGatewayProxyResult,
Context,
} from 'aws-lambda';
import { prismaClient } from '../../../../common/database/prisma.lambda.service';
import { verifyHostToken } from '../../../../common/middlewares/jwt/authForHost';
import { ChatService } from '../../../../common/services/chat.service';
import { safeHandler } from '../../../../common/utils/handlers/safeHandler';
import ApiError from '../../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const token =
event.headers['x-auth-token'] || event.headers['X-Auth-Token'];
if (!token) {
throw new ApiError(
400,
'This is a protected route. Please provide a valid token.'
);
}
const userInfo = await verifyHostToken(token);
const hostUserId = Number(userInfo.id);
if (!hostUserId || isNaN(hostUserId)) {
throw new ApiError(400, 'Invalid host user ID');
}
let body: any;
try {
body = JSON.parse(event.body || '{}');
} catch {
throw new ApiError(400, 'Invalid JSON in request body');
}
const activityXid = Number(body.activityXid ?? body.activity_xid);
const receiverXid = Number(
body.receiverXid ?? body.receivedXid ?? body.received_xid
);
const message = body.message;
const status = body.status;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!receiverXid || isNaN(receiverXid)) {
throw new ApiError(400, 'Valid receiverXid is required');
}
const result = await chatService.sendMessage({
activityXid,
senderXid: hostUserId,
receiverXid,
message,
status,
});
return {
statusCode: 201,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: JSON.stringify({
success: true,
message: 'Message sent successfully',
data: result,
}),
};
}
);

View File

@@ -1,77 +0,0 @@
import {
APIGatewayProxyEvent,
APIGatewayProxyResult,
Context,
} from 'aws-lambda';
import { prismaClient } from '../../../../common/database/prisma.lambda.service';
import { verifyUserToken } from '../../../../common/middlewares/jwt/authForUser';
import { ChatService } from '../../../../common/services/chat.service';
import { safeHandler } from '../../../../common/utils/handlers/safeHandler';
import ApiError from '../../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const token =
event.headers['x-auth-token'] || event.headers['X-Auth-Token'];
if (!token) {
throw new ApiError(
400,
'This is a protected route. Please provide a valid token.'
);
}
const userInfo = await verifyUserToken(token);
const userId = Number(userInfo.id);
if (!userId || isNaN(userId)) {
throw new ApiError(400, 'Invalid user ID');
}
const activityXidParam =
event.queryStringParameters?.activityXid ||
event.queryStringParameters?.activity_xid;
const otherUserXidParam =
event.queryStringParameters?.otherUserXid ||
event.queryStringParameters?.other_user_xid ||
event.queryStringParameters?.userXid ||
event.queryStringParameters?.user_xid;
const limitParam = event.queryStringParameters?.limit;
const activityXid = Number(activityXidParam);
const otherUserXid = Number(otherUserXidParam);
const limit = limitParam ? Number(limitParam) : undefined;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!otherUserXid || isNaN(otherUserXid)) {
throw new ApiError(400, 'Valid otherUserXid is required');
}
const messages = await chatService.getMessages({
activityXid,
userXid: userId,
otherUserXid,
limit,
});
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: JSON.stringify({
success: true,
message: 'Messages retrieved successfully',
data: messages,
}),
};
}
);

View File

@@ -1,78 +0,0 @@
import {
APIGatewayProxyEvent,
APIGatewayProxyResult,
Context,
} from 'aws-lambda';
import { prismaClient } from '../../../../common/database/prisma.lambda.service';
import { verifyUserToken } from '../../../../common/middlewares/jwt/authForUser';
import { ChatService } from '../../../../common/services/chat.service';
import { safeHandler } from '../../../../common/utils/handlers/safeHandler';
import ApiError from '../../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const token =
event.headers['x-auth-token'] || event.headers['X-Auth-Token'];
if (!token) {
throw new ApiError(
400,
'This is a protected route. Please provide a valid token.'
);
}
const userInfo = await verifyUserToken(token);
const userId = Number(userInfo.id);
if (!userId || isNaN(userId)) {
throw new ApiError(400, 'Invalid user ID');
}
let body: any;
try {
body = JSON.parse(event.body || '{}');
} catch {
throw new ApiError(400, 'Invalid JSON in request body');
}
const activityXid = Number(body.activityXid ?? body.activity_xid);
const receiverXid = Number(
body.receiverXid ?? body.receivedXid ?? body.received_xid
);
const message = body.message;
const status = body.status;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!receiverXid || isNaN(receiverXid)) {
throw new ApiError(400, 'Valid receiverXid is required');
}
const result = await chatService.sendMessage({
activityXid,
senderXid: userId,
receiverXid,
message,
status,
});
return {
statusCode: 201,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: JSON.stringify({
success: true,
message: 'Message sent successfully',
data: result,
}),
};
}
);

View File

@@ -0,0 +1,55 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
import { prismaClient } from '../../../common/database/prisma.lambda.service';
import { verifyAnyToken } from '../../../common/middlewares/jwt/authForAny';
import { WebSocketService } from '../../../common/services/websocket.service';
import { safeHandler } from '../../../common/utils/handlers/safeHandler';
import ApiError from '../../../common/utils/helper/ApiError';
const wsService = new WebSocketService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const connectionId = event.requestContext.connectionId;
if (!connectionId) {
throw new ApiError(400, 'Missing connection ID');
}
const token =
event.queryStringParameters?.token ||
event.queryStringParameters?.['x-auth-token'] ||
event.headers?.['x-auth-token'] ||
event.headers?.['X-Auth-Token'];
if (!token) {
throw new ApiError(401, 'Token is required for WebSocket connection');
}
const userInfo = await verifyAnyToken(token);
const activityXidParam =
event.queryStringParameters?.activityXid ||
event.queryStringParameters?.activity_xid;
const activityXid = activityXidParam ? Number(activityXidParam) : null;
if (activityXidParam && (!activityXid || isNaN(activityXid))) {
throw new ApiError(400, 'Invalid activityXid');
}
await wsService.connect({
connectionId,
userXid: userInfo.id,
activityXid,
});
return {
statusCode: 200,
body: JSON.stringify({
success: true,
message: 'Connected',
}),
};
}
);

View File

@@ -0,0 +1,17 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
import { safeHandler } from '../../../common/utils/handlers/safeHandler';
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
return {
statusCode: 200,
body: JSON.stringify({
success: true,
message: 'Default route',
}),
};
}
);

View File

@@ -0,0 +1,26 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
import { prismaClient } from '../../../common/database/prisma.lambda.service';
import { WebSocketService } from '../../../common/services/websocket.service';
import { safeHandler } from '../../../common/utils/handlers/safeHandler';
const wsService = new WebSocketService(prismaClient);
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const connectionId = event.requestContext.connectionId;
if (connectionId) {
await wsService.disconnect(connectionId);
}
return {
statusCode: 200,
body: JSON.stringify({
success: true,
message: 'Disconnected',
}),
};
}
);

View File

@@ -0,0 +1,99 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
import AWS from 'aws-sdk';
import { prismaClient } from '../../../common/database/prisma.lambda.service';
import { ChatService } from '../../../common/services/chat.service';
import { WebSocketService } from '../../../common/services/websocket.service';
import { safeHandler } from '../../../common/utils/handlers/safeHandler';
import ApiError from '../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
const wsService = new WebSocketService(prismaClient);
function getApiGatewayEndpoint(event: APIGatewayProxyEvent): string {
const domainName = event.requestContext.domainName;
const stage = event.requestContext.stage;
const isLocal =
domainName?.includes('localhost') || domainName?.includes('127.0.0.1');
const protocol = isLocal ? 'http' : 'https';
return `${protocol}://${domainName}/${stage}`;
}
async function postToConnection(
api: AWS.ApiGatewayManagementApi,
connectionId: string,
payload: unknown
) {
await api
.postToConnection({
ConnectionId: connectionId,
Data: Buffer.from(JSON.stringify(payload)),
})
.promise();
}
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const connectionId = event.requestContext.connectionId;
if (!connectionId) {
throw new ApiError(400, 'Missing connection ID');
}
let body: any;
try {
body = JSON.parse(event.body || '{}');
} catch {
throw new ApiError(400, 'Invalid JSON in request body');
}
const activityXid = Number(body.activityXid ?? body.activity_xid);
const otherUserXid = Number(body.otherUserXid ?? body.other_user_xid);
const limit = body.limit ? Number(body.limit) : undefined;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!otherUserXid || isNaN(otherUserXid)) {
throw new ApiError(400, 'Valid otherUserXid is required');
}
const connection = await wsService.getConnectionById(connectionId);
if (!connection) {
throw new ApiError(401, 'Unauthorized WebSocket connection');
}
const userXid = connection.userXid;
const messages = await chatService.getMessages({
activityXid,
userXid,
otherUserXid,
limit,
});
const endpoint = getApiGatewayEndpoint(event);
const api = new AWS.ApiGatewayManagementApi({ endpoint });
const payload = { type: 'messages', data: messages };
try {
await postToConnection(api, connectionId, payload);
} catch (err: any) {
if (err?.statusCode === 410) {
await wsService.disconnect(connectionId);
} else {
throw err;
}
}
return {
statusCode: 200,
body: JSON.stringify({
success: true,
message: 'Messages sent',
}),
};
}
);

View File

@@ -0,0 +1,111 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
import AWS from 'aws-sdk';
import { prismaClient } from '../../../common/database/prisma.lambda.service';
import { ChatService } from '../../../common/services/chat.service';
import { WebSocketService } from '../../../common/services/websocket.service';
import { safeHandler } from '../../../common/utils/handlers/safeHandler';
import ApiError from '../../../common/utils/helper/ApiError';
const chatService = new ChatService(prismaClient);
const wsService = new WebSocketService(prismaClient);
function getApiGatewayEndpoint(event: APIGatewayProxyEvent): string {
const domainName = event.requestContext.domainName;
const stage = event.requestContext.stage;
const isLocal =
domainName?.includes('localhost') || domainName?.includes('127.0.0.1');
const protocol = isLocal ? 'http' : 'https';
return `${protocol}://${domainName}/${stage}`;
}
async function postToConnection(
api: AWS.ApiGatewayManagementApi,
connectionId: string,
payload: unknown
) {
await api
.postToConnection({
ConnectionId: connectionId,
Data: Buffer.from(JSON.stringify(payload)),
})
.promise();
}
export const handler = safeHandler(
async (
event: APIGatewayProxyEvent,
context?: Context
): Promise<APIGatewayProxyResult> => {
const connectionId = event.requestContext.connectionId;
if (!connectionId) {
throw new ApiError(400, 'Missing connection ID');
}
let body: any;
try {
body = JSON.parse(event.body || '{}');
} catch {
throw new ApiError(400, 'Invalid JSON in request body');
}
const activityXid = Number(body.activityXid ?? body.activity_xid);
const receiverXid = Number(
body.receiverXid ?? body.receivedXid ?? body.received_xid
);
const message = body.message;
if (!activityXid || isNaN(activityXid)) {
throw new ApiError(400, 'Valid activityXid is required');
}
if (!receiverXid || isNaN(receiverXid)) {
throw new ApiError(400, 'Valid receiverXid is required');
}
const connection = await wsService.getConnectionById(connectionId);
if (!connection) {
throw new ApiError(401, 'Unauthorized WebSocket connection');
}
const senderXid = connection.userXid;
const saved = await chatService.sendMessage({
activityXid,
senderXid,
receiverXid,
message,
});
const endpoint = getApiGatewayEndpoint(event);
const api = new AWS.ApiGatewayManagementApi({ endpoint });
const [receiverConnections, senderConnections] = await Promise.all([
wsService.getConnectionsForUser({ userXid: receiverXid, activityXid }),
wsService.getConnectionsForUser({ userXid: senderXid, activityXid }),
]);
const targets = [...receiverConnections, ...senderConnections];
const payload = { type: 'message', data: saved };
for (const target of targets) {
try {
await postToConnection(api, target.connectionId, payload);
} catch (err: any) {
if (err?.statusCode === 410) {
await wsService.disconnect(target.connectionId);
continue;
}
throw err;
}
}
return {
statusCode: 200,
body: JSON.stringify({
success: true,
message: 'Message sent',
data: saved,
}),
};
}
);