Skip to content

Commit

Permalink
Historical timestamp and db store (#141)
Browse files Browse the repository at this point in the history
* db store support

* historical by timestamp

* historical by timestamp

* fixed IStoreModelProvider inject

* change log

* changelog

* update depend

* update node-core

* fix test

* fix test

* some change
  • Loading branch information
yoozo authored Nov 26, 2024
1 parent 9ca745f commit 4e3da67
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 262 deletions.
2 changes: 2 additions & 0 deletions packages/common-algorand/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Bump `@subql/common` dependency

## [4.2.4] - 2024-10-23
### Changed
Expand Down
2 changes: 1 addition & 1 deletion packages/common-algorand/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"main": "dist/index.js",
"license": "GPL-3.0",
"dependencies": {
"@subql/common": "^5.1.4",
"@subql/common": "^5.2.1",
"@subql/types-algorand": "workspace:*"
},
"peerDependencies": {
Expand Down
7 changes: 7 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Support for historical indexing by timestamp as well as block height
- Add an `--enable-cache` flag, allowing you to choose between DB or cache for IO operations.

### Changed
- Modify the injection logic of apiService

## [3.14.4] - 2024-10-23
### Changed
- Bump `@subql/common` dependency (#139)
Expand Down
4 changes: 2 additions & 2 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
"@nestjs/event-emitter": "^2.0.0",
"@nestjs/platform-express": "^9.4.0",
"@nestjs/schedule": "^3.0.1",
"@subql/common": "^5.1.4",
"@subql/common": "^5.2.1",
"@subql/common-algorand": "workspace:*",
"@subql/node-core": "^14.1.6",
"@subql/node-core": "^15.0.3",
"@subql/types-algorand": "workspace:*",
"algosdk": "^2.8.0",
"lodash": "^4.17.21",
Expand Down
10 changes: 7 additions & 3 deletions packages/node/src/algorand/algorand.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import { INestApplication } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
import { Test } from '@nestjs/testing';
import {
ApiService,
Expand Down Expand Up @@ -49,15 +49,19 @@ export const prepareApiService = async (
NodeConfig,
ConnectionPoolStateManager,
ConnectionPoolService,
AlgorandApiService,
{
provide: AlgorandApiService,
useFactory: AlgorandApiService.init,
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
],
imports: [EventEmitterModule.forRoot()],
}).compile();

const app = module.createNestApplication();
await app.init();
const apiService = app.get(AlgorandApiService);
await apiService.init();

return [app, apiService];
};

Expand Down
25 changes: 20 additions & 5 deletions packages/node/src/algorand/api.service.algorand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,39 @@ export class AlgorandApiService extends ApiService<
SafeAPIService,
IBlock<BlockContent>[]
> {
constructor(
private constructor(
@Inject('ISubqueryProject') private project: SubqueryProject,
connectionPoolService: ConnectionPoolService<AlgorandApiConnection>,
eventEmitter: EventEmitter2,
) {
super(connectionPoolService, eventEmitter);
}

async init(): Promise<AlgorandApiService> {
static async init(
project: SubqueryProject,
connectionPoolService: ConnectionPoolService<AlgorandApiConnection>,
eventEmitter: EventEmitter2,
): Promise<AlgorandApiService> {
const apiService = new AlgorandApiService(
project,
connectionPoolService,
eventEmitter,
);
try {
await this.createConnections(this.project.network, (endpoint, config) =>
AlgorandApiConnection.create(endpoint, config, this.fetchBlockBatches),
await apiService.createConnections(
apiService.project.network,
(endpoint, config) =>
AlgorandApiConnection.create(
endpoint,
config,
apiService.fetchBlockBatches,
),
);
} catch (e) {
exitWithError(new Error(`Failed to init api`, { cause: e }), logger);
}

return this;
return apiService;
}

get api(): AlgorandApi {
Expand Down
10 changes: 7 additions & 3 deletions packages/node/src/indexer/api.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import { INestApplication } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
import { Test } from '@nestjs/testing';
import {
ConnectionPoolService,
Expand Down Expand Up @@ -50,15 +50,19 @@ describe('ApiService', () => {
ConnectionPoolStateManager,
ConnectionPoolService,
NodeConfig,
AlgorandApiService,
{
provide: AlgorandApiService,
useFactory: AlgorandApiService.init,
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
],
imports: [EventEmitterModule.forRoot()],
}).compile();

app = module.createNestApplication();
await app.init();
const apiService = app.get(AlgorandApiService);
await apiService.init();

return apiService;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
NodeConfig,
StoreCacheService,
IStoreModelProvider,
StoreService,
IProjectService,
PoiSyncService,
Expand Down Expand Up @@ -37,7 +37,7 @@ export class BlockDispatcherService
@Inject('IProjectUpgradeService')
projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeCacheService: StoreCacheService,
@Inject('IStoreModelProvider') storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
@Inject('ISubqueryProject') project: SubqueryProject,
) {
Expand All @@ -47,7 +47,7 @@ export class BlockDispatcherService
projectService,
projectUpgradeService,
storeService,
storeCacheService,
storeModelProvider,
poiSyncService,
project,
apiService.fetchBlocks.bind(apiService),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import {
NodeConfig,
StoreService,
PoiSyncService,
StoreCacheService,
IProjectService,
WorkerBlockDispatcher,
ConnectionPoolStateManager,
IProjectUpgradeService,
InMemoryCacheService,
createIndexerWorker,
MonitorServiceInterface,
IStoreModelProvider,
} from '@subql/node-core';
import { AlgorandBlock, AlgorandDataSource } from '@subql/types-algorand';
import { AlgorandApiConnection } from '../../algorand';
Expand All @@ -24,6 +24,7 @@ import { DynamicDsService } from '../dynamic-ds.service';
import { BlockContent } from '../types';
import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service';
import { IIndexerWorker } from '../worker/worker';
import { FetchBlockResponse } from '../worker/worker.service';

type IndexerWorker = IIndexerWorker & {
terminate: () => Promise<number>;
Expand All @@ -47,7 +48,7 @@ export class WorkerBlockDispatcherService
projectUpgadeService: IProjectUpgradeService,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeCacheService: StoreCacheService,
@Inject('IStoreModelProvider') storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
@Inject('ISubqueryProject') project: SubqueryProject,
dynamicDsService: DynamicDsService,
Expand All @@ -61,7 +62,7 @@ export class WorkerBlockDispatcherService
projectService,
projectUpgadeService,
storeService,
storeCacheService,
storeModelProvider,
poiSyncService,
project,
() =>
Expand Down Expand Up @@ -89,7 +90,7 @@ export class WorkerBlockDispatcherService
protected async fetchBlock(
worker: IndexerWorker,
height: number,
): Promise<void> {
await worker.fetchBlock(height, 0 /* Unused with algorand */);
): Promise<FetchBlockResponse> {
return worker.fetchBlock(height, 0 /* Unused with algorand */);
}
}
24 changes: 6 additions & 18 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import {
PoiSyncService,
NodeConfig,
ConnectionPoolService,
StoreCacheService,
ConnectionPoolStateManager,
IProjectUpgradeService,
InMemoryCacheService,
MonitorService,
CoreModule,
IStoreModelProvider,
} from '@subql/node-core';
import { AlgorandApiConnection, AlgorandApiService } from '../algorand';
import { SubqueryProject } from '../configure/SubqueryProject';
Expand All @@ -35,19 +35,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
UnfinalizedBlocksService,
{
provide: AlgorandApiService,
useFactory: async (
project: SubqueryProject,
connectionPoolService: ConnectionPoolService<AlgorandApiConnection>,
eventEmitter: EventEmitter2,
) => {
const apiService = new AlgorandApiService(
project,
connectionPoolService,
eventEmitter,
);
await apiService.init();
return apiService;
},
useFactory: AlgorandApiService.init,
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
IndexerManager,
Expand All @@ -62,7 +50,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
indexerManager: IndexerManager,
cacheService: InMemoryCacheService,
storeService: StoreService,
storeCacheService: StoreCacheService,
storeModelProvider: IStoreModelProvider,
poiSyncService: PoiSyncService,
project: SubqueryProject,
dynamicDsService: DynamicDsService,
Expand All @@ -78,7 +66,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
projectUpgradeService,
cacheService,
storeService,
storeCacheService,
storeModelProvider,
poiSyncService,
project,
dynamicDsService,
Expand All @@ -94,7 +82,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
projectService,
projectUpgradeService,
storeService,
storeCacheService,
storeModelProvider,
poiSyncService,
project,
),
Expand All @@ -107,7 +95,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
IndexerManager,
InMemoryCacheService,
StoreService,
StoreCacheService,
'IStoreModelProvider',
PoiSyncService,
'ISubqueryProject',
DynamicDsService,
Expand Down
6 changes: 3 additions & 3 deletions packages/node/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
BaseFetchService,
getModulos,
Header,
StoreCacheService,
IStoreModelProvider,
} from '@subql/node-core';
import { AlgorandBlock } from '@subql/types-algorand';
import {
Expand Down Expand Up @@ -48,7 +48,7 @@ export class FetchService extends BaseFetchService<
unfinalizedBlocksService: UnfinalizedBlocksService,
eventEmitter: EventEmitter2,
schedulerRegistry: SchedulerRegistry,
storeCacheService: StoreCacheService,
@Inject('IStoreModelProvider') storeModelProvider: IStoreModelProvider,
) {
super(
nodeConfig,
Expand All @@ -58,7 +58,7 @@ export class FetchService extends BaseFetchService<
eventEmitter,
schedulerRegistry,
unfinalizedBlocksService,
storeCacheService,
storeModelProvider,
);
}

Expand Down
10 changes: 5 additions & 5 deletions packages/node/src/indexer/unfinalizedBlocks.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { Injectable } from '@nestjs/common';
import { Inject, Injectable } from '@nestjs/common';
import {
BaseUnfinalizedBlocksService,
Header,
NodeConfig,
StoreCacheService,
IStoreModelProvider,
mainThreadOnly,
} from '@subql/node-core';
import { AlgorandApiService, algorandBlockToHeader } from '../algorand';
Expand All @@ -17,9 +17,9 @@ export class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService<Block
constructor(
private readonly apiService: AlgorandApiService,
nodeConfig: NodeConfig,
storeCache: StoreCacheService,
@Inject('IStoreModelProvider') storeModelProvider: IStoreModelProvider,
) {
super(nodeConfig, storeCache);
super(nodeConfig, storeModelProvider);
}

@mainThreadOnly()
Expand All @@ -35,7 +35,7 @@ export class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService<Block
}

@mainThreadOnly()
protected async getHeaderForHeight(height: number): Promise<Header> {
async getHeaderForHeight(height: number): Promise<Header> {
return algorandBlockToHeader(
await this.apiService.api.getHeaderOnly(height),
);
Expand Down
14 changes: 1 addition & 13 deletions packages/node/src/indexer/worker/worker-fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,7 @@ import { WorkerService } from './worker.service';
IndexerManager,
{
provide: AlgorandApiService,
useFactory: async (
project: SubqueryProject,
connectionPoolService: ConnectionPoolService<AlgorandApiConnection>,
eventEmitter: EventEmitter2,
) => {
const apiService = new AlgorandApiService(
project,
connectionPoolService,
eventEmitter,
);
await apiService.init();
return apiService;
},
useFactory: AlgorandApiService.init,
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
DsProcessorService,
Expand Down
Loading

0 comments on commit 4e3da67

Please sign in to comment.