Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rewrite concurrency manager #8

Merged
merged 17 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/Indomitable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Cluster, { ClusterSettings } from 'node:cluster';
import EventEmitter from 'node:events';
import Os from 'node:os';
import { clearTimeout } from 'timers';
import { ConcurrencyManager } from './concurrency/ConcurrencyManager';
import { ConcurrencyServer } from './concurrency/ConcurrencyServer';
import { ShardClient } from './client/ShardClient';
import { ClusterManager } from './manager/ClusterManager.js';
import {
Expand All @@ -20,6 +20,7 @@ import {
Sendable
} from './Util';


/**
* Options to control Indomitable behavior
*/
Expand Down Expand Up @@ -143,7 +144,7 @@ export class Indomitable extends EventEmitter {
public clusterCount: number|'auto';
public shardCount: number|'auto';
public cachedSession?: SessionObject;
public concurrencyManager?: ConcurrencyManager;
public concurrencyServer?: ConcurrencyServer;
public readonly clientOptions: DiscordJsClientOptions;
public readonly clusterSettings: ClusterSettings;
public readonly ipcTimeout: number;
Expand Down Expand Up @@ -187,7 +188,7 @@ export class Indomitable extends EventEmitter {
this.token = options.token;
this.clusters = new Map();
this.spawnQueue = [];
this.concurrencyManager = undefined;
this.concurrencyServer = undefined;
this.cachedSession = undefined;
this.busy = false;
}
Expand Down Expand Up @@ -231,8 +232,9 @@ export class Indomitable extends EventEmitter {
}
if (this.handleConcurrency) {
const sessions = await this.fetchSessions();
this.concurrencyManager = new ConcurrencyManager(sessions.session_start_limit.max_concurrency);
this.emit(LibraryEvents.DEBUG, 'Handle concurrency is currently enabled. Indomitable will automatically handle your identifies that can result to more stable connection');
this.concurrencyServer = new ConcurrencyServer(this, sessions.session_start_limit.max_concurrency);
const info = await this.concurrencyServer.start();
this.emit(LibraryEvents.DEBUG, `Handle concurrency is currently enabled! =>\n Server is currently bound to:\n Address: ${info.address}:${info.port}\n Concurrency: ${sessions.session_start_limit.max_concurrency}`);
}
if (typeof this.clusterCount !== 'number')
this.clusterCount = Os.cpus().length;
Expand Down
37 changes: 27 additions & 10 deletions src/Util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Https, { RequestOptions } from 'node:https';
import Http from 'node:http';
import { WebSocketShardEvents } from '@discordjs/ws';

/**
Expand Down Expand Up @@ -41,8 +42,6 @@ export enum InternalOps {
RESTART = 'restart',
RESTART_ALL = 'restartAll',
DESTROY_CLIENT = 'destroyClient',
REQUEST_IDENTIFY = 'requestIdentify',
CANCEL_IDENTIFY = 'cancelIdentify',
SESSION_INFO = 'sessionInfo',
PING = 'ping'
}
Expand Down Expand Up @@ -212,27 +211,42 @@ export interface SessionObject {
};
}

export interface FetchResponse {
code: number;
message: string;
body?: any;
}

/**
* Wrapper function for fetching data using HTTP
* @param url URL of resource to fetch
* @param options RequestOptions to modify behavior
* @returns A promise containing data fetched, or an error
*/
export function Fetch(url: string|URL, options: RequestOptions): Promise<any> {
export function Fetch(url: string, options: RequestOptions): Promise<FetchResponse> {
return new Promise((resolve, reject) => {
const request = Https.request(url, options, response => {
let client;

if (url.startsWith('https')) {
client = Https.request;
} else if (url.startsWith('http')) {
client = Http.request;
} else {
throw new Error('Unknown url protocol');
}

const request = client(url, options, response => {
const chunks: any[] = [];

response.on('data', chunk => chunks.push(chunk));
response.on('error', reject);
response.on('end', () => {
const code = response.statusCode ?? 500;
const body = chunks.join('');
if (code >= 200 && code <= 299)
resolve(body);
else
reject(new Error(`Response received is not ok, Status Code: ${response.statusCode}, body: ${body}`));
resolve({ code, body, message: response.statusMessage ?? '' });
});
});

request.on('error', reject);
request.end();
});
Expand All @@ -245,11 +259,14 @@ export function Fetch(url: string|URL, options: RequestOptions): Promise<any> {
*/
export async function FetchSessions(token: string): Promise<SessionObject> {
const url = new URL('https://discord.com/api/v10/gateway/bot');
const data = await Fetch(url, {
const response = await Fetch(url.toString(), {
method: 'GET',
headers: { authorization: `Bot ${token}` }
});
return JSON.parse(data);
if (response.code >= 200 && response.code <= 299)
return JSON.parse(response.body);
else
throw new Error(`Response received is not ok, code: ${response.code}`)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/client/ShardClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class ShardClient {
clientOptions.shardCount = EnvProcessData.shardCount;
if (manager.handleConcurrency) {
if (!clientOptions.ws) clientOptions.ws = {};
clientOptions.ws.buildIdentifyThrottler = () => Promise.resolve(new ConcurrencyClient(new BaseWorker()));
clientOptions.ws.buildIdentifyThrottler = () => Promise.resolve(new ConcurrencyClient());
}
this.client = new manager.client(clientOptions);
// @ts-expect-error: Override shard client util with indomitable shard client util
Expand Down
15 changes: 6 additions & 9 deletions src/concurrency/AsyncQueue.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import { EventEmitter, once } from 'events';
import { EventEmitter, once } from 'node:events';

export declare interface AsyncQueueWaitOptions {
signal?: AbortSignal | undefined;
}

export declare interface AsyncQueueEmitter extends EventEmitter {
on(event: 'resolve', listener: (message: string) => void): this;
once(event: 'resolve', listener: (message: string) => void): this;
off(event: 'resolve', listener: (event: unknown) => void): this;
}

export class AsyncQueue {
private queue: AsyncQueueEmitter[];
private readonly queue: NodeJS.EventEmitter[];
constructor() {
this.queue = [];
}
Expand All @@ -21,9 +15,11 @@ export class AsyncQueue {
}

public wait({ signal }: AsyncQueueWaitOptions): Promise<void[]> {

const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', { signal }) : Promise.resolve([]);

const emitter: AsyncQueueEmitter = new EventEmitter();
const emitter = new EventEmitter() as NodeJS.EventEmitter;

this.queue.push(emitter);

if (signal) {
Expand All @@ -39,6 +35,7 @@ export class AsyncQueue {

public shift(): void {
const emitter = this.queue.shift();

if (typeof emitter !== 'undefined') emitter.emit('resolve');
}
}
72 changes: 45 additions & 27 deletions src/concurrency/ConcurrencyClient.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,62 @@
import { InternalOps, InternalOpsData } from '../Util';
import { BaseWorker } from '../ipc/BaseWorker';
import { Delay, Fetch } from '../Util';

/**
* Internal class that is passed to @discordjs/ws to handle concurrency
*/
export class ConcurrencyClient {
private ipc: BaseWorker;
constructor(ipc: BaseWorker) {
this.ipc = ipc;
private readonly address: string;
private readonly port: number;
private readonly password: string;

constructor() {
this.address = process.env.INDOMITABLE_CONCURRENCY_SERVER_ADDRESS!;
this.port = Number(process.env.INDOMITABLE_CONCURRENCY_SERVER_PORT!);
this.password = process.env.INDOMITABLE_CONCURRENCY_SERVER_PASSWORD!;
}

/**
* Method to try and acquire a lock for identify
* Method to try and acquire a lock for identify. This could never error or else it would hang out the whole system.
* Look at (https://github.com/discordjs/discord.js/blob/f1bce54a287eaa431ceb8b1996db87cbc6290317/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts#L321)
* If it errors that isn't anything from websocket shard, this will have issues
*/
public async waitForIdentify(shardId: number, signal: AbortSignal): Promise<void> {
const content: InternalOpsData = {
op: InternalOps.REQUEST_IDENTIFY,
data: { shardId },
internal: true
};
const listener = () => this.abortIdentify(shardId);
const url = new URL(`http://${this.address}:${this.port}/concurrency/acquire`);
url.searchParams.append('shardId', shardId.toString());

const listener = () => {
const url = new URL(`http://${this.address}:${this.port}/concurrency/cancel`);

url.searchParams.append('shardId', shardId.toString());

Fetch(url.toString(), {
method: 'DELETE',
headers: { authorization: this.password }
}).catch(() => null);
}

try {
signal.addEventListener('abort', listener);
await this.ipc.send({ content, repliable: true });

const response = await Fetch(url.toString(), {
method: 'POST',
headers: { authorization: this.password }
});

if (response.code === 202 || response.code === 204) {
// aborted request || ok request
return;
}

if (response.code >= 400 && response.code <= 499) {
// something happened server didn't accept your req
await Delay(1000);
return;
}
} catch (_) {
// this should not happen but we just delay if it happens
await Delay(1000);
} finally {
signal.removeEventListener('abort', listener);
}
}

/**
* Aborts an acquire lock request
*/
private abortIdentify(shardId: number): void {
const content: InternalOpsData = {
op: InternalOps.CANCEL_IDENTIFY,
data: { shardId },
internal: true
};
this.ipc
.send({ content, repliable: false })
.catch(() => null);
}
}
118 changes: 118 additions & 0 deletions src/concurrency/ConcurrencyServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { AddressInfo } from 'node:net';
import { ConcurrencyManager } from './ConcurrencyManager';
import { Indomitable } from '../Indomitable';
import { LibraryEvents } from '../Util';
import Http from 'node:http';
import QueryString from 'node:querystring';

/**
* Server that handles identify locks
*/
export class ConcurrencyServer {
private readonly manager: Indomitable;
/**
* Http server of this instance
* @private
*/
private readonly server: Http.Server;
/**
* Concurrency manager for this server
* @private
*/
private readonly concurrency: ConcurrencyManager;
/**
* Randomly generated password to secure this server
* @private
*/
private readonly password: string;

constructor(manager: Indomitable, concurrency: number) {
this.manager = manager;
this.server = Http.createServer((req, res) => this.handle(req, res));
this.concurrency = new ConcurrencyManager(concurrency);
this.password = Math.random().toString(36).slice(2, 10);
}

/**
* Gets the randomly generated password for this instance
*/
public get key(): string {
return this.password;
}

/**
* Gets the address info assigned for this instance
*/
public get info(): AddressInfo {
return this.server.address() as AddressInfo;
}

/**
* Handles the incoming requests
* @param request
* @param response
* @private
*/
private async handle(request: Http.IncomingMessage, response: Http.ServerResponse): Promise<void> {
if (!request.url || request.method !== 'POST' && request.method !== 'DELETE') {
response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

if (request.headers['authorization'] !== this.password) {
response.statusCode = 401;
response.statusMessage = 'Unauthorized';
return void response.end();
}

if (!request.url.includes('?shardId=')) {
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Missing shardId query string');
}

const shardId = Number(request.url.split('?shardId=')[1]);

if (isNaN(shardId)) {
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Expected shardId to be a number');
}

this.manager.emit(LibraryEvents.DEBUG, `Received a request in concurrency server! =>\n Url: ${request.url}\n Method: ${request.method}\n ShardId: ${shardId}`);

if (request.method === 'DELETE' && request.url.includes('/concurrency/cancel')) {
this.concurrency.abortIdentify(shardId);
response.statusCode = 200;
response.statusMessage = 'OK';
return void response.end();
}

if (request.method === 'POST' && request.url.includes('/concurrency/acquire')) {
try {
await this.concurrency.waitForIdentify(shardId);
response.statusCode = 204;
response.statusMessage = 'No Content';
return void response.end();
} catch (error) {
response.statusCode = 202;
response.statusMessage = 'Accepted';
return void response.end('Acquire lock cancelled');
}
}

response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

/**
* Starts this server
*/
public start(): Promise<AddressInfo> {
return new Promise((resolve) => {
this.server.listen(0 , '127.0.0.1', () => resolve(this.info));
})
}
}
Loading
Loading