Skip to content

Commit

Permalink
draft: multi chain rewind
Browse files Browse the repository at this point in the history
  • Loading branch information
yoozo committed Dec 12, 2024
1 parent d3c2a58 commit 53d8065
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
protected async rewind(lastCorrectHeader: Header): Promise<void> {
if (lastCorrectHeader.blockHeight <= this.currentProcessingHeight) {
logger.info(`Found last verified block at height ${lastCorrectHeader.blockHeight}, rewinding...`);
await this.projectService.reindex(lastCorrectHeader);
await this.projectService.reindex(lastCorrectHeader, true);
this.setLatestProcessedHeight(lastCorrectHeader.blockHeight);
logger.info(`Successful rewind to block ${lastCorrectHeader.blockHeight}!`);
}
Expand Down
44 changes: 44 additions & 0 deletions packages/node-core/src/indexer/entities/GlobalData.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {blake2AsHex} from '@subql/utils';
import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize';

type RewindTimestampKey = `rewindTimestamp_${string}`;
export interface GlobalDataKeys {
rewindLock: number;
[key: RewindTimestampKey]: number;
}

export interface GlobalData {
key: keyof GlobalDataKeys;
value: GlobalDataKeys[keyof GlobalDataKeys];
}

interface GlobalDataEntity extends Model<GlobalData>, GlobalData {}

export type GlobalDataRepo = typeof Model & {
new (values?: unknown, options?: BuildOptions): GlobalDataEntity;
};

export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo {
const tableName = '_global';

return <GlobalDataRepo>sequelize.define(
tableName,
{
key: {
type: DataTypes.STRING,
primaryKey: true,
},
value: {
type: DataTypes.JSONB,
},
},
{freezeTableName: true, schema: schema}
);
}

export function generateRewindTimestampKey(chainId: string): RewindTimestampKey {
return `rewindTimestamp_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey;
}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

export * from './Poi.entity';
export * from './Metadata.entity';
export * from './GlobalData.entity';
15 changes: 15 additions & 0 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
// Update the target height, this happens here to stay in sync with the rest of indexing
void this.storeModelProvider.metadata.set('targetHeight', latestHeight);

// If we're rewinding, we should wait until it's done
const {needRewind, needWaitRewind} = await this.projectService.getRewindStatus();
if (needRewind || needWaitRewind) {
logger.info(`Fetch service is waiting for rewind to finish`);
if (needRewind) {
// TODO Retrieve the block headers that require rollback.
const rewindBlockHeader: Header = {} as any;

this.blockDispatcher.flushQueue(rewindBlockHeader.blockHeight);
await this.projectService.reindex(rewindBlockHeader, false);
}
await delay(3);
continue;
}

// This could be latestBestHeight, dictionary should never include finalized blocks
// TODO add buffer so dictionary not used when project synced
if (startBlockHeight < this.latestBestHeight - scaledBatchSize) {
Expand Down
88 changes: 51 additions & 37 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { isMainThread } from 'worker_threads';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BaseDataSource, IProjectNetworkConfig } from '@subql/types-core';
import { Sequelize } from '@subql/x-sequelize';
import { IApi } from '../api.service';
import { IProjectUpgradeService, NodeConfig } from '../configure';
import { IndexerEvent } from '../events';
import { getLogger } from '../logger';
import { exitWithError, monitorWrite } from '../process';
import { getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex } from '../utils';
import { BlockHeightMap } from '../utils/blockHeightMap';
import { BaseDsProcessorService } from './ds-processor.service';
import { DynamicDsService } from './dynamic-ds.service';
import { MetadataKeys } from './entities';
import { PoiSyncService } from './poi';
import { PoiService } from './poi/poi.service';
import { StoreService } from './store.service';
import { cacheProviderFlushData } from './storeModelProvider';
import { ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header } from './types';
import { IUnfinalizedBlocksService } from './unfinalizedBlocks.service';
import {isMainThread} from 'worker_threads';
import {EventEmitter2} from '@nestjs/event-emitter';
import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {Sequelize} from '@subql/x-sequelize';
import {IApi} from '../api.service';
import {IProjectUpgradeService, NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils';
import {BlockHeightMap} from '../utils/blockHeightMap';
import {BaseDsProcessorService} from './ds-processor.service';
import {DynamicDsService} from './dynamic-ds.service';
import {MetadataKeys} from './entities';
import {PoiSyncService} from './poi';
import {PoiService} from './poi/poi.service';
import {StoreService} from './store.service';
import {cacheProviderFlushData} from './storeModelProvider';
import {ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header} from './types';
import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service';

const logger = getLogger('Project');

Expand All @@ -35,7 +35,8 @@ export abstract class BaseProjectService<
API extends IApi,
DS extends BaseDataSource,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>,
> implements IProjectService<DS> {
> implements IProjectService<DS>
{
private _schema?: string;
private _startHeight?: number;
private _blockOffset?: number;
Expand Down Expand Up @@ -218,16 +219,16 @@ export abstract class BaseProjectService<

const existing = await metadata.findMany(keys);

const { chain, genesisHash, specName } = this.apiService.networkMeta;
const {chain, genesisHash, specName} = this.apiService.networkMeta;

if (this.project.runner) {
const { node, query } = this.project.runner;
const {node, query} = this.project.runner;

await metadata.setBulk([
{ key: 'runnerNode', value: node.name },
{ key: 'runnerNodeVersion', value: node.version },
{ key: 'runnerQuery', value: query.name },
{ key: 'runnerQueryVersion', value: query.version },
{key: 'runnerNode', value: node.name},
{key: 'runnerNodeVersion', value: node.version},
{key: 'runnerQuery', value: query.name},
{key: 'runnerQueryVersion', value: query.version},
]);
}
if (!existing.genesisHash) {
Expand Down Expand Up @@ -340,7 +341,7 @@ export abstract class BaseProjectService<
const nextProject = projects[i + 1][1];
nextMinStartHeight = Math.max(
nextProject.dataSources
.filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock)
.filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock)
.sort((a, b) => a.startBlock - b.startBlock)[0].startBlock,
projects[i + 1][0]
);
Expand All @@ -355,12 +356,12 @@ export abstract class BaseProjectService<
}[] = [];

[...project.dataSources, ...dynamicDs]
.filter((ds): ds is DS & { startBlock: number } => {
.filter((ds): ds is DS & {startBlock: number} => {
return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock);
})
.forEach((ds) => {
events.push({ block: Math.max(height, ds.startBlock), start: true, ds });
if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds });
events.push({block: Math.max(height, ds.startBlock), start: true, ds});
if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds});
});

// sort events by block in ascending order, start events come before end events
Expand Down Expand Up @@ -432,10 +433,13 @@ export abstract class BaseProjectService<

const timestamp = await this.getBlockTimestamp(upgradePoint);
// Only timestamp and blockHeight are used with reindexing so its safe to convert to a header
await this.reindex({
blockHeight: upgradePoint,
timestamp,
} as Header);
await this.reindex(
{
blockHeight: upgradePoint,
timestamp,
} as Header,
true
);
return upgradePoint + 1;
}
}
Expand All @@ -454,7 +458,7 @@ export abstract class BaseProjectService<
await this.onProjectChange(this.project);
}

async reindex(targetBlockHeader: Header): Promise<void> {
async reindex(targetBlockHeader: Header, flushGlobalLock: boolean): Promise<void> {
const [height, timestamp] = await Promise.all([
this.getLastProcessedHeight(),
this.storeService.modelProvider.metadata.find('lastProcessedBlockTimestamp'),
Expand All @@ -464,10 +468,13 @@ export abstract class BaseProjectService<
throw new Error('Cannot reindex with missing lastProcessedHeight');
}

if (flushGlobalLock && targetBlockHeader.timestamp) {
await this.storeService.setGlobalRewindLock(targetBlockHeader.timestamp.getTime());
}
return reindex(
this.getStartBlockFromDataSources(),
targetBlockHeader,
{ height, timestamp },
{height, timestamp},
this.storeService,
this.unfinalizedBlockService,
this.dynamicDsService,
Expand All @@ -477,4 +484,11 @@ export abstract class BaseProjectService<
/* Not providing force clean service, it should never be needed */
);
}

// `needRewind` indicates that the current chain requires a rewind,
// while `needWaitRewind` suggests that there are chains (including the current chain) in the project that have pending rewinds.
async getRewindStatus(): Promise<{needRewind: boolean; needWaitRewind: boolean}> {
const globalRewindStatus = await this.storeService.getGlobalRewindStatus();
return {needRewind: !!globalRewindStatus.rewindTimestamp, needWaitRewind: !!globalRewindStatus.rewindLock};
}
}
Loading

0 comments on commit 53d8065

Please sign in to comment.