diff --git a/src/components/Indexer/crawlerThread.ts b/src/components/Indexer/crawlerThread.ts index e89617fb5..3495cb7a2 100644 --- a/src/components/Indexer/crawlerThread.ts +++ b/src/components/Indexer/crawlerThread.ts @@ -4,7 +4,8 @@ import { getDeployedContractBlock, getNetworkHeight, processBlocks, - processChunkLogs + processChunkLogs, + retrieveChunkEvents } from './utils.js' import { Blockchain } from '../../utils/blockchain.js' import { BlocksEvents, SupportedNetwork } from '../../@types/blockchain.js' @@ -13,6 +14,7 @@ import { sleep } from '../../utils/util.js' import { EVENTS } from '../../utils/index.js' import { INDEXER_LOGGER } from '../../utils/logging/common.js' import { getDatabase } from '../../utils/database.js' +import { Log } from 'ethers' export interface ReindexTask { txId: string @@ -24,16 +26,15 @@ const REINDEX_QUEUE: ReindexTask[] = [] interface ThreadData { rpcDetails: SupportedNetwork - lastIndexedBlock: number } -let { rpcDetails, lastIndexedBlock } = workerData as ThreadData +const { rpcDetails } = workerData as ThreadData const blockchain = new Blockchain(rpcDetails.rpc, rpcDetails.chainId) const provider = blockchain.getProvider() const signer = blockchain.getSigner() -async function updateLastIndexedBlockNumber(block: number): Promise { +async function updateLastIndexedBlockNumber(block: number): Promise { try { const { indexer } = await getDatabase() const updatedIndex = await indexer.update(rpcDetails.chainId, block) @@ -41,6 +42,7 @@ async function updateLastIndexedBlockNumber(block: number): Promise { `New last indexed block : ${updatedIndex.lastIndexedBlock}`, true ) + return updatedIndex.lastIndexedBlock } catch (err) { INDEXER_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, @@ -49,9 +51,25 @@ async function updateLastIndexedBlockNumber(block: number): Promise { ) } } + +async function getLastIndexedBlock(): Promise { + const { indexer } = await getDatabase() + try { + const networkDetails = await indexer.retrieve(rpcDetails.chainId) + return networkDetails?.lastIndexedBlock + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + 'Error retrieving last indexed block', + true + ) + return null + } +} + export async function proccesNetworkData(): Promise { const deployedBlock = getDeployedContractBlock(rpcDetails.chainId) - if (deployedBlock == null && lastIndexedBlock == null) { + if (deployedBlock == null && (await getLastIndexedBlock()) == null) { INDEXER_LOGGER.logMessage( `chain: ${rpcDetails.chainId} Both deployed block and last indexed block are null. Cannot proceed further on this chain`, true @@ -62,52 +80,77 @@ export async function proccesNetworkData(): Promise { // we can override the default value of 30 secs, by setting process.env.INDEXER_INTERVAL const interval = getCrawlingInterval() let { chunkSize } = rpcDetails + let lockProccessing = false while (true) { - const networkHeight = await getNetworkHeight(provider) + if (!lockProccessing) { + lockProccessing = true + const indexedBlock = await getLastIndexedBlock() + const networkHeight = await getNetworkHeight(provider) - const startBlock = - lastIndexedBlock && lastIndexedBlock > deployedBlock - ? lastIndexedBlock - : deployedBlock + const startBlock = + indexedBlock && indexedBlock > deployedBlock ? indexedBlock : deployedBlock - INDEXER_LOGGER.logMessage( - `network: ${rpcDetails.network} Start block ${startBlock} network height ${networkHeight}`, - true - ) - - if (networkHeight > startBlock) { - const remainingBlocks = networkHeight - startBlock - const blocksToProcess = Math.min(chunkSize, remainingBlocks) INDEXER_LOGGER.logMessage( - `network: ${rpcDetails.network} processing ${blocksToProcess} blocks ...` + `network: ${rpcDetails.network} Start block ${startBlock} network height ${networkHeight}`, + true ) - try { - const processedBlocks = await processBlocks( - signer, - provider, - rpcDetails.chainId, - startBlock, - blocksToProcess - ) - updateLastIndexedBlockNumber(processedBlocks.lastBlock) - checkNewlyIndexedAssets(processedBlocks.foundEvents) - lastIndexedBlock = processedBlocks.lastBlock - chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize - } catch (error) { - INDEXER_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `network: ${rpcDetails.network} Error: ${error.message} `, - true - ) - chunkSize = Math.floor(chunkSize / 2) < 1 ? 1 : Math.floor(chunkSize / 2) + if (networkHeight > startBlock) { + const remainingBlocks = networkHeight - startBlock + const blocksToProcess = Math.min(chunkSize, remainingBlocks) INDEXER_LOGGER.logMessage( - `network: ${rpcDetails.network} Reducing chunk size ${chunkSize} `, - true + `network: ${rpcDetails.network} processing ${blocksToProcess} blocks ...` ) + let chunkEvents: Log[] = [] + try { + chunkEvents = await retrieveChunkEvents( + signer, + provider, + rpcDetails.chainId, + startBlock, + blocksToProcess + ) + } catch (error) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Get events for network: ${rpcDetails.network} failure: ${error.message} `, + true + ) + chunkSize = Math.floor(chunkSize / 2) < 1 ? 1 : Math.floor(chunkSize / 2) + INDEXER_LOGGER.logMessage( + `network: ${rpcDetails.network} Reducing chunk size ${chunkSize} `, + true + ) + } + try { + const processedBlocks = await processBlocks( + chunkEvents, + signer, + provider, + rpcDetails.chainId, + startBlock, + blocksToProcess + ) + await updateLastIndexedBlockNumber(processedBlocks.lastBlock) + checkNewlyIndexedAssets(processedBlocks.foundEvents) + chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize + } catch (error) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Processing event from network failed network: ${rpcDetails.network} Error: ${error.message} `, + true + ) + await updateLastIndexedBlockNumber(startBlock + blocksToProcess) + } } + await processReindex() + lockProccessing = false + } else { + INDEXER_LOGGER.logMessage( + `Processing already in progress for network ${rpcDetails.network} waiting untill finishing the current processing ...`, + true + ) } - processReindex() await sleep(interval) } } diff --git a/src/components/Indexer/index.ts b/src/components/Indexer/index.ts index 3417fa0db..3f505031f 100644 --- a/src/components/Indexer/index.ts +++ b/src/components/Indexer/index.ts @@ -52,12 +52,12 @@ export class OceanIndexer { return network } + // eslint-disable-next-line require-await public async startThreads(): Promise { for (const network of this.supportedChains) { const chainId = parseInt(network) const rpcDetails: SupportedNetwork = this.getSupportedNetwork(chainId) - const lastIndexedBlock = await this.getLastIndexedBlock(chainId) - const workerData = { rpcDetails, lastIndexedBlock } + const workerData = { rpcDetails } INDEXER_LOGGER.log( LOG_LEVELS_STR.LEVEL_INFO, `Starting worker for network ${network} with ${JSON.stringify(workerData)}`, @@ -128,21 +128,6 @@ export class OceanIndexer { } } - public async getLastIndexedBlock(network: number): Promise { - const dbconn = this.db.indexer - try { - const indexer = await dbconn.retrieve(network) - return indexer?.lastIndexedBlock - } catch (err) { - INDEXER_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - 'Error retrieving last indexed block', - true - ) - return null - } - } - public getIndexingQueue(): ReindexTask[] { return INDEXING_QUEUE.slice() } diff --git a/src/components/Indexer/processor.ts b/src/components/Indexer/processor.ts index 731687b8a..9ed4fb03e 100644 --- a/src/components/Indexer/processor.ts +++ b/src/components/Indexer/processor.ts @@ -1,5 +1,4 @@ import { - Contract, Interface, JsonRpcApiProvider, Signer, @@ -17,6 +16,7 @@ import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templat import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' } import { getDatabase } from '../../utils/database.js' import { PROTOCOL_COMMANDS, EVENTS, MetadataStates } from '../../utils/constants.js' +import { getDtContract, wasNFTDeployedByOurFactory } from './utils.js' import { INDEXER_LOGGER } from '../../utils/logging/common.js' import { Purgatory } from './purgatory.js' import { getConfiguration } from '../../utils/index.js' @@ -25,7 +25,6 @@ import { asyncCallWithTimeout, streamToString } from '../../utils/util.js' import { DecryptDDOCommand } from '../../@types/commands.js' import { create256Hash } from '../../utils/crypt.js' import { URLUtils } from '../../utils/url.js' -import { wasNFTDeployedByOurFactory } from './utils.js' class BaseEventProcessor { protected networkId: number @@ -543,6 +542,7 @@ export class OrderStartedEventProcessor extends BaseEventProcessor { async processEvent( event: ethers.Log, chainId: number, + signer: Signer, provider: JsonRpcApiProvider ): Promise { const decodedEventData = await this.getEventData( @@ -558,11 +558,8 @@ export class OrderStartedEventProcessor extends BaseEventProcessor { `Processed new order for service index ${serviceIndex} at ${timestamp}`, true ) - const datatokenContract = new Contract( - event.address, - ERC20Template.abi, - await provider.getSigner() - ) + const datatokenContract = getDtContract(signer, event.address) + const nftAddress = await datatokenContract.getERC721Address() const did = 'did:op:' + @@ -609,6 +606,7 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { async processEvent( event: ethers.Log, chainId: number, + signer: Signer, provider: JsonRpcApiProvider ): Promise { const decodedEventData = await this.getEventData( @@ -621,11 +619,8 @@ export class OrderReusedEventProcessor extends BaseEventProcessor { const payer = decodedEventData.args[1].toString() INDEXER_LOGGER.logMessage(`Processed reused order at ${timestamp}`, true) - const datatokenContract = new Contract( - event.address, - ERC20Template.abi, - await provider.getSigner() - ) + const datatokenContract = getDtContract(signer, event.address) + const nftAddress = await datatokenContract.getERC721Address() const did = 'did:op:' + diff --git a/src/components/Indexer/utils.ts b/src/components/Indexer/utils.ts index 2ac0718cf..13331cbb4 100644 --- a/src/components/Indexer/utils.ts +++ b/src/components/Indexer/utils.ts @@ -78,13 +78,13 @@ export const getNetworkHeight = async (provider: JsonRpcApiProvider) => { return networkHeight } -export const processBlocks = async ( +export const retrieveChunkEvents = async ( signer: Signer, provider: JsonRpcApiProvider, network: number, lastIndexedBlock: number, count: number -): Promise => { +): Promise => { try { const eventHashes = Object.keys(EVENT_HASHES) const startIndex = lastIndexedBlock + 1 @@ -93,7 +93,25 @@ export const processBlocks = async ( toBlock: lastIndexedBlock + count, topics: [eventHashes] }) - const events = await processChunkLogs(blockLogs, signer, provider, network) + return blockLogs + } catch (error) { + throw new Error(` Error processing chunk of blocks events ${error.message}`) + } +} + +export const processBlocks = async ( + blockLogs: ethers.Log[], + signer: Signer, + provider: JsonRpcApiProvider, + network: number, + lastIndexedBlock: number, + count: number +): Promise => { + try { + const events: any[] | BlocksEvents = + blockLogs && blockLogs.length > 0 + ? await processChunkLogs(blockLogs, signer, provider, network) + : [] return { lastBlock: lastIndexedBlock + count, @@ -190,10 +208,20 @@ export const processChunkLogs = async ( storeEvents[event.type] = processExchangeRateChanged() } else if (event.type === EVENTS.ORDER_STARTED) { const processor = getOrderStartedEventProcessor(chainId) - storeEvents[event.type] = await processor.processEvent(log, chainId, provider) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) } else if (event.type === EVENTS.ORDER_REUSED) { const processor = getOrderReusedEventProcessor(chainId) - storeEvents[event.type] = await processor.processEvent(log, chainId, provider) + storeEvents[event.type] = await processor.processEvent( + log, + chainId, + signer, + provider + ) } else if (event.type === EVENTS.TOKEN_URI_UPDATE) { storeEvents[event.type] = processTokenUriUpadate() } @@ -222,18 +250,22 @@ export const getNFTContract = (signer: Signer, address: string): ethers.Contract return getContract(signer, 'ERC721Template', address) } +export const getDtContract = (signer: Signer, address: string): ethers.Contract => { + address = getAddress(address) + return getContract(signer, 'ERC20Template', address) +} + export const getNFTFactory = (signer: Signer, address: string): ethers.Contract => { address = getAddress(address) return getContract(signer, 'ERC721Factory', address) } - function getContract( signer: Signer, contractName: string, address: string ): ethers.Contract { const abi = getContractDefinition(contractName) - return new ethers.Contract(getAddress(address), abi, signer) // was provider.getSigner() => thow no account + return new ethers.Contract(getAddress(address), abi, signer) } function getContractDefinition(contractName: string): any { @@ -242,6 +274,8 @@ function getContractDefinition(contractName: string): any { return ERC721Factory.abi case 'ERC721Template': return ERC721Template.abi + case 'ERC20Template': + return ERC20Template.abi default: return ERC721Factory.abi } diff --git a/src/test/unit/indexer/indexer.test.ts b/src/test/unit/indexer/indexer.test.ts index 36f8af16c..b7c7d5fa4 100644 --- a/src/test/unit/indexer/indexer.test.ts +++ b/src/test/unit/indexer/indexer.test.ts @@ -27,7 +27,6 @@ describe('OceanIndexer', () => { } stub(oceanIndexer as any, 'startThreads').callsFake(() => { - oceanIndexer.getLastIndexedBlock = stub().resolves(0) oceanIndexer.startThreads = async () => { try { const network = '1' diff --git a/src/test/utils/utils.ts b/src/test/utils/utils.ts index bb2d50cd9..e45826687 100644 --- a/src/test/utils/utils.ts +++ b/src/test/utils/utils.ts @@ -138,12 +138,6 @@ export function getMockSupportedNetworks(): RPCS { network: 'development', rpc: 'http://127.0.0.1:8545', chunkSize: 100 - }, - '137': { - chainId: 137, - network: 'polygon', - rpc: 'https://polygon-rpc.com', - chunkSize: 1000 } } return mockSupportedNetworks