Skip to content

Commit

Permalink
feat: replacing getTransactionReceipt with getBlockReceipts (#1342)
Browse files Browse the repository at this point in the history
* _eth_getBlockReceipts instead of _eth_getTransactionReceipt

* blockReceipts cache and blockHash for _eth_getBlockReceipts

* lint fix

* cleanup error checking and removing excess database inserts

* transactionReceipts fallback

* sync-realtime: getTransactionReceipt fallback

* cleanup

* chore: changeset

* error logging at fallback

---------

Co-authored-by: Kyle Scott <[email protected]>
  • Loading branch information
khaidarkairbek and kyscott18 authored Jan 5, 2025
1 parent e0e1a58 commit f49e62d
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/long-ligers-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ponder": patch
---

Added support for "eth_getBlockReceipts" request for better performance and cost.
151 changes: 133 additions & 18 deletions packages/core/src/sync-historical/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import {
shouldGetTransactionReceipt,
} from "@/sync/source.js";
import type { Source, TransactionFilter } from "@/sync/source.js";
import type { SyncBlock, SyncLog, SyncTrace } from "@/types/sync.js";
import type {
SyncBlock,
SyncLog,
SyncTrace,
SyncTransactionReceipt,
} from "@/types/sync.js";
import {
type Interval,
getChunks,
Expand All @@ -30,6 +35,7 @@ import type { RequestQueue } from "@/utils/requestQueue.js";
import {
_debug_traceBlockByNumber,
_eth_getBlockByNumber,
_eth_getBlockReceipts,
_eth_getLogs,
_eth_getTransactionReceipt,
} from "@/utils/rpc.js";
Expand Down Expand Up @@ -66,7 +72,10 @@ export const createHistoricalSync = async (
args: CreateHistoricalSyncParameters,
): Promise<HistoricalSync> => {
let isKilled = false;

/**
* Flag to fetch transaction receipts through _eth_getBlockReceipts (true) or _eth_getTransactionReceipt (false)
*/
let isBlockReceipts = true;
/**
* Blocks that have already been extracted.
* Note: All entries are deleted at the end of each call to `sync()`.
Expand All @@ -82,6 +91,20 @@ export const createHistoricalSync = async (
* Note: All entries are deleted at the end of each call to `sync()`.
*/
const transactionsCache = new Set<Hash>();
/**
* Block transaction receipts that have already been fetched.
* Note: All entries are deleted at the end of each call to `sync()`.
*/
const blockReceiptsCache = new Map<Hash, Promise<SyncTransactionReceipt[]>>();
/**
* Transaction receipts that have already been fetched.
* Note: All entries are deleted at the end of each call to `sync()`.
*/
const transactionReceiptsCache = new Map<
Hash,
Promise<SyncTransactionReceipt>
>();

/**
* Data about the range passed to "eth_getLogs" for all log
* filters and log factories.
Expand Down Expand Up @@ -318,6 +341,79 @@ export const createHistoricalSync = async (
}
};

const syncTransactionReceipts = async (
block: Hash,
transactionHashes: Set<Hash>,
): Promise<SyncTransactionReceipt[]> => {
if (isBlockReceipts === false) {
const transactionReceipts = await Promise.all(
Array.from(transactionHashes).map((hash) =>
syncTransactionReceipt(hash),
),
);

return transactionReceipts;
}

let blockReceipts: SyncTransactionReceipt[];
try {
blockReceipts = await syncBlockReceipts(block);
} catch (_error) {
const error = _error as Error;
args.common.logger.warn({
service: "sync",
msg: `Caught eth_getBlockReceipts error on '${
args.network.name
}', switching to eth_getTransactionReceipt method.`,
error,
});

isBlockReceipts = false;
return syncTransactionReceipts(block, transactionHashes);
}

const blockReceiptsTransactionHashes = new Set(
blockReceipts.map((r) => r.transactionHash),
);
// Validate that block transaction receipts include all required transactions
for (const hash of Array.from(transactionHashes)) {
if (blockReceiptsTransactionHashes.has(hash) === false) {
throw new Error(
`Detected inconsistent RPC responses. 'transaction.hash' ${hash} not found in eth_getBlockReceipts response for block '${block}'`,
);
}
}
const transactionReceipts = blockReceipts.filter((receipt) =>
transactionHashes.has(receipt.transactionHash),
);

return transactionReceipts;
};

const syncTransactionReceipt = async (transaction: Hash) => {
if (transactionReceiptsCache.has(transaction)) {
return await transactionReceiptsCache.get(transaction)!;
} else {
const receipt = _eth_getTransactionReceipt(args.requestQueue, {
hash: transaction,
});
transactionReceiptsCache.set(transaction, receipt);
return await receipt;
}
};

const syncBlockReceipts = async (block: Hash) => {
if (blockReceiptsCache.has(block)) {
return await blockReceiptsCache.get(block)!;
} else {
const blockReceipts = _eth_getBlockReceipts(args.requestQueue, {
blockHash: block,
});
blockReceiptsCache.set(block, blockReceipts);
return await blockReceipts;
}
};

/** Extract and insert the log-based addresses that match `filter` + `interval`. */
const syncLogFactory = async (filter: LogFactory, interval: Interval) => {
const logs = await syncLogsDynamic({
Expand Down Expand Up @@ -380,6 +476,8 @@ export const createHistoricalSync = async (
logs.map((log) => syncBlock(hexToNumber(log.blockNumber))),
);

const requiredBlocks = new Set(blocks.map((b) => b.hash));

// Validate that logs point to the valid transaction hash in the block
for (let i = 0; i < logs.length; i++) {
const log = logs[i]!;
Expand Down Expand Up @@ -418,10 +516,15 @@ export const createHistoricalSync = async (

if (shouldGetTransactionReceipt(filter)) {
const transactionReceipts = await Promise.all(
Array.from(transactionHashes).map((hash) =>
_eth_getTransactionReceipt(args.requestQueue, { hash }),
),
);
Array.from(requiredBlocks).map((blockHash) => {
const blockTransactionHashes = new Set(
logs
.filter((l) => l.blockHash === blockHash)
.map((l) => l.transactionHash),
);
return syncTransactionReceipts(blockHash, blockTransactionHashes);
}),
).then((receipts) => receipts.flat());

if (isKilled) return;

Expand Down Expand Up @@ -472,6 +575,7 @@ export const createHistoricalSync = async (
if (isKilled) return;

const transactionHashes: Set<Hash> = new Set();
const requiredBlocks: Set<SyncBlock> = new Set();

for (const block of blocks) {
block.transactions.map((transaction) => {
Expand All @@ -485,6 +589,7 @@ export const createHistoricalSync = async (
})
) {
transactionHashes.add(transaction.hash);
requiredBlocks.add(block);
}
});
}
Expand All @@ -496,10 +601,15 @@ export const createHistoricalSync = async (
if (isKilled) return;

const transactionReceipts = await Promise.all(
Array.from(transactionHashes).map((hash) =>
_eth_getTransactionReceipt(args.requestQueue, { hash }),
),
);
Array.from(requiredBlocks).map((block) => {
const blockTransactionHashes = new Set(
block.transactions
.filter((t) => transactionHashes.has(t.hash))
.map((t) => t.hash),
);
return syncTransactionReceipts(block.hash, blockTransactionHashes);
}),
).then((receipts) => receipts.flat());

if (isKilled) return;

Expand All @@ -521,6 +631,7 @@ export const createHistoricalSync = async (
? await syncAddressFactory(filter.toAddress, interval)
: undefined;

const requiredBlocks: Set<Hash> = new Set();
const traces = await Promise.all(
intervalRange(interval).map(async (number) => {
let traces = await syncTrace(number);
Expand Down Expand Up @@ -555,6 +666,7 @@ export const createHistoricalSync = async (
if (traces.length === 0) return [];

const block = await syncBlock(number);
requiredBlocks.add(block.hash);

return traces.map((trace) => {
const transaction = block.transactions.find(
Expand All @@ -576,10 +688,6 @@ export const createHistoricalSync = async (

if (isKilled) return;

const transactionHashes = new Set(
traces.map(({ transaction }) => transaction.hash),
);

await args.syncStore.insertTraces({
traces,
chainId: args.network.chainId,
Expand All @@ -589,10 +697,15 @@ export const createHistoricalSync = async (

if (shouldGetTransactionReceipt(filter)) {
const transactionReceipts = await Promise.all(
Array.from(transactionHashes).map((hash) =>
_eth_getTransactionReceipt(args.requestQueue, { hash }),
),
);
Array.from(requiredBlocks).map((blockHash) => {
const blockTransactionHashes = new Set(
traces
.filter((t) => t.block.hash === blockHash)
.map((t) => t.transaction.hash),
);
return syncTransactionReceipts(blockHash, blockTransactionHashes);
}),
).then((receipts) => receipts.flat());

if (isKilled) return;

Expand Down Expand Up @@ -722,6 +835,8 @@ export const createHistoricalSync = async (
blockCache.clear();
traceCache.clear();
transactionsCache.clear();
blockReceiptsCache.clear();
transactionReceiptsCache.clear();

return latestBlock;
},
Expand Down
64 changes: 57 additions & 7 deletions packages/core/src/sync-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
_debug_traceBlockByHash,
_eth_getBlockByHash,
_eth_getBlockByNumber,
_eth_getBlockReceipts,
_eth_getLogs,
_eth_getTransactionReceipt,
} from "@/utils/rpc.js";
Expand Down Expand Up @@ -100,6 +101,7 @@ export const createRealtimeSync = (
// state
////////
let isKilled = false;
let isBlockReceipts = true;
let finalizedBlock: LightBlock;
let finalizedChildAddresses: Map<Factory, Set<Address>>;
const unfinalizedChildAddresses = new Map<Factory, Set<Address>>();
Expand Down Expand Up @@ -575,6 +577,57 @@ export const createRealtimeSync = (
}
};

const syncTransactionReceipts = async (
blockHash: Hash,
transactionHashes: Set<Hash>,
): Promise<SyncTransactionReceipt[]> => {
if (isBlockReceipts === false) {
const transactionReceipts = await Promise.all(
Array.from(transactionHashes).map(async (hash) =>
_eth_getTransactionReceipt(args.requestQueue, { hash }),
),
);

return transactionReceipts;
}

let blockReceipts: SyncTransactionReceipt[];
try {
blockReceipts = await _eth_getBlockReceipts(args.requestQueue, {
blockHash,
});
} catch (_error) {
const error = _error as Error;
args.common.logger.warn({
service: "realtime",
msg: `Caught eth_getBlockReceipts error on '${
args.network.name
}', switching to eth_getTransactionReceipt method.`,
error,
});

isBlockReceipts = false;
return syncTransactionReceipts(blockHash, transactionHashes);
}

const blockReceiptsTransactionHashes = new Set(
blockReceipts.map((r) => r.transactionHash),
);
// Validate that block transaction receipts include all required transactions
for (const hash of Array.from(transactionHashes)) {
if (blockReceiptsTransactionHashes.has(hash) === false) {
throw new Error(
`Detected inconsistent RPC responses. Transaction receipt with transactionHash ${hash} is missing in \`blockReceipts\`.`,
);
}
}
const transactionReceipts = blockReceipts.filter((receipt) =>
transactionHashes.has(receipt.transactionHash),
);

return transactionReceipts;
};

/**
* Fetch all data (logs, traces, receipts) for the specified block required by `args.sources`
*
Expand Down Expand Up @@ -769,7 +822,7 @@ export const createRealtimeSync = (
for (const hash of Array.from(requiredTransactions)) {
if (blockTransactionsHashes.has(hash) === false) {
throw new Error(
`Detected inconsistent RPC responses. Transaction with hash ${hash} is missing in \`block.transactions\`.`,
`Detected inconsistent RPC responses. 'transaction.hash' ${hash} not found in eth_getBlockReceipts response for block '${block.hash}'.`,
);
}
}
Expand All @@ -778,12 +831,9 @@ export const createRealtimeSync = (
// Transaction Receipts
////////

const transactionReceipts = await Promise.all(
block.transactions
.filter(({ hash }) => requiredTransactionReceipts.has(hash))
.map(({ hash }) =>
_eth_getTransactionReceipt(args.requestQueue, { hash }),
),
const transactionReceipts = await syncTransactionReceipts(
block.hash,
requiredTransactionReceipts,
);

return {
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/utils/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ export const _eth_getTransactionReceipt = (
return receipt as SyncTransactionReceipt;
});

/**
* Helper function for "eth_getBlockReceipts" request.
*/
export const _eth_getBlockReceipts = (
requestQueue: RequestQueue,
{ blockHash }: { blockHash: Hash },
): Promise<SyncTransactionReceipt[]> =>
requestQueue
.request({
method: "eth_getBlockReceipts",
params: [blockHash],
} as any)
.then((receipts) => receipts as unknown as SyncTransactionReceipt[]);

/**
* Helper function for "debug_traceBlockByNumber" request.
*/
Expand Down

0 comments on commit f49e62d

Please sign in to comment.