diff --git a/zp-relayer/common/serviceUtils.ts b/zp-relayer/common/serviceUtils.ts
index 7361a68f..649e7642 100644
--- a/zp-relayer/common/serviceUtils.ts
+++ b/zp-relayer/common/serviceUtils.ts
@@ -102,6 +102,7 @@ export function buildTxManager(
       gasPriceBumpFactor: tmConfig.TX_MIN_GAS_PRICE_BUMP_FACTOR,
       gasPriceSurplus: tmConfig.TX_GAS_PRICE_SURPLUS,
       gasPriceMaxFeeLimit: tmConfig.TX_MAX_FEE_PER_GAS_LIMIT,
+      waitingFundsTimeout: tmConfig.BALANCE_CHECK_TIMEOUT
     })
   } else {
     throw new Error('Unsupported network backend')
diff --git a/zp-relayer/configs/commitmentWatcherConfig.ts b/zp-relayer/configs/commitmentWatcherConfig.ts
index bf0557f3..dff91337 100644
--- a/zp-relayer/configs/commitmentWatcherConfig.ts
+++ b/zp-relayer/configs/commitmentWatcherConfig.ts
@@ -3,7 +3,7 @@ import { getBaseConfig } from './baseConfig'
 import { getGasPriceConfig } from './common/gasPriceConfig'
 import { getNetworkConfig } from './common/networkConfig'
 import { getTxManagerConfig } from './common/txManagerConfig'
-import { zBooleanString } from './common/utils'
+import { zBN, zBooleanString } from './common/utils'
 
 const zSchema = z.object({
   COMMITMENT_WATCHER_PORT: z.coerce.number().default(8000),
@@ -14,7 +14,7 @@ const zSchema = z.object({
   COMMITMENT_WATCHER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
   COMMITMENT_WATCHER_FETCH_INTERVAL: z.coerce.number().default(10000),
   COMMITMENT_WATCHER_TX_REDUNDANCY: zBooleanString().default('false'),
-  COMMITMENT_WATCHER_FEE: z.coerce.number().default(100_000_000),
+  COMMITMENT_WATCHER_FEE: zBN().default("100_000_000"),
 })
 
 const network = getNetworkConfig()
diff --git a/zp-relayer/configs/common/txManagerConfig.ts b/zp-relayer/configs/common/txManagerConfig.ts
index c1ec45d9..54a03f71 100644
--- a/zp-relayer/configs/common/txManagerConfig.ts
+++ b/zp-relayer/configs/common/txManagerConfig.ts
@@ -7,10 +7,12 @@ import { zBN } from './utils'
 const zBaseConfig = z
   .object({
     TX_PRIVATE_KEY: z.string(),
+    RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT: z.coerce.number().default(5000),
   })
   .transform(o => ({
     TX_ADDRESS: new Web3().eth.accounts.privateKeyToAccount(o.TX_PRIVATE_KEY).address,
     TX_PRIVATE_KEY: o.TX_PRIVATE_KEY,
+    BALANCE_CHECK_TIMEOUT: o.RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT,
   }))
 
 const zTxGas = z
diff --git a/zp-relayer/configs/indexerConfig.ts b/zp-relayer/configs/indexerConfig.ts
index 5632f848..943a2bd0 100644
--- a/zp-relayer/configs/indexerConfig.ts
+++ b/zp-relayer/configs/indexerConfig.ts
@@ -10,6 +10,7 @@ const schema = z.object({
   INDEXER_STATE_DIR_PATH: z.string().default('./INDEXER_STATE'),
   INDEXER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
   INDEXER_TOKEN_ADDRESS: z.string(),
+  INDEXER_BLOCK_CONFIRMATIONS: z.coerce.number().default(1),
 })
 
 const config = schema.parse(process.env)
diff --git a/zp-relayer/lib/network/evm/EvmTxManager.ts b/zp-relayer/lib/network/evm/EvmTxManager.ts
index a0816012..8b94cb2b 100644
--- a/zp-relayer/lib/network/evm/EvmTxManager.ts
+++ b/zp-relayer/lib/network/evm/EvmTxManager.ts
@@ -22,14 +22,17 @@ import { Mutex } from 'async-mutex'
 import BN from 'bn.js'
 import type { Redis } from 'ioredis'
 import Web3 from 'web3'
+import { toBN } from 'web3-utils'
 import type { TransactionConfig } from 'web3-core'
 import { Logger } from 'winston'
+import promiseRetry from 'promise-retry'
 
 export interface EvmTxManagerConfig {
   redis: Redis
   gasPriceBumpFactor: number
   gasPriceSurplus: number
   gasPriceMaxFeeLimit: BN | null
+  waitingFundsTimeout: number
 }
 
 type ExtraInfo = TransactionConfig
@@ -264,4 +267,27 @@ export class EvmTxManager implements TransactionManager {
       })
     )
   }
+
+  waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
+    return promiseRetry(
+      async retry => {
+        logger.debug('Getting manager balance')
+        const newBalance = toBN(await this.web3.eth.getBalance(this.address))
+        const balanceLog = { balance: newBalance.toString(10), minimumBalance: minimumBalance.toString(10) }
+        if (newBalance.gte(minimumBalance)) {
+          logger.info('Relayer has minimum necessary balance', balanceLog)
+          cb(newBalance)
+        } else {
+          logger.warn('Relayer balance is still less than the minimum', balanceLog)
+          retry(new Error('Not enough balance'))
+        }
+      },
+      {
+        forever: true,
+        factor: 1,
+        maxTimeout: this.config.waitingFundsTimeout,
+        minTimeout: this.config.waitingFundsTimeout,
+      }
+    )
+  }
 }
diff --git a/zp-relayer/lib/network/tron/TronTxManager.ts b/zp-relayer/lib/network/tron/TronTxManager.ts
index 1e1a5e1f..0dd0b344 100644
--- a/zp-relayer/lib/network/tron/TronTxManager.ts
+++ b/zp-relayer/lib/network/tron/TronTxManager.ts
@@ -1,4 +1,5 @@
 import { PreparedTx, SendAttempt, SendError, SendTx, TransactionManager, TxInfo } from '../types'
+import BN from 'bn.js'
 
 interface ExtraInfo {}
 
@@ -69,4 +70,8 @@ export class TronTxManager implements TransactionManager {
     const preparedTx = await this.prepareTx(sendTx)
     return this.sendPreparedTx(preparedTx)
   }
+
+  waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
+    throw new Error('Method not implemented');
+  }
 }
diff --git a/zp-relayer/lib/network/types.ts b/zp-relayer/lib/network/types.ts
index 20efcc81..0be72e09 100644
--- a/zp-relayer/lib/network/types.ts
+++ b/zp-relayer/lib/network/types.ts
@@ -1,6 +1,7 @@
 import type { TransactionConfig } from 'web3-core'
 import type { EthereumContract } from './evm/EvmContract'
 import type { TronContract } from './tron/TronContract'
+import BN from 'bn.js'
 
 export enum Network {
   Tron = 'tron',
@@ -75,6 +76,7 @@ export interface TransactionManager {
     attempt?: SendAttempt
     error?: SendError
   }>
+  waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void>;
 }
 
 export interface INetworkContract {
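An illustrative usage sketch (not part of the patch; PausableQueue, FundsAwareTxManager and pauseUntilFunded are stand-in names) of how a worker is expected to consume the new waitingForFunds contract: compute the minimum balance required for the next send attempt, pause its queues, and resume them from the callback. This mirrors the poolTxWorker change later in this diff.

import BN from 'bn.js'

interface PausableQueue {
  pause(): Promise<void>
  resume(): Promise<void>
}

interface FundsAwareTxManager {
  waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void>
}

async function pauseUntilFunded(txManager: FundsAwareTxManager, queues: PausableQueue[], gas: BN, gasPrice: BN) {
  // Minimum balance needed for the next send attempt
  const minimumBalance = gas.mul(gasPrice)
  // Stop picking up new jobs while the relayer address is underfunded
  await Promise.all(queues.map(q => q.pause()))
  // Deliberately not awaited: the polling loop resolves only once the balance is
  // sufficient, at which point the callback resumes the paused queues
  txManager.waitingForFunds(minimumBalance, () => {
    queues.forEach(q => q.resume())
  })
}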
diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts
index bb53805d..e7f264ce 100644
--- a/zp-relayer/pool/BasePool.ts
+++ b/zp-relayer/pool/BasePool.ts
@@ -44,9 +44,9 @@ export abstract class BasePool {
   abstract init(...args: any): Promise<void>
 
   abstract onSend(p: ProcessResult, txHash: string): Promise<void>
-  abstract onConfirmed(p: ProcessResult, txHash: string, callback?: () => Promise<void>): Promise<void>
+  abstract onConfirmed(p: ProcessResult, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void>
 
-  async onFailed(txHash: string): Promise<void> {
+  async onFailed(txHash: string, jobId: string): Promise<void> {
     logger.error('Transaction reverted', { txHash })
 
     await this.clearOptimisticState()
@@ -143,7 +143,7 @@
     return lastBlockNumber
   }
 
-  async syncState(startBlock?: number, indexerUrl?: string) {
+  async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) {
     logger.debug('Syncing state; starting from block %d', startBlock)
 
     const localIndex = this.state.getNextIndex()
@@ -166,10 +166,10 @@
 
     if (indexerUrl) {
       await this.syncStateFromIndexer(indexerUrl)
-    } else if (startBlock) {
-      await this.syncStateFromContract(startBlock, contractIndex, localIndex)
+    } else if (startBlock && lastBlock) {
+      await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex)
     } else {
-      throw new Error('Either startBlock or indexerUrl should be provided for sync')
+      throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync')
     }
 
     const newLocalIndex = this.state.getNextIndex()
@@ -217,23 +217,17 @@ export abstract class BasePool {
     })
   }
 
-  async syncStateFromContract(startBlock: number, contractIndex: number, localIndex: number) {
+  async syncStateFromContract(startBlock: number, lastBlock: number, contractIndex: number, localIndex: number) {
     const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE)
     if (numTxs < 0) {
       // TODO: rollback state
       throw new Error('State is corrupted, contract index is less than local index')
     }
 
-    const missedIndices = Array(numTxs)
-    for (let i = 0; i < numTxs; i++) {
-      missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE
-    }
-
-    const lastBlockNumber = (await this.getLastBlockToProcess()) + 1
     for await (const batch of this.network.getEvents({
       contract: this.network.pool,
       startBlock,
-      lastBlock: lastBlockNumber,
+      lastBlock,
       event: 'Message',
       batchSize: this.config.eventsBatchSize,
     })) {
diff --git a/zp-relayer/pool/DefaultPool.ts b/zp-relayer/pool/DefaultPool.ts
index 42a37f16..96d4c411 100644
--- a/zp-relayer/pool/DefaultPool.ts
+++ b/zp-relayer/pool/DefaultPool.ts
@@ -75,7 +75,8 @@ export class DefaultPool extends BasePool {
     }
     await this.permitRecover?.initializeDomain()
     if (startBlock) {
-      await this.syncState(startBlock)
+      const lastBlock = await this.getLastBlockToProcess()
+      await this.syncState(startBlock, lastBlock)
     }
     this.isInitialized = true
   }
diff --git a/zp-relayer/pool/FinalizerPool.ts b/zp-relayer/pool/FinalizerPool.ts
index 3c744776..029a8110 100644
--- a/zp-relayer/pool/FinalizerPool.ts
+++ b/zp-relayer/pool/FinalizerPool.ts
@@ -28,7 +28,7 @@ export class FinalizerPool extends BasePool {
     this.denominator = toBN(await this.network.pool.call('denominator'))
     this.poolId = toBN(await this.network.pool.call('pool_id'))
 
-    await this.syncState(undefined, indexerUrl)
+    await this.syncState(undefined, undefined, indexerUrl)
     this.isInitialized = true
   }
 
@@ -38,7 +38,7 @@ export class FinalizerPool extends BasePool {
   async buildFinalizeTx({
     transaction: { outCommit },
   }: PoolTx): Promise> {
-    await this.syncState(undefined, this.indexerUrl)
+    await this.syncState(undefined, undefined, this.indexerUrl)
 
     const func = 'proveTreeUpdate(uint256,uint256[8],uint256)'
diff --git a/zp-relayer/pool/IndexerPool.ts b/zp-relayer/pool/IndexerPool.ts
index 4cbc29fe..a673331e 100644
--- a/zp-relayer/pool/IndexerPool.ts
+++ b/zp-relayer/pool/IndexerPool.ts
@@ -6,14 +6,14 @@ import { type PermitRecover } from '@/utils/permit/types'
 export class IndexerPool extends BasePool {
   public permitRecover: PermitRecover | null = null
 
-  async init(startBlock: number | null = null) {
+  async init(startBlock: number | null = null, lastBlock: number | null = null) {
     if (this.isInitialized) return
 
     this.denominator = toBN(await this.network.pool.call('denominator'))
     this.poolId = toBN(await this.network.pool.call('pool_id'))
 
-    if (startBlock) {
-      await this.syncState(startBlock)
+    if (startBlock && lastBlock) {
+      await this.syncState(startBlock, lastBlock)
     }
     this.isInitialized = true
   }
diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts
index 477c0ce3..4ddbc013 100644
--- a/zp-relayer/pool/RelayPool.ts
+++ b/zp-relayer/pool/RelayPool.ts
@@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig'
 import { logger } from '@/lib/appLogger'
 import { Network } from '@/lib/network'
 import { redis } from '@/lib/redisClient'
-import { PoolTx, WorkerTxType } from '@/queue/poolTxQueue'
+import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue'
 import { TxStore } from '@/state/TxStore'
 import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
 import {
@@ -250,21 +250,53 @@ export class RelayPool extends BasePool {
   }
 
   async onSend({ outCommit, nullifier, memo, commitIndex }: ProcessResult, txHash: string): Promise<void> {
-    const prefixedMemo = buildPrefixedMemo(
-      outCommit,
-      '0x0000000000000000000000000000000000000000000000000000000000000000',
-      memo
-    )
-
-    await this.txStore.add(commitIndex, prefixedMemo)
-
     if (nullifier) {
       logger.debug('Adding nullifier %s to OS', nullifier)
       await this.optimisticState.nullifiers.add([nullifier])
     }
+
+    await this.cacheTxLocally(commitIndex, outCommit, txHash, memo);
   }
 
-  async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise<void>): Promise<void> {}
+  async onConfirmed(res: ProcessResult, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void> {
+    logger.debug("Updating pool job %s completed, txHash %s", jobId, txHash);
+    if (jobId) {
+      const poolJob = await poolTxQueue.getJob(jobId);
+      if (!poolJob) {
+        logger.error('Pool job not found', { jobId });
+      } else {
+        poolJob.data.transaction.state = JobState.COMPLETED;
+        poolJob.data.transaction.txHash = txHash;
+        await poolJob.update(poolJob.data);
+
+        await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo);
+      }
+    }
+  }
+
+  async onFailed(txHash: string, jobId: string): Promise<void> {
+    await super.onFailed(txHash, jobId);
+    await this.txStore.remove(jobId);
+    const poolJob = await poolTxQueue.getJob(jobId);
+    if (!poolJob) {
+      logger.error('Pool job not found', { jobId });
+    } else {
+      poolJob.data.transaction.state = JobState.REVERTED;
+      poolJob.data.transaction.txHash = txHash;
+      await poolJob.update(poolJob.data);
+    }
+  }
+
+  protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) {
+    // Store or update the tx in the local cache
+    // (sent transactions must be kept until the indexer has picked them up)
+    const prefixedMemo = buildPrefixedMemo(
+      commit,
+      txHash,
+      memo
+    );
+    await this.txStore.add(index, prefixedMemo);
+  }
 
   async getIndexerInfo() {
     const info = await fetchJson(this.indexerUrl, '/info', [])
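The sketch below (illustrative only, with stand-in names; not part of the patch) summarizes the lifecycle the RelayPool changes implement: an optimistic transaction is cached locally as soon as it is sent, the cache entry now carries the real txHash instead of the old zero-hash placeholder, the pool job is marked COMPLETED and the entry refreshed on confirmation, and the entry is dropped and the job marked REVERTED on failure. The real code keeps the cache in a Redis-backed TxStore and the job state in BullMQ.

type SketchJobState = 'COMPLETED' | 'REVERTED'
type CachedTx = { txHash: string; memo: string }

class OptimisticTxCache {
  private txs = new Map<number, CachedTx>()

  // onSend: cache immediately so /transactions/v2 can already serve the optimistic tx
  onSent(commitIndex: number, txHash: string, memo: string): void {
    this.txs.set(commitIndex, { txHash, memo })
  }

  // onConfirmed: refresh the entry with the mined txHash and complete the job
  onConfirmed(commitIndex: number, txHash: string, memo: string): SketchJobState {
    this.txs.set(commitIndex, { txHash, memo })
    return 'COMPLETED'
  }

  // onFailed: drop the entry and mark the job reverted
  onFailed(commitIndex: number): SketchJobState {
    this.txs.delete(commitIndex)
    return 'REVERTED'
  }
}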
diff --git a/zp-relayer/services/commitment-watcher/index.ts b/zp-relayer/services/commitment-watcher/index.ts
index 9a7b2692..5ce49903 100644
--- a/zp-relayer/services/commitment-watcher/index.ts
+++ b/zp-relayer/services/commitment-watcher/index.ts
@@ -4,10 +4,10 @@ import express from 'express'
 import { init } from './init'
 import { createRouter } from './router'
 
-init().then(() => {
+init().then((pool) => {
   const app = express()
-  app.use(createRouter())
+  app.use(createRouter(pool))
 
   const PORT = config.COMMITMENT_WATCHER_PORT
   app.listen(PORT, () => logger.info(`Started commitment-watcher on port ${PORT}`))
 })
diff --git a/zp-relayer/services/commitment-watcher/init.ts b/zp-relayer/services/commitment-watcher/init.ts
index 18cb4841..7c4a9046 100644
--- a/zp-relayer/services/commitment-watcher/init.ts
+++ b/zp-relayer/services/commitment-watcher/init.ts
@@ -113,4 +113,6 @@ export async function init() {
   workers.forEach(w => w.run())
 
   runWatcher(pool)
+
+  return pool
 }
diff --git a/zp-relayer/services/commitment-watcher/router.ts b/zp-relayer/services/commitment-watcher/router.ts
index 8cb4b850..03243265 100644
--- a/zp-relayer/services/commitment-watcher/router.ts
+++ b/zp-relayer/services/commitment-watcher/router.ts
@@ -1,11 +1,14 @@
 import config from '@/configs/commitmentWatcherConfig'
 import { logger } from '@/lib/appLogger'
+import { BasePool } from '@/pool/BasePool'
 import { poolTxQueue, WorkerTx, WorkerTxType } from '@/queue/poolTxQueue'
+import { applyDenominator } from '@/utils/helpers'
 import { ValidationError } from '@/validation/api/validation'
 import cors from 'cors'
 import express, { NextFunction, Request, Response } from 'express'
+import { toBN } from 'web3-utils'
 
-export function createRouter() {
+export function createRouter(pool: BasePool) {
   const router = express.Router()
 
   router.use(cors())
@@ -26,7 +29,9 @@
   })
 
   router.get('/fee', (req, res) => {
-    res.json({ fee: config.COMMITMENT_WATCHER_FEE })
+    const dInverse = toBN(1).shln(255)
+    const fee = applyDenominator(config.COMMITMENT_WATCHER_FEE, pool.denominator.xor(dInverse))
+    res.json({ fee: fee.toString(10) })
   })
 
   router.get('/job/:commitment', async (req, res) => {
diff --git a/zp-relayer/services/indexer/init.ts b/zp-relayer/services/indexer/init.ts
index 163b3d30..d582624b 100644
--- a/zp-relayer/services/indexer/init.ts
+++ b/zp-relayer/services/indexer/init.ts
@@ -12,12 +12,13 @@ export async function init() {
     eventsBatchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
   })
 
-  await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK)])
+  const lastInitialSyncBlock = await pool.getLastBlockToProcess()
+  await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK, lastInitialSyncBlock)])
 
-  const startBlock = await pool.getLastBlockToProcess()
+  const startBlock = lastInitialSyncBlock + 1
   const watcher = new Watcher(networkBackend, networkBackend.pool, 'pool-indexer', {
     event: 'allEvents',
-    blockConfirmations: parseInt(process.env.INDEXER_BLOCK_CONFIRMATIONS || '1'),
+    blockConfirmations: config.INDEXER_BLOCK_CONFIRMATIONS,
     startBlock,
     eventPollingInterval: parseInt(process.env.WATCHER_EVENT_POLLING_INTERVAL || '10000'),
     batchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts
index 45e4a097..6f6a650f 100644
--- a/zp-relayer/services/relayer/endpoints.ts
+++ b/zp-relayer/services/relayer/endpoints.ts
@@ -96,7 +96,7 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje
     throw new Error(`Failed to fetch transactions from indexer. Status: ${res.status}`)
   }
   const indexerTxs: string[] = await response.json()
-
+  const lastIndex = offset + indexerTxs.length * OUTPLUSONE
   const txStore = (pool as RelayPool).txStore
   const indices = await txStore.getAll().then(keys => {
@@ -106,27 +106,19 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje
       .sort(([i1], [i2]) => i1 - i2)
   })
 
-  // TODO: optimize
-  const optimisticTxs = new Set()
-  const duplicates = new Set()
-  for (const tx of indexerTxs) {
-    const commit = tx.slice(65, 129)
-    for (const [index, memoV2] of indices) {
-      const commitLocal = memoV2.slice(0, 64)
-      if (commit === commitLocal) {
-        duplicates.add(index.toString())
-      } else {
-        optimisticTxs.add(memoV2)
-      }
+  const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129));
+  const optimisticTxs: string[] = []
+  for (const [index, memo] of indices) {
+    const commitLocal = memo.slice(0, 64)
+    if (indexerCommitments.includes(commitLocal)) {
+      logger.info('Deleting index from optimistic state', { index })
+      await txStore.remove(index.toString())
+    } else {
+      optimisticTxs.push(txToV2Format('0', memo))
     }
   }
 
-  for (const index of duplicates) {
-    logger.info('Deleting index from optimistic state', { index })
-    await txStore.remove(index.toString())
-  }
-
-  const txs: string[] = [...indexerTxs, ...Array.from(optimisticTxs.values()).map(tx => txToV2Format('0', tx))]
+  const txs: string[] = [...indexerTxs, ...optimisticTxs]
 
   res.json(txs)
 }
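A compact sketch of the merge performed above (illustrative, not part of the patch; txToV2Format is passed in as an assumed helper): indexer transactions carry their commitment at string offsets 65..129 and locally cached memos at offsets 0..64, so any local entry whose commitment already appears in the indexer response is stale and can be evicted, while the remaining entries are appended after the indexer transactions. A Set is used for the lookup here; the patch does the equivalent with Array.prototype.includes.

function mergeOptimisticTxs(
  indexerTxs: string[],
  localEntries: Array<[number, string]>, // [commitIndex, memo] pairs, sorted by index
  txToV2Format: (stateIndex: string, memo: string) => string
): { txs: string[]; staleIndices: number[] } {
  const indexerCommitments = new Set(indexerTxs.map(tx => tx.slice(65, 129)))
  const staleIndices: number[] = []
  const optimisticTxs: string[] = []
  for (const [index, memo] of localEntries) {
    if (indexerCommitments.has(memo.slice(0, 64))) {
      // already mined and indexed: safe to drop from the local optimistic cache
      staleIndices.push(index)
    } else {
      optimisticTxs.push(txToV2Format('0', memo))
    }
  }
  return { txs: [...indexerTxs, ...optimisticTxs], staleIndices }
}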
@@ -169,7 +161,7 @@ async function getJob(req: Request, res: Response, { pool }: PoolInjection) {
     let result: GetJobResponse = {
       resolvedJobId: jobId,
       createdOn: job.timestamp,
-      failedReason: null,
+      failedReason: job.failedReason,
       finishedOn: null,
       state,
       txHash,
@@ -195,6 +187,19 @@ async function relayerInfo(req: Request, res: Response, { pool }: PoolInjection)
   }
   const info = await response.json()
 
+  const indexerMaxIdx = Math.max(parseInt(info.deltaIndex ?? '0'), parseInt(info.optimisticDeltaIndex ?? '0'))
+
+  const txStore = (pool as RelayPool).txStore
+  const pendingCnt = await txStore.getAll()
+    .then(keys => {
+      return Object.entries(keys)
+        .map(([i]) => parseInt(i) as number)
+        .filter(i => indexerMaxIdx <= i)
+    })
+    .then(a => a.length);
+
+  info.pendingDeltaIndex = indexerMaxIdx + pendingCnt * OUTPLUSONE;
+
   res.json(info)
 }
@@ -268,6 +273,12 @@ async function getProverFee(req: Request, res: Response) {
   res.json(fee)
 }
 
+async function getProverAddress(req: Request, res: Response) {
+  const url = new URL('/address', config.RELAYER_PROVER_URL)
+  const address = await fetch(url.toString()).then(r => r.json())
+  res.json(address)
+}
+
 function root(req: Request, res: Response) {
   return res.sendStatus(200)
 }
@@ -284,6 +295,7 @@
   getSiblings,
   getParamsHash,
   getProverFee,
+  getProverAddress,
   relayerVersion,
   root,
 }
diff --git a/zp-relayer/services/relayer/router.ts b/zp-relayer/services/relayer/router.ts
index 43a67a2c..f47aa6b4 100644
--- a/zp-relayer/services/relayer/router.ts
+++ b/zp-relayer/services/relayer/router.ts
@@ -72,6 +72,7 @@ export function createRouter({ feeManager, pool }: IRouterConfig) {
     })
   )
   router.get('/proverFee', wrapErr(endpoints.getProverFee))
+  router.get('/proverAddress', wrapErr(endpoints.getProverAddress))
   router.post('/sendTransactions', wrapErr(inject({ pool }, endpoints.sendTransactions)))
   router.get('/transactions/v2', wrapErr(inject({ pool }, endpoints.getTransactionsV2)))
   router.get('/merkle/root/:index?', wrapErr(inject({ pool }, endpoints.merkleRoot)))
diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts
index 67782925..2b5f735d 100644
--- a/zp-relayer/state/TxStore.ts
+++ b/zp-relayer/state/TxStore.ts
@@ -19,4 +19,9 @@ export class TxStore {
   async getAll() {
     return await this.redis.hgetall(this.name)
   }
-}
+
+  async removeAll() {
+    const allKeys = await this.getAll().then(res => Object.keys(res))
+    await this.redis.hdel(this.name, ...allKeys)
+  }
+}
\ No newline at end of file
diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts
index ac52445e..ac9bb1a8 100644
--- a/zp-relayer/workers/poolTxWorker.ts
+++ b/zp-relayer/workers/poolTxWorker.ts
@@ -1,5 +1,6 @@
 import { logger } from '@/lib/appLogger'
 import { JobState, PoolTx, WorkerTxType } from '@/queue/poolTxQueue'
+import { poolTxQueue } from '@/queue/poolTxQueue'
 import { sentTxQueue } from '@/queue/sentTxQueue'
 import { TX_QUEUE_NAME } from '@/utils/constants'
 import { withErrorLog, withMutex } from '@/utils/helpers'
@@ -7,6 +8,8 @@ import { TxValidationError } from '@/validation/tx/common'
 import { Job, Worker } from 'bullmq'
 import Redis from 'ioredis'
 import type { IPoolWorkerConfig } from './workerTypes'
+import { isInsufficientBalanceError } from '@/utils/web3Errors'
+import { toBN } from 'web3-utils'
 
 const REVERTED_SET = 'reverted'
 const RECHECK_ERROR = 'Waiting for next check'
@@ -36,16 +39,25 @@ export async function createPoolTxWorker({ redis, mutex, pool, txManager }: IPoo
       const jobLogger = workerLogger.child({ jobId: job.id, traceId })
       jobLogger.info('Processing...')
 
-      await pool.validateTx(
-        job.data,
-        {
-          // TODO: optional checks
-        },
-        traceId
-      )
+      try {
+        await pool.validateTx(
+          job.data,
+          {
+            // TODO: optional checks
+          },
+          traceId
+        )
+      } catch(e) {
+        job.data.transaction.state = JobState.FAILED;
+        job.failedReason = (e as Error).message;
+        await job.update(job.data);
+        throw e;
+      }
+
       const processResult = await pool.buildTx(job.data)
       const { data, func } = processResult
+      const gas = 2000000;
       const preparedTx = await txManager.prepareTx({
         txDesc: {
           to: pool.network.pool.address(),
           // TODO: mpc
@@ -60,13 +72,26 @@ export async function createPoolTxWorker({ redis, mutex, pool, txManager }: IPoo
         },
         extraData: {
           // TODO: abstract gas for EVM
-          gas: 2000000,
+          gas,
         },
       })
       const sendAttempt = preparedTx[1]
       try {
         await txManager.sendPreparedTx(preparedTx)
       } catch (e) {
+        if (isInsufficientBalanceError(e as Error)) {
+          if (sendAttempt.extraData.gas && sendAttempt.extraData.gasPrice) {
+            const minimumBalance = toBN(sendAttempt.extraData.gas).mul(toBN(sendAttempt.extraData.gasPrice));
+            logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) })
+
+            await Promise.all([poolTxQueue.pause(), sentTxQueue.pause()])
+            txManager.waitingForFunds(
+              minimumBalance,
+              () => Promise.all([poolTxQueue.resume(), sentTxQueue.resume()])
+            )
+          }
+        }
+
         logger.warn('Tx send failed; it will be re-sent later', {
           txHash: preparedTx[1].txHash,
           error: (e as Error).message,
diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts
index 4ed7016d..18336b9e 100644
--- a/zp-relayer/workers/sentTxWorker.ts
+++ b/zp-relayer/workers/sentTxWorker.ts
@@ -85,10 +85,9 @@ export async function createSentTxWorker({ redis, mutex, pool, txManager }: ISen
 
         // Successful
         jobLogger.info('Transaction was successfully mined', { txHash, blockNumber: tx.blockNumber })
 
-        await pool.onConfirmed(processResult, txHash, updatePoolJobState)
+        await pool.onConfirmed(processResult, txHash, updatePoolJobState, poolJobId)
       } else {
-        await pool.onFailed(txHash)
-        await updatePoolJobState()
+        await pool.onFailed(txHash, poolJobId);
       }