Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spectate Remote Control v1 #426

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
87a4a7f
preliminary Specate Remote Control server
jmlee337 Mar 7, 2024
5d3ae46
oopsies revert onRefreshBroadcast sig in SpectatePage
jmlee337 Mar 8, 2024
d95fb10
add Spectate Remote Control feedback
jmlee337 Mar 8, 2024
68fd4ff
Spectate Remote Control game-end-event
jmlee337 Mar 9, 2024
52e0b9e
game-end-event order matters
jmlee337 Mar 9, 2024
abccb7a
specify port from UI
jmlee337 Mar 11, 2024
6097d7b
correctly track remote server state
jmlee337 Mar 11, 2024
957c24f
only create remote spectate worker on first server start
jmlee337 Mar 11, 2024
1b20925
throttle list-broadcasts-request to 2000 ms to match UI refreshBroadc…
jmlee337 Mar 11, 2024
b81df35
seems to work except switching broadcasts mid game doesn't seem to work
jmlee337 Mar 8, 2024
e694fdb
delay launchPlaybackDolphin to make sure we never invoke more often t…
jmlee337 Mar 12, 2024
02828e9
spectate-broadcast-request check broadcastId is a string
jmlee337 Mar 12, 2024
d94cc59
fix some oopsies
jmlee337 Mar 12, 2024
4b6d104
use scoped electronLog in remote.server.ts
jmlee337 Mar 12, 2024
6512f29
if `dolphinId` is specified, use it as `broadcasterName`
jmlee337 Mar 28, 2024
2710578
refactor: export SpectateController
vinceau Apr 15, 2024
148efec
refactor: fix naming
vinceau Apr 15, 2024
85e87be
clean up vestigial `remote` reconnect logic
jmlee337 Apr 16, 2024
c527bf8
move defer comms file write logic to PlaybackDolphinInstance
jmlee337 Apr 16, 2024
fc402f2
fix build errors!
jmlee337 Apr 16, 2024
ce47147
clean up one last unnecessary handler in remote_server
jmlee337 Apr 16, 2024
8c86b30
explicitly bind to `127.0.0.1` instead of `localhost`
jmlee337 Apr 19, 2024
0a8faa3
send client spectating broadcasts on connect for bookkeeping
jmlee337 Apr 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/broadcast/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ipc_broadcastErrorOccurredEvent,
ipc_broadcastListUpdatedEvent,
ipc_broadcastReconnectEvent,
ipc_connect,
ipc_dolphinStatusChangedEvent,
ipc_refreshBroadcastList,
ipc_slippiStatusChangedEvent,
Expand Down Expand Up @@ -58,8 +59,11 @@ const broadcastApi: BroadcastService = {
});
return destroy;
},
async refreshBroadcastList(authToken: string): Promise<void> {
await ipc_refreshBroadcastList.renderer!.trigger({ authToken });
async connect(authToken: string): Promise<void> {
await ipc_connect.renderer!.trigger({ authToken });
},
async refreshBroadcastList(): Promise<void> {
await ipc_refreshBroadcastList.renderer!.trigger({});
},
async watchBroadcast(broadcasterId: string): Promise<void> {
await ipc_watchBroadcast.renderer!.trigger({ broadcasterId });
Expand Down
7 changes: 2 additions & 5 deletions src/broadcast/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import { _, makeEndpoint } from "utils/ipc";
import type { BroadcasterItem, StartBroadcastConfig } from "./types";

// Handlers
export const ipc_connect = makeEndpoint.main("connect", <{ authToken: string }>_, <SuccessPayload>_);

export const ipc_refreshBroadcastList = makeEndpoint.main(
"refreshBroadcastList",
<{ authToken: string }>_,
<SuccessPayload>_,
);
export const ipc_refreshBroadcastList = makeEndpoint.main("refreshBroadcastList", <EmptyPayload>_, <SuccessPayload>_);

export const ipc_watchBroadcast = makeEndpoint.main("watchBroadcast", <{ broadcasterId: string }>_, <SuccessPayload>_);

Expand Down
42 changes: 37 additions & 5 deletions src/broadcast/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,29 @@ import log from "electron-log";

import type { BroadcastWorker } from "./broadcast.worker.interface";
import { createBroadcastWorker } from "./broadcast.worker.interface";
import { ipc_refreshBroadcastList, ipc_startBroadcast, ipc_stopBroadcast, ipc_watchBroadcast } from "./ipc";
import {
ipc_connect,
ipc_refreshBroadcastList,
ipc_startBroadcast,
ipc_stopBroadcast,
ipc_watchBroadcast,
} from "./ipc";
import type { SpectateWorker } from "./spectate.worker.interface";
import { createSpectateWorker } from "./spectate.worker.interface";
import type { SpectateController } from "./types";

export default function setupBroadcastIpc({
settingsManager,
dolphinManager,
}: {
settingsManager: SettingsManager;
dolphinManager: DolphinManager;
}) {
}): {
getSpectateController: () => Promise<SpectateController>;
} {
let spectateWorker: SpectateWorker | undefined;
let broadcastWorker: BroadcastWorker | undefined;
let prefixOrdinal = 0;

dolphinManager.events
.filter<DolphinPlaybackClosedEvent>((event) => {
Expand All @@ -31,11 +41,21 @@ export default function setupBroadcastIpc({
}
});

ipc_refreshBroadcastList.main!.handle(async ({ authToken }) => {
ipc_connect.main!.handle(async ({ authToken }) => {
if (!spectateWorker) {
spectateWorker = await createSpectateWorker(dolphinManager);
}
await spectateWorker.refreshBroadcastList(authToken);
await spectateWorker.connect(authToken);
return { success: true };
});

ipc_refreshBroadcastList.main!.handle(async () => {
Preconditions.checkExists(
spectateWorker,
"Could not refresh broadcast list, make sure spectateWorker is connected.",
);

await spectateWorker.refreshBroadcastList();
return { success: true };
});

Expand All @@ -46,7 +66,8 @@ export default function setupBroadcastIpc({
);

const folderPath = settingsManager.get().settings.spectateSlpPath;
await spectateWorker.startSpectate(broadcasterId, folderPath);
await spectateWorker.startSpectate(broadcasterId, folderPath, { idPostfix: `broadcast${prefixOrdinal}` });
prefixOrdinal += 1;
return { success: true };
});

Expand All @@ -66,4 +87,15 @@ export default function setupBroadcastIpc({

return { success: true };
});

const getSpectateController = async (): Promise<SpectateController> => {
if (!spectateWorker) {
spectateWorker = await createSpectateWorker(dolphinManager);
}
return spectateWorker;
};

return {
getSpectateController,
};
}
32 changes: 26 additions & 6 deletions src/broadcast/spectate.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ import { Observable, Subject } from "threads/observable";
import { expose } from "threads/worker";

import { SpectateManager } from "./spectate_manager";
import type { BroadcasterItem } from "./types";
import type { BroadcasterItem, SpectateDolphinOptions } from "./types";
import { SpectateEvent } from "./types";

interface Methods {
dispose: () => Promise<void>;
startSpectate(broadcastId: string, targetPath: string): Promise<void>;
startSpectate(broadcastId: string, targetPath: string, dolphinOptions: SpectateDolphinOptions): Promise<string>;
stopSpectate(broadcastId: string): Promise<void>;
dolphinClosed(playbackId: string): Promise<void>;
refreshBroadcastList(authToken: string): Promise<void>;
connect(authToken: string): Promise<void>;
refreshBroadcastList(): Promise<void>;
getOpenBroadcasts(): Promise<{ broadcastId: string; dolphinId: string }[]>;
getLogObservable(): Observable<string>;
getErrorObservable(): Observable<Error | string>;
getBroadcastListObservable(): Observable<BroadcasterItem[]>;
getSpectateDetailsObservable(): Observable<{ playbackId: string; filePath: string; broadcasterName: string }>;
getReconnectObservable(): Observable<Record<never, never>>;
getGameEndObservable(): Observable<string>;
}

export type WorkerSpec = ModuleMethods & Methods;
Expand All @@ -32,6 +35,7 @@ const errorSubject = new Subject<Error | string>();
const broadcastListSubject = new Subject<BroadcasterItem[]>();
const spectateDetailsSubject = new Subject<{ playbackId: string; filePath: string; broadcasterName: string }>();
const reconnectSubject = new Subject<Record<never, never>>();
const gameEndSubject = new Subject<string>();

// Forward the events to the renderer
spectateManager.on(SpectateEvent.BROADCAST_LIST_UPDATE, async (data: BroadcasterItem[]) => {
Expand All @@ -54,6 +58,10 @@ spectateManager.on(SpectateEvent.RECONNECT, async () => {
reconnectSubject.next({});
});

spectateManager.on(SpectateEvent.GAME_END, async (dolphinId: string) => {
gameEndSubject.next(dolphinId);
});

const methods: WorkerSpec = {
async dispose(): Promise<void> {
// Clean up worker
Expand All @@ -65,19 +73,28 @@ const methods: WorkerSpec = {

spectateManager.removeAllListeners();
},
async startSpectate(broadcastId: string, targetPath: string): Promise<void> {
await spectateManager.watchBroadcast(broadcastId, targetPath);
async startSpectate(
broadcastId: string,
targetPath: string,
dolphinOptions: SpectateDolphinOptions,
): Promise<string> {
return await spectateManager.watchBroadcast(broadcastId, targetPath, dolphinOptions);
},
async stopSpectate(broadcastId: string): Promise<void> {
spectateManager.stopWatchingBroadcast(broadcastId);
},
async dolphinClosed(playbackId: string): Promise<void> {
spectateManager.handleClosedDolphin(playbackId);
},
async refreshBroadcastList(authToken: string): Promise<void> {
async connect(authToken: string): Promise<void> {
await spectateManager.connect(authToken);
},
async refreshBroadcastList(): Promise<void> {
await spectateManager.refreshBroadcastList();
},
async getOpenBroadcasts() {
return await spectateManager.getOpenBroadcasts();
},
getLogObservable(): Observable<string> {
return Observable.from(logSubject);
},
Expand All @@ -93,6 +110,9 @@ const methods: WorkerSpec = {
getReconnectObservable(): Observable<Record<never, never>> {
return Observable.from(reconnectSubject);
},
getGameEndObservable(): Observable<string> {
return Observable.from(gameEndSubject);
},
};

expose(methods);
56 changes: 34 additions & 22 deletions src/broadcast/spectate_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import * as fs from "fs-extra";
import type { connection, Message } from "websocket";
import { client as WebSocketClient } from "websocket";

import type { BroadcasterItem } from "./types";
import type { BroadcasterItem, SpectateDolphinOptions } from "./types";
import { SpectateEvent } from "./types";

const SLIPPI_WS_SERVER = process.env.SLIPPI_WS_SERVER;

const DOLPHIN_INSTANCE_ID = "spectate";

type BroadcastInfo = {
broadcastId: string;
cursor: string;
Expand All @@ -20,7 +18,11 @@ type BroadcastInfo = {
dolphinId: string;
};

const generatePlaybackId = (broadcastId: string) => `spectate-${broadcastId}`;
// since we can have multiple SpectateManagers, we need to be careful about creating colliding ids
const generatePlaybackId = (postfix?: string) => {
const now = Date.now().toString();
return postfix ? now + postfix : now;
};

/**
* Responsible for retrieving Dolphin game data over enet and sending the data
Expand Down Expand Up @@ -67,6 +69,9 @@ export class SpectateManager extends EventEmitter {
this.emit(SpectateEvent.LOG, "Game end explicit");
broadcastInfo.fileWriter.endCurrentFile();
broadcastInfo.gameStarted = false;

// Observers should be able to depend on the file being closed, so emit this last.
this.emit(SpectateEvent.GAME_END, broadcastInfo.dolphinId);
break;
}
case "game_event": {
Expand Down Expand Up @@ -219,37 +224,36 @@ export class SpectateManager extends EventEmitter {
*
* @param {string} broadcastId The ID of the broadcast to watch
* @param {string} targetPath Where the SLP files should be stored
* @param {true} [singleton] If true, it will open the broadcasts only
* in a single Dolphin window. Opens each broadcast in their own window otherwise.
* @param {SpectateDolphinOptions} dolphinOptions Options for playback dolphin. One of `dolphinId` or `idPostfix` must be specified
* @param {string?} dolphinOptions.dolphinId The ID of the dolphin window to use, will create a dolphin window with the ID if none exists. If not specified, a dolphin ID will be generated
* @param {string?} dolphinOptions.idPostfix A postfix to use with the generated dolphin ID to avoid collisions
* @returns {string} The ID of the dolphin window used
*/
public watchBroadcast(broadcastId: string, targetPath: string, singleton?: true) {
public watchBroadcast(broadcastId: string, targetPath: string, dolphinOptions: SpectateDolphinOptions) {
Preconditions.checkExists(this.wsConnection, "No websocket connection");

const existingBroadcasts = Object.keys(this.openBroadcasts);
if (existingBroadcasts.includes(broadcastId)) {
// We're already watching this broadcast!
const openBroadcast = this.openBroadcasts[broadcastId];
if (openBroadcast) {
this.emit(SpectateEvent.LOG, `We are already watching the selected broadcast`);
return;
return openBroadcast.dolphinId;
}

let dolphinPlaybackId = generatePlaybackId(broadcastId);

// We're only watching one at at time so stop other broadcasts
if (singleton) {
existingBroadcasts.forEach((broadcastInfo) => {
this.stopWatchingBroadcast(broadcastInfo);
});

// Use the default playback ID
dolphinPlaybackId = DOLPHIN_INSTANCE_ID;
if (dolphinOptions.dolphinId) {
const existingDolphin = Object.values(this.openBroadcasts).find(
(broadcastInfo) => broadcastInfo.dolphinId === dolphinOptions.dolphinId,
);
if (existingDolphin) {
this.stopWatchingBroadcast(existingDolphin.broadcastId);
}
}
const dolphinPlaybackId = dolphinOptions.dolphinId || generatePlaybackId(dolphinOptions.idPostfix);
const broadcasterName = dolphinOptions.dolphinId || this.availableBroadcasts[broadcastId].name;

fs.ensureDirSync(targetPath);
const slpFileWriter = new SlpFileWriter({
folderPath: targetPath,
});

const broadcasterName = this.availableBroadcasts[broadcastId].name;
slpFileWriter.on(SlpFileWriterEvent.NEW_FILE, (currFilePath) => {
this._playFile(currFilePath, dolphinPlaybackId, broadcasterName).catch(console.warn);
});
Expand All @@ -273,6 +277,7 @@ export class SpectateManager extends EventEmitter {
// used to clear out any previous file that we were reading for. The file will get updated
// by the fileWriter
this._playFile("", dolphinPlaybackId, broadcasterName).catch(console.warn);
return dolphinPlaybackId;
}

public handleClosedDolphin(playbackId: string) {
Expand All @@ -292,4 +297,11 @@ export class SpectateManager extends EventEmitter {

this.stopWatchingBroadcast(broadcastInfo.broadcastId);
}

public getOpenBroadcasts() {
return Object.values(this.openBroadcasts).map((value) => ({
broadcastId: value.broadcastId,
dolphinId: value.dolphinId,
}));
}
}
21 changes: 20 additions & 1 deletion src/broadcast/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DolphinMessageType } from "@slippi/slippi-js";
import type { Observable } from "observable-fns";

export type BroadcasterItem = {
broadcaster: {
Expand Down Expand Up @@ -31,6 +32,7 @@ export enum SpectateEvent {
NEW_FILE = "NEW_FILE",
LOG = "LOG",
RECONNECT = "RECONNECT",
GAME_END = "GAME_END",
}

type TypeMap<M extends { [index: string]: any }> = {
Expand Down Expand Up @@ -71,8 +73,25 @@ export type BroadcastService = {
onDolphinStatusChanged(handle: (status: number) => void): () => void;
onSlippiStatusChanged(handle: (status: number) => void): () => void;
onSpectateErrorMessage(handle: (message: string | null) => void): () => void;
refreshBroadcastList(authToken: string): Promise<void>;
connect(authToken: string): Promise<void>;
refreshBroadcastList(): Promise<void>;
watchBroadcast(broadcasterId: string): Promise<void>;
startBroadcast(config: StartBroadcastConfig): Promise<void>;
stopBroadcast(): Promise<void>;
};

export type SpectateDolphinOptions = {
dolphinId?: string;
idPostfix?: string;
};

export interface SpectateController {
startSpectate(broadcastId: string, targetPath: string, dolphinOptions: SpectateDolphinOptions): Promise<string>;
dolphinClosed(playbackId: string): Promise<void>;
connect(authToken: string): Promise<void>;
refreshBroadcastList(): Promise<void>;
getOpenBroadcasts(): Promise<{ broadcastId: string; dolphinId: string }[]>;
getBroadcastListObservable(): Observable<BroadcasterItem[]>;
getSpectateDetailsObservable(): Observable<{ playbackId: string; filePath: string; broadcasterName: string }>;
getGameEndObservable(): Observable<string>;
}
Loading
Loading