diff --git a/next-app/components/HomeView.tsx b/next-app/components/HomeView.tsx index a433d46..26991c9 100644 --- a/next-app/components/HomeView.tsx +++ b/next-app/components/HomeView.tsx @@ -3,6 +3,7 @@ import { toast } from "react-toastify"; import { Appbar } from "@/components/Appbar"; import { Card, CardHeader, CardContent } from "@/components/ui/card"; import { Button } from "@/components/ui/button"; +import Image from "next/image"; import { Dialog, DialogContent, @@ -13,7 +14,7 @@ import { import React, { useEffect, useState } from "react"; import { useRouter } from "next/navigation"; import CardSkeleton from "./ui/cardSkeleton"; -import Image from "next/image"; + interface Space { @@ -141,8 +142,10 @@ export default function HomeView() { Card background
diff --git a/next-app/components/OldStreamView.tsx b/next-app/components/OldStreamView.tsx index ce9ff0d..51b5a0c 100644 --- a/next-app/components/OldStreamView.tsx +++ b/next-app/components/OldStreamView.tsx @@ -13,6 +13,7 @@ import { YT_REGEX } from "../lib/utils"; import YouTubePlayer from "youtube-player"; import { useSession } from "next-auth/react"; import type { Session } from "next-auth"; +import Image from "next/image"; import { Dialog, DialogContent, @@ -22,7 +23,8 @@ import { DialogFooter, } from "@/components/ui/dialog"; import { DropdownMenu, DropdownMenuTrigger, DropdownMenuContent, DropdownMenuItem, DropdownMenuLabel, DropdownMenuSeparator } from "@/components/ui/dropdown-menu"; -import Image from "next/image"; + + interface Video { id: string; @@ -389,6 +391,8 @@ export default function StreamView({ > {`Thumbnail {currentVideo.bigImg} {`Thumbnail

{video.title}

diff --git a/next-app/components/StreamView/index.tsx b/next-app/components/StreamView/index.tsx index 068df25..0ff8afe 100644 --- a/next-app/components/StreamView/index.tsx +++ b/next-app/components/StreamView/index.tsx @@ -36,9 +36,10 @@ export default function StreamView({ if (socket) { socket.onmessage = async (event) => { const { type, data } = JSON.parse(event.data) || {}; - if (type === "new-stream") { + if (type === `new-stream/${spaceId}`) { + console.log(type) addToQueue(data); - } else if (type === "new-vote") { + } else if (type === `new-vote/${spaceId}`) { setQueue((prev) => { return prev .map((v) => { @@ -59,13 +60,13 @@ export default function StreamView({ } else if (type === "error") { enqueueToast("error", data.message); setLoading(false); - } else if (type === "play-next") { + } else if (type === `play-next/${spaceId}`) { await refreshStreams(); - } else if (type === "remove-song") { + } else if (type === `remove-song/${spaceId}`) { setQueue((prev) => { return prev.filter((stream) => stream.id !== data.streamId); }); - } else if (type === "empty-queue") { + } else if (type === `empty-queue/${spaceId}`) { setQueue([]); } }; diff --git a/next-app/context/socket-context.tsx b/next-app/context/socket-context.tsx index db47ccb..849be42 100644 --- a/next-app/context/socket-context.tsx +++ b/next-app/context/socket-context.tsx @@ -32,6 +32,7 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => { const [loading, setLoading] = useState(true); const session = useSession(); + useEffect(() => { if (!socket && session.data?.user.id) { const ws = new WebSocket(process.env.NEXT_PUBLIC_WSS_URL as string); diff --git a/next-app/next.config.mjs b/next-app/next.config.mjs index 4f9de89..76c1ebc 100644 --- a/next-app/next.config.mjs +++ b/next-app/next.config.mjs @@ -2,6 +2,9 @@ const nextConfig = { reactStrictMode: false, output: 'standalone', + images: { + domains: ['images.unsplash.com','i.ytimg.com'], + }, }; export default nextConfig; diff --git a/ws/src/StramManager.ts b/ws/src/StramManager.ts index a71509f..fa28927 100644 --- a/ws/src/StramManager.ts +++ b/ws/src/StramManager.ts @@ -31,6 +31,7 @@ export class RoomManager { public prisma: PrismaClient; public queue: Queue; public worker: Worker; + public wstoSpace: Map private constructor() { this.spaces = new Map(); @@ -45,6 +46,7 @@ export class RoomManager { this.worker = new Worker(process.pid.toString(), this.processJob, { connection, }); + this.wstoSpace=new Map(); } static getInstance() { @@ -121,7 +123,6 @@ export class RoomManager { this.spaces.set(spaceId, { users:new Map(), creatorId:"", - musicQueue:[] }); // const roomsString = await this.redisClient.get("rooms"); // if (roomsString) { @@ -142,11 +143,20 @@ export class RoomManager { } async addUser(userId: string, ws: WebSocket, token: string) { - this.users.set(userId, { - userId, - ws, - token, - }); + let user = this.users.get(userId); + if(!user){ + this.users.set(userId, { + userId, + ws:[ws], + token, + }); + } + else{ + if (!user.ws.some(existingWs => existingWs === ws)) { + user.ws.push(ws); + } + } + } async joinRoom( @@ -156,10 +166,12 @@ export class RoomManager { ws: WebSocket, token: string ) { + + console.log("Join Room"+spaceId) let space = this.spaces.get(spaceId); let user = this.users.get(userId); - + if (!space) { await this.createRoom(spaceId); space = this.spaces.get(spaceId); @@ -169,12 +181,19 @@ export class RoomManager { await this.addUser(userId, ws, token); user = this.users.get(userId); } + else{ + if (!user.ws.some(existingWs => existingWs === ws)) { + user.ws.push(ws); + } + } + + this.wstoSpace.set(ws,spaceId); if (space && user) { space.users.set(userId,user) this.spaces.set(spaceId, { ...space, - users:space.users, + users:new Map(space.users), creatorId:creatorId }); @@ -184,11 +203,13 @@ export class RoomManager { publishEmptyQueue(spaceId: string) { const space = this.spaces.get(spaceId); space?.users.forEach((user,userId) => { - user?.ws.send( - JSON.stringify({ - type: "empty-queue", - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: `empty-queue/${spaceId}`, + }) + ); + }) }); } @@ -222,15 +243,17 @@ export class RoomManager { console.log("publishRemoveSong"); const space = this.spaces.get(spaceId); space?.users.forEach((user,userId) => { - user?.ws.send( - JSON.stringify({ - type: "remove-song", - data: { - streamId, - spaceId - }, - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: `remove-song/${spaceId}`, + data: { + streamId, + spaceId + }, + }) + ); + }) }); } @@ -259,25 +282,29 @@ export class RoomManager { ); } else{ - user?.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You cant remove the song . You are not the host", - }, - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You cant remove the song . You are not the host", + }, + }) + ); + }) } } publishPlayNext(spaceId: string) { const space = this.spaces.get(spaceId); space?.users.forEach((user,userId) => { - user?.ws.send( - JSON.stringify({ - type: "play-next", - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: `play-next/${spaceId}`, + }) + ); + }) }); } @@ -290,14 +317,16 @@ export class RoomManager { } if (targetUser.userId !== creatorId) { - targetUser.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You can't perform this action.", - }, - }) - ); + targetUser.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You can't perform this action.", + }, + }) + ); + }) return; } @@ -314,14 +343,16 @@ export class RoomManager { }); if (!mostUpvotedStream) { - targetUser.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "Please add video in queue", - }, - }) - ); + targetUser.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "Please add video in queue", + }, + }) + ); + }) return; } @@ -380,17 +411,19 @@ export class RoomManager { console.log(process.pid + " publishNewVote"); const spaces= this.spaces.get(spaceId); spaces?.users.forEach((user,userId) => { - user?.ws.send( - JSON.stringify({ - type: "new-vote", - data: { - vote, - streamId, - votedBy, - spaceId - }, - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: `new-vote/${spaceId}`, + data: { + vote, + streamId, + votedBy, + spaceId + }, + }) + ); + }) }); } @@ -458,14 +491,16 @@ export class RoomManager { ); if (lastVoted) { - currentUser?.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You can vote after 20 mins", - }, - }) - ); + currentUser?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You can vote after 20 mins", + }, + }) + ); + }) return; } } @@ -483,16 +518,20 @@ export class RoomManager { publishNewStream(spaceId: string, data: any) { console.log(process.pid + ": publishNewStream"); console.log("Publish New Stream",spaceId) - const space= RoomManager.getInstance().spaces.get(spaceId); + const space= this.spaces.get(spaceId); + console.log(this.spaces) if (space) { + space?.users.forEach((user,userId) => { - user?.ws.send( - JSON.stringify({ - type: "new-stream", - data: data, - }) - ); + user?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: `new-stream/${spaceId}`, + data: data, + }) + ); + }) }); } } @@ -573,14 +612,16 @@ export class RoomManager { }) ); } else { - currentUser?.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "Video not found", - }, - }) - ); + currentUser?.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "Video not found", + }, + }) + ); + }) } } @@ -616,39 +657,45 @@ export class RoomManager { let lastAdded = await this.redisClient.get(`lastAdded-${spaceId}-${currentUserId}`); if (lastAdded) { - currentUser.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "You can add again after 20 min.", - }, - }) - ); + currentUser.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You can add again after 20 min.", + }, + }) + ); + }) return; } let alreadyAdded = await this.redisClient.get(`${spaceId}-${url}`); if (alreadyAdded) { - currentUser.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "This song is blocked for 1 hour", - }, - }) - ); + currentUser.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "This song is blocked for 1 hour", + }, + }) + ); + }) return; } if (previousQueueLength >= MAX_QUEUE_LENGTH) { - currentUser.ws.send( - JSON.stringify({ - type: "error", - data: { - message: "Queue limit reached", - }, - }) - ); + currentUser.ws.forEach((ws)=>{ + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "Queue limit reached", + }, + }) + ); + }) return; } } @@ -664,42 +711,43 @@ export class RoomManager { disconnect(ws: WebSocket) { console.log(process.pid + ": disconnect"); - let user: string | null = null; - this.users.forEach((usr) => { - if (!user && usr.ws === ws) { - user = usr.userId; + let userId: string | null = null; + const spaceId= this.wstoSpace.get(ws); + this.users.forEach((user,id) => { + const wsIndex= user.ws.indexOf(ws); + + if(wsIndex!==-1){ + userId = id; + user.ws.splice(wsIndex, 1); + } + if(user.ws.length===0){ + this.users.delete(id) } }); - if (user) { - - this.spaces.forEach((space, spaceId) => { - - if (space.users.has(user as string)) { - const updatedUsers = new Map( - Array.from(space.users).filter(([userId]) => userId !== user) - ) - this.spaces.set(spaceId, { - ...space, - users: updatedUsers - }); - } - }); - - this.users.delete(user); + if (userId && spaceId) { + const space=this.spaces.get(spaceId) + if(space){ + const updatedUsers = new Map( + Array.from(space.users).filter(([usrId]) => userId !== usrId) + ) + this.spaces.set(spaceId, { + ...space, + users: updatedUsers + }); + } } } } type User = { userId: string; - ws: WebSocket; + ws: WebSocket[]; token: string; }; type Space = { creatorId: string; users: Map; - musicQueue: any[]; -}; +}; \ No newline at end of file