From 2ac7a8d26cfb1586f1f4836ae39dad4d0142ee84 Mon Sep 17 00:00:00 2001 From: vladimir Date: Mon, 13 May 2024 19:13:21 +0300 Subject: [PATCH 01/15] updates job status on queue inclusion --- zp-relayer/pool/BasePool.ts | 2 +- zp-relayer/pool/RelayPool.ts | 16 ++++++++++++++-- zp-relayer/workers/sentTxWorker.ts | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index bb53805d..1afe0a3d 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -44,7 +44,7 @@ export abstract class BasePool { abstract init(...args: any): Promise abstract onSend(p: ProcessResult, txHash: string): Promise - abstract onConfirmed(p: ProcessResult, txHash: string, callback?: () => Promise): Promise + abstract onConfirmed( p: ProcessResult, txHash: string, callback?: () => Promise,jobId?: string): Promise async onFailed(txHash: string): Promise { logger.error('Transaction reverted', { txHash }) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 477c0ce3..8ae4a3ce 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig' import { logger } from '@/lib/appLogger' import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' -import { PoolTx, WorkerTxType } from '@/queue/poolTxQueue' +import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' import { TxStore } from '@/state/TxStore' import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { @@ -264,7 +264,19 @@ export class RelayPool extends BasePool { } } - async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise): Promise {} + async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise { + logger.debug("Updating pool job %s completed, txHash %s", jobId, txHash); + if(jobId) { + const poolJob = await poolTxQueue.getJob(jobId); + if (!poolJob) { + logger.error('Pool job not found', { jobId }); + } else { + poolJob.data.transaction.state = JobState.COMPLETED; + poolJob.data.transaction.txHash = txHash; + await poolJob.update(poolJob.data); + } + } + } async getIndexerInfo() { const info = await fetchJson(this.indexerUrl, '/info', []) diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 4ed7016d..3f6df7f3 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -85,7 +85,7 @@ export async function createSentTxWorker({ redis, mutex, pool, txManager }: ISen // Successful jobLogger.info('Transaction was successfully mined', { txHash, blockNumber: tx.blockNumber }) - await pool.onConfirmed(processResult, txHash, updatePoolJobState) + await pool.onConfirmed(processResult, txHash, updatePoolJobState,poolJobId) } else { await pool.onFailed(txHash) await updatePoolJobState() From b1cf354c32b263d2e719ef45cb38c104a671635b Mon Sep 17 00:00:00 2001 From: Alexander Filippov Date: Mon, 20 May 2024 16:53:22 +0300 Subject: [PATCH 02/15] Remove additional logic from getTransactionV2 --- zp-relayer/pool/RelayPool.ts | 6 ----- zp-relayer/services/relayer/endpoints.ts | 34 +----------------------- zp-relayer/state/TxStore.ts | 22 --------------- 3 files changed, 1 insertion(+), 61 deletions(-) delete mode 100644 zp-relayer/state/TxStore.ts diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 8ae4a3ce..76a7fd87 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -3,7 +3,6 @@ import { logger } from '@/lib/appLogger' import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' -import { TxStore } from '@/state/TxStore' import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { applyDenominator, @@ -50,13 +49,10 @@ export class RelayPool extends BasePool { public permitRecover: PermitRecover | null = null private proxyAddress!: string private indexerUrl!: string - txStore!: TxStore async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) { if (this.isInitialized) return - this.txStore = new TxStore('tmp-tx-store', redis) - this.proxyAddress = proxyAddress this.indexerUrl = indexerUrl @@ -256,8 +252,6 @@ export class RelayPool extends BasePool { memo ) - await this.txStore.add(commitIndex, prefixedMemo) - if (nullifier) { logger.debug('Adding nullifier %s to OS', nullifier) await this.optimisticState.nullifiers.add([nullifier]) diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 45e4a097..2c89caef 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -96,39 +96,7 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`) } const indexerTxs: string[] = await response.json() - - const lastIndex = offset + indexerTxs.length * OUTPLUSONE - const txStore = (pool as RelayPool).txStore - const indices = await txStore.getAll().then(keys => { - return Object.entries(keys) - .map(([i, v]) => [parseInt(i), v] as [number, string]) - .filter(([i]) => offset <= i && i <= lastIndex) - .sort(([i1], [i2]) => i1 - i2) - }) - - // TODO: optimize - const optimisticTxs = new Set() - const duplicates = new Set() - for (const tx of indexerTxs) { - const commit = tx.slice(65, 129) - for (const [index, memoV2] of indices) { - const commitLocal = memoV2.slice(0, 64) - if (commit === commitLocal) { - duplicates.add(index.toString()) - } else { - optimisticTxs.add(memoV2) - } - } - } - - for (const index of duplicates) { - logger.info('Deleting index from optimistic state', { index }) - await txStore.remove(index.toString()) - } - - const txs: string[] = [...indexerTxs, ...Array.from(optimisticTxs.values()).map(tx => txToV2Format('0', tx))] - - res.json(txs) + res.json(indexerTxs) } async function getJob(req: Request, res: Response, { pool }: PoolInjection) { diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts deleted file mode 100644 index 67782925..00000000 --- a/zp-relayer/state/TxStore.ts +++ /dev/null @@ -1,22 +0,0 @@ -import type { Redis } from 'ioredis' - -export class TxStore { - constructor(public name: string, private redis: Redis) {} - - async add(index: number, memo: string) { - await this.redis.hset(this.name, { [index]: memo }) - } - - async remove(index: string) { - await this.redis.hdel(this.name, index) - } - - async get(index: string) { - const memo = await this.redis.hget(this.name, index) - return memo - } - - async getAll() { - return await this.redis.hgetall(this.name) - } -} From 9d3c96815003a6c87dfc8efe452b7b05b916a32f Mon Sep 17 00:00:00 2001 From: EvgenKor Date: Wed, 22 May 2024 23:35:15 +0300 Subject: [PATCH 03/15] Added /proverAddress endpoint --- zp-relayer/services/relayer/endpoints.ts | 7 +++++++ zp-relayer/services/relayer/router.ts | 1 + 2 files changed, 8 insertions(+) diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 2c89caef..e0bb38a1 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -236,6 +236,12 @@ async function getProverFee(req: Request, res: Response) { res.json(fee) } +async function getProverAddress(req: Request, res: Response) { + const url = new URL('/address', config.RELAYER_PROVER_URL) + const address = await fetch(url.toString()).then(r => r.json()) + res.json(address) +} + function root(req: Request, res: Response) { return res.sendStatus(200) } @@ -252,6 +258,7 @@ export default { getSiblings, getParamsHash, getProverFee, + getProverAddress, relayerVersion, root, } diff --git a/zp-relayer/services/relayer/router.ts b/zp-relayer/services/relayer/router.ts index 43a67a2c..f47aa6b4 100644 --- a/zp-relayer/services/relayer/router.ts +++ b/zp-relayer/services/relayer/router.ts @@ -72,6 +72,7 @@ export function createRouter({ feeManager, pool }: IRouterConfig) { }) ) router.get('/proverFee', wrapErr(endpoints.getProverFee)) + router.get('/proverAddress', wrapErr(endpoints.getProverAddress)) router.post('/sendTransactions', wrapErr(inject({ pool }, endpoints.sendTransactions))) router.get('/transactions/v2', wrapErr(inject({ pool }, endpoints.getTransactionsV2))) router.get('/merkle/root/:index?', wrapErr(inject({ pool }, endpoints.merkleRoot))) From 834d938257fae130693248dcac200f2f5d809c1b Mon Sep 17 00:00:00 2001 From: Evgen Date: Thu, 23 May 2024 17:35:57 +0300 Subject: [PATCH 04/15] Propagating job validation error --- zp-relayer/workers/poolTxWorker.ts | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index ac52445e..1af2e2a7 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -36,13 +36,21 @@ export async function createPoolTxWorker({ redis, mutex, pool, txManager }: IPoo const jobLogger = workerLogger.child({ jobId: job.id, traceId }) jobLogger.info('Processing...') - await pool.validateTx( - job.data, - { - // TODO: optional checks - }, - traceId - ) + try { + await pool.validateTx( + job.data, + { + // TODO: optional checks + }, + traceId + ) + } catch(e) { + job.data.transaction.state = JobState.FAILED; + job.failedReason = (e as Error).message; + await job.update(job.data); + throw e; + } + const processResult = await pool.buildTx(job.data) const { data, func } = processResult From f00b22e62f15a05448092a4571ff6a3d8804ef19 Mon Sep 17 00:00:00 2001 From: EvgenKor Date: Thu, 23 May 2024 22:36:22 +0300 Subject: [PATCH 05/15] Job error description --- zp-relayer/services/relayer/endpoints.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index e0bb38a1..07f40b17 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -137,7 +137,7 @@ async function getJob(req: Request, res: Response, { pool }: PoolInjection) { let result: GetJobResponse = { resolvedJobId: jobId, createdOn: job.timestamp, - failedReason: null, + failedReason: job.failedReason, finishedOn: null, state, txHash, From b21307cdf3faa3436f2312c1d2dfed713a120b05 Mon Sep 17 00:00:00 2001 From: Evgen Date: Fri, 24 May 2024 16:00:32 +0300 Subject: [PATCH 06/15] Trying to restore balance waiting routine --- zp-relayer/common/serviceUtils.ts | 1 + zp-relayer/configs/common/txManagerConfig.ts | 2 ++ zp-relayer/lib/network/evm/EvmTxManager.ts | 26 ++++++++++++++++++++ zp-relayer/lib/network/tron/TronTxManager.ts | 5 ++++ zp-relayer/lib/network/types.ts | 2 ++ zp-relayer/workers/poolTxWorker.ts | 19 +++++++++++++- 6 files changed, 54 insertions(+), 1 deletion(-) diff --git a/zp-relayer/common/serviceUtils.ts b/zp-relayer/common/serviceUtils.ts index 7361a68f..649e7642 100644 --- a/zp-relayer/common/serviceUtils.ts +++ b/zp-relayer/common/serviceUtils.ts @@ -102,6 +102,7 @@ export function buildTxManager( gasPriceBumpFactor: tmConfig.TX_MIN_GAS_PRICE_BUMP_FACTOR, gasPriceSurplus: tmConfig.TX_GAS_PRICE_SURPLUS, gasPriceMaxFeeLimit: tmConfig.TX_MAX_FEE_PER_GAS_LIMIT, + waitingFundsTimeout: tmConfig.BALANCE_CHECK_TIMEOUT }) } else { throw new Error('Unsupported network backend') diff --git a/zp-relayer/configs/common/txManagerConfig.ts b/zp-relayer/configs/common/txManagerConfig.ts index c1ec45d9..54a03f71 100644 --- a/zp-relayer/configs/common/txManagerConfig.ts +++ b/zp-relayer/configs/common/txManagerConfig.ts @@ -7,10 +7,12 @@ import { zBN } from './utils' const zBaseConfig = z .object({ TX_PRIVATE_KEY: z.string(), + RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT: z.coerce.number().default(5000), }) .transform(o => ({ TX_ADDRESS: new Web3().eth.accounts.privateKeyToAccount(o.TX_PRIVATE_KEY).address, TX_PRIVATE_KEY: o.TX_PRIVATE_KEY, + BALANCE_CHECK_TIMEOUT: o.RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT, })) const zTxGas = z diff --git a/zp-relayer/lib/network/evm/EvmTxManager.ts b/zp-relayer/lib/network/evm/EvmTxManager.ts index a0816012..8b94cb2b 100644 --- a/zp-relayer/lib/network/evm/EvmTxManager.ts +++ b/zp-relayer/lib/network/evm/EvmTxManager.ts @@ -22,14 +22,17 @@ import { Mutex } from 'async-mutex' import BN from 'bn.js' import type { Redis } from 'ioredis' import Web3 from 'web3' +import { toBN } from 'web3-utils' import type { TransactionConfig } from 'web3-core' import { Logger } from 'winston' +import promiseRetry from 'promise-retry' export interface EvmTxManagerConfig { redis: Redis gasPriceBumpFactor: number gasPriceSurplus: number gasPriceMaxFeeLimit: BN | null + waitingFundsTimeout: number } type ExtraInfo = TransactionConfig @@ -264,4 +267,27 @@ export class EvmTxManager implements TransactionManager { }) ) } + + waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise { + return promiseRetry( + async retry => { + logger.debug('Getting manager balance') + const newBalance = toBN(await this.web3.eth.getBalance(this.address)) + const balanceLog = { balance: newBalance.toString(10), minimumBalance: minimumBalance.toString(10) } + if (newBalance.gte(minimumBalance)) { + logger.info('Relayer has minimum necessary balance', balanceLog) + cb(newBalance) + } else { + logger.warn('Relayer balance is still less than the minimum', balanceLog) + retry(new Error('Not enough balance')) + } + }, + { + forever: true, + factor: 1, + maxTimeout: this.config.waitingFundsTimeout, + minTimeout: this.config.waitingFundsTimeout, + } + ) + } } diff --git a/zp-relayer/lib/network/tron/TronTxManager.ts b/zp-relayer/lib/network/tron/TronTxManager.ts index 1e1a5e1f..0dd0b344 100644 --- a/zp-relayer/lib/network/tron/TronTxManager.ts +++ b/zp-relayer/lib/network/tron/TronTxManager.ts @@ -1,4 +1,5 @@ import { PreparedTx, SendAttempt, SendError, SendTx, TransactionManager, TxInfo } from '../types' +import BN from 'bn.js' interface ExtraInfo {} @@ -69,4 +70,8 @@ export class TronTxManager implements TransactionManager { const preparedTx = await this.prepareTx(sendTx) return this.sendPreparedTx(preparedTx) } + + waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise { + throw new Error('Method not implemented'); + } } diff --git a/zp-relayer/lib/network/types.ts b/zp-relayer/lib/network/types.ts index 20efcc81..0be72e09 100644 --- a/zp-relayer/lib/network/types.ts +++ b/zp-relayer/lib/network/types.ts @@ -1,6 +1,7 @@ import type { TransactionConfig } from 'web3-core' import type { EthereumContract } from './evm/EvmContract' import type { TronContract } from './tron/TronContract' +import BN from 'bn.js' export enum Network { Tron = 'tron', @@ -75,6 +76,7 @@ export interface TransactionManager { attempt?: SendAttempt error?: SendError }> + waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise; } export interface INetworkContract { diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 1af2e2a7..ac9bb1a8 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -1,5 +1,6 @@ import { logger } from '@/lib/appLogger' import { JobState, PoolTx, WorkerTxType } from '@/queue/poolTxQueue' +import { poolTxQueue } from '@/queue/poolTxQueue' import { sentTxQueue } from '@/queue/sentTxQueue' import { TX_QUEUE_NAME } from '@/utils/constants' import { withErrorLog, withMutex } from '@/utils/helpers' @@ -7,6 +8,8 @@ import { TxValidationError } from '@/validation/tx/common' import { Job, Worker } from 'bullmq' import Redis from 'ioredis' import type { IPoolWorkerConfig } from './workerTypes' +import { isInsufficientBalanceError } from '@/utils/web3Errors' +import { toBN } from 'web3-utils' const REVERTED_SET = 'reverted' const RECHECK_ERROR = 'Waiting for next check' @@ -54,6 +57,7 @@ export async function createPoolTxWorker({ redis, mutex, pool, txManager }: IPoo const processResult = await pool.buildTx(job.data) const { data, func } = processResult + const gas = 2000000; const preparedTx = await txManager.prepareTx({ txDesc: { to: pool.network.pool.address(), // TODO: mpc @@ -68,13 +72,26 @@ export async function createPoolTxWorker({ redis, mutex, pool, txManager }: IPoo }, extraData: { // TODO: abstract gas for EVM - gas: 2000000, + gas, }, }) const sendAttempt = preparedTx[1] try { await txManager.sendPreparedTx(preparedTx) } catch (e) { + if (isInsufficientBalanceError(e as Error)) { + if (sendAttempt.extraData.gas && sendAttempt.extraData.gasPrice) { + const minimumBalance = toBN(sendAttempt.extraData.gas).mul(toBN(sendAttempt.extraData.gasPrice)); + logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) + + await Promise.all([poolTxQueue.pause(), sentTxQueue.pause()]) + txManager.waitingForFunds( + minimumBalance, + () => Promise.all([poolTxQueue.resume(), sentTxQueue.resume()]) + ) + } + } + logger.warn('Tx send failed; it will be re-sent later', { txHash: preparedTx[1].txHash, error: (e as Error).message, From e4581aab3b6a138343ddc7d35f582191eff01e61 Mon Sep 17 00:00:00 2001 From: Evgen Date: Mon, 27 May 2024 16:48:42 +0300 Subject: [PATCH 07/15] Optimistic state for sent transactions --- zp-relayer/pool/RelayPool.ts | 8 +++++++- zp-relayer/services/relayer/endpoints.ts | 26 +++++++++++++++++++++++- zp-relayer/state/TxStore.ts | 22 ++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 zp-relayer/state/TxStore.ts diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 76a7fd87..8907b8cb 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -3,6 +3,7 @@ import { logger } from '@/lib/appLogger' import { Network } from '@/lib/network' import { redis } from '@/lib/redisClient' import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue' +import { TxStore } from '@/state/TxStore' import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants' import { applyDenominator, @@ -49,10 +50,13 @@ export class RelayPool extends BasePool { public permitRecover: PermitRecover | null = null private proxyAddress!: string private indexerUrl!: string + txStore!: TxStore async init(permitConfig: PermitConfig, proxyAddress: string, indexerUrl: string) { if (this.isInitialized) return + this.txStore = new TxStore('tmp-tx-store', redis) + this.proxyAddress = proxyAddress this.indexerUrl = indexerUrl @@ -248,10 +252,12 @@ export class RelayPool extends BasePool { async onSend({ outCommit, nullifier, memo, commitIndex }: ProcessResult, txHash: string): Promise { const prefixedMemo = buildPrefixedMemo( outCommit, - '0x0000000000000000000000000000000000000000000000000000000000000000', + txHash, memo ) + await this.txStore.add(commitIndex, prefixedMemo) + if (nullifier) { logger.debug('Adding nullifier %s to OS', nullifier) await this.optimisticState.nullifiers.add([nullifier]) diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 07f40b17..36ca7540 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -96,7 +96,31 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`) } const indexerTxs: string[] = await response.json() - res.json(indexerTxs) + + const lastIndex = offset + indexerTxs.length * OUTPLUSONE + const txStore = (pool as RelayPool).txStore + const indices = await txStore.getAll().then(keys => { + return Object.entries(keys) + .map(([i, v]) => [parseInt(i), v] as [number, string]) + .filter(([i]) => offset <= i && i <= lastIndex) + .sort(([i1], [i2]) => i1 - i2) + }) + + const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129)); + const optimisticTxs: string[] = [] + for (const [index, memo] of indices) { + const commitLocal = memo.slice(0, 64) + if (indexerCommitments.includes(commitLocal)) { + logger.info('Deleting index from optimistic state', { index }) + await txStore.remove(index.toString()) + } else { + optimisticTxs.push(txToV2Format('0', memo)) + } + } + + const txs: string[] = [...indexerTxs, ...optimisticTxs] + + res.json(txs) } async function getJob(req: Request, res: Response, { pool }: PoolInjection) { diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts new file mode 100644 index 00000000..e16193df --- /dev/null +++ b/zp-relayer/state/TxStore.ts @@ -0,0 +1,22 @@ +import type { Redis } from 'ioredis' + +export class TxStore { + constructor(public name: string, private redis: Redis) {} + + async add(index: number, memo: string) { + await this.redis.hset(this.name, { [index]: memo }) + } + + async remove(index: string) { + await this.redis.hdel(this.name, index) + } + + async get(index: string) { + const memo = await this.redis.hget(this.name, index) + return memo + } + + async getAll() { + return await this.redis.hgetall(this.name) + } +} \ No newline at end of file From 179dc44ce347b4ea05fa39c27c467af4d4ec77d6 Mon Sep 17 00:00:00 2001 From: Evgen Date: Mon, 27 May 2024 23:50:23 +0300 Subject: [PATCH 08/15] Updating txHash in cached txs, introducing pendingDeltaIndex field in /info endpoint --- zp-relayer/pool/RelayPool.ts | 23 +++++++++++++++-------- zp-relayer/services/relayer/endpoints.ts | 13 +++++++++++++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 8907b8cb..ef5d53bd 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -250,18 +250,12 @@ export class RelayPool extends BasePool { } async onSend({ outCommit, nullifier, memo, commitIndex }: ProcessResult, txHash: string): Promise { - const prefixedMemo = buildPrefixedMemo( - outCommit, - txHash, - memo - ) - - await this.txStore.add(commitIndex, prefixedMemo) - if (nullifier) { logger.debug('Adding nullifier %s to OS', nullifier) await this.optimisticState.nullifiers.add([nullifier]) } + + await this.cacheTxLocally(commitIndex, outCommit, txHash, memo); } async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise { @@ -274,10 +268,23 @@ export class RelayPool extends BasePool { poolJob.data.transaction.state = JobState.COMPLETED; poolJob.data.transaction.txHash = txHash; await poolJob.update(poolJob.data); + + await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo); } } } + protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) { + // store or updating local tx store + // (we should keep sent transaction until the indexer grab them) + const prefixedMemo = buildPrefixedMemo( + commit, + txHash, + memo + ); + await this.txStore.add(index, prefixedMemo); + } + async getIndexerInfo() { const info = await fetchJson(this.indexerUrl, '/info', []) return info diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 36ca7540..6f6a650f 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -187,6 +187,19 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection) } const info = await response.json() + const indexerMaxIdx = Math.max(parseInt(info.deltaIndex ?? '0'), parseInt(info.optimisticDeltaIndex ?? '0')) + + const txStore = (pool as RelayPool).txStore + const pendingCnt = await txStore.getAll() + .then(keys => { + return Object.entries(keys) + .map(([i]) => parseInt(i) as number) + .filter(i => indexerMaxIdx <= i) + }) + .then(a => a.length); + + info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE; + res.json(info) } From 95227b14d4a35d58a5956d041d5366bb9c110058 Mon Sep 17 00:00:00 2001 From: EvgenKor Date: Fri, 31 May 2024 00:12:35 +0300 Subject: [PATCH 09/15] Setting reverted state to failed txs --- zp-relayer/pool/BasePool.ts | 2 +- zp-relayer/pool/RelayPool.ts | 17 +++++++++++++++++ zp-relayer/workers/sentTxWorker.ts | 4 ++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index 1afe0a3d..3868025d 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -46,7 +46,7 @@ export abstract class BasePool { abstract onSend(p: ProcessResult, txHash: string): Promise abstract onConfirmed( p: ProcessResult, txHash: string, callback?: () => Promise,jobId?: string): Promise - async onFailed(txHash: string): Promise { + async onFailed(txHash: string, jobId?: string): Promise { logger.error('Transaction reverted', { txHash }) await this.clearOptimisticState() diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index ef5d53bd..9b233504 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -274,6 +274,23 @@ export class RelayPool extends BasePool { } } + async onFailed(txHash: string, jobId?: string): Promise { + super.onFailed(txHash, jobId); + + if(jobId) { + const poolJob = await poolTxQueue.getJob(jobId); + if (!poolJob) { + logger.error('Pool job not found', { jobId }); + } else { + poolJob.data.transaction.state = JobState.REVERTED; + poolJob.data.transaction.txHash = txHash; + await poolJob.update(poolJob.data); + } + } + + // TODO: remove cached tx from txStore + } + protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) { // store or updating local tx store // (we should keep sent transaction until the indexer grab them) diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 3f6df7f3..568c8c1c 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -87,8 +87,8 @@ export async function createSentTxWorker({ redis, mutex, pool, txManager }: ISen await pool.onConfirmed(processResult, txHash, updatePoolJobState,poolJobId) } else { - await pool.onFailed(txHash) - await updatePoolJobState() + await pool.onFailed(txHash, poolJobId); + //await updatePoolJobState() } } From f0a79882f8f7747c2a9093bf44859b7ad8278bcf Mon Sep 17 00:00:00 2001 From: Evgen Date: Fri, 31 May 2024 15:32:18 +0300 Subject: [PATCH 10/15] Clearing pending cache on tx revert --- zp-relayer/pool/RelayPool.ts | 1 + zp-relayer/state/TxStore.ts | 5 +++++ zp-relayer/workers/sentTxWorker.ts | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index 9b233504..cbf692a1 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -276,6 +276,7 @@ export class RelayPool extends BasePool { async onFailed(txHash: string, jobId?: string): Promise { super.onFailed(txHash, jobId); + this.txStore.removeAll(); if(jobId) { const poolJob = await poolTxQueue.getJob(jobId); diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts index e16193df..2b5f735d 100644 --- a/zp-relayer/state/TxStore.ts +++ b/zp-relayer/state/TxStore.ts @@ -19,4 +19,9 @@ export class TxStore { async getAll() { return await this.redis.hgetall(this.name) } + + async removeAll() { + const allKeys = await this.getAll().then(res => Object.keys(res)) + await this.redis.hdel(this.name, ...allKeys) + } } \ No newline at end of file diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index 568c8c1c..d9a838fc 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -85,7 +85,7 @@ export async function createSentTxWorker({ redis, mutex, pool, txManager }: ISen // Successful jobLogger.info('Transaction was successfully mined', { txHash, blockNumber: tx.blockNumber }) - await pool.onConfirmed(processResult, txHash, updatePoolJobState,poolJobId) + await pool.onConfirmed(processResult, txHash, updatePoolJobState, poolJobId) } else { await pool.onFailed(txHash, poolJobId); //await updatePoolJobState() From b83b7e8423d8b72764d677bcac167524c9a66946 Mon Sep 17 00:00:00 2001 From: Alexander Filippov Date: Mon, 10 Jun 2024 12:58:33 +0300 Subject: [PATCH 11/15] Fix minor bugs (#218) * Fix potential indexer gap * Fix prover fee * Don't import FeeOptions * Use applyDenominator * Move INDEXER_BLOCK_CONFIRMATIONS to config --- zp-relayer/configs/commitmentWatcherConfig.ts | 4 ++-- zp-relayer/configs/indexerConfig.ts | 1 + zp-relayer/pool/BasePool.ts | 18 ++++++------------ zp-relayer/pool/DefaultPool.ts | 3 ++- zp-relayer/pool/FinalizerPool.ts | 4 ++-- zp-relayer/pool/IndexerPool.ts | 6 +++--- .../services/commitment-watcher/index.ts | 4 ++-- zp-relayer/services/commitment-watcher/init.ts | 2 ++ .../services/commitment-watcher/router.ts | 9 +++++++-- zp-relayer/services/indexer/init.ts | 7 ++++--- 10 files changed, 31 insertions(+), 27 deletions(-) diff --git a/zp-relayer/configs/commitmentWatcherConfig.ts b/zp-relayer/configs/commitmentWatcherConfig.ts index bf0557f3..dff91337 100644 --- a/zp-relayer/configs/commitmentWatcherConfig.ts +++ b/zp-relayer/configs/commitmentWatcherConfig.ts @@ -3,7 +3,7 @@ import { getBaseConfig } from './baseConfig' import { getGasPriceConfig } from './common/gasPriceConfig' import { getNetworkConfig } from './common/networkConfig' import { getTxManagerConfig } from './common/txManagerConfig' -import { zBooleanString } from './common/utils' +import { zBN, zBooleanString } from './common/utils' const zSchema = z.object({ COMMITMENT_WATCHER_PORT: z.coerce.number().default(8000), @@ -14,7 +14,7 @@ const zSchema = z.object({ COMMITMENT_WATCHER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'), COMMITMENT_WATCHER_FETCH_INTERVAL: z.coerce.number().default(10000), COMMITMENT_WATCHER_TX_REDUNDANCY: zBooleanString().default('false'), - COMMITMENT_WATCHER_FEE: z.coerce.number().default(100_000_000), + COMMITMENT_WATCHER_FEE: zBN().default("100_000_000"), }) const network = getNetworkConfig() diff --git a/zp-relayer/configs/indexerConfig.ts b/zp-relayer/configs/indexerConfig.ts index 5632f848..943a2bd0 100644 --- a/zp-relayer/configs/indexerConfig.ts +++ b/zp-relayer/configs/indexerConfig.ts @@ -10,6 +10,7 @@ const schema = z.object({ INDEXER_STATE_DIR_PATH: z.string().default('./INDEXER_STATE'), INDEXER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'), INDEXER_TOKEN_ADDRESS: z.string(), + INDEXER_BLOCK_CONFIRMATIONS: z.coerce.number().default(1), }) const config = schema.parse(process.env) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index 3868025d..ff90742b 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -143,7 +143,7 @@ export abstract class BasePool { return lastBlockNumber } - async syncState(startBlock?: number, indexerUrl?: string) { + async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) { logger.debug('Syncing state; starting from block %d', startBlock) const localIndex = this.state.getNextIndex() @@ -166,10 +166,10 @@ export abstract class BasePool { if (indexerUrl) { await this.syncStateFromIndexer(indexerUrl) - } else if (startBlock) { - await this.syncStateFromContract(startBlock, contractIndex, localIndex) + } else if (startBlock && lastBlock) { + await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex) } else { - throw new Error('Either startBlock or indexerUrl should be provided for sync') + throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') } const newLocalIndex = this.state.getNextIndex() @@ -217,23 +217,17 @@ export abstract class BasePool { }) } - async syncStateFromContract(startBlock: number, contractIndex: number, localIndex: number) { + async syncStateFromContract(startBlock: number, lastBlock: number, contractIndex: number, localIndex: number) { const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE) if (numTxs < 0) { // TODO: rollback state throw new Error('State is corrupted, contract index is less than local index') } - const missedIndices = Array(numTxs) - for (let i = 0; i < numTxs; i++) { - missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE - } - - const lastBlockNumber = (await this.getLastBlockToProcess()) + 1 for await (const batch of this.network.getEvents({ contract: this.network.pool, startBlock, - lastBlock: lastBlockNumber, + lastBlock, event: 'Message', batchSize: this.config.eventsBatchSize, })) { diff --git a/zp-relayer/pool/DefaultPool.ts b/zp-relayer/pool/DefaultPool.ts index 42a37f16..96d4c411 100644 --- a/zp-relayer/pool/DefaultPool.ts +++ b/zp-relayer/pool/DefaultPool.ts @@ -75,7 +75,8 @@ export class DefaultPool extends BasePool { } await this.permitRecover?.initializeDomain() if (startBlock) { - await this.syncState(startBlock) + const lastBlock = await this.getLastBlockToProcess() + await this.syncState(startBlock, lastBlock) } this.isInitialized = true } diff --git a/zp-relayer/pool/FinalizerPool.ts b/zp-relayer/pool/FinalizerPool.ts index 3c744776..029a8110 100644 --- a/zp-relayer/pool/FinalizerPool.ts +++ b/zp-relayer/pool/FinalizerPool.ts @@ -28,7 +28,7 @@ export class FinalizerPool extends BasePool { this.denominator = toBN(await this.network.pool.call('denominator')) this.poolId = toBN(await this.network.pool.call('pool_id')) - await this.syncState(undefined, indexerUrl) + await this.syncState(undefined, undefined, indexerUrl) this.isInitialized = true } @@ -38,7 +38,7 @@ export class FinalizerPool extends BasePool { async buildFinalizeTx({ transaction: { outCommit }, }: PoolTx): Promise> { - await this.syncState(undefined, this.indexerUrl) + await this.syncState(undefined, undefined, this.indexerUrl) const func = 'proveTreeUpdate(uint256,uint256[8],uint256)' diff --git a/zp-relayer/pool/IndexerPool.ts b/zp-relayer/pool/IndexerPool.ts index 4cbc29fe..a673331e 100644 --- a/zp-relayer/pool/IndexerPool.ts +++ b/zp-relayer/pool/IndexerPool.ts @@ -6,14 +6,14 @@ import { type PermitRecover } from '@/utils/permit/types' export class IndexerPool extends BasePool { public permitRecover: PermitRecover | null = null - async init(startBlock: number | null = null) { + async init(startBlock: number | null = null, lastBlock: number | null = null) { if (this.isInitialized) return this.denominator = toBN(await this.network.pool.call('denominator')) this.poolId = toBN(await this.network.pool.call('pool_id')) - if (startBlock) { - await this.syncState(startBlock) + if (startBlock && lastBlock) { + await this.syncState(startBlock, lastBlock) } this.isInitialized = true } diff --git a/zp-relayer/services/commitment-watcher/index.ts b/zp-relayer/services/commitment-watcher/index.ts index 9a7b2692..5ce49903 100644 --- a/zp-relayer/services/commitment-watcher/index.ts +++ b/zp-relayer/services/commitment-watcher/index.ts @@ -4,10 +4,10 @@ import express from 'express' import { init } from './init' import { createRouter } from './router' -init().then(() => { +init().then((pool) => { const app = express() - app.use(createRouter()) + app.use(createRouter(pool)) const PORT = config.COMMITMENT_WATCHER_PORT app.listen(PORT, () => logger.info(`Started commitment-watcher on port ${PORT}`)) }) diff --git a/zp-relayer/services/commitment-watcher/init.ts b/zp-relayer/services/commitment-watcher/init.ts index 18cb4841..7c4a9046 100644 --- a/zp-relayer/services/commitment-watcher/init.ts +++ b/zp-relayer/services/commitment-watcher/init.ts @@ -113,4 +113,6 @@ export async function init() { workers.forEach(w => w.run()) runWatcher(pool) + + return pool } diff --git a/zp-relayer/services/commitment-watcher/router.ts b/zp-relayer/services/commitment-watcher/router.ts index 8cb4b850..03243265 100644 --- a/zp-relayer/services/commitment-watcher/router.ts +++ b/zp-relayer/services/commitment-watcher/router.ts @@ -1,11 +1,14 @@ import config from '@/configs/commitmentWatcherConfig' import { logger } from '@/lib/appLogger' +import { BasePool } from '@/pool/BasePool' import { poolTxQueue, WorkerTx, WorkerTxType } from '@/queue/poolTxQueue' +import { applyDenominator } from '@/utils/helpers' import { ValidationError } from '@/validation/api/validation' import cors from 'cors' import express, { NextFunction, Request, Response } from 'express' +import { toBN } from 'web3-utils' -export function createRouter() { +export function createRouter(pool: BasePool) { const router = express.Router() router.use(cors()) @@ -26,7 +29,9 @@ export function createRouter() { }) router.get('/fee', (req, res) => { - res.json({ fee: config.COMMITMENT_WATCHER_FEE }) + const dInverse = toBN(1).shln(255) + const fee = applyDenominator(config.COMMITMENT_WATCHER_FEE, pool.denominator.xor(dInverse)) + res.json({ fee: fee.toString(10) }) }) router.get('/job/:commitment', async (req, res) => { diff --git a/zp-relayer/services/indexer/init.ts b/zp-relayer/services/indexer/init.ts index 163b3d30..d582624b 100644 --- a/zp-relayer/services/indexer/init.ts +++ b/zp-relayer/services/indexer/init.ts @@ -12,12 +12,13 @@ export async function init() { eventsBatchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE, }) - await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK)]) + const lastInitialSyncBlock = await pool.getLastBlockToProcess() + await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK, lastInitialSyncBlock)]) - const startBlock = await pool.getLastBlockToProcess() + const startBlock = lastInitialSyncBlock + 1 const watcher = new Watcher(networkBackend, networkBackend.pool, 'pool-indexer', { event: 'allEvents', - blockConfirmations: parseInt(process.env.INDEXER_BLOCK_CONFIRMATIONS || '1'), + blockConfirmations: config.INDEXER_BLOCK_CONFIRMATIONS, startBlock, eventPollingInterval: parseInt(process.env.WATCHER_EVENT_POLLING_INTERVAL || '10000'), batchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE, From b6975c21c530bdabb54130fd02500aff50691a1b Mon Sep 17 00:00:00 2001 From: r0wdy1 <103738251+r0wdy1@users.noreply.github.com> Date: Thu, 13 Jun 2024 13:04:01 +0400 Subject: [PATCH 12/15] Update zp-relayer/workers/sentTxWorker.ts Co-authored-by: Alexander Filippov --- zp-relayer/workers/sentTxWorker.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index d9a838fc..18336b9e 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -88,7 +88,6 @@ export async function createSentTxWorker({ redis, mutex, pool, txManager }: ISen await pool.onConfirmed(processResult, txHash, updatePoolJobState, poolJobId) } else { await pool.onFailed(txHash, poolJobId); - //await updatePoolJobState() } } From 135908d8963117fc205dd912f4b5c3bdc1afada2 Mon Sep 17 00:00:00 2001 From: r0wdy1 <103738251+r0wdy1@users.noreply.github.com> Date: Thu, 13 Jun 2024 13:04:11 +0400 Subject: [PATCH 13/15] Update zp-relayer/pool/RelayPool.ts Co-authored-by: Alexander Filippov --- zp-relayer/pool/RelayPool.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index cbf692a1..ca098beb 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -289,7 +289,6 @@ export class RelayPool extends BasePool { } } - // TODO: remove cached tx from txStore } protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) { From a3120433a18fd38245534a0ed110547fccb577ae Mon Sep 17 00:00:00 2001 From: r0wdy1 <103738251+r0wdy1@users.noreply.github.com> Date: Thu, 13 Jun 2024 13:15:28 +0400 Subject: [PATCH 14/15] Update zp-relayer/pool/BasePool.ts Co-authored-by: Alexander Filippov --- zp-relayer/pool/BasePool.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index ff90742b..b39d5d3e 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -44,7 +44,7 @@ export abstract class BasePool { abstract init(...args: any): Promise abstract onSend(p: ProcessResult, txHash: string): Promise - abstract onConfirmed( p: ProcessResult, txHash: string, callback?: () => Promise,jobId?: string): Promise + abstract onConfirmed(p: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise async onFailed(txHash: string, jobId?: string): Promise { logger.error('Transaction reverted', { txHash }) From 0edb5e81b1ec5f45d8868825f48e1ac365844583 Mon Sep 17 00:00:00 2001 From: vladimir Date: Thu, 13 Jun 2024 20:14:50 +0300 Subject: [PATCH 15/15] make jobId mandatory for onFailed callback --- zp-relayer/pool/BasePool.ts | 2 +- zp-relayer/pool/RelayPool.ts | 24 ++++++++++-------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index b39d5d3e..e7f264ce 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -46,7 +46,7 @@ export abstract class BasePool { abstract onSend(p: ProcessResult, txHash: string): Promise abstract onConfirmed(p: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise - async onFailed(txHash: string, jobId?: string): Promise { + async onFailed(txHash: string, jobId: string): Promise { logger.error('Transaction reverted', { txHash }) await this.clearOptimisticState() diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index ca098beb..4ddbc013 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -260,7 +260,7 @@ export class RelayPool extends BasePool { async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise, jobId?: string): Promise { logger.debug("Updating pool job %s completed, txHash %s", jobId, txHash); - if(jobId) { + if (jobId) { const poolJob = await poolTxQueue.getJob(jobId); if (!poolJob) { logger.error('Pool job not found', { jobId }); @@ -274,21 +274,17 @@ export class RelayPool extends BasePool { } } - async onFailed(txHash: string, jobId?: string): Promise { + async onFailed(txHash: string, jobId: string): Promise { super.onFailed(txHash, jobId); - this.txStore.removeAll(); - - if(jobId) { - const poolJob = await poolTxQueue.getJob(jobId); - if (!poolJob) { - logger.error('Pool job not found', { jobId }); - } else { - poolJob.data.transaction.state = JobState.REVERTED; - poolJob.data.transaction.txHash = txHash; - await poolJob.update(poolJob.data); - } + this.txStore.remove(jobId); + const poolJob = await poolTxQueue.getJob(jobId); + if (!poolJob) { + logger.error('Pool job not found', { jobId }); + } else { + poolJob.data.transaction.state = JobState.REVERTED; + poolJob.data.transaction.txHash = txHash; + await poolJob.update(poolJob.data); } - } protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) {