Skip to content

Commit

Permalink
Revert getEvents performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jun 13, 2024
1 parent 46c54aa commit bfb5925
Show file tree
Hide file tree
Showing 11 changed files with 399 additions and 252 deletions.
35 changes: 22 additions & 13 deletions packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,13 @@ export async function run({
initialCheckpoint,
});

const handleEvents = async (events: Event[], toCheckpoint: Checkpoint) => {
indexingService.updateTotalSeconds(toCheckpoint);
const handleEvents = async (
events: Event[],
lastEventCheckpoint: Checkpoint | undefined,
) => {
if (lastEventCheckpoint !== undefined) {
indexingService.updateLastEventCheckpoint(lastEventCheckpoint);
}

if (events.length === 0) return { status: "success" } as const;

Expand All @@ -150,14 +155,20 @@ export async function run({
worker: async (event: RealtimeEvent) => {
switch (event.type) {
case "newEvents": {
const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
sources: sources,
fromCheckpoint: event.fromCheckpoint,
toCheckpoint: event.toCheckpoint,
});
for await (const rawEvents of syncStore.getEvents({
sources,
fromCheckpoint: event.fromCheckpoint,
toCheckpoint: event.toCheckpoint,
limit: 1_000,
})) {
const result = await handleEvents(
decodeEvents(syncService, rawEvents),
event.toCheckpoint,
lastEventCheckpoint,
);
if (result.status === "error") onReloadableError(result.error);
}
Expand Down Expand Up @@ -229,14 +240,21 @@ export async function run({
fromCheckpoint,
toCheckpoint,
} of syncService.getHistoricalCheckpoint()) {
const lastEventCheckpoint = await syncStore.getLastEventCheckpoint({
sources: sources,
fromCheckpoint,
toCheckpoint,
});

for await (const rawEvents of syncStore.getEvents({
sources: sources,
fromCheckpoint,
toCheckpoint,
limit: 1_000,
})) {
const result = await handleEvents(
decodeEvents(syncService, rawEvents),
toCheckpoint,
lastEventCheckpoint,
);

if (result.status === "killed") {
Expand All @@ -250,15 +268,6 @@ export async function run({

await historicalStore.flush({ isFullFlush: true });

// set completed indexing metrics
common.metrics.ponder_indexing_completed_seconds.set(
syncService.checkpoint.blockTimestamp -
syncService.startCheckpoint.blockTimestamp,
);
common.metrics.ponder_indexing_completed_timestamp.set(
syncService.checkpoint.blockTimestamp,
);

// Become healthy
common.logger.info({
service: "indexing",
Expand Down
2 changes: 0 additions & 2 deletions packages/core/src/common/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ export type Options = {
indexingCacheBytes: number;

syncMaxIntervals: number;
syncEventsQuerySize: number;
};

export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => {
Expand Down Expand Up @@ -107,6 +106,5 @@ export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => {
indexingCacheBytes: os.freemem() / 3,

syncMaxIntervals: 50_000,
syncEventsQuerySize: 10_000,
} satisfies Options;
};
4 changes: 2 additions & 2 deletions packages/core/src/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
processEvents,
processSetupEvents,
updateIndexingStore,
updateTotalSeconds,
updateLastEventCheckpoint,
} from "./service.js";
import type { Context, Service } from "./service.js";

Expand All @@ -15,7 +15,7 @@ const methods = {
processEvents,
processSetupEvents,
updateIndexingStore,
updateTotalSeconds,
updateLastEventCheckpoint,
};

export const createIndexingService = extend(create, methods);
Expand Down
49 changes: 37 additions & 12 deletions packages/core/src/indexing/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ export type Service = {
eventCount: {
[eventName: string]: { [networkName: string]: number };
};
startCheckpoint: Checkpoint;
firstEventCheckpoint: Checkpoint | undefined;
lastEventCheckpoint: Checkpoint | undefined;

/**
* Reduce memory usage by reserving space for objects ahead of time
Expand Down Expand Up @@ -181,7 +182,8 @@ export const create = ({
indexingStore,
isKilled: false,
eventCount,
startCheckpoint: syncService.startCheckpoint,
firstEventCheckpoint: undefined,
lastEventCheckpoint: undefined,
currentEvent: {
contextState,
context: {
Expand Down Expand Up @@ -272,6 +274,21 @@ export const processEvents = async (
| { status: "success" }
| { status: "killed" }
> => {
// set first event checkpoint
if (events.length > 0 && indexingService.firstEventCheckpoint === undefined) {
indexingService.firstEventCheckpoint = decodeCheckpoint(
events[0].encodedCheckpoint,
);

// set total seconds
if (indexingService.lastEventCheckpoint !== undefined) {
indexingService.common.metrics.ponder_indexing_total_seconds.set(
indexingService.lastEventCheckpoint.blockTimestamp -
indexingService.firstEventCheckpoint.blockTimestamp,
);
}
}

const eventCounts: { [eventName: string]: number } = {};

for (let i = 0; i < events.length; i++) {
Expand Down Expand Up @@ -377,26 +394,30 @@ export const processEvents = async (
).blockTimestamp;

indexingService.common.metrics.ponder_indexing_completed_seconds.set(
eventTimestamp - indexingService.startCheckpoint.blockTimestamp,
eventTimestamp - indexingService.firstEventCheckpoint!.blockTimestamp,
);
indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
eventTimestamp,
);

// Note: allows for terminal and logs to be updated
// Note(kyle) this is only needed for sqlite
await new Promise(setImmediate);
}
}

// set completed seconds
if (events.length > 0) {
if (
events.length > 0 &&
indexingService.firstEventCheckpoint !== undefined &&
indexingService.lastEventCheckpoint !== undefined
) {
const lastEventInBatchTimestamp = decodeCheckpoint(
events[events.length - 1].encodedCheckpoint,
).blockTimestamp;

indexingService.common.metrics.ponder_indexing_completed_seconds.set(
lastEventInBatchTimestamp -
indexingService.startCheckpoint.blockTimestamp,
indexingService.firstEventCheckpoint.blockTimestamp,
);
indexingService.common.metrics.ponder_indexing_completed_timestamp.set(
lastEventInBatchTimestamp,
Expand Down Expand Up @@ -430,14 +451,18 @@ export const kill = (indexingService: Service) => {
indexingService.isKilled = true;
};

export const updateTotalSeconds = (
export const updateLastEventCheckpoint = (
indexingService: Service,
endCheckpoint: Checkpoint,
lastEventCheckpoint: Checkpoint,
) => {
indexingService.common.metrics.ponder_indexing_total_seconds.set(
endCheckpoint.blockTimestamp -
indexingService.startCheckpoint.blockTimestamp,
);
indexingService.lastEventCheckpoint = lastEventCheckpoint;

if (indexingService.firstEventCheckpoint !== undefined) {
indexingService.common.metrics.ponder_indexing_total_seconds.set(
indexingService.lastEventCheckpoint.blockTimestamp -
indexingService.firstEventCheckpoint.blockTimestamp,
);
}
};

const updateCompletedEvents = (indexingService: Service) => {
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/sync-historical/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ test("start() adds log filter events to sync store", async (context) => {
sources: [sources[0]],
fromCheckpoint: zeroCheckpoint,
toCheckpoint: maxCheckpoint,
limit: 100,
});
const events = await drainAsyncGenerator(ag);

Expand Down Expand Up @@ -471,6 +472,7 @@ test("start() adds factory events to sync store", async (context) => {
sources: [sources[1]],
fromCheckpoint: zeroCheckpoint,
toCheckpoint: maxCheckpoint,
limit: 100,
});
const events = await drainAsyncGenerator(ag);

Expand Down Expand Up @@ -501,6 +503,7 @@ test("start() adds block filter events to sync store", async (context) => {
sources: [sources[4]],
fromCheckpoint: zeroCheckpoint,
toCheckpoint: maxCheckpoint,
limit: 100,
});
const events = await drainAsyncGenerator(ag);

Expand Down Expand Up @@ -542,6 +545,7 @@ test("start() adds trace filter events to sync store", async (context) => {
sources: [sources[3]],
fromCheckpoint: zeroCheckpoint,
toCheckpoint: maxCheckpoint,
limit: 100,
});
const events = await drainAsyncGenerator(ag);

Expand Down Expand Up @@ -580,6 +584,7 @@ test("start() adds factory trace filter events to sync store", async (context) =
sources: [sources[2]],
fromCheckpoint: zeroCheckpoint,
toCheckpoint: maxCheckpoint,
limit: 100,
});
const events = await drainAsyncGenerator(ag);

Expand Down
Loading

0 comments on commit bfb5925

Please sign in to comment.