diff --git a/packages/core/src/bin/utils/run.ts b/packages/core/src/bin/utils/run.ts
index 12f9c91ef..4d9d61fa5 100644
--- a/packages/core/src/bin/utils/run.ts
+++ b/packages/core/src/bin/utils/run.ts
@@ -124,13 +124,8 @@ export async function run({
     initialCheckpoint,
   });
 
-  const handleEvents = async (
-    events: Event[],
-    lastEventCheckpoint: Checkpoint | undefined,
-  ) => {
-    if (lastEventCheckpoint !== undefined) {
-      indexingService.updateLastEventCheckpoint(lastEventCheckpoint);
-    }
+  const handleEvents = async (events: Event[], toCheckpoint: Checkpoint) => {
+    indexingService.updateTotalSeconds(toCheckpoint);
 
     if (events.length === 0) return { status: "success" } as const;
 
@@ -155,11 +150,6 @@ export async function run({
     worker: async (event: RealtimeEvent) => {
       switch (event.type) {
         case "newEvents": {
-          const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
-            sources: sources,
-            fromCheckpoint: event.fromCheckpoint,
-            toCheckpoint: event.toCheckpoint,
-          });
           for await (const rawEvents of syncStore.getEvents({
             sources,
             fromCheckpoint: event.fromCheckpoint,
@@ -167,7 +157,7 @@ export async function run({
           })) {
             const result = await handleEvents(
               decodeEvents(syncService, rawEvents),
-              lastEventCheckpoint,
+              event.toCheckpoint,
             );
             if (result.status === "error") onReloadableError(result.error);
           }
@@ -239,12 +229,6 @@ export async function run({
     fromCheckpoint,
     toCheckpoint,
   } of syncService.getHistoricalCheckpoint()) {
-    const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
-      sources: sources,
-      fromCheckpoint,
-      toCheckpoint,
-    });
-
    for await (const rawEvents of syncStore.getEvents({
      sources: sources,
      fromCheckpoint,
@@ -252,7 +236,7 @@ export async function run({
    })) {
      const result = await handleEvents(
        decodeEvents(syncService, rawEvents),
-        lastEventCheckpoint,
+        toCheckpoint,
      );
 
      if (result.status === "killed") {
@@ -266,6 +250,15 @@ export async function run({
 
   await historicalStore.flush({ isFullFlush: true });
 
+  // set completed indexing metrics
+  common.metrics.ponder_indexing_completed_seconds.set(
+    syncService.checkpoint.blockTimestamp -
+      syncService.startCheckpoint.blockTimestamp,
+  );
+  common.metrics.ponder_indexing_completed_timestamp.set(
+    syncService.checkpoint.blockTimestamp,
+  );
+
   // Become healthy
   common.logger.info({
     service: "indexing",
diff --git a/packages/core/src/indexing/index.ts b/packages/core/src/indexing/index.ts
index a2f626c13..ed1b4da7d 100644
--- a/packages/core/src/indexing/index.ts
+++ b/packages/core/src/indexing/index.ts
@@ -5,7 +5,7 @@ import {
   processEvents,
   processSetupEvents,
   updateIndexingStore,
-  updateLastEventCheckpoint,
+  updateTotalSeconds,
 } from "./service.js";
 import type { Context, Service } from "./service.js";
 
@@ -15,7 +15,7 @@ const methods = {
   processEvents,
   processSetupEvents,
   updateIndexingStore,
-  updateLastEventCheckpoint,
+  updateTotalSeconds,
 };
 
 export const createIndexingService = extend(create, methods);
diff --git a/packages/core/src/indexing/service.ts b/packages/core/src/indexing/service.ts
index bf078bca1..a042d5843 100644
--- a/packages/core/src/indexing/service.ts
+++ b/packages/core/src/indexing/service.ts
@@ -66,8 +66,7 @@ export type Service = {
   eventCount: {
     [eventName: string]: { [networkName: string]: number };
   };
-  firstEventCheckpoint: Checkpoint | undefined;
-  lastEventCheckpoint: Checkpoint | undefined;
+  startCheckpoint: Checkpoint;
 
   /**
    * Reduce memory usage by reserving space for objects ahead of time
@@ -182,8 +181,7 @@ export const create = ({
     indexingStore,
     isKilled: false,
     eventCount,
-    firstEventCheckpoint: undefined,
-    lastEventCheckpoint: undefined,
+    startCheckpoint: syncService.startCheckpoint,
     currentEvent: {
       contextState,
       context: {
@@ -274,21 +272,6 @@ export const processEvents = async (
   | { status: "success" }
   | { status: "killed" }
 > => {
-  // set first event checkpoint
-  if (events.length > 0 && indexingService.firstEventCheckpoint === undefined) {
-    indexingService.firstEventCheckpoint = decodeCheckpoint(
-      events[0].encodedCheckpoint,
-    );
-
-    // set total seconds
-    if (indexingService.lastEventCheckpoint !== undefined) {
-      indexingService.common.metrics.ponder_indexing_total_seconds.set(
-        indexingService.lastEventCheckpoint.blockTimestamp -
-          indexingService.firstEventCheckpoint.blockTimestamp,
-      );
-    }
-  }
-
   const eventCounts: { [eventName: string]: number } = {};
 
   for (let i = 0; i < events.length; i++) {
@@ -394,27 +377,26 @@
       ).blockTimestamp;
 
       indexingService.common.metrics.ponder_indexing_completed_seconds.set(
-        eventTimestamp - indexingService.firstEventCheckpoint!.blockTimestamp,
+        eventTimestamp - indexingService.startCheckpoint.blockTimestamp,
       );
       indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
         eventTimestamp,
       );
+
+      // Note: allows for terminal and logs to be updated
+      await new Promise(setImmediate);
     }
   }
 
   // set completed seconds
-  if (
-    events.length > 0 &&
-    indexingService.firstEventCheckpoint !== undefined &&
-    indexingService.lastEventCheckpoint !== undefined
-  ) {
+  if (events.length > 0) {
     const lastEventInBatchTimestamp = decodeCheckpoint(
       events[events.length - 1].encodedCheckpoint,
     ).blockTimestamp;
 
     indexingService.common.metrics.ponder_indexing_completed_seconds.set(
       lastEventInBatchTimestamp -
-        indexingService.firstEventCheckpoint.blockTimestamp,
+        indexingService.startCheckpoint.blockTimestamp,
     );
     indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
       lastEventInBatchTimestamp,
@@ -448,18 +430,14 @@ export const kill = (indexingService: Service) => {
   indexingService.isKilled = true;
 };
 
-export const updateLastEventCheckpoint = (
+export const updateTotalSeconds = (
   indexingService: Service,
-  lastEventCheckpoint: Checkpoint,
+  endCheckpoint: Checkpoint,
 ) => {
-  indexingService.lastEventCheckpoint = lastEventCheckpoint;
-
-  if (indexingService.firstEventCheckpoint !== undefined) {
-    indexingService.common.metrics.ponder_indexing_total_seconds.set(
-      indexingService.lastEventCheckpoint.blockTimestamp -
-        indexingService.firstEventCheckpoint.blockTimestamp,
-    );
-  }
+  indexingService.common.metrics.ponder_indexing_total_seconds.set(
+    endCheckpoint.blockTimestamp -
+      indexingService.startCheckpoint.blockTimestamp,
+  );
 };
 
 const updateCompletedEvents = (indexingService: Service) => {
diff --git a/packages/core/src/sync-store/postgres/store.ts b/packages/core/src/sync-store/postgres/store.ts
index 6b24a7bf6..eeedef3d1 100644
--- a/packages/core/src/sync-store/postgres/store.ts
+++ b/packages/core/src/sync-store/postgres/store.ts
@@ -2246,103 +2246,6 @@
     }
   }
 
-  async getLastEventCheckpoint({
-    sources,
-    fromCheckpoint,
-    toCheckpoint,
-  }: {
-    sources: EventSource[];
-    fromCheckpoint: Checkpoint;
-    toCheckpoint: Checkpoint;
-  }): Promise<Checkpoint | undefined> {
-    return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
-      const checkpoint = await this.db
-        .selectFrom("logs")
-        .where((eb) => {
-          const logFilterCmprs = sources
-            .filter(sourceIsLog)
-            .map((logFilter) => {
-              const exprs = this.buildLogFilterCmprs({ eb, logFilter });
-              return eb.and(exprs);
-            });
-
-          const factoryCmprs = sources
-            .filter(sourceIsFactoryLog)
-            .map((factory) => {
-              const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
-              return eb.and(exprs);
-            });
-
-          return eb.or([...logFilterCmprs, ...factoryCmprs]);
-        })
-        .select("checkpoint")
-        .unionAll(
-          this.db
-            .selectFrom("blocks")
-            .where((eb) => {
-              const exprs = [];
-              const blockFilters = sources.filter(sourceIsBlock);
-              for (const blockFilter of blockFilters) {
-                exprs.push(
-                  eb.and([
-                    eb("chainId", "=", blockFilter.chainId),
-                    eb("number", ">=", BigInt(blockFilter.startBlock)),
-                    ...(blockFilter.endBlock !== undefined
-                      ? [eb("number", "<=", BigInt(blockFilter.endBlock))]
-                      : []),
-                    sql`(number - ${sql.val(
-                      BigInt(blockFilter.criteria.offset),
-                    )}) % ${sql.val(
-                      BigInt(blockFilter.criteria.interval),
-                    )} = 0`,
-                  ]),
-                );
-              }
-              return eb.or(exprs);
-            })
-            .select("checkpoint"),
-        )
-        .unionAll(
-          this.db
-            .selectFrom("callTraces")
-            .where((eb) => {
-              const traceFilterCmprs = sources
-                .filter(sourceIsCallTrace)
-                .map((callTraceSource) => {
-                  const exprs = this.buildTraceFilterCmprs({
-                    eb,
-                    callTraceSource,
-                  });
-                  return eb.and(exprs);
-                });
-
-              const factoryCallTraceCmprs = sources
-                .filter(sourceIsFactoryCallTrace)
-                .map((factory) => {
-                  const exprs = this.buildFactoryTraceFilterCmprs({
-                    eb,
-                    factory,
-                  });
-                  return eb.and(exprs);
-                });
-
-              return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]);
-            })
-            .select("checkpoint"),
-        )
-        .where("checkpoint", ">", encodeCheckpoint(fromCheckpoint))
-        .where("checkpoint", "<=", encodeCheckpoint(toCheckpoint))
-        .orderBy("checkpoint", "desc")
-        .executeTakeFirst();
-
-      return checkpoint
-        ? checkpoint.checkpoint
-          ? decodeCheckpoint(checkpoint.checkpoint)
-          : undefined
-        : undefined;
-    });
-  }
-
   private buildLogFilterCmprs = ({
     eb,
     logFilter,
diff --git a/packages/core/src/sync-store/sqlite/store.ts b/packages/core/src/sync-store/sqlite/store.ts
index 65e77f1c7..a040a9b95 100644
--- a/packages/core/src/sync-store/sqlite/store.ts
+++ b/packages/core/src/sync-store/sqlite/store.ts
@@ -2302,99 +2302,6 @@
     }
   }
 
-  async getLastEventCheckpoint({
-    sources,
-    fromCheckpoint,
-    toCheckpoint,
-  }: {
-    sources: EventSource[];
-    fromCheckpoint: Checkpoint;
-    toCheckpoint: Checkpoint;
-  }): Promise<Checkpoint | undefined> {
-    return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
-      const checkpoint = await this.db
-        .selectFrom("logs")
-        .where((eb) => {
-          const logFilterCmprs = sources
-            .filter(sourceIsLog)
-            .map((logFilter) => {
-              const exprs = this.buildLogFilterCmprs({ eb, logFilter });
-              return eb.and(exprs);
-            });
-
-          const factoryLogFilterCmprs = sources
-            .filter(sourceIsFactoryLog)
-            .map((factory) => {
-              const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
-              return eb.and(exprs);
-            });
-
-          return eb.or([...logFilterCmprs, ...factoryLogFilterCmprs]);
-        })
-        .select("checkpoint")
-        .unionAll(
-          this.db
-            .selectFrom("blocks")
-            .where((eb) => {
-              const exprs = [];
-              const blockFilters = sources.filter(sourceIsBlock);
-              for (const blockFilter of blockFilters) {
-                exprs.push(
-                  eb.and([
-                    eb("chainId", "=", blockFilter.chainId),
-                    eb("number", ">=", encodeAsText(blockFilter.startBlock)),
-                    ...(blockFilter.endBlock !== undefined
[eb("number", "<=", encodeAsText(blockFilter.endBlock))] - : []), - sql`(number - ${blockFilter.criteria.offset}) % ${blockFilter.criteria.interval} = 0`, - ]), - ); - } - return eb.or(exprs); - }) - .select("checkpoint"), - ) - .unionAll( - this.db - .selectFrom("callTraces") - .where((eb) => { - const traceFilterCmprs = sources - .filter(sourceIsCallTrace) - .map((callTraceSource) => { - const exprs = this.buildTraceFilterCmprs({ - eb, - callTraceSource, - }); - return eb.and(exprs); - }); - - const factoryCallTraceCmprs = sources - .filter(sourceIsFactoryCallTrace) - .map((factory) => { - const exprs = this.buildFactoryTraceFilterCmprs({ - eb, - factory, - }); - return eb.and(exprs); - }); - - return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]); - }) - .select("checkpoint"), - ) - .where("checkpoint", ">", encodeCheckpoint(fromCheckpoint)) - .where("checkpoint", "<=", encodeCheckpoint(toCheckpoint)) - .orderBy("checkpoint", "desc") - .executeTakeFirst(); - - return checkpoint - ? checkpoint.checkpoint - ? decodeCheckpoint(checkpoint.checkpoint) - : undefined - : undefined; - }); - } - private buildLogFilterCmprs = ({ eb, logFilter, diff --git a/packages/core/src/sync-store/store.test.ts b/packages/core/src/sync-store/store.test.ts index 813440b51..4ba607660 100644 --- a/packages/core/src/sync-store/store.test.ts +++ b/packages/core/src/sync-store/store.test.ts @@ -2928,43 +2928,6 @@ test("getEvents empty", async (context) => { await cleanup(); }); -test("getLastEventCheckpoint", async (context) => { - const { sources } = context; - const { syncStore, cleanup } = await setupDatabaseServices(context); - const rpcData = await getRawRPCData(sources); - - await syncStore.insertRealtimeBlock({ - chainId: 1, - ...rpcData.block2, - }); - - const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({ - sources, - fromCheckpoint: zeroCheckpoint, - toCheckpoint: maxCheckpoint, - }); - - expect(lastEventCheckpoint?.blockNumber).toBe(2n); - expect(lastEventCheckpoint?.transactionIndex).toBe(1n); - expect(lastEventCheckpoint?.eventIndex).toBe(1n); - - await cleanup(); -}); - -test("getLastEventCheckpoint empty", async (context) => { - const { sources } = context; - const { syncStore, cleanup } = await setupDatabaseServices(context); - - const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({ - sources, - fromCheckpoint: zeroCheckpoint, - toCheckpoint: maxCheckpoint, - }); - expect(lastEventCheckpoint).toBe(undefined); - - await cleanup(); -}); - test("pruneByChainId deletes filters", async (context) => { const { sources } = context; const { syncStore, cleanup } = await setupDatabaseServices(context); diff --git a/packages/core/src/sync-store/store.ts b/packages/core/src/sync-store/store.ts index bd7e45590..c80d1df26 100644 --- a/packages/core/src/sync-store/store.ts +++ b/packages/core/src/sync-store/store.ts @@ -269,12 +269,6 @@ export interface SyncStore { toCheckpoint: Checkpoint; }): AsyncGenerator; - getLastEventCheckpoint(arg: { - sources: EventSource[]; - fromCheckpoint: Checkpoint; - toCheckpoint: Checkpoint; - }): Promise; - /** PRUNING */ pruneByChainId(arg: { chainId: number; block: number }): Promise; diff --git a/packages/core/src/sync/service.ts b/packages/core/src/sync/service.ts index ebd22d372..a7ddc1d9a 100644 --- a/packages/core/src/sync/service.ts +++ b/packages/core/src/sync/service.ts @@ -13,6 +13,7 @@ import { type Checkpoint, checkpointMax, checkpointMin, + isCheckpointEqual, isCheckpointGreaterThan, maxCheckpoint, zeroCheckpoint, @@ -32,6 +33,8 
@@ -32,6 +33,8 @@ export type Service = {
 
   // state
   checkpoint: Checkpoint;
+  startCheckpoint: Checkpoint;
+  endCheckpoint: Checkpoint | undefined;
   finalizedCheckpoint: Checkpoint;
 
   isKilled: boolean;
@@ -428,16 +431,23 @@ export const create = async ({
     }
   }
 
+  const startCheckpoint = checkpointMin(
+    ...networkServices.map((ns) => ns.startCheckpoint),
+  );
+
   const syncService: Service = {
     common,
     syncStore,
     sources,
     networkServices,
     isKilled: false,
-    // TODO(kyle)
-    checkpoint: checkpointMin(
-      ...networkServices.map((ns) => ns.startCheckpoint),
-    ),
+    startCheckpoint,
+    endCheckpoint: networkServices.every((ns) => ns.endCheckpoint !== undefined)
+      ? checkpointMax(...networkServices.map((ns) => ns.endCheckpoint!))
+      : undefined,
+    checkpoint: isCheckpointEqual(initialCheckpoint, zeroCheckpoint)
+      ? startCheckpoint
+      : initialCheckpoint,
     finalizedCheckpoint: checkpointMin(
       ...networkServices.map((ns) => ns.initialFinalizedCheckpoint),
    ),
@@ -471,14 +481,6 @@ export const getHistoricalCheckpoint = async function* (
     );
 
     if (isComplete) {
-      const endCheckpoint = syncService.networkServices.every(
-        (ns) => ns.endCheckpoint !== undefined,
-      )
-        ? checkpointMax(
-            ...syncService.networkServices.map((ns) => ns.endCheckpoint!),
-          )
-        : undefined;
-
      const finalityCheckpoint = checkpointMin(
        ...syncService.networkServices.map(
          ({ initialFinalizedCheckpoint }) => initialFinalizedCheckpoint,
        ),
      );
@@ -492,7 +494,7 @@
 
       yield {
         fromCheckpoint: syncService.checkpoint,
-        toCheckpoint: endCheckpoint ?? finalityCheckpoint,
+        toCheckpoint: syncService.endCheckpoint ?? finalityCheckpoint,
       };
 
       syncService.checkpoint = finalityCheckpoint;
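Below is a minimal sketch, not part of the patch itself, of how the progress metrics appear to be derived after this change, assuming a Checkpoint carries a blockTimestamp in seconds. The Gauge interface and the updateProgressMetrics helper are illustrative only; the gauge names mirror the metrics touched in the diff.

  // Sketch: total vs. completed indexing seconds, computed from checkpoints
  // already held by the sync service instead of a getLastEventCheckpoint query.
  type Checkpoint = { blockTimestamp: number };

  interface Gauge {
    set(value: number): void;
  }

  function updateProgressMetrics(
    totalSeconds: Gauge, // ponder_indexing_total_seconds
    completedSeconds: Gauge, // ponder_indexing_completed_seconds
    startCheckpoint: Checkpoint, // checkpointMin over the networks' start checkpoints
    toCheckpoint: Checkpoint, // upper bound of the batch handed to handleEvents
    lastProcessedTimestamp: number, // block timestamp of the last indexed event
  ): void {
    // Width of the window being indexed.
    totalSeconds.set(toCheckpoint.blockTimestamp - startCheckpoint.blockTimestamp);
    // How far indexing has progressed into that window.
    completedSeconds.set(lastProcessedTimestamp - startCheckpoint.blockTimestamp);
  }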