Skip to content

Commit

Permalink
split indexing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdanfazakas committed Mar 25, 2024
1 parent fd5476d commit 134b43a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
36 changes: 28 additions & 8 deletions src/components/Indexer/crawlerThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {
getDeployedContractBlock,
getNetworkHeight,
processBlocks,
processChunkLogs
processChunkLogs,
retrieveChunkEvents
} from './utils.js'
import { Blockchain } from '../../utils/blockchain.js'
import { BlocksEvents, SupportedNetwork } from '../../@types/blockchain.js'
Expand All @@ -13,6 +14,7 @@ import { sleep } from '../../utils/util.js'
import { EVENTS } from '../../utils/index.js'
import { INDEXER_LOGGER } from '../../utils/logging/common.js'
import { getDatabase } from '../../utils/database.js'
import { Log } from 'ethers'

export interface ReindexTask {
txId: string
Expand Down Expand Up @@ -81,23 +83,19 @@ export async function proccesNetworkData(): Promise<void> {
INDEXER_LOGGER.logMessage(
`network: ${rpcDetails.network} processing ${blocksToProcess} blocks ...`
)

let chunkEvents: Log[] = []
try {
const processedBlocks = await processBlocks(
chunkEvents = await retrieveChunkEvents(
signer,
provider,
rpcDetails.chainId,
startBlock,
blocksToProcess
)
updateLastIndexedBlockNumber(processedBlocks.lastBlock)
checkNewlyIndexedAssets(processedBlocks.foundEvents)
lastIndexedBlock = processedBlocks.lastBlock
chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`network: ${rpcDetails.network} Error: ${error.message} `,
`Get events for network: ${rpcDetails.network} failure: ${error.message} `,
true
)
chunkSize = Math.floor(chunkSize / 2) < 1 ? 1 : Math.floor(chunkSize / 2)
Expand All @@ -106,6 +104,28 @@ export async function proccesNetworkData(): Promise<void> {
true
)
}
if (chunkEvents && chunkEvents.length > 0) {
try {
const processedBlocks = await processBlocks(
chunkEvents,
signer,
provider,
rpcDetails.chainId,
startBlock,
blocksToProcess
)
updateLastIndexedBlockNumber(processedBlocks.lastBlock)
checkNewlyIndexedAssets(processedBlocks.foundEvents)
lastIndexedBlock = processedBlocks.lastBlock
chunkSize = chunkSize !== 1 ? chunkSize : rpcDetails.chunkSize
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Processing event from network failed network: ${rpcDetails.network} Error: ${error.message} `,
true
)
}
}
}
processReindex()
await sleep(interval)
Expand Down
19 changes: 17 additions & 2 deletions src/components/Indexer/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ export const getNetworkHeight = async (provider: JsonRpcApiProvider) => {
return networkHeight
}

export const processBlocks = async (
export const retrieveChunkEvents = async (
signer: Signer,
provider: JsonRpcApiProvider,
network: number,
lastIndexedBlock: number,
count: number
): Promise<ProcessingEvents> => {
): Promise<ethers.Log[]> => {
try {
const eventHashes = Object.keys(EVENT_HASHES)
const startIndex = lastIndexedBlock + 1
Expand All @@ -93,6 +93,21 @@ export const processBlocks = async (
toBlock: lastIndexedBlock + count,
topics: [eventHashes]
})
return blockLogs
} catch (error) {
throw new Error(` Error processing chunk of blocks events ${error.message}`)
}
}

export const processBlocks = async (
blockLogs: ethers.Log[],
signer: Signer,
provider: JsonRpcApiProvider,
network: number,
lastIndexedBlock: number,
count: number
): Promise<ProcessingEvents> => {
try {
const events = await processChunkLogs(blockLogs, signer, provider, network)

return {
Expand Down

0 comments on commit 134b43a

Please sign in to comment.