sync 2023-02-21 (#28)
* sync 2023-02-21

* Clean up

* Remove unused functions
stwiname authored Feb 20, 2023
1 parent d8cccd4 commit 02b0742
Showing 22 changed files with 365 additions and 131 deletions.
1 change: 0 additions & 1 deletion .gitpod.yml
@@ -6,4 +6,3 @@ image:

tasks:
- init: yarn install && yarn run build

1 change: 1 addition & 0 deletions package.json
@@ -8,6 +8,7 @@
"@actions/core": "^1.6.0",
"@babel/preset-env": "^7.16.11",
"@octokit/request": "^5.6.3",
"@types/cron-converter": "^1",
"@types/node": "^14.18.10",
"@typescript-eslint/eslint-plugin": "^5.10.2",
"@typescript-eslint/parser": "^5.10.2",
2 changes: 2 additions & 0 deletions packages/common-algorand/package.json
@@ -17,6 +17,8 @@
"@subql/types-algorand": "workspace:*",
"class-transformer": "0.4.0",
"class-validator": "^0.13.2",
"fs-extra": "^10.1.0",
"ipfs-http-client": "^52.0.3",
"js-yaml": "^4.1.0",
"reflect-metadata": "^0.1.13",
"semver": "^7.3.7"
36 changes: 36 additions & 0 deletions packages/common-algorand/test/project_bypass.yaml
@@ -0,0 +1,36 @@
specVersion: 1.0.0
name: polkadot-starter
version: 0.0.1
runner:
node:
name: '@subql/node'
version: '*'
query:
name: '@subql/query'
version: '*'
description: >-
This project can be used as a starting point for developing your SubQuery
project
repository: 'https://github.com/subquery/subql-starter'
schema:
file: ./schema.graphql
network:
chainId: '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3'
endpoint: 'wss://polkadot.api.onfinality.io/public-ws'
dictionary: 'https://api.subquery.network/sq/subquery/polkadot-dictionary'
bypassBlocks: [1, 2, 4, 5]
dataSources:
- kind: substrate/Runtime
startBlock: 1
mapping:
file: ./dist/index.js
handlers:
- handler: handleBlock
kind: substrate/BlockHandler
- handler: handleEvent
kind: substrate/EventHandler
filter:
module: balances
method: Deposit
- handler: handleCall
kind: substrate/CallHandler
36 changes: 36 additions & 0 deletions packages/common-algorand/test/project_bypass_range.yaml
@@ -0,0 +1,36 @@
specVersion: 1.0.0
name: polkadot-starter
version: 0.0.1
runner:
node:
name: '@subql/node'
version: '*'
query:
name: '@subql/query'
version: '*'
description: >-
This project can be used as a starting point for developing your SubQuery
project
repository: 'https://github.com/subquery/subql-starter'
schema:
file: ./schema.graphql
network:
chainId: '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3'
endpoint: 'wss://polkadot.api.onfinality.io/public-ws'
dictionary: 'https://api.subquery.network/sq/subquery/polkadot-dictionary'
bypassBlocks: [1, 2, 4, 5, '10 - 40']
dataSources:
- kind: substrate/Runtime
startBlock: 1
mapping:
file: ./dist/index.js
handlers:
- handler: handleBlock
kind: substrate/BlockHandler
- handler: handleEvent
kind: substrate/EventHandler
filter:
module: balances
method: Deposit
- handler: handleCall
kind: substrate/CallHandler
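
The two new test manifests exercise the `bypassBlocks` network option with both individual heights and a quoted range string (`'10 - 40'`). As a rough illustration only, and not the node-core implementation, such a list could be normalized into a set of concrete heights to skip along these lines; the helper name and parsing details are assumptions:

```ts
// Illustrative sketch, not the actual node-core helper: expand a
// bypassBlocks list such as [1, 2, 4, 5, '10 - 40'] into concrete heights.
type BypassBlock = number | string;

function expandBypassBlocks(bypassBlocks: BypassBlock[] = []): Set<number> {
  const heights = new Set<number>();
  for (const entry of bypassBlocks) {
    if (typeof entry === 'number') {
      heights.add(entry);
      continue;
    }
    // Range entries are written as strings, e.g. '10 - 40'
    const [start, end] = entry
      .split('-')
      .map((part) => parseInt(part.trim(), 10));
    for (let height = start; height <= end; height++) {
      heights.add(height);
    }
  }
  return heights;
}

// expandBypassBlocks([1, 2, 4, 5, '10 - 40']).has(25) === true
```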
5 changes: 3 additions & 2 deletions packages/node/README.md
@@ -53,12 +53,11 @@ Options:
used
[string] [choices: "fatal", "error", "warn", "info", "debug", "trace",
"silent"]
--migrate Migrate db schema (for management tables only)
[boolean] [default: false]
--timestamp-field Enable/disable created_at and updated_at in schema
[boolean] [default: false]
-d, --network-dictionary Specify the dictionary api for this network [string]
--dictionary-timeout Max timeout for dictionary query [number]
--sponsored-dictionary Use subquery network sponsored dictionary [string]
-m, --mmr-path Local path of the merkle mountain range (.mmr) file
[string]
--proof-of-index Enable/disable proof of index
@@ -69,6 +68,8 @@ Options:
[boolean] [default: true]
-w, --workers Number of worker threads to use for fetching and
processing blocks. Disabled by default. [number]
--multi-chain Enables indexing multiple subquery projects into the
same database schema [boolean] [default: false]
```

## License
11 changes: 11 additions & 0 deletions packages/node/docker/cockroach-db/docker-compose.yml
@@ -0,0 +1,11 @@
version: '3.5'

services:
crdb:
image: cockroachdb/cockroach:latest-v22.1
ports:
- '26257:26257'
- '8080:8080'
command: start-single-node --insecure
volumes:
- '${PWD}/cockroach-data/crdb:/cockroach/cockroach-data'
9 changes: 4 additions & 5 deletions packages/node/package.json
@@ -24,11 +24,11 @@
"@nestjs/event-emitter": "^1.3.0",
"@nestjs/platform-express": "^8.2.6",
"@nestjs/schedule": "^1.0.2",
"@polkadot/api": "9.4.2",
"@polkadot/api": "9.11.1",
"@subql/apollo-links": "^0.2.1-0",
"@subql/common": "1.5.0",
"@subql/common-algorand": "workspace:*",
"@subql/node-core": "^1.8.0",
"@subql/node-core": "1.8.1-4",
"@subql/types-algorand": "workspace:*",
"@subql/utils": "1.3.1",
"@subql/x-merkle-mountain-range": "2.0.0-0.1.2",
@@ -47,7 +47,7 @@
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.5.2",
"sequelize": "6.23.0",
"sequelize": "^6.28.0",
"tar": "^6.1.11",
"typescript": "^4.4.4",
"vm2": "^3.9.9",
@@ -71,6 +71,5 @@
"files": [
"/dist",
"/bin"
],
"stableVersion": "1.18.0"
]
}
4 changes: 2 additions & 2 deletions packages/node/src/configure/configure.module.ts
@@ -131,11 +131,11 @@ export class ConfigureModule {
useValue: config,
},
{
provide: SubqueryProject,
provide: 'ISubqueryProject',
useFactory: project,
},
],
exports: [NodeConfig, SubqueryProject],
exports: [NodeConfig, 'ISubqueryProject'],
};
}
static register(): DynamicModule {
@@ -141,7 +141,10 @@ export class BlockDispatcherService
);
// const blocks = await this.apiService.fetchBlocks(blockNums);

if (bufferedHeight > this._latestBufferedHeight) {
if (
bufferedHeight > this._latestBufferedHeight ||
this.queue.peek() < Math.min(...blockNums)
) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
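
The added `this.queue.peek() < Math.min(...blockNums)` check covers the case where the queue has been reset to lower heights (per the log message, when a new dynamic datasource is created) while this batch was still being fetched, so the stale batch is discarded instead of being enqueued out of order. A simplified restatement, with parameter names standing in for the dispatcher's internal state:

```ts
// Simplified restatement of the discard condition; parameter names are
// stand-ins for the dispatcher's fields, not the actual service API.
function shouldDiscardFetchedBlocks(
  bufferedHeight: number,
  latestBufferedHeight: number,
  queueHead: number | undefined,
  blockNums: number[],
): boolean {
  return (
    // The buffer advanced past this batch while it was in flight
    bufferedHeight > latestBufferedHeight ||
    // The queue was reset to heights below this batch
    (queueHead !== undefined && queueHead < Math.min(...blockNums))
  );
}
```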
@@ -6,7 +6,6 @@ import path from 'path';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { hexToU8a, u8aEq } from '@polkadot/util';
import {
getLogger,
NodeConfig,
@@ -27,6 +26,7 @@ import {
NumFetchedBlocks,
NumFetchingBlocks,
GetWorkerStatus,
ReloadDynamicDs,
} from '../worker/worker';
import { BaseBlockDispatcher } from './base-block-dispatcher';

@@ -38,6 +38,7 @@ type IIndexerWorker = {
numFetchedBlocks: NumFetchedBlocks;
numFetchingBlocks: NumFetchingBlocks;
getStatus: GetWorkerStatus;
reloadDynamicDs: ReloadDynamicDs;
};

type IInitIndexerWorker = IIndexerWorker & {
@@ -58,6 +59,7 @@ async function createIndexerWorker(): Promise<IndexerWorker> {
'numFetchedBlocks',
'numFetchingBlocks',
'getStatus',
'reloadDynamicDs',
],
);

@@ -123,7 +125,11 @@ export class WorkerBlockDispatcherService
}

enqueueBlocks(heights: number[], latestBufferHeight?: number): void {
if (!heights.length) return;
if (!!latestBufferHeight && !heights.length) {
this.latestBufferedHeight = latestBufferHeight;
return;
}

logger.info(
`Enqueing blocks [${heights[0]}...${last(heights)}], total ${
heights.length
@@ -187,10 +193,6 @@
);
}

// logger.info(
// `worker ${workerIdx} processing block ${height}, fetched blocks: ${await worker.numFetchedBlocks()}, fetching blocks: ${await worker.numFetchingBlocks()}`,
// );

this.preProcessBlock(height);

const { dynamicDsCreated, operationHash, reindexBlockHeight } =
@@ -201,6 +203,11 @@
operationHash: Buffer.from(operationHash, 'base64'),
reindexBlockHeight,
});

if (dynamicDsCreated) {
// Ensure all workers are aware of all dynamic ds
await Promise.all(this.workers.map((w) => w.reloadDynamicDs()));
}
} catch (e) {
logger.error(
e,
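
The worker dispatcher now exposes `reloadDynamicDs` on each worker and fans the call out whenever a block creates a dynamic datasource, so every worker thread re-reads the persisted datasource list before indexing later blocks. A minimal sketch of that fan-out, with a hypothetical interface standing in for the proxy returned by `createIndexerWorker`:

```ts
// Hypothetical stand-in for the worker proxy; only the method used by the
// fan-out is modelled here.
interface DynamicDsAwareWorker {
  reloadDynamicDs(): Promise<void>;
}

async function syncWorkersAfterBlock(
  workers: DynamicDsAwareWorker[],
  dynamicDsCreated: boolean,
): Promise<void> {
  if (dynamicDsCreated) {
    // Ensure every worker sees the newly created dynamic datasource
    await Promise.all(workers.map((w) => w.reloadDynamicDs()));
  }
}
```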
45 changes: 29 additions & 16 deletions packages/node/src/indexer/dynamic-ds.service.ts
@@ -76,24 +76,41 @@ export class DynamicDsService {
}

async getDynamicDatasources(): Promise<SubqlProjectDs[]> {
// Workers should not cache this result in order to keep in sync
if (!this._datasources) {
try {
const params = await this.getDynamicDatasourceParams();

this._datasources = await Promise.all(
params.map((params) => this.getDatasource(params)),
);
} catch (e) {
logger.error(e, `Unable to get dynamic datasources`);
process.exit(1);
}
this._datasources = await this.loadDynamicDatasources();
}

return this._datasources;
}

deleteTempDsRecords(blockHeight: number) {
delete this.tempDsRecords[TEMP_DS_PREFIX + blockHeight];
// This is used to sync between worker threads
async reloadDynamicDatasources(): Promise<void> {
this._datasources = await this.loadDynamicDatasources();
}

private async loadDynamicDatasources(): Promise<SubqlProjectDs[]> {
try {
const params = await this.getDynamicDatasourceParams();

const dataSources = await Promise.all(
params.map((params) => this.getDatasource(params)),
);

logger.info(`Loaded ${dataSources.length} dynamic datasources`);

return dataSources;
} catch (e) {
logger.error(`Unable to get dynamic datasources:\n${e.message}`);
process.exit(1);
}
}

deleteTempDsRecords(blockHeight: number): void {
// Main thread will not have tempDsRecords with workers
if (this.tempDsRecords) {
delete this.tempDsRecords[TEMP_DS_PREFIX + blockHeight];
}
}

private async getDynamicDatasourceParams(
Expand Down Expand Up @@ -154,10 +171,6 @@ export class DynamicDsService {
);
}

logger.info(
`Initialised dynamic datasource from template: "${params.templateName}"`,
);

const dsObj = {
...template,
startBlock: params.startBlock,
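
`getDynamicDatasources` still lazily loads and caches the list, while the new `reloadDynamicDatasources` lets a worker thread drop that cache and re-read the persisted params after another thread has added a datasource. A generic sketch of this cache-then-reload pattern, using placeholder names rather than the actual service API:

```ts
// Generic cache-then-reload sketch; the class name and loader are
// placeholders, not the subql DynamicDsService.
class ReloadableCache<T> {
  private items?: T[];

  constructor(private readonly load: () => Promise<T[]>) {}

  async get(): Promise<T[]> {
    // Load once and reuse the cached result afterwards
    if (!this.items) {
      this.items = await this.load();
    }
    return this.items;
  }

  async reload(): Promise<void> {
    // Force a refresh, e.g. after another thread has persisted a change
    this.items = await this.load();
  }
}
```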
12 changes: 1 addition & 11 deletions packages/node/src/indexer/fetch.service.ts
@@ -25,21 +25,12 @@ import {
} from '@subql/node-core';
import { DictionaryQueryCondition } from '@subql/types';
import {
AlgorandBlock,
AlgorandBlockFilter,
DictionaryQueryEntry,
} from '@subql/types-algorand';
import { MetaData } from '@subql/utils';
import { Indexer } from 'algosdk';
import {
filter,
intersection,
last,
range,
sortBy,
uniqBy,
without,
} from 'lodash';
import { range, sortBy, uniqBy, without } from 'lodash';
import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
import { calcInterval } from '../utils/algorand';
import { isBaseHandler, isCustomHandler } from '../utils/project';
@@ -127,7 +118,6 @@ export class FetchService implements OnApplicationShutdown {
} else {
filterList = [handler.filter];
}

// Filter out any undefined
filterList = filterList.filter(Boolean);
if (!filterList.length) return [];
1 change: 0 additions & 1 deletion packages/node/src/indexer/indexer.manager.ts
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

import { Inject, Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { hexToU8a, u8aEq } from '@polkadot/util';
import {
isRuntimeDs,
8 changes: 7 additions & 1 deletion packages/node/src/indexer/project.service.ts
@@ -75,6 +75,7 @@ export class ProjectService {
get isHistorical(): boolean {
return this.storeService.historical;
}

// eslint-disable-next-line @typescript-eslint/require-await
get metadataName(): string {
return this.metadataRepo.tableName;
@@ -110,6 +111,7 @@

this._startHeight = await this.getStartHeight();
} else {
this._schema = await this.getExistingProjectSchema();
this.metadataRepo = await MetadataFactory(
this.sequelize,
this.schema,
@@ -121,7 +123,6 @@

await this.sequelize.sync();

this._schema = await this.getExistingProjectSchema();
assert(this._schema, 'Schema should be created in main thread');
await this.initDbSchema();

@@ -248,6 +249,11 @@
await metadataRepo.upsert({ key: 'processedBlockCount', value: 0 });
}

// If project was created before this feature, don't add the key. If it is project created after, add this key.
if (!keyValue.processedBlockCount && !keyValue.lastProcessedHeight) {
await metadataRepo.upsert({ key: 'processedBlockCount', value: 0 });
}

if (keyValue.indexerNodeVersion !== packageVersion) {
await metadataRepo.upsert({
key: 'indexerNodeVersion',
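
The `processedBlockCount` seeding added in project.service.ts only applies to projects that have never recorded any progress: a project created before the feature already has `lastProcessedHeight`, so the key is left absent rather than misreported as zero. A small sketch of that rule, with a simplified metadata shape that is not the actual repo type:

```ts
// Simplified metadata shape for illustration; the real metadata repo stores
// these keys as rows, not as this object.
interface MetadataKeys {
  processedBlockCount?: number;
  lastProcessedHeight?: number;
}

function shouldSeedProcessedBlockCount(keyValue: MetadataKeys): boolean {
  // Seed the counter only for fresh projects with no prior progress
  return !keyValue.processedBlockCount && !keyValue.lastProcessedHeight;
}
```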