diff --git a/packages/core/package.json b/packages/core/package.json
index de01553c4..425e48730 100644
--- a/packages/core/package.json
+++ b/packages/core/package.json
@@ -72,6 +72,7 @@
     "kysely-pglite": "^0.6.0",
     "pg": "^8.11.3",
     "pg-connection-string": "^2.6.2",
+    "pg-cursor": "^2.12.1",
     "picocolors": "^1.0.0",
     "pino": "^8.16.2",
     "prom-client": "^15.0.0",
@@ -87,6 +88,7 @@
     "@types/node": "^20.10.0",
     "@types/pg": "^8.10.9",
     "@types/pg-copy-streams": "^1.2.5",
+    "@types/pg-cursor": "^2.7.2",
     "@types/react": "^18.2.38",
     "@viem/anvil": "^0.0.6",
     "@wagmi/cli": "^1.5.2",
diff --git a/packages/core/src/database/index.ts b/packages/core/src/database/index.ts
index cce8ebab8..4b52243d6 100644
--- a/packages/core/src/database/index.ts
+++ b/packages/core/src/database/index.ts
@@ -41,6 +41,7 @@ import {
 } from "kysely";
 import { KyselyPGlite } from "kysely-pglite";
 import type { Pool } from "pg";
+import Cursor from "pg-cursor";
 import prometheus from "prom-client";
 
 import { HeadlessKysely } from "./kysely.js";
@@ -249,7 +250,9 @@ export const createDatabase = (args: {
     internal: new HeadlessKysely({
       name: "internal",
       common: args.common,
-      dialect: new PostgresDialect({ pool: driver.internal }),
+      dialect: new PostgresDialect({
+        pool: driver.internal,
+      }),
       log(event) {
         if (event.level === "query") {
           args.common.metrics.ponder_postgres_query_total.inc({
@@ -288,7 +291,8 @@ export const createDatabase = (args: {
     sync: new HeadlessKysely<PonderSyncSchema>({
       name: "sync",
       common: args.common,
-      dialect: new PostgresDialect({ pool: driver.sync }),
+      allowCursor: true,
+      dialect: new PostgresDialect({ pool: driver.sync, cursor: Cursor }),
       log(event) {
         if (event.level === "query") {
           args.common.metrics.ponder_postgres_query_total.inc({
diff --git a/packages/core/src/database/kysely.ts b/packages/core/src/database/kysely.ts
index 714dd91f2..8f1298eb5 100644
--- a/packages/core/src/database/kysely.ts
+++ b/packages/core/src/database/kysely.ts
@@ -11,15 +11,21 @@ export class HeadlessKysely<DB> extends Kysely<DB> {
   private common: Common;
   private name: string;
   private isKilled = false;
+  readonly allowCursor: boolean;
 
   constructor({
     common,
     name,
     ...args
-  }: (KyselyConfig | KyselyProps) & { name: string; common: Common }) {
+  }: (KyselyConfig | KyselyProps) & {
+    name: string;
+    common: Common;
+    allowCursor?: boolean;
+  }) {
     super(args);
     this.common = common;
     this.name = name;
+    this.allowCursor = args.allowCursor ?? false;
   }
 
   override async destroy() {
diff --git a/packages/core/src/sync-store/index.ts b/packages/core/src/sync-store/index.ts
index bd070a815..aa0a8659b 100644
--- a/packages/core/src/sync-store/index.ts
+++ b/packages/core/src/sync-store/index.ts
@@ -45,6 +45,7 @@ import {
 } from "./encoding.js";
 
 export type SyncStore = {
+  allowCursor: boolean;
   insertIntervals(args: {
     intervals: {
       filter: Filter;
@@ -97,6 +98,13 @@ export type SyncStore = {
     to: string;
     limit: number;
   }): Promise<{ events: RawEvent[]; cursor: string }>;
+  /** Returns an ordered stream of events based on the `filters` and pagination arguments. */
+  getEventsStream(args: {
+    filters: Filter[];
+    from: string;
+    to: string;
+    limit: number;
+  }): AsyncGenerator<RawEvent>;
   insertRpcRequestResult(args: {
     request: string;
     chainId: number;
@@ -117,6 +125,501 @@ export type SyncStore = {
   }): Promise<void>;
 };
 
+const formatRawEvent = ({ filters, row }: { filters: Filter[]; row: any }) => {
+  // Without this cast, the block_ and tx_ fields are all nullable,
+  // which makes this very annoying.
+  // Should probably add a runtime check that those fields are indeed
+  // present before continuing here.
+  row = row as NonNull<typeof row>;
+
+  const filter = filters[row.event_filterIndex]!;
+
+  const hasLog = row.log_id !== null;
+  const hasTransaction = row.tx_hash !== null;
+  const hasTrace = row.trace_id !== null;
+  const hasTransactionReceipt = shouldGetTransactionReceipt(filter);
+
+  const event = {
+    chainId: filter.chainId,
+    sourceIndex: Number(row.event_filterIndex),
+    checkpoint: row.event_checkpoint,
+    block: {
+      baseFeePerGas:
+        row.block_baseFeePerGas !== null
+          ? BigInt(row.block_baseFeePerGas)
+          : null,
+      difficulty: BigInt(row.block_difficulty),
+      extraData: row.block_extraData,
+      gasLimit: BigInt(row.block_gasLimit),
+      gasUsed: BigInt(row.block_gasUsed),
+      hash: row.block_hash,
+      logsBloom: row.block_logsBloom,
+      miner: checksumAddress(row.block_miner),
+      mixHash: row.block_mixHash,
+      nonce: row.block_nonce,
+      number: BigInt(row.block_number),
+      parentHash: row.block_parentHash,
+      receiptsRoot: row.block_receiptsRoot,
+      sha3Uncles: row.block_sha3Uncles,
+      size: BigInt(row.block_size),
+      stateRoot: row.block_stateRoot,
+      timestamp: BigInt(row.block_timestamp),
+      totalDifficulty:
+        row.block_totalDifficulty !== null
+          ? BigInt(row.block_totalDifficulty)
+          : null,
+      transactionsRoot: row.block_transactionsRoot,
+    },
+    log: hasLog
+      ? {
+          address: checksumAddress(row.log_address!),
+          data: row.log_data,
+          id: row.log_id as Log["id"],
+          logIndex: Number(row.log_logIndex),
+          removed: false,
+          topics: [
+            row.log_topic0,
+            row.log_topic1,
+            row.log_topic2,
+            row.log_topic3,
+          ].filter((t): t is Hex => t !== null) as [Hex, ...Hex[]] | [],
+        }
+      : undefined,
+    transaction: hasTransaction
+      ? {
+          from: checksumAddress(row.tx_from),
+          gas: BigInt(row.tx_gas),
+          hash: row.tx_hash,
+          input: row.tx_input,
+          nonce: Number(row.tx_nonce),
+          r: row.tx_r,
+          s: row.tx_s,
+          to: row.tx_to ? checksumAddress(row.tx_to) : row.tx_to,
+          transactionIndex: Number(row.tx_transactionIndex),
+          value: BigInt(row.tx_value),
+          v: row.tx_v !== null ? BigInt(row.tx_v) : null,
+          ...(row.tx_type === "0x0"
+            ? {
+                type: "legacy",
+                gasPrice: BigInt(row.tx_gasPrice),
+              }
+            : row.tx_type === "0x1"
+              ? {
+                  type: "eip2930",
+                  gasPrice: BigInt(row.tx_gasPrice),
+                  accessList: JSON.parse(row.tx_accessList),
+                }
+              : row.tx_type === "0x2"
+                ? {
+                    type: "eip1559",
+                    maxFeePerGas: BigInt(row.tx_maxFeePerGas),
+                    maxPriorityFeePerGas: BigInt(row.tx_maxPriorityFeePerGas),
+                  }
+                : row.tx_type === "0x7e"
+                  ? {
+                      type: "deposit",
+                      maxFeePerGas:
+                        row.tx_maxFeePerGas !== null
+                          ? BigInt(row.tx_maxFeePerGas)
+                          : undefined,
+                      maxPriorityFeePerGas:
+                        row.tx_maxPriorityFeePerGas !== null
+                          ? BigInt(row.tx_maxPriorityFeePerGas)
+                          : undefined,
+                    }
+                  : {
+                      type: row.tx_type,
+                    }),
+        }
+      : undefined,
+    trace: hasTrace
+      ? {
+          id: row.trace_id,
+          type: row.trace_callType as Trace["type"],
+          from: checksumAddress(row.trace_from),
+          to: checksumAddress(row.trace_to),
+          gas: BigInt(row.trace_gas),
+          gasUsed: BigInt(row.trace_gasUsed),
+          input: row.trace_input,
+          output: row.trace_output,
+          value: BigInt(row.trace_value),
+          traceIndex: Number(row.trace_index),
+          subcalls: Number(row.trace_subcalls),
+        }
+      : undefined,
+    transactionReceipt: hasTransactionReceipt
+      ? {
+          contractAddress: row.txr_contractAddress
+            ? checksumAddress(row.txr_contractAddress)
+            : null,
+          cumulativeGasUsed: BigInt(row.txr_cumulativeGasUsed),
+          effectiveGasPrice: BigInt(row.txr_effectiveGasPrice),
+          from: checksumAddress(row.txr_from),
+          gasUsed: BigInt(row.txr_gasUsed),
+          logsBloom: row.txr_logsBloom,
+          status:
+            row.txr_status === "0x1"
+              ? "success"
+              : row.txr_status === "0x0"
+                ? "reverted"
+                : (row.txr_status as TransactionReceipt["status"]),
+          to: row.txr_to ? checksumAddress(row.txr_to) : null,
+          type:
+            row.txr_type === "0x0"
+              ? "legacy"
+              : row.txr_type === "0x1"
+                ? "eip2930"
+                : row.txr_type === "0x2"
+                  ? "eip1559"
+                  : row.txr_type === "0x7e"
+                    ? "deposit"
+                    : row.txr_type,
+        }
+      : undefined,
+  } satisfies RawEvent;
+
+  return event;
+};
+
+const getEventsQuery = ({
+  filters,
+  from,
+  to,
+  limit,
+  db,
+}: {
+  filters: Filter[];
+  from: string;
+  to: string;
+  limit: number;
+  db: Kysely<PonderSyncSchema>;
+}) => {
+  const addressSQL = (
+    qb: SelectQueryBuilder<PonderSyncSchema, "logs" | "blocks" | "traces", {}>,
+    address: LogFilter["address"],
+    column: "address" | "from" | "to",
+  ) => {
+    if (typeof address === "string") return qb.where(column, "=", address);
+    if (isAddressFactory(address)) {
+      return qb.where(
+        column,
+        "in",
+        db.selectFrom("logs").$call((qb) => logFactorySQL(qb, address)),
+      );
+    }
+    if (Array.isArray(address)) return qb.where(column, "in", address);
+
+    return qb;
+  };
+
+  const logSQL = (
+    filter: LogFilter,
+    db: Kysely<PonderSyncSchema>,
+    index: number,
+  ) =>
+    db
+      .selectFrom("logs")
+      .select([
+        ksql.raw(`'${index}'`).as("filterIndex"),
+        "checkpoint",
+        "chainId",
+        "blockHash",
+        "transactionHash",
+        "id as logId",
+        ksql`null`.as("traceId"),
+      ])
+      .where("chainId", "=", filter.chainId)
+      .$call((qb) => {
+        for (const idx of [0, 1, 2, 3] as const) {
+          // If it's an array of length 1, collapse it.
+          const raw = filter[`topic${idx}`] ?? null;
+          if (raw === null) continue;
+          const topic =
+            Array.isArray(raw) && raw.length === 1 ? raw[0]! : raw;
+          if (Array.isArray(topic)) {
+            qb = qb.where((eb) =>
+              eb.or(topic.map((t) => eb(`logs.topic${idx}`, "=", t))),
+            );
+          } else {
+            qb = qb.where(`logs.topic${idx}`, "=", topic);
+          }
+        }
+        return qb;
+      })
+      .$call((qb) => addressSQL(qb as any, filter.address, "address"))
+      .$if(filter.fromBlock !== undefined, (qb) =>
+        qb.where("blockNumber", ">=", filter.fromBlock!.toString()),
+      )
+      .$if(filter.toBlock !== undefined, (qb) =>
+        qb.where("blockNumber", "<=", filter.toBlock!.toString()),
+      );
+
+  const blockSQL = (
+    filter: BlockFilter,
+    db: Kysely<PonderSyncSchema>,
+    index: number,
+  ) =>
+    db
+      .selectFrom("blocks")
+      .select([
+        ksql.raw(`'${index}'`).as("filterIndex"),
+        "checkpoint",
+        "chainId",
+        "hash as blockHash",
+        ksql`null`.as("transactionHash"),
+        ksql`null`.as("logId"),
+        ksql`null`.as("traceId"),
+      ])
+      .where("chainId", "=", filter.chainId)
+      .$if(filter !== undefined && filter.interval !== undefined, (qb) =>
+        qb.where(ksql`(number - ${filter.offset}) % ${filter.interval} = 0`),
+      )
+      .$if(filter.fromBlock !== undefined, (qb) =>
+        qb.where("number", ">=", filter.fromBlock!.toString()),
+      )
+      .$if(filter.toBlock !== undefined, (qb) =>
+        qb.where("number", "<=", filter.toBlock!.toString()),
+      );
+
+  const transactionSQL = (
+    filter: TransactionFilter,
+    db: Kysely<PonderSyncSchema>,
+    index: number,
+  ) =>
+    db
+      .selectFrom("transactions")
+      .select([
+        ksql.raw(`'${index}'`).as("filterIndex"),
+        "checkpoint",
+        "chainId",
+        "blockHash",
+        "hash as transactionHash",
+        ksql`null`.as("logId"),
+        ksql`null`.as("traceId"),
+      ])
+      .where("chainId", "=", filter.chainId)
+      .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from"))
+      .$call((qb) => addressSQL(qb as any, filter.toAddress, "to"))
+      .$if(filter.includeReverted === false, (qb) =>
+        qb.where(
+          db
+            .selectFrom("transactionReceipts")
+            .select("status")
+            .where(
+              "transactionReceipts.transactionHash",
+              "=",
+              sql.ref("transactions.hash"),
+            ),
+          "=",
+          "0x1",
+        ),
+      )
+      .$if(filter.fromBlock !== undefined, (qb) =>
+        qb.where("blockNumber", ">=", filter.fromBlock!.toString()),
+      )
+      .$if(filter.toBlock !== undefined, (qb) =>
+        qb.where("blockNumber", "<=", filter.toBlock!.toString()),
+      );
+
+  const transferSQL = (
+    filter: TransferFilter,
+    db: Kysely<PonderSyncSchema>,
+    index: number,
+  ) =>
+    db
+      .selectFrom("traces")
+      .select([
+        ksql.raw(`'${index}'`).as("filterIndex"),
+        "checkpoint",
+        "chainId",
+        "blockHash",
+        "transactionHash",
+        ksql`null`.as("logId"),
+        "id as traceId",
+      ])
+      .where("chainId", "=", filter.chainId)
+      .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from"))
+      .$call((qb) => addressSQL(qb as any, filter.toAddress, "to"))
+      .where("value", ">", "0")
+      .$if(filter.includeReverted === false, (qb) =>
+        qb.where("isReverted", "=", 0),
+      )
+      .$if(filter.fromBlock !== undefined, (qb) =>
+        qb.where("blockNumber", ">=", filter.fromBlock!.toString()),
+      )
+      .$if(filter.toBlock !== undefined, (qb) =>
+        qb.where("blockNumber", "<=", filter.toBlock!.toString()),
+      );
+
+  const traceSQL = (
+    filter: TraceFilter,
+    db: Kysely<PonderSyncSchema>,
+    index: number,
+  ) =>
+    db
+      .selectFrom("traces")
+      .select([
+        ksql.raw(`'${index}'`).as("filterIndex"),
+        "checkpoint",
+        "chainId",
+        "blockHash",
+        "transactionHash",
+        ksql`null`.as("logId"),
+        "id as traceId",
+      ])
+      .where("chainId", "=", filter.chainId)
+      .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from"))
+      .$call((qb) => addressSQL(qb as any, filter.toAddress, "to"))
+      .$if(filter.includeReverted === false, (qb) =>
+        qb.where("isReverted", "=", 0),
+      )
+      .$if(filter.callType !== undefined, (qb) =>
+        qb.where("type", "=", filter.callType!),
+      )
+      .$if(filter.functionSelector !== undefined, (qb) => {
+        if (Array.isArray(filter.functionSelector)) {
+          return qb.where("functionSelector", "in", filter.functionSelector!);
+        } else {
+          return qb.where("functionSelector", "=", filter.functionSelector!);
+        }
+      })
+      .$if(filter.fromBlock !== undefined, (qb) =>
+        qb.where("blockNumber", ">=", filter.fromBlock!.toString()),
+      )
+      .$if(filter.toBlock !== undefined, (qb) =>
+        qb.where("blockNumber", "<=", filter.toBlock!.toString()),
+      );
+
+  let query:
+    | SelectQueryBuilder<
+        PonderSyncSchema,
+        "logs" | "blocks" | "traces" | "transactions",
+        {
+          filterIndex: number;
+          checkpoint: string;
+          chainId: number;
+          logId: string;
+          blockHash: string;
+          transactionHash: string;
+          traceId: string;
+        }
+      >
+    | undefined;
+
+  for (let i = 0; i < filters.length; i++) {
+    const filter = filters[i]!;
+
+    const _query =
+      filter.type === "log"
+        ? logSQL(filter, db, i)
+        : filter.type === "block"
+          ? blockSQL(filter, db, i)
+          : filter.type === "transaction"
+            ? transactionSQL(filter, db, i)
+            : filter.type === "transfer"
+              ? transferSQL(filter, db, i)
+              : traceSQL(filter, db, i);
+
+    // @ts-ignore
+    query = query === undefined ? _query : query.unionAll(_query);
+  }
+
+  return db
+    .with("event", () => query!)
+    .selectFrom("event")
+    .select([
+      "event.filterIndex as event_filterIndex",
+      "event.checkpoint as event_checkpoint",
+    ])
+    .innerJoin("blocks", "blocks.hash", "event.blockHash")
+    .select([
+      "blocks.baseFeePerGas as block_baseFeePerGas",
+      "blocks.difficulty as block_difficulty",
+      "blocks.extraData as block_extraData",
+      "blocks.gasLimit as block_gasLimit",
+      "blocks.gasUsed as block_gasUsed",
+      "blocks.hash as block_hash",
+      "blocks.logsBloom as block_logsBloom",
+      "blocks.miner as block_miner",
+      "blocks.mixHash as block_mixHash",
+      "blocks.nonce as block_nonce",
+      "blocks.number as block_number",
+      "blocks.parentHash as block_parentHash",
+      "blocks.receiptsRoot as block_receiptsRoot",
+      "blocks.sha3Uncles as block_sha3Uncles",
+      "blocks.size as block_size",
+      "blocks.stateRoot as block_stateRoot",
+      "blocks.timestamp as block_timestamp",
+      "blocks.totalDifficulty as block_totalDifficulty",
+      "blocks.transactionsRoot as block_transactionsRoot",
+    ])
+    .leftJoin("logs", "logs.id", "event.logId")
+    .select([
+      "logs.address as log_address",
+      "logs.chainId as log_chainId",
+      "logs.data as log_data",
+      "logs.id as log_id",
+      "logs.logIndex as log_logIndex",
+      "logs.topic0 as log_topic0",
+      "logs.topic1 as log_topic1",
+      "logs.topic2 as log_topic2",
+      "logs.topic3 as log_topic3",
+    ])
+    .leftJoin("transactions", "transactions.hash", "event.transactionHash")
+    .select([
+      "transactions.accessList as tx_accessList",
+      "transactions.from as tx_from",
+      "transactions.gas as tx_gas",
+      "transactions.gasPrice as tx_gasPrice",
+      "transactions.hash as tx_hash",
+      "transactions.input as tx_input",
+      "transactions.maxFeePerGas as tx_maxFeePerGas",
+      "transactions.maxPriorityFeePerGas as tx_maxPriorityFeePerGas",
+      "transactions.nonce as tx_nonce",
+      "transactions.r as tx_r",
+      "transactions.s as tx_s",
+      "transactions.to as tx_to",
+      "transactions.transactionIndex as tx_transactionIndex",
+      "transactions.type as tx_type",
+      "transactions.value as tx_value",
+      "transactions.v as tx_v",
+    ])
+    .leftJoin("traces", "traces.id", "event.traceId")
+    .select([
+      "traces.id as trace_id",
+      "traces.type as trace_callType",
+      "traces.from as trace_from",
+      "traces.to as trace_to",
trace_gas", + "traces.gasUsed as trace_gasUsed", + "traces.input as trace_input", + "traces.output as trace_output", + "traces.error as trace_error", + "traces.revertReason as trace_revertReason", + "traces.value as trace_value", + "traces.index as trace_index", + "traces.subcalls as trace_subcalls", + ]) + .leftJoin( + "transactionReceipts", + "transactionReceipts.transactionHash", + "event.transactionHash", + ) + .select([ + "transactionReceipts.contractAddress as txr_contractAddress", + "transactionReceipts.cumulativeGasUsed as txr_cumulativeGasUsed", + "transactionReceipts.effectiveGasPrice as txr_effectiveGasPrice", + "transactionReceipts.from as txr_from", + "transactionReceipts.gasUsed as txr_gasUsed", + "transactionReceipts.logsBloom as txr_logsBloom", + "transactionReceipts.status as txr_status", + "transactionReceipts.to as txr_to", + "transactionReceipts.type as txr_type", + ]) + .where("event.checkpoint", ">", from) + .where("event.checkpoint", "<=", to) + .orderBy("event.checkpoint", "asc") + .orderBy("event.filterIndex", "asc") + .limit(limit); +}; + const logFactorySQL = ( qb: SelectQueryBuilder, factory: LogFactory, @@ -156,6 +659,7 @@ export const createSyncStore = ({ common: Common; db: HeadlessKysely; }): SyncStore => ({ + allowCursor: db.allowCursor, insertIntervals: async ({ intervals }) => { if (intervals.length === 0) return; @@ -474,206 +978,7 @@ export const createSyncStore = ({ }); }, getEvents: async ({ filters, from, to, limit }) => { - const addressSQL = ( - qb: SelectQueryBuilder< - PonderSyncSchema, - "logs" | "blocks" | "traces", - {} - >, - address: LogFilter["address"], - column: "address" | "from" | "to", - ) => { - if (typeof address === "string") return qb.where(column, "=", address); - if (isAddressFactory(address)) { - return qb.where( - column, - "in", - db.selectFrom("logs").$call((qb) => logFactorySQL(qb, address)), - ); - } - if (Array.isArray(address)) return qb.where(column, "in", address); - - return qb; - }; - - const logSQL = ( - filter: LogFilter, - db: Kysely, - index: number, - ) => - db - .selectFrom("logs") - .select([ - ksql.raw(`'${index}'`).as("filterIndex"), - "checkpoint", - "chainId", - "blockHash", - "transactionHash", - "id as logId", - ksql`null`.as("traceId"), - ]) - .where("chainId", "=", filter.chainId) - .$call((qb) => { - for (const idx of [0, 1, 2, 3] as const) { - // If it's an array of length 1, collapse it. - const raw = filter[`topic${idx}`] ?? null; - if (raw === null) continue; - const topic = - Array.isArray(raw) && raw.length === 1 ? raw[0]! 
: raw; - if (Array.isArray(topic)) { - qb = qb.where((eb) => - eb.or(topic.map((t) => eb(`logs.topic${idx}`, "=", t))), - ); - } else { - qb = qb.where(`logs.topic${idx}`, "=", topic); - } - } - return qb; - }) - .$call((qb) => addressSQL(qb as any, filter.address, "address")) - .$if(filter.fromBlock !== undefined, (qb) => - qb.where("blockNumber", ">=", filter.fromBlock!.toString()), - ) - .$if(filter.toBlock !== undefined, (qb) => - qb.where("blockNumber", "<=", filter.toBlock!.toString()), - ); - - const blockSQL = ( - filter: BlockFilter, - db: Kysely, - index: number, - ) => - db - .selectFrom("blocks") - .select([ - ksql.raw(`'${index}'`).as("filterIndex"), - "checkpoint", - "chainId", - "hash as blockHash", - ksql`null`.as("transactionHash"), - ksql`null`.as("logId"), - ksql`null`.as("traceId"), - ]) - .where("chainId", "=", filter.chainId) - .$if(filter !== undefined && filter.interval !== undefined, (qb) => - qb.where(ksql`(number - ${filter.offset}) % ${filter.interval} = 0`), - ) - .$if(filter.fromBlock !== undefined, (qb) => - qb.where("number", ">=", filter.fromBlock!.toString()), - ) - .$if(filter.toBlock !== undefined, (qb) => - qb.where("number", "<=", filter.toBlock!.toString()), - ); - - const transactionSQL = ( - filter: TransactionFilter, - db: Kysely, - index: number, - ) => - db - .selectFrom("transactions") - .select([ - ksql.raw(`'${index}'`).as("filterIndex"), - "checkpoint", - "chainId", - "blockHash", - "hash as transactionHash", - ksql`null`.as("logId"), - ksql`null`.as("traceId"), - ]) - .where("chainId", "=", filter.chainId) - .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from")) - .$call((qb) => addressSQL(qb as any, filter.toAddress, "to")) - .$if(filter.includeReverted === false, (qb) => - qb.where( - db - .selectFrom("transactionReceipts") - .select("status") - .where( - "transactionReceipts.transactionHash", - "=", - sql.ref("transactions.hash"), - ), - "=", - "0x1", - ), - ) - .$if(filter.fromBlock !== undefined, (qb) => - qb.where("blockNumber", ">=", filter.fromBlock!.toString()), - ) - .$if(filter.toBlock !== undefined, (qb) => - qb.where("blockNumber", "<=", filter.toBlock!.toString()), - ); - - const transferSQL = ( - filter: TransferFilter, - db: Kysely, - index: number, - ) => - db - .selectFrom("traces") - .select([ - ksql.raw(`'${index}'`).as("filterIndex"), - "checkpoint", - "chainId", - "blockHash", - "transactionHash", - ksql`null`.as("logId"), - "id as traceId", - ]) - .where("chainId", "=", filter.chainId) - .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from")) - .$call((qb) => addressSQL(qb as any, filter.toAddress, "to")) - .where("value", ">", "0") - .$if(filter.includeReverted === false, (qb) => - qb.where("isReverted", "=", 0), - ) - .$if(filter.fromBlock !== undefined, (qb) => - qb.where("blockNumber", ">=", filter.fromBlock!.toString()), - ) - .$if(filter.toBlock !== undefined, (qb) => - qb.where("blockNumber", "<=", filter.toBlock!.toString()), - ); - - const traceSQL = ( - filter: TraceFilter, - db: Kysely, - index: number, - ) => - db - .selectFrom("traces") - .select([ - ksql.raw(`'${index}'`).as("filterIndex"), - "checkpoint", - "chainId", - "blockHash", - "transactionHash", - ksql`null`.as("logId"), - "id as traceId", - ]) - .where("chainId", "=", filter.chainId) - .$call((qb) => addressSQL(qb as any, filter.fromAddress, "from")) - .$call((qb) => addressSQL(qb as any, filter.toAddress, "to")) - .$if(filter.includeReverted === false, (qb) => - qb.where("isReverted", "=", 0), - ) - .$if(filter.callType !== 
undefined, (qb) => - qb.where("type", "=", filter.callType!), - ) - .$if(filter.functionSelector !== undefined, (qb) => { - if (Array.isArray(filter.functionSelector)) { - return qb.where("functionSelector", "in", filter.functionSelector!); - } else { - return qb.where("functionSelector", "=", filter.functionSelector!); - } - }) - .$if(filter.fromBlock !== undefined, (qb) => - qb.where("blockNumber", ">=", filter.fromBlock!.toString()), - ) - .$if(filter.toBlock !== undefined, (qb) => - qb.where("blockNumber", "<=", filter.toBlock!.toString()), - ); - + const query = getEventsQuery({ filters, from, to, limit, db }); const rows = await db.wrap( { method: "getEvents", @@ -682,299 +987,13 @@ export const createSyncStore = ({ }, }, async () => { - let query: - | SelectQueryBuilder< - PonderSyncSchema, - "logs" | "blocks" | "traces" | "transactions", - { - filterIndex: number; - checkpoint: string; - chainId: number; - logId: string; - blockHash: string; - transactionHash: string; - traceId: string; - } - > - | undefined; - - for (let i = 0; i < filters.length; i++) { - const filter = filters[i]!; - - const _query = - filter.type === "log" - ? logSQL(filter, db, i) - : filter.type === "block" - ? blockSQL(filter, db, i) - : filter.type === "transaction" - ? transactionSQL(filter, db, i) - : filter.type === "transfer" - ? transferSQL(filter, db, i) - : traceSQL(filter, db, i); - - // @ts-ignore - query = query === undefined ? _query : query.unionAll(_query); - } - - return await db - .with("event", () => query!) - .selectFrom("event") - .select([ - "event.filterIndex as event_filterIndex", - "event.checkpoint as event_checkpoint", - ]) - .innerJoin("blocks", "blocks.hash", "event.blockHash") - .select([ - "blocks.baseFeePerGas as block_baseFeePerGas", - "blocks.difficulty as block_difficulty", - "blocks.extraData as block_extraData", - "blocks.gasLimit as block_gasLimit", - "blocks.gasUsed as block_gasUsed", - "blocks.hash as block_hash", - "blocks.logsBloom as block_logsBloom", - "blocks.miner as block_miner", - "blocks.mixHash as block_mixHash", - "blocks.nonce as block_nonce", - "blocks.number as block_number", - "blocks.parentHash as block_parentHash", - "blocks.receiptsRoot as block_receiptsRoot", - "blocks.sha3Uncles as block_sha3Uncles", - "blocks.size as block_size", - "blocks.stateRoot as block_stateRoot", - "blocks.timestamp as block_timestamp", - "blocks.totalDifficulty as block_totalDifficulty", - "blocks.transactionsRoot as block_transactionsRoot", - ]) - .leftJoin("logs", "logs.id", "event.logId") - .select([ - "logs.address as log_address", - "logs.chainId as log_chainId", - "logs.data as log_data", - "logs.id as log_id", - "logs.logIndex as log_logIndex", - "logs.topic0 as log_topic0", - "logs.topic1 as log_topic1", - "logs.topic2 as log_topic2", - "logs.topic3 as log_topic3", - ]) - .leftJoin( - "transactions", - "transactions.hash", - "event.transactionHash", - ) - .select([ - "transactions.accessList as tx_accessList", - "transactions.from as tx_from", - "transactions.gas as tx_gas", - "transactions.gasPrice as tx_gasPrice", - "transactions.hash as tx_hash", - "transactions.input as tx_input", - "transactions.maxFeePerGas as tx_maxFeePerGas", - "transactions.maxPriorityFeePerGas as tx_maxPriorityFeePerGas", - "transactions.nonce as tx_nonce", - "transactions.r as tx_r", - "transactions.s as tx_s", - "transactions.to as tx_to", - "transactions.transactionIndex as tx_transactionIndex", - "transactions.type as tx_type", - "transactions.value as tx_value", - "transactions.v as 
tx_v", - ]) - .leftJoin("traces", "traces.id", "event.traceId") - .select([ - "traces.id as trace_id", - "traces.type as trace_callType", - "traces.from as trace_from", - "traces.to as trace_to", - "traces.gas as trace_gas", - "traces.gasUsed as trace_gasUsed", - "traces.input as trace_input", - "traces.output as trace_output", - "traces.error as trace_error", - "traces.revertReason as trace_revertReason", - "traces.value as trace_value", - "traces.index as trace_index", - "traces.subcalls as trace_subcalls", - ]) - .leftJoin( - "transactionReceipts", - "transactionReceipts.transactionHash", - "event.transactionHash", - ) - .select([ - "transactionReceipts.contractAddress as txr_contractAddress", - "transactionReceipts.cumulativeGasUsed as txr_cumulativeGasUsed", - "transactionReceipts.effectiveGasPrice as txr_effectiveGasPrice", - "transactionReceipts.from as txr_from", - "transactionReceipts.gasUsed as txr_gasUsed", - "transactionReceipts.logsBloom as txr_logsBloom", - "transactionReceipts.status as txr_status", - "transactionReceipts.to as txr_to", - "transactionReceipts.type as txr_type", - ]) - .where("event.checkpoint", ">", from) - .where("event.checkpoint", "<=", to) - .orderBy("event.checkpoint", "asc") - .orderBy("event.filterIndex", "asc") - .limit(limit) - .execute(); + return query.execute(); }, ); - - const events = rows.map((_row) => { - // Without this cast, the block_ and tx_ fields are all nullable - // which makes this very annoying. Should probably add a runtime check - // that those fields are indeed present before continuing here. - const row = _row as NonNull<(typeof rows)[number]>; - - const filter = filters[row.event_filterIndex]!; - - const hasLog = row.log_id !== null; - const hasTransaction = row.tx_hash !== null; - const hasTrace = row.trace_id !== null; - const hasTransactionReceipt = shouldGetTransactionReceipt(filter); - - return { - chainId: filter.chainId, - sourceIndex: Number(row.event_filterIndex), - checkpoint: row.event_checkpoint, - block: { - baseFeePerGas: - row.block_baseFeePerGas !== null - ? BigInt(row.block_baseFeePerGas) - : null, - difficulty: BigInt(row.block_difficulty), - extraData: row.block_extraData, - gasLimit: BigInt(row.block_gasLimit), - gasUsed: BigInt(row.block_gasUsed), - hash: row.block_hash, - logsBloom: row.block_logsBloom, - miner: checksumAddress(row.block_miner), - mixHash: row.block_mixHash, - nonce: row.block_nonce, - number: BigInt(row.block_number), - parentHash: row.block_parentHash, - receiptsRoot: row.block_receiptsRoot, - sha3Uncles: row.block_sha3Uncles, - size: BigInt(row.block_size), - stateRoot: row.block_stateRoot, - timestamp: BigInt(row.block_timestamp), - totalDifficulty: - row.block_totalDifficulty !== null - ? BigInt(row.block_totalDifficulty) - : null, - transactionsRoot: row.block_transactionsRoot, - }, - log: hasLog - ? { - address: checksumAddress(row.log_address!), - data: row.log_data, - id: row.log_id as Log["id"], - logIndex: Number(row.log_logIndex), - removed: false, - topics: [ - row.log_topic0, - row.log_topic1, - row.log_topic2, - row.log_topic3, - ].filter((t): t is Hex => t !== null) as [Hex, ...Hex[]] | [], - } - : undefined, - transaction: hasTransaction - ? { - from: checksumAddress(row.tx_from), - gas: BigInt(row.tx_gas), - hash: row.tx_hash, - input: row.tx_input, - nonce: Number(row.tx_nonce), - r: row.tx_r, - s: row.tx_s, - to: row.tx_to ? 
checksumAddress(row.tx_to) : row.tx_to, - transactionIndex: Number(row.tx_transactionIndex), - value: BigInt(row.tx_value), - v: row.tx_v !== null ? BigInt(row.tx_v) : null, - ...(row.tx_type === "0x0" - ? { - type: "legacy", - gasPrice: BigInt(row.tx_gasPrice), - } - : row.tx_type === "0x1" - ? { - type: "eip2930", - gasPrice: BigInt(row.tx_gasPrice), - accessList: JSON.parse(row.tx_accessList), - } - : row.tx_type === "0x2" - ? { - type: "eip1559", - maxFeePerGas: BigInt(row.tx_maxFeePerGas), - maxPriorityFeePerGas: BigInt( - row.tx_maxPriorityFeePerGas, - ), - } - : row.tx_type === "0x7e" - ? { - type: "deposit", - maxFeePerGas: - row.tx_maxFeePerGas !== null - ? BigInt(row.tx_maxFeePerGas) - : undefined, - maxPriorityFeePerGas: - row.tx_maxPriorityFeePerGas !== null - ? BigInt(row.tx_maxPriorityFeePerGas) - : undefined, - } - : { - type: row.tx_type, - }), - } - : undefined, - trace: hasTrace - ? { - id: row.trace_id, - type: row.trace_callType as Trace["type"], - from: checksumAddress(row.trace_from), - to: checksumAddress(row.trace_to), - gas: BigInt(row.trace_gas), - gasUsed: BigInt(row.trace_gasUsed), - input: row.trace_input, - output: row.trace_output, - value: BigInt(row.trace_value), - traceIndex: Number(row.trace_index), - subcalls: Number(row.trace_subcalls), - } - : undefined, - transactionReceipt: hasTransactionReceipt - ? { - contractAddress: row.txr_contractAddress - ? checksumAddress(row.txr_contractAddress) - : null, - cumulativeGasUsed: BigInt(row.txr_cumulativeGasUsed), - effectiveGasPrice: BigInt(row.txr_effectiveGasPrice), - from: checksumAddress(row.txr_from), - gasUsed: BigInt(row.txr_gasUsed), - logsBloom: row.txr_logsBloom, - status: - row.txr_status === "0x1" - ? "success" - : row.txr_status === "0x0" - ? "reverted" - : (row.txr_status as TransactionReceipt["status"]), - to: row.txr_to ? checksumAddress(row.txr_to) : null, - type: - row.txr_type === "0x0" - ? "legacy" - : row.txr_type === "0x1" - ? "eip2930" - : row.tx_type === "0x2" - ? "eip1559" - : row.tx_type === "0x7e" - ? 
"deposit" - : row.tx_type, - } - : undefined, - } satisfies RawEvent; - }); + const events: RawEvent[] = []; + for (const row of rows) { + events.push(formatRawEvent({ filters, row })); + } let cursor: string; if (events.length !== limit) { @@ -982,9 +1001,30 @@ export const createSyncStore = ({ } else { cursor = events[events.length - 1]!.checkpoint!; } - return { events, cursor }; }, + getEventsStream: async function* ({ + filters, + from, + to, + limit, + }): AsyncGenerator { + const query = getEventsQuery({ filters, from, to, limit, db }); + const stream = await db.wrap( + { + method: "getEventsStream", + shouldRetry(error) { + return error.message.includes("statement timeout") === false; + }, + }, + async () => { + return query.stream(10000); + }, + ); + for await (const row of stream) { + yield formatRawEvent({ filters, row }); + } + }, insertRpcRequestResult: async ({ request, blockNumber, chainId, result }) => db.wrap({ method: "insertRpcRequestResult" }, async () => { await db @@ -1001,64 +1041,77 @@ export const createSyncStore = ({ .execute(); }), getRpcRequestResult: async ({ request, chainId }) => - db.wrap({ method: "getRpcRequestResult" }, async () => { - const result = await db - .selectFrom("rpc_request_results") - .select("result") - - .where("request_hash", "=", sql`MD5(${request})`) - .where("chain_id", "=", chainId) - .executeTakeFirst(); + db.wrap( + { + method: "getRpcRequestResult", + }, + async () => { + const result = await db + .selectFrom("rpc_request_results") + .select("result") + .where("request_hash", "=", sql`MD5(${request})`) + .where("chain_id", "=", chainId) + .executeTakeFirst(); - return result?.result; - }), + return result?.result; + }, + ), pruneRpcRequestResult: async ({ blocks, chainId }) => - db.wrap({ method: "pruneRpcRequestResult" }, async () => { - if (blocks.length === 0) return; + db.wrap( + { + method: "pruneRpcRequestResult", + }, + async () => { + if (blocks.length === 0) return; - const numbers = blocks.map(({ number }) => - hexToBigInt(number).toString(), - ); + const numbers = blocks.map(({ number }) => + hexToBigInt(number).toString(), + ); - await db - .deleteFrom("rpc_request_results") - .where("chain_id", "=", chainId) - .where("block_number", "in", numbers) - .execute(); - }), - pruneByChain: async ({ fromBlock, chainId }) => - db.wrap({ method: "pruneByChain" }, () => - db.transaction().execute(async (tx) => { - await tx - .deleteFrom("logs") - .where("chainId", "=", chainId) - .where("blockNumber", ">=", fromBlock.toString()) - .execute(); - await tx - .deleteFrom("blocks") - .where("chainId", "=", chainId) - .where("number", ">=", fromBlock.toString()) - .execute(); - await tx + await db .deleteFrom("rpc_request_results") .where("chain_id", "=", chainId) - .where("block_number", ">=", fromBlock.toString()) - .execute(); - await tx - .deleteFrom("traces") - .where("chainId", "=", chainId) - .where("blockNumber", ">=", fromBlock.toString()) - .execute(); - await tx - .deleteFrom("transactions") - .where("chainId", "=", chainId) - .where("blockNumber", ">=", fromBlock.toString()) + .where("block_number", "in", numbers) .execute(); - await tx - .deleteFrom("transactionReceipts") - .where("chainId", "=", chainId) - .where("blockNumber", ">=", fromBlock.toString()) - .execute(); - }), + }, + ), + pruneByChain: async ({ fromBlock, chainId }) => + db.wrap( + { + method: "pruneByChain", + }, + async () => + db.transaction().execute(async (tx) => { + await tx + .deleteFrom("logs") + .where("chainId", "=", chainId) + 
.where("blockNumber", ">=", fromBlock.toString()) + .execute(); + await tx + .deleteFrom("blocks") + .where("chainId", "=", chainId) + .where("number", ">=", fromBlock.toString()) + .execute(); + await tx + .deleteFrom("rpc_request_results") + .where("chain_id", "=", chainId) + .where("block_number", ">=", fromBlock.toString()) + .execute(); + await tx + .deleteFrom("traces") + .where("chainId", "=", chainId) + .where("blockNumber", ">=", fromBlock.toString()) + .execute(); + await tx + .deleteFrom("transactions") + .where("chainId", "=", chainId) + .where("blockNumber", ">=", fromBlock.toString()) + .execute(); + await tx + .deleteFrom("transactionReceipts") + .where("chainId", "=", chainId) + .where("blockNumber", ">=", fromBlock.toString()) + .execute(); + }), ), }); diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index 314f833f5..751fc050e 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -418,7 +418,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { ? args.initialCheckpoint : getChainCheckpoint({ syncProgress, network, tag: "start" })!, to, - batch: 1000, + batch: 10000, }); return bufferAsyncGenerator(localEventGenerator, 2); @@ -1031,6 +1031,7 @@ export async function* getLocalEventGenerator(params: { for await (const syncCheckpoint of getNonBlockingAsyncGenerator( params.localSyncGenerator, )) { + let consecutiveErrors = 0; while (cursor < min(syncCheckpoint, params.to)) { const to = min( syncCheckpoint, @@ -1044,15 +1045,35 @@ export async function* getLocalEventGenerator(params: { }), ); - let consecutiveErrors = 0; try { - const { events, cursor: queryCursor } = - await params.syncStore.getEvents({ + let events: RawEvent[] = []; + let queryCursor: string; + if (params.syncStore.allowCursor) { + const stream = await params.syncStore.getEventsStream({ filters: params.filters, from: cursor, to, limit: params.batch, }); + for await (const row of stream) { + events.push(row); + } + if (events.length !== params.batch) { + queryCursor = to; + } else { + queryCursor = events[events.length - 1]!.checkpoint!; + } + } else { + const { events: _events, cursor: _queryCursor } = + await params.syncStore.getEvents({ + filters: params.filters, + from: cursor, + to, + limit: params.batch, + }); + events = _events; + queryCursor = _queryCursor; + } estimateSeconds = estimate({ from: decodeCheckpoint(cursor).blockTimestamp, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e8d64a367..378a74227 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -579,7 +579,7 @@ importers: dependencies: forge-std: specifier: github:foundry-rs/forge-std - version: https://codeload.github.com/foundry-rs/forge-std/tar.gz/83c5d212a01f8950727da4095cdfe2654baccb5b + version: https://codeload.github.com/foundry-rs/forge-std/tar.gz/d3db4ef90a72b7d24aa5a2e5c649593eaef7801d examples/with-foundry/ponder: dependencies: @@ -819,6 +819,9 @@ importers: pg-connection-string: specifier: ^2.6.2 version: 2.6.2 + pg-cursor: + specifier: ^2.12.1 + version: 2.12.1(pg@8.11.3) picocolors: specifier: ^1.0.0 version: 1.0.0 @@ -859,6 +862,9 @@ importers: '@types/pg-copy-streams': specifier: ^1.2.5 version: 1.2.5 + '@types/pg-cursor': + specifier: ^2.7.2 + version: 2.7.2 '@types/react': specifier: ^18.2.38 version: 18.2.46 @@ -3145,6 +3151,9 @@ packages: '@types/pg-copy-streams@1.2.5': resolution: {integrity: sha512-7D6/GYW2uHIaVU6S/5omI+6RZnwlZBpLQDZAH83xX1rjxAOK0f6/deKyyUTewxqts145VIGn6XWYz1YGf50G5g==} + '@types/pg-cursor@2.7.2': + resolution: 
+    resolution: {integrity: sha512-m3xT8bVFCvx98LuzbvXyuCdT/Hjdd/v8ml4jL4K1QF70Y8clOfCFdgoaEB1FWdcSwcpoFYZTJQaMD9/GQ27efQ==}
+
   '@types/pg@8.10.9':
     resolution: {integrity: sha512-UksbANNE/f8w0wOMxVKKIrLCbEMV+oM1uKejmwXr39olg4xqcfBDbXxObJAt6XxHbDa4XTKOlUEcEltXDX+XLQ==}
 
@@ -5122,8 +5131,8 @@ packages:
   forever-agent@0.6.1:
     resolution: {integrity: sha512-j0KLYPhm6zeac4lz3oJ3o65qvgQCcPubiyotZrXqEaG4hNagNYO8qdlUrX5vwqv9ohqeT/Z3j6+yW067yWWdUw==}
 
-  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/83c5d212a01f8950727da4095cdfe2654baccb5b:
-    resolution: {tarball: https://codeload.github.com/foundry-rs/forge-std/tar.gz/83c5d212a01f8950727da4095cdfe2654baccb5b}
+  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/d3db4ef90a72b7d24aa5a2e5c649593eaef7801d:
+    resolution: {tarball: https://codeload.github.com/foundry-rs/forge-std/tar.gz/d3db4ef90a72b7d24aa5a2e5c649593eaef7801d}
     version: 1.9.4
 
   form-data@2.3.3:
@@ -7099,6 +7108,11 @@ packages:
   pg-connection-string@2.6.2:
     resolution: {integrity: sha512-ch6OwaeaPYcova4kKZ15sbJ2hKb/VP48ZD2gE7i1J+L4MspCtBMAx8nMgz7bksc7IojCIIWuEhHibSMFH8m8oA==}
 
+  pg-cursor@2.12.1:
+    resolution: {integrity: sha512-V13tEaA9Oq1w+V6Q3UBIB/blxJrwbbr35/dY54r/86soBJ7xkP236bXaORUTVXUPt9B6Ql2BQu+uwQiuMfRVgg==}
+    peerDependencies:
+      pg: ^8
+
   pg-int8@1.0.1:
     resolution: {integrity: sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==}
     engines: {node: '>=4.0.0'}
@@ -11155,6 +11169,11 @@ snapshots:
       '@types/node': 20.11.24
       '@types/pg': 8.10.9
 
+  '@types/pg-cursor@2.7.2':
+    dependencies:
+      '@types/node': 20.11.24
+      '@types/pg': 8.10.9
+
   '@types/pg@8.10.9':
     dependencies:
       '@types/node': 20.11.24
@@ -13386,7 +13405,7 @@ snapshots:
   forever-agent@0.6.1: {}
 
-  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/83c5d212a01f8950727da4095cdfe2654baccb5b: {}
+  forge-std@https://codeload.github.com/foundry-rs/forge-std/tar.gz/d3db4ef90a72b7d24aa5a2e5c649593eaef7801d: {}
 
   form-data@2.3.3:
     dependencies:
@@ -16026,6 +16045,10 @@ snapshots:
   pg-connection-string@2.6.2: {}
 
+  pg-cursor@2.12.1(pg@8.11.3):
+    dependencies:
+      pg: 8.11.3
+
   pg-int8@1.0.1: {}
 
   pg-numeric@1.0.2: {}
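
Note on the streaming pattern this diff enables: Kysely's PostgresDialect only
supports `.stream()` when it is constructed with a `cursor` implementation such
as pg-cursor; without one, streaming queries fail at runtime. The snippet below
is a minimal, self-contained sketch of that pattern, not code from this diff --
the `events` table, `ExampleSchema`, and `DATABASE_URL` are assumptions made
for illustration only.

    import { Kysely, PostgresDialect } from "kysely";
    import { Pool } from "pg";
    import Cursor from "pg-cursor";

    // Hypothetical schema for the example; the real schema is PonderSyncSchema.
    interface ExampleSchema {
      events: { checkpoint: string; payload: string };
    }

    const db = new Kysely<ExampleSchema>({
      dialect: new PostgresDialect({
        pool: new Pool({ connectionString: process.env.DATABASE_URL }),
        // Required for .stream() on postgres; mirrors the sync dialect above.
        cursor: Cursor,
      }),
    });

    // Yield rows ordered by checkpoint without buffering the full result set.
    async function* streamEvents(from: string, to: string) {
      const query = db
        .selectFrom("events")
        .selectAll()
        .where("checkpoint", ">", from)
        .where("checkpoint", "<=", to)
        .orderBy("checkpoint", "asc");

      // Rows are fetched from a server-side cursor 10,000 at a time.
      for await (const row of query.stream(10000)) {
        yield row;
      }
    }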