Merge pull request #360 from oceanprotocol/fix/indexer-chunk-size
Fix/ Indexer chunk size reduce
bogdanfazakas authored Apr 11, 2024
2 parents 0e7a163 + 5abd053 commit 8c4bb05
Showing 6 changed files with 134 additions and 84 deletions.
125 changes: 84 additions & 41 deletions src/components/Indexer/crawlerThread.ts
@@ -4,7 +4,8 @@ import {
getDeployedContractBlock,
getNetworkHeight,
processBlocks,
processChunkLogs
processChunkLogs,
retrieveChunkEvents
} from './utils.js'
import { Blockchain } from '../../utils/blockchain.js'
import { BlocksEvents, SupportedNetwork } from '../../@types/blockchain.js'
@@ -13,6 +14,7 @@ import { sleep } from '../../utils/util.js'
import { EVENTS } from '../../utils/index.js'
import { INDEXER_LOGGER } from '../../utils/logging/common.js'
import { getDatabase } from '../../utils/database.js'
import { Log } from 'ethers'

export interface ReindexTask {
txId: string
@@ -24,23 +26,23 @@ const REINDEX_QUEUE: ReindexTask[] = []

interface ThreadData {
rpcDetails: SupportedNetwork
lastIndexedBlock: number
}

let { rpcDetails, lastIndexedBlock } = workerData as ThreadData
const { rpcDetails } = workerData as ThreadData

const blockchain = new Blockchain(rpcDetails.rpc, rpcDetails.chainId)
const provider = blockchain.getProvider()
const signer = blockchain.getSigner()

async function updateLastIndexedBlockNumber(block: number): Promise<void> {
async function updateLastIndexedBlockNumber(block: number): Promise<number> {
try {
const { indexer } = await getDatabase()
const updatedIndex = await indexer.update(rpcDetails.chainId, block)
INDEXER_LOGGER.logMessage(
`New last indexed block : ${updatedIndex.lastIndexedBlock}`,
true
)
return updatedIndex.lastIndexedBlock
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
@@ -49,9 +51,25 @@ async function updateLastIndexedBlockNumber(block: number): Promise<void> {
)
}
}

async function getLastIndexedBlock(): Promise<number> {
const { indexer } = await getDatabase()
try {
const networkDetails = await indexer.retrieve(rpcDetails.chainId)
return networkDetails?.lastIndexedBlock
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
'Error retrieving last indexed block',
true
)
return null
}
}

export async function proccesNetworkData(): Promise<void> {
const deployedBlock = getDeployedContractBlock(rpcDetails.chainId)
if (deployedBlock == null && lastIndexedBlock == null) {
if (deployedBlock == null && (await getLastIndexedBlock()) == null) {
INDEXER_LOGGER.logMessage(
`chain: ${rpcDetails.chainId} Both deployed block and last indexed block are null. Cannot proceed further on this chain`,
true
@@ -62,52 +80,77 @@ export async function proccesNetworkData(): Promise<void> {
// we can override the default value of 30 secs, by setting process.env.INDEXER_INTERVAL
const interval = getCrawlingInterval()
let { chunkSize } = rpcDetails
let lockProccessing = false
while (true) {
const networkHeight = await getNetworkHeight(provider)
if (!lockProccessing) {
lockProccessing = true
const indexedBlock = await getLastIndexedBlock()
const networkHeight = await getNetworkHeight(provider)

const startBlock =
lastIndexedBlock && lastIndexedBlock > deployedBlock
? lastIndexedBlock
: deployedBlock
const startBlock =
indexedBlock && indexedBlock > deployedBlock ? indexedBlock : deployedBlock

INDEXER_LOGGER.logMessage(
`network: ${rpcDetails.network} Start block ${startBlock} network height ${networkHeight}`,
true
)

if (networkHeight > startBlock) {
const remainingBlocks = networkHeight - startBlock
const blocksToProcess = Math.min(chunkSize, remainingBlocks)
INDEXER_LOGGER.logMessage(
`network: ${rpcDetails.network} processing ${blocksToProcess} blocks ...`
`network: ${rpcDetails.network} Start block ${startBlock} network height ${networkHeight}`,
true
)

try {
const processedBlocks = await processBlocks(
signer,
provider,
rpcDetails.chainId,
startBlock,
blocksToProcess
)
updateLastIndexedBlockNumber(processedBlocks.lastBlock)
checkNewlyIndexedAssets(processedBlocks.foundEvents)
lastIndexedBlock = processedBlocks.lastBlock
chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`network: ${rpcDetails.network} Error: ${error.message} `,
true
)
chunkSize = Math.floor(chunkSize / 2) < 1 ? 1 : Math.floor(chunkSize / 2)
if (networkHeight > startBlock) {
const remainingBlocks = networkHeight - startBlock
const blocksToProcess = Math.min(chunkSize, remainingBlocks)
INDEXER_LOGGER.logMessage(
`network: ${rpcDetails.network} Reducing chunk size ${chunkSize} `,
true
`network: ${rpcDetails.network} processing ${blocksToProcess} blocks ...`
)
let chunkEvents: Log[] = []
try {
chunkEvents = await retrieveChunkEvents(
signer,
provider,
rpcDetails.chainId,
startBlock,
blocksToProcess
)
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Get events for network: ${rpcDetails.network} failure: ${error.message} `,
true
)
chunkSize = Math.floor(chunkSize / 2) < 1 ? 1 : Math.floor(chunkSize / 2)
INDEXER_LOGGER.logMessage(
`network: ${rpcDetails.network} Reducing chunk size ${chunkSize} `,
true
)
}
try {
const processedBlocks = await processBlocks(
chunkEvents,
signer,
provider,
rpcDetails.chainId,
startBlock,
blocksToProcess
)
await updateLastIndexedBlockNumber(processedBlocks.lastBlock)
checkNewlyIndexedAssets(processedBlocks.foundEvents)
chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Processing event from network failed network: ${rpcDetails.network} Error: ${error.message} `,
true
)
await updateLastIndexedBlockNumber(startBlock + blocksToProcess)
}
}
await processReindex()
lockProccessing = false
} else {
INDEXER_LOGGER.logMessage(
        `Processing already in progress for network ${rpcDetails.network} waiting until finishing the current processing ...`,
true
)
}
processReindex()
await sleep(interval)
}
}
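
The crawler change above amounts to an adaptive chunk size: each pass fetches a chunk of logs before processing it, halves the chunk size when the log request fails, and restores the configured size once a reduced chunk goes through. Below is a simplified sketch of that backoff idea, not repository code: `fetchLogs` is a hypothetical stand-in for `retrieveChunkEvents`, and it assumes the cursor only advances on a successful fetch, whereas the real thread also processes the logs and persists the last indexed block in the database.

```ts
// Illustrative backoff loop (not repository code). `fetchLogs` is a hypothetical
// stand-in for an RPC call such as provider.getLogs / retrieveChunkEvents.
type FetchLogs = (fromBlock: number, toBlock: number) => Promise<unknown[]>

export async function crawlWithBackoff(
  fetchLogs: FetchLogs,
  startBlock: number,
  networkHeight: number,
  configuredChunkSize: number
): Promise<number> {
  let chunkSize = configuredChunkSize
  let lastIndexed = startBlock
  while (lastIndexed < networkHeight) {
    const blocksToProcess = Math.min(chunkSize, networkHeight - lastIndexed)
    try {
      // Fetch first; in this sketch the cursor only advances when the call succeeds.
      await fetchLogs(lastIndexed + 1, lastIndexed + blocksToProcess)
      lastIndexed += blocksToProcess
      // Once a reduced chunk has succeeded, return to the configured size.
      chunkSize = chunkSize !== 1 ? chunkSize : configuredChunkSize
    } catch {
      // RPC rejected the range (rate limit, log cap, timeout): halve it, floor at 1.
      chunkSize = Math.max(1, Math.floor(chunkSize / 2))
    }
  }
  return lastIndexed
}
```
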
19 changes: 2 additions & 17 deletions src/components/Indexer/index.ts
@@ -52,12 +52,12 @@ export class OceanIndexer {
return network
}

// eslint-disable-next-line require-await
public async startThreads(): Promise<void> {
for (const network of this.supportedChains) {
const chainId = parseInt(network)
const rpcDetails: SupportedNetwork = this.getSupportedNetwork(chainId)
const lastIndexedBlock = await this.getLastIndexedBlock(chainId)
const workerData = { rpcDetails, lastIndexedBlock }
const workerData = { rpcDetails }
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_INFO,
`Starting worker for network ${network} with ${JSON.stringify(workerData)}`,
@@ -128,21 +128,6 @@
}
}

public async getLastIndexedBlock(network: number): Promise<number> {
const dbconn = this.db.indexer
try {
const indexer = await dbconn.retrieve(network)
return indexer?.lastIndexedBlock
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
'Error retrieving last indexed block',
true
)
return null
}
}

public getIndexingQueue(): ReindexTask[] {
return INDEXING_QUEUE.slice()
}
19 changes: 7 additions & 12 deletions src/components/Indexer/processor.ts
@@ -1,5 +1,4 @@
import {
Contract,
Interface,
JsonRpcApiProvider,
Signer,
@@ -17,6 +16,7 @@ import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templat
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }
import { getDatabase } from '../../utils/database.js'
import { PROTOCOL_COMMANDS, EVENTS, MetadataStates } from '../../utils/constants.js'
import { getDtContract, wasNFTDeployedByOurFactory } from './utils.js'
import { INDEXER_LOGGER } from '../../utils/logging/common.js'
import { Purgatory } from './purgatory.js'
import { getConfiguration } from '../../utils/index.js'
@@ -25,7 +25,6 @@ import { asyncCallWithTimeout, streamToString } from '../../utils/util.js'
import { DecryptDDOCommand } from '../../@types/commands.js'
import { create256Hash } from '../../utils/crypt.js'
import { URLUtils } from '../../utils/url.js'
import { wasNFTDeployedByOurFactory } from './utils.js'

class BaseEventProcessor {
protected networkId: number
@@ -543,6 +542,7 @@ export class OrderStartedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: JsonRpcApiProvider
): Promise<any> {
const decodedEventData = await this.getEventData(
@@ -558,11 +558,8 @@ export class OrderStartedEventProcessor extends BaseEventProcessor {
`Processed new order for service index ${serviceIndex} at ${timestamp}`,
true
)
const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const datatokenContract = getDtContract(signer, event.address)

const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
@@ -609,6 +606,7 @@ export class OrderReusedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: JsonRpcApiProvider
): Promise<any> {
const decodedEventData = await this.getEventData(
@@ -621,11 +619,8 @@
const payer = decodedEventData.args[1].toString()
INDEXER_LOGGER.logMessage(`Processed reused order at ${timestamp}`, true)

const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const datatokenContract = getDtContract(signer, event.address)

const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
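
The processor changes pass the indexer's `signer` into `processEvent` and build the datatoken contract through the shared `getDtContract` helper instead of `await provider.getSigner()`, which fails on JSON-RPC endpoints that expose no unlocked account (the removed comment in utils.ts notes the same "no account" failure). A rough sketch of that pattern follows; it is illustrative only, and everything beyond the ABI import and the `getERC721Address()` call shown in the diff is an assumption.

```ts
// Sketch only (not repository code): the signer-based contract construction the
// diff switches to, with a helper mirroring the shape of the new getDtContract.
import { ethers, Signer } from 'ethers'
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }

function getDatatokenContract(signer: Signer, address: string): ethers.Contract {
  // ABI bound to an explicit signer, instead of awaiting provider.getSigner().
  return new ethers.Contract(ethers.getAddress(address), ERC20Template.abi, signer)
}

export async function lookupNftForOrder(
  signer: Signer,
  datatokenAddress: string
): Promise<string> {
  // Mirrors what the order processors do after the change: the datatoken contract
  // reports the ERC721 that owns it, which the processor then turns into a DID.
  const datatoken = getDatatokenContract(signer, datatokenAddress)
  return await datatoken.getERC721Address()
}
```
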
48 changes: 41 additions & 7 deletions src/components/Indexer/utils.ts
@@ -78,13 +78,13 @@ export const getNetworkHeight = async (provider: JsonRpcApiProvider) => {
return networkHeight
}

export const processBlocks = async (
export const retrieveChunkEvents = async (
signer: Signer,
provider: JsonRpcApiProvider,
network: number,
lastIndexedBlock: number,
count: number
): Promise<ProcessingEvents> => {
): Promise<ethers.Log[]> => {
try {
const eventHashes = Object.keys(EVENT_HASHES)
const startIndex = lastIndexedBlock + 1
@@ -93,7 +93,25 @@
toBlock: lastIndexedBlock + count,
topics: [eventHashes]
})
const events = await processChunkLogs(blockLogs, signer, provider, network)
return blockLogs
} catch (error) {
throw new Error(` Error processing chunk of blocks events ${error.message}`)
}
}

export const processBlocks = async (
blockLogs: ethers.Log[],
signer: Signer,
provider: JsonRpcApiProvider,
network: number,
lastIndexedBlock: number,
count: number
): Promise<ProcessingEvents> => {
try {
const events: any[] | BlocksEvents =
blockLogs && blockLogs.length > 0
? await processChunkLogs(blockLogs, signer, provider, network)
: []

return {
lastBlock: lastIndexedBlock + count,
@@ -190,10 +208,20 @@ export const processChunkLogs = async (
storeEvents[event.type] = processExchangeRateChanged()
} else if (event.type === EVENTS.ORDER_STARTED) {
const processor = getOrderStartedEventProcessor(chainId)
storeEvents[event.type] = await processor.processEvent(log, chainId, provider)
storeEvents[event.type] = await processor.processEvent(
log,
chainId,
signer,
provider
)
} else if (event.type === EVENTS.ORDER_REUSED) {
const processor = getOrderReusedEventProcessor(chainId)
storeEvents[event.type] = await processor.processEvent(log, chainId, provider)
storeEvents[event.type] = await processor.processEvent(
log,
chainId,
signer,
provider
)
} else if (event.type === EVENTS.TOKEN_URI_UPDATE) {
storeEvents[event.type] = processTokenUriUpadate()
}
@@ -222,18 +250,22 @@ export const getNFTContract = (signer: Signer, address: string): ethers.Contract
return getContract(signer, 'ERC721Template', address)
}

export const getDtContract = (signer: Signer, address: string): ethers.Contract => {
address = getAddress(address)
return getContract(signer, 'ERC20Template', address)
}

export const getNFTFactory = (signer: Signer, address: string): ethers.Contract => {
address = getAddress(address)
return getContract(signer, 'ERC721Factory', address)
}

function getContract(
signer: Signer,
contractName: string,
address: string
): ethers.Contract {
const abi = getContractDefinition(contractName)
return new ethers.Contract(getAddress(address), abi, signer) // was provider.getSigner() => thow no account
return new ethers.Contract(getAddress(address), abi, signer)
}

function getContractDefinition(contractName: string): any {
@@ -242,6 +274,8 @@ function getContractDefinition(contractName: string): any {
return ERC721Factory.abi
case 'ERC721Template':
return ERC721Template.abi
case 'ERC20Template':
return ERC20Template.abi
default:
return ERC721Factory.abi
}
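
Splitting the old processBlocks into `retrieveChunkEvents` (the eth_getLogs call) and a `processBlocks` that accepts already-fetched logs lets the crawler handle fetch failures and processing failures separately, so only a failed fetch triggers the chunk-size reduction. The sketch below shows how a caller composes the two; it assumes only the signatures visible in the diff and simplifies error handling.

```ts
// Illustrative composition of the two helpers changed above (signatures as shown
// in the diff; error handling omitted for brevity).
import { JsonRpcApiProvider, Log, Signer } from 'ethers'
import { processBlocks, retrieveChunkEvents } from './utils.js'

export async function indexChunk(
  signer: Signer,
  provider: JsonRpcApiProvider,
  chainId: number,
  lastIndexedBlock: number,
  count: number
) {
  // Step 1: RPC-only work; the crawler wraps this call in its own try/catch so a
  // failure can shrink the chunk size before anything is decoded.
  const logs: Log[] = await retrieveChunkEvents(
    signer,
    provider,
    chainId,
    lastIndexedBlock,
    count
  )
  // Step 2: decode and store. processBlocks now receives the logs instead of fetching them.
  return processBlocks(logs, signer, provider, chainId, lastIndexedBlock, count)
}
```
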
1 change: 0 additions & 1 deletion src/test/unit/indexer/indexer.test.ts
@@ -27,7 +27,6 @@ describe('OceanIndexer', () => {
}

stub(oceanIndexer as any, 'startThreads').callsFake(() => {
oceanIndexer.getLastIndexedBlock = stub().resolves(0)
oceanIndexer.startThreads = async () => {
try {
const network = '1'
