Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updates job status on queue inclusion #217

Merged
merged 15 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zp-relayer/common/serviceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export function buildTxManager(
gasPriceBumpFactor: tmConfig.TX_MIN_GAS_PRICE_BUMP_FACTOR,
gasPriceSurplus: tmConfig.TX_GAS_PRICE_SURPLUS,
gasPriceMaxFeeLimit: tmConfig.TX_MAX_FEE_PER_GAS_LIMIT,
waitingFundsTimeout: tmConfig.BALANCE_CHECK_TIMEOUT
})
} else {
throw new Error('Unsupported network backend')
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/configs/commitmentWatcherConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { getBaseConfig } from './baseConfig'
import { getGasPriceConfig } from './common/gasPriceConfig'
import { getNetworkConfig } from './common/networkConfig'
import { getTxManagerConfig } from './common/txManagerConfig'
import { zBooleanString } from './common/utils'
import { zBN, zBooleanString } from './common/utils'

const zSchema = z.object({
COMMITMENT_WATCHER_PORT: z.coerce.number().default(8000),
Expand All @@ -14,7 +14,7 @@ const zSchema = z.object({
COMMITMENT_WATCHER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
COMMITMENT_WATCHER_FETCH_INTERVAL: z.coerce.number().default(10000),
COMMITMENT_WATCHER_TX_REDUNDANCY: zBooleanString().default('false'),
COMMITMENT_WATCHER_FEE: z.coerce.number().default(100_000_000),
COMMITMENT_WATCHER_FEE: zBN().default("100_000_000"),
})

const network = getNetworkConfig()
Expand Down
2 changes: 2 additions & 0 deletions zp-relayer/configs/common/txManagerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import { zBN } from './utils'
const zBaseConfig = z
.object({
TX_PRIVATE_KEY: z.string(),
RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT: z.coerce.number().default(5000),
})
.transform(o => ({
TX_ADDRESS: new Web3().eth.accounts.privateKeyToAccount(o.TX_PRIVATE_KEY).address,
TX_PRIVATE_KEY: o.TX_PRIVATE_KEY,
BALANCE_CHECK_TIMEOUT: o.RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT,
}))

const zTxGas = z
Expand Down
1 change: 1 addition & 0 deletions zp-relayer/configs/indexerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const schema = z.object({
INDEXER_STATE_DIR_PATH: z.string().default('./INDEXER_STATE'),
INDEXER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
INDEXER_TOKEN_ADDRESS: z.string(),
INDEXER_BLOCK_CONFIRMATIONS: z.coerce.number().default(1),
})

const config = schema.parse(process.env)
Expand Down
26 changes: 26 additions & 0 deletions zp-relayer/lib/network/evm/EvmTxManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import { Mutex } from 'async-mutex'
import BN from 'bn.js'
import type { Redis } from 'ioredis'
import Web3 from 'web3'
import { toBN } from 'web3-utils'
import type { TransactionConfig } from 'web3-core'
import { Logger } from 'winston'
import promiseRetry from 'promise-retry'

export interface EvmTxManagerConfig {
redis: Redis
gasPriceBumpFactor: number
gasPriceSurplus: number
gasPriceMaxFeeLimit: BN | null
waitingFundsTimeout: number
}

type ExtraInfo = TransactionConfig
Expand Down Expand Up @@ -264,4 +267,27 @@ export class EvmTxManager implements TransactionManager<ExtraInfo> {
})
)
}

waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
return promiseRetry(
async retry => {
logger.debug('Getting manager balance')
const newBalance = toBN(await this.web3.eth.getBalance(this.address))
const balanceLog = { balance: newBalance.toString(10), minimumBalance: minimumBalance.toString(10) }
if (newBalance.gte(minimumBalance)) {
logger.info('Relayer has minimum necessary balance', balanceLog)
cb(newBalance)
} else {
logger.warn('Relayer balance is still less than the minimum', balanceLog)
retry(new Error('Not enough balance'))
}
},
{
forever: true,
factor: 1,
maxTimeout: this.config.waitingFundsTimeout,
minTimeout: this.config.waitingFundsTimeout,
}
)
}
}
5 changes: 5 additions & 0 deletions zp-relayer/lib/network/tron/TronTxManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PreparedTx, SendAttempt, SendError, SendTx, TransactionManager, TxInfo } from '../types'
import BN from 'bn.js'

interface ExtraInfo {}

Expand Down Expand Up @@ -69,4 +70,8 @@ export class TronTxManager implements TransactionManager<ExtraInfo> {
const preparedTx = await this.prepareTx(sendTx)
return this.sendPreparedTx(preparedTx)
}

waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
throw new Error('Method not implemented');
}
}
2 changes: 2 additions & 0 deletions zp-relayer/lib/network/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { TransactionConfig } from 'web3-core'
import type { EthereumContract } from './evm/EvmContract'
import type { TronContract } from './tron/TronContract'
import BN from 'bn.js'

export enum Network {
Tron = 'tron',
Expand Down Expand Up @@ -75,6 +76,7 @@ export interface TransactionManager<E> {
attempt?: SendAttempt<E>
error?: SendError
}>
waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void>;
}

export interface INetworkContract {
Expand Down
22 changes: 8 additions & 14 deletions zp-relayer/pool/BasePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ export abstract class BasePool<N extends Network = Network> {
abstract init(...args: any): Promise<void>

abstract onSend(p: ProcessResult<any>, txHash: string): Promise<void>
abstract onConfirmed(p: ProcessResult<any>, txHash: string, callback?: () => Promise<void>): Promise<void>
abstract onConfirmed(p: ProcessResult<any>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void>

async onFailed(txHash: string): Promise<void> {
async onFailed(txHash: string, jobId: string): Promise<void> {
logger.error('Transaction reverted', { txHash })

await this.clearOptimisticState()
Expand Down Expand Up @@ -143,7 +143,7 @@ export abstract class BasePool<N extends Network = Network> {
return lastBlockNumber
}

async syncState(startBlock?: number, indexerUrl?: string) {
async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) {
logger.debug('Syncing state; starting from block %d', startBlock)

const localIndex = this.state.getNextIndex()
Expand All @@ -166,10 +166,10 @@ export abstract class BasePool<N extends Network = Network> {

if (indexerUrl) {
await this.syncStateFromIndexer(indexerUrl)
} else if (startBlock) {
await this.syncStateFromContract(startBlock, contractIndex, localIndex)
} else if (startBlock && lastBlock) {
await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex)
} else {
throw new Error('Either startBlock or indexerUrl should be provided for sync')
throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync')
}

const newLocalIndex = this.state.getNextIndex()
Expand Down Expand Up @@ -217,23 +217,17 @@ export abstract class BasePool<N extends Network = Network> {
})
}

async syncStateFromContract(startBlock: number, contractIndex: number, localIndex: number) {
async syncStateFromContract(startBlock: number, lastBlock: number, contractIndex: number, localIndex: number) {
const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE)
if (numTxs < 0) {
// TODO: rollback state
throw new Error('State is corrupted, contract index is less than local index')
}

const missedIndices = Array(numTxs)
for (let i = 0; i < numTxs; i++) {
missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE
}

const lastBlockNumber = (await this.getLastBlockToProcess()) + 1
for await (const batch of this.network.getEvents({
contract: this.network.pool,
startBlock,
lastBlock: lastBlockNumber,
lastBlock,
event: 'Message',
batchSize: this.config.eventsBatchSize,
})) {
Expand Down
3 changes: 2 additions & 1 deletion zp-relayer/pool/DefaultPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ export class DefaultPool extends BasePool {
}
await this.permitRecover?.initializeDomain()
if (startBlock) {
await this.syncState(startBlock)
const lastBlock = await this.getLastBlockToProcess()
await this.syncState(startBlock, lastBlock)
}
this.isInitialized = true
}
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/pool/FinalizerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class FinalizerPool extends BasePool {
this.denominator = toBN(await this.network.pool.call('denominator'))
this.poolId = toBN(await this.network.pool.call('pool_id'))

await this.syncState(undefined, indexerUrl)
await this.syncState(undefined, undefined, indexerUrl)

this.isInitialized = true
}
Expand All @@ -38,7 +38,7 @@ export class FinalizerPool extends BasePool {
async buildFinalizeTx({
transaction: { outCommit },
}: PoolTx<WorkerTxType.Finalize>): Promise<ProcessResult<FinalizerPool>> {
await this.syncState(undefined, this.indexerUrl)
await this.syncState(undefined, undefined, this.indexerUrl)

const func = 'proveTreeUpdate(uint256,uint256[8],uint256)'

Expand Down
6 changes: 3 additions & 3 deletions zp-relayer/pool/IndexerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { type PermitRecover } from '@/utils/permit/types'
export class IndexerPool extends BasePool {
public permitRecover: PermitRecover | null = null

async init(startBlock: number | null = null) {
async init(startBlock: number | null = null, lastBlock: number | null = null) {
if (this.isInitialized) return

this.denominator = toBN(await this.network.pool.call('denominator'))
this.poolId = toBN(await this.network.pool.call('pool_id'))

if (startBlock) {
await this.syncState(startBlock)
if (startBlock && lastBlock) {
await this.syncState(startBlock, lastBlock)
}
this.isInitialized = true
}
Expand Down
52 changes: 42 additions & 10 deletions zp-relayer/pool/RelayPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig'
import { logger } from '@/lib/appLogger'
import { Network } from '@/lib/network'
import { redis } from '@/lib/redisClient'
import { PoolTx, WorkerTxType } from '@/queue/poolTxQueue'
import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue'
import { TxStore } from '@/state/TxStore'
import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
import {
Expand Down Expand Up @@ -250,21 +250,53 @@ export class RelayPool extends BasePool<Network> {
}

async onSend({ outCommit, nullifier, memo, commitIndex }: ProcessResult<RelayPool>, txHash: string): Promise<void> {
const prefixedMemo = buildPrefixedMemo(
outCommit,
'0x0000000000000000000000000000000000000000000000000000000000000000',
memo
)

await this.txStore.add(commitIndex, prefixedMemo)

if (nullifier) {
logger.debug('Adding nullifier %s to OS', nullifier)
await this.optimisticState.nullifiers.add([nullifier])
}

await this.cacheTxLocally(commitIndex, outCommit, txHash, memo);
}

async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>): Promise<void> {}
async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void> {
logger.debug("Updating pool job %s completed, txHash %s", jobId, txHash);
if (jobId) {
const poolJob = await poolTxQueue.getJob(jobId);
if (!poolJob) {
logger.error('Pool job not found', { jobId });
} else {
poolJob.data.transaction.state = JobState.COMPLETED;
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);

await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo);
}
}
}

async onFailed(txHash: string, jobId: string): Promise<void> {
super.onFailed(txHash, jobId);
this.txStore.remove(jobId);
const poolJob = await poolTxQueue.getJob(jobId);
if (!poolJob) {
logger.error('Pool job not found', { jobId });
} else {
poolJob.data.transaction.state = JobState.REVERTED;
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);
}
}

protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) {
// store or updating local tx store
// (we should keep sent transaction until the indexer grab them)
const prefixedMemo = buildPrefixedMemo(
commit,
txHash,
memo
);
await this.txStore.add(index, prefixedMemo);
}

async getIndexerInfo() {
const info = await fetchJson(this.indexerUrl, '/info', [])
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/services/commitment-watcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import express from 'express'
import { init } from './init'
import { createRouter } from './router'

init().then(() => {
init().then((pool) => {
const app = express()

app.use(createRouter())
app.use(createRouter(pool))
const PORT = config.COMMITMENT_WATCHER_PORT
app.listen(PORT, () => logger.info(`Started commitment-watcher on port ${PORT}`))
})
2 changes: 2 additions & 0 deletions zp-relayer/services/commitment-watcher/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,6 @@ export async function init() {
workers.forEach(w => w.run())

runWatcher(pool)

return pool
}
9 changes: 7 additions & 2 deletions zp-relayer/services/commitment-watcher/router.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import config from '@/configs/commitmentWatcherConfig'
import { logger } from '@/lib/appLogger'
import { BasePool } from '@/pool/BasePool'
import { poolTxQueue, WorkerTx, WorkerTxType } from '@/queue/poolTxQueue'
import { applyDenominator } from '@/utils/helpers'
import { ValidationError } from '@/validation/api/validation'
import cors from 'cors'
import express, { NextFunction, Request, Response } from 'express'
import { toBN } from 'web3-utils'

export function createRouter() {
export function createRouter(pool: BasePool) {
const router = express.Router()

router.use(cors())
Expand All @@ -26,7 +29,9 @@ export function createRouter() {
})

router.get('/fee', (req, res) => {
res.json({ fee: config.COMMITMENT_WATCHER_FEE })
const dInverse = toBN(1).shln(255)
const fee = applyDenominator(config.COMMITMENT_WATCHER_FEE, pool.denominator.xor(dInverse))
res.json({ fee: fee.toString(10) })
})

router.get('/job/:commitment', async (req, res) => {
Expand Down
7 changes: 4 additions & 3 deletions zp-relayer/services/indexer/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ export async function init() {
eventsBatchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
})

await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK)])
const lastInitialSyncBlock = await pool.getLastBlockToProcess()
await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK, lastInitialSyncBlock)])

const startBlock = await pool.getLastBlockToProcess()
const startBlock = lastInitialSyncBlock + 1
const watcher = new Watcher(networkBackend, networkBackend.pool, 'pool-indexer', {
event: 'allEvents',
blockConfirmations: parseInt(process.env.INDEXER_BLOCK_CONFIRMATIONS || '1'),
blockConfirmations: config.INDEXER_BLOCK_CONFIRMATIONS,
startBlock,
eventPollingInterval: parseInt(process.env.WATCHER_EVENT_POLLING_INTERVAL || '10000'),
batchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
Expand Down
Loading
Loading