Skip to content

Commit

Permalink
remove getLastEventCheckpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jun 12, 2024
1 parent 05c65ed commit 46c54aa
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 304 deletions.
33 changes: 13 additions & 20 deletions packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,8 @@ export async function run({
initialCheckpoint,
});

const handleEvents = async (
events: Event[],
lastEventCheckpoint: Checkpoint | undefined,
) => {
if (lastEventCheckpoint !== undefined) {
indexingService.updateLastEventCheckpoint(lastEventCheckpoint);
}
const handleEvents = async (events: Event[], toCheckpoint: Checkpoint) => {
indexingService.updateTotalSeconds(toCheckpoint);

if (events.length === 0) return { status: "success" } as const;

Expand All @@ -155,19 +150,14 @@ export async function run({
worker: async (event: RealtimeEvent) => {
switch (event.type) {
case "newEvents": {
const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
sources: sources,
fromCheckpoint: event.fromCheckpoint,
toCheckpoint: event.toCheckpoint,
});
for await (const rawEvents of syncStore.getEvents({
sources,
fromCheckpoint: event.fromCheckpoint,
toCheckpoint: event.toCheckpoint,
})) {
const result = await handleEvents(
decodeEvents(syncService, rawEvents),
lastEventCheckpoint,
event.toCheckpoint,
);
if (result.status === "error") onReloadableError(result.error);
}
Expand Down Expand Up @@ -239,20 +229,14 @@ export async function run({
fromCheckpoint,
toCheckpoint,
} of syncService.getHistoricalCheckpoint()) {
const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
sources: sources,
fromCheckpoint,
toCheckpoint,
});

for await (const rawEvents of syncStore.getEvents({
sources: sources,
fromCheckpoint,
toCheckpoint,
})) {
const result = await handleEvents(
decodeEvents(syncService, rawEvents),
lastEventCheckpoint,
toCheckpoint,
);

if (result.status === "killed") {
Expand All @@ -266,6 +250,15 @@ export async function run({

await historicalStore.flush({ isFullFlush: true });

// set completed indexing metrics
common.metrics.ponder_indexing_completed_seconds.set(
syncService.checkpoint.blockTimestamp -
syncService.startCheckpoint.blockTimestamp,
);
common.metrics.ponder_indexing_completed_timestamp.set(
syncService.checkpoint.blockTimestamp,
);

// Become healthy
common.logger.info({
service: "indexing",
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
processEvents,
processSetupEvents,
updateIndexingStore,
updateLastEventCheckpoint,
updateTotalSeconds,
} from "./service.js";
import type { Context, Service } from "./service.js";

Expand All @@ -15,7 +15,7 @@ const methods = {
processEvents,
processSetupEvents,
updateIndexingStore,
updateLastEventCheckpoint,
updateTotalSeconds,
};

export const createIndexingService = extend(create, methods);
Expand Down
50 changes: 14 additions & 36 deletions packages/core/src/indexing/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ export type Service = {
eventCount: {
[eventName: string]: { [networkName: string]: number };
};
firstEventCheckpoint: Checkpoint | undefined;
lastEventCheckpoint: Checkpoint | undefined;
startCheckpoint: Checkpoint;

/**
* Reduce memory usage by reserving space for objects ahead of time
Expand Down Expand Up @@ -182,8 +181,7 @@ export const create = ({
indexingStore,
isKilled: false,
eventCount,
firstEventCheckpoint: undefined,
lastEventCheckpoint: undefined,
startCheckpoint: syncService.startCheckpoint,
currentEvent: {
contextState,
context: {
Expand Down Expand Up @@ -274,21 +272,6 @@ export const processEvents = async (
| { status: "success" }
| { status: "killed" }
> => {
// set first event checkpoint
if (events.length > 0 && indexingService.firstEventCheckpoint === undefined) {
indexingService.firstEventCheckpoint = decodeCheckpoint(
events[0].encodedCheckpoint,
);

// set total seconds
if (indexingService.lastEventCheckpoint !== undefined) {
indexingService.common.metrics.ponder_indexing_total_seconds.set(
indexingService.lastEventCheckpoint.blockTimestamp -
indexingService.firstEventCheckpoint.blockTimestamp,
);
}
}

const eventCounts: { [eventName: string]: number } = {};

for (let i = 0; i < events.length; i++) {
Expand Down Expand Up @@ -394,27 +377,26 @@ export const processEvents = async (
).blockTimestamp;

indexingService.common.metrics.ponder_indexing_completed_seconds.set(
eventTimestamp - indexingService.firstEventCheckpoint!.blockTimestamp,
eventTimestamp - indexingService.startCheckpoint.blockTimestamp,
);
indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
eventTimestamp,
);

// Note: allows for terminal and logs to be updated
await new Promise(setImmediate);
}
}

// set completed seconds
if (
events.length > 0 &&
indexingService.firstEventCheckpoint !== undefined &&
indexingService.lastEventCheckpoint !== undefined
) {
if (events.length > 0) {
const lastEventInBatchTimestamp = decodeCheckpoint(
events[events.length - 1].encodedCheckpoint,
).blockTimestamp;

indexingService.common.metrics.ponder_indexing_completed_seconds.set(
lastEventInBatchTimestamp -
indexingService.firstEventCheckpoint.blockTimestamp,
indexingService.startCheckpoint.blockTimestamp,
);
indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
lastEventInBatchTimestamp,
Expand Down Expand Up @@ -448,18 +430,14 @@ export const kill = (indexingService: Service) => {
indexingService.isKilled = true;
};

export const updateLastEventCheckpoint = (
export const updateTotalSeconds = (
indexingService: Service,
lastEventCheckpoint: Checkpoint,
endCheckpoint: Checkpoint,
) => {
indexingService.lastEventCheckpoint = lastEventCheckpoint;

if (indexingService.firstEventCheckpoint !== undefined) {
indexingService.common.metrics.ponder_indexing_total_seconds.set(
indexingService.lastEventCheckpoint.blockTimestamp -
indexingService.firstEventCheckpoint.blockTimestamp,
);
}
indexingService.common.metrics.ponder_indexing_total_seconds.set(
endCheckpoint.blockTimestamp -
indexingService.startCheckpoint.blockTimestamp,
);
};

const updateCompletedEvents = (indexingService: Service) => {
Expand Down
97 changes: 0 additions & 97 deletions packages/core/src/sync-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2246,103 +2246,6 @@ export class PostgresSyncStore implements SyncStore {
}
}

async getLastEventCheckpoint({
sources,
fromCheckpoint,
toCheckpoint,
}: {
sources: EventSource[];
fromCheckpoint: Checkpoint;
toCheckpoint: Checkpoint;
}): Promise<Checkpoint | undefined> {
return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
const checkpoint = await this.db
.selectFrom("logs")
.where((eb) => {
const logFilterCmprs = sources
.filter(sourceIsLog)
.map((logFilter) => {
const exprs = this.buildLogFilterCmprs({ eb, logFilter });
return eb.and(exprs);
});

const factoryCmprs = sources
.filter(sourceIsFactoryLog)
.map((factory) => {
const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
return eb.and(exprs);
});

return eb.or([...logFilterCmprs, ...factoryCmprs]);
})
.select("checkpoint")
.unionAll(
this.db
.selectFrom("blocks")
.where((eb) => {
const exprs = [];
const blockFilters = sources.filter(sourceIsBlock);
for (const blockFilter of blockFilters) {
exprs.push(
eb.and([
eb("chainId", "=", blockFilter.chainId),
eb("number", ">=", BigInt(blockFilter.startBlock)),
...(blockFilter.endBlock !== undefined
? [eb("number", "<=", BigInt(blockFilter.endBlock))]
: []),
sql`(number - ${sql.val(
BigInt(blockFilter.criteria.offset),
)}) % ${sql.val(
BigInt(blockFilter.criteria.interval),
)} = 0`,
]),
);
}
return eb.or(exprs);
})
.select("checkpoint"),
)
.unionAll(
this.db
.selectFrom("callTraces")
.where((eb) => {
const traceFilterCmprs = sources
.filter(sourceIsCallTrace)
.map((callTraceSource) => {
const exprs = this.buildTraceFilterCmprs({
eb,
callTraceSource,
});
return eb.and(exprs);
});

const factoryCallTraceCmprs = sources
.filter(sourceIsFactoryCallTrace)
.map((factory) => {
const exprs = this.buildFactoryTraceFilterCmprs({
eb,
factory,
});
return eb.and(exprs);
});

return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]);
})
.select("checkpoint"),
)
.where("checkpoint", ">", encodeCheckpoint(fromCheckpoint))
.where("checkpoint", "<=", encodeCheckpoint(toCheckpoint))
.orderBy("checkpoint", "desc")
.executeTakeFirst();

return checkpoint
? checkpoint.checkpoint
? decodeCheckpoint(checkpoint.checkpoint)
: undefined
: undefined;
});
}

private buildLogFilterCmprs = ({
eb,
logFilter,
Expand Down
93 changes: 0 additions & 93 deletions packages/core/src/sync-store/sqlite/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2302,99 +2302,6 @@ export class SqliteSyncStore implements SyncStore {
}
}

async getLastEventCheckpoint({
sources,
fromCheckpoint,
toCheckpoint,
}: {
sources: EventSource[];
fromCheckpoint: Checkpoint;
toCheckpoint: Checkpoint;
}): Promise<Checkpoint | undefined> {
return this.db.wrap({ method: "getLastEventCheckpoint" }, async () => {
const checkpoint = await this.db
.selectFrom("logs")
.where((eb) => {
const logFilterCmprs = sources
.filter(sourceIsLog)
.map((logFilter) => {
const exprs = this.buildLogFilterCmprs({ eb, logFilter });
return eb.and(exprs);
});

const factoryLogFilterCmprs = sources
.filter(sourceIsFactoryLog)
.map((factory) => {
const exprs = this.buildFactoryLogFilterCmprs({ eb, factory });
return eb.and(exprs);
});

return eb.or([...logFilterCmprs, ...factoryLogFilterCmprs]);
})
.select("checkpoint")
.unionAll(
this.db
.selectFrom("blocks")
.where((eb) => {
const exprs = [];
const blockFilters = sources.filter(sourceIsBlock);
for (const blockFilter of blockFilters) {
exprs.push(
eb.and([
eb("chainId", "=", blockFilter.chainId),
eb("number", ">=", encodeAsText(blockFilter.startBlock)),
...(blockFilter.endBlock !== undefined
? [eb("number", "<=", encodeAsText(blockFilter.endBlock))]
: []),
sql`(number - ${blockFilter.criteria.offset}) % ${blockFilter.criteria.interval} = 0`,
]),
);
}
return eb.or(exprs);
})
.select("checkpoint"),
)
.unionAll(
this.db
.selectFrom("callTraces")
.where((eb) => {
const traceFilterCmprs = sources
.filter(sourceIsCallTrace)
.map((callTraceSource) => {
const exprs = this.buildTraceFilterCmprs({
eb,
callTraceSource,
});
return eb.and(exprs);
});

const factoryCallTraceCmprs = sources
.filter(sourceIsFactoryCallTrace)
.map((factory) => {
const exprs = this.buildFactoryTraceFilterCmprs({
eb,
factory,
});
return eb.and(exprs);
});

return eb.or([...traceFilterCmprs, ...factoryCallTraceCmprs]);
})
.select("checkpoint"),
)
.where("checkpoint", ">", encodeCheckpoint(fromCheckpoint))
.where("checkpoint", "<=", encodeCheckpoint(toCheckpoint))
.orderBy("checkpoint", "desc")
.executeTakeFirst();

return checkpoint
? checkpoint.checkpoint
? decodeCheckpoint(checkpoint.checkpoint)
: undefined
: undefined;
});
}

private buildLogFilterCmprs = ({
eb,
logFilter,
Expand Down
Loading

0 comments on commit 46c54aa

Please sign in to comment.