Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update helia and add job for pruning peer store #496

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ipfs-service/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ DHT_QUEUE_ATTEMPTS=10
# Backoff (exponential, fixed)
DHT_QUEUE_BACKOFF_TYPE=exponential
# Backoff delay (ms)
DHT_QUEUE_BACKOFF_DELAY=1000
DHT_QUEUE_BACKOFF_DELAY=1000

MAX_PEERS=5000
PRUNE_PEER_STORE_INTERVAL=0 0 */12 * * * # every 12h
10,696 changes: 4,493 additions & 6,203 deletions ipfs-service/package-lock.json

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions ipfs-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,34 @@
"test:e2e": "NODE_OPTIONS=--experimental-vm-modules jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@bull-board/nestjs": "^5.17.1",
"@types/morgan": "^1.9.7",
"@aws-sdk/client-s3": "^3.552.0",
"morgan": "^1.10.0",
"@bull-board/nestjs": "^5.17.1",
"@chainsafe/libp2p-gossipsub": "^13.0.0",
"@helia/ipns": "^7.2.0",
"@helia/unixfs": "^3.0.3",
"@nestjs/bullmq": "^10.1.1",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.2.2",
"@nestjs/core": "^10.0.0",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/bullmq": "^10.1.1",
"@nestjs/schedule": "^4.1.1",
"@types/morgan": "^1.9.7",
"@types/multer": "^1.4.11",
"blockstore-fs": "^1.1.10",
"blockstore-s3": "^1.0.15",
"bull": "^4.12.3",
"bullmq": "^5.7.9",
"datastore-level": "^10.1.8",
"datastore-s3": "^11.1.11",
"helia": "^4.0.1",
"i": "^0.3.7",
"morgan": "^1.10.0",
"multer": "^1.4.5-lts.1",
"nest-winston": "^1.9.4",
"npm": "^10.5.2",
"reflect-metadata": "^0.2.1",
"rxjs": "^7.8.1",
"winston": "^3.11.0",
"bull": "^4.12.3",
"bullmq": "^5.7.9",
"nest-winston": "^1.9.4",
"winston-daily-rotate-file": "^5.0.0"
},
"devDependencies": {
Expand Down
37 changes: 32 additions & 5 deletions ipfs-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,43 @@ import { BullModule } from '@nestjs/bullmq';
import { BullmqModule } from './bullmq/bullmq.module.js';
import { ProvideToDHTProcessor } from './queues/processors/provide-to-dht.processor.js';
import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js';
import { QUEUE_NAME_PROVIDE_TO_DHT } from './constants/bullmq.constants.js';
import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from './constants/bullmq.constants.js';
import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js';
import { PrunePeerStoreProcessor } from './queues/processors/prune-peer-store.processor.js';
import { PrunePeerStoreJob } from './jobs/impl/prune-peer-store.job.js';
import { Scheduler } from './jobs/scheduler.js';
import { SchedulerRegistry } from '@nestjs/schedule';

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
BullmqModule,
BullModule.registerQueue({
name: QUEUE_NAME_PROVIDE_TO_DHT,
}),],
name: QUEUE_NAME_PROVIDE_TO_DHT,
}),
BullModule.registerQueue({
name: QUEUE_NAME_PRUNE_PEER_STORE,
}),
],
controllers: [AppController],
providers: [AppService, ProvideToDHTProcessor, ProvideToDHTProducer],
providers: [
AppService,
ProvideToDHTProcessor,
ProvideToDHTProducer,
PrunePeerStoreProducer,
PrunePeerStoreProcessor,
SchedulerRegistry,
Scheduler,
PrunePeerStoreJob,
],
})
export class AppModule {}
export class AppModule {
constructor(
private readonly scheduler: Scheduler,
private readonly prunePeerStoreJob: PrunePeerStoreJob,
) {}

onModuleInit() {
this.scheduler.registerJob(this.prunePeerStoreJob);
}
}
60 changes: 40 additions & 20 deletions ipfs-service/src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { IpfsDto } from './dto/ipfs.dto.js';
import { PeerId } from '@libp2p/interface';
import { config } from 'dotenv';
import { ProvideToDHTProducer } from './queues/producers/provide-to-dht.producer.js';
import { PrunePeerStoreProducer } from './queues/producers/prune-peer-store.producer.js';
config();

const libp2pOptions = {
Expand Down Expand Up @@ -91,10 +92,7 @@ const libp2pOptions = {
identify: identify(),
keychain: keychain(),
ping: ping(),
relay: circuitRelayServer({
advertise: true,
hopTimeout: 60000,
}),
relay: circuitRelayServer(),
upnp: uPnPNAT(),
},
};
Expand All @@ -107,7 +105,10 @@ export class AppService implements OnModuleInit {
private ipnsPeerId: PeerId;
private logger = new Logger(AppService.name);

constructor(private readonly provideToDHTProducer: ProvideToDHTProducer) {}
constructor(
private readonly provideToDHTProducer: ProvideToDHTProducer,
private readonly prunePeerStoreProducer: PrunePeerStoreProducer,
) {}

async onModuleInit() {
console.log(`Initialization helia...`);
Expand Down Expand Up @@ -222,27 +223,27 @@ export class AppService implements OnModuleInit {

async getDocByCid(cidString: string): Promise<IpfsDto> {
try {
this.fs = unixfs(this.helia);
const decoder = new TextDecoder();
const cid = CID.parse(cidString);

let text = '';
for await (const chunk of this.fs.cat(cid)) {
text += decoder.decode(chunk, {
stream: true,
});
this.fs = unixfs(this.helia);
const decoder = new TextDecoder();
const cid = CID.parse(cidString);

let text = '';
for await (const chunk of this.fs.cat(cid)) {
text += decoder.decode(chunk, {
stream: true,
});
}
return IpfsMapper.ipfsToIpfsDto(cidString, text);
} catch (error) {
this.logger.error(`Failed to get doc by CID - ${cidString} error: ${error}`);
return null;
}
return IpfsMapper.ipfsToIpfsDto(cidString, text);
} catch (error) {
this.logger.error(`Failed to get doc by CID - ${cidString} error: ${error}`);
return null;
}
}

async provideCidtoDHTViaQueue(cid: CID) {
await this.helia.libp2p.contentRouting.provide(cid);
this.logger.log(`Announced CID to the DHT: ${cid.toString()}`);
}
}

async getIpnsUrl(): Promise<string> {
if (!this.ipnsPeerId) {
Expand All @@ -251,4 +252,23 @@ export class AppService implements OnModuleInit {
const ipnsUrl = process.env.IPNS_PUBLIC_URL + this.ipnsPeerId.toString()
return ipnsUrl;
}

async addPrunePeerStoreJob(): Promise<void> {
await this.prunePeerStoreProducer.addToQueue(process.env.MAX_PEERS);
}

async prunePeerStore(maxPeers: number): Promise<void> {
let removedPeers = 0;
const peers = await this.helia.libp2p.peerStore.all();
this.logger.debug(`Total number of stored peers: ${peers.length}`);

if (peers.length > maxPeers) {
const excessPeers = peers.slice(0, peers.length - maxPeers);
for (const peer of excessPeers) {
await this.helia.libp2p.peerStore.delete(peer.id);
removedPeers++;
}
}
this.logger.debug(`Removed peers: ${removedPeers}`);
}
}
6 changes: 5 additions & 1 deletion ipfs-service/src/bullmq/bullmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { BullBoardModule } from "@bull-board/nestjs";
import { BullModule } from "@nestjs/bullmq";
import { Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { QUEUE_NAME_PROVIDE_TO_DHT } from "../constants/bullmq.constants.js";
import { QUEUE_NAME_PROVIDE_TO_DHT, QUEUE_NAME_PRUNE_PEER_STORE } from "../constants/bullmq.constants.js";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";

Expand Down Expand Up @@ -32,6 +32,10 @@ import { ExpressAdapter } from "@bull-board/express";
name: QUEUE_NAME_PROVIDE_TO_DHT,
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: QUEUE_NAME_PRUNE_PEER_STORE,
adapter: BullMQAdapter,
}),
],
})
export class BullmqModule {}
3 changes: 3 additions & 0 deletions ipfs-service/src/constants/bullmq.constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
export const QUEUE_NAME_PROVIDE_TO_DHT = "provide_to_dht_queue";
export const JOB_NAME_PROVIDE_TO_DHT = "provide_to_dht_job";

export const QUEUE_NAME_PRUNE_PEER_STORE = "prune_peer_store_queue";
export const JOB_NAME_PRUNE_PEER_STORE = "prune_peer_store_job";
5 changes: 5 additions & 0 deletions ipfs-service/src/jobs/i-job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface IJob {
getJobName(): string;
getInterval(): string;
execute(): void;
}
24 changes: 24 additions & 0 deletions ipfs-service/src/jobs/impl/prune-peer-store.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ConfigService } from '@nestjs/config';
import { IJob } from '../i-job';
import { Injectable } from '@nestjs/common';
import { JOB_NAME_PRUNE_PEER_STORE } from '../../constants/bullmq.constants.js';
import { AppService } from '../../app.service.js';

@Injectable()
export class PrunePeerStoreJob implements IJob {
constructor(
private readonly configService: ConfigService,
private readonly appService: AppService,
) {}
getJobName(): string {
return JOB_NAME_PRUNE_PEER_STORE;
}
getInterval(): string {
return (
this.configService.get<string>('PRUNE_PEER_STORE_INTERVAL') || '0 0 * * * *'
);
}
execute(): void {
this.appService.addPrunePeerStoreJob();
}
}
35 changes: 35 additions & 0 deletions ipfs-service/src/jobs/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { SchedulerRegistry } from '@nestjs/schedule';
import { IJob } from './i-job';
import { Injectable, Logger } from '@nestjs/common';
import { CronJob } from 'cron';

@Injectable()
export class Scheduler {
private readonly logger = new Logger(Scheduler.name);

constructor(private readonly schedulerRegistry: SchedulerRegistry) {}

registerJob(job: IJob) {
const jobName = job.getJobName();
const interval = job.getInterval();

const cronJob = new CronJob(interval, async () => {
try {
this.logger.log(`Executing job ${jobName}`);

job.execute();
} catch (error) {
this.logger.error(
`Error executing job ${jobName}: ${error.message}`,
error.stack,
);
}
});

this.schedulerRegistry.addCronJob(jobName, cronJob);
cronJob.start();
this.logger.log(
`Registered and started job '${jobName}' with frequency '${interval}'`,
);
}
}
39 changes: 39 additions & 0 deletions ipfs-service/src/queues/processors/prune-peer-store.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import {
JOB_NAME_PRUNE_PEER_STORE,
QUEUE_NAME_PRUNE_PEER_STORE,
} from '../../constants/bullmq.constants.js';
import { Logger } from '@nestjs/common';
import { AppService } from '../../app.service.js';

@Processor(QUEUE_NAME_PRUNE_PEER_STORE)
export class PrunePeerStoreProcessor extends WorkerHost {
protected readonly logger = new Logger(PrunePeerStoreProcessor.name);
constructor(private readonly appService: AppService) {
super();
}

async process(job: Job<any>): Promise<any> {
switch (job.name) {
case JOB_NAME_PRUNE_PEER_STORE: {
const maxPeers = Number(job.data);
this.logger.debug('Job triggered: Prune Peer Store');
try {
await this.appService.prunePeerStore(maxPeers);
} catch (error) {
this.logger.error(`Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`);
throw error;
}
}
}
}

@OnWorkerEvent('completed')
onCompleted(job: Job) {
const { id, name, queueName, finishedOn, returnvalue } = job;
const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
this.logger.log(
`Job Finished - id: ${id}, name: ${name} in queue ${queueName} on ${completionTime}.`,);
}
}
27 changes: 27 additions & 0 deletions ipfs-service/src/queues/producers/prune-peer-store.producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Queue } from 'bullmq';
import {
JOB_NAME_PRUNE_PEER_STORE,
QUEUE_NAME_PRUNE_PEER_STORE,
} from '../../constants/bullmq.constants.js';
import { randomUUID } from 'crypto';

@Injectable()
export class PrunePeerStoreProducer {
constructor(
@InjectQueue(QUEUE_NAME_PRUNE_PEER_STORE) private readonly prunePeerStoreQueue: Queue,
) {}

async addToQueue(inputData: string) {
const job = await this.prunePeerStoreQueue.add(JOB_NAME_PRUNE_PEER_STORE, inputData, {
jobId: randomUUID(),
removeOnComplete: true,
removeOnFail: false,
attempts: 3,
backoff: { type: 'fixed', delay: 300000 }, // 300 seconds
});
return job;
}
}
Loading