Skip to content

Commit

Permalink
start postgres getEvents query performance
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jun 12, 2024
1 parent b5a0c0e commit 25a104b
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 25 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ export async function run({
sources: sources,
fromCheckpoint,
toCheckpoint,
limit: 1_000,
limit: 10_000,
})) {
const result = await handleEvents(
decodeEvents(syncService, rawEvents),
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/common/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export type Options = {
indexingCacheBytes: number;

syncMaxIntervals: number;
syncEventsQuerySize: number;
};

export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => {
Expand Down Expand Up @@ -106,5 +107,6 @@ export const buildOptions = ({ cliOptions }: { cliOptions: CliOptions }) => {
indexingCacheBytes: os.freemem() / 3,

syncMaxIntervals: 50_000,
syncEventsQuerySize: 10_000,
} satisfies Options;
};
48 changes: 34 additions & 14 deletions packages/core/src/sync-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ export class PostgresSyncStore implements SyncStore {
db: HeadlessKysely<SyncStoreTables>;
common: Common;

private seconds: number;

constructor({
db,
common,
}: { db: HeadlessKysely<SyncStoreTables>; common: Common }) {
this.db = db;
this.common = common;

this.seconds = common.options.syncEventsQuerySize * 2;
}

insertLogFilterInterval = async ({
Expand Down Expand Up @@ -1700,15 +1704,14 @@ export class PostgresSyncStore implements SyncStore {
sources,
fromCheckpoint,
toCheckpoint,
limit,
}: {
sources: EventSource[];
fromCheckpoint: Checkpoint;
toCheckpoint: Checkpoint;
limit: number;
}) {
let cursor = encodeCheckpoint(fromCheckpoint);
const encodedToCheckpoint = encodeCheckpoint(toCheckpoint);
let fromCursor = encodeCheckpoint(fromCheckpoint);
let toCursor = encodeCheckpoint(toCheckpoint);
const maxToCursor = toCursor;

const sourcesById = sources.reduce<{
[sourceId: string]: (typeof sources)[number];
Expand Down Expand Up @@ -1739,6 +1742,14 @@ export class PostgresSyncStore implements SyncStore {
);

while (true) {
const estimatedToCursor = encodeCheckpoint({
...zeroCheckpoint,
blockTimestamp:
decodeCheckpoint(fromCursor).blockTimestamp + this.seconds,
});
toCursor =
estimatedToCursor > maxToCursor ? maxToCursor : estimatedToCursor;

const events = await this.db.wrap({ method: "getEvents" }, async () => {
// Get full log objects, including the eventSelector clause.
const requestedLogs = await this.db
Expand Down Expand Up @@ -2017,10 +2028,10 @@ export class PostgresSyncStore implements SyncStore {
"transactionReceipts.type as txr_type",
]),
)
.where("events.checkpoint", ">", cursor)
.where("events.checkpoint", "<=", encodedToCheckpoint)
.where("events.checkpoint", ">", fromCursor)
.where("events.checkpoint", "<=", toCursor)
.orderBy("events.checkpoint", "asc")
.limit(limit + 1)
.limit(this.common.options.syncEventsQuerySize)
.execute();

return requestedLogs.map((_row) => {
Expand Down Expand Up @@ -2202,15 +2213,24 @@ export class PostgresSyncStore implements SyncStore {
});
});

const hasNextPage = events.length === limit + 1;
yield events;
if (toCursor === maxToCursor) break;

if (!hasNextPage) {
yield events;
break;
if (events.length === 0) {
this.seconds = Math.round(this.seconds * 2);
} else if (events.length === this.common.options.syncEventsQuerySize) {
this.seconds = Math.round(this.seconds / 2);
fromCursor = events[events.length - 1].encodedCheckpoint;
} else {
events.pop();
cursor = events[events.length - 1].encodedCheckpoint;
yield events;
this.seconds = Math.round(
Math.min(
(this.seconds / events.length) *
this.common.options.syncEventsQuerySize *
0.9,
this.seconds * 2,
),
);
fromCursor = toCursor;
}
}
}
Expand Down
85 changes: 75 additions & 10 deletions packages/core/src/sync/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import {
import type { SyncStore } from "@/sync-store/store.js";
import {
type Checkpoint,
checkpointMax,
checkpointMin,
isCheckpointGreaterThan,
maxCheckpoint,
zeroCheckpoint,
} from "@/utils/checkpoint.js";
import { never } from "@/utils/never.js";
import { type RequestQueue, createRequestQueue } from "@/utils/requestQueue.js";
Expand All @@ -40,6 +42,8 @@ export type Service = {
requestQueue: RequestQueue;
cachedTransport: Transport;

startCheckpoint: Checkpoint;
endCheckpoint: Checkpoint | undefined;
initialFinalizedCheckpoint: Checkpoint;

realtime:
Expand Down Expand Up @@ -220,14 +224,40 @@ export const create = async ({
common,
});

const [{ latestBlock, finalizedBlock }, remoteChainId] =
await Promise.all([
getLatestAndFinalizedBlocks({
network,
requestQueue,
}),
requestQueue.request({ method: "eth_chainId" }).then(hexToNumber),
]);
const hasEndBlock = networkSources.every(
(source) => source.endBlock !== undefined,
);

const [
startBlock,
endBlock,
{ latestBlock, finalizedBlock },
remoteChainId,
] = await Promise.all([
_eth_getBlockByNumber(
{ requestQueue },
{
blockNumber: Math.min(
...networkSources.map((source) => source.startBlock),
),
},
),
hasEndBlock
? _eth_getBlockByNumber(
{ requestQueue },
{
blockNumber: Math.max(
...networkSources.map((source) => source.endBlock!),
),
},
)
: undefined,
getLatestAndFinalizedBlocks({
network,
requestQueue,
}),
requestQueue.request({ method: "eth_chainId" }).then(hexToNumber),
]);

if (network.chainId !== remoteChainId) {
common.logger.warn({
Expand Down Expand Up @@ -279,6 +309,18 @@ export const create = async ({
sources: networkSources,
requestQueue,
cachedTransport: cachedTransport({ requestQueue, syncStore }),
startCheckpoint: {
...zeroCheckpoint,
blockTimestamp: hexToNumber(startBlock.timestamp),
blockNumber: hexToBigInt(startBlock.number),
},
endCheckpoint: endBlock
? {
...zeroCheckpoint,
blockTimestamp: hexToNumber(endBlock.timestamp),
blockNumber: hexToBigInt(endBlock.number),
}
: undefined,
initialFinalizedCheckpoint,
realtime: undefined,
historical: {
Expand All @@ -304,6 +346,18 @@ export const create = async ({
sources: networkSources,
requestQueue,
cachedTransport: cachedTransport({ requestQueue, syncStore }),
startCheckpoint: {
...zeroCheckpoint,
blockTimestamp: hexToNumber(startBlock.timestamp),
blockNumber: hexToBigInt(startBlock.number),
},
endCheckpoint: endBlock
? {
...zeroCheckpoint,
blockTimestamp: hexToNumber(endBlock.timestamp),
blockNumber: hexToBigInt(endBlock.number),
}
: undefined,
initialFinalizedCheckpoint,
realtime: {
realtimeSync,
Expand Down Expand Up @@ -380,7 +434,10 @@ export const create = async ({
sources,
networkServices,
isKilled: false,
checkpoint: initialCheckpoint,
// TODO(kyle)
checkpoint: checkpointMin(
...networkServices.map((ns) => ns.startCheckpoint),
),
finalizedCheckpoint: checkpointMin(
...networkServices.map((ns) => ns.initialFinalizedCheckpoint),
),
Expand Down Expand Up @@ -414,6 +471,14 @@ export const getHistoricalCheckpoint = async function* (
);

if (isComplete) {
const endCheckpoint = syncService.networkServices.every(
(ns) => ns.endCheckpoint !== undefined,
)
? checkpointMax(
...syncService.networkServices.map((ns) => ns.endCheckpoint!),
)
: undefined;

const finalityCheckpoint = checkpointMin(
...syncService.networkServices.map(
({ initialFinalizedCheckpoint }) => initialFinalizedCheckpoint,
Expand All @@ -427,7 +492,7 @@ export const getHistoricalCheckpoint = async function* (

yield {
fromCheckpoint: syncService.checkpoint,
toCheckpoint: finalityCheckpoint,
toCheckpoint: endCheckpoint ?? finalityCheckpoint,
};

syncService.checkpoint = finalityCheckpoint;
Expand Down

0 comments on commit 25a104b

Please sign in to comment.