diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index aac2e9471..e8a5d9462 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -1,5 +1,11 @@ # @ponder/core +## 0.4.36 + +### Patch Changes + +- [#931](https://github.com/ponder-sh/ponder/pull/931) [`3bf69809b25c7d3e50be6eaf2f8d3564c7bb8f14`](https://github.com/ponder-sh/ponder/commit/3bf69809b25c7d3e50be6eaf2f8d3564c7bb8f14) Thanks [@kyscott18](https://github.com/kyscott18)! - Fixed bug with cache intervals occasionally causing statement timeouts for large apps. + ## 0.4.35 ### Patch Changes diff --git a/packages/core/package.json b/packages/core/package.json index 9aefb86e5..868247db6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@ponder/core", - "version": "0.4.35", + "version": "0.4.36", "description": "An open-source framework for crypto application backends", "license": "MIT", "type": "module", diff --git a/packages/core/src/_test/setup.ts b/packages/core/src/_test/setup.ts index c797ff36c..d93ad3da6 100644 --- a/packages/core/src/_test/setup.ts +++ b/packages/core/src/_test/setup.ts @@ -155,7 +155,10 @@ export async function setupDatabaseServices( await database.migrateSyncStore(); - const syncStore = new SqliteSyncStore({ db: database.syncDb }); + const syncStore = new SqliteSyncStore({ + db: database.syncDb, + common: context.common, + }); const indexingStore = { ...getReadonlyStore({ @@ -209,7 +212,10 @@ export async function setupDatabaseServices( await database.migrateSyncStore(); - const syncStore = new PostgresSyncStore({ db: database.syncDb }); + const syncStore = new PostgresSyncStore({ + db: database.syncDb, + common: context.common, + }); const indexingStore = { ...getReadonlyStore({ diff --git a/packages/core/src/bin/utils/run.ts b/packages/core/src/bin/utils/run.ts index 2d72a275f..dfd03be71 100644 --- a/packages/core/src/bin/utils/run.ts +++ b/packages/core/src/bin/utils/run.ts @@ -78,7 +78,7 @@ export async function run({ .setup({ schema, buildId }) .then(({ namespaceInfo, checkpoint }) => [namespaceInfo, checkpoint]); - syncStore = new SqliteSyncStore({ db: database.syncDb }); + syncStore = new SqliteSyncStore({ db: database.syncDb, common }); } else { const { poolConfig, schema: userNamespace, publishSchema } = databaseConfig; database = new PostgresDatabaseService({ @@ -91,7 +91,7 @@ export async function run({ .setup({ schema, buildId }) .then(({ namespaceInfo, checkpoint }) => [namespaceInfo, checkpoint]); - syncStore = new PostgresSyncStore({ db: database.syncDb }); + syncStore = new PostgresSyncStore({ db: database.syncDb, common }); } const readonlyStore = getReadonlyStore({ diff --git a/packages/core/src/common/options.ts b/packages/core/src/common/options.ts index 2687f6f9b..e7739a95c 100644 --- a/packages/core/src/common/options.ts +++ b/packages/core/src/common/options.ts @@ -33,6 +33,8 @@ export type Options = { databaseHeartbeatTimeout: number; indexingCacheBytes: number; + + syncMaxIntervals: number; }; export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => { @@ -102,5 +104,7 @@ export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => { databaseHeartbeatTimeout: 25 * 1000, indexingCacheBytes: os.freemem() / 3, + + syncMaxIntervals: 50_000, } satisfies Options; }; diff --git a/packages/core/src/sync-historical/service.ts b/packages/core/src/sync-historical/service.ts index 63bdae963..d5433c5f6 100644 --- a/packages/core/src/sync-historical/service.ts +++ 
b/packages/core/src/sync-historical/service.ts @@ -188,610 +188,604 @@ export class HistoricalSyncService extends Emittery { this.isShuttingDown = false; this.blockTasksEnqueuedCheckpoint = 0; - await Promise.all( - this.sources.map(async (source) => { - const startBlock = source.startBlock; - const endBlock = source.endBlock ?? finalizedBlockNumber; - - if (source.startBlock > finalizedBlockNumber) { - switch (source.type) { - case "log": - case "factoryLog": { - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "log", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' logs for '${source.contractName}' because the start block is not finalized`, - }); - break; - } - - case "callTrace": - case "factoryCallTrace": { - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "trace", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' call traces for '${source.contractName}' because the start block is not finalized`, - }); - break; - } - - case "block": { - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.sourceName, - type: "block", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' blocks for '${source.sourceName}' because the start block is not finalized`, - }); - break; - } - - default: - never(source); - } - - return; - } + for (const source of this.sources) { + const startBlock = source.startBlock; + const endBlock = source.endBlock ?? finalizedBlockNumber; + if (source.startBlock > finalizedBlockNumber) { switch (source.type) { - case "log": { - const completedLogFilterIntervals = - await this.syncStore.getLogFilterIntervals({ - chainId: source.chainId, - logFilter: source.criteria, - }); - const logFilterProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedLogFilterIntervals, - }); - this.logFilterProgressTrackers[source.id] = - logFilterProgressTracker; - - const requiredLogFilterIntervals = - logFilterProgressTracker.getRequired(); - - const logFilterTaskChunks = getChunks({ - intervals: requiredLogFilterIntervals, - maxChunkSize: - source.maxBlockRange ?? 
this.network.defaultMaxBlockRange, - }); - - for (const [fromBlock, toBlock] of logFilterTaskChunks) { - this.queue.addTask( - { - kind: "LOG_FILTER", - logFilter: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (logFilterTaskChunks.length > 0) { - const total = intervalSum(requiredLogFilterIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' LOG_FILTER tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetBlockCount = endBlock - startBlock + 1; - const cachedBlockCount = - targetBlockCount - intervalSum(requiredLogFilterIntervals); - + case "log": + case "factoryLog": { this.common.metrics.ponder_historical_total_blocks.set( { network: this.network.name, source: source.contractName, type: "log", }, - targetBlockCount, + 0, ); - this.common.metrics.ponder_historical_cached_blocks.set( + this.common.logger.warn({ + service: "historical", + msg: `Skipped syncing '${this.network.name}' logs for '${source.contractName}' because the start block is not finalized`, + }); + break; + } + + case "callTrace": + case "factoryCallTrace": { + this.common.metrics.ponder_historical_total_blocks.set( { network: this.network.name, source: source.contractName, - type: "log", + type: "trace", }, - cachedBlockCount, + 0, ); - - this.common.logger.info({ + this.common.logger.warn({ service: "historical", - msg: `Started syncing '${this.network.name}' logs for '${ - source.contractName - }' with ${formatPercentage( - Math.min(1, cachedBlockCount / (targetBlockCount || 1)), - )} cached`, + msg: `Skipped syncing '${this.network.name}' call traces for '${source.contractName}' because the start block is not finalized`, }); + break; } - break; - - case "factoryLog": { - // Note that factory child address progress is stored using - // log intervals for the factory log. - const completedFactoryChildAddressIntervals = - await this.syncStore.getLogFilterIntervals({ - chainId: source.chainId, - logFilter: { - address: source.criteria.address, - topics: [source.criteria.eventSelector], - includeTransactionReceipts: false, - }, - }); - const factoryChildAddressProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedFactoryChildAddressIntervals, - }); - this.factoryChildAddressProgressTrackers[source.id] = - factoryChildAddressProgressTracker; - - const requiredFactoryChildAddressIntervals = - factoryChildAddressProgressTracker.getRequired(); - const factoryChildAddressTaskChunks = getChunks({ - intervals: requiredFactoryChildAddressIntervals, - maxChunkSize: - source.maxBlockRange ?? 
this.network.defaultMaxBlockRange, - }); - - for (const [fromBlock, toBlock] of factoryChildAddressTaskChunks) { - this.queue.addTask( - { - kind: "FACTORY_CHILD_ADDRESS", - factory: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (factoryChildAddressTaskChunks.length > 0) { - const total = intervalSum(requiredFactoryChildAddressIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' FACTORY_CHILD_ADDRESS tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetFactoryChildAddressBlockCount = - endBlock - startBlock + 1; - const cachedFactoryChildAddressBlockCount = - targetFactoryChildAddressBlockCount - - intervalSum(requiredFactoryChildAddressIntervals); + case "block": { this.common.metrics.ponder_historical_total_blocks.set( { network: this.network.name, - source: `${source.contractName}_factory`, - type: "log", - }, - targetFactoryChildAddressBlockCount, - ); - this.common.metrics.ponder_historical_cached_blocks.set( - { - network: this.network.name, - source: `${source.contractName}_factory`, - type: "log", + source: source.sourceName, + type: "block", }, - cachedFactoryChildAddressBlockCount, + 0, ); - - const completedFactoryLogFilterIntervals = - await this.syncStore.getFactoryLogFilterIntervals({ - chainId: source.chainId, - factory: source.criteria, - }); - const factoryLogFilterProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedFactoryLogFilterIntervals, + this.common.logger.warn({ + service: "historical", + msg: `Skipped syncing '${this.network.name}' blocks for '${source.sourceName}' because the start block is not finalized`, }); - this.factoryLogFilterProgressTrackers[source.id] = - factoryLogFilterProgressTracker; - - // Only add factory log filter tasks for any intervals where the - // child address tasks are completed, but the factory log filter tasks are not, - // because these won't be added automatically by child address tasks. - const requiredFactoryLogFilterIntervals = - factoryLogFilterProgressTracker.getRequired(); - const missingFactoryLogFilterIntervals = intervalDifference( - requiredFactoryLogFilterIntervals, - requiredFactoryChildAddressIntervals, - ); + break; + } - const missingFactoryLogFilterTaskChunks = getChunks({ - intervals: missingFactoryLogFilterIntervals, - maxChunkSize: - source.maxBlockRange ?? 
this.network.defaultMaxBlockRange, + default: + never(source); + } + + return; + } + + switch (source.type) { + case "log": { + const completedLogFilterIntervals = + await this.syncStore.getLogFilterIntervals({ + chainId: source.chainId, + logFilter: source.criteria, }); + const logFilterProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedLogFilterIntervals, + }); + this.logFilterProgressTrackers[source.id] = logFilterProgressTracker; - for (const [ - fromBlock, - toBlock, - ] of missingFactoryLogFilterTaskChunks) { - this.queue.addTask( - { - kind: "FACTORY_LOG_FILTER", - factoryLogFilter: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (missingFactoryLogFilterTaskChunks.length > 0) { - const total = intervalSum(missingFactoryLogFilterIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' FACTORY_LOG_FILTER tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetFactoryLogFilterBlockCount = endBlock - startBlock + 1; - const cachedFactoryLogFilterBlockCount = - targetFactoryLogFilterBlockCount - - intervalSum(requiredFactoryLogFilterIntervals); + const requiredLogFilterIntervals = + logFilterProgressTracker.getRequired(); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "log", - }, - targetFactoryLogFilterBlockCount, - ); - this.common.metrics.ponder_historical_cached_blocks.set( + const logFilterTaskChunks = getChunks({ + intervals: requiredLogFilterIntervals, + maxChunkSize: + source.maxBlockRange ?? this.network.defaultMaxBlockRange, + }); + + for (const [fromBlock, toBlock] of logFilterTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: source.contractName, - type: "log", + kind: "LOG_FILTER", + logFilter: source, + fromBlock, + toBlock, }, - cachedFactoryLogFilterBlockCount, - ); - - // Use factory log filter progress for the logger because it better represents - // user-facing progress. 
- const cacheRate = Math.min( - 1, - cachedFactoryLogFilterBlockCount / - (targetFactoryLogFilterBlockCount || 1), + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - this.common.logger.info({ + } + if (logFilterTaskChunks.length > 0) { + const total = intervalSum(requiredLogFilterIntervals); + this.common.logger.debug({ service: "historical", - msg: `Started syncing '${this.network.name}' logs for '${ - source.contractName - }' with ${formatPercentage(cacheRate)} cached`, + msg: `Added '${this.network.name}' LOG_FILTER tasks for '${source.contractName}' over a ${total} block range`, }); } - break; - case "callTrace": { - const completedTraceFilterIntervals = - await this.syncStore.getTraceFilterIntervals({ - chainId: source.chainId, - traceFilter: source.criteria, - }); - const traceFilterProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedTraceFilterIntervals, - }); - this.traceFilterProgressTrackers[source.id] = - traceFilterProgressTracker; + const targetBlockCount = endBlock - startBlock + 1; + const cachedBlockCount = + targetBlockCount - intervalSum(requiredLogFilterIntervals); - const requiredTraceFilterIntervals = - traceFilterProgressTracker.getRequired(); + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "log", + }, + targetBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "log", + }, + cachedBlockCount, + ); - const traceFilterTaskChunks = getChunks({ - intervals: requiredTraceFilterIntervals, - maxChunkSize: TRACE_FILTER_CHUNK_SIZE, + this.common.logger.info({ + service: "historical", + msg: `Started syncing '${this.network.name}' logs for '${ + source.contractName + }' with ${formatPercentage( + Math.min(1, cachedBlockCount / (targetBlockCount || 1)), + )} cached`, + }); + } + break; + + case "factoryLog": { + // Note that factory child address progress is stored using + // log intervals for the factory log. + const completedFactoryChildAddressIntervals = + await this.syncStore.getLogFilterIntervals({ + chainId: source.chainId, + logFilter: { + address: source.criteria.address, + topics: [source.criteria.eventSelector], + includeTransactionReceipts: false, + }, }); + const factoryChildAddressProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedFactoryChildAddressIntervals, + }); + this.factoryChildAddressProgressTrackers[source.id] = + factoryChildAddressProgressTracker; - for (const [fromBlock, toBlock] of traceFilterTaskChunks) { - this.queue.addTask( - { - kind: "TRACE_FILTER", - traceFilter: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (traceFilterTaskChunks.length > 0) { - const total = intervalSum(requiredTraceFilterIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' TRACE_FILTER tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetBlockCount = endBlock - startBlock + 1; - const cachedBlockCount = - targetBlockCount - intervalSum(requiredTraceFilterIntervals); + const requiredFactoryChildAddressIntervals = + factoryChildAddressProgressTracker.getRequired(); + const factoryChildAddressTaskChunks = getChunks({ + intervals: requiredFactoryChildAddressIntervals, + maxChunkSize: + source.maxBlockRange ?? 
this.network.defaultMaxBlockRange, + }); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "trace", - }, - targetBlockCount, - ); - this.common.metrics.ponder_historical_cached_blocks.set( + for (const [fromBlock, toBlock] of factoryChildAddressTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: source.contractName, - type: "trace", + kind: "FACTORY_CHILD_ADDRESS", + factory: source, + fromBlock, + toBlock, }, - cachedBlockCount, + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - - this.common.logger.info({ + } + if (factoryChildAddressTaskChunks.length > 0) { + const total = intervalSum(requiredFactoryChildAddressIntervals); + this.common.logger.debug({ service: "historical", - msg: `Started syncing '${this.network.name}' call traces for '${ - source.contractName - }' with ${formatPercentage( - Math.min(1, cachedBlockCount / (targetBlockCount || 1)), - )} cached`, + msg: `Added '${this.network.name}' FACTORY_CHILD_ADDRESS tasks for '${source.contractName}' over a ${total} block range`, }); } - break; - case "factoryCallTrace": { - // Note that factory child address progress is stored using - // log intervals for the factory log. - const completedFactoryChildAddressIntervals = - await this.syncStore.getLogFilterIntervals({ - chainId: source.chainId, - logFilter: { - address: source.criteria.address, - topics: [source.criteria.eventSelector], - includeTransactionReceipts: false, - }, - }); - const factoryChildAddressProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedFactoryChildAddressIntervals, - }); - this.factoryChildAddressProgressTrackers[source.id] = - factoryChildAddressProgressTracker; - - const requiredFactoryChildAddressIntervals = - factoryChildAddressProgressTracker.getRequired(); - const factoryChildAddressTaskChunks = getChunks({ - intervals: requiredFactoryChildAddressIntervals, - maxChunkSize: - source.maxBlockRange ?? this.network.defaultMaxBlockRange, + const targetFactoryChildAddressBlockCount = endBlock - startBlock + 1; + const cachedFactoryChildAddressBlockCount = + targetFactoryChildAddressBlockCount - + intervalSum(requiredFactoryChildAddressIntervals); + + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: `${source.contractName}_factory`, + type: "log", + }, + targetFactoryChildAddressBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: `${source.contractName}_factory`, + type: "log", + }, + cachedFactoryChildAddressBlockCount, + ); + + const completedFactoryLogFilterIntervals = + await this.syncStore.getFactoryLogFilterIntervals({ + chainId: source.chainId, + factory: source.criteria, }); + const factoryLogFilterProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedFactoryLogFilterIntervals, + }); + this.factoryLogFilterProgressTrackers[source.id] = + factoryLogFilterProgressTracker; + + // Only add factory log filter tasks for any intervals where the + // child address tasks are completed, but the factory log filter tasks are not, + // because these won't be added automatically by child address tasks. 
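The `intervalDifference` call that follows performs this subtraction: required factory-log ranges minus the ranges the child-address tasks will already cover. A minimal sketch of the semantics assumed here — inclusive integer `[start, end]` intervals, both inputs sorted and non-overlapping; the real helper in `@/utils/interval.js` may differ in detail:

```ts
type Interval = [start: number, end: number];

// Subtract `remove` from `base`. Both inputs are assumed to be sorted,
// non-overlapping lists of inclusive [start, end] integer intervals.
function intervalDifference(base: Interval[], remove: Interval[]): Interval[] {
  const result: Interval[] = [];
  for (let [start, end] of base) {
    for (const [removeStart, removeEnd] of remove) {
      if (removeEnd < start || removeStart > end) continue; // disjoint
      if (removeStart > start) result.push([start, removeStart - 1]); // keep the left piece
      start = removeEnd + 1; // resume after the removed span
      if (start > end) break;
    }
    if (start <= end) result.push([start, end]);
  }
  return result;
}

// intervalDifference([[0, 100]], [[20, 30]]) -> [[0, 19], [31, 100]]
```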
+ const requiredFactoryLogFilterIntervals = + factoryLogFilterProgressTracker.getRequired(); + const missingFactoryLogFilterIntervals = intervalDifference( + requiredFactoryLogFilterIntervals, + requiredFactoryChildAddressIntervals, + ); - for (const [fromBlock, toBlock] of factoryChildAddressTaskChunks) { - this.queue.addTask( - { - kind: "FACTORY_CHILD_ADDRESS", - factory: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (factoryChildAddressTaskChunks.length > 0) { - const total = intervalSum(requiredFactoryChildAddressIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' FACTORY_CHILD_ADDRESS tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetFactoryChildAddressBlockCount = - endBlock - startBlock + 1; - const cachedFactoryChildAddressBlockCount = - targetFactoryChildAddressBlockCount - - intervalSum(requiredFactoryChildAddressIntervals); + const missingFactoryLogFilterTaskChunks = getChunks({ + intervals: missingFactoryLogFilterIntervals, + maxChunkSize: + source.maxBlockRange ?? this.network.defaultMaxBlockRange, + }); - this.common.metrics.ponder_historical_total_blocks.set( + for (const [ + fromBlock, + toBlock, + ] of missingFactoryLogFilterTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: `${source.contractName}_factory`, - type: "trace", + kind: "FACTORY_LOG_FILTER", + factoryLogFilter: source, + fromBlock, + toBlock, }, - targetFactoryChildAddressBlockCount, + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - this.common.metrics.ponder_historical_cached_blocks.set( + } + if (missingFactoryLogFilterTaskChunks.length > 0) { + const total = intervalSum(missingFactoryLogFilterIntervals); + this.common.logger.debug({ + service: "historical", + msg: `Added '${this.network.name}' FACTORY_LOG_FILTER tasks for '${source.contractName}' over a ${total} block range`, + }); + } + + const targetFactoryLogFilterBlockCount = endBlock - startBlock + 1; + const cachedFactoryLogFilterBlockCount = + targetFactoryLogFilterBlockCount - + intervalSum(requiredFactoryLogFilterIntervals); + + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "log", + }, + targetFactoryLogFilterBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "log", + }, + cachedFactoryLogFilterBlockCount, + ); + + // Use factory log filter progress for the logger because it better represents + // user-facing progress. 
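A small note on the rate computation that follows: the `|| 1` fallback keeps the division finite when a source's block range is empty, and `Math.min` caps the reported rate at 100%. Same expression as in the diff, shown with illustrative values:

```ts
// With a zero-length target range the denominator falls back to 1, so the
// rate is 0 instead of NaN; Math.min caps over-covering caches at 100%.
const targetBlockCount = 0;
const cachedBlockCount = 0;
const cacheRate = Math.min(1, cachedBlockCount / (targetBlockCount || 1)); // 0
```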
+ const cacheRate = Math.min( + 1, + cachedFactoryLogFilterBlockCount / + (targetFactoryLogFilterBlockCount || 1), + ); + this.common.logger.info({ + service: "historical", + msg: `Started syncing '${this.network.name}' logs for '${ + source.contractName + }' with ${formatPercentage(cacheRate)} cached`, + }); + } + break; + + case "callTrace": { + const completedTraceFilterIntervals = + await this.syncStore.getTraceFilterIntervals({ + chainId: source.chainId, + traceFilter: source.criteria, + }); + const traceFilterProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedTraceFilterIntervals, + }); + this.traceFilterProgressTrackers[source.id] = + traceFilterProgressTracker; + + const requiredTraceFilterIntervals = + traceFilterProgressTracker.getRequired(); + + const traceFilterTaskChunks = getChunks({ + intervals: requiredTraceFilterIntervals, + maxChunkSize: TRACE_FILTER_CHUNK_SIZE, + }); + + for (const [fromBlock, toBlock] of traceFilterTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: `${source.contractName}_factory`, - type: "trace", + kind: "TRACE_FILTER", + traceFilter: source, + fromBlock, + toBlock, }, - cachedFactoryChildAddressBlockCount, + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - - const completedFactoryTraceFilterIntervals = - await this.syncStore.getFactoryTraceFilterIntervals({ - chainId: source.chainId, - factory: source.criteria, - }); - const factoryTraceFilterProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedFactoryTraceFilterIntervals, + } + if (traceFilterTaskChunks.length > 0) { + const total = intervalSum(requiredTraceFilterIntervals); + this.common.logger.debug({ + service: "historical", + msg: `Added '${this.network.name}' TRACE_FILTER tasks for '${source.contractName}' over a ${total} block range`, }); - this.factoryTraceFilterProgressTrackers[source.id] = - factoryTraceFilterProgressTracker; - - // Only add factory trace filter tasks for any intervals where the - // child address tasks are completed, but the factory trace filter tasks are not, - // because these won't be added automatically by child address tasks. - const requiredFactoryTraceFilterIntervals = - factoryTraceFilterProgressTracker.getRequired(); - const missingFactoryTraceFilterIntervals = intervalDifference( - requiredFactoryTraceFilterIntervals, - requiredFactoryChildAddressIntervals, - ); + } + + const targetBlockCount = endBlock - startBlock + 1; + const cachedBlockCount = + targetBlockCount - intervalSum(requiredTraceFilterIntervals); + + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "trace", + }, + targetBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "trace", + }, + cachedBlockCount, + ); + + this.common.logger.info({ + service: "historical", + msg: `Started syncing '${this.network.name}' call traces for '${ + source.contractName + }' with ${formatPercentage( + Math.min(1, cachedBlockCount / (targetBlockCount || 1)), + )} cached`, + }); + } + break; - const missingFactoryTraceFilterTaskChunks = getChunks({ - intervals: missingFactoryTraceFilterIntervals, - maxChunkSize: TRACE_FILTER_CHUNK_SIZE, + case "factoryCallTrace": { + // Note that factory child address progress is stored using + // log intervals for the factory log. 
+ const completedFactoryChildAddressIntervals = + await this.syncStore.getLogFilterIntervals({ + chainId: source.chainId, + logFilter: { + address: source.criteria.address, + topics: [source.criteria.eventSelector], + includeTransactionReceipts: false, + }, }); + const factoryChildAddressProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedFactoryChildAddressIntervals, + }); + this.factoryChildAddressProgressTrackers[source.id] = + factoryChildAddressProgressTracker; - for (const [ - fromBlock, - toBlock, - ] of missingFactoryTraceFilterTaskChunks) { - this.queue.addTask( - { - kind: "FACTORY_TRACE_FILTER", - factoryTraceFilter: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (missingFactoryTraceFilterTaskChunks.length > 0) { - const total = intervalSum(missingFactoryTraceFilterIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' FACTORY_TRACE_FILTER tasks for '${source.contractName}' over a ${total} block range`, - }); - } - - const targetFactoryTraceFilterBlockCount = - endBlock - startBlock + 1; - const cachedFactoryTraceFilterBlockCount = - targetFactoryTraceFilterBlockCount - - intervalSum(requiredFactoryTraceFilterIntervals); + const requiredFactoryChildAddressIntervals = + factoryChildAddressProgressTracker.getRequired(); + const factoryChildAddressTaskChunks = getChunks({ + intervals: requiredFactoryChildAddressIntervals, + maxChunkSize: + source.maxBlockRange ?? this.network.defaultMaxBlockRange, + }); - this.common.metrics.ponder_historical_total_blocks.set( + for (const [fromBlock, toBlock] of factoryChildAddressTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: source.contractName, - type: "trace", + kind: "FACTORY_CHILD_ADDRESS", + factory: source, + fromBlock, + toBlock, }, - targetFactoryTraceFilterBlockCount, + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - this.common.metrics.ponder_historical_cached_blocks.set( + } + if (factoryChildAddressTaskChunks.length > 0) { + const total = intervalSum(requiredFactoryChildAddressIntervals); + this.common.logger.debug({ + service: "historical", + msg: `Added '${this.network.name}' FACTORY_CHILD_ADDRESS tasks for '${source.contractName}' over a ${total} block range`, + }); + } + + const targetFactoryChildAddressBlockCount = endBlock - startBlock + 1; + const cachedFactoryChildAddressBlockCount = + targetFactoryChildAddressBlockCount - + intervalSum(requiredFactoryChildAddressIntervals); + + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: `${source.contractName}_factory`, + type: "trace", + }, + targetFactoryChildAddressBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: `${source.contractName}_factory`, + type: "trace", + }, + cachedFactoryChildAddressBlockCount, + ); + + const completedFactoryTraceFilterIntervals = + await this.syncStore.getFactoryTraceFilterIntervals({ + chainId: source.chainId, + factory: source.criteria, + }); + const factoryTraceFilterProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedFactoryTraceFilterIntervals, + }); + this.factoryTraceFilterProgressTrackers[source.id] = + factoryTraceFilterProgressTracker; + + // Only add factory trace filter tasks for any intervals where the + // child address tasks are completed, but the factory trace filter tasks are not, + // 
because these won't be added automatically by child address tasks. + const requiredFactoryTraceFilterIntervals = + factoryTraceFilterProgressTracker.getRequired(); + const missingFactoryTraceFilterIntervals = intervalDifference( + requiredFactoryTraceFilterIntervals, + requiredFactoryChildAddressIntervals, + ); + + const missingFactoryTraceFilterTaskChunks = getChunks({ + intervals: missingFactoryTraceFilterIntervals, + maxChunkSize: TRACE_FILTER_CHUNK_SIZE, + }); + + for (const [ + fromBlock, + toBlock, + ] of missingFactoryTraceFilterTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: source.contractName, - type: "trace", + kind: "FACTORY_TRACE_FILTER", + factoryTraceFilter: source, + fromBlock, + toBlock, }, - cachedFactoryTraceFilterBlockCount, - ); - - // Use factory trace filter progress for the logger because it better represents - // user-facing progress. - const cacheRate = Math.min( - 1, - cachedFactoryTraceFilterBlockCount / - (targetFactoryTraceFilterBlockCount || 1), + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - this.common.logger.info({ + } + if (missingFactoryTraceFilterTaskChunks.length > 0) { + const total = intervalSum(missingFactoryTraceFilterIntervals); + this.common.logger.debug({ service: "historical", - msg: `Started syncing '${this.network.name}' call traces for '${ - source.contractName - }' with ${formatPercentage(cacheRate)} cached`, + msg: `Added '${this.network.name}' FACTORY_TRACE_FILTER tasks for '${source.contractName}' over a ${total} block range`, }); } - break; - case "block": { - const completedBlockFilterIntervals = - await this.syncStore.getBlockFilterIntervals({ - chainId: source.chainId, - blockFilter: source.criteria, - }); - const blockFilterProgressTracker = new ProgressTracker({ - target: [startBlock, endBlock], - completed: completedBlockFilterIntervals, - }); - this.blockFilterProgressTrackers[source.id] = - blockFilterProgressTracker; + const targetFactoryTraceFilterBlockCount = endBlock - startBlock + 1; + const cachedFactoryTraceFilterBlockCount = + targetFactoryTraceFilterBlockCount - + intervalSum(requiredFactoryTraceFilterIntervals); - const requiredBlockFilterIntervals = - blockFilterProgressTracker.getRequired(); + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "trace", + }, + targetFactoryTraceFilterBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "trace", + }, + cachedFactoryTraceFilterBlockCount, + ); - // block filters are chunked into intervals to avoid unmanageable - // amounts of block callbacks being added at once. + // Use factory trace filter progress for the logger because it better represents + // user-facing progress. 
+ const cacheRate = Math.min( + 1, + cachedFactoryTraceFilterBlockCount / + (targetFactoryTraceFilterBlockCount || 1), + ); + this.common.logger.info({ + service: "historical", + msg: `Started syncing '${this.network.name}' call traces for '${ + source.contractName + }' with ${formatPercentage(cacheRate)} cached`, + }); + } + break; - const blockFilterTaskChunks = getChunks({ - intervals: requiredBlockFilterIntervals, - maxChunkSize: this.network.defaultMaxBlockRange, + case "block": { + const completedBlockFilterIntervals = + await this.syncStore.getBlockFilterIntervals({ + chainId: source.chainId, + blockFilter: source.criteria, }); + const blockFilterProgressTracker = new ProgressTracker({ + target: [startBlock, endBlock], + completed: completedBlockFilterIntervals, + }); + this.blockFilterProgressTrackers[source.id] = + blockFilterProgressTracker; - for (const [fromBlock, toBlock] of blockFilterTaskChunks) { - this.queue.addTask( - { - kind: "BLOCK_FILTER", - blockFilter: source, - fromBlock, - toBlock, - }, - { priority: Number.MAX_SAFE_INTEGER - fromBlock }, - ); - } - if (blockFilterTaskChunks.length > 0) { - const total = intervalSum(requiredBlockFilterIntervals); - this.common.logger.debug({ - service: "historical", - msg: `Added '${this.network.name}' BLOCK_FILTER tasks for '${source.sourceName}' over a ${total} block range`, - }); - } - - const targetBlockCount = endBlock - startBlock + 1; - const cachedBlockCount = - targetBlockCount - intervalSum(requiredBlockFilterIntervals); + const requiredBlockFilterIntervals = + blockFilterProgressTracker.getRequired(); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.sourceName, - type: "block", - }, - targetBlockCount, - ); - this.common.metrics.ponder_historical_cached_blocks.set( + // block filters are chunked into intervals to avoid unmanageable + // amounts of block callbacks being added at once. 
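The `getChunks` call below does that splitting. A sketch of the behavior assumed here (inclusive bounds; the real helper in `@/utils/interval.js` may differ):

```ts
type Interval = [start: number, end: number];

// Split each inclusive [start, end] interval into pieces of at most
// `maxChunkSize` blocks: [[0, 250]] with maxChunkSize 100 becomes
// [[0, 99], [100, 199], [200, 250]].
function getChunks({
  intervals,
  maxChunkSize,
}: { intervals: Interval[]; maxChunkSize: number }): Interval[] {
  const chunks: Interval[] = [];
  for (const [start, end] of intervals) {
    for (let from = start; from <= end; from += maxChunkSize) {
      chunks.push([from, Math.min(from + maxChunkSize - 1, end)]);
    }
  }
  return chunks;
}
```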
+ + const blockFilterTaskChunks = getChunks({ + intervals: requiredBlockFilterIntervals, + maxChunkSize: this.network.defaultMaxBlockRange, + }); + + for (const [fromBlock, toBlock] of blockFilterTaskChunks) { + this.queue.addTask( { - network: this.network.name, - source: source.sourceName, - type: "block", + kind: "BLOCK_FILTER", + blockFilter: source, + fromBlock, + toBlock, }, - cachedBlockCount, + { priority: Number.MAX_SAFE_INTEGER - fromBlock }, ); - - this.common.logger.info({ + } + if (blockFilterTaskChunks.length > 0) { + const total = intervalSum(requiredBlockFilterIntervals); + this.common.logger.debug({ service: "historical", - msg: `Started syncing '${this.network.name}' blocks for '${ - source.sourceName - }' with ${formatPercentage( - Math.min(1, cachedBlockCount / (targetBlockCount || 1)), - )} cached`, + msg: `Added '${this.network.name}' BLOCK_FILTER tasks for '${source.sourceName}' over a ${total} block range`, }); } - break; - default: - never(source); + const targetBlockCount = endBlock - startBlock + 1; + const cachedBlockCount = + targetBlockCount - intervalSum(requiredBlockFilterIntervals); + + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.sourceName, + type: "block", + }, + targetBlockCount, + ); + this.common.metrics.ponder_historical_cached_blocks.set( + { + network: this.network.name, + source: source.sourceName, + type: "block", + }, + cachedBlockCount, + ); + + this.common.logger.info({ + service: "historical", + msg: `Started syncing '${this.network.name}' blocks for '${ + source.sourceName + }' with ${formatPercentage( + Math.min(1, cachedBlockCount / (targetBlockCount || 1)), + )} cached`, + }); } - }), - ); + break; + + default: + never(source); + } + } } start() { diff --git a/packages/core/src/sync-store/postgres/store.ts b/packages/core/src/sync-store/postgres/store.ts index 882a84e2c..b41c56329 100644 --- a/packages/core/src/sync-store/postgres/store.ts +++ b/packages/core/src/sync-store/postgres/store.ts @@ -1,3 +1,5 @@ +import type { Common } from "@/common/common.js"; +import { NonRetryableError } from "@/common/errors.js"; import { type BlockFilterCriteria, type CallTraceFilterCriteria, @@ -63,9 +65,14 @@ import { export class PostgresSyncStore implements SyncStore { kind = "postgres" as const; db: HeadlessKysely; + common: Common; - constructor({ db }: { db: HeadlessKysely }) { + constructor({ + db, + common, + }: { db: HeadlessKysely; common: Common }) { this.db = db; + this.common = common; } insertLogFilterInterval = async ({ @@ -161,9 +168,9 @@ export class PostgresSyncStore implements SyncStore { const fragments = buildLogFilterFragments({ ...logFilter, chainId }); // First, attempt to merge overlapping and adjacent intervals. 
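The merge step below hinges on `intervalUnion` treating adjacent ranges as mergeable, not just overlapping ones — the tests later in this diff (single-block inserts collapsing into one range) rely on exactly that. A minimal sketch of those semantics, assuming inclusive integer bounds:

```ts
type Interval = [start: number, end: number];

// Merge overlapping and adjacent inclusive intervals:
// [[0, 5], [6, 10], [20, 30]] -> [[0, 10], [20, 30]].
function intervalUnion(intervals: Interval[]): Interval[] {
  const sorted = [...intervals].sort(([a], [b]) => a - b);
  const merged: Interval[] = [];
  for (const [start, end] of sorted) {
    const last = merged[merged.length - 1];
    if (last !== undefined && start <= last[1] + 1) {
      last[1] = Math.max(last[1], end); // extend the running interval
    } else {
      merged.push([start, end]);
    }
  }
  return merged;
}
```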
- await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: logFilterId } = await tx .insertInto("logFilters") .values(fragment) @@ -171,14 +178,23 @@ export class PostgresSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); - const existingIntervalRows = await tx + // This is a trick to add a LIMIT to a DELETE statement + const existingIntervals = await tx .deleteFrom("logFilterIntervals") - .where("logFilterId", "=", logFilterId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("logFilterIntervals") + .where("logFilterId", "=", logFilterId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( - existingIntervalRows.map((i) => [ + existingIntervals.map((i) => [ Number(i.startBlock), Number(i.endBlock), ]), @@ -198,9 +214,23 @@ export class PostgresSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same logFilterId. Should be almost impossible. + throw new NonRetryableError( + `'logFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -438,9 +468,9 @@ export class PostgresSyncStore implements SyncStore { async () => { const fragments = buildFactoryLogFragments({ ...factory, chainId }); - await Promise.all( - fragments.map(async (fragment) => { - await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: factoryId } = await tx .insertInto("factoryLogFilters") .values(fragment) @@ -448,10 +478,19 @@ export class PostgresSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); + // This is a trick to add a LIMIT to a DELETE statement const existingIntervals = await tx .deleteFrom("factoryLogFilterIntervals") - .where("factoryId", "=", factoryId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("factoryLogFilterIntervals") + .where("factoryId", "=", factoryId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( @@ -475,9 +514,25 @@ export class PostgresSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === + this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same factoryId. Should be almost impossible. 
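An aside on the "LIMIT on a DELETE" trick used throughout both stores: Postgres has no LIMIT clause on DELETE (SQLite only has one behind a compile-time flag), so the builder routes the limit through an `IN` subquery. Roughly the statement the Kysely code above compiles to — illustrative only; placeholders and quoting are not taken from a real query log:

```ts
// Approximate compiled SQL for the batched delete in getLogFilterIntervals.
const batchedDeleteSql = `
  DELETE FROM "logFilterIntervals"
  WHERE "id" IN (
    SELECT "id" FROM "logFilterIntervals"
    WHERE "logFilterId" = $1
    LIMIT $2
  )
  RETURNING "startBlock", "endBlock"
`;
```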
+ throw new NonRetryableError( + `'factoryLogFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== + this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -804,9 +859,9 @@ export class PostgresSyncStore implements SyncStore { const fragments = buildTraceFragments({ ...traceFilter, chainId }); // First, attempt to merge overlapping and adjacent intervals. - await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: traceFilterId } = await tx .insertInto("traceFilters") .values(fragment) @@ -814,14 +869,23 @@ export class PostgresSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); - const existingIntervalRows = await tx + // This is a trick to add a LIMIT to a DELETE statement + const existingIntervals = await tx .deleteFrom("traceFilterIntervals") - .where("traceFilterId", "=", traceFilterId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("traceFilterIntervals") + .where("traceFilterId", "=", traceFilterId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( - existingIntervalRows.map((i) => [ + existingIntervals.map((i) => [ Number(i.startBlock), Number(i.endBlock), ]), @@ -841,9 +905,23 @@ export class PostgresSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same traceFilterId. Should be almost impossible.
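Concretely, the guard above fires when a full batch of deleted rows merges into nothing smaller, which can only happen when every range in the batch is separated by a gap:

```ts
// With syncMaxIntervals = 3, these single-block ranges can never collapse:
// each pair is separated by a gap, so intervalUnion returns all 3 unchanged
// and the delete/merge/re-insert loop would cycle forever without the guard.
const nonMergeable: [number, number][] = [
  [0, 0],
  [2, 2],
  [4, 4],
];
// intervalUnion(nonMergeable) -> [[0, 0], [2, 2], [4, 4]]
```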
+ throw new NonRetryableError( + `'traceFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -1029,9 +1107,9 @@ export class PostgresSyncStore implements SyncStore { async () => { const fragments = buildFactoryTraceFragments({ ...factory, chainId }); - await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: factoryId } = await tx .insertInto("factoryTraceFilters") .values(fragment) @@ -1039,10 +1117,19 @@ export class PostgresSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); + // This is a trick to add a LIMIT to a DELETE statement const existingIntervals = await tx .deleteFrom("factoryTraceFilterIntervals") - .where("factoryId", "=", factoryId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("factoryTraceFilterIntervals") + .where("factoryId", "=", factoryId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( @@ -1066,9 +1153,25 @@ export class PostgresSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === + this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same factoryId. Should be almost impossible. + throw new NonRetryableError( + `'factoryTraceFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== + this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( diff --git a/packages/core/src/sync-store/sqlite/store.ts b/packages/core/src/sync-store/sqlite/store.ts index 682a1eefa..683e6a5e9 100644 --- a/packages/core/src/sync-store/sqlite/store.ts +++ b/packages/core/src/sync-store/sqlite/store.ts @@ -1,3 +1,5 @@ +import type { Common } from "@/common/common.js"; +import { NonRetryableError } from "@/common/errors.js"; import { type BlockFilterCriteria, type CallTraceFilterCriteria, @@ -72,9 +74,14 @@ import { export class SqliteSyncStore implements SyncStore { kind = "sqlite" as const; db: HeadlessKysely; + common: Common; - constructor({ db }: { db: HeadlessKysely }) { + constructor({ + db, + common, + }: { db: HeadlessKysely; common: Common }) { this.db = db; + this.common = common; } insertLogFilterInterval = async ({ @@ -169,10 +176,9 @@ export class SqliteSyncStore implements SyncStore { return this.db.wrap({ method: "getLogFilterIntervals" }, async () => { const fragments = buildLogFilterFragments({ ...logFilter, chainId }); - // First, attempt to merge overlapping and adjacent intervals. 
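The SQLite store gets the same treatment. In outline, the bounded loop that replaces the one-shot `Promise.all` merge — a sketch built on hypothetical helpers (`deleteBatch`, `insertIntervals`), not the literal store code:

```ts
// Hypothetical helpers for illustration: deleteBatch performs the LIMITed
// DELETE ... RETURNING shown in this diff; insertIntervals writes rows back.
async function mergeIntervalsBounded(
  syncMaxIntervals: number,
  deleteBatch: (limit: number) => Promise<[number, number][]>,
  insertIntervals: (rows: [number, number][]) => Promise<void>,
): Promise<void> {
  while (true) {
    const existing = await deleteBatch(syncMaxIntervals);
    const merged = intervalUnion(existing); // see the sketch further up
    if (merged.length > 0) await insertIntervals(merged);
    // A full batch that merged into nothing smaller can never make progress.
    if (merged.length === syncMaxIntervals) {
      throw new Error("unrecoverable fragmentation"); // NonRetryableError in the diff
    }
    // A short batch means this fragment's rows are fully merged.
    if (existing.length !== syncMaxIntervals) break;
  }
}
```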
- await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: logFilterId } = await tx .insertInto("logFilters") .values(fragment) @@ -180,14 +186,23 @@ export class SqliteSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); - const existingIntervalRows = await tx + // This is a trick to add a LIMIT to a DELETE statement + const existingIntervals = await tx .deleteFrom("logFilterIntervals") - .where("logFilterId", "=", logFilterId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("logFilterIntervals") + .where("logFilterId", "=", logFilterId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( - existingIntervalRows.map((i) => [ + existingIntervals.map((i) => [ Number(decodeToBigInt(i.startBlock)), Number(decodeToBigInt(i.endBlock)), ]), @@ -207,9 +222,23 @@ export class SqliteSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same logFilterId. Should be almost impossible. + throw new NonRetryableError( + `'logFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -449,9 +478,9 @@ export class SqliteSyncStore implements SyncStore { async () => { const fragments = buildFactoryLogFragments({ ...factory, chainId }); - await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: factoryId } = await tx .insertInto("factoryLogFilters") .values(fragment) @@ -459,10 +488,19 @@ export class SqliteSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); + // This is a trick to add a LIMIT to a DELETE statement const existingIntervals = await tx .deleteFrom("factoryLogFilterIntervals") - .where("factoryId", "=", factoryId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("factoryLogFilterIntervals") + .where("factoryId", "=", factoryId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( @@ -486,9 +524,25 @@ export class SqliteSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === + this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same factoryId. Should be almost impossible. 
+ throw new NonRetryableError( + `'factoryLogFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== + this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -814,10 +868,9 @@ export class SqliteSyncStore implements SyncStore { return this.db.wrap({ method: "getTraceFilterIntervals" }, async () => { const fragments = buildTraceFragments({ ...traceFilter, chainId }); - // First, attempt to merge overlapping and adjacent intervals. - await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: traceFilterId } = await tx .insertInto("traceFilters") .values(fragment) @@ -825,14 +878,23 @@ export class SqliteSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); - const existingIntervalRows = await tx + // This is a trick to add a LIMIT to a DELETE statement + const existingIntervals = await tx .deleteFrom("traceFilterIntervals") - .where("traceFilterId", "=", traceFilterId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("traceFilterIntervals") + .where("traceFilterId", "=", traceFilterId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( - existingIntervalRows.map((i) => [ + existingIntervals.map((i) => [ Number(decodeToBigInt(i.startBlock)), Number(decodeToBigInt(i.endBlock)), ]), @@ -852,9 +914,23 @@ export class SqliteSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same traceFilterId. Should be almost impossible.
+ throw new NonRetryableError( + `'traceFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( @@ -1040,9 +1116,9 @@ export class SqliteSyncStore implements SyncStore { async () => { const fragments = buildFactoryTraceFragments({ ...factory, chainId }); - await Promise.all( - fragments.map(async (fragment) => { - return await this.db.transaction().execute(async (tx) => { + for (const fragment of fragments) { + await this.db.transaction().execute(async (tx) => { + while (true) { const { id: factoryId } = await tx .insertInto("factoryTraceFilters") .values(fragment) @@ -1050,10 +1126,19 @@ export class SqliteSyncStore implements SyncStore { .returningAll() .executeTakeFirstOrThrow(); + // This is a trick to add a LIMIT to a DELETE statement const existingIntervals = await tx .deleteFrom("factoryTraceFilterIntervals") - .where("factoryId", "=", factoryId) - .returningAll() + .where( + "id", + "in", + tx + .selectFrom("factoryTraceFilterIntervals") + .where("factoryId", "=", factoryId) + .select("id") + .limit(this.common.options.syncMaxIntervals), + ) + .returning(["startBlock", "endBlock"]) .execute(); const mergedIntervals = intervalUnion( @@ -1077,9 +1162,25 @@ export class SqliteSyncStore implements SyncStore { .values(mergedIntervalRows) .execute(); } - }); - }), - ); + + if ( + mergedIntervalRows.length === + this.common.options.syncMaxIntervals + ) { + // This occurs when there are too many non-mergeable ranges with the same factoryId. Should be almost impossible. + throw new NonRetryableError( + `'factoryTraceFilterIntervals' table for chain '${chainId}' has reached an unrecoverable level of fragmentation.`, + ); + } + + if ( + existingIntervals.length !== + this.common.options.syncMaxIntervals + ) + break; + } + }); + } const intervals = await this.db .with( diff --git a/packages/core/src/sync-store/store.test.ts b/packages/core/src/sync-store/store.test.ts index b8a5c7ef6..27dced310 100644 --- a/packages/core/src/sync-store/store.test.ts +++ b/packages/core/src/sync-store/store.test.ts @@ -7,6 +7,7 @@ import { setupIsolatedDatabase, } from "@/_test/setup.js"; import { getRawRPCData, publicClient } from "@/_test/utils.js"; +import { NonRetryableError } from "@/common/errors.js"; import { type BlockFilterCriteria, type FactoryLogFilterCriteria, @@ -21,6 +22,7 @@ import { zeroCheckpoint, } from "@/utils/checkpoint.js"; import { drainAsyncGenerator } from "@/utils/drainAsyncGenerator.js"; +import { range } from "@/utils/range.js"; import { type Address, type Hex, @@ -437,46 +439,49 @@ test("getLogFilterIntervals merges overlapping intervals that both match a filte await syncStore.insertLogFilterInterval({ chainId: 1, logFilter: { - topics: [["0xc", "0xd"], null, null, null], + address: ["0xa", "0xb"], + topics: [["0xc", "0xd"], null, "0xe", null], includeTransactionReceipts: false, }, ...rpcData.block2, - interval: { startBlock: 0n, endBlock: 50n }, + interval: { startBlock: 0n, endBlock: 100n }, }); - // Broad criteria only includes broad intervals. - let logFilterIntervals = await syncStore.getLogFilterIntervals({ + // This is a narrower inclusion criteria on `address` and `topic0`. Full range is available. 
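The assertions in this test encode a subset rule for cache reuse: a cached fragment serves a request only when every requested dimension is at least as narrow as the cached one, with a missing address or topic acting as a wildcard. As a hypothetical predicate — illustration only, not code from the repo:

```ts
// `cached === undefined` means the cached fragment matched everything on
// that dimension, so any request is covered; a cached list covers only
// requests that are subsets of it (and an unconstrained request is not).
const covers = (cached?: string[], requested?: string[]) =>
  cached === undefined ||
  (requested !== undefined && requested.every((v) => cached.includes(v)));

// covers(["0xa", "0xb"], ["0xa"])   -> true  (narrower request: range reusable)
// covers(["0xa", "0xb"], undefined) -> false (broader request: no ranges)
```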
+ let logFilterRanges = await syncStore.getLogFilterIntervals({ chainId: 1, logFilter: { - address: "0xaddress", - topics: [["0xc"], null, null, null], + address: ["0xa"], + topics: [["0xc"], null, "0xe", null], includeTransactionReceipts: false, }, }); - expect(logFilterIntervals).toMatchObject([[0, 50]]); - await syncStore.insertLogFilterInterval({ + expect(logFilterRanges).toMatchObject([[0, 100]]); + + // This is a broader inclusion criteria on `address`. No ranges available. + logFilterRanges = await syncStore.getLogFilterIntervals({ chainId: 1, logFilter: { - address: "0xaddress", - topics: [["0xc"], null, null, null], + address: undefined, + topics: [["0xc"], null, "0xe", null], includeTransactionReceipts: false, }, - ...rpcData.block2, - interval: { startBlock: 0n, endBlock: 100n }, }); - logFilterIntervals = await syncStore.getLogFilterIntervals({ + expect(logFilterRanges).toMatchObject([]); + + // This is a narrower inclusion criteria on `topic1`. Full range available. + logFilterRanges = await syncStore.getLogFilterIntervals({ chainId: 1, logFilter: { - address: "0xaddress", - topics: [["0xc"], null, null, null], + address: ["0xa"], + topics: [["0xc"], "0xd", "0xe", null], includeTransactionReceipts: false, }, }); - expect(logFilterIntervals).toMatchObject([[0, 100]]); - + expect(logFilterRanges).toMatchObject([[0, 100]]); await cleanup(); }); @@ -552,6 +557,72 @@ test("getLogFilterIntervals handles includeTransactionReceipts", async (context) await cleanup(); }); +test("getLogFilterIntervals handles size over MAX", async (context) => { + const { sources } = context; + const { syncStore, cleanup } = await setupDatabaseServices(context); + const rpcData = await getRawRPCData(sources); + + context.common.options = { ...context.common.options, syncMaxIntervals: 20 }; + + for (const i in range(0, 25)) { + await syncStore.insertLogFilterInterval({ + chainId: 1, + logFilter: { + topics: [null, null, null, null], + includeTransactionReceipts: false, + }, + ...rpcData.block2, + interval: { startBlock: BigInt(i), endBlock: BigInt(i) }, + }); + } + + const logFilterRanges = await syncStore.getLogFilterIntervals({ + chainId: 1, + logFilter: { + topics: [null, null, null, null], + includeTransactionReceipts: false, + }, + }); + + expect(logFilterRanges).toMatchObject([[0, 24]]); + + await cleanup(); +}); + +test("getLogFilterIntervals throws non-retryable error after no merges", async (context) => { + const { sources } = context; + const { syncStore, cleanup } = await setupDatabaseServices(context); + const rpcData = await getRawRPCData(sources); + + context.common.options = { ...context.common.options, syncMaxIntervals: 20 }; + + for (let i = 0; i < 50; i += 2) { + await syncStore.insertLogFilterInterval({ + chainId: 1, + logFilter: { + topics: [null, null, null, null], + includeTransactionReceipts: false, + }, + ...rpcData.block2, + interval: { startBlock: BigInt(i), endBlock: BigInt(i) }, + }); + } + + const error = await syncStore + .getLogFilterIntervals({ + chainId: 1, + logFilter: { + topics: [null, null, null, null], + includeTransactionReceipts: false, + }, + }) + .catch((err) => err); + + expect(error).toBeInstanceOf(NonRetryableError); + + await cleanup(); +}); + test("insertFactoryChildAddressLogs inserts logs", async (context) => { const { sources } = context; const { syncStore, cleanup } = await setupDatabaseServices(context); diff --git a/packages/core/src/sync-store/store.ts b/packages/core/src/sync-store/store.ts index 0db03336d..97f1a03b7 100644 --- 
a/packages/core/src/sync-store/store.ts +++ b/packages/core/src/sync-store/store.ts @@ -1,3 +1,4 @@ +import type { Common } from "@/common/common.js"; import type { BlockFilterCriteria, CallTraceFilterCriteria, @@ -38,6 +39,7 @@ export type RawEvent = { export interface SyncStore { kind: "sqlite" | "postgres"; db: HeadlessKysely; + common: Common; /** * Insert a list of logs & associated transactions matching a given log filter diff --git a/packages/create-ponder/CHANGELOG.md b/packages/create-ponder/CHANGELOG.md index 90c623539..e4bfcc01e 100644 --- a/packages/create-ponder/CHANGELOG.md +++ b/packages/create-ponder/CHANGELOG.md @@ -1,5 +1,7 @@ # create-ponder +## 0.4.36 + ## 0.4.35 ## 0.4.34 diff --git a/packages/create-ponder/package.json b/packages/create-ponder/package.json index 78df00b57..eaa8d8358 100644 --- a/packages/create-ponder/package.json +++ b/packages/create-ponder/package.json @@ -1,6 +1,6 @@ { "name": "create-ponder", - "version": "0.4.35", + "version": "0.4.36", "type": "module", "description": "A CLI tool to create Ponder apps", "license": "MIT", diff --git a/packages/eslint-config-ponder/CHANGELOG.md b/packages/eslint-config-ponder/CHANGELOG.md index adfe45439..af7dba1ab 100644 --- a/packages/eslint-config-ponder/CHANGELOG.md +++ b/packages/eslint-config-ponder/CHANGELOG.md @@ -1,5 +1,7 @@ # eslint-config-ponder +## 0.4.36 + ## 0.4.35 ## 0.4.34 diff --git a/packages/eslint-config-ponder/package.json b/packages/eslint-config-ponder/package.json index 329a4430a..6018593ba 100644 --- a/packages/eslint-config-ponder/package.json +++ b/packages/eslint-config-ponder/package.json @@ -1,6 +1,6 @@ { "name": "eslint-config-ponder", - "version": "0.4.35", + "version": "0.4.36", "description": "ESLint config for Ponder apps", "license": "MIT", "main": "./index.js",