From bfb59252b1846d401922537cb0128e7756f85e0a Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Wed, 12 Jun 2024 20:20:38 -0400 Subject: [PATCH] Revert getEvents performance improvements --- packages/core/src/bin/utils/run.ts | 35 ++-- packages/core/src/common/options.ts | 2 - packages/core/src/indexing/index.ts | 4 +- packages/core/src/indexing/service.ts | 49 ++++-- .../core/src/sync-historical/service.test.ts | 5 + .../core/src/sync-store/postgres/store.ts | 156 +++++++++++++----- packages/core/src/sync-store/sqlite/store.ts | 152 ++++++++++++----- packages/core/src/sync-store/store.bench.ts | 3 + packages/core/src/sync-store/store.test.ts | 151 ++++++++++------- packages/core/src/sync-store/store.ts | 7 + packages/core/src/sync/service.ts | 87 ++-------- 11 files changed, 399 insertions(+), 252 deletions(-) diff --git a/packages/core/src/bin/utils/run.ts b/packages/core/src/bin/utils/run.ts index 4d9d61fa5..5fd57b197 100644 --- a/packages/core/src/bin/utils/run.ts +++ b/packages/core/src/bin/utils/run.ts @@ -124,8 +124,13 @@ export async function run({ initialCheckpoint, }); - const handleEvents = async (events: Event[], toCheckpoint: Checkpoint) => { - indexingService.updateTotalSeconds(toCheckpoint); + const handleEvents = async ( + events: Event[], + lastEventCheckpoint: Checkpoint | undefined, + ) => { + if (lastEventCheckpoint !== undefined) { + indexingService.updateLastEventCheckpoint(lastEventCheckpoint); + } if (events.length === 0) return { status: "success" } as const; @@ -150,14 +155,20 @@ export async function run({ worker: async (event: RealtimeEvent) => { switch (event.type) { case "newEvents": { + const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({ + sources: sources, + fromCheckpoint: event.fromCheckpoint, + toCheckpoint: event.toCheckpoint, + }); for await (const rawEvents of syncStore.getEvents({ sources, fromCheckpoint: event.fromCheckpoint, toCheckpoint: event.toCheckpoint, + limit: 1_000, })) { const result = await handleEvents( decodeEvents(syncService, rawEvents), - event.toCheckpoint, + lastEventCheckpoint, ); if (result.status === "error") onReloadableError(result.error); } @@ -229,14 +240,21 @@ export async function run({ fromCheckpoint, toCheckpoint, } of syncService.getHistoricalCheckpoint()) { + const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({ + sources: sources, + fromCheckpoint, + toCheckpoint, + }); + for await (const rawEvents of syncStore.getEvents({ sources: sources, fromCheckpoint, toCheckpoint, + limit: 1_000, })) { const result = await handleEvents( decodeEvents(syncService, rawEvents), - toCheckpoint, + lastEventCheckpoint, ); if (result.status === "killed") { @@ -250,15 +268,6 @@ export async function run({ await historicalStore.flush({ isFullFlush: true }); - // set completed indexing metrics - common.metrics.ponder_indexing_completed_seconds.set( - syncService.checkpoint.blockTimestamp - - syncService.startCheckpoint.blockTimestamp, - ); - common.metrics.ponder_indexing_completed_timestamp.set( - syncService.checkpoint.blockTimestamp, - ); - // Become healthy common.logger.info({ service: "indexing", diff --git a/packages/core/src/common/options.ts b/packages/core/src/common/options.ts index b18f62c43..e7739a95c 100644 --- a/packages/core/src/common/options.ts +++ b/packages/core/src/common/options.ts @@ -35,7 +35,6 @@ export type Options = { indexingCacheBytes: number; syncMaxIntervals: number; - syncEventsQuerySize: number; }; export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => { @@ 
-107,6 +106,5 @@ export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => { indexingCacheBytes: os.freemem() / 3, syncMaxIntervals: 50_000, - syncEventsQuerySize: 10_000, } satisfies Options; }; diff --git a/packages/core/src/indexing/index.ts b/packages/core/src/indexing/index.ts index ed1b4da7d..a2f626c13 100644 --- a/packages/core/src/indexing/index.ts +++ b/packages/core/src/indexing/index.ts @@ -5,7 +5,7 @@ import { processEvents, processSetupEvents, updateIndexingStore, - updateTotalSeconds, + updateLastEventCheckpoint, } from "./service.js"; import type { Context, Service } from "./service.js"; @@ -15,7 +15,7 @@ const methods = { processEvents, processSetupEvents, updateIndexingStore, - updateTotalSeconds, + updateLastEventCheckpoint, }; export const createIndexingService = extend(create, methods); diff --git a/packages/core/src/indexing/service.ts b/packages/core/src/indexing/service.ts index a042d5843..5cd296ee9 100644 --- a/packages/core/src/indexing/service.ts +++ b/packages/core/src/indexing/service.ts @@ -66,7 +66,8 @@ export type Service = { eventCount: { [eventName: string]: { [networkName: string]: number }; }; - startCheckpoint: Checkpoint; + firstEventCheckpoint: Checkpoint | undefined; + lastEventCheckpoint: Checkpoint | undefined; /** * Reduce memory usage by reserving space for objects ahead of time @@ -181,7 +182,8 @@ export const create = ({ indexingStore, isKilled: false, eventCount, - startCheckpoint: syncService.startCheckpoint, + firstEventCheckpoint: undefined, + lastEventCheckpoint: undefined, currentEvent: { contextState, context: { @@ -272,6 +274,21 @@ export const processEvents = async ( | { status: "success" } | { status: "killed" } > => { + // set first event checkpoint + if (events.length > 0 && indexingService.firstEventCheckpoint === undefined) { + indexingService.firstEventCheckpoint = decodeCheckpoint( + events[0].encodedCheckpoint, + ); + + // set total seconds + if (indexingService.lastEventCheckpoint !== undefined) { + indexingService.common.metrics.ponder_indexing_total_seconds.set( + indexingService.lastEventCheckpoint.blockTimestamp - + indexingService.firstEventCheckpoint.blockTimestamp, + ); + } + } + const eventCounts: { [eventName: string]: number } = {}; for (let i = 0; i < events.length; i++) { @@ -377,26 +394,30 @@ export const processEvents = async ( ).blockTimestamp; indexingService.common.metrics.ponder_indexing_completed_seconds.set( - eventTimestamp - indexingService.startCheckpoint.blockTimestamp, + eventTimestamp - indexingService.firstEventCheckpoint!.blockTimestamp, ); indexingService.common.metrics.ponder_indexing_completed_timestamp.set( eventTimestamp, ); - // Note: allows for terminal and logs to be updated + // Note(kyle) this is only needed for sqlite await new Promise(setImmediate); } } // set completed seconds - if (events.length > 0) { + if ( + events.length > 0 && + indexingService.firstEventCheckpoint !== undefined && + indexingService.lastEventCheckpoint !== undefined + ) { const lastEventInBatchTimestamp = decodeCheckpoint( events[events.length - 1].encodedCheckpoint, ).blockTimestamp; indexingService.common.metrics.ponder_indexing_completed_seconds.set( lastEventInBatchTimestamp - - indexingService.startCheckpoint.blockTimestamp, + indexingService.firstEventCheckpoint.blockTimestamp, ); indexingService.common.metrics.ponder_indexing_completed_timestamp.set( lastEventInBatchTimestamp, @@ -430,14 +451,18 @@ export const kill = (indexingService: Service) => { indexingService.isKilled = true; }; 
-export const updateTotalSeconds = ( +export const updateLastEventCheckpoint = ( indexingService: Service, - endCheckpoint: Checkpoint, + lastEventCheckpoint: Checkpoint, ) => { - indexingService.common.metrics.ponder_indexing_total_seconds.set( - endCheckpoint.blockTimestamp - - indexingService.startCheckpoint.blockTimestamp, - ); + indexingService.lastEventCheckpoint = lastEventCheckpoint; + + if (indexingService.firstEventCheckpoint !== undefined) { + indexingService.common.metrics.ponder_indexing_total_seconds.set( + indexingService.lastEventCheckpoint.blockTimestamp - + indexingService.firstEventCheckpoint.blockTimestamp, + ); + } }; const updateCompletedEvents = (indexingService: Service) => { diff --git a/packages/core/src/sync-historical/service.test.ts b/packages/core/src/sync-historical/service.test.ts index 5f9f1f3aa..0f9dc00e6 100644 --- a/packages/core/src/sync-historical/service.test.ts +++ b/packages/core/src/sync-historical/service.test.ts @@ -428,6 +428,7 @@ test("start() adds log filter events to sync store", async (context) => { sources: [sources[0]], fromCheckpoint: zeroCheckpoint, toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -471,6 +472,7 @@ test("start() adds factory events to sync store", async (context) => { sources: [sources[1]], fromCheckpoint: zeroCheckpoint, toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -501,6 +503,7 @@ test("start() adds block filter events to sync store", async (context) => { sources: [sources[4]], fromCheckpoint: zeroCheckpoint, toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -542,6 +545,7 @@ test("start() adds trace filter events to sync store", async (context) => { sources: [sources[3]], fromCheckpoint: zeroCheckpoint, toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -580,6 +584,7 @@ test("start() adds factory trace filter events to sync store", async (context) = sources: [sources[2]], fromCheckpoint: zeroCheckpoint, toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); diff --git a/packages/core/src/sync-store/postgres/store.ts b/packages/core/src/sync-store/postgres/store.ts index eeedef3d1..b41c56329 100644 --- a/packages/core/src/sync-store/postgres/store.ts +++ b/packages/core/src/sync-store/postgres/store.ts @@ -67,16 +67,12 @@ export class PostgresSyncStore implements SyncStore { db: HeadlessKysely; common: Common; - private seconds: number; - constructor({ db, common, }: { db: HeadlessKysely; common: Common }) { this.db = db; this.common = common; - - this.seconds = common.options.syncEventsQuerySize * 2; } insertLogFilterInterval = async ({ @@ -1704,14 +1700,15 @@ export class PostgresSyncStore implements SyncStore { sources, fromCheckpoint, toCheckpoint, + limit, }: { sources: EventSource[]; fromCheckpoint: Checkpoint; toCheckpoint: Checkpoint; + limit: number; }) { - let fromCursor = encodeCheckpoint(fromCheckpoint); - let toCursor = encodeCheckpoint(toCheckpoint); - const maxToCursor = toCursor; + let cursor = encodeCheckpoint(fromCheckpoint); + const encodedToCheckpoint = encodeCheckpoint(toCheckpoint); const sourcesById = sources.reduce<{ [sourceId: string]: (typeof sources)[number]; @@ -1742,16 +1739,6 @@ export class PostgresSyncStore implements SyncStore { ); while (true) { - const estimatedToCursor = encodeCheckpoint({ - ...zeroCheckpoint, - blockTimestamp: Math.min( - 
decodeCheckpoint(fromCursor).blockTimestamp + this.seconds,
-          9999999999,
-        ),
-      });
-      toCursor =
-        estimatedToCursor > maxToCursor ? maxToCursor : estimatedToCursor;
-
       const events = await this.db.wrap({ method: "getEvents" }, async () => {
         // Get full log objects, including the eventSelector clause.
         const requestedLogs = await this.db
@@ -2030,10 +2017,10 @@
             "transactionReceipts.type as txr_type",
           ]),
         )
-        .where("events.checkpoint", ">", fromCursor)
-        .where("events.checkpoint", "<=", toCursor)
+        .where("events.checkpoint", ">", cursor)
+        .where("events.checkpoint", "<=", encodedToCheckpoint)
         .orderBy("events.checkpoint", "asc")
-        .limit(this.common.options.syncEventsQuerySize)
+        .limit(limit + 1)
         .execute();

       return requestedLogs.map((_row) => {
@@ -2215,37 +2202,116 @@
       });
     });

-      // set fromCursor + seconds
-      if (events.length === 0) {
-        this.seconds = Math.round(this.seconds * 2);
-        fromCursor = toCursor;
-      } else if (events.length === this.common.options.syncEventsQuerySize) {
-        this.seconds = Math.round(this.seconds / 2);
-        fromCursor = events[events.length - 1].encodedCheckpoint;
-      } else {
-        this.seconds = Math.round(
-          Math.min(
-            (this.seconds / events.length) *
-              this.common.options.syncEventsQuerySize *
-              0.9,
-            this.seconds * 2,
-          ),
-        );
-        fromCursor = toCursor;
-      }
+      const hasNextPage = events.length === limit + 1;

-      if (events.length > 0) yield events;
-
-      // exit condition
-      if (
-        events.length !== this.common.options.syncEventsQuerySize &&
-        toCursor === maxToCursor
-      ) {
+      if (!hasNextPage) {
+        yield events;
         break;
+      } else {
+        events.pop();
+        cursor = events[events.length - 1].encodedCheckpoint;
+        yield events;
       }
     }
   }

+  async getLastEventCheckpoint({
+    sources,
+    fromCheckpoint,
+    toCheckpoint,
+  }: {
+    sources: EventSource[];
+    fromCheckpoint: Checkpoint;
+    toCheckpoint: Checkpoint;
+  }): Promise<Checkpoint | undefined> {
+    return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
+      const checkpoint = await this.db
+        .selectFrom("logs")
+        .where((eb) => {
+          const logFilterCmprs = sources
+            .filter(sourceIsLog)
+            .map((logFilter) => {
+              const exprs = this.buildLogFilterCmprs({ eb, logFilter });
+              return eb.and(exprs);
+            });
+
+          const factoryCmprs = sources
+            .filter(sourceIsFactoryLog)
+            .map((factory) => {
+              const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
+              return eb.and(exprs);
+            });
+
+          return eb.or([...logFilterCmprs, ...factoryCmprs]);
+        })
+        .select("checkpoint")
+        .unionAll(
+          this.db
+            .selectFrom("blocks")
+            .where((eb) => {
+              const exprs = [];
+              const blockFilters = sources.filter(sourceIsBlock);
+              for (const blockFilter of blockFilters) {
+                exprs.push(
+                  eb.and([
+                    eb("chainId", "=", blockFilter.chainId),
+                    eb("number", ">=", BigInt(blockFilter.startBlock)),
+                    ...(blockFilter.endBlock !== undefined
+                      ?
[eb("number", "<=", BigInt(blockFilter.endBlock))] + : []), + sql`(number - ${sql.val( + BigInt(blockFilter.criteria.offset), + )}) % ${sql.val( + BigInt(blockFilter.criteria.interval), + )} = 0`, + ]), + ); + } + return eb.or(exprs); + }) + .select("checkpoint"), + ) + .unionAll( + this.db + .selectFrom("callTraces") + .where((eb) => { + const traceFilterCmprs = sources + .filter(sourceIsCallTrace) + .map((callTraceSource) => { + const exprs = this.buildTraceFilterCmprs({ + eb, + callTraceSource, + }); + return eb.and(exprs); + }); + + const factoryCallTraceCmprs = sources + .filter(sourceIsFactoryCallTrace) + .map((factory) => { + const exprs = this.buildFactoryTraceFilterCmprs({ + eb, + factory, + }); + return eb.and(exprs); + }); + + return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]); + }) + .select("checkpoint"), + ) + .where("checkpoint", ">", encodeCheckpoint(fromCheckpoint)) + .where("checkpoint", "<=", encodeCheckpoint(toCheckpoint)) + .orderBy("checkpoint", "desc") + .executeTakeFirst(); + + return checkpoint + ? checkpoint.checkpoint + ? decodeCheckpoint(checkpoint.checkpoint) + : undefined + : undefined; + }); + } + private buildLogFilterCmprs = ({ eb, logFilter, diff --git a/packages/core/src/sync-store/sqlite/store.ts b/packages/core/src/sync-store/sqlite/store.ts index a040a9b95..683e6a5e9 100644 --- a/packages/core/src/sync-store/sqlite/store.ts +++ b/packages/core/src/sync-store/sqlite/store.ts @@ -76,16 +76,12 @@ export class SqliteSyncStore implements SyncStore { db: HeadlessKysely; common: Common; - private seconds: number; - constructor({ db, common, }: { db: HeadlessKysely; common: Common }) { this.db = db; this.common = common; - - this.seconds = common.options.syncEventsQuerySize * 2; } insertLogFilterInterval = async ({ @@ -1741,14 +1737,15 @@ export class SqliteSyncStore implements SyncStore { sources, fromCheckpoint, toCheckpoint, + limit, }: { sources: EventSource[]; fromCheckpoint: Checkpoint; toCheckpoint: Checkpoint; + limit: number; }) { - let fromCursor = encodeCheckpoint(fromCheckpoint); - let toCursor = encodeCheckpoint(toCheckpoint); - const maxToCursor = toCursor; + let cursor = encodeCheckpoint(fromCheckpoint); + const encodedToCheckpoint = encodeCheckpoint(toCheckpoint); const sourcesById = sources.reduce<{ [sourceId: string]: (typeof sources)[number]; @@ -1779,16 +1776,6 @@ export class SqliteSyncStore implements SyncStore { ); while (true) { - const estimatedToCursor = encodeCheckpoint({ - ...zeroCheckpoint, - blockTimestamp: Math.min( - decodeCheckpoint(fromCursor).blockTimestamp + this.seconds, - 9999999999, - ), - }); - toCursor = - estimatedToCursor > maxToCursor ? maxToCursor : estimatedToCursor; - const events = await this.db.wrap({ method: "getEvents" }, async () => { // Query a batch of logs. 
const requestedLogs = await this.db
@@ -2072,10 +2059,10 @@
             "transactionReceipts.type as txr_type",
           ]),
         )
-        .where("events.checkpoint", ">", fromCursor)
-        .where("events.checkpoint", "<=", toCursor)
+        .where("events.checkpoint", ">", cursor)
+        .where("events.checkpoint", "<=", encodedToCheckpoint)
         .orderBy("events.checkpoint", "asc")
-        .limit(this.common.options.syncEventsQuerySize)
+        .limit(limit + 1)
         .execute();

       return requestedLogs.map((_row) => {
@@ -2271,37 +2258,112 @@
       });
     });

-      // set fromCursor + seconds
-      if (events.length === 0) {
-        this.seconds = Math.round(this.seconds * 2);
-        fromCursor = toCursor;
-      } else if (events.length === this.common.options.syncEventsQuerySize) {
-        this.seconds = Math.round(this.seconds / 2);
-        fromCursor = events[events.length - 1].encodedCheckpoint;
-      } else {
-        this.seconds = Math.round(
-          Math.min(
-            (this.seconds / events.length) *
-              this.common.options.syncEventsQuerySize *
-              0.9,
-            this.seconds * 2,
-          ),
-        );
-        fromCursor = toCursor;
-      }
+      const hasNextPage = events.length === limit + 1;

-      if (events.length > 0) yield events;
-
-      // exit condition
-      if (
-        events.length !== this.common.options.syncEventsQuerySize &&
-        toCursor === maxToCursor
-      ) {
+      if (!hasNextPage) {
+        yield events;
         break;
+      } else {
+        events.pop();
+        cursor = events[events.length - 1].encodedCheckpoint;
+        yield events;
       }
     }
   }

+  async getLastEventCheckpoint({
+    sources,
+    fromCheckpoint,
+    toCheckpoint,
+  }: {
+    sources: EventSource[];
+    fromCheckpoint: Checkpoint;
+    toCheckpoint: Checkpoint;
+  }): Promise<Checkpoint | undefined> {
+    return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
+      const checkpoint = await this.db
+        .selectFrom("logs")
+        .where((eb) => {
+          const logFilterCmprs = sources
+            .filter(sourceIsLog)
+            .map((logFilter) => {
+              const exprs = this.buildLogFilterCmprs({ eb, logFilter });
+              return eb.and(exprs);
+            });
+
+          const factoryLogFilterCmprs = sources
+            .filter(sourceIsFactoryLog)
+            .map((factory) => {
+              const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
+              return eb.and(exprs);
+            });
+
+          return eb.or([...logFilterCmprs, ...factoryLogFilterCmprs]);
+        })
+        .select("checkpoint")
+        .unionAll(
+          this.db
+            .selectFrom("blocks")
+            .where((eb) => {
+              const exprs = [];
+              const blockFilters = sources.filter(sourceIsBlock);
+              for (const blockFilter of blockFilters) {
+                exprs.push(
+                  eb.and([
+                    eb("chainId", "=", blockFilter.chainId),
+                    eb("number", ">=", encodeAsText(blockFilter.startBlock)),
+                    ...(blockFilter.endBlock !== undefined
+                      ?
[eb("number", "<=", encodeAsText(blockFilter.endBlock))] + : []), + sql`(number - ${blockFilter.criteria.offset}) % ${blockFilter.criteria.interval} = 0`, + ]), + ); + } + return eb.or(exprs); + }) + .select("checkpoint"), + ) + .unionAll( + this.db + .selectFrom("callTraces") + .where((eb) => { + const traceFilterCmprs = sources + .filter(sourceIsCallTrace) + .map((callTraceSource) => { + const exprs = this.buildTraceFilterCmprs({ + eb, + callTraceSource, + }); + return eb.and(exprs); + }); + + const factoryCallTraceCmprs = sources + .filter(sourceIsFactoryCallTrace) + .map((factory) => { + const exprs = this.buildFactoryTraceFilterCmprs({ + eb, + factory, + }); + return eb.and(exprs); + }); + + return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]); + }) + .select("checkpoint"), + ) + .where("checkpoint", ">", encodeCheckpoint(fromCheckpoint)) + .where("checkpoint", "<=", encodeCheckpoint(toCheckpoint)) + .orderBy("checkpoint", "desc") + .executeTakeFirst(); + + return checkpoint + ? checkpoint.checkpoint + ? decodeCheckpoint(checkpoint.checkpoint) + : undefined + : undefined; + }); + } + private buildLogFilterCmprs = ({ eb, logFilter, diff --git a/packages/core/src/sync-store/store.bench.ts b/packages/core/src/sync-store/store.bench.ts index cd1c56d0d..afc7416cb 100644 --- a/packages/core/src/sync-store/store.bench.ts +++ b/packages/core/src/sync-store/store.bench.ts @@ -337,6 +337,7 @@ for (const c of LOG_FILTER_CASES) { ] as EventSource[], toCheckpoint, fromCheckpoint, + limit: 5000, }); const events = await drainAsyncGenerator(ag); @@ -378,6 +379,7 @@ for (const c of LOG_FILTER_CASES) { ] as EventSource[], toCheckpoint, fromCheckpoint, + limit: 5000, }); const events = await drainAsyncGenerator(ag); @@ -419,6 +421,7 @@ for (const c of LOG_FILTER_CASES) { ] as EventSource[], toCheckpoint, fromCheckpoint, + limit: 5000, }); const events = await drainAsyncGenerator(ag); diff --git a/packages/core/src/sync-store/store.test.ts b/packages/core/src/sync-store/store.test.ts index 4ba607660..27dced310 100644 --- a/packages/core/src/sync-store/store.test.ts +++ b/packages/core/src/sync-store/store.test.ts @@ -15,9 +15,7 @@ import { sourceIsFactoryLog, sourceIsLog, } from "@/config/sources.js"; -import type { SyncBlock } from "@/sync/index.js"; import { - type Checkpoint, EVENT_TYPES, decodeCheckpoint, maxCheckpoint, @@ -43,17 +41,6 @@ beforeEach(setupCommon); beforeEach(setupAnvil); beforeEach(setupIsolatedDatabase); -const createBlockCheckpoint = ( - block: SyncBlock, - isInclusive: boolean, -): Checkpoint => { - return { - ...(isInclusive ? 
maxCheckpoint : zeroCheckpoint), - blockTimestamp: hexToNumber(block.timestamp), - blockNumber: hexToBigInt(block.number), - }; -}; - test("setup creates tables", async (context) => { const { syncStore, cleanup } = await setupDatabaseServices(context); const tables = await syncStore.db.introspection.getTables(); @@ -2203,8 +2190,9 @@ test("getEvents with log filters", async (context) => { const ag = syncStore.getEvents({ sources: sources.filter((s) => sourceIsFactoryLog(s) || sourceIsLog(s)), - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2254,8 +2242,9 @@ test("getEvents with logs filters and receipts", async (context) => { ...s, criteria: { ...s.criteria, includeTransactionReceipts: true }, })), - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2305,8 +2294,9 @@ test("getEvents with block filters", async (context) => { const ag = syncStore.getEvents({ sources: [sources[4]], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2336,8 +2326,9 @@ test("getEvents with trace filters", async (context) => { const ag = syncStore.getEvents({ sources: [sources[3]], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2366,8 +2357,9 @@ test("getEvents with factory trace filters", async (context) => { const ag = syncStore.getEvents({ sources: [sources[2]], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2402,8 +2394,9 @@ test("getEvents filters on log filter with multiple addresses", async (context) }, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2450,8 +2443,9 @@ test("getEvents filters on log filter with single topic", async (context) => { }, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2503,8 +2497,9 @@ test("getEvents filters on log filter with multiple topics", async (context) => }, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2542,8 
+2537,9 @@ test("getEvents filters on simple factory", async (context) => { const ag = syncStore.getEvents({ sources: [sources[1]], - fromCheckpoint: createBlockCheckpoint(rpcData.block4.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2573,8 +2569,9 @@ test("getEvents filters on startBlock", async (context) => { sources: [ { ...sources[0], startBlock: hexToNumber(rpcData.block4.block.number) }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2601,8 +2598,9 @@ test("getEvents filters on endBlock", async (context) => { sources: [ { ...sources[0], endBlock: hexToNumber(rpcData.block2.block.number) - 1 }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2636,7 +2634,8 @@ test("getEvents filters on fromCheckpoint (exclusive)", async (context) => { // Should exclude the 1st log in the first block. eventIndex: hexToBigInt(rpcData.block2.logs[0].logIndex!), }, - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2662,7 +2661,7 @@ test("getEvents filters on toCheckpoint (inclusive)", async (context) => { const ag = syncStore.getEvents({ sources: [sources[0]], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), + fromCheckpoint: zeroCheckpoint, toCheckpoint: { chainId: 1n, blockTimestamp: Number(rpcData.block2.block.timestamp!), @@ -2672,6 +2671,7 @@ test("getEvents filters on toCheckpoint (inclusive)", async (context) => { // Should include the 2nd log in the first block. 
eventIndex: hexToBigInt(rpcData.block2.logs[1].logIndex!), }, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2703,8 +2703,9 @@ test("getEvents filters on block filter criteria", async (context) => { const ag = syncStore.getEvents({ sources: [{ ...sources[4], endBlock: 3 }], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2744,8 +2745,9 @@ test("getEvents filters on trace filter criteria", async (context) => { endBlock: 3, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2774,8 +2776,9 @@ test("getEvents multiple sources", async (context) => { const ag = syncStore.getEvents({ sources, - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2810,8 +2813,9 @@ test("getEvents event filter on factory", async (context) => { criteria: { ...sources[1].criteria, topics: [`0x${"0".repeat(64)}`] }, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block3.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block4.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2838,8 +2842,9 @@ test("getEvents multichain", async (context) => { chainId: 2, }, ], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block2.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2860,10 +2865,6 @@ test("getEvents multichain", async (context) => { test("getEvents pagination", async (context) => { const { erc20, sources } = context; - - // set limit - context.common.options.syncEventsQuerySize = 1; - const { syncStore, cleanup } = await setupDatabaseServices(context); const rpcData = await getRawRPCData(sources); @@ -2874,8 +2875,9 @@ test("getEvents pagination", async (context) => { const ag = syncStore.getEvents({ sources: [sources[0], sources[1]], - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block2.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 1, }); const firstBatchEvents = await ag.next(); @@ -2914,12 +2916,12 @@ test("getEvents pagination", async (context) => { test("getEvents empty", async (context) => { const { sources } = context; const { syncStore, cleanup } = await setupDatabaseServices(context); - const rpcData = await getRawRPCData(sources); const ag = syncStore.getEvents({ sources, - fromCheckpoint: createBlockCheckpoint(rpcData.block2.block, false), - toCheckpoint: createBlockCheckpoint(rpcData.block3.block, true), + fromCheckpoint: zeroCheckpoint, + toCheckpoint: maxCheckpoint, + limit: 100, }); const events = await drainAsyncGenerator(ag); @@ -2928,6 +2930,43 @@ test("getEvents empty", async (context) => { await cleanup(); }); +test("getLastEventCheckpoint", async 
(context) => {
+  const { sources } = context;
+  const { syncStore, cleanup } = await setupDatabaseServices(context);
+  const rpcData = await getRawRPCData(sources);
+
+  await syncStore.insertRealtimeBlock({
+    chainId: 1,
+    ...rpcData.block2,
+  });
+
+  const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
+    sources,
+    fromCheckpoint: zeroCheckpoint,
+    toCheckpoint: maxCheckpoint,
+  });
+
+  expect(lastEventCheckpoint?.blockNumber).toBe(2n);
+  expect(lastEventCheckpoint?.transactionIndex).toBe(1n);
+  expect(lastEventCheckpoint?.eventIndex).toBe(1n);
+
+  await cleanup();
+});
+
+test("getLastEventCheckpoint empty", async (context) => {
+  const { sources } = context;
+  const { syncStore, cleanup } = await setupDatabaseServices(context);
+
+  const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
+    sources,
+    fromCheckpoint: zeroCheckpoint,
+    toCheckpoint: maxCheckpoint,
+  });
+  expect(lastEventCheckpoint).toBe(undefined);
+
+  await cleanup();
+});
+
 test("pruneByChainId deletes filters", async (context) => {
   const { sources } = context;
   const { syncStore, cleanup } = await setupDatabaseServices(context);
diff --git a/packages/core/src/sync-store/store.ts b/packages/core/src/sync-store/store.ts
index c80d1df26..97f1a03b7 100644
--- a/packages/core/src/sync-store/store.ts
+++ b/packages/core/src/sync-store/store.ts
@@ -267,8 +267,15 @@
     sources: EventSource[];
     fromCheckpoint: Checkpoint;
     toCheckpoint: Checkpoint;
+    limit: number;
   }): AsyncGenerator<RawEvent[]>;

+  getLastEventCheckpoint(arg: {
+    sources: EventSource[];
+    fromCheckpoint: Checkpoint;
+    toCheckpoint: Checkpoint;
+  }): Promise<Checkpoint | undefined>;
+
   /** PRUNING */

   pruneByChainId(arg: { chainId: number; block: number }): Promise<void>;
diff --git a/packages/core/src/sync/service.ts b/packages/core/src/sync/service.ts
index a7ddc1d9a..aeb241bce 100644
--- a/packages/core/src/sync/service.ts
+++ b/packages/core/src/sync/service.ts
@@ -11,12 +11,9 @@
 import type { SyncStore } from "@/sync-store/store.js";
 import {
   type Checkpoint,
-  checkpointMax,
   checkpointMin,
-  isCheckpointEqual,
   isCheckpointGreaterThan,
   maxCheckpoint,
-  zeroCheckpoint,
 } from "@/utils/checkpoint.js";
 import { never } from "@/utils/never.js";
 import { type RequestQueue, createRequestQueue } from "@/utils/requestQueue.js";
@@ -33,8 +30,6 @@ export type Service = {
   // state
   checkpoint: Checkpoint;
-  startCheckpoint: Checkpoint;
-  endCheckpoint: Checkpoint | undefined;
   finalizedCheckpoint: Checkpoint;
   isKilled: boolean;
@@ -45,8 +40,6 @@
   requestQueue: RequestQueue;
   cachedTransport: Transport;
-  startCheckpoint: Checkpoint;
-  endCheckpoint: Checkpoint | undefined;
   initialFinalizedCheckpoint: Checkpoint;

   realtime:
@@ -227,40 +220,14 @@
       common,
     });

-    const hasEndBlock = networkSources.every(
-      (source) => source.endBlock !== undefined,
-    );
-
-    const [
-      startBlock,
-      endBlock,
-      { latestBlock, finalizedBlock },
-      remoteChainId,
-    ] = await Promise.all([
-      _eth_getBlockByNumber(
-        { requestQueue },
-        {
-          blockNumber: Math.min(
-            ...networkSources.map((source) => source.startBlock),
-          ),
-        },
-      ),
-      hasEndBlock
-        ?
_eth_getBlockByNumber( - { requestQueue }, - { - blockNumber: Math.max( - ...networkSources.map((source) => source.endBlock!), - ), - }, - ) - : undefined, - getLatestAndFinalizedBlocks({ - network, - requestQueue, - }), - requestQueue.request({ method: "eth_chainId" }).then(hexToNumber), - ]); + const [{ latestBlock, finalizedBlock }, remoteChainId] = + await Promise.all([ + getLatestAndFinalizedBlocks({ + network, + requestQueue, + }), + requestQueue.request({ method: "eth_chainId" }).then(hexToNumber), + ]); if (network.chainId !== remoteChainId) { common.logger.warn({ @@ -312,18 +279,6 @@ export const create = async ({ sources: networkSources, requestQueue, cachedTransport: cachedTransport({ requestQueue, syncStore }), - startCheckpoint: { - ...zeroCheckpoint, - blockTimestamp: hexToNumber(startBlock.timestamp), - blockNumber: hexToBigInt(startBlock.number), - }, - endCheckpoint: endBlock - ? { - ...zeroCheckpoint, - blockTimestamp: hexToNumber(endBlock.timestamp), - blockNumber: hexToBigInt(endBlock.number), - } - : undefined, initialFinalizedCheckpoint, realtime: undefined, historical: { @@ -349,18 +304,6 @@ export const create = async ({ sources: networkSources, requestQueue, cachedTransport: cachedTransport({ requestQueue, syncStore }), - startCheckpoint: { - ...zeroCheckpoint, - blockTimestamp: hexToNumber(startBlock.timestamp), - blockNumber: hexToBigInt(startBlock.number), - }, - endCheckpoint: endBlock - ? { - ...zeroCheckpoint, - blockTimestamp: hexToNumber(endBlock.timestamp), - blockNumber: hexToBigInt(endBlock.number), - } - : undefined, initialFinalizedCheckpoint, realtime: { realtimeSync, @@ -431,23 +374,13 @@ export const create = async ({ } } - const startCheckpoint = checkpointMin( - ...networkServices.map((ns) => ns.startCheckpoint), - ); - const syncService: Service = { common, syncStore, sources, networkServices, isKilled: false, - startCheckpoint, - endCheckpoint: networkServices.every((ns) => ns.endCheckpoint !== undefined) - ? checkpointMax(...networkServices.map((ns) => ns.endCheckpoint!)) - : undefined, - checkpoint: isCheckpointEqual(initialCheckpoint, zeroCheckpoint) - ? startCheckpoint - : initialCheckpoint, + checkpoint: initialCheckpoint, finalizedCheckpoint: checkpointMin( ...networkServices.map((ns) => ns.initialFinalizedCheckpoint), ), @@ -494,7 +427,7 @@ export const getHistoricalCheckpoint = async function* ( yield { fromCheckpoint: syncService.checkpoint, - toCheckpoint: syncService.endCheckpoint ?? finalityCheckpoint, + toCheckpoint: finalityCheckpoint, }; syncService.checkpoint = finalityCheckpoint;
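
The mechanism this revert reinstates in both store implementations is plain keyset pagination: each query fetches `limit + 1` rows ordered by `checkpoint`, the extra row signals that another page exists, and the cursor advances to the encoded checkpoint of the last row actually yielded. A minimal TypeScript sketch of that pattern follows; `paginate`, `Row`, and `fetchPage` are hypothetical names standing in for the Kysely query inside `getEvents`, and rows are assumed to arrive ordered by `encodedCheckpoint` ascending and bounded by `checkpoint > cursor AND checkpoint <= toCheckpoint`.

// Sketch of the limit + 1 keyset pagination used by getEvents after this
// revert. `fetchPage` is a hypothetical stand-in for the Kysely query.
type Row = { encodedCheckpoint: string };

async function* paginate(
  fetchPage: (cursor: string, take: number) => Promise<Row[]>,
  fromCursor: string,
  limit: number,
): AsyncGenerator<Row[]> {
  let cursor = fromCursor;
  while (true) {
    // Fetch one extra row so a page of exactly `limit` rows is
    // distinguishable from the final page.
    const rows = await fetchPage(cursor, limit + 1);
    const hasNextPage = rows.length === limit + 1;
    if (!hasNextPage) {
      yield rows; // Final page (possibly empty).
      return;
    }
    rows.pop(); // Drop the sentinel row; it belongs to the next page.
    cursor = rows[rows.length - 1].encodedCheckpoint;
    yield rows;
  }
}

Because the next query's lower bound is the checkpoint of the last row already yielded, pages never overlap and never skip rows, at the cost of reading one extra row per page.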