Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): 1つのMisskeyで複数のHTTPサーバプロセスを起動できるように #15398

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
27 changes: 27 additions & 0 deletions .config/docker_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1

# Cluster configuration
#cluster:
# ---------------------------------------------------------
# This setting determines the role of worker processes.
# Roles refer to the HTTP server (Web API server) function and the Job Queue function.
# You can configure how many worker processes should be started for each role, such as two worker processes for the HTTP server role and four for the Job Queue role.
#
# The workers setting is an array of the following objects:
# name: The name of the worker process (optional).
# instances: The number of worker processes to start for this configuration. The total of these values must not exceed the clusterLimit setting.
# type: Specifies the role of the worker process. Available options are http and jobQueue. Since this is an array, a worker process can have multiple roles.
# ---------------------------------------------------------
# workers:
# - name: http worker
# instances: 2
# type:
# - http
# - name: jobQueue worker
# instances: 4
# type:
# - jobQueue
# - name: both worker
# instances: 2
# type:
# - http
# - jobQueue

# Job concurrency per worker
# deliverJobConcurrency: 128
# inboxJobConcurrency: 16
Expand Down
27 changes: 27 additions & 0 deletions .config/example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,33 @@ id: 'aidx'
# Number of worker processes
#clusterLimit: 1

# Cluster configuration
#cluster:
# ---------------------------------------------------------
# This setting determines the role of worker processes.
# Roles refer to the HTTP server (Web API server) function and the Job Queue function.
# You can configure how many worker processes should be started for each role, such as two worker processes for the HTTP server role and four for the Job Queue role.
#
# The workers setting is an array of the following objects:
# name: The name of the worker process (optional).
# instances: The number of worker processes to start for this configuration. The total of these values must not exceed the clusterLimit setting.
# type: Specifies the role of the worker process. Available options are http and jobQueue. Since this is an array, a worker process can have multiple roles.
# ---------------------------------------------------------
# workers:
# - name: http worker
# instances: 2
# type:
# - http
# - name: jobQueue worker
# instances: 4
# type:
# - jobQueue
# - name: both worker
# instances: 2
# type:
# - http
# - jobQueue

# Job concurrency per worker
#deliverJobConcurrency: 128
#inboxJobConcurrency: 16
Expand Down
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
-

### Server
-

- 1つのMisskeyで複数のHTTPサーバプロセスを起動できるように ( #13662 )

## 2025.2.0

Expand Down
53 changes: 47 additions & 6 deletions packages/backend/src/boot/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/

import os from 'node:os';
import * as process from 'node:process';
import { INestApplicationContext } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { Config } from '@/config.js';
import { ChartManagementService } from '@/core/chart/ChartManagementService.js';
import { QueueProcessorService } from '@/queue/QueueProcessorService.js';
import { NestLogger } from '@/NestLogger.js';
import { QueueProcessorModule } from '@/queue/QueueProcessorModule.js';
import { QueueStatsService } from '@/daemons/QueueStatsService.js';
import { ServerStatsService } from '@/daemons/ServerStatsService.js';
import { ServerService } from '@/server/ServerService.js';
import { MainModule } from '@/MainModule.js';
import { NestLogger } from '@/NestLogger.js';
import { QueueProcessorModule } from '@/queue/QueueProcessorModule.js';
import { QueueProcessorService } from '@/queue/QueueProcessorService.js';
import { ServerService } from '@/server/ServerService.js';

export async function server() {
export async function server(): Promise<INestApplicationContext> {
const app = await NestFactory.createApplicationContext(MainModule, {
logger: new NestLogger(),
});
Expand All @@ -30,7 +34,7 @@ export async function server() {
return app;
}

export async function jobQueue() {
export async function jobQueue(): Promise<INestApplicationContext> {
const jobQueue = await NestFactory.createApplicationContext(QueueProcessorModule, {
logger: new NestLogger(),
});
Expand All @@ -40,3 +44,40 @@ export async function jobQueue() {

return jobQueue;
}

export function actualClusterLimit(config: Partial<Config>): number {
return Math.min(config.clusterLimit ?? 1, cpuCount);
}

/** メインプロセス上でHTTPサーバモジュールを動作させるべきかを判断する */
export function isHttpServerOnPrimary(config: Partial<Config>): boolean {
const actualLimit = actualClusterLimit(config);
if (actualLimit === 1) {
// - クラスタ数の設定が無い(デフォルト値1を使用)
// - クラスタ数の設定が存在するものの、値が1である
// - そもそもCPUコアが1つしかない
return true;
}

if (!config.cluster?.workers || config.cluster.workers.length === 0) {
// - ワーカーの構成が無い
return true;
}

// ワーカーの構成が存在する+httpサーバ用プロセスとする設定が1つ以下のようなケースも考えられるが、ケアしない
// (明示的にそのようなconfigを記述しているので、挙動を理解したうえでの設定と判断する)
return false;
}

// for testing
export function setCpuCount(count: number): void {
// だいぶ苦肉の策だが、jestで上手くmockできないためこのような実装になっている
if (process.env.NODE_ENV !== 'test') {
throw new Error('This function is only available in test environment');
}

cpuCount = count;
}

// for testing
let cpuCount = os.cpus().length;
12 changes: 12 additions & 0 deletions packages/backend/src/boot/const.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/

// 環境変数を経由するので名前の衝突を避けるためにアンダースコアを付ける
export type WorkerArguments = {
__workerIndex: number;
__workerName?: string;
__moduleServer: boolean;
__moduleJobQueue: boolean;
}
10 changes: 6 additions & 4 deletions packages/backend/src/boot/entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import Xev from 'xev';
import Logger from '@/logger.js';
import { envOption } from '../env.js';
import { masterMain } from './master.js';
import { workerMain } from './worker.js';
import { parseWorkerArguments, workerMain } from './worker.js';
import { readyRef } from './ready.js';

import 'reflect-metadata';

process.title = `Misskey (${cluster.isPrimary ? 'master' : 'worker'})`;

Error.stackTraceLimit = Infinity;
EventEmitter.defaultMaxListeners = 128;

Expand Down Expand Up @@ -71,17 +69,21 @@ process.on('exit', code => {
if (!envOption.disableClustering) {
if (cluster.isPrimary) {
logger.info(`Start main process... pid: ${process.pid}`);
process.title = 'Misskey (master)';
await masterMain();
ev.mount();
} else if (cluster.isWorker) {
logger.info(`Start worker process... pid: ${process.pid}`);
await workerMain();
const workerArguments = parseWorkerArguments(process.env);
process.title = `Misskey (worker${workerArguments.__workerName ? `: ${workerArguments.__workerName}` : ''})`;
await workerMain(workerArguments);
} else {
throw new Error('Unknown process type');
}
} else {
// 非clusterの場合はMasterのみが起動するため、Workerの処理は行わない(cluster.isWorker === trueの状態でこのブロックに来ることはない)
logger.info(`Start main process... pid: ${process.pid}`);
process.title = 'Misskey (master)';
await masterMain();
ev.mount();
}
Expand Down
59 changes: 24 additions & 35 deletions packages/backend/src/boot/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/

import * as fs from 'node:fs';
import cluster from 'node:cluster';
import { fileURLToPath } from 'node:url';
import { dirname } from 'node:path';
import * as os from 'node:os';
import cluster from 'node:cluster';
import { dirname } from 'node:path';
import * as fs from 'node:fs';
import chalk from 'chalk';
import chalkTemplate from 'chalk-template';
import * as Sentry from '@sentry/node';
import { nodeProfilingIntegration } from '@sentry/profiling-node';
import Logger from '@/logger.js';
import { loadConfig } from '@/config.js';
import { WorkerArguments } from '@/boot/const.js';
import { sentryInit } from '@/boot/sentry.js';
import { computeWorkerArguments } from '@/boot/worker.js';
import type { Config } from '@/config.js';
import { showMachineInfo } from '@/misc/show-machine-info.js';
import { loadConfig } from '@/config.js';
import { envOption } from '@/env.js';
import { jobQueue, server } from './common.js';
import Logger from '@/logger.js';
import { showMachineInfo } from '@/misc/show-machine-info.js';
import { isHttpServerOnPrimary, jobQueue, server } from './common.js';

const _filename = fileURLToPath(import.meta.url);
const _dirname = dirname(_filename);
Expand Down Expand Up @@ -73,23 +74,7 @@ export async function masterMain() {

bootLogger.succ('Misskey initialized');

if (config.sentryForBackend) {
Sentry.init({
integrations: [
...(config.sentryForBackend.enableNodeProfiling ? [nodeProfilingIntegration()] : []),
],

// Performance Monitoring
tracesSampleRate: 1.0, // Capture 100% of the transactions

// Set sampling rate for profiling - this is relative to tracesSampleRate
profilesSampleRate: 1.0,

maxBreadcrumbs: 0,

...config.sentryForBackend.options,
});
}
sentryInit(config);

bootLogger.info(
`mode: [disableClustering: ${envOption.disableClustering}, onlyServer: ${envOption.onlyServer}, onlyQueue: ${envOption.onlyQueue}]`,
Expand All @@ -98,8 +83,8 @@ export async function masterMain() {
if (!envOption.disableClustering) {
// clusterモジュール有効時

if (envOption.onlyServer) {
// onlyServer かつ enableCluster な場合、メインプロセスはforkのみに制限する(listenしない)。
if (envOption.onlyServer || !isHttpServerOnPrimary(config)) {
// このブロックに入る場合はワーカープロセス側でのlistenが必要になると判断されているため、メインプロセスはforkのみに制限する(listenしない)。
// ワーカープロセス側でlistenすると、メインプロセスでポートへの着信を受け入れてワーカープロセスへの分配を行う動作をする。
// そのため、メインプロセスでも直接listenするとポートの競合が発生して起動に失敗してしまう。
// see: https://nodejs.org/api/cluster.html#cluster
Expand All @@ -109,7 +94,7 @@ export async function masterMain() {
await server();
}

await spawnWorkers(config.clusterLimit);
await spawnWorkers(config);
} else {
// clusterモジュール無効時

Expand Down Expand Up @@ -187,16 +172,20 @@ async function connectDb(): Promise<void> {
}
*/

async function spawnWorkers(limit = 1) {
const workers = Math.min(limit, os.cpus().length);
bootLogger.info(`Starting ${workers} worker${workers === 1 ? '' : 's'}...`);
await Promise.all([...Array(workers)].map(spawnWorker));
async function spawnWorkers(config: Config) {
const workerArgs = computeWorkerArguments(config, envOption);
bootLogger.info(`Starting ${workerArgs.length} worker${workerArgs.length === 1 ? '' : 's'}...`);

await Promise.all(
workerArgs.map(it => spawnWorker(it)),
);

bootLogger.succ('All workers started');
}

function spawnWorker(): Promise<void> {
function spawnWorker(env: WorkerArguments): Promise<void> {
return new Promise(res => {
const worker = cluster.fork();
const worker = cluster.fork(env);
worker.on('message', message => {
if (message === 'listenFailed') {
bootLogger.error('The server Listen failed due to the previous error.');
Expand Down
30 changes: 30 additions & 0 deletions packages/backend/src/boot/sentry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/

import * as Sentry from '@sentry/node';
import { nodeProfilingIntegration } from '@sentry/profiling-node';
import { Config } from '@/config.js';

export function sentryInit(config: Config) {
if (!config.sentryForBackend) {
return;
}

Sentry.init({
integrations: [
...(config.sentryForBackend.enableNodeProfiling ? [nodeProfilingIntegration()] : []),
],

// Performance Monitoring
tracesSampleRate: 1.0, // Capture 100% of the transactions

// Set sampling rate for profiling - this is relative to tracesSampleRate
profilesSampleRate: 1.0,

maxBreadcrumbs: 0,

...config.sentryForBackend.options,
});
}
Loading
Loading